Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
7ce041dc92bd6c30391c4b6dd7b5b76276f92b86
[simgrid.git] / src / smpi / smpi_mpi_dt.c
1 /* smpi_mpi_dt.c -- MPI primitives to handle datatypes                        */
2 /* FIXME: a very incomplete implementation                                    */
3
4 /* Copyright (c) 2009-2014. The SimGrid Team.
5  * All rights reserved.                                                     */
6
7 /* This program is free software; you can redistribute it and/or modify it
8  * under the terms of the license (GNU LGPL) which comes with this package. */
9
10 #include <stdio.h>
11 #include <stdlib.h>
12 #include <string.h>
13
14 #include "private.h"
15 #include "smpi_mpi_dt_private.h"
16 #include "mc/mc.h"
17 #include "xbt/replay.h"
18 #include "simgrid/modelchecker.h"
19
20 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_mpi_dt, smpi,
21                                 "Logging specific to SMPI (datatype)");
22
23 #define CREATE_MPI_DATATYPE(name, type)       \
24   static s_smpi_mpi_datatype_t mpi_##name = { \
25     sizeof(type),  /* size */                 \
26     0,             /*was 1 has_subtype*/             \
27     0,             /* lb */                   \
28     sizeof(type),  /* ub = lb + size */       \
29     DT_FLAG_BASIC,  /* flags */              \
30     NULL           /* pointer on extended struct*/ \
31   };                                          \
32 MPI_Datatype name = &mpi_##name;
33
34 #define CREATE_MPI_DATATYPE_NULL(name)       \
35   static s_smpi_mpi_datatype_t mpi_##name = { \
36     0,  /* size */                 \
37     0,             /*was 1 has_subtype*/             \
38     0,             /* lb */                   \
39     0,  /* ub = lb + size */       \
40     DT_FLAG_BASIC,  /* flags */              \
41     NULL           /* pointer on extended struct*/ \
42   };                                          \
43 MPI_Datatype name = &mpi_##name;
44
45 //The following are datatypes for the MPI functions MPI_MAXLOC and MPI_MINLOC.
46 typedef struct {
47   float value;
48   int index;
49 } float_int;
50 typedef struct {
51   float value;
52   float index;
53 } float_float;
54 typedef struct {
55   long value;
56   long index;
57 } long_long;
58 typedef struct {
59   double value;
60   double index;
61 } double_double;
62 typedef struct {
63   long value;
64   int index;
65 } long_int;
66 typedef struct {
67   double value;
68   int index;
69 } double_int;
70 typedef struct {
71   short value;
72   int index;
73 } short_int;
74 typedef struct {
75   int value;
76   int index;
77 } int_int;
78 typedef struct {
79   long double value;
80   int index;
81 } long_double_int;
82
83 // Predefined data types
84 CREATE_MPI_DATATYPE(MPI_CHAR, char);
85 CREATE_MPI_DATATYPE(MPI_SHORT, short);
86 CREATE_MPI_DATATYPE(MPI_INT, int);
87 CREATE_MPI_DATATYPE(MPI_LONG, long);
88 CREATE_MPI_DATATYPE(MPI_LONG_LONG, long long);
89 CREATE_MPI_DATATYPE(MPI_SIGNED_CHAR, signed char);
90 CREATE_MPI_DATATYPE(MPI_UNSIGNED_CHAR, unsigned char);
91 CREATE_MPI_DATATYPE(MPI_UNSIGNED_SHORT, unsigned short);
92 CREATE_MPI_DATATYPE(MPI_UNSIGNED, unsigned int);
93 CREATE_MPI_DATATYPE(MPI_UNSIGNED_LONG, unsigned long);
94 CREATE_MPI_DATATYPE(MPI_UNSIGNED_LONG_LONG, unsigned long long);
95 CREATE_MPI_DATATYPE(MPI_FLOAT, float);
96 CREATE_MPI_DATATYPE(MPI_DOUBLE, double);
97 CREATE_MPI_DATATYPE(MPI_LONG_DOUBLE, long double);
98 CREATE_MPI_DATATYPE(MPI_WCHAR, wchar_t);
99 CREATE_MPI_DATATYPE(MPI_C_BOOL, _Bool);
100 CREATE_MPI_DATATYPE(MPI_INT8_T, int8_t);
101 CREATE_MPI_DATATYPE(MPI_INT16_T, int16_t);
102 CREATE_MPI_DATATYPE(MPI_INT32_T, int32_t);
103 CREATE_MPI_DATATYPE(MPI_INT64_T, int64_t);
104 CREATE_MPI_DATATYPE(MPI_UINT8_T, uint8_t);
105 CREATE_MPI_DATATYPE(MPI_UINT16_T, uint16_t);
106 CREATE_MPI_DATATYPE(MPI_UINT32_T, uint32_t);
107 CREATE_MPI_DATATYPE(MPI_UINT64_T, uint64_t);
108 CREATE_MPI_DATATYPE(MPI_C_FLOAT_COMPLEX, float _Complex);
109 CREATE_MPI_DATATYPE(MPI_C_DOUBLE_COMPLEX, double _Complex);
110 CREATE_MPI_DATATYPE(MPI_C_LONG_DOUBLE_COMPLEX, long double _Complex);
111 CREATE_MPI_DATATYPE(MPI_AINT, MPI_Aint);
112 CREATE_MPI_DATATYPE(MPI_OFFSET, MPI_Offset);
113
114 CREATE_MPI_DATATYPE(MPI_FLOAT_INT, float_int);
115 CREATE_MPI_DATATYPE(MPI_LONG_INT, long_int);
116 CREATE_MPI_DATATYPE(MPI_DOUBLE_INT, double_int);
117 CREATE_MPI_DATATYPE(MPI_SHORT_INT, short_int);
118 CREATE_MPI_DATATYPE(MPI_2INT, int_int);
119 CREATE_MPI_DATATYPE(MPI_2FLOAT, float_float);
120 CREATE_MPI_DATATYPE(MPI_2DOUBLE, double_double);
121 CREATE_MPI_DATATYPE(MPI_2LONG, long_long);
122
123 CREATE_MPI_DATATYPE(MPI_LONG_DOUBLE_INT, long_double_int);
124
125 CREATE_MPI_DATATYPE_NULL(MPI_UB);
126 CREATE_MPI_DATATYPE_NULL(MPI_LB);
127 CREATE_MPI_DATATYPE_NULL(MPI_PACKED);
128 // Internal use only
129 CREATE_MPI_DATATYPE(MPI_PTR, void*);
130
131 /** Check if the datatype is usable for communications
132  */
133 int is_datatype_valid(MPI_Datatype datatype) {
134     return datatype != MPI_DATATYPE_NULL
135         && (datatype->flags & DT_FLAG_COMMITED);
136 }
137
138 size_t smpi_datatype_size(MPI_Datatype datatype)
139 {
140   return datatype->size;
141 }
142
143 MPI_Aint smpi_datatype_lb(MPI_Datatype datatype)
144 {
145   return datatype->lb;
146 }
147
148 MPI_Aint smpi_datatype_ub(MPI_Datatype datatype)
149 {
150   return datatype->ub;
151 }
152
153 MPI_Datatype smpi_datatype_dup(MPI_Datatype datatype)
154 {
155   MPI_Datatype new_t= xbt_new(s_smpi_mpi_datatype_t,1);
156   memcpy(new_t, datatype, sizeof(s_smpi_mpi_datatype_t));
157   if (datatype->has_subtype)
158     memcpy(new_t->substruct, datatype->substruct, sizeof(s_smpi_subtype_t));
159   return new_t;
160 }
161
162 int smpi_datatype_extent(MPI_Datatype datatype, MPI_Aint * lb,
163                          MPI_Aint * extent)
164 {
165   *lb = datatype->lb;
166   *extent = datatype->ub - datatype->lb;
167   return MPI_SUCCESS;
168 }
169
170 MPI_Aint smpi_datatype_get_extent(MPI_Datatype datatype){
171   return datatype->ub - datatype->lb;
172 }
173
174 int smpi_datatype_copy(void *sendbuf, int sendcount, MPI_Datatype sendtype,
175                        void *recvbuf, int recvcount, MPI_Datatype recvtype)
176 {
177   int count;
178   if(smpi_privatize_global_variables){
179     switch_data_segment(smpi_process_index());
180   }
181   /* First check if we really have something to do */
182   if (recvcount > 0 && recvbuf != sendbuf) {
183     /* FIXME: treat packed cases */
184     sendcount *= smpi_datatype_size(sendtype);
185     recvcount *= smpi_datatype_size(recvtype);
186     count = sendcount < recvcount ? sendcount : recvcount;
187
188     if(sendtype->has_subtype == 0 && recvtype->has_subtype == 0) {
189       if(!_xbt_replay_is_active()) memcpy(recvbuf, sendbuf, count);
190     }
191     else if (sendtype->has_subtype == 0)
192     {
193       s_smpi_subtype_t *subtype =  recvtype->substruct;
194       subtype->unserialize( sendbuf, recvbuf,1, subtype, MPI_REPLACE);
195     }
196     else if (recvtype->has_subtype == 0)
197     {
198       s_smpi_subtype_t *subtype =  sendtype->substruct;
199       subtype->serialize(sendbuf, recvbuf,1, subtype);
200     }else{
201       s_smpi_subtype_t *subtype =  sendtype->substruct;
202
203
204       void * buf_tmp = xbt_malloc(count);
205
206       subtype->serialize( sendbuf, buf_tmp,count/smpi_datatype_size(sendtype), subtype);
207       subtype =  recvtype->substruct;
208       subtype->unserialize( buf_tmp, recvbuf,count/smpi_datatype_size(recvtype), subtype, MPI_REPLACE);
209
210       free(buf_tmp);
211     }
212   }
213
214   return sendcount > recvcount ? MPI_ERR_TRUNCATE : MPI_SUCCESS;
215 }
216
217 /*
218  *  Copies noncontiguous data into contiguous memory.
219  *  @param contiguous_vector - output vector
220  *  @param noncontiguous_vector - input vector
221  *  @param type - pointer contening :
222  *      - stride - stride of between noncontiguous data
223  *      - block_length - the width or height of blocked matrix
224  *      - count - the number of rows of matrix
225  */
226 void serialize_vector( const void *noncontiguous_vector,
227                        void *contiguous_vector,
228                        int count,
229                        void *type)
230 {
231   s_smpi_mpi_vector_t* type_c = (s_smpi_mpi_vector_t*)type;
232   int i;
233   char* contiguous_vector_char = (char*)contiguous_vector;
234   char* noncontiguous_vector_char = (char*)noncontiguous_vector;
235
236   for (i = 0; i < type_c->block_count * count; i++) {
237       if (type_c->old_type->has_subtype == 0)
238         memcpy(contiguous_vector_char,
239                noncontiguous_vector_char, type_c->block_length * type_c->size_oldtype);
240       else
241         ((s_smpi_subtype_t*)type_c->old_type->substruct)->serialize( noncontiguous_vector_char,
242                                                                      contiguous_vector_char,
243                                                                      type_c->block_length,
244                                                                      type_c->old_type->substruct);
245
246     contiguous_vector_char += type_c->block_length*type_c->size_oldtype;
247     if((i+1)%type_c->block_count ==0)
248     noncontiguous_vector_char += type_c->block_length*smpi_datatype_get_extent(type_c->old_type);
249     else
250     noncontiguous_vector_char += type_c->block_stride*smpi_datatype_get_extent(type_c->old_type);
251   }
252 }
253
254 /*
255  *  Copies contiguous data into noncontiguous memory.
256  *  @param noncontiguous_vector - output vector
257  *  @param contiguous_vector - input vector
258  *  @param type - pointer contening :
259  *      - stride - stride of between noncontiguous data
260  *      - block_length - the width or height of blocked matrix
261  *      - count - the number of rows of matrix
262  */
263 void unserialize_vector( const void *contiguous_vector,
264                          void *noncontiguous_vector,
265                          int count,
266                          void *type,
267                          MPI_Op op)
268 {
269   s_smpi_mpi_vector_t* type_c = (s_smpi_mpi_vector_t*)type;
270   int i;
271
272   char* contiguous_vector_char = (char*)contiguous_vector;
273   char* noncontiguous_vector_char = (char*)noncontiguous_vector;
274
275   for (i = 0; i < type_c->block_count * count; i++) {
276     if (type_c->old_type->has_subtype == 0)
277       smpi_op_apply(op, contiguous_vector_char, noncontiguous_vector_char, &type_c->block_length,
278           &type_c->old_type);
279      /* memcpy(noncontiguous_vector_char,
280              contiguous_vector_char, type_c->block_length * type_c->size_oldtype);*/
281     else
282       ((s_smpi_subtype_t*)type_c->old_type->substruct)->unserialize( contiguous_vector_char,
283                                                                      noncontiguous_vector_char,
284                                                                      type_c->block_length,
285                                                                      type_c->old_type->substruct,
286                                                                      op);
287     contiguous_vector_char += type_c->block_length*type_c->size_oldtype;
288     if((i+1)%type_c->block_count ==0)
289     noncontiguous_vector_char += type_c->block_length*smpi_datatype_get_extent(type_c->old_type);
290     else
291     noncontiguous_vector_char += type_c->block_stride*smpi_datatype_get_extent(type_c->old_type);
292   }
293 }
294
295 /*
296  * Create a Sub type vector to be able to serialize and unserialize it
297  * the structure s_smpi_mpi_vector_t is derived from s_smpi_subtype which
298  * required the functions unserialize and serialize
299  *
300  */
301 s_smpi_mpi_vector_t* smpi_datatype_vector_create( int block_stride,
302                                                   int block_length,
303                                                   int block_count,
304                                                   MPI_Datatype old_type,
305                                                   int size_oldtype){
306   s_smpi_mpi_vector_t *new_t= xbt_new(s_smpi_mpi_vector_t,1);
307   new_t->base.serialize = &serialize_vector;
308   new_t->base.unserialize = &unserialize_vector;
309   new_t->base.subtype_free = &free_vector;
310   new_t->block_stride = block_stride;
311   new_t->block_length = block_length;
312   new_t->block_count = block_count;
313   smpi_datatype_use(old_type);
314   new_t->old_type = old_type;
315   new_t->size_oldtype = size_oldtype;
316   return new_t;
317 }
318
319 void smpi_datatype_create(MPI_Datatype* new_type, int size,int lb, int ub, int has_subtype,
320                           void *struct_type, int flags){
321   MPI_Datatype new_t= xbt_new(s_smpi_mpi_datatype_t,1);
322   new_t->size = size;
323   new_t->has_subtype = size>0? has_subtype:0;
324   new_t->lb = lb;
325   new_t->ub = ub;
326   new_t->flags = flags;
327   new_t->substruct = struct_type;
328   new_t->in_use=0;
329   *new_type = new_t;
330
331 #ifdef HAVE_MC
332   if(MC_is_active())
333     MC_ignore(&(new_t->in_use), sizeof(new_t->in_use));
334 #endif
335 }
336
337 void smpi_datatype_free(MPI_Datatype* type){
338
339   if((*type)->flags & DT_FLAG_PREDEFINED)return;
340
341   //if still used, mark for deletion
342   if((*type)->in_use!=0){
343       (*type)->flags |=DT_FLAG_DESTROYED;
344       return;
345   }
346
347   if ((*type)->has_subtype == 1){
348     ((s_smpi_subtype_t *)(*type)->substruct)->subtype_free(type);  
349     xbt_free((*type)->substruct);
350   }
351   xbt_free(*type);
352   *type = MPI_DATATYPE_NULL;
353 }
354
355 void smpi_datatype_use(MPI_Datatype type){
356   if(type)type->in_use++;
357
358 #ifdef HAVE_MC
359   if(MC_is_active())
360     MC_ignore(&(type->in_use), sizeof(type->in_use));
361 #endif
362 }
363
364
365 void smpi_datatype_unuse(MPI_Datatype type){
366   if(type && type->in_use-- == 0 && (type->flags & DT_FLAG_DESTROYED))
367     smpi_datatype_free(&type);
368
369 #ifdef HAVE_MC
370   if(MC_is_active())
371     MC_ignore(&(type->in_use), sizeof(type->in_use));
372 #endif
373 }
374
375
376
377
378 /*
379 Contiguous Implementation
380 */
381
382
383 /*
384  *  Copies noncontiguous data into contiguous memory.
385  *  @param contiguous_hvector - output hvector
386  *  @param noncontiguous_hvector - input hvector
387  *  @param type - pointer contening :
388  *      - stride - stride of between noncontiguous data, in bytes
389  *      - block_length - the width or height of blocked matrix
390  *      - count - the number of rows of matrix
391  */
392 void serialize_contiguous( const void *noncontiguous_hvector,
393                        void *contiguous_hvector,
394                        int count,
395                        void *type)
396 {
397   s_smpi_mpi_contiguous_t* type_c = (s_smpi_mpi_contiguous_t*)type;
398   char* contiguous_vector_char = (char*)contiguous_hvector;
399   char* noncontiguous_vector_char = (char*)noncontiguous_hvector+type_c->lb;
400   memcpy(contiguous_vector_char,
401            noncontiguous_vector_char, count* type_c->block_count * type_c->size_oldtype);
402 }
403 /*
404  *  Copies contiguous data into noncontiguous memory.
405  *  @param noncontiguous_vector - output hvector
406  *  @param contiguous_vector - input hvector
407  *  @param type - pointer contening :
408  *      - stride - stride of between noncontiguous data, in bytes
409  *      - block_length - the width or height of blocked matrix
410  *      - count - the number of rows of matrix
411  */
412 void unserialize_contiguous( const void *contiguous_vector,
413                          void *noncontiguous_vector,
414                          int count,
415                          void *type,
416                          MPI_Op op)
417 {
418   s_smpi_mpi_contiguous_t* type_c = (s_smpi_mpi_contiguous_t*)type;
419   char* contiguous_vector_char = (char*)contiguous_vector;
420   char* noncontiguous_vector_char = (char*)noncontiguous_vector+type_c->lb;
421   int n= count* type_c->block_count;
422   smpi_op_apply(op, contiguous_vector_char, noncontiguous_vector_char, &n,
423             &type_c->old_type);
424        /*memcpy(noncontiguous_vector_char,
425            contiguous_vector_char, count*  type_c->block_count * type_c->size_oldtype);*/
426 }
427
428 void free_contiguous(MPI_Datatype* d){
429   smpi_datatype_unuse(((s_smpi_mpi_indexed_t *)(*d)->substruct)->old_type);
430 }
431
432 /*
433  * Create a Sub type contiguous to be able to serialize and unserialize it
434  * the structure s_smpi_mpi_contiguous_t is derived from s_smpi_subtype which
435  * required the functions unserialize and serialize
436  *
437  */
438 s_smpi_mpi_contiguous_t* smpi_datatype_contiguous_create( MPI_Aint lb,
439                                                   int block_count,
440                                                   MPI_Datatype old_type,
441                                                   int size_oldtype){
442   s_smpi_mpi_contiguous_t *new_t= xbt_new(s_smpi_mpi_contiguous_t,1);
443   new_t->base.serialize = &serialize_contiguous;
444   new_t->base.unserialize = &unserialize_contiguous;
445   new_t->base.subtype_free = &free_contiguous;
446   new_t->lb = lb;
447   new_t->block_count = block_count;
448   new_t->old_type = old_type;
449   new_t->size_oldtype = size_oldtype;
450   smpi_datatype_use(old_type);
451   return new_t;
452 }
453
454
455
456
457 int smpi_datatype_contiguous(int count, MPI_Datatype old_type, MPI_Datatype* new_type, MPI_Aint lb)
458 {
459   int retval;
460   if(old_type->has_subtype){
461           //handle this case as a hvector with stride equals to the extent of the datatype
462           return smpi_datatype_hvector(count, 1, smpi_datatype_get_extent(old_type), old_type, new_type);
463   }
464   
465   s_smpi_mpi_contiguous_t* subtype = smpi_datatype_contiguous_create( lb,
466                                                                 count,
467                                                                 old_type,
468                                                                 smpi_datatype_size(old_type));
469                                                                 
470   smpi_datatype_create(new_type,
471                                           count * smpi_datatype_size(old_type),
472                                           lb,lb + count * smpi_datatype_size(old_type),
473                                           1,subtype, DT_FLAG_CONTIGUOUS);
474   retval=MPI_SUCCESS;
475   return retval;
476 }
477
478 int smpi_datatype_vector(int count, int blocklen, int stride, MPI_Datatype old_type, MPI_Datatype* new_type)
479 {
480   int retval;
481   if (blocklen<0) return MPI_ERR_ARG;
482   MPI_Aint lb = 0;
483   MPI_Aint ub = 0;
484   if(count>0){
485     lb=smpi_datatype_lb(old_type);
486     ub=((count-1)*stride+blocklen-1)*smpi_datatype_get_extent(old_type)+smpi_datatype_ub(old_type);
487   }
488   if(old_type->has_subtype || stride != blocklen){
489
490
491     s_smpi_mpi_vector_t* subtype = smpi_datatype_vector_create( stride,
492                                                                 blocklen,
493                                                                 count,
494                                                                 old_type,
495                                                                 smpi_datatype_size(old_type));
496     smpi_datatype_create(new_type,
497                          count * (blocklen) * smpi_datatype_size(old_type), lb,
498                          ub,
499                          1,
500                          subtype,
501                          DT_FLAG_VECTOR);
502     retval=MPI_SUCCESS;
503   }else{
504     /* in this situation the data are contignous thus it's not
505      * required to serialize and unserialize it*/
506     smpi_datatype_create(new_type, count * blocklen *
507                          smpi_datatype_size(old_type), 0, ((count -1) * stride + blocklen)*
508                          smpi_datatype_size(old_type),
509                          0,
510                          NULL,
511                          DT_FLAG_VECTOR|DT_FLAG_CONTIGUOUS);
512     retval=MPI_SUCCESS;
513   }
514   return retval;
515 }
516
517 void free_vector(MPI_Datatype* d){
518   smpi_datatype_unuse(((s_smpi_mpi_indexed_t *)(*d)->substruct)->old_type);
519 }
520
521 /*
522 Hvector Implementation - Vector with stride in bytes
523 */
524
525
526 /*
527  *  Copies noncontiguous data into contiguous memory.
528  *  @param contiguous_hvector - output hvector
529  *  @param noncontiguous_hvector - input hvector
530  *  @param type - pointer contening :
531  *      - stride - stride of between noncontiguous data, in bytes
532  *      - block_length - the width or height of blocked matrix
533  *      - count - the number of rows of matrix
534  */
535 void serialize_hvector( const void *noncontiguous_hvector,
536                        void *contiguous_hvector,
537                        int count,
538                        void *type)
539 {
540   s_smpi_mpi_hvector_t* type_c = (s_smpi_mpi_hvector_t*)type;
541   int i;
542   char* contiguous_vector_char = (char*)contiguous_hvector;
543   char* noncontiguous_vector_char = (char*)noncontiguous_hvector;
544
545   for (i = 0; i < type_c->block_count * count; i++) {
546     if (type_c->old_type->has_subtype == 0)
547       memcpy(contiguous_vector_char,
548            noncontiguous_vector_char, type_c->block_length * type_c->size_oldtype);
549     else
550       ((s_smpi_subtype_t*)type_c->old_type->substruct)->serialize( noncontiguous_vector_char,
551                                                                    contiguous_vector_char,
552                                                                    type_c->block_length,
553                                                                    type_c->old_type->substruct);
554
555     contiguous_vector_char += type_c->block_length*type_c->size_oldtype;
556     if((i+1)%type_c->block_count ==0)
557     noncontiguous_vector_char += type_c->block_length*type_c->size_oldtype;
558     else
559     noncontiguous_vector_char += type_c->block_stride;
560   }
561 }
562 /*
563  *  Copies contiguous data into noncontiguous memory.
564  *  @param noncontiguous_vector - output hvector
565  *  @param contiguous_vector - input hvector
566  *  @param type - pointer contening :
567  *      - stride - stride of between noncontiguous data, in bytes
568  *      - block_length - the width or height of blocked matrix
569  *      - count - the number of rows of matrix
570  */
571 void unserialize_hvector( const void *contiguous_vector,
572                          void *noncontiguous_vector,
573                          int count,
574                          void *type,
575                          MPI_Op op)
576 {
577   s_smpi_mpi_hvector_t* type_c = (s_smpi_mpi_hvector_t*)type;
578   int i;
579
580   char* contiguous_vector_char = (char*)contiguous_vector;
581   char* noncontiguous_vector_char = (char*)noncontiguous_vector;
582
583   for (i = 0; i < type_c->block_count * count; i++) {
584     if (type_c->old_type->has_subtype == 0)
585       smpi_op_apply(op, contiguous_vector_char, noncontiguous_vector_char, &type_c->block_length,
586                   &type_c->old_type);
587              /*memcpy(noncontiguous_vector_char,
588            contiguous_vector_char, type_c->block_length * type_c->size_oldtype);*/
589     else
590       ((s_smpi_subtype_t*)type_c->old_type->substruct)->unserialize( contiguous_vector_char,
591                                                                      noncontiguous_vector_char,
592                                                                      type_c->block_length,
593                                                                      type_c->old_type->substruct,
594                                                                      op);
595     contiguous_vector_char += type_c->block_length*type_c->size_oldtype;
596     if((i+1)%type_c->block_count ==0)
597     noncontiguous_vector_char += type_c->block_length*type_c->size_oldtype;
598     else
599     noncontiguous_vector_char += type_c->block_stride;
600   }
601 }
602
603 /*
604  * Create a Sub type vector to be able to serialize and unserialize it
605  * the structure s_smpi_mpi_vector_t is derived from s_smpi_subtype which
606  * required the functions unserialize and serialize
607  *
608  */
609 s_smpi_mpi_hvector_t* smpi_datatype_hvector_create( MPI_Aint block_stride,
610                                                   int block_length,
611                                                   int block_count,
612                                                   MPI_Datatype old_type,
613                                                   int size_oldtype){
614   s_smpi_mpi_hvector_t *new_t= xbt_new(s_smpi_mpi_hvector_t,1);
615   new_t->base.serialize = &serialize_hvector;
616   new_t->base.unserialize = &unserialize_hvector;
617   new_t->base.subtype_free = &free_hvector;
618   new_t->block_stride = block_stride;
619   new_t->block_length = block_length;
620   new_t->block_count = block_count;
621   new_t->old_type = old_type;
622   new_t->size_oldtype = size_oldtype;
623   smpi_datatype_use(old_type);
624   return new_t;
625 }
626
627 //do nothing for vector types
628 void free_hvector(MPI_Datatype* d){
629   smpi_datatype_unuse(((s_smpi_mpi_indexed_t *)(*d)->substruct)->old_type);
630 }
631
632 int smpi_datatype_hvector(int count, int blocklen, MPI_Aint stride, MPI_Datatype old_type, MPI_Datatype* new_type)
633 {
634   int retval;
635   if (blocklen<0) return MPI_ERR_ARG;
636   MPI_Aint lb = 0;
637   MPI_Aint ub = 0;
638   if(count>0){
639     lb=smpi_datatype_lb(old_type);
640     ub=((count-1)*stride)+(blocklen-1)*smpi_datatype_get_extent(old_type)+smpi_datatype_ub(old_type);
641   }
642   if(old_type->has_subtype || stride != blocklen*smpi_datatype_get_extent(old_type)){
643     s_smpi_mpi_hvector_t* subtype = smpi_datatype_hvector_create( stride,
644                                                                   blocklen,
645                                                                   count,
646                                                                   old_type,
647                                                                   smpi_datatype_size(old_type));
648
649     smpi_datatype_create(new_type, count * blocklen * smpi_datatype_size(old_type),
650                                                  lb,ub,
651                          1,
652                          subtype,
653                          DT_FLAG_VECTOR);
654     retval=MPI_SUCCESS;
655   }else{
656     smpi_datatype_create(new_type, count * blocklen *
657                                              smpi_datatype_size(old_type),0,count * blocklen *
658                                              smpi_datatype_size(old_type),
659                                             0,
660                                             NULL,
661                                             DT_FLAG_VECTOR|DT_FLAG_CONTIGUOUS);
662     retval=MPI_SUCCESS;
663   }
664   return retval;
665 }
666
667
668 /*
669 Indexed Implementation
670 */
671
672 /*
673  *  Copies noncontiguous data into contiguous memory.
674  *  @param contiguous_indexed - output indexed
675  *  @param noncontiguous_indexed - input indexed
676  *  @param type - pointer contening :
677  *      - block_lengths - the width or height of blocked matrix
678  *      - block_indices - indices of each data, in element
679  *      - count - the number of rows of matrix
680  */
681 void serialize_indexed( const void *noncontiguous_indexed,
682                        void *contiguous_indexed,
683                        int count,
684                        void *type)
685 {
686   s_smpi_mpi_indexed_t* type_c = (s_smpi_mpi_indexed_t*)type;
687   int i,j;
688   char* contiguous_indexed_char = (char*)contiguous_indexed;
689   char* noncontiguous_indexed_char = (char*)noncontiguous_indexed+type_c->block_indices[0] * type_c->size_oldtype;
690   for(j=0; j<count;j++){
691     for (i = 0; i < type_c->block_count; i++) {
692       if (type_c->old_type->has_subtype == 0)
693         memcpy(contiguous_indexed_char,
694                      noncontiguous_indexed_char, type_c->block_lengths[i] * type_c->size_oldtype);
695       else
696         ((s_smpi_subtype_t*)type_c->old_type->substruct)->serialize( noncontiguous_indexed_char,
697                                                                      contiguous_indexed_char,
698                                                                      type_c->block_lengths[i],
699                                                                      type_c->old_type->substruct);
700
701
702       contiguous_indexed_char += type_c->block_lengths[i]*type_c->size_oldtype;
703       if (i<type_c->block_count-1)noncontiguous_indexed_char = (char*)noncontiguous_indexed + type_c->block_indices[i+1]*smpi_datatype_get_extent(type_c->old_type);
704       else noncontiguous_indexed_char += type_c->block_lengths[i]*smpi_datatype_get_extent(type_c->old_type);
705     }
706     noncontiguous_indexed=(void*)noncontiguous_indexed_char;
707   }
708 }
709 /*
710  *  Copies contiguous data into noncontiguous memory.
711  *  @param noncontiguous_indexed - output indexed
712  *  @param contiguous_indexed - input indexed
713  *  @param type - pointer contening :
714  *      - block_lengths - the width or height of blocked matrix
715  *      - block_indices - indices of each data, in element
716  *      - count - the number of rows of matrix
717  */
718 void unserialize_indexed( const void *contiguous_indexed,
719                          void *noncontiguous_indexed,
720                          int count,
721                          void *type,
722                          MPI_Op op)
723 {
724
725   s_smpi_mpi_indexed_t* type_c = (s_smpi_mpi_indexed_t*)type;
726   int i,j;
727   char* contiguous_indexed_char = (char*)contiguous_indexed;
728   char* noncontiguous_indexed_char = (char*)noncontiguous_indexed+type_c->block_indices[0]*smpi_datatype_get_extent(type_c->old_type);
729   for(j=0; j<count;j++){
730     for (i = 0; i < type_c->block_count; i++) {
731       if (type_c->old_type->has_subtype == 0)
732         smpi_op_apply(op, contiguous_indexed_char, noncontiguous_indexed_char, &type_c->block_lengths[i],
733                     &type_c->old_type);
734                /*memcpy(noncontiguous_indexed_char ,
735              contiguous_indexed_char, type_c->block_lengths[i] * type_c->size_oldtype);*/
736       else
737         ((s_smpi_subtype_t*)type_c->old_type->substruct)->unserialize( contiguous_indexed_char,
738                                                                        noncontiguous_indexed_char,
739                                                                        type_c->block_lengths[i],
740                                                                        type_c->old_type->substruct,
741                                                                        op);
742
743       contiguous_indexed_char += type_c->block_lengths[i]*type_c->size_oldtype;
744       if (i<type_c->block_count-1)
745         noncontiguous_indexed_char = (char*)noncontiguous_indexed + type_c->block_indices[i+1]*smpi_datatype_get_extent(type_c->old_type);
746       else noncontiguous_indexed_char += type_c->block_lengths[i]*smpi_datatype_get_extent(type_c->old_type);
747     }
748     noncontiguous_indexed=(void*)noncontiguous_indexed_char;
749   }
750 }
751
752 void free_indexed(MPI_Datatype* type){
753   xbt_free(((s_smpi_mpi_indexed_t *)(*type)->substruct)->block_lengths);
754   xbt_free(((s_smpi_mpi_indexed_t *)(*type)->substruct)->block_indices);
755   smpi_datatype_unuse(((s_smpi_mpi_indexed_t *)(*type)->substruct)->old_type);
756 }
757
758 /*
759  * Create a Sub type indexed to be able to serialize and unserialize it
760  * the structure s_smpi_mpi_indexed_t is derived from s_smpi_subtype which
761  * required the functions unserialize and serialize
762  */
763 s_smpi_mpi_indexed_t* smpi_datatype_indexed_create( int* block_lengths,
764                                                   int* block_indices,
765                                                   int block_count,
766                                                   MPI_Datatype old_type,
767                                                   int size_oldtype){
768   s_smpi_mpi_indexed_t *new_t= xbt_new(s_smpi_mpi_indexed_t,1);
769   new_t->base.serialize = &serialize_indexed;
770   new_t->base.unserialize = &unserialize_indexed;
771   new_t->base.subtype_free = &free_indexed;
772  //TODO : add a custom function for each time to clean these 
773   new_t->block_lengths= xbt_new(int, block_count);
774   new_t->block_indices= xbt_new(int, block_count);
775   int i;
776   for(i=0;i<block_count;i++){
777     new_t->block_lengths[i]=block_lengths[i];
778     new_t->block_indices[i]=block_indices[i];
779   }
780   new_t->block_count = block_count;
781   smpi_datatype_use(old_type);
782   new_t->old_type = old_type;
783   new_t->size_oldtype = size_oldtype;
784   return new_t;
785 }
786
787
788 int smpi_datatype_indexed(int count, int* blocklens, int* indices, MPI_Datatype old_type, MPI_Datatype* new_type)
789 {
790   int i;
791   int retval;
792   int size = 0;
793   int contiguous=1;
794   MPI_Aint lb = 0;
795   MPI_Aint ub = 0;
796   if(count>0){
797     lb=indices[0]*smpi_datatype_get_extent(old_type);
798     ub=indices[0]*smpi_datatype_get_extent(old_type) + blocklens[0]*smpi_datatype_ub(old_type);
799   }
800
801   for(i=0; i< count; i++){
802     if   (blocklens[i]<0)
803       return MPI_ERR_ARG;
804     size += blocklens[i];
805
806     if(indices[i]*smpi_datatype_get_extent(old_type)+smpi_datatype_lb(old_type)<lb)
807         lb = indices[i]*smpi_datatype_get_extent(old_type)+smpi_datatype_lb(old_type);
808     if(indices[i]*smpi_datatype_get_extent(old_type)+blocklens[i]*smpi_datatype_ub(old_type)>ub)
809         ub = indices[i]*smpi_datatype_get_extent(old_type)+blocklens[i]*smpi_datatype_ub(old_type);
810
811     if ( (i< count -1) && (indices[i]+blocklens[i] != indices[i+1]) )contiguous=0;
812   }
813   if (old_type->has_subtype == 1)
814     contiguous=0;
815
816   if(!contiguous){
817     s_smpi_mpi_indexed_t* subtype = smpi_datatype_indexed_create( blocklens,
818                                                                   indices,
819                                                                   count,
820                                                                   old_type,
821                                                                   smpi_datatype_size(old_type));
822      smpi_datatype_create(new_type,  size *
823                          smpi_datatype_size(old_type),lb,ub,1, subtype, DT_FLAG_DATA);
824   }else{
825     s_smpi_mpi_contiguous_t* subtype = smpi_datatype_contiguous_create( lb,
826                                                                   size,
827                                                                   old_type,
828                                                                   smpi_datatype_size(old_type));
829     smpi_datatype_create(new_type,  size *
830                          smpi_datatype_size(old_type),lb,ub,1, subtype, DT_FLAG_DATA|DT_FLAG_CONTIGUOUS);
831   }
832   retval=MPI_SUCCESS;
833   return retval;
834 }
835
836
837 /*
838 Hindexed Implementation - Indexed with indices in bytes 
839 */
840
841 /*
842  *  Copies noncontiguous data into contiguous memory.
843  *  @param contiguous_hindexed - output hindexed
844  *  @param noncontiguous_hindexed - input hindexed
845  *  @param type - pointer contening :
846  *      - block_lengths - the width or height of blocked matrix
847  *      - block_indices - indices of each data, in bytes
848  *      - count - the number of rows of matrix
849  */
850 void serialize_hindexed( const void *noncontiguous_hindexed,
851                        void *contiguous_hindexed,
852                        int count,
853                        void *type)
854 {
855   s_smpi_mpi_hindexed_t* type_c = (s_smpi_mpi_hindexed_t*)type;
856   int i,j;
857   char* contiguous_hindexed_char = (char*)contiguous_hindexed;
858   char* noncontiguous_hindexed_char = (char*)noncontiguous_hindexed+ type_c->block_indices[0];
859   for(j=0; j<count;j++){
860     for (i = 0; i < type_c->block_count; i++) {
861       if (type_c->old_type->has_subtype == 0)
862         memcpy(contiguous_hindexed_char,
863                      noncontiguous_hindexed_char, type_c->block_lengths[i] * type_c->size_oldtype);
864       else
865         ((s_smpi_subtype_t*)type_c->old_type->substruct)->serialize( noncontiguous_hindexed_char,
866                                                                      contiguous_hindexed_char,
867                                                                      type_c->block_lengths[i],
868                                                                      type_c->old_type->substruct);
869
870       contiguous_hindexed_char += type_c->block_lengths[i]*type_c->size_oldtype;
871       if (i<type_c->block_count-1)noncontiguous_hindexed_char = (char*)noncontiguous_hindexed + type_c->block_indices[i+1];
872       else noncontiguous_hindexed_char += type_c->block_lengths[i]*smpi_datatype_get_extent(type_c->old_type);
873     }
874     noncontiguous_hindexed=(void*)noncontiguous_hindexed_char;
875   }
876 }
877 /*
878  *  Copies contiguous data into noncontiguous memory.
879  *  @param noncontiguous_hindexed - output hindexed
880  *  @param contiguous_hindexed - input hindexed
881  *  @param type - pointer contening :
882  *      - block_lengths - the width or height of blocked matrix
883  *      - block_indices - indices of each data, in bytes
884  *      - count - the number of rows of matrix
885  */
886 void unserialize_hindexed( const void *contiguous_hindexed,
887                          void *noncontiguous_hindexed,
888                          int count,
889                          void *type,
890                          MPI_Op op)
891 {
892   s_smpi_mpi_hindexed_t* type_c = (s_smpi_mpi_hindexed_t*)type;
893   int i,j;
894
895   char* contiguous_hindexed_char = (char*)contiguous_hindexed;
896   char* noncontiguous_hindexed_char = (char*)noncontiguous_hindexed+ type_c->block_indices[0];
897   for(j=0; j<count;j++){
898     for (i = 0; i < type_c->block_count; i++) {
899       if (type_c->old_type->has_subtype == 0)
900         smpi_op_apply(op, contiguous_hindexed_char, noncontiguous_hindexed_char, &type_c->block_lengths[i],
901                             &type_c->old_type);
902                        /*memcpy(noncontiguous_hindexed_char,
903                contiguous_hindexed_char, type_c->block_lengths[i] * type_c->size_oldtype);*/
904       else
905         ((s_smpi_subtype_t*)type_c->old_type->substruct)->unserialize( contiguous_hindexed_char,
906                                                                        noncontiguous_hindexed_char,
907                                                                        type_c->block_lengths[i],
908                                                                        type_c->old_type->substruct,
909                                                                        op);
910
911       contiguous_hindexed_char += type_c->block_lengths[i]*type_c->size_oldtype;
912       if (i<type_c->block_count-1)noncontiguous_hindexed_char = (char*)noncontiguous_hindexed + type_c->block_indices[i+1];
913       else noncontiguous_hindexed_char += type_c->block_lengths[i]*smpi_datatype_get_extent(type_c->old_type);
914     }
915     noncontiguous_hindexed=(void*)noncontiguous_hindexed_char;
916   }
917 }
918
919 void free_hindexed(MPI_Datatype* type){
920   xbt_free(((s_smpi_mpi_hindexed_t *)(*type)->substruct)->block_lengths);
921   xbt_free(((s_smpi_mpi_hindexed_t *)(*type)->substruct)->block_indices);
922   smpi_datatype_unuse(((s_smpi_mpi_indexed_t *)(*type)->substruct)->old_type);
923 }
924
925 /*
926  * Create a Sub type hindexed to be able to serialize and unserialize it
927  * the structure s_smpi_mpi_hindexed_t is derived from s_smpi_subtype which
928  * required the functions unserialize and serialize
929  */
930 s_smpi_mpi_hindexed_t* smpi_datatype_hindexed_create( int* block_lengths,
931                                                   MPI_Aint* block_indices,
932                                                   int block_count,
933                                                   MPI_Datatype old_type,
934                                                   int size_oldtype){
935   s_smpi_mpi_hindexed_t *new_t= xbt_new(s_smpi_mpi_hindexed_t,1);
936   new_t->base.serialize = &serialize_hindexed;
937   new_t->base.unserialize = &unserialize_hindexed;
938   new_t->base.subtype_free = &free_hindexed;
939  //TODO : add a custom function for each time to clean these 
940   new_t->block_lengths= xbt_new(int, block_count);
941   new_t->block_indices= xbt_new(MPI_Aint, block_count);
942   int i;
943   for(i=0;i<block_count;i++){
944     new_t->block_lengths[i]=block_lengths[i];
945     new_t->block_indices[i]=block_indices[i];
946   }
947   new_t->block_count = block_count;
948   new_t->old_type = old_type;
949   new_t->size_oldtype = size_oldtype;
950   return new_t;
951 }
952
953
954 int smpi_datatype_hindexed(int count, int* blocklens, MPI_Aint* indices, MPI_Datatype old_type, MPI_Datatype* new_type)
955 {
956   int i;
957   int retval;
958   int size = 0;
959   int contiguous=1;
960   MPI_Aint lb = 0;
961   MPI_Aint ub = 0;
962   if(count>0){
963     lb=indices[0] + smpi_datatype_lb(old_type);
964     ub=indices[0] + blocklens[0]*smpi_datatype_ub(old_type);
965   }
966   for(i=0; i< count; i++){
967     if   (blocklens[i]<0)
968       return MPI_ERR_ARG;
969     size += blocklens[i];
970
971     if(indices[i]+smpi_datatype_lb(old_type)<lb) lb = indices[i]+smpi_datatype_lb(old_type);
972     if(indices[i]+blocklens[i]*smpi_datatype_ub(old_type)>ub) ub = indices[i]+blocklens[i]*smpi_datatype_ub(old_type);
973
974     if ( (i< count -1) && (indices[i]+blocklens[i]*smpi_datatype_size(old_type) != indices[i+1]) )contiguous=0;
975   }
976   if (old_type->has_subtype == 1 || lb!=0)
977     contiguous=0;
978
979   if(!contiguous){
980     s_smpi_mpi_hindexed_t* subtype = smpi_datatype_hindexed_create( blocklens,
981                                                                   indices,
982                                                                   count,
983                                                                   old_type,
984                                                                   smpi_datatype_size(old_type));
985     smpi_datatype_create(new_type,  size * smpi_datatype_size(old_type),
986                                                  lb,
987                          ub
988                          ,1, subtype, DT_FLAG_DATA);
989   }else{
990     s_smpi_mpi_contiguous_t* subtype = smpi_datatype_contiguous_create( lb,
991                                                                   size,
992                                                                   old_type,
993                                                                   smpi_datatype_size(old_type));
994     smpi_datatype_create(new_type,  size * smpi_datatype_size(old_type),
995                                              0,size * smpi_datatype_size(old_type),
996                                              1, subtype, DT_FLAG_DATA|DT_FLAG_CONTIGUOUS);
997   }
998   retval=MPI_SUCCESS;
999   return retval;
1000 }
1001
1002
1003 /*
1004 struct Implementation - Indexed with indices in bytes 
1005 */
1006
1007 /*
1008  *  Copies noncontiguous data into contiguous memory.
1009  *  @param contiguous_struct - output struct
1010  *  @param noncontiguous_struct - input struct
1011  *  @param type - pointer contening :
1012  *      - stride - stride of between noncontiguous data
1013  *      - block_length - the width or height of blocked matrix
1014  *      - count - the number of rows of matrix
1015  */
1016 void serialize_struct( const void *noncontiguous_struct,
1017                        void *contiguous_struct,
1018                        int count,
1019                        void *type)
1020 {
1021   s_smpi_mpi_struct_t* type_c = (s_smpi_mpi_struct_t*)type;
1022   int i,j;
1023   char* contiguous_struct_char = (char*)contiguous_struct;
1024   char* noncontiguous_struct_char = (char*)noncontiguous_struct+ type_c->block_indices[0];
1025   for(j=0; j<count;j++){
1026     for (i = 0; i < type_c->block_count; i++) {
1027       if (type_c->old_types[i]->has_subtype == 0)
1028         memcpy(contiguous_struct_char,
1029              noncontiguous_struct_char, type_c->block_lengths[i] * smpi_datatype_size(type_c->old_types[i]));
1030       else
1031         ((s_smpi_subtype_t*)type_c->old_types[i]->substruct)->serialize( noncontiguous_struct_char,
1032                                                                          contiguous_struct_char,
1033                                                                          type_c->block_lengths[i],
1034                                                                          type_c->old_types[i]->substruct);
1035
1036
1037       contiguous_struct_char += type_c->block_lengths[i]*smpi_datatype_size(type_c->old_types[i]);
1038       if (i<type_c->block_count-1)noncontiguous_struct_char = (char*)noncontiguous_struct + type_c->block_indices[i+1];
1039       else noncontiguous_struct_char += type_c->block_lengths[i]*smpi_datatype_get_extent(type_c->old_types[i]);//let's hope this is MPI_UB ?
1040     }
1041     noncontiguous_struct=(void*)noncontiguous_struct_char;
1042   }
1043 }
1044 /*
1045  *  Copies contiguous data into noncontiguous memory.
1046  *  @param noncontiguous_struct - output struct
1047  *  @param contiguous_struct - input struct
1048  *  @param type - pointer contening :
1049  *      - stride - stride of between noncontiguous data
1050  *      - block_length - the width or height of blocked matrix
1051  *      - count - the number of rows of matrix
1052  */
1053 void unserialize_struct( const void *contiguous_struct,
1054                          void *noncontiguous_struct,
1055                          int count,
1056                          void *type,
1057                          MPI_Op op)
1058 {
1059   s_smpi_mpi_struct_t* type_c = (s_smpi_mpi_struct_t*)type;
1060   int i,j;
1061
1062   char* contiguous_struct_char = (char*)contiguous_struct;
1063   char* noncontiguous_struct_char = (char*)noncontiguous_struct+ type_c->block_indices[0];
1064   for(j=0; j<count;j++){
1065     for (i = 0; i < type_c->block_count; i++) {
1066       if (type_c->old_types[i]->has_subtype == 0)
1067         smpi_op_apply(op, contiguous_struct_char, noncontiguous_struct_char, &type_c->block_lengths[i],
1068            & type_c->old_types[i]);
1069                        /*memcpy(noncontiguous_struct_char,
1070              contiguous_struct_char, type_c->block_lengths[i] * smpi_datatype_size(type_c->old_types[i]));*/
1071       else
1072         ((s_smpi_subtype_t*)type_c->old_types[i]->substruct)->unserialize( contiguous_struct_char,
1073                                                                            noncontiguous_struct_char,
1074                                                                            type_c->block_lengths[i],
1075                                                                            type_c->old_types[i]->substruct,
1076                                                                            op);
1077
1078       contiguous_struct_char += type_c->block_lengths[i]*smpi_datatype_size(type_c->old_types[i]);
1079       if (i<type_c->block_count-1)noncontiguous_struct_char =  (char*)noncontiguous_struct + type_c->block_indices[i+1];
1080       else noncontiguous_struct_char += type_c->block_lengths[i]*smpi_datatype_get_extent(type_c->old_types[i]);
1081     }
1082     noncontiguous_struct=(void*)noncontiguous_struct_char;
1083     
1084   }
1085 }
1086
1087 void free_struct(MPI_Datatype* type){
1088   xbt_free(((s_smpi_mpi_struct_t *)(*type)->substruct)->block_lengths);
1089   xbt_free(((s_smpi_mpi_struct_t *)(*type)->substruct)->block_indices);
1090   int i=0;
1091   for (i = 0; i < ((s_smpi_mpi_struct_t *)(*type)->substruct)->block_count; i++)
1092     smpi_datatype_unuse(((s_smpi_mpi_struct_t *)(*type)->substruct)->old_types[i]);
1093   xbt_free(((s_smpi_mpi_struct_t *)(*type)->substruct)->old_types);
1094 }
1095
1096 /*
1097  * Create a Sub type struct to be able to serialize and unserialize it
1098  * the structure s_smpi_mpi_struct_t is derived from s_smpi_subtype which
1099  * required the functions unserialize and serialize
1100  */
1101 s_smpi_mpi_struct_t* smpi_datatype_struct_create( int* block_lengths,
1102                                                   MPI_Aint* block_indices,
1103                                                   int block_count,
1104                                                   MPI_Datatype* old_types){
1105   s_smpi_mpi_struct_t *new_t= xbt_new(s_smpi_mpi_struct_t,1);
1106   new_t->base.serialize = &serialize_struct;
1107   new_t->base.unserialize = &unserialize_struct;
1108   new_t->base.subtype_free = &free_struct;
1109  //TODO : add a custom function for each time to clean these 
1110   new_t->block_lengths= xbt_new(int, block_count);
1111   new_t->block_indices= xbt_new(MPI_Aint, block_count);
1112   new_t->old_types=  xbt_new(MPI_Datatype, block_count);
1113   int i;
1114   for(i=0;i<block_count;i++){
1115     new_t->block_lengths[i]=block_lengths[i];
1116     new_t->block_indices[i]=block_indices[i];
1117     new_t->old_types[i]=old_types[i];
1118     smpi_datatype_use(new_t->old_types[i]);
1119   }
1120   //new_t->block_lengths = block_lengths;
1121   //new_t->block_indices = block_indices;
1122   new_t->block_count = block_count;
1123   //new_t->old_types = old_types;
1124   return new_t;
1125 }
1126
1127
1128 int smpi_datatype_struct(int count, int* blocklens, MPI_Aint* indices, MPI_Datatype* old_types, MPI_Datatype* new_type)
1129 {
1130   int i;
1131   size_t size = 0;
1132   int contiguous=1;
1133   size = 0;
1134   MPI_Aint lb = 0;
1135   MPI_Aint ub = 0;
1136   if(count>0){
1137     lb=indices[0] + smpi_datatype_lb(old_types[0]);
1138     ub=indices[0] + blocklens[0]*smpi_datatype_ub(old_types[0]);
1139   }
1140   int forced_lb=0;
1141   int forced_ub=0;
1142   for(i=0; i< count; i++){
1143     if (blocklens[i]<0)
1144       return MPI_ERR_ARG;
1145     if (old_types[i]->has_subtype == 1)
1146       contiguous=0;
1147
1148     size += blocklens[i]*smpi_datatype_size(old_types[i]);
1149     if (old_types[i]==MPI_LB){
1150       lb=indices[i];
1151       forced_lb=1;
1152     }
1153     if (old_types[i]==MPI_UB){
1154       ub=indices[i];
1155       forced_ub=1;
1156     }
1157
1158     if(!forced_lb && indices[i]+smpi_datatype_lb(old_types[i])<lb) lb = indices[i];
1159     if(!forced_ub && indices[i]+blocklens[i]*smpi_datatype_ub(old_types[i])>ub) ub = indices[i]+blocklens[i]*smpi_datatype_ub(old_types[i]);
1160
1161     if ( (i< count -1) && (indices[i]+blocklens[i]*smpi_datatype_size(old_types[i]) != indices[i+1]) )contiguous=0;
1162   }
1163
1164   if(!contiguous){
1165     s_smpi_mpi_struct_t* subtype = smpi_datatype_struct_create( blocklens,
1166                                                               indices,
1167                                                               count,
1168                                                               old_types);
1169
1170     smpi_datatype_create(new_type,  size, lb, ub,1, subtype, DT_FLAG_DATA);
1171   }else{
1172     s_smpi_mpi_contiguous_t* subtype = smpi_datatype_contiguous_create( lb,
1173                                                                   size,
1174                                                                   MPI_CHAR,
1175                                                                   1);
1176     smpi_datatype_create(new_type,  size, lb, ub,1, subtype, DT_FLAG_DATA|DT_FLAG_CONTIGUOUS);
1177   }
1178   return MPI_SUCCESS;
1179 }
1180
1181 void smpi_datatype_commit(MPI_Datatype *datatype)
1182 {
1183   (*datatype)->flags=  ((*datatype)->flags | DT_FLAG_COMMITED);
1184 }
1185
1186 typedef struct s_smpi_mpi_op {
1187   MPI_User_function *func;
1188   int is_commute;
1189 } s_smpi_mpi_op_t;
1190
1191 #define MAX_OP(a, b)  (b) = (a) < (b) ? (b) : (a)
1192 #define MIN_OP(a, b)  (b) = (a) < (b) ? (a) : (b)
1193 #define SUM_OP(a, b)  (b) += (a)
1194 #define PROD_OP(a, b) (b) *= (a)
1195 #define LAND_OP(a, b) (b) = (a) && (b)
1196 #define LOR_OP(a, b)  (b) = (a) || (b)
1197 #define LXOR_OP(a, b) (b) = (!(a) && (b)) || ((a) && !(b))
1198 #define BAND_OP(a, b) (b) &= (a)
1199 #define BOR_OP(a, b)  (b) |= (a)
1200 #define BXOR_OP(a, b) (b) ^= (a)
1201 #define MAXLOC_OP(a, b)  (b) = (a.value) < (b.value) ? (b) : (a)
1202 #define MINLOC_OP(a, b)  (b) = (a.value) < (b.value) ? (a) : (b)
1203
1204 #define APPLY_FUNC(a, b, length, type, func) \
1205 {                                          \
1206   int i;                                   \
1207   type* x = (type*)(a);                    \
1208   type* y = (type*)(b);                    \
1209   for(i = 0; i < *(length); i++) {         \
1210     func(x[i], y[i]);                      \
1211   }                                        \
1212 }
1213
1214 static void max_func(void *a, void *b, int *length,
1215                      MPI_Datatype * datatype)
1216 {
1217   if (*datatype == MPI_CHAR) {
1218     APPLY_FUNC(a, b, length, char, MAX_OP);
1219   } else if (*datatype == MPI_SHORT) {
1220     APPLY_FUNC(a, b, length, short, MAX_OP);
1221   } else if (*datatype == MPI_INT) {
1222     APPLY_FUNC(a, b, length, int, MAX_OP);
1223   } else if (*datatype == MPI_LONG) {
1224     APPLY_FUNC(a, b, length, long, MAX_OP);
1225   } else if (*datatype == MPI_UNSIGNED_SHORT) {
1226     APPLY_FUNC(a, b, length, unsigned short, MAX_OP);
1227   } else if (*datatype == MPI_UNSIGNED) {
1228     APPLY_FUNC(a, b, length, unsigned int, MAX_OP);
1229   } else if (*datatype == MPI_UNSIGNED_LONG) {
1230     APPLY_FUNC(a, b, length, unsigned long, MAX_OP);
1231   } else if (*datatype == MPI_FLOAT) {
1232     APPLY_FUNC(a, b, length, float, MAX_OP);
1233   } else if (*datatype == MPI_DOUBLE) {
1234     APPLY_FUNC(a, b, length, double, MAX_OP);
1235   } else if (*datatype == MPI_LONG_DOUBLE) {
1236     APPLY_FUNC(a, b, length, long double, MAX_OP);
1237   }
1238 }
1239
1240 static void min_func(void *a, void *b, int *length,
1241                      MPI_Datatype * datatype)
1242 {
1243   if (*datatype == MPI_CHAR) {
1244     APPLY_FUNC(a, b, length, char, MIN_OP);
1245   } else if (*datatype == MPI_SHORT) {
1246     APPLY_FUNC(a, b, length, short, MIN_OP);
1247   } else if (*datatype == MPI_INT) {
1248     APPLY_FUNC(a, b, length, int, MIN_OP);
1249   } else if (*datatype == MPI_LONG) {
1250     APPLY_FUNC(a, b, length, long, MIN_OP);
1251   } else if (*datatype == MPI_UNSIGNED_SHORT) {
1252     APPLY_FUNC(a, b, length, unsigned short, MIN_OP);
1253   } else if (*datatype == MPI_UNSIGNED) {
1254     APPLY_FUNC(a, b, length, unsigned int, MIN_OP);
1255   } else if (*datatype == MPI_UNSIGNED_LONG) {
1256     APPLY_FUNC(a, b, length, unsigned long, MIN_OP);
1257   } else if (*datatype == MPI_FLOAT) {
1258     APPLY_FUNC(a, b, length, float, MIN_OP);
1259   } else if (*datatype == MPI_DOUBLE) {
1260     APPLY_FUNC(a, b, length, double, MIN_OP);
1261   } else if (*datatype == MPI_LONG_DOUBLE) {
1262     APPLY_FUNC(a, b, length, long double, MIN_OP);
1263   }
1264 }
1265
1266 static void sum_func(void *a, void *b, int *length,
1267                      MPI_Datatype * datatype)
1268 {
1269   if (*datatype == MPI_CHAR) {
1270     APPLY_FUNC(a, b, length, char, SUM_OP);
1271   } else if (*datatype == MPI_SHORT) {
1272     APPLY_FUNC(a, b, length, short, SUM_OP);
1273   } else if (*datatype == MPI_INT) {
1274     APPLY_FUNC(a, b, length, int, SUM_OP);
1275   } else if (*datatype == MPI_LONG) {
1276     APPLY_FUNC(a, b, length, long, SUM_OP);
1277   } else if (*datatype == MPI_UNSIGNED_SHORT) {
1278     APPLY_FUNC(a, b, length, unsigned short, SUM_OP);
1279   } else if (*datatype == MPI_UNSIGNED) {
1280     APPLY_FUNC(a, b, length, unsigned int, SUM_OP);
1281   } else if (*datatype == MPI_UNSIGNED_LONG) {
1282     APPLY_FUNC(a, b, length, unsigned long, SUM_OP);
1283   } else if (*datatype == MPI_FLOAT) {
1284     APPLY_FUNC(a, b, length, float, SUM_OP);
1285   } else if (*datatype == MPI_DOUBLE) {
1286     APPLY_FUNC(a, b, length, double, SUM_OP);
1287   } else if (*datatype == MPI_LONG_DOUBLE) {
1288     APPLY_FUNC(a, b, length, long double, SUM_OP);
1289   } else if (*datatype == MPI_C_FLOAT_COMPLEX) {
1290     APPLY_FUNC(a, b, length, float _Complex, SUM_OP);
1291   } else if (*datatype == MPI_C_DOUBLE_COMPLEX) {
1292     APPLY_FUNC(a, b, length, double _Complex, SUM_OP);
1293   } else if (*datatype == MPI_C_LONG_DOUBLE_COMPLEX) {
1294     APPLY_FUNC(a, b, length, long double _Complex, SUM_OP);
1295   }
1296 }
1297
1298 static void prod_func(void *a, void *b, int *length,
1299                       MPI_Datatype * datatype)
1300 {
1301   if (*datatype == MPI_CHAR) {
1302     APPLY_FUNC(a, b, length, char, PROD_OP);
1303   } else if (*datatype == MPI_SHORT) {
1304     APPLY_FUNC(a, b, length, short, PROD_OP);
1305   } else if (*datatype == MPI_INT) {
1306     APPLY_FUNC(a, b, length, int, PROD_OP);
1307   } else if (*datatype == MPI_LONG) {
1308     APPLY_FUNC(a, b, length, long, PROD_OP);
1309   } else if (*datatype == MPI_UNSIGNED_SHORT) {
1310     APPLY_FUNC(a, b, length, unsigned short, PROD_OP);
1311   } else if (*datatype == MPI_UNSIGNED) {
1312     APPLY_FUNC(a, b, length, unsigned int, PROD_OP);
1313   } else if (*datatype == MPI_UNSIGNED_LONG) {
1314     APPLY_FUNC(a, b, length, unsigned long, PROD_OP);
1315   } else if (*datatype == MPI_FLOAT) {
1316     APPLY_FUNC(a, b, length, float, PROD_OP);
1317   } else if (*datatype == MPI_DOUBLE) {
1318     APPLY_FUNC(a, b, length, double, PROD_OP);
1319   } else if (*datatype == MPI_LONG_DOUBLE) {
1320     APPLY_FUNC(a, b, length, long double, PROD_OP);
1321   } else if (*datatype == MPI_C_FLOAT_COMPLEX) {
1322     APPLY_FUNC(a, b, length, float _Complex, PROD_OP);
1323   } else if (*datatype == MPI_C_DOUBLE_COMPLEX) {
1324     APPLY_FUNC(a, b, length, double _Complex, PROD_OP);
1325   } else if (*datatype == MPI_C_LONG_DOUBLE_COMPLEX) {
1326     APPLY_FUNC(a, b, length, long double _Complex, PROD_OP);
1327   }
1328 }
1329
1330 static void land_func(void *a, void *b, int *length,
1331                       MPI_Datatype * datatype)
1332 {
1333   if (*datatype == MPI_CHAR) {
1334     APPLY_FUNC(a, b, length, char, LAND_OP);
1335   } else if (*datatype == MPI_SHORT) {
1336     APPLY_FUNC(a, b, length, short, LAND_OP);
1337   } else if (*datatype == MPI_INT) {
1338     APPLY_FUNC(a, b, length, int, LAND_OP);
1339   } else if (*datatype == MPI_LONG) {
1340     APPLY_FUNC(a, b, length, long, LAND_OP);
1341   } else if (*datatype == MPI_UNSIGNED_SHORT) {
1342     APPLY_FUNC(a, b, length, unsigned short, LAND_OP);
1343   } else if (*datatype == MPI_UNSIGNED) {
1344     APPLY_FUNC(a, b, length, unsigned int, LAND_OP);
1345   } else if (*datatype == MPI_UNSIGNED_LONG) {
1346     APPLY_FUNC(a, b, length, unsigned long, LAND_OP);
1347   } else if (*datatype == MPI_C_BOOL) {
1348     APPLY_FUNC(a, b, length, _Bool, LAND_OP);
1349   }
1350 }
1351
1352 static void lor_func(void *a, void *b, int *length,
1353                      MPI_Datatype * datatype)
1354 {
1355   if (*datatype == MPI_CHAR) {
1356     APPLY_FUNC(a, b, length, char, LOR_OP);
1357   } else if (*datatype == MPI_SHORT) {
1358     APPLY_FUNC(a, b, length, short, LOR_OP);
1359   } else if (*datatype == MPI_INT) {
1360     APPLY_FUNC(a, b, length, int, LOR_OP);
1361   } else if (*datatype == MPI_LONG) {
1362     APPLY_FUNC(a, b, length, long, LOR_OP);
1363   } else if (*datatype == MPI_UNSIGNED_SHORT) {
1364     APPLY_FUNC(a, b, length, unsigned short, LOR_OP);
1365   } else if (*datatype == MPI_UNSIGNED) {
1366     APPLY_FUNC(a, b, length, unsigned int, LOR_OP);
1367   } else if (*datatype == MPI_UNSIGNED_LONG) {
1368     APPLY_FUNC(a, b, length, unsigned long, LOR_OP);
1369   } else if (*datatype == MPI_C_BOOL) {
1370     APPLY_FUNC(a, b, length, _Bool, LOR_OP);
1371   }
1372 }
1373
1374 static void lxor_func(void *a, void *b, int *length,
1375                       MPI_Datatype * datatype)
1376 {
1377   if (*datatype == MPI_CHAR) {
1378     APPLY_FUNC(a, b, length, char, LXOR_OP);
1379   } else if (*datatype == MPI_SHORT) {
1380     APPLY_FUNC(a, b, length, short, LXOR_OP);
1381   } else if (*datatype == MPI_INT) {
1382     APPLY_FUNC(a, b, length, int, LXOR_OP);
1383   } else if (*datatype == MPI_LONG) {
1384     APPLY_FUNC(a, b, length, long, LXOR_OP);
1385   } else if (*datatype == MPI_UNSIGNED_SHORT) {
1386     APPLY_FUNC(a, b, length, unsigned short, LXOR_OP);
1387   } else if (*datatype == MPI_UNSIGNED) {
1388     APPLY_FUNC(a, b, length, unsigned int, LXOR_OP);
1389   } else if (*datatype == MPI_UNSIGNED_LONG) {
1390     APPLY_FUNC(a, b, length, unsigned long, LXOR_OP);
1391   } else if (*datatype == MPI_C_BOOL) {
1392     APPLY_FUNC(a, b, length, _Bool, LXOR_OP);
1393   }
1394 }
1395
1396 static void band_func(void *a, void *b, int *length,
1397                       MPI_Datatype * datatype)
1398 {
1399   if (*datatype == MPI_CHAR) {
1400     APPLY_FUNC(a, b, length, char, BAND_OP);
1401   }
1402   if (*datatype == MPI_SHORT) {
1403     APPLY_FUNC(a, b, length, short, BAND_OP);
1404   } else if (*datatype == MPI_INT) {
1405     APPLY_FUNC(a, b, length, int, BAND_OP);
1406   } else if (*datatype == MPI_LONG) {
1407     APPLY_FUNC(a, b, length, long, BAND_OP);
1408   } else if (*datatype == MPI_UNSIGNED_SHORT) {
1409     APPLY_FUNC(a, b, length, unsigned short, BAND_OP);
1410   } else if (*datatype == MPI_UNSIGNED) {
1411     APPLY_FUNC(a, b, length, unsigned int, BAND_OP);
1412   } else if (*datatype == MPI_UNSIGNED_LONG) {
1413     APPLY_FUNC(a, b, length, unsigned long, BAND_OP);
1414   } else if (*datatype == MPI_BYTE) {
1415     APPLY_FUNC(a, b, length, uint8_t, BAND_OP);
1416   }
1417 }
1418
1419 static void bor_func(void *a, void *b, int *length,
1420                      MPI_Datatype * datatype)
1421 {
1422   if (*datatype == MPI_CHAR) {
1423     APPLY_FUNC(a, b, length, char, BOR_OP);
1424   } else if (*datatype == MPI_SHORT) {
1425     APPLY_FUNC(a, b, length, short, BOR_OP);
1426   } else if (*datatype == MPI_INT) {
1427     APPLY_FUNC(a, b, length, int, BOR_OP);
1428   } else if (*datatype == MPI_LONG) {
1429     APPLY_FUNC(a, b, length, long, BOR_OP);
1430   } else if (*datatype == MPI_UNSIGNED_SHORT) {
1431     APPLY_FUNC(a, b, length, unsigned short, BOR_OP);
1432   } else if (*datatype == MPI_UNSIGNED) {
1433     APPLY_FUNC(a, b, length, unsigned int, BOR_OP);
1434   } else if (*datatype == MPI_UNSIGNED_LONG) {
1435     APPLY_FUNC(a, b, length, unsigned long, BOR_OP);
1436   } else if (*datatype == MPI_BYTE) {
1437     APPLY_FUNC(a, b, length, uint8_t, BOR_OP);
1438   }
1439 }
1440
1441 static void bxor_func(void *a, void *b, int *length,
1442                       MPI_Datatype * datatype)
1443 {
1444   if (*datatype == MPI_CHAR) {
1445     APPLY_FUNC(a, b, length, char, BXOR_OP);
1446   } else if (*datatype == MPI_SHORT) {
1447     APPLY_FUNC(a, b, length, short, BXOR_OP);
1448   } else if (*datatype == MPI_INT) {
1449     APPLY_FUNC(a, b, length, int, BXOR_OP);
1450   } else if (*datatype == MPI_LONG) {
1451     APPLY_FUNC(a, b, length, long, BXOR_OP);
1452   } else if (*datatype == MPI_UNSIGNED_SHORT) {
1453     APPLY_FUNC(a, b, length, unsigned short, BXOR_OP);
1454   } else if (*datatype == MPI_UNSIGNED) {
1455     APPLY_FUNC(a, b, length, unsigned int, BXOR_OP);
1456   } else if (*datatype == MPI_UNSIGNED_LONG) {
1457     APPLY_FUNC(a, b, length, unsigned long, BXOR_OP);
1458   } else if (*datatype == MPI_BYTE) {
1459     APPLY_FUNC(a, b, length, uint8_t, BXOR_OP);
1460   }
1461 }
1462
1463 static void minloc_func(void *a, void *b, int *length,
1464                         MPI_Datatype * datatype)
1465 {
1466   if (*datatype == MPI_FLOAT_INT) {
1467     APPLY_FUNC(a, b, length, float_int, MINLOC_OP);
1468   } else if (*datatype == MPI_LONG_INT) {
1469     APPLY_FUNC(a, b, length, long_int, MINLOC_OP);
1470   } else if (*datatype == MPI_DOUBLE_INT) {
1471     APPLY_FUNC(a, b, length, double_int, MINLOC_OP);
1472   } else if (*datatype == MPI_SHORT_INT) {
1473     APPLY_FUNC(a, b, length, short_int, MINLOC_OP);
1474   } else if (*datatype == MPI_2LONG) {
1475     APPLY_FUNC(a, b, length, long_long, MINLOC_OP);
1476   } else if (*datatype == MPI_2INT) {
1477     APPLY_FUNC(a, b, length, int_int, MINLOC_OP);
1478   } else if (*datatype == MPI_LONG_DOUBLE_INT) {
1479     APPLY_FUNC(a, b, length, long_double_int, MINLOC_OP);
1480   } else if (*datatype == MPI_2FLOAT) {
1481     APPLY_FUNC(a, b, length, float_float, MINLOC_OP);
1482   } else if (*datatype == MPI_2DOUBLE) {
1483     APPLY_FUNC(a, b, length, double_double, MINLOC_OP);
1484   }
1485 }
1486
1487 static void maxloc_func(void *a, void *b, int *length,
1488                         MPI_Datatype * datatype)
1489 {
1490   if (*datatype == MPI_FLOAT_INT) {
1491     APPLY_FUNC(a, b, length, float_int, MAXLOC_OP);
1492   } else if (*datatype == MPI_LONG_INT) {
1493     APPLY_FUNC(a, b, length, long_int, MAXLOC_OP);
1494   } else if (*datatype == MPI_DOUBLE_INT) {
1495     APPLY_FUNC(a, b, length, double_int, MAXLOC_OP);
1496   } else if (*datatype == MPI_SHORT_INT) {
1497     APPLY_FUNC(a, b, length, short_int, MAXLOC_OP);
1498   } else if (*datatype == MPI_2LONG) {
1499     APPLY_FUNC(a, b, length, long_long, MAXLOC_OP);
1500   } else if (*datatype == MPI_2INT) {
1501     APPLY_FUNC(a, b, length, int_int, MAXLOC_OP);
1502   } else if (*datatype == MPI_LONG_DOUBLE_INT) {
1503     APPLY_FUNC(a, b, length, long_double_int, MAXLOC_OP);
1504   } else if (*datatype == MPI_2FLOAT) {
1505     APPLY_FUNC(a, b, length, float_float, MAXLOC_OP);
1506   } else if (*datatype == MPI_2DOUBLE) {
1507     APPLY_FUNC(a, b, length, double_double, MAXLOC_OP);
1508   }
1509 }
1510
1511 static void replace_func(void *a, void *b, int *length,
1512                         MPI_Datatype * datatype)
1513 {
1514   memcpy(b, a, *length * smpi_datatype_size(*datatype));
1515 }
1516
1517 #define CREATE_MPI_OP(name, func)                             \
1518   static s_smpi_mpi_op_t mpi_##name = { &(func) /* func */, TRUE }; \
1519 MPI_Op name = &mpi_##name;
1520
1521 CREATE_MPI_OP(MPI_MAX, max_func);
1522 CREATE_MPI_OP(MPI_MIN, min_func);
1523 CREATE_MPI_OP(MPI_SUM, sum_func);
1524 CREATE_MPI_OP(MPI_PROD, prod_func);
1525 CREATE_MPI_OP(MPI_LAND, land_func);
1526 CREATE_MPI_OP(MPI_LOR, lor_func);
1527 CREATE_MPI_OP(MPI_LXOR, lxor_func);
1528 CREATE_MPI_OP(MPI_BAND, band_func);
1529 CREATE_MPI_OP(MPI_BOR, bor_func);
1530 CREATE_MPI_OP(MPI_BXOR, bxor_func);
1531 CREATE_MPI_OP(MPI_MAXLOC, maxloc_func);
1532 CREATE_MPI_OP(MPI_MINLOC, minloc_func);
1533 CREATE_MPI_OP(MPI_REPLACE, replace_func);
1534
1535
1536 MPI_Op smpi_op_new(MPI_User_function * function, int commute)
1537 {
1538   MPI_Op op;
1539   op = xbt_new(s_smpi_mpi_op_t, 1);
1540   op->func = function;
1541   op-> is_commute = commute;
1542   return op;
1543 }
1544
1545 int smpi_op_is_commute(MPI_Op op)
1546 {
1547   return (op==MPI_OP_NULL) ? 1 : op-> is_commute;
1548 }
1549
1550 void smpi_op_destroy(MPI_Op op)
1551 {
1552   xbt_free(op);
1553 }
1554
1555 void smpi_op_apply(MPI_Op op, void *invec, void *inoutvec, int *len,
1556                    MPI_Datatype * datatype)
1557 {
1558   if(smpi_privatize_global_variables){ //we need to switch here, as the called function may silently touch global variables
1559     XBT_VERB("Applying operation, switch to the right data frame ");
1560     switch_data_segment(smpi_process_index());
1561   }
1562
1563   if(!_xbt_replay_is_active())
1564   op->func(invec, inoutvec, len, datatype);
1565 }