Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
[smpi] Allow zero-sized custom datatypes with MPI_Type_contiguous()
[simgrid.git] / src / smpi / smpi_mpi_dt.c
1 /* smpi_mpi_dt.c -- MPI primitives to handle datatypes                        */
2 /* FIXME: a very incomplete implementation                                    */
3
4 /* Copyright (c) 2009-2014. The SimGrid Team.
5  * All rights reserved.                                                     */
6
7 /* This program is free software; you can redistribute it and/or modify it
8  * under the terms of the license (GNU LGPL) which comes with this package. */
9
10 #include <stdio.h>
11 #include <stdlib.h>
12 #include <string.h>
13
14 #include "private.h"
15 #include "smpi_mpi_dt_private.h"
16 #include "mc/mc.h"
17 #include "xbt/replay.h"
18 #include "simgrid/modelchecker.h"
19
20 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_mpi_dt, smpi,
21                                 "Logging specific to SMPI (datatype)");
22
23 #define CREATE_MPI_DATATYPE(name, type)       \
24   static s_smpi_mpi_datatype_t mpi_##name = { \
25     (char*) # name,                                   \
26     sizeof(type),  /* size */                 \
27     0,             /*was 1 has_subtype*/             \
28     0,             /* lb */                   \
29     sizeof(type),  /* ub = lb + size */       \
30     DT_FLAG_BASIC,  /* flags */              \
31     NULL           /* pointer on extended struct*/ \
32   };                                          \
33 MPI_Datatype name = &mpi_##name;
34
35 #define CREATE_MPI_DATATYPE_NULL(name)       \
36   static s_smpi_mpi_datatype_t mpi_##name = { \
37     (char*) # name,                                   \
38     0,  /* size */                 \
39     0,             /*was 1 has_subtype*/             \
40     0,             /* lb */                   \
41     0,  /* ub = lb + size */       \
42     DT_FLAG_BASIC,  /* flags */              \
43     NULL           /* pointer on extended struct*/ \
44   };                                          \
45 MPI_Datatype name = &mpi_##name;
46
47 //The following are datatypes for the MPI functions MPI_MAXLOC and MPI_MINLOC.
48 typedef struct {
49   float value;
50   int index;
51 } float_int;
52 typedef struct {
53   float value;
54   float index;
55 } float_float;
56 typedef struct {
57   long value;
58   long index;
59 } long_long;
60 typedef struct {
61   double value;
62   double index;
63 } double_double;
64 typedef struct {
65   long value;
66   int index;
67 } long_int;
68 typedef struct {
69   double value;
70   int index;
71 } double_int;
72 typedef struct {
73   short value;
74   int index;
75 } short_int;
76 typedef struct {
77   int value;
78   int index;
79 } int_int;
80 typedef struct {
81   long double value;
82   int index;
83 } long_double_int;
84 typedef struct {
85   int64_t value;
86   int64_t index;
87 } integer128_t;
88 // Predefined data types
89 CREATE_MPI_DATATYPE(MPI_CHAR, char);
90 CREATE_MPI_DATATYPE(MPI_SHORT, short);
91 CREATE_MPI_DATATYPE(MPI_INT, int);
92 CREATE_MPI_DATATYPE(MPI_LONG, long);
93 CREATE_MPI_DATATYPE(MPI_LONG_LONG, long long);
94 CREATE_MPI_DATATYPE(MPI_SIGNED_CHAR, signed char);
95 CREATE_MPI_DATATYPE(MPI_UNSIGNED_CHAR, unsigned char);
96 CREATE_MPI_DATATYPE(MPI_UNSIGNED_SHORT, unsigned short);
97 CREATE_MPI_DATATYPE(MPI_UNSIGNED, unsigned int);
98 CREATE_MPI_DATATYPE(MPI_UNSIGNED_LONG, unsigned long);
99 CREATE_MPI_DATATYPE(MPI_UNSIGNED_LONG_LONG, unsigned long long);
100 CREATE_MPI_DATATYPE(MPI_FLOAT, float);
101 CREATE_MPI_DATATYPE(MPI_DOUBLE, double);
102 CREATE_MPI_DATATYPE(MPI_LONG_DOUBLE, long double);
103 CREATE_MPI_DATATYPE(MPI_WCHAR, wchar_t);
104 CREATE_MPI_DATATYPE(MPI_C_BOOL, _Bool);
105 CREATE_MPI_DATATYPE(MPI_BYTE, int8_t);
106 CREATE_MPI_DATATYPE(MPI_INT8_T, int8_t);
107 CREATE_MPI_DATATYPE(MPI_INT16_T, int16_t);
108 CREATE_MPI_DATATYPE(MPI_INT32_T, int32_t);
109 CREATE_MPI_DATATYPE(MPI_INT64_T, int64_t);
110 CREATE_MPI_DATATYPE(MPI_UINT8_T, uint8_t);
111 CREATE_MPI_DATATYPE(MPI_UINT16_T, uint16_t);
112 CREATE_MPI_DATATYPE(MPI_UINT32_T, uint32_t);
113 CREATE_MPI_DATATYPE(MPI_UINT64_T, uint64_t);
114 CREATE_MPI_DATATYPE(MPI_C_FLOAT_COMPLEX, float _Complex);
115 CREATE_MPI_DATATYPE(MPI_C_DOUBLE_COMPLEX, double _Complex);
116 CREATE_MPI_DATATYPE(MPI_C_LONG_DOUBLE_COMPLEX, long double _Complex);
117 CREATE_MPI_DATATYPE(MPI_AINT, MPI_Aint);
118 CREATE_MPI_DATATYPE(MPI_OFFSET, MPI_Offset);
119
120 CREATE_MPI_DATATYPE(MPI_FLOAT_INT, float_int);
121 CREATE_MPI_DATATYPE(MPI_LONG_INT, long_int);
122 CREATE_MPI_DATATYPE(MPI_DOUBLE_INT, double_int);
123 CREATE_MPI_DATATYPE(MPI_SHORT_INT, short_int);
124 CREATE_MPI_DATATYPE(MPI_2INT, int_int);
125 CREATE_MPI_DATATYPE(MPI_2FLOAT, float_float);
126 CREATE_MPI_DATATYPE(MPI_2DOUBLE, double_double);
127 CREATE_MPI_DATATYPE(MPI_2LONG, long_long);
128
129 CREATE_MPI_DATATYPE(MPI_REAL4, float);
130 CREATE_MPI_DATATYPE(MPI_REAL8, float);
131 CREATE_MPI_DATATYPE(MPI_REAL16, double);
132 CREATE_MPI_DATATYPE_NULL(MPI_COMPLEX8);
133 CREATE_MPI_DATATYPE_NULL(MPI_COMPLEX16);
134 CREATE_MPI_DATATYPE_NULL(MPI_COMPLEX32);
135 CREATE_MPI_DATATYPE(MPI_INTEGER1, int);
136 CREATE_MPI_DATATYPE(MPI_INTEGER2, int16_t);
137 CREATE_MPI_DATATYPE(MPI_INTEGER4, int32_t);
138 CREATE_MPI_DATATYPE(MPI_INTEGER8, int64_t);
139 CREATE_MPI_DATATYPE(MPI_INTEGER16, integer128_t);
140
141 CREATE_MPI_DATATYPE(MPI_LONG_DOUBLE_INT, long_double_int);
142
143 CREATE_MPI_DATATYPE_NULL(MPI_UB);
144 CREATE_MPI_DATATYPE_NULL(MPI_LB);
145 CREATE_MPI_DATATYPE_NULL(MPI_PACKED);
146 // Internal use only
147 CREATE_MPI_DATATYPE(MPI_PTR, void*);
148
149 /** Check if the datatype is usable for communications
150  */
151 int is_datatype_valid(MPI_Datatype datatype) {
152     return datatype != MPI_DATATYPE_NULL
153         && (datatype->flags & DT_FLAG_COMMITED)
154         && (smpi_datatype_size(datatype)>=0);
155 }
156
157 size_t smpi_datatype_size(MPI_Datatype datatype)
158 {
159   return datatype->size;
160 }
161
162 MPI_Aint smpi_datatype_lb(MPI_Datatype datatype)
163 {
164   return datatype->lb;
165 }
166
167 MPI_Aint smpi_datatype_ub(MPI_Datatype datatype)
168 {
169   return datatype->ub;
170 }
171
172 MPI_Datatype smpi_datatype_dup(MPI_Datatype datatype)
173 {
174   MPI_Datatype new_t= xbt_new(s_smpi_mpi_datatype_t,1);
175   memcpy(new_t, datatype, sizeof(s_smpi_mpi_datatype_t));
176   if (datatype->has_subtype)
177     memcpy(new_t->substruct, datatype->substruct, sizeof(s_smpi_subtype_t));
178   if(datatype->name)
179     new_t->name = strdup(datatype->name);
180   return new_t;
181 }
182
183 int smpi_datatype_extent(MPI_Datatype datatype, MPI_Aint * lb,
184                          MPI_Aint * extent)
185 {
186   if(datatype == MPI_DATATYPE_NULL){
187     *lb=0;
188     *extent=0;
189     return MPI_SUCCESS;
190   }
191   *lb = datatype->lb;
192   *extent = datatype->ub - datatype->lb;
193   return MPI_SUCCESS;
194 }
195
196 MPI_Aint smpi_datatype_get_extent(MPI_Datatype datatype){
197   if(datatype == MPI_DATATYPE_NULL){
198     return 0;
199   }
200   return datatype->ub - datatype->lb;
201 }
202
203 void smpi_datatype_get_name(MPI_Datatype datatype, char* name, int* length){
204   *length = strlen(datatype->name);
205   strcpy(name, datatype->name);
206 }
207
208 void smpi_datatype_set_name(MPI_Datatype datatype, char* name){
209   datatype->name = strdup(name);;
210 }
211
212 int smpi_datatype_copy(void *sendbuf, int sendcount, MPI_Datatype sendtype,
213                        void *recvbuf, int recvcount, MPI_Datatype recvtype)
214 {
215   int count;
216   if(smpi_privatize_global_variables){
217     switch_data_segment(smpi_process_index());
218   }
219   /* First check if we really have something to do */
220   if (recvcount > 0 && recvbuf != sendbuf) {
221     /* FIXME: treat packed cases */
222     sendcount *= smpi_datatype_size(sendtype);
223     recvcount *= smpi_datatype_size(recvtype);
224     count = sendcount < recvcount ? sendcount : recvcount;
225
226     if(sendtype->has_subtype == 0 && recvtype->has_subtype == 0) {
227       if(!_xbt_replay_is_active()) memcpy(recvbuf, sendbuf, count);
228     }
229     else if (sendtype->has_subtype == 0)
230     {
231       s_smpi_subtype_t *subtype =  recvtype->substruct;
232       subtype->unserialize( sendbuf, recvbuf,1, subtype, MPI_REPLACE);
233     }
234     else if (recvtype->has_subtype == 0)
235     {
236       s_smpi_subtype_t *subtype =  sendtype->substruct;
237       subtype->serialize(sendbuf, recvbuf,1, subtype);
238     }else{
239       s_smpi_subtype_t *subtype =  sendtype->substruct;
240
241
242       void * buf_tmp = xbt_malloc(count);
243
244       subtype->serialize( sendbuf, buf_tmp,count/smpi_datatype_size(sendtype), subtype);
245       subtype =  recvtype->substruct;
246       subtype->unserialize( buf_tmp, recvbuf,count/smpi_datatype_size(recvtype), subtype, MPI_REPLACE);
247
248       free(buf_tmp);
249     }
250   }
251
252   return sendcount > recvcount ? MPI_ERR_TRUNCATE : MPI_SUCCESS;
253 }
254
255 /*
256  *  Copies noncontiguous data into contiguous memory.
257  *  @param contiguous_vector - output vector
258  *  @param noncontiguous_vector - input vector
259  *  @param type - pointer contening :
260  *      - stride - stride of between noncontiguous data
261  *      - block_length - the width or height of blocked matrix
262  *      - count - the number of rows of matrix
263  */
264 void serialize_vector( const void *noncontiguous_vector,
265                        void *contiguous_vector,
266                        int count,
267                        void *type)
268 {
269   s_smpi_mpi_vector_t* type_c = (s_smpi_mpi_vector_t*)type;
270   int i;
271   char* contiguous_vector_char = (char*)contiguous_vector;
272   char* noncontiguous_vector_char = (char*)noncontiguous_vector;
273
274   for (i = 0; i < type_c->block_count * count; i++) {
275       if (type_c->old_type->has_subtype == 0)
276         memcpy(contiguous_vector_char,
277                noncontiguous_vector_char, type_c->block_length * type_c->size_oldtype);
278       else
279         ((s_smpi_subtype_t*)type_c->old_type->substruct)->serialize( noncontiguous_vector_char,
280                                                                      contiguous_vector_char,
281                                                                      type_c->block_length,
282                                                                      type_c->old_type->substruct);
283
284     contiguous_vector_char += type_c->block_length*type_c->size_oldtype;
285     if((i+1)%type_c->block_count ==0)
286     noncontiguous_vector_char += type_c->block_length*smpi_datatype_get_extent(type_c->old_type);
287     else
288     noncontiguous_vector_char += type_c->block_stride*smpi_datatype_get_extent(type_c->old_type);
289   }
290 }
291
292 /*
293  *  Copies contiguous data into noncontiguous memory.
294  *  @param noncontiguous_vector - output vector
295  *  @param contiguous_vector - input vector
296  *  @param type - pointer contening :
297  *      - stride - stride of between noncontiguous data
298  *      - block_length - the width or height of blocked matrix
299  *      - count - the number of rows of matrix
300  */
301 void unserialize_vector( const void *contiguous_vector,
302                          void *noncontiguous_vector,
303                          int count,
304                          void *type,
305                          MPI_Op op)
306 {
307   s_smpi_mpi_vector_t* type_c = (s_smpi_mpi_vector_t*)type;
308   int i;
309
310   char* contiguous_vector_char = (char*)contiguous_vector;
311   char* noncontiguous_vector_char = (char*)noncontiguous_vector;
312
313   for (i = 0; i < type_c->block_count * count; i++) {
314     if (type_c->old_type->has_subtype == 0)
315       smpi_op_apply(op, contiguous_vector_char, noncontiguous_vector_char, &type_c->block_length,
316           &type_c->old_type);
317      /* memcpy(noncontiguous_vector_char,
318              contiguous_vector_char, type_c->block_length * type_c->size_oldtype);*/
319     else
320       ((s_smpi_subtype_t*)type_c->old_type->substruct)->unserialize( contiguous_vector_char,
321                                                                      noncontiguous_vector_char,
322                                                                      type_c->block_length,
323                                                                      type_c->old_type->substruct,
324                                                                      op);
325     contiguous_vector_char += type_c->block_length*type_c->size_oldtype;
326     if((i+1)%type_c->block_count ==0)
327     noncontiguous_vector_char += type_c->block_length*smpi_datatype_get_extent(type_c->old_type);
328     else
329     noncontiguous_vector_char += type_c->block_stride*smpi_datatype_get_extent(type_c->old_type);
330   }
331 }
332
333 /*
334  * Create a Sub type vector to be able to serialize and unserialize it
335  * the structure s_smpi_mpi_vector_t is derived from s_smpi_subtype which
336  * required the functions unserialize and serialize
337  *
338  */
339 s_smpi_mpi_vector_t* smpi_datatype_vector_create( int block_stride,
340                                                   int block_length,
341                                                   int block_count,
342                                                   MPI_Datatype old_type,
343                                                   int size_oldtype){
344   s_smpi_mpi_vector_t *new_t= xbt_new(s_smpi_mpi_vector_t,1);
345   new_t->base.serialize = &serialize_vector;
346   new_t->base.unserialize = &unserialize_vector;
347   new_t->base.subtype_free = &free_vector;
348   new_t->block_stride = block_stride;
349   new_t->block_length = block_length;
350   new_t->block_count = block_count;
351   smpi_datatype_use(old_type);
352   new_t->old_type = old_type;
353   new_t->size_oldtype = size_oldtype;
354   return new_t;
355 }
356
357 void smpi_datatype_create(MPI_Datatype* new_type, int size,int lb, int ub, int has_subtype,
358                           void *struct_type, int flags){
359   MPI_Datatype new_t= xbt_new(s_smpi_mpi_datatype_t,1);
360   new_t->name = NULL;
361   new_t->size = size;
362   new_t->has_subtype = size>0? has_subtype:0;
363   new_t->lb = lb;
364   new_t->ub = ub;
365   new_t->flags = flags;
366   new_t->substruct = struct_type;
367   new_t->in_use=0;
368   *new_type = new_t;
369
370 #ifdef HAVE_MC
371   if(MC_is_active())
372     MC_ignore(&(new_t->in_use), sizeof(new_t->in_use));
373 #endif
374 }
375
376 void smpi_datatype_free(MPI_Datatype* type){
377
378   if((*type)->flags & DT_FLAG_PREDEFINED)return;
379
380   //if still used, mark for deletion
381   if((*type)->in_use!=0){
382       (*type)->flags |=DT_FLAG_DESTROYED;
383       return;
384   }
385
386   if ((*type)->has_subtype == 1){
387     ((s_smpi_subtype_t *)(*type)->substruct)->subtype_free(type);  
388     xbt_free((*type)->substruct);
389   }
390   if ((*type)->name != NULL){
391     xbt_free((*type)->name);
392   }
393   xbt_free(*type);
394   *type = MPI_DATATYPE_NULL;
395 }
396
397 void smpi_datatype_use(MPI_Datatype type){
398   if(type)type->in_use++;
399
400 #ifdef HAVE_MC
401   if(MC_is_active())
402     MC_ignore(&(type->in_use), sizeof(type->in_use));
403 #endif
404 }
405
406
407 void smpi_datatype_unuse(MPI_Datatype type){
408   if(type && type->in_use-- == 0 && (type->flags & DT_FLAG_DESTROYED))
409     smpi_datatype_free(&type);
410
411 #ifdef HAVE_MC
412   if(MC_is_active())
413     MC_ignore(&(type->in_use), sizeof(type->in_use));
414 #endif
415 }
416
417
418
419
420 /*
421 Contiguous Implementation
422 */
423
424
425 /*
426  *  Copies noncontiguous data into contiguous memory.
427  *  @param contiguous_hvector - output hvector
428  *  @param noncontiguous_hvector - input hvector
429  *  @param type - pointer contening :
430  *      - stride - stride of between noncontiguous data, in bytes
431  *      - block_length - the width or height of blocked matrix
432  *      - count - the number of rows of matrix
433  */
434 void serialize_contiguous( const void *noncontiguous_hvector,
435                        void *contiguous_hvector,
436                        int count,
437                        void *type)
438 {
439   s_smpi_mpi_contiguous_t* type_c = (s_smpi_mpi_contiguous_t*)type;
440   char* contiguous_vector_char = (char*)contiguous_hvector;
441   char* noncontiguous_vector_char = (char*)noncontiguous_hvector+type_c->lb;
442   memcpy(contiguous_vector_char,
443            noncontiguous_vector_char, count* type_c->block_count * type_c->size_oldtype);
444 }
445 /*
446  *  Copies contiguous data into noncontiguous memory.
447  *  @param noncontiguous_vector - output hvector
448  *  @param contiguous_vector - input hvector
449  *  @param type - pointer contening :
450  *      - stride - stride of between noncontiguous data, in bytes
451  *      - block_length - the width or height of blocked matrix
452  *      - count - the number of rows of matrix
453  */
454 void unserialize_contiguous( const void *contiguous_vector,
455                          void *noncontiguous_vector,
456                          int count,
457                          void *type,
458                          MPI_Op op)
459 {
460   s_smpi_mpi_contiguous_t* type_c = (s_smpi_mpi_contiguous_t*)type;
461   char* contiguous_vector_char = (char*)contiguous_vector;
462   char* noncontiguous_vector_char = (char*)noncontiguous_vector+type_c->lb;
463   int n= count* type_c->block_count;
464   smpi_op_apply(op, contiguous_vector_char, noncontiguous_vector_char, &n,
465             &type_c->old_type);
466        /*memcpy(noncontiguous_vector_char,
467            contiguous_vector_char, count*  type_c->block_count * type_c->size_oldtype);*/
468 }
469
470 void free_contiguous(MPI_Datatype* d){
471   smpi_datatype_unuse(((s_smpi_mpi_indexed_t *)(*d)->substruct)->old_type);
472 }
473
474 /*
475  * Create a Sub type contiguous to be able to serialize and unserialize it
476  * the structure s_smpi_mpi_contiguous_t is derived from s_smpi_subtype which
477  * required the functions unserialize and serialize
478  *
479  */
480 s_smpi_mpi_contiguous_t* smpi_datatype_contiguous_create( MPI_Aint lb,
481                                                   int block_count,
482                                                   MPI_Datatype old_type,
483                                                   int size_oldtype){
484   s_smpi_mpi_contiguous_t *new_t= xbt_new(s_smpi_mpi_contiguous_t,1);
485   new_t->base.serialize = &serialize_contiguous;
486   new_t->base.unserialize = &unserialize_contiguous;
487   new_t->base.subtype_free = &free_contiguous;
488   new_t->lb = lb;
489   new_t->block_count = block_count;
490   new_t->old_type = old_type;
491   new_t->size_oldtype = size_oldtype;
492   smpi_datatype_use(old_type);
493   return new_t;
494 }
495
496
497
498
499 int smpi_datatype_contiguous(int count, MPI_Datatype old_type, MPI_Datatype* new_type, MPI_Aint lb)
500 {
501   int retval;
502   if(old_type->has_subtype){
503           //handle this case as a hvector with stride equals to the extent of the datatype
504           return smpi_datatype_hvector(count, 1, smpi_datatype_get_extent(old_type), old_type, new_type);
505   }
506   
507   s_smpi_mpi_contiguous_t* subtype = smpi_datatype_contiguous_create( lb,
508                                                                 count,
509                                                                 old_type,
510                                                                 smpi_datatype_size(old_type));
511                                                                 
512   smpi_datatype_create(new_type,
513                                           count * smpi_datatype_size(old_type),
514                                           lb,lb + count * smpi_datatype_size(old_type),
515                                           1,subtype, DT_FLAG_CONTIGUOUS);
516   retval=MPI_SUCCESS;
517   return retval;
518 }
519
520 int smpi_datatype_vector(int count, int blocklen, int stride, MPI_Datatype old_type, MPI_Datatype* new_type)
521 {
522   int retval;
523   if (blocklen<0) return MPI_ERR_ARG;
524   MPI_Aint lb = 0;
525   MPI_Aint ub = 0;
526   if(count>0){
527     lb=smpi_datatype_lb(old_type);
528     ub=((count-1)*stride+blocklen-1)*smpi_datatype_get_extent(old_type)+smpi_datatype_ub(old_type);
529   }
530   if(old_type->has_subtype || stride != blocklen){
531
532
533     s_smpi_mpi_vector_t* subtype = smpi_datatype_vector_create( stride,
534                                                                 blocklen,
535                                                                 count,
536                                                                 old_type,
537                                                                 smpi_datatype_size(old_type));
538     smpi_datatype_create(new_type,
539                          count * (blocklen) * smpi_datatype_size(old_type), lb,
540                          ub,
541                          1,
542                          subtype,
543                          DT_FLAG_VECTOR);
544     retval=MPI_SUCCESS;
545   }else{
546     /* in this situation the data are contignous thus it's not
547      * required to serialize and unserialize it*/
548     smpi_datatype_create(new_type, count * blocklen *
549                          smpi_datatype_size(old_type), 0, ((count -1) * stride + blocklen)*
550                          smpi_datatype_size(old_type),
551                          0,
552                          NULL,
553                          DT_FLAG_VECTOR|DT_FLAG_CONTIGUOUS);
554     retval=MPI_SUCCESS;
555   }
556   return retval;
557 }
558
559 void free_vector(MPI_Datatype* d){
560   smpi_datatype_unuse(((s_smpi_mpi_indexed_t *)(*d)->substruct)->old_type);
561 }
562
563 /*
564 Hvector Implementation - Vector with stride in bytes
565 */
566
567
568 /*
569  *  Copies noncontiguous data into contiguous memory.
570  *  @param contiguous_hvector - output hvector
571  *  @param noncontiguous_hvector - input hvector
572  *  @param type - pointer contening :
573  *      - stride - stride of between noncontiguous data, in bytes
574  *      - block_length - the width or height of blocked matrix
575  *      - count - the number of rows of matrix
576  */
577 void serialize_hvector( const void *noncontiguous_hvector,
578                        void *contiguous_hvector,
579                        int count,
580                        void *type)
581 {
582   s_smpi_mpi_hvector_t* type_c = (s_smpi_mpi_hvector_t*)type;
583   int i;
584   char* contiguous_vector_char = (char*)contiguous_hvector;
585   char* noncontiguous_vector_char = (char*)noncontiguous_hvector;
586
587   for (i = 0; i < type_c->block_count * count; i++) {
588     if (type_c->old_type->has_subtype == 0)
589       memcpy(contiguous_vector_char,
590            noncontiguous_vector_char, type_c->block_length * type_c->size_oldtype);
591     else
592       ((s_smpi_subtype_t*)type_c->old_type->substruct)->serialize( noncontiguous_vector_char,
593                                                                    contiguous_vector_char,
594                                                                    type_c->block_length,
595                                                                    type_c->old_type->substruct);
596
597     contiguous_vector_char += type_c->block_length*type_c->size_oldtype;
598     if((i+1)%type_c->block_count ==0)
599     noncontiguous_vector_char += type_c->block_length*type_c->size_oldtype;
600     else
601     noncontiguous_vector_char += type_c->block_stride;
602   }
603 }
604 /*
605  *  Copies contiguous data into noncontiguous memory.
606  *  @param noncontiguous_vector - output hvector
607  *  @param contiguous_vector - input hvector
608  *  @param type - pointer contening :
609  *      - stride - stride of between noncontiguous data, in bytes
610  *      - block_length - the width or height of blocked matrix
611  *      - count - the number of rows of matrix
612  */
613 void unserialize_hvector( const void *contiguous_vector,
614                          void *noncontiguous_vector,
615                          int count,
616                          void *type,
617                          MPI_Op op)
618 {
619   s_smpi_mpi_hvector_t* type_c = (s_smpi_mpi_hvector_t*)type;
620   int i;
621
622   char* contiguous_vector_char = (char*)contiguous_vector;
623   char* noncontiguous_vector_char = (char*)noncontiguous_vector;
624
625   for (i = 0; i < type_c->block_count * count; i++) {
626     if (type_c->old_type->has_subtype == 0)
627       smpi_op_apply(op, contiguous_vector_char, noncontiguous_vector_char, &type_c->block_length,
628                   &type_c->old_type);
629              /*memcpy(noncontiguous_vector_char,
630            contiguous_vector_char, type_c->block_length * type_c->size_oldtype);*/
631     else
632       ((s_smpi_subtype_t*)type_c->old_type->substruct)->unserialize( contiguous_vector_char,
633                                                                      noncontiguous_vector_char,
634                                                                      type_c->block_length,
635                                                                      type_c->old_type->substruct,
636                                                                      op);
637     contiguous_vector_char += type_c->block_length*type_c->size_oldtype;
638     if((i+1)%type_c->block_count ==0)
639     noncontiguous_vector_char += type_c->block_length*type_c->size_oldtype;
640     else
641     noncontiguous_vector_char += type_c->block_stride;
642   }
643 }
644
645 /*
646  * Create a Sub type vector to be able to serialize and unserialize it
647  * the structure s_smpi_mpi_vector_t is derived from s_smpi_subtype which
648  * required the functions unserialize and serialize
649  *
650  */
651 s_smpi_mpi_hvector_t* smpi_datatype_hvector_create( MPI_Aint block_stride,
652                                                   int block_length,
653                                                   int block_count,
654                                                   MPI_Datatype old_type,
655                                                   int size_oldtype){
656   s_smpi_mpi_hvector_t *new_t= xbt_new(s_smpi_mpi_hvector_t,1);
657   new_t->base.serialize = &serialize_hvector;
658   new_t->base.unserialize = &unserialize_hvector;
659   new_t->base.subtype_free = &free_hvector;
660   new_t->block_stride = block_stride;
661   new_t->block_length = block_length;
662   new_t->block_count = block_count;
663   new_t->old_type = old_type;
664   new_t->size_oldtype = size_oldtype;
665   smpi_datatype_use(old_type);
666   return new_t;
667 }
668
669 //do nothing for vector types
670 void free_hvector(MPI_Datatype* d){
671   smpi_datatype_unuse(((s_smpi_mpi_indexed_t *)(*d)->substruct)->old_type);
672 }
673
674 int smpi_datatype_hvector(int count, int blocklen, MPI_Aint stride, MPI_Datatype old_type, MPI_Datatype* new_type)
675 {
676   int retval;
677   if (blocklen<0) return MPI_ERR_ARG;
678   MPI_Aint lb = 0;
679   MPI_Aint ub = 0;
680   if(count>0){
681     lb=smpi_datatype_lb(old_type);
682     ub=((count-1)*stride)+(blocklen-1)*smpi_datatype_get_extent(old_type)+smpi_datatype_ub(old_type);
683   }
684   if(old_type->has_subtype || stride != blocklen*smpi_datatype_get_extent(old_type)){
685     s_smpi_mpi_hvector_t* subtype = smpi_datatype_hvector_create( stride,
686                                                                   blocklen,
687                                                                   count,
688                                                                   old_type,
689                                                                   smpi_datatype_size(old_type));
690
691     smpi_datatype_create(new_type, count * blocklen * smpi_datatype_size(old_type),
692                                                  lb,ub,
693                          1,
694                          subtype,
695                          DT_FLAG_VECTOR);
696     retval=MPI_SUCCESS;
697   }else{
698     smpi_datatype_create(new_type, count * blocklen *
699                                              smpi_datatype_size(old_type),0,count * blocklen *
700                                              smpi_datatype_size(old_type),
701                                             0,
702                                             NULL,
703                                             DT_FLAG_VECTOR|DT_FLAG_CONTIGUOUS);
704     retval=MPI_SUCCESS;
705   }
706   return retval;
707 }
708
709
710 /*
711 Indexed Implementation
712 */
713
714 /*
715  *  Copies noncontiguous data into contiguous memory.
716  *  @param contiguous_indexed - output indexed
717  *  @param noncontiguous_indexed - input indexed
718  *  @param type - pointer contening :
719  *      - block_lengths - the width or height of blocked matrix
720  *      - block_indices - indices of each data, in element
721  *      - count - the number of rows of matrix
722  */
723 void serialize_indexed( const void *noncontiguous_indexed,
724                        void *contiguous_indexed,
725                        int count,
726                        void *type)
727 {
728   s_smpi_mpi_indexed_t* type_c = (s_smpi_mpi_indexed_t*)type;
729   int i,j;
730   char* contiguous_indexed_char = (char*)contiguous_indexed;
731   char* noncontiguous_indexed_char = (char*)noncontiguous_indexed+type_c->block_indices[0] * type_c->size_oldtype;
732   for(j=0; j<count;j++){
733     for (i = 0; i < type_c->block_count; i++) {
734       if (type_c->old_type->has_subtype == 0)
735         memcpy(contiguous_indexed_char,
736                      noncontiguous_indexed_char, type_c->block_lengths[i] * type_c->size_oldtype);
737       else
738         ((s_smpi_subtype_t*)type_c->old_type->substruct)->serialize( noncontiguous_indexed_char,
739                                                                      contiguous_indexed_char,
740                                                                      type_c->block_lengths[i],
741                                                                      type_c->old_type->substruct);
742
743
744       contiguous_indexed_char += type_c->block_lengths[i]*type_c->size_oldtype;
745       if (i<type_c->block_count-1)noncontiguous_indexed_char = (char*)noncontiguous_indexed + type_c->block_indices[i+1]*smpi_datatype_get_extent(type_c->old_type);
746       else noncontiguous_indexed_char += type_c->block_lengths[i]*smpi_datatype_get_extent(type_c->old_type);
747     }
748     noncontiguous_indexed=(void*)noncontiguous_indexed_char;
749   }
750 }
751 /*
752  *  Copies contiguous data into noncontiguous memory.
753  *  @param noncontiguous_indexed - output indexed
754  *  @param contiguous_indexed - input indexed
755  *  @param type - pointer contening :
756  *      - block_lengths - the width or height of blocked matrix
757  *      - block_indices - indices of each data, in element
758  *      - count - the number of rows of matrix
759  */
760 void unserialize_indexed( const void *contiguous_indexed,
761                          void *noncontiguous_indexed,
762                          int count,
763                          void *type,
764                          MPI_Op op)
765 {
766
767   s_smpi_mpi_indexed_t* type_c = (s_smpi_mpi_indexed_t*)type;
768   int i,j;
769   char* contiguous_indexed_char = (char*)contiguous_indexed;
770   char* noncontiguous_indexed_char = (char*)noncontiguous_indexed+type_c->block_indices[0]*smpi_datatype_get_extent(type_c->old_type);
771   for(j=0; j<count;j++){
772     for (i = 0; i < type_c->block_count; i++) {
773       if (type_c->old_type->has_subtype == 0)
774         smpi_op_apply(op, contiguous_indexed_char, noncontiguous_indexed_char, &type_c->block_lengths[i],
775                     &type_c->old_type);
776                /*memcpy(noncontiguous_indexed_char ,
777              contiguous_indexed_char, type_c->block_lengths[i] * type_c->size_oldtype);*/
778       else
779         ((s_smpi_subtype_t*)type_c->old_type->substruct)->unserialize( contiguous_indexed_char,
780                                                                        noncontiguous_indexed_char,
781                                                                        type_c->block_lengths[i],
782                                                                        type_c->old_type->substruct,
783                                                                        op);
784
785       contiguous_indexed_char += type_c->block_lengths[i]*type_c->size_oldtype;
786       if (i<type_c->block_count-1)
787         noncontiguous_indexed_char = (char*)noncontiguous_indexed + type_c->block_indices[i+1]*smpi_datatype_get_extent(type_c->old_type);
788       else noncontiguous_indexed_char += type_c->block_lengths[i]*smpi_datatype_get_extent(type_c->old_type);
789     }
790     noncontiguous_indexed=(void*)noncontiguous_indexed_char;
791   }
792 }
793
794 void free_indexed(MPI_Datatype* type){
795   xbt_free(((s_smpi_mpi_indexed_t *)(*type)->substruct)->block_lengths);
796   xbt_free(((s_smpi_mpi_indexed_t *)(*type)->substruct)->block_indices);
797   smpi_datatype_unuse(((s_smpi_mpi_indexed_t *)(*type)->substruct)->old_type);
798 }
799
800 /*
801  * Create a Sub type indexed to be able to serialize and unserialize it
802  * the structure s_smpi_mpi_indexed_t is derived from s_smpi_subtype which
803  * required the functions unserialize and serialize
804  */
805 s_smpi_mpi_indexed_t* smpi_datatype_indexed_create( int* block_lengths,
806                                                   int* block_indices,
807                                                   int block_count,
808                                                   MPI_Datatype old_type,
809                                                   int size_oldtype){
810   s_smpi_mpi_indexed_t *new_t= xbt_new(s_smpi_mpi_indexed_t,1);
811   new_t->base.serialize = &serialize_indexed;
812   new_t->base.unserialize = &unserialize_indexed;
813   new_t->base.subtype_free = &free_indexed;
814  //TODO : add a custom function for each time to clean these 
815   new_t->block_lengths= xbt_new(int, block_count);
816   new_t->block_indices= xbt_new(int, block_count);
817   int i;
818   for(i=0;i<block_count;i++){
819     new_t->block_lengths[i]=block_lengths[i];
820     new_t->block_indices[i]=block_indices[i];
821   }
822   new_t->block_count = block_count;
823   smpi_datatype_use(old_type);
824   new_t->old_type = old_type;
825   new_t->size_oldtype = size_oldtype;
826   return new_t;
827 }
828
829
830 int smpi_datatype_indexed(int count, int* blocklens, int* indices, MPI_Datatype old_type, MPI_Datatype* new_type)
831 {
832   int i;
833   int retval;
834   int size = 0;
835   int contiguous=1;
836   MPI_Aint lb = 0;
837   MPI_Aint ub = 0;
838   if(count>0){
839     lb=indices[0]*smpi_datatype_get_extent(old_type);
840     ub=indices[0]*smpi_datatype_get_extent(old_type) + blocklens[0]*smpi_datatype_ub(old_type);
841   }
842
843   for(i=0; i< count; i++){
844     if   (blocklens[i]<0)
845       return MPI_ERR_ARG;
846     size += blocklens[i];
847
848     if(indices[i]*smpi_datatype_get_extent(old_type)+smpi_datatype_lb(old_type)<lb)
849         lb = indices[i]*smpi_datatype_get_extent(old_type)+smpi_datatype_lb(old_type);
850     if(indices[i]*smpi_datatype_get_extent(old_type)+blocklens[i]*smpi_datatype_ub(old_type)>ub)
851         ub = indices[i]*smpi_datatype_get_extent(old_type)+blocklens[i]*smpi_datatype_ub(old_type);
852
853     if ( (i< count -1) && (indices[i]+blocklens[i] != indices[i+1]) )contiguous=0;
854   }
855   if (old_type->has_subtype == 1)
856     contiguous=0;
857
858   if(!contiguous){
859     s_smpi_mpi_indexed_t* subtype = smpi_datatype_indexed_create( blocklens,
860                                                                   indices,
861                                                                   count,
862                                                                   old_type,
863                                                                   smpi_datatype_size(old_type));
864      smpi_datatype_create(new_type,  size *
865                          smpi_datatype_size(old_type),lb,ub,1, subtype, DT_FLAG_DATA);
866   }else{
867     s_smpi_mpi_contiguous_t* subtype = smpi_datatype_contiguous_create( lb,
868                                                                   size,
869                                                                   old_type,
870                                                                   smpi_datatype_size(old_type));
871     smpi_datatype_create(new_type,  size *
872                          smpi_datatype_size(old_type),lb,ub,1, subtype, DT_FLAG_DATA|DT_FLAG_CONTIGUOUS);
873   }
874   retval=MPI_SUCCESS;
875   return retval;
876 }
877
878
879 /*
880 Hindexed Implementation - Indexed with indices in bytes 
881 */
882
883 /*
884  *  Copies noncontiguous data into contiguous memory.
885  *  @param contiguous_hindexed - output hindexed
886  *  @param noncontiguous_hindexed - input hindexed
887  *  @param type - pointer contening :
888  *      - block_lengths - the width or height of blocked matrix
889  *      - block_indices - indices of each data, in bytes
890  *      - count - the number of rows of matrix
891  */
892 void serialize_hindexed( const void *noncontiguous_hindexed,
893                        void *contiguous_hindexed,
894                        int count,
895                        void *type)
896 {
897   s_smpi_mpi_hindexed_t* type_c = (s_smpi_mpi_hindexed_t*)type;
898   int i,j;
899   char* contiguous_hindexed_char = (char*)contiguous_hindexed;
900   char* noncontiguous_hindexed_char = (char*)noncontiguous_hindexed+ type_c->block_indices[0];
901   for(j=0; j<count;j++){
902     for (i = 0; i < type_c->block_count; i++) {
903       if (type_c->old_type->has_subtype == 0)
904         memcpy(contiguous_hindexed_char,
905                      noncontiguous_hindexed_char, type_c->block_lengths[i] * type_c->size_oldtype);
906       else
907         ((s_smpi_subtype_t*)type_c->old_type->substruct)->serialize( noncontiguous_hindexed_char,
908                                                                      contiguous_hindexed_char,
909                                                                      type_c->block_lengths[i],
910                                                                      type_c->old_type->substruct);
911
912       contiguous_hindexed_char += type_c->block_lengths[i]*type_c->size_oldtype;
913       if (i<type_c->block_count-1)noncontiguous_hindexed_char = (char*)noncontiguous_hindexed + type_c->block_indices[i+1];
914       else noncontiguous_hindexed_char += type_c->block_lengths[i]*smpi_datatype_get_extent(type_c->old_type);
915     }
916     noncontiguous_hindexed=(void*)noncontiguous_hindexed_char;
917   }
918 }
919 /*
920  *  Copies contiguous data into noncontiguous memory.
921  *  @param noncontiguous_hindexed - output hindexed
922  *  @param contiguous_hindexed - input hindexed
923  *  @param type - pointer contening :
924  *      - block_lengths - the width or height of blocked matrix
925  *      - block_indices - indices of each data, in bytes
926  *      - count - the number of rows of matrix
927  */
928 void unserialize_hindexed( const void *contiguous_hindexed,
929                          void *noncontiguous_hindexed,
930                          int count,
931                          void *type,
932                          MPI_Op op)
933 {
934   s_smpi_mpi_hindexed_t* type_c = (s_smpi_mpi_hindexed_t*)type;
935   int i,j;
936
937   char* contiguous_hindexed_char = (char*)contiguous_hindexed;
938   char* noncontiguous_hindexed_char = (char*)noncontiguous_hindexed+ type_c->block_indices[0];
939   for(j=0; j<count;j++){
940     for (i = 0; i < type_c->block_count; i++) {
941       if (type_c->old_type->has_subtype == 0)
942         smpi_op_apply(op, contiguous_hindexed_char, noncontiguous_hindexed_char, &type_c->block_lengths[i],
943                             &type_c->old_type);
944                        /*memcpy(noncontiguous_hindexed_char,
945                contiguous_hindexed_char, type_c->block_lengths[i] * type_c->size_oldtype);*/
946       else
947         ((s_smpi_subtype_t*)type_c->old_type->substruct)->unserialize( contiguous_hindexed_char,
948                                                                        noncontiguous_hindexed_char,
949                                                                        type_c->block_lengths[i],
950                                                                        type_c->old_type->substruct,
951                                                                        op);
952
953       contiguous_hindexed_char += type_c->block_lengths[i]*type_c->size_oldtype;
954       if (i<type_c->block_count-1)noncontiguous_hindexed_char = (char*)noncontiguous_hindexed + type_c->block_indices[i+1];
955       else noncontiguous_hindexed_char += type_c->block_lengths[i]*smpi_datatype_get_extent(type_c->old_type);
956     }
957     noncontiguous_hindexed=(void*)noncontiguous_hindexed_char;
958   }
959 }
960
961 void free_hindexed(MPI_Datatype* type){
962   xbt_free(((s_smpi_mpi_hindexed_t *)(*type)->substruct)->block_lengths);
963   xbt_free(((s_smpi_mpi_hindexed_t *)(*type)->substruct)->block_indices);
964   smpi_datatype_unuse(((s_smpi_mpi_indexed_t *)(*type)->substruct)->old_type);
965 }
966
967 /*
968  * Create a Sub type hindexed to be able to serialize and unserialize it
969  * the structure s_smpi_mpi_hindexed_t is derived from s_smpi_subtype which
970  * required the functions unserialize and serialize
971  */
972 s_smpi_mpi_hindexed_t* smpi_datatype_hindexed_create( int* block_lengths,
973                                                   MPI_Aint* block_indices,
974                                                   int block_count,
975                                                   MPI_Datatype old_type,
976                                                   int size_oldtype){
977   s_smpi_mpi_hindexed_t *new_t= xbt_new(s_smpi_mpi_hindexed_t,1);
978   new_t->base.serialize = &serialize_hindexed;
979   new_t->base.unserialize = &unserialize_hindexed;
980   new_t->base.subtype_free = &free_hindexed;
981  //TODO : add a custom function for each time to clean these 
982   new_t->block_lengths= xbt_new(int, block_count);
983   new_t->block_indices= xbt_new(MPI_Aint, block_count);
984   int i;
985   for(i=0;i<block_count;i++){
986     new_t->block_lengths[i]=block_lengths[i];
987     new_t->block_indices[i]=block_indices[i];
988   }
989   new_t->block_count = block_count;
990   new_t->old_type = old_type;
991   new_t->size_oldtype = size_oldtype;
992   return new_t;
993 }
994
995
996 int smpi_datatype_hindexed(int count, int* blocklens, MPI_Aint* indices, MPI_Datatype old_type, MPI_Datatype* new_type)
997 {
998   int i;
999   int retval;
1000   int size = 0;
1001   int contiguous=1;
1002   MPI_Aint lb = 0;
1003   MPI_Aint ub = 0;
1004   if(count>0){
1005     lb=indices[0] + smpi_datatype_lb(old_type);
1006     ub=indices[0] + blocklens[0]*smpi_datatype_ub(old_type);
1007   }
1008   for(i=0; i< count; i++){
1009     if   (blocklens[i]<0)
1010       return MPI_ERR_ARG;
1011     size += blocklens[i];
1012
1013     if(indices[i]+smpi_datatype_lb(old_type)<lb) lb = indices[i]+smpi_datatype_lb(old_type);
1014     if(indices[i]+blocklens[i]*smpi_datatype_ub(old_type)>ub) ub = indices[i]+blocklens[i]*smpi_datatype_ub(old_type);
1015
1016     if ( (i< count -1) && (indices[i]+blocklens[i]*smpi_datatype_size(old_type) != indices[i+1]) )contiguous=0;
1017   }
1018   if (old_type->has_subtype == 1 || lb!=0)
1019     contiguous=0;
1020
1021   if(!contiguous){
1022     s_smpi_mpi_hindexed_t* subtype = smpi_datatype_hindexed_create( blocklens,
1023                                                                   indices,
1024                                                                   count,
1025                                                                   old_type,
1026                                                                   smpi_datatype_size(old_type));
1027     smpi_datatype_create(new_type,  size * smpi_datatype_size(old_type),
1028                                                  lb,
1029                          ub
1030                          ,1, subtype, DT_FLAG_DATA);
1031   }else{
1032     s_smpi_mpi_contiguous_t* subtype = smpi_datatype_contiguous_create( lb,
1033                                                                   size,
1034                                                                   old_type,
1035                                                                   smpi_datatype_size(old_type));
1036     smpi_datatype_create(new_type,  size * smpi_datatype_size(old_type),
1037                                              0,size * smpi_datatype_size(old_type),
1038                                              1, subtype, DT_FLAG_DATA|DT_FLAG_CONTIGUOUS);
1039   }
1040   retval=MPI_SUCCESS;
1041   return retval;
1042 }
1043
1044
1045 /*
1046 struct Implementation - Indexed with indices in bytes 
1047 */
1048
1049 /*
1050  *  Copies noncontiguous data into contiguous memory.
1051  *  @param contiguous_struct - output struct
1052  *  @param noncontiguous_struct - input struct
1053  *  @param type - pointer contening :
1054  *      - stride - stride of between noncontiguous data
1055  *      - block_length - the width or height of blocked matrix
1056  *      - count - the number of rows of matrix
1057  */
1058 void serialize_struct( const void *noncontiguous_struct,
1059                        void *contiguous_struct,
1060                        int count,
1061                        void *type)
1062 {
1063   s_smpi_mpi_struct_t* type_c = (s_smpi_mpi_struct_t*)type;
1064   int i,j;
1065   char* contiguous_struct_char = (char*)contiguous_struct;
1066   char* noncontiguous_struct_char = (char*)noncontiguous_struct+ type_c->block_indices[0];
1067   for(j=0; j<count;j++){
1068     for (i = 0; i < type_c->block_count; i++) {
1069       if (type_c->old_types[i]->has_subtype == 0)
1070         memcpy(contiguous_struct_char,
1071              noncontiguous_struct_char, type_c->block_lengths[i] * smpi_datatype_size(type_c->old_types[i]));
1072       else
1073         ((s_smpi_subtype_t*)type_c->old_types[i]->substruct)->serialize( noncontiguous_struct_char,
1074                                                                          contiguous_struct_char,
1075                                                                          type_c->block_lengths[i],
1076                                                                          type_c->old_types[i]->substruct);
1077
1078
1079       contiguous_struct_char += type_c->block_lengths[i]*smpi_datatype_size(type_c->old_types[i]);
1080       if (i<type_c->block_count-1)noncontiguous_struct_char = (char*)noncontiguous_struct + type_c->block_indices[i+1];
1081       else noncontiguous_struct_char += type_c->block_lengths[i]*smpi_datatype_get_extent(type_c->old_types[i]);//let's hope this is MPI_UB ?
1082     }
1083     noncontiguous_struct=(void*)noncontiguous_struct_char;
1084   }
1085 }
1086 /*
1087  *  Copies contiguous data into noncontiguous memory.
1088  *  @param noncontiguous_struct - output struct
1089  *  @param contiguous_struct - input struct
1090  *  @param type - pointer contening :
1091  *      - stride - stride of between noncontiguous data
1092  *      - block_length - the width or height of blocked matrix
1093  *      - count - the number of rows of matrix
1094  */
1095 void unserialize_struct( const void *contiguous_struct,
1096                          void *noncontiguous_struct,
1097                          int count,
1098                          void *type,
1099                          MPI_Op op)
1100 {
1101   s_smpi_mpi_struct_t* type_c = (s_smpi_mpi_struct_t*)type;
1102   int i,j;
1103
1104   char* contiguous_struct_char = (char*)contiguous_struct;
1105   char* noncontiguous_struct_char = (char*)noncontiguous_struct+ type_c->block_indices[0];
1106   for(j=0; j<count;j++){
1107     for (i = 0; i < type_c->block_count; i++) {
1108       if (type_c->old_types[i]->has_subtype == 0)
1109         smpi_op_apply(op, contiguous_struct_char, noncontiguous_struct_char, &type_c->block_lengths[i],
1110            & type_c->old_types[i]);
1111                        /*memcpy(noncontiguous_struct_char,
1112              contiguous_struct_char, type_c->block_lengths[i] * smpi_datatype_size(type_c->old_types[i]));*/
1113       else
1114         ((s_smpi_subtype_t*)type_c->old_types[i]->substruct)->unserialize( contiguous_struct_char,
1115                                                                            noncontiguous_struct_char,
1116                                                                            type_c->block_lengths[i],
1117                                                                            type_c->old_types[i]->substruct,
1118                                                                            op);
1119
1120       contiguous_struct_char += type_c->block_lengths[i]*smpi_datatype_size(type_c->old_types[i]);
1121       if (i<type_c->block_count-1)noncontiguous_struct_char =  (char*)noncontiguous_struct + type_c->block_indices[i+1];
1122       else noncontiguous_struct_char += type_c->block_lengths[i]*smpi_datatype_get_extent(type_c->old_types[i]);
1123     }
1124     noncontiguous_struct=(void*)noncontiguous_struct_char;
1125     
1126   }
1127 }
1128
1129 void free_struct(MPI_Datatype* type){
1130   xbt_free(((s_smpi_mpi_struct_t *)(*type)->substruct)->block_lengths);
1131   xbt_free(((s_smpi_mpi_struct_t *)(*type)->substruct)->block_indices);
1132   int i=0;
1133   for (i = 0; i < ((s_smpi_mpi_struct_t *)(*type)->substruct)->block_count; i++)
1134     smpi_datatype_unuse(((s_smpi_mpi_struct_t *)(*type)->substruct)->old_types[i]);
1135   xbt_free(((s_smpi_mpi_struct_t *)(*type)->substruct)->old_types);
1136 }
1137
1138 /*
1139  * Create a Sub type struct to be able to serialize and unserialize it
1140  * the structure s_smpi_mpi_struct_t is derived from s_smpi_subtype which
1141  * required the functions unserialize and serialize
1142  */
1143 s_smpi_mpi_struct_t* smpi_datatype_struct_create( int* block_lengths,
1144                                                   MPI_Aint* block_indices,
1145                                                   int block_count,
1146                                                   MPI_Datatype* old_types){
1147   s_smpi_mpi_struct_t *new_t= xbt_new(s_smpi_mpi_struct_t,1);
1148   new_t->base.serialize = &serialize_struct;
1149   new_t->base.unserialize = &unserialize_struct;
1150   new_t->base.subtype_free = &free_struct;
1151  //TODO : add a custom function for each time to clean these 
1152   new_t->block_lengths= xbt_new(int, block_count);
1153   new_t->block_indices= xbt_new(MPI_Aint, block_count);
1154   new_t->old_types=  xbt_new(MPI_Datatype, block_count);
1155   int i;
1156   for(i=0;i<block_count;i++){
1157     new_t->block_lengths[i]=block_lengths[i];
1158     new_t->block_indices[i]=block_indices[i];
1159     new_t->old_types[i]=old_types[i];
1160     smpi_datatype_use(new_t->old_types[i]);
1161   }
1162   //new_t->block_lengths = block_lengths;
1163   //new_t->block_indices = block_indices;
1164   new_t->block_count = block_count;
1165   //new_t->old_types = old_types;
1166   return new_t;
1167 }
1168
1169
1170 int smpi_datatype_struct(int count, int* blocklens, MPI_Aint* indices, MPI_Datatype* old_types, MPI_Datatype* new_type)
1171 {
1172   int i;
1173   size_t size = 0;
1174   int contiguous=1;
1175   size = 0;
1176   MPI_Aint lb = 0;
1177   MPI_Aint ub = 0;
1178   if(count>0){
1179     lb=indices[0] + smpi_datatype_lb(old_types[0]);
1180     ub=indices[0] + blocklens[0]*smpi_datatype_ub(old_types[0]);
1181   }
1182   int forced_lb=0;
1183   int forced_ub=0;
1184   for(i=0; i< count; i++){
1185     if (blocklens[i]<0)
1186       return MPI_ERR_ARG;
1187     if (old_types[i]->has_subtype == 1)
1188       contiguous=0;
1189
1190     size += blocklens[i]*smpi_datatype_size(old_types[i]);
1191     if (old_types[i]==MPI_LB){
1192       lb=indices[i];
1193       forced_lb=1;
1194     }
1195     if (old_types[i]==MPI_UB){
1196       ub=indices[i];
1197       forced_ub=1;
1198     }
1199
1200     if(!forced_lb && indices[i]+smpi_datatype_lb(old_types[i])<lb) lb = indices[i];
1201     if(!forced_ub && indices[i]+blocklens[i]*smpi_datatype_ub(old_types[i])>ub) ub = indices[i]+blocklens[i]*smpi_datatype_ub(old_types[i]);
1202
1203     if ( (i< count -1) && (indices[i]+blocklens[i]*smpi_datatype_size(old_types[i]) != indices[i+1]) )contiguous=0;
1204   }
1205
1206   if(!contiguous){
1207     s_smpi_mpi_struct_t* subtype = smpi_datatype_struct_create( blocklens,
1208                                                               indices,
1209                                                               count,
1210                                                               old_types);
1211
1212     smpi_datatype_create(new_type,  size, lb, ub,1, subtype, DT_FLAG_DATA);
1213   }else{
1214     s_smpi_mpi_contiguous_t* subtype = smpi_datatype_contiguous_create( lb,
1215                                                                   size,
1216                                                                   MPI_CHAR,
1217                                                                   1);
1218     smpi_datatype_create(new_type,  size, lb, ub,1, subtype, DT_FLAG_DATA|DT_FLAG_CONTIGUOUS);
1219   }
1220   return MPI_SUCCESS;
1221 }
1222
1223 void smpi_datatype_commit(MPI_Datatype *datatype)
1224 {
1225   (*datatype)->flags=  ((*datatype)->flags | DT_FLAG_COMMITED);
1226 }
1227
1228 typedef struct s_smpi_mpi_op {
1229   MPI_User_function *func;
1230   int is_commute;
1231 } s_smpi_mpi_op_t;
1232
1233 #define MAX_OP(a, b)  (b) = (a) < (b) ? (b) : (a)
1234 #define MIN_OP(a, b)  (b) = (a) < (b) ? (a) : (b)
1235 #define SUM_OP(a, b)  (b) += (a)
1236 #define PROD_OP(a, b) (b) *= (a)
1237 #define LAND_OP(a, b) (b) = (a) && (b)
1238 #define LOR_OP(a, b)  (b) = (a) || (b)
1239 #define LXOR_OP(a, b) (b) = (!(a) && (b)) || ((a) && !(b))
1240 #define BAND_OP(a, b) (b) &= (a)
1241 #define BOR_OP(a, b)  (b) |= (a)
1242 #define BXOR_OP(a, b) (b) ^= (a)
1243 #define MAXLOC_OP(a, b)  (b) = (a.value) < (b.value) ? (b) : (a)
1244 #define MINLOC_OP(a, b)  (b) = (a.value) < (b.value) ? (a) : (b)
1245
1246 #define APPLY_FUNC(a, b, length, type, func) \
1247 {                                          \
1248   int i;                                   \
1249   type* x = (type*)(a);                    \
1250   type* y = (type*)(b);                    \
1251   for(i = 0; i < *(length); i++) {         \
1252     func(x[i], y[i]);                      \
1253   }                                        \
1254 }
1255
1256 static void max_func(void *a, void *b, int *length,
1257                      MPI_Datatype * datatype)
1258 {
1259   if (*datatype == MPI_CHAR) {
1260     APPLY_FUNC(a, b, length, char, MAX_OP);
1261   } else if (*datatype == MPI_SHORT) {
1262     APPLY_FUNC(a, b, length, short, MAX_OP);
1263   } else if (*datatype == MPI_INT) {
1264     APPLY_FUNC(a, b, length, int, MAX_OP);
1265   } else if (*datatype == MPI_LONG) {
1266     APPLY_FUNC(a, b, length, long, MAX_OP);
1267   } else if (*datatype == MPI_UNSIGNED_SHORT) {
1268     APPLY_FUNC(a, b, length, unsigned short, MAX_OP);
1269   } else if (*datatype == MPI_UNSIGNED) {
1270     APPLY_FUNC(a, b, length, unsigned int, MAX_OP);
1271   } else if (*datatype == MPI_UNSIGNED_LONG) {
1272     APPLY_FUNC(a, b, length, unsigned long, MAX_OP);
1273   } else if (*datatype == MPI_UNSIGNED_CHAR) {
1274     APPLY_FUNC(a, b, length, unsigned char, MAX_OP);
1275   } else if (*datatype == MPI_FLOAT) {
1276     APPLY_FUNC(a, b, length, float, MAX_OP);
1277   } else if (*datatype == MPI_DOUBLE) {
1278     APPLY_FUNC(a, b, length, double, MAX_OP);
1279   } else if (*datatype == MPI_LONG_DOUBLE) {
1280     APPLY_FUNC(a, b, length, long double, MAX_OP);
1281   }
1282 }
1283
1284 static void min_func(void *a, void *b, int *length,
1285                      MPI_Datatype * datatype)
1286 {
1287   if (*datatype == MPI_CHAR) {
1288     APPLY_FUNC(a, b, length, char, MIN_OP);
1289   } else if (*datatype == MPI_SHORT) {
1290     APPLY_FUNC(a, b, length, short, MIN_OP);
1291   } else if (*datatype == MPI_INT) {
1292     APPLY_FUNC(a, b, length, int, MIN_OP);
1293   } else if (*datatype == MPI_LONG) {
1294     APPLY_FUNC(a, b, length, long, MIN_OP);
1295   } else if (*datatype == MPI_UNSIGNED_SHORT) {
1296     APPLY_FUNC(a, b, length, unsigned short, MIN_OP);
1297   } else if (*datatype == MPI_UNSIGNED) {
1298     APPLY_FUNC(a, b, length, unsigned int, MIN_OP);
1299   } else if (*datatype == MPI_UNSIGNED_LONG) {
1300     APPLY_FUNC(a, b, length, unsigned long, MIN_OP);
1301   } else if (*datatype == MPI_UNSIGNED_CHAR) {
1302     APPLY_FUNC(a, b, length, unsigned char, MIN_OP);
1303   } else if (*datatype == MPI_FLOAT) {
1304     APPLY_FUNC(a, b, length, float, MIN_OP);
1305   } else if (*datatype == MPI_DOUBLE) {
1306     APPLY_FUNC(a, b, length, double, MIN_OP);
1307   } else if (*datatype == MPI_LONG_DOUBLE) {
1308     APPLY_FUNC(a, b, length, long double, MIN_OP);
1309   }
1310 }
1311
1312 static void sum_func(void *a, void *b, int *length,
1313                      MPI_Datatype * datatype)
1314 {
1315   if (*datatype == MPI_CHAR) {
1316     APPLY_FUNC(a, b, length, char, SUM_OP);
1317   } else if (*datatype == MPI_SHORT) {
1318     APPLY_FUNC(a, b, length, short, SUM_OP);
1319   } else if (*datatype == MPI_INT) {
1320     APPLY_FUNC(a, b, length, int, SUM_OP);
1321   } else if (*datatype == MPI_LONG) {
1322     APPLY_FUNC(a, b, length, long, SUM_OP);
1323   } else if (*datatype == MPI_UNSIGNED_SHORT) {
1324     APPLY_FUNC(a, b, length, unsigned short, SUM_OP);
1325   } else if (*datatype == MPI_UNSIGNED) {
1326     APPLY_FUNC(a, b, length, unsigned int, SUM_OP);
1327   } else if (*datatype == MPI_UNSIGNED_LONG) {
1328     APPLY_FUNC(a, b, length, unsigned long, SUM_OP);
1329   } else if (*datatype == MPI_UNSIGNED_CHAR) {
1330     APPLY_FUNC(a, b, length, unsigned char, SUM_OP);
1331   } else if (*datatype == MPI_FLOAT) {
1332     APPLY_FUNC(a, b, length, float, SUM_OP);
1333   } else if (*datatype == MPI_DOUBLE) {
1334     APPLY_FUNC(a, b, length, double, SUM_OP);
1335   } else if (*datatype == MPI_LONG_DOUBLE) {
1336     APPLY_FUNC(a, b, length, long double, SUM_OP);
1337   } else if (*datatype == MPI_C_FLOAT_COMPLEX) {
1338     APPLY_FUNC(a, b, length, float _Complex, SUM_OP);
1339   } else if (*datatype == MPI_C_DOUBLE_COMPLEX) {
1340     APPLY_FUNC(a, b, length, double _Complex, SUM_OP);
1341   } else if (*datatype == MPI_C_LONG_DOUBLE_COMPLEX) {
1342     APPLY_FUNC(a, b, length, long double _Complex, SUM_OP);
1343   }
1344 }
1345
1346 static void prod_func(void *a, void *b, int *length,
1347                       MPI_Datatype * datatype)
1348 {
1349   if (*datatype == MPI_CHAR) {
1350     APPLY_FUNC(a, b, length, char, PROD_OP);
1351   } else if (*datatype == MPI_SHORT) {
1352     APPLY_FUNC(a, b, length, short, PROD_OP);
1353   } else if (*datatype == MPI_INT) {
1354     APPLY_FUNC(a, b, length, int, PROD_OP);
1355   } else if (*datatype == MPI_LONG) {
1356     APPLY_FUNC(a, b, length, long, PROD_OP);
1357   } else if (*datatype == MPI_UNSIGNED_SHORT) {
1358     APPLY_FUNC(a, b, length, unsigned short, PROD_OP);
1359   } else if (*datatype == MPI_UNSIGNED) {
1360     APPLY_FUNC(a, b, length, unsigned int, PROD_OP);
1361   } else if (*datatype == MPI_UNSIGNED_LONG) {
1362     APPLY_FUNC(a, b, length, unsigned long, PROD_OP);
1363   } else if (*datatype == MPI_UNSIGNED_CHAR) {
1364     APPLY_FUNC(a, b, length, unsigned char, PROD_OP);
1365   } else if (*datatype == MPI_FLOAT) {
1366     APPLY_FUNC(a, b, length, float, PROD_OP);
1367   } else if (*datatype == MPI_DOUBLE) {
1368     APPLY_FUNC(a, b, length, double, PROD_OP);
1369   } else if (*datatype == MPI_LONG_DOUBLE) {
1370     APPLY_FUNC(a, b, length, long double, PROD_OP);
1371   } else if (*datatype == MPI_C_FLOAT_COMPLEX) {
1372     APPLY_FUNC(a, b, length, float _Complex, PROD_OP);
1373   } else if (*datatype == MPI_C_DOUBLE_COMPLEX) {
1374     APPLY_FUNC(a, b, length, double _Complex, PROD_OP);
1375   } else if (*datatype == MPI_C_LONG_DOUBLE_COMPLEX) {
1376     APPLY_FUNC(a, b, length, long double _Complex, PROD_OP);
1377   }
1378 }
1379
1380 static void land_func(void *a, void *b, int *length,
1381                       MPI_Datatype * datatype)
1382 {
1383   if (*datatype == MPI_CHAR) {
1384     APPLY_FUNC(a, b, length, char, LAND_OP);
1385   } else if (*datatype == MPI_SHORT) {
1386     APPLY_FUNC(a, b, length, short, LAND_OP);
1387   } else if (*datatype == MPI_INT) {
1388     APPLY_FUNC(a, b, length, int, LAND_OP);
1389   } else if (*datatype == MPI_LONG) {
1390     APPLY_FUNC(a, b, length, long, LAND_OP);
1391   } else if (*datatype == MPI_UNSIGNED_SHORT) {
1392     APPLY_FUNC(a, b, length, unsigned short, LAND_OP);
1393   } else if (*datatype == MPI_UNSIGNED) {
1394     APPLY_FUNC(a, b, length, unsigned int, LAND_OP);
1395   } else if (*datatype == MPI_UNSIGNED_LONG) {
1396     APPLY_FUNC(a, b, length, unsigned long, LAND_OP);
1397   } else if (*datatype == MPI_UNSIGNED_CHAR) {
1398     APPLY_FUNC(a, b, length, unsigned char, LAND_OP);
1399   } else if (*datatype == MPI_C_BOOL) {
1400     APPLY_FUNC(a, b, length, _Bool, LAND_OP);
1401   }
1402 }
1403
1404 static void lor_func(void *a, void *b, int *length,
1405                      MPI_Datatype * datatype)
1406 {
1407   if (*datatype == MPI_CHAR) {
1408     APPLY_FUNC(a, b, length, char, LOR_OP);
1409   } else if (*datatype == MPI_SHORT) {
1410     APPLY_FUNC(a, b, length, short, LOR_OP);
1411   } else if (*datatype == MPI_INT) {
1412     APPLY_FUNC(a, b, length, int, LOR_OP);
1413   } else if (*datatype == MPI_LONG) {
1414     APPLY_FUNC(a, b, length, long, LOR_OP);
1415   } else if (*datatype == MPI_UNSIGNED_SHORT) {
1416     APPLY_FUNC(a, b, length, unsigned short, LOR_OP);
1417   } else if (*datatype == MPI_UNSIGNED) {
1418     APPLY_FUNC(a, b, length, unsigned int, LOR_OP);
1419   } else if (*datatype == MPI_UNSIGNED_LONG) {
1420     APPLY_FUNC(a, b, length, unsigned long, LOR_OP);
1421   } else if (*datatype == MPI_UNSIGNED_CHAR) {
1422     APPLY_FUNC(a, b, length, unsigned char, LOR_OP);
1423   } else if (*datatype == MPI_C_BOOL) {
1424     APPLY_FUNC(a, b, length, _Bool, LOR_OP);
1425   }
1426 }
1427
1428 static void lxor_func(void *a, void *b, int *length,
1429                       MPI_Datatype * datatype)
1430 {
1431   if (*datatype == MPI_CHAR) {
1432     APPLY_FUNC(a, b, length, char, LXOR_OP);
1433   } else if (*datatype == MPI_SHORT) {
1434     APPLY_FUNC(a, b, length, short, LXOR_OP);
1435   } else if (*datatype == MPI_INT) {
1436     APPLY_FUNC(a, b, length, int, LXOR_OP);
1437   } else if (*datatype == MPI_LONG) {
1438     APPLY_FUNC(a, b, length, long, LXOR_OP);
1439   } else if (*datatype == MPI_UNSIGNED_SHORT) {
1440     APPLY_FUNC(a, b, length, unsigned short, LXOR_OP);
1441   } else if (*datatype == MPI_UNSIGNED) {
1442     APPLY_FUNC(a, b, length, unsigned int, LXOR_OP);
1443   } else if (*datatype == MPI_UNSIGNED_LONG) {
1444     APPLY_FUNC(a, b, length, unsigned long, LXOR_OP);
1445   } else if (*datatype == MPI_UNSIGNED_CHAR) {
1446     APPLY_FUNC(a, b, length, unsigned char, LXOR_OP);
1447   } else if (*datatype == MPI_C_BOOL) {
1448     APPLY_FUNC(a, b, length, _Bool, LXOR_OP);
1449   }
1450 }
1451
1452 static void band_func(void *a, void *b, int *length,
1453                       MPI_Datatype * datatype)
1454 {
1455   if (*datatype == MPI_CHAR) {
1456     APPLY_FUNC(a, b, length, char, BAND_OP);
1457   }else if (*datatype == MPI_SHORT) {
1458     APPLY_FUNC(a, b, length, short, BAND_OP);
1459   } else if (*datatype == MPI_INT) {
1460     APPLY_FUNC(a, b, length, int, BAND_OP);
1461   } else if (*datatype == MPI_LONG) {
1462     APPLY_FUNC(a, b, length, long, BAND_OP);
1463   } else if (*datatype == MPI_UNSIGNED_SHORT) {
1464     APPLY_FUNC(a, b, length, unsigned short, BAND_OP);
1465   } else if (*datatype == MPI_UNSIGNED) {
1466     APPLY_FUNC(a, b, length, unsigned int, BAND_OP);
1467   } else if (*datatype == MPI_UNSIGNED_LONG) {
1468     APPLY_FUNC(a, b, length, unsigned long, BAND_OP);
1469   } else if (*datatype == MPI_UNSIGNED_CHAR) {
1470     APPLY_FUNC(a, b, length, unsigned char, BAND_OP);
1471   } else if (*datatype == MPI_BYTE) {
1472     APPLY_FUNC(a, b, length, uint8_t, BAND_OP);
1473   }
1474 }
1475
1476 static void bor_func(void *a, void *b, int *length,
1477                      MPI_Datatype * datatype)
1478 {
1479   if (*datatype == MPI_CHAR) {
1480     APPLY_FUNC(a, b, length, char, BOR_OP);
1481   } else if (*datatype == MPI_SHORT) {
1482     APPLY_FUNC(a, b, length, short, BOR_OP);
1483   } else if (*datatype == MPI_INT) {
1484     APPLY_FUNC(a, b, length, int, BOR_OP);
1485   } else if (*datatype == MPI_LONG) {
1486     APPLY_FUNC(a, b, length, long, BOR_OP);
1487   } else if (*datatype == MPI_UNSIGNED_SHORT) {
1488     APPLY_FUNC(a, b, length, unsigned short, BOR_OP);
1489   } else if (*datatype == MPI_UNSIGNED) {
1490     APPLY_FUNC(a, b, length, unsigned int, BOR_OP);
1491   } else if (*datatype == MPI_UNSIGNED_LONG) {
1492     APPLY_FUNC(a, b, length, unsigned long, BOR_OP);
1493   } else if (*datatype == MPI_UNSIGNED_CHAR) {
1494     APPLY_FUNC(a, b, length, unsigned char, BOR_OP);
1495   } else if (*datatype == MPI_BYTE) {
1496     APPLY_FUNC(a, b, length, uint8_t, BOR_OP);
1497   }
1498 }
1499
1500 static void bxor_func(void *a, void *b, int *length,
1501                       MPI_Datatype * datatype)
1502 {
1503   if (*datatype == MPI_CHAR) {
1504     APPLY_FUNC(a, b, length, char, BXOR_OP);
1505   } else if (*datatype == MPI_SHORT) {
1506     APPLY_FUNC(a, b, length, short, BXOR_OP);
1507   } else if (*datatype == MPI_INT) {
1508     APPLY_FUNC(a, b, length, int, BXOR_OP);
1509   } else if (*datatype == MPI_LONG) {
1510     APPLY_FUNC(a, b, length, long, BXOR_OP);
1511   } else if (*datatype == MPI_UNSIGNED_SHORT) {
1512     APPLY_FUNC(a, b, length, unsigned short, BXOR_OP);
1513   } else if (*datatype == MPI_UNSIGNED) {
1514     APPLY_FUNC(a, b, length, unsigned int, BXOR_OP);
1515   } else if (*datatype == MPI_UNSIGNED_LONG) {
1516     APPLY_FUNC(a, b, length, unsigned long, BXOR_OP);
1517   } else if (*datatype == MPI_UNSIGNED_CHAR) {
1518     APPLY_FUNC(a, b, length, unsigned char, BXOR_OP);
1519   } else if (*datatype == MPI_BYTE) {
1520     APPLY_FUNC(a, b, length, uint8_t, BXOR_OP);
1521   }
1522 }
1523
1524 static void minloc_func(void *a, void *b, int *length,
1525                         MPI_Datatype * datatype)
1526 {
1527   if (*datatype == MPI_FLOAT_INT) {
1528     APPLY_FUNC(a, b, length, float_int, MINLOC_OP);
1529   } else if (*datatype == MPI_LONG_INT) {
1530     APPLY_FUNC(a, b, length, long_int, MINLOC_OP);
1531   } else if (*datatype == MPI_DOUBLE_INT) {
1532     APPLY_FUNC(a, b, length, double_int, MINLOC_OP);
1533   } else if (*datatype == MPI_SHORT_INT) {
1534     APPLY_FUNC(a, b, length, short_int, MINLOC_OP);
1535   } else if (*datatype == MPI_2LONG) {
1536     APPLY_FUNC(a, b, length, long_long, MINLOC_OP);
1537   } else if (*datatype == MPI_2INT) {
1538     APPLY_FUNC(a, b, length, int_int, MINLOC_OP);
1539   } else if (*datatype == MPI_LONG_DOUBLE_INT) {
1540     APPLY_FUNC(a, b, length, long_double_int, MINLOC_OP);
1541   } else if (*datatype == MPI_2FLOAT) {
1542     APPLY_FUNC(a, b, length, float_float, MINLOC_OP);
1543   } else if (*datatype == MPI_2DOUBLE) {
1544     APPLY_FUNC(a, b, length, double_double, MINLOC_OP);
1545   }
1546 }
1547
1548 static void maxloc_func(void *a, void *b, int *length,
1549                         MPI_Datatype * datatype)
1550 {
1551   if (*datatype == MPI_FLOAT_INT) {
1552     APPLY_FUNC(a, b, length, float_int, MAXLOC_OP);
1553   } else if (*datatype == MPI_LONG_INT) {
1554     APPLY_FUNC(a, b, length, long_int, MAXLOC_OP);
1555   } else if (*datatype == MPI_DOUBLE_INT) {
1556     APPLY_FUNC(a, b, length, double_int, MAXLOC_OP);
1557   } else if (*datatype == MPI_SHORT_INT) {
1558     APPLY_FUNC(a, b, length, short_int, MAXLOC_OP);
1559   } else if (*datatype == MPI_2LONG) {
1560     APPLY_FUNC(a, b, length, long_long, MAXLOC_OP);
1561   } else if (*datatype == MPI_2INT) {
1562     APPLY_FUNC(a, b, length, int_int, MAXLOC_OP);
1563   } else if (*datatype == MPI_LONG_DOUBLE_INT) {
1564     APPLY_FUNC(a, b, length, long_double_int, MAXLOC_OP);
1565   } else if (*datatype == MPI_2FLOAT) {
1566     APPLY_FUNC(a, b, length, float_float, MAXLOC_OP);
1567   } else if (*datatype == MPI_2DOUBLE) {
1568     APPLY_FUNC(a, b, length, double_double, MAXLOC_OP);
1569   }
1570 }
1571
1572 static void replace_func(void *a, void *b, int *length,
1573                         MPI_Datatype * datatype)
1574 {
1575   memcpy(b, a, *length * smpi_datatype_size(*datatype));
1576 }
1577
1578 #define CREATE_MPI_OP(name, func)                             \
1579   static s_smpi_mpi_op_t mpi_##name = { &(func) /* func */, TRUE }; \
1580 MPI_Op name = &mpi_##name;
1581
1582 CREATE_MPI_OP(MPI_MAX, max_func);
1583 CREATE_MPI_OP(MPI_MIN, min_func);
1584 CREATE_MPI_OP(MPI_SUM, sum_func);
1585 CREATE_MPI_OP(MPI_PROD, prod_func);
1586 CREATE_MPI_OP(MPI_LAND, land_func);
1587 CREATE_MPI_OP(MPI_LOR, lor_func);
1588 CREATE_MPI_OP(MPI_LXOR, lxor_func);
1589 CREATE_MPI_OP(MPI_BAND, band_func);
1590 CREATE_MPI_OP(MPI_BOR, bor_func);
1591 CREATE_MPI_OP(MPI_BXOR, bxor_func);
1592 CREATE_MPI_OP(MPI_MAXLOC, maxloc_func);
1593 CREATE_MPI_OP(MPI_MINLOC, minloc_func);
1594 CREATE_MPI_OP(MPI_REPLACE, replace_func);
1595
1596
1597 MPI_Op smpi_op_new(MPI_User_function * function, int commute)
1598 {
1599   MPI_Op op;
1600   op = xbt_new(s_smpi_mpi_op_t, 1);
1601   op->func = function;
1602   op-> is_commute = commute;
1603   return op;
1604 }
1605
1606 int smpi_op_is_commute(MPI_Op op)
1607 {
1608   return (op==MPI_OP_NULL) ? 1 : op-> is_commute;
1609 }
1610
1611 void smpi_op_destroy(MPI_Op op)
1612 {
1613   xbt_free(op);
1614 }
1615
1616 void smpi_op_apply(MPI_Op op, void *invec, void *inoutvec, int *len,
1617                    MPI_Datatype * datatype)
1618 {
1619   if(smpi_privatize_global_variables){ //we need to switch here, as the called function may silently touch global variables
1620     XBT_VERB("Applying operation, switch to the right data frame ");
1621     switch_data_segment(smpi_process_index());
1622   }
1623
1624   if(!_xbt_replay_is_active())
1625   op->func(invec, inoutvec, len, datatype);
1626 }