Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
e943324a7ccdf1d17bb1c7a2328678aa58f26282
[simgrid.git] / src / smpi / smpi_mpi_dt.c
1 /* smpi_mpi_dt.c -- MPI primitives to handle datatypes                        */
2 /* FIXME: a very incomplete implementation                                    */
3
4 /* Copyright (c) 2009-2014. The SimGrid Team.
5  * All rights reserved.                                                     */
6
7 /* This program is free software; you can redistribute it and/or modify it
8  * under the terms of the license (GNU LGPL) which comes with this package. */
9
10 #include <stdio.h>
11 #include <stdlib.h>
12 #include <string.h>
13
14 #include "private.h"
15 #include "smpi_mpi_dt_private.h"
16 #include "mc/mc.h"
17 #include "xbt/replay.h"
18 #include "simgrid/modelchecker.h"
19
20 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_mpi_dt, smpi,
21                                 "Logging specific to SMPI (datatype)");
22
23 #define CREATE_MPI_DATATYPE(name, type)       \
24   static s_smpi_mpi_datatype_t mpi_##name = { \
25     (char*) # name,                                   \
26     sizeof(type),  /* size */                 \
27     0,             /*was 1 has_subtype*/             \
28     0,             /* lb */                   \
29     sizeof(type),  /* ub = lb + size */       \
30     DT_FLAG_BASIC,  /* flags */              \
31     NULL           /* pointer on extended struct*/ \
32   };                                          \
33 MPI_Datatype name = &mpi_##name;
34
35 #define CREATE_MPI_DATATYPE_NULL(name)       \
36   static s_smpi_mpi_datatype_t mpi_##name = { \
37     (char*) # name,                                   \
38     0,  /* size */                 \
39     0,             /*was 1 has_subtype*/             \
40     0,             /* lb */                   \
41     0,  /* ub = lb + size */       \
42     DT_FLAG_BASIC,  /* flags */              \
43     NULL           /* pointer on extended struct*/ \
44   };                                          \
45 MPI_Datatype name = &mpi_##name;
46
47 //The following are datatypes for the MPI functions MPI_MAXLOC and MPI_MINLOC.
48 typedef struct {
49   float value;
50   int index;
51 } float_int;
52 typedef struct {
53   float value;
54   float index;
55 } float_float;
56 typedef struct {
57   long value;
58   long index;
59 } long_long;
60 typedef struct {
61   double value;
62   double index;
63 } double_double;
64 typedef struct {
65   long value;
66   int index;
67 } long_int;
68 typedef struct {
69   double value;
70   int index;
71 } double_int;
72 typedef struct {
73   short value;
74   int index;
75 } short_int;
76 typedef struct {
77   int value;
78   int index;
79 } int_int;
80 typedef struct {
81   long double value;
82   int index;
83 } long_double_int;
84 typedef struct {
85   int64_t value;
86   int64_t index;
87 } integer128_t;
88 // Predefined data types
89 CREATE_MPI_DATATYPE(MPI_CHAR, char);
90 CREATE_MPI_DATATYPE(MPI_SHORT, short);
91 CREATE_MPI_DATATYPE(MPI_INT, int);
92 CREATE_MPI_DATATYPE(MPI_LONG, long);
93 CREATE_MPI_DATATYPE(MPI_LONG_LONG, long long);
94 CREATE_MPI_DATATYPE(MPI_SIGNED_CHAR, signed char);
95 CREATE_MPI_DATATYPE(MPI_UNSIGNED_CHAR, unsigned char);
96 CREATE_MPI_DATATYPE(MPI_UNSIGNED_SHORT, unsigned short);
97 CREATE_MPI_DATATYPE(MPI_UNSIGNED, unsigned int);
98 CREATE_MPI_DATATYPE(MPI_UNSIGNED_LONG, unsigned long);
99 CREATE_MPI_DATATYPE(MPI_UNSIGNED_LONG_LONG, unsigned long long);
100 CREATE_MPI_DATATYPE(MPI_FLOAT, float);
101 CREATE_MPI_DATATYPE(MPI_DOUBLE, double);
102 CREATE_MPI_DATATYPE(MPI_LONG_DOUBLE, long double);
103 CREATE_MPI_DATATYPE(MPI_WCHAR, wchar_t);
104 CREATE_MPI_DATATYPE(MPI_C_BOOL, _Bool);
105 CREATE_MPI_DATATYPE(MPI_BYTE, int8_t);
106 CREATE_MPI_DATATYPE(MPI_INT8_T, int8_t);
107 CREATE_MPI_DATATYPE(MPI_INT16_T, int16_t);
108 CREATE_MPI_DATATYPE(MPI_INT32_T, int32_t);
109 CREATE_MPI_DATATYPE(MPI_INT64_T, int64_t);
110 CREATE_MPI_DATATYPE(MPI_UINT8_T, uint8_t);
111 CREATE_MPI_DATATYPE(MPI_UINT16_T, uint16_t);
112 CREATE_MPI_DATATYPE(MPI_UINT32_T, uint32_t);
113 CREATE_MPI_DATATYPE(MPI_UINT64_T, uint64_t);
114 CREATE_MPI_DATATYPE(MPI_C_FLOAT_COMPLEX, float _Complex);
115 CREATE_MPI_DATATYPE(MPI_C_DOUBLE_COMPLEX, double _Complex);
116 CREATE_MPI_DATATYPE(MPI_C_LONG_DOUBLE_COMPLEX, long double _Complex);
117 CREATE_MPI_DATATYPE(MPI_AINT, MPI_Aint);
118 CREATE_MPI_DATATYPE(MPI_OFFSET, MPI_Offset);
119
120 CREATE_MPI_DATATYPE(MPI_FLOAT_INT, float_int);
121 CREATE_MPI_DATATYPE(MPI_LONG_INT, long_int);
122 CREATE_MPI_DATATYPE(MPI_DOUBLE_INT, double_int);
123 CREATE_MPI_DATATYPE(MPI_SHORT_INT, short_int);
124 CREATE_MPI_DATATYPE(MPI_2INT, int_int);
125 CREATE_MPI_DATATYPE(MPI_2FLOAT, float_float);
126 CREATE_MPI_DATATYPE(MPI_2DOUBLE, double_double);
127 CREATE_MPI_DATATYPE(MPI_2LONG, long_long);
128
129 CREATE_MPI_DATATYPE(MPI_REAL4, float);
130 CREATE_MPI_DATATYPE(MPI_REAL8, float);
131 CREATE_MPI_DATATYPE(MPI_REAL16, double);
132 CREATE_MPI_DATATYPE_NULL(MPI_COMPLEX8);
133 CREATE_MPI_DATATYPE_NULL(MPI_COMPLEX16);
134 CREATE_MPI_DATATYPE_NULL(MPI_COMPLEX32);
135 CREATE_MPI_DATATYPE(MPI_INTEGER1, int);
136 CREATE_MPI_DATATYPE(MPI_INTEGER2, int16_t);
137 CREATE_MPI_DATATYPE(MPI_INTEGER4, int32_t);
138 CREATE_MPI_DATATYPE(MPI_INTEGER8, int64_t);
139 CREATE_MPI_DATATYPE(MPI_INTEGER16, integer128_t);
140
141 CREATE_MPI_DATATYPE(MPI_LONG_DOUBLE_INT, long_double_int);
142
143 CREATE_MPI_DATATYPE_NULL(MPI_UB);
144 CREATE_MPI_DATATYPE_NULL(MPI_LB);
145 CREATE_MPI_DATATYPE_NULL(MPI_PACKED);
146 // Internal use only
147 CREATE_MPI_DATATYPE(MPI_PTR, void*);
148
149 /** Check if the datatype is usable for communications
150  */
151 int is_datatype_valid(MPI_Datatype datatype) {
152     return datatype != MPI_DATATYPE_NULL
153         && (datatype->flags & DT_FLAG_COMMITED);
154 }
155
156 size_t smpi_datatype_size(MPI_Datatype datatype)
157 {
158   return datatype->size;
159 }
160
161 MPI_Aint smpi_datatype_lb(MPI_Datatype datatype)
162 {
163   return datatype->lb;
164 }
165
166 MPI_Aint smpi_datatype_ub(MPI_Datatype datatype)
167 {
168   return datatype->ub;
169 }
170
171 MPI_Datatype smpi_datatype_dup(MPI_Datatype datatype)
172 {
173   MPI_Datatype new_t= xbt_new(s_smpi_mpi_datatype_t,1);
174   memcpy(new_t, datatype, sizeof(s_smpi_mpi_datatype_t));
175   if (datatype->has_subtype)
176     memcpy(new_t->substruct, datatype->substruct, sizeof(s_smpi_subtype_t));
177   if(datatype->name)
178     new_t->name = strdup(datatype->name);
179   return new_t;
180 }
181
182 int smpi_datatype_extent(MPI_Datatype datatype, MPI_Aint * lb,
183                          MPI_Aint * extent)
184 {
185   if(datatype == MPI_DATATYPE_NULL){
186     *lb=0;
187     *extent=0;
188     return MPI_SUCCESS;
189   }
190   *lb = datatype->lb;
191   *extent = datatype->ub - datatype->lb;
192   return MPI_SUCCESS;
193 }
194
195 MPI_Aint smpi_datatype_get_extent(MPI_Datatype datatype){
196   if(datatype == MPI_DATATYPE_NULL){
197     return 0;
198   }
199   return datatype->ub - datatype->lb;
200 }
201
202 void smpi_datatype_get_name(MPI_Datatype datatype, char* name, int* length){
203   *length = strlen(datatype->name);
204   strcpy(name, datatype->name);
205 }
206
207 void smpi_datatype_set_name(MPI_Datatype datatype, char* name){
208   datatype->name = strdup(name);;
209 }
210
211 int smpi_datatype_copy(void *sendbuf, int sendcount, MPI_Datatype sendtype,
212                        void *recvbuf, int recvcount, MPI_Datatype recvtype)
213 {
214   int count;
215   if(smpi_privatize_global_variables){
216     smpi_switch_data_segment(smpi_process_index());
217   }
218   /* First check if we really have something to do */
219   if (recvcount > 0 && recvbuf != sendbuf) {
220     /* FIXME: treat packed cases */
221     sendcount *= smpi_datatype_size(sendtype);
222     recvcount *= smpi_datatype_size(recvtype);
223     count = sendcount < recvcount ? sendcount : recvcount;
224
225     if(sendtype->has_subtype == 0 && recvtype->has_subtype == 0) {
226       if(!smpi_process_get_replaying()) memcpy(recvbuf, sendbuf, count);
227     }
228     else if (sendtype->has_subtype == 0)
229     {
230       s_smpi_subtype_t *subtype =  recvtype->substruct;
231       subtype->unserialize( sendbuf, recvbuf,1, subtype, MPI_REPLACE);
232     }
233     else if (recvtype->has_subtype == 0)
234     {
235       s_smpi_subtype_t *subtype =  sendtype->substruct;
236       subtype->serialize(sendbuf, recvbuf,1, subtype);
237     }else{
238       s_smpi_subtype_t *subtype =  sendtype->substruct;
239
240
241       void * buf_tmp = xbt_malloc(count);
242
243       subtype->serialize( sendbuf, buf_tmp,count/smpi_datatype_size(sendtype), subtype);
244       subtype =  recvtype->substruct;
245       subtype->unserialize( buf_tmp, recvbuf,count/smpi_datatype_size(recvtype), subtype, MPI_REPLACE);
246
247       free(buf_tmp);
248     }
249   }
250
251   return sendcount > recvcount ? MPI_ERR_TRUNCATE : MPI_SUCCESS;
252 }
253
254 /*
255  *  Copies noncontiguous data into contiguous memory.
256  *  @param contiguous_vector - output vector
257  *  @param noncontiguous_vector - input vector
258  *  @param type - pointer contening :
259  *      - stride - stride of between noncontiguous data
260  *      - block_length - the width or height of blocked matrix
261  *      - count - the number of rows of matrix
262  */
263 void serialize_vector( const void *noncontiguous_vector,
264                        void *contiguous_vector,
265                        int count,
266                        void *type)
267 {
268   s_smpi_mpi_vector_t* type_c = (s_smpi_mpi_vector_t*)type;
269   int i;
270   char* contiguous_vector_char = (char*)contiguous_vector;
271   char* noncontiguous_vector_char = (char*)noncontiguous_vector;
272
273   for (i = 0; i < type_c->block_count * count; i++) {
274       if (type_c->old_type->has_subtype == 0)
275         memcpy(contiguous_vector_char,
276                noncontiguous_vector_char, type_c->block_length * type_c->size_oldtype);
277       else
278         ((s_smpi_subtype_t*)type_c->old_type->substruct)->serialize( noncontiguous_vector_char,
279                                                                      contiguous_vector_char,
280                                                                      type_c->block_length,
281                                                                      type_c->old_type->substruct);
282
283     contiguous_vector_char += type_c->block_length*type_c->size_oldtype;
284     if((i+1)%type_c->block_count ==0)
285     noncontiguous_vector_char += type_c->block_length*smpi_datatype_get_extent(type_c->old_type);
286     else
287     noncontiguous_vector_char += type_c->block_stride*smpi_datatype_get_extent(type_c->old_type);
288   }
289 }
290
291 /*
292  *  Copies contiguous data into noncontiguous memory.
293  *  @param noncontiguous_vector - output vector
294  *  @param contiguous_vector - input vector
295  *  @param type - pointer contening :
296  *      - stride - stride of between noncontiguous data
297  *      - block_length - the width or height of blocked matrix
298  *      - count - the number of rows of matrix
299  */
300 void unserialize_vector( const void *contiguous_vector,
301                          void *noncontiguous_vector,
302                          int count,
303                          void *type,
304                          MPI_Op op)
305 {
306   s_smpi_mpi_vector_t* type_c = (s_smpi_mpi_vector_t*)type;
307   int i;
308
309   char* contiguous_vector_char = (char*)contiguous_vector;
310   char* noncontiguous_vector_char = (char*)noncontiguous_vector;
311
312   for (i = 0; i < type_c->block_count * count; i++) {
313     if (type_c->old_type->has_subtype == 0)
314       smpi_op_apply(op, contiguous_vector_char, noncontiguous_vector_char, &type_c->block_length,
315           &type_c->old_type);
316      /* memcpy(noncontiguous_vector_char,
317              contiguous_vector_char, type_c->block_length * type_c->size_oldtype);*/
318     else
319       ((s_smpi_subtype_t*)type_c->old_type->substruct)->unserialize( contiguous_vector_char,
320                                                                      noncontiguous_vector_char,
321                                                                      type_c->block_length,
322                                                                      type_c->old_type->substruct,
323                                                                      op);
324     contiguous_vector_char += type_c->block_length*type_c->size_oldtype;
325     if((i+1)%type_c->block_count ==0)
326     noncontiguous_vector_char += type_c->block_length*smpi_datatype_get_extent(type_c->old_type);
327     else
328     noncontiguous_vector_char += type_c->block_stride*smpi_datatype_get_extent(type_c->old_type);
329   }
330 }
331
332 /*
333  * Create a Sub type vector to be able to serialize and unserialize it
334  * the structure s_smpi_mpi_vector_t is derived from s_smpi_subtype which
335  * required the functions unserialize and serialize
336  *
337  */
338 s_smpi_mpi_vector_t* smpi_datatype_vector_create( int block_stride,
339                                                   int block_length,
340                                                   int block_count,
341                                                   MPI_Datatype old_type,
342                                                   int size_oldtype){
343   s_smpi_mpi_vector_t *new_t= xbt_new(s_smpi_mpi_vector_t,1);
344   new_t->base.serialize = &serialize_vector;
345   new_t->base.unserialize = &unserialize_vector;
346   new_t->base.subtype_free = &free_vector;
347   new_t->block_stride = block_stride;
348   new_t->block_length = block_length;
349   new_t->block_count = block_count;
350   smpi_datatype_use(old_type);
351   new_t->old_type = old_type;
352   new_t->size_oldtype = size_oldtype;
353   return new_t;
354 }
355
356 void smpi_datatype_create(MPI_Datatype* new_type, int size,int lb, int ub, int has_subtype,
357                           void *struct_type, int flags){
358   MPI_Datatype new_t= xbt_new(s_smpi_mpi_datatype_t,1);
359   new_t->name = NULL;
360   new_t->size = size;
361   new_t->has_subtype = size>0? has_subtype:0;
362   new_t->lb = lb;
363   new_t->ub = ub;
364   new_t->flags = flags;
365   new_t->substruct = struct_type;
366   new_t->in_use=0;
367   *new_type = new_t;
368
369 #ifdef HAVE_MC
370   if(MC_is_active())
371     MC_ignore(&(new_t->in_use), sizeof(new_t->in_use));
372 #endif
373 }
374
375 void smpi_datatype_free(MPI_Datatype* type){
376
377   if((*type)->flags & DT_FLAG_PREDEFINED)return;
378
379   //if still used, mark for deletion
380   if((*type)->in_use!=0){
381       (*type)->flags |=DT_FLAG_DESTROYED;
382       return;
383   }
384
385   if ((*type)->has_subtype == 1){
386     ((s_smpi_subtype_t *)(*type)->substruct)->subtype_free(type);  
387     xbt_free((*type)->substruct);
388   }
389   if ((*type)->name != NULL){
390     xbt_free((*type)->name);
391   }
392   xbt_free(*type);
393   *type = MPI_DATATYPE_NULL;
394 }
395
396 void smpi_datatype_use(MPI_Datatype type){
397   if(type)type->in_use++;
398
399 #ifdef HAVE_MC
400   if(MC_is_active())
401     MC_ignore(&(type->in_use), sizeof(type->in_use));
402 #endif
403 }
404
405
406 void smpi_datatype_unuse(MPI_Datatype type){
407   if(type && type->in_use-- == 0 && (type->flags & DT_FLAG_DESTROYED))
408     smpi_datatype_free(&type);
409
410 #ifdef HAVE_MC
411   if(MC_is_active())
412     MC_ignore(&(type->in_use), sizeof(type->in_use));
413 #endif
414 }
415
416
417
418
419 /*
420 Contiguous Implementation
421 */
422
423
424 /*
425  *  Copies noncontiguous data into contiguous memory.
426  *  @param contiguous_hvector - output hvector
427  *  @param noncontiguous_hvector - input hvector
428  *  @param type - pointer contening :
429  *      - stride - stride of between noncontiguous data, in bytes
430  *      - block_length - the width or height of blocked matrix
431  *      - count - the number of rows of matrix
432  */
433 void serialize_contiguous( const void *noncontiguous_hvector,
434                        void *contiguous_hvector,
435                        int count,
436                        void *type)
437 {
438   s_smpi_mpi_contiguous_t* type_c = (s_smpi_mpi_contiguous_t*)type;
439   char* contiguous_vector_char = (char*)contiguous_hvector;
440   char* noncontiguous_vector_char = (char*)noncontiguous_hvector+type_c->lb;
441   memcpy(contiguous_vector_char,
442            noncontiguous_vector_char, count* type_c->block_count * type_c->size_oldtype);
443 }
444 /*
445  *  Copies contiguous data into noncontiguous memory.
446  *  @param noncontiguous_vector - output hvector
447  *  @param contiguous_vector - input hvector
448  *  @param type - pointer contening :
449  *      - stride - stride of between noncontiguous data, in bytes
450  *      - block_length - the width or height of blocked matrix
451  *      - count - the number of rows of matrix
452  */
453 void unserialize_contiguous( const void *contiguous_vector,
454                          void *noncontiguous_vector,
455                          int count,
456                          void *type,
457                          MPI_Op op)
458 {
459   s_smpi_mpi_contiguous_t* type_c = (s_smpi_mpi_contiguous_t*)type;
460   char* contiguous_vector_char = (char*)contiguous_vector;
461   char* noncontiguous_vector_char = (char*)noncontiguous_vector+type_c->lb;
462   int n= count* type_c->block_count;
463   smpi_op_apply(op, contiguous_vector_char, noncontiguous_vector_char, &n,
464             &type_c->old_type);
465        /*memcpy(noncontiguous_vector_char,
466            contiguous_vector_char, count*  type_c->block_count * type_c->size_oldtype);*/
467 }
468
469 void free_contiguous(MPI_Datatype* d){
470   smpi_datatype_unuse(((s_smpi_mpi_indexed_t *)(*d)->substruct)->old_type);
471 }
472
473 /*
474  * Create a Sub type contiguous to be able to serialize and unserialize it
475  * the structure s_smpi_mpi_contiguous_t is derived from s_smpi_subtype which
476  * required the functions unserialize and serialize
477  *
478  */
479 s_smpi_mpi_contiguous_t* smpi_datatype_contiguous_create( MPI_Aint lb,
480                                                   int block_count,
481                                                   MPI_Datatype old_type,
482                                                   int size_oldtype){
483   s_smpi_mpi_contiguous_t *new_t= xbt_new(s_smpi_mpi_contiguous_t,1);
484   new_t->base.serialize = &serialize_contiguous;
485   new_t->base.unserialize = &unserialize_contiguous;
486   new_t->base.subtype_free = &free_contiguous;
487   new_t->lb = lb;
488   new_t->block_count = block_count;
489   new_t->old_type = old_type;
490   new_t->size_oldtype = size_oldtype;
491   smpi_datatype_use(old_type);
492   return new_t;
493 }
494
495
496
497
498 int smpi_datatype_contiguous(int count, MPI_Datatype old_type, MPI_Datatype* new_type, MPI_Aint lb)
499 {
500   int retval;
501   if(old_type->has_subtype){
502           //handle this case as a hvector with stride equals to the extent of the datatype
503           return smpi_datatype_hvector(count, 1, smpi_datatype_get_extent(old_type), old_type, new_type);
504   }
505   
506   s_smpi_mpi_contiguous_t* subtype = smpi_datatype_contiguous_create( lb,
507                                                                 count,
508                                                                 old_type,
509                                                                 smpi_datatype_size(old_type));
510                                                                 
511   smpi_datatype_create(new_type,
512                                           count * smpi_datatype_size(old_type),
513                                           lb,lb + count * smpi_datatype_size(old_type),
514                                           1,subtype, DT_FLAG_CONTIGUOUS);
515   retval=MPI_SUCCESS;
516   return retval;
517 }
518
519 int smpi_datatype_vector(int count, int blocklen, int stride, MPI_Datatype old_type, MPI_Datatype* new_type)
520 {
521   int retval;
522   if (blocklen<0) return MPI_ERR_ARG;
523   MPI_Aint lb = 0;
524   MPI_Aint ub = 0;
525   if(count>0){
526     lb=smpi_datatype_lb(old_type);
527     ub=((count-1)*stride+blocklen-1)*smpi_datatype_get_extent(old_type)+smpi_datatype_ub(old_type);
528   }
529   if(old_type->has_subtype || stride != blocklen){
530
531
532     s_smpi_mpi_vector_t* subtype = smpi_datatype_vector_create( stride,
533                                                                 blocklen,
534                                                                 count,
535                                                                 old_type,
536                                                                 smpi_datatype_size(old_type));
537     smpi_datatype_create(new_type,
538                          count * (blocklen) * smpi_datatype_size(old_type), lb,
539                          ub,
540                          1,
541                          subtype,
542                          DT_FLAG_VECTOR);
543     retval=MPI_SUCCESS;
544   }else{
545     /* in this situation the data are contignous thus it's not
546      * required to serialize and unserialize it*/
547     smpi_datatype_create(new_type, count * blocklen *
548                          smpi_datatype_size(old_type), 0, ((count -1) * stride + blocklen)*
549                          smpi_datatype_size(old_type),
550                          0,
551                          NULL,
552                          DT_FLAG_VECTOR|DT_FLAG_CONTIGUOUS);
553     retval=MPI_SUCCESS;
554   }
555   return retval;
556 }
557
558 void free_vector(MPI_Datatype* d){
559   smpi_datatype_unuse(((s_smpi_mpi_indexed_t *)(*d)->substruct)->old_type);
560 }
561
562 /*
563 Hvector Implementation - Vector with stride in bytes
564 */
565
566
567 /*
568  *  Copies noncontiguous data into contiguous memory.
569  *  @param contiguous_hvector - output hvector
570  *  @param noncontiguous_hvector - input hvector
571  *  @param type - pointer contening :
572  *      - stride - stride of between noncontiguous data, in bytes
573  *      - block_length - the width or height of blocked matrix
574  *      - count - the number of rows of matrix
575  */
576 void serialize_hvector( const void *noncontiguous_hvector,
577                        void *contiguous_hvector,
578                        int count,
579                        void *type)
580 {
581   s_smpi_mpi_hvector_t* type_c = (s_smpi_mpi_hvector_t*)type;
582   int i;
583   char* contiguous_vector_char = (char*)contiguous_hvector;
584   char* noncontiguous_vector_char = (char*)noncontiguous_hvector;
585
586   for (i = 0; i < type_c->block_count * count; i++) {
587     if (type_c->old_type->has_subtype == 0)
588       memcpy(contiguous_vector_char,
589            noncontiguous_vector_char, type_c->block_length * type_c->size_oldtype);
590     else
591       ((s_smpi_subtype_t*)type_c->old_type->substruct)->serialize( noncontiguous_vector_char,
592                                                                    contiguous_vector_char,
593                                                                    type_c->block_length,
594                                                                    type_c->old_type->substruct);
595
596     contiguous_vector_char += type_c->block_length*type_c->size_oldtype;
597     if((i+1)%type_c->block_count ==0)
598     noncontiguous_vector_char += type_c->block_length*type_c->size_oldtype;
599     else
600     noncontiguous_vector_char += type_c->block_stride;
601   }
602 }
603 /*
604  *  Copies contiguous data into noncontiguous memory.
605  *  @param noncontiguous_vector - output hvector
606  *  @param contiguous_vector - input hvector
607  *  @param type - pointer contening :
608  *      - stride - stride of between noncontiguous data, in bytes
609  *      - block_length - the width or height of blocked matrix
610  *      - count - the number of rows of matrix
611  */
612 void unserialize_hvector( const void *contiguous_vector,
613                          void *noncontiguous_vector,
614                          int count,
615                          void *type,
616                          MPI_Op op)
617 {
618   s_smpi_mpi_hvector_t* type_c = (s_smpi_mpi_hvector_t*)type;
619   int i;
620
621   char* contiguous_vector_char = (char*)contiguous_vector;
622   char* noncontiguous_vector_char = (char*)noncontiguous_vector;
623
624   for (i = 0; i < type_c->block_count * count; i++) {
625     if (type_c->old_type->has_subtype == 0)
626       smpi_op_apply(op, contiguous_vector_char, noncontiguous_vector_char, &type_c->block_length,
627                   &type_c->old_type);
628              /*memcpy(noncontiguous_vector_char,
629            contiguous_vector_char, type_c->block_length * type_c->size_oldtype);*/
630     else
631       ((s_smpi_subtype_t*)type_c->old_type->substruct)->unserialize( contiguous_vector_char,
632                                                                      noncontiguous_vector_char,
633                                                                      type_c->block_length,
634                                                                      type_c->old_type->substruct,
635                                                                      op);
636     contiguous_vector_char += type_c->block_length*type_c->size_oldtype;
637     if((i+1)%type_c->block_count ==0)
638     noncontiguous_vector_char += type_c->block_length*type_c->size_oldtype;
639     else
640     noncontiguous_vector_char += type_c->block_stride;
641   }
642 }
643
644 /*
645  * Create a Sub type vector to be able to serialize and unserialize it
646  * the structure s_smpi_mpi_vector_t is derived from s_smpi_subtype which
647  * required the functions unserialize and serialize
648  *
649  */
650 s_smpi_mpi_hvector_t* smpi_datatype_hvector_create( MPI_Aint block_stride,
651                                                   int block_length,
652                                                   int block_count,
653                                                   MPI_Datatype old_type,
654                                                   int size_oldtype){
655   s_smpi_mpi_hvector_t *new_t= xbt_new(s_smpi_mpi_hvector_t,1);
656   new_t->base.serialize = &serialize_hvector;
657   new_t->base.unserialize = &unserialize_hvector;
658   new_t->base.subtype_free = &free_hvector;
659   new_t->block_stride = block_stride;
660   new_t->block_length = block_length;
661   new_t->block_count = block_count;
662   new_t->old_type = old_type;
663   new_t->size_oldtype = size_oldtype;
664   smpi_datatype_use(old_type);
665   return new_t;
666 }
667
668 //do nothing for vector types
669 void free_hvector(MPI_Datatype* d){
670   smpi_datatype_unuse(((s_smpi_mpi_indexed_t *)(*d)->substruct)->old_type);
671 }
672
673 int smpi_datatype_hvector(int count, int blocklen, MPI_Aint stride, MPI_Datatype old_type, MPI_Datatype* new_type)
674 {
675   int retval;
676   if (blocklen<0) return MPI_ERR_ARG;
677   MPI_Aint lb = 0;
678   MPI_Aint ub = 0;
679   if(count>0){
680     lb=smpi_datatype_lb(old_type);
681     ub=((count-1)*stride)+(blocklen-1)*smpi_datatype_get_extent(old_type)+smpi_datatype_ub(old_type);
682   }
683   if(old_type->has_subtype || stride != blocklen*smpi_datatype_get_extent(old_type)){
684     s_smpi_mpi_hvector_t* subtype = smpi_datatype_hvector_create( stride,
685                                                                   blocklen,
686                                                                   count,
687                                                                   old_type,
688                                                                   smpi_datatype_size(old_type));
689
690     smpi_datatype_create(new_type, count * blocklen * smpi_datatype_size(old_type),
691                                                  lb,ub,
692                          1,
693                          subtype,
694                          DT_FLAG_VECTOR);
695     retval=MPI_SUCCESS;
696   }else{
697     smpi_datatype_create(new_type, count * blocklen *
698                                              smpi_datatype_size(old_type),0,count * blocklen *
699                                              smpi_datatype_size(old_type),
700                                             0,
701                                             NULL,
702                                             DT_FLAG_VECTOR|DT_FLAG_CONTIGUOUS);
703     retval=MPI_SUCCESS;
704   }
705   return retval;
706 }
707
708
709 /*
710 Indexed Implementation
711 */
712
713 /*
714  *  Copies noncontiguous data into contiguous memory.
715  *  @param contiguous_indexed - output indexed
716  *  @param noncontiguous_indexed - input indexed
717  *  @param type - pointer contening :
718  *      - block_lengths - the width or height of blocked matrix
719  *      - block_indices - indices of each data, in element
720  *      - count - the number of rows of matrix
721  */
722 void serialize_indexed( const void *noncontiguous_indexed,
723                        void *contiguous_indexed,
724                        int count,
725                        void *type)
726 {
727   s_smpi_mpi_indexed_t* type_c = (s_smpi_mpi_indexed_t*)type;
728   int i,j;
729   char* contiguous_indexed_char = (char*)contiguous_indexed;
730   char* noncontiguous_indexed_char = (char*)noncontiguous_indexed+type_c->block_indices[0] * type_c->size_oldtype;
731   for(j=0; j<count;j++){
732     for (i = 0; i < type_c->block_count; i++) {
733       if (type_c->old_type->has_subtype == 0)
734         memcpy(contiguous_indexed_char,
735                      noncontiguous_indexed_char, type_c->block_lengths[i] * type_c->size_oldtype);
736       else
737         ((s_smpi_subtype_t*)type_c->old_type->substruct)->serialize( noncontiguous_indexed_char,
738                                                                      contiguous_indexed_char,
739                                                                      type_c->block_lengths[i],
740                                                                      type_c->old_type->substruct);
741
742
743       contiguous_indexed_char += type_c->block_lengths[i]*type_c->size_oldtype;
744       if (i<type_c->block_count-1)noncontiguous_indexed_char = (char*)noncontiguous_indexed + type_c->block_indices[i+1]*smpi_datatype_get_extent(type_c->old_type);
745       else noncontiguous_indexed_char += type_c->block_lengths[i]*smpi_datatype_get_extent(type_c->old_type);
746     }
747     noncontiguous_indexed=(void*)noncontiguous_indexed_char;
748   }
749 }
750 /*
751  *  Copies contiguous data into noncontiguous memory.
752  *  @param noncontiguous_indexed - output indexed
753  *  @param contiguous_indexed - input indexed
754  *  @param type - pointer contening :
755  *      - block_lengths - the width or height of blocked matrix
756  *      - block_indices - indices of each data, in element
757  *      - count - the number of rows of matrix
758  */
759 void unserialize_indexed( const void *contiguous_indexed,
760                          void *noncontiguous_indexed,
761                          int count,
762                          void *type,
763                          MPI_Op op)
764 {
765
766   s_smpi_mpi_indexed_t* type_c = (s_smpi_mpi_indexed_t*)type;
767   int i,j;
768   char* contiguous_indexed_char = (char*)contiguous_indexed;
769   char* noncontiguous_indexed_char = (char*)noncontiguous_indexed+type_c->block_indices[0]*smpi_datatype_get_extent(type_c->old_type);
770   for(j=0; j<count;j++){
771     for (i = 0; i < type_c->block_count; i++) {
772       if (type_c->old_type->has_subtype == 0)
773         smpi_op_apply(op, contiguous_indexed_char, noncontiguous_indexed_char, &type_c->block_lengths[i],
774                     &type_c->old_type);
775                /*memcpy(noncontiguous_indexed_char ,
776              contiguous_indexed_char, type_c->block_lengths[i] * type_c->size_oldtype);*/
777       else
778         ((s_smpi_subtype_t*)type_c->old_type->substruct)->unserialize( contiguous_indexed_char,
779                                                                        noncontiguous_indexed_char,
780                                                                        type_c->block_lengths[i],
781                                                                        type_c->old_type->substruct,
782                                                                        op);
783
784       contiguous_indexed_char += type_c->block_lengths[i]*type_c->size_oldtype;
785       if (i<type_c->block_count-1)
786         noncontiguous_indexed_char = (char*)noncontiguous_indexed + type_c->block_indices[i+1]*smpi_datatype_get_extent(type_c->old_type);
787       else noncontiguous_indexed_char += type_c->block_lengths[i]*smpi_datatype_get_extent(type_c->old_type);
788     }
789     noncontiguous_indexed=(void*)noncontiguous_indexed_char;
790   }
791 }
792
793 void free_indexed(MPI_Datatype* type){
794   xbt_free(((s_smpi_mpi_indexed_t *)(*type)->substruct)->block_lengths);
795   xbt_free(((s_smpi_mpi_indexed_t *)(*type)->substruct)->block_indices);
796   smpi_datatype_unuse(((s_smpi_mpi_indexed_t *)(*type)->substruct)->old_type);
797 }
798
799 /*
800  * Create a Sub type indexed to be able to serialize and unserialize it
801  * the structure s_smpi_mpi_indexed_t is derived from s_smpi_subtype which
802  * required the functions unserialize and serialize
803  */
804 s_smpi_mpi_indexed_t* smpi_datatype_indexed_create( int* block_lengths,
805                                                   int* block_indices,
806                                                   int block_count,
807                                                   MPI_Datatype old_type,
808                                                   int size_oldtype){
809   s_smpi_mpi_indexed_t *new_t= xbt_new(s_smpi_mpi_indexed_t,1);
810   new_t->base.serialize = &serialize_indexed;
811   new_t->base.unserialize = &unserialize_indexed;
812   new_t->base.subtype_free = &free_indexed;
813  //TODO : add a custom function for each time to clean these 
814   new_t->block_lengths= xbt_new(int, block_count);
815   new_t->block_indices= xbt_new(int, block_count);
816   int i;
817   for(i=0;i<block_count;i++){
818     new_t->block_lengths[i]=block_lengths[i];
819     new_t->block_indices[i]=block_indices[i];
820   }
821   new_t->block_count = block_count;
822   smpi_datatype_use(old_type);
823   new_t->old_type = old_type;
824   new_t->size_oldtype = size_oldtype;
825   return new_t;
826 }
827
828
829 int smpi_datatype_indexed(int count, int* blocklens, int* indices, MPI_Datatype old_type, MPI_Datatype* new_type)
830 {
831   int i;
832   int retval;
833   int size = 0;
834   int contiguous=1;
835   MPI_Aint lb = 0;
836   MPI_Aint ub = 0;
837   if(count>0){
838     lb=indices[0]*smpi_datatype_get_extent(old_type);
839     ub=indices[0]*smpi_datatype_get_extent(old_type) + blocklens[0]*smpi_datatype_ub(old_type);
840   }
841
842   for(i=0; i< count; i++){
843     if   (blocklens[i]<0)
844       return MPI_ERR_ARG;
845     size += blocklens[i];
846
847     if(indices[i]*smpi_datatype_get_extent(old_type)+smpi_datatype_lb(old_type)<lb)
848         lb = indices[i]*smpi_datatype_get_extent(old_type)+smpi_datatype_lb(old_type);
849     if(indices[i]*smpi_datatype_get_extent(old_type)+blocklens[i]*smpi_datatype_ub(old_type)>ub)
850         ub = indices[i]*smpi_datatype_get_extent(old_type)+blocklens[i]*smpi_datatype_ub(old_type);
851
852     if ( (i< count -1) && (indices[i]+blocklens[i] != indices[i+1]) )contiguous=0;
853   }
854   if (old_type->has_subtype == 1)
855     contiguous=0;
856
857   if(!contiguous){
858     s_smpi_mpi_indexed_t* subtype = smpi_datatype_indexed_create( blocklens,
859                                                                   indices,
860                                                                   count,
861                                                                   old_type,
862                                                                   smpi_datatype_size(old_type));
863      smpi_datatype_create(new_type,  size *
864                          smpi_datatype_size(old_type),lb,ub,1, subtype, DT_FLAG_DATA);
865   }else{
866     s_smpi_mpi_contiguous_t* subtype = smpi_datatype_contiguous_create( lb,
867                                                                   size,
868                                                                   old_type,
869                                                                   smpi_datatype_size(old_type));
870     smpi_datatype_create(new_type,  size *
871                          smpi_datatype_size(old_type),lb,ub,1, subtype, DT_FLAG_DATA|DT_FLAG_CONTIGUOUS);
872   }
873   retval=MPI_SUCCESS;
874   return retval;
875 }
876
877
878 /*
879 Hindexed Implementation - Indexed with indices in bytes 
880 */
881
882 /*
883  *  Copies noncontiguous data into contiguous memory.
884  *  @param contiguous_hindexed - output hindexed
885  *  @param noncontiguous_hindexed - input hindexed
886  *  @param type - pointer contening :
887  *      - block_lengths - the width or height of blocked matrix
888  *      - block_indices - indices of each data, in bytes
889  *      - count - the number of rows of matrix
890  */
891 void serialize_hindexed( const void *noncontiguous_hindexed,
892                        void *contiguous_hindexed,
893                        int count,
894                        void *type)
895 {
896   s_smpi_mpi_hindexed_t* type_c = (s_smpi_mpi_hindexed_t*)type;
897   int i,j;
898   char* contiguous_hindexed_char = (char*)contiguous_hindexed;
899   char* noncontiguous_hindexed_char = (char*)noncontiguous_hindexed+ type_c->block_indices[0];
900   for(j=0; j<count;j++){
901     for (i = 0; i < type_c->block_count; i++) {
902       if (type_c->old_type->has_subtype == 0)
903         memcpy(contiguous_hindexed_char,
904                      noncontiguous_hindexed_char, type_c->block_lengths[i] * type_c->size_oldtype);
905       else
906         ((s_smpi_subtype_t*)type_c->old_type->substruct)->serialize( noncontiguous_hindexed_char,
907                                                                      contiguous_hindexed_char,
908                                                                      type_c->block_lengths[i],
909                                                                      type_c->old_type->substruct);
910
911       contiguous_hindexed_char += type_c->block_lengths[i]*type_c->size_oldtype;
912       if (i<type_c->block_count-1)noncontiguous_hindexed_char = (char*)noncontiguous_hindexed + type_c->block_indices[i+1];
913       else noncontiguous_hindexed_char += type_c->block_lengths[i]*smpi_datatype_get_extent(type_c->old_type);
914     }
915     noncontiguous_hindexed=(void*)noncontiguous_hindexed_char;
916   }
917 }
918 /*
919  *  Copies contiguous data into noncontiguous memory.
920  *  @param noncontiguous_hindexed - output hindexed
921  *  @param contiguous_hindexed - input hindexed
922  *  @param type - pointer contening :
923  *      - block_lengths - the width or height of blocked matrix
924  *      - block_indices - indices of each data, in bytes
925  *      - count - the number of rows of matrix
926  */
927 void unserialize_hindexed( const void *contiguous_hindexed,
928                          void *noncontiguous_hindexed,
929                          int count,
930                          void *type,
931                          MPI_Op op)
932 {
933   s_smpi_mpi_hindexed_t* type_c = (s_smpi_mpi_hindexed_t*)type;
934   int i,j;
935
936   char* contiguous_hindexed_char = (char*)contiguous_hindexed;
937   char* noncontiguous_hindexed_char = (char*)noncontiguous_hindexed+ type_c->block_indices[0];
938   for(j=0; j<count;j++){
939     for (i = 0; i < type_c->block_count; i++) {
940       if (type_c->old_type->has_subtype == 0)
941         smpi_op_apply(op, contiguous_hindexed_char, noncontiguous_hindexed_char, &type_c->block_lengths[i],
942                             &type_c->old_type);
943                        /*memcpy(noncontiguous_hindexed_char,
944                contiguous_hindexed_char, type_c->block_lengths[i] * type_c->size_oldtype);*/
945       else
946         ((s_smpi_subtype_t*)type_c->old_type->substruct)->unserialize( contiguous_hindexed_char,
947                                                                        noncontiguous_hindexed_char,
948                                                                        type_c->block_lengths[i],
949                                                                        type_c->old_type->substruct,
950                                                                        op);
951
952       contiguous_hindexed_char += type_c->block_lengths[i]*type_c->size_oldtype;
953       if (i<type_c->block_count-1)noncontiguous_hindexed_char = (char*)noncontiguous_hindexed + type_c->block_indices[i+1];
954       else noncontiguous_hindexed_char += type_c->block_lengths[i]*smpi_datatype_get_extent(type_c->old_type);
955     }
956     noncontiguous_hindexed=(void*)noncontiguous_hindexed_char;
957   }
958 }
959
960 void free_hindexed(MPI_Datatype* type){
961   xbt_free(((s_smpi_mpi_hindexed_t *)(*type)->substruct)->block_lengths);
962   xbt_free(((s_smpi_mpi_hindexed_t *)(*type)->substruct)->block_indices);
963   smpi_datatype_unuse(((s_smpi_mpi_indexed_t *)(*type)->substruct)->old_type);
964 }
965
966 /*
967  * Create a Sub type hindexed to be able to serialize and unserialize it
968  * the structure s_smpi_mpi_hindexed_t is derived from s_smpi_subtype which
969  * required the functions unserialize and serialize
970  */
971 s_smpi_mpi_hindexed_t* smpi_datatype_hindexed_create( int* block_lengths,
972                                                   MPI_Aint* block_indices,
973                                                   int block_count,
974                                                   MPI_Datatype old_type,
975                                                   int size_oldtype){
976   s_smpi_mpi_hindexed_t *new_t= xbt_new(s_smpi_mpi_hindexed_t,1);
977   new_t->base.serialize = &serialize_hindexed;
978   new_t->base.unserialize = &unserialize_hindexed;
979   new_t->base.subtype_free = &free_hindexed;
980  //TODO : add a custom function for each time to clean these 
981   new_t->block_lengths= xbt_new(int, block_count);
982   new_t->block_indices= xbt_new(MPI_Aint, block_count);
983   int i;
984   for(i=0;i<block_count;i++){
985     new_t->block_lengths[i]=block_lengths[i];
986     new_t->block_indices[i]=block_indices[i];
987   }
988   new_t->block_count = block_count;
989   new_t->old_type = old_type;
990   new_t->size_oldtype = size_oldtype;
991   return new_t;
992 }
993
994
995 int smpi_datatype_hindexed(int count, int* blocklens, MPI_Aint* indices, MPI_Datatype old_type, MPI_Datatype* new_type)
996 {
997   int i;
998   int retval;
999   int size = 0;
1000   int contiguous=1;
1001   MPI_Aint lb = 0;
1002   MPI_Aint ub = 0;
1003   if(count>0){
1004     lb=indices[0] + smpi_datatype_lb(old_type);
1005     ub=indices[0] + blocklens[0]*smpi_datatype_ub(old_type);
1006   }
1007   for(i=0; i< count; i++){
1008     if   (blocklens[i]<0)
1009       return MPI_ERR_ARG;
1010     size += blocklens[i];
1011
1012     if(indices[i]+smpi_datatype_lb(old_type)<lb) lb = indices[i]+smpi_datatype_lb(old_type);
1013     if(indices[i]+blocklens[i]*smpi_datatype_ub(old_type)>ub) ub = indices[i]+blocklens[i]*smpi_datatype_ub(old_type);
1014
1015     if ( (i< count -1) && (indices[i]+blocklens[i]*smpi_datatype_size(old_type) != indices[i+1]) )contiguous=0;
1016   }
1017   if (old_type->has_subtype == 1 || lb!=0)
1018     contiguous=0;
1019
1020   if(!contiguous){
1021     s_smpi_mpi_hindexed_t* subtype = smpi_datatype_hindexed_create( blocklens,
1022                                                                   indices,
1023                                                                   count,
1024                                                                   old_type,
1025                                                                   smpi_datatype_size(old_type));
1026     smpi_datatype_create(new_type,  size * smpi_datatype_size(old_type),
1027                                                  lb,
1028                          ub
1029                          ,1, subtype, DT_FLAG_DATA);
1030   }else{
1031     s_smpi_mpi_contiguous_t* subtype = smpi_datatype_contiguous_create( lb,
1032                                                                   size,
1033                                                                   old_type,
1034                                                                   smpi_datatype_size(old_type));
1035     smpi_datatype_create(new_type,  size * smpi_datatype_size(old_type),
1036                                              0,size * smpi_datatype_size(old_type),
1037                                              1, subtype, DT_FLAG_DATA|DT_FLAG_CONTIGUOUS);
1038   }
1039   retval=MPI_SUCCESS;
1040   return retval;
1041 }
1042
1043
1044 /*
1045 struct Implementation - Indexed with indices in bytes 
1046 */
1047
1048 /*
1049  *  Copies noncontiguous data into contiguous memory.
1050  *  @param contiguous_struct - output struct
1051  *  @param noncontiguous_struct - input struct
1052  *  @param type - pointer contening :
1053  *      - stride - stride of between noncontiguous data
1054  *      - block_length - the width or height of blocked matrix
1055  *      - count - the number of rows of matrix
1056  */
1057 void serialize_struct( const void *noncontiguous_struct,
1058                        void *contiguous_struct,
1059                        int count,
1060                        void *type)
1061 {
1062   s_smpi_mpi_struct_t* type_c = (s_smpi_mpi_struct_t*)type;
1063   int i,j;
1064   char* contiguous_struct_char = (char*)contiguous_struct;
1065   char* noncontiguous_struct_char = (char*)noncontiguous_struct+ type_c->block_indices[0];
1066   for(j=0; j<count;j++){
1067     for (i = 0; i < type_c->block_count; i++) {
1068       if (type_c->old_types[i]->has_subtype == 0)
1069         memcpy(contiguous_struct_char,
1070              noncontiguous_struct_char, type_c->block_lengths[i] * smpi_datatype_size(type_c->old_types[i]));
1071       else
1072         ((s_smpi_subtype_t*)type_c->old_types[i]->substruct)->serialize( noncontiguous_struct_char,
1073                                                                          contiguous_struct_char,
1074                                                                          type_c->block_lengths[i],
1075                                                                          type_c->old_types[i]->substruct);
1076
1077
1078       contiguous_struct_char += type_c->block_lengths[i]*smpi_datatype_size(type_c->old_types[i]);
1079       if (i<type_c->block_count-1)noncontiguous_struct_char = (char*)noncontiguous_struct + type_c->block_indices[i+1];
1080       else noncontiguous_struct_char += type_c->block_lengths[i]*smpi_datatype_get_extent(type_c->old_types[i]);//let's hope this is MPI_UB ?
1081     }
1082     noncontiguous_struct=(void*)noncontiguous_struct_char;
1083   }
1084 }
1085 /*
1086  *  Copies contiguous data into noncontiguous memory.
1087  *  @param noncontiguous_struct - output struct
1088  *  @param contiguous_struct - input struct
1089  *  @param type - pointer contening :
1090  *      - stride - stride of between noncontiguous data
1091  *      - block_length - the width or height of blocked matrix
1092  *      - count - the number of rows of matrix
1093  */
1094 void unserialize_struct( const void *contiguous_struct,
1095                          void *noncontiguous_struct,
1096                          int count,
1097                          void *type,
1098                          MPI_Op op)
1099 {
1100   s_smpi_mpi_struct_t* type_c = (s_smpi_mpi_struct_t*)type;
1101   int i,j;
1102
1103   char* contiguous_struct_char = (char*)contiguous_struct;
1104   char* noncontiguous_struct_char = (char*)noncontiguous_struct+ type_c->block_indices[0];
1105   for(j=0; j<count;j++){
1106     for (i = 0; i < type_c->block_count; i++) {
1107       if (type_c->old_types[i]->has_subtype == 0)
1108         smpi_op_apply(op, contiguous_struct_char, noncontiguous_struct_char, &type_c->block_lengths[i],
1109            & type_c->old_types[i]);
1110                        /*memcpy(noncontiguous_struct_char,
1111              contiguous_struct_char, type_c->block_lengths[i] * smpi_datatype_size(type_c->old_types[i]));*/
1112       else
1113         ((s_smpi_subtype_t*)type_c->old_types[i]->substruct)->unserialize( contiguous_struct_char,
1114                                                                            noncontiguous_struct_char,
1115                                                                            type_c->block_lengths[i],
1116                                                                            type_c->old_types[i]->substruct,
1117                                                                            op);
1118
1119       contiguous_struct_char += type_c->block_lengths[i]*smpi_datatype_size(type_c->old_types[i]);
1120       if (i<type_c->block_count-1)noncontiguous_struct_char =  (char*)noncontiguous_struct + type_c->block_indices[i+1];
1121       else noncontiguous_struct_char += type_c->block_lengths[i]*smpi_datatype_get_extent(type_c->old_types[i]);
1122     }
1123     noncontiguous_struct=(void*)noncontiguous_struct_char;
1124     
1125   }
1126 }
1127
1128 void free_struct(MPI_Datatype* type){
1129   xbt_free(((s_smpi_mpi_struct_t *)(*type)->substruct)->block_lengths);
1130   xbt_free(((s_smpi_mpi_struct_t *)(*type)->substruct)->block_indices);
1131   int i=0;
1132   for (i = 0; i < ((s_smpi_mpi_struct_t *)(*type)->substruct)->block_count; i++)
1133     smpi_datatype_unuse(((s_smpi_mpi_struct_t *)(*type)->substruct)->old_types[i]);
1134   xbt_free(((s_smpi_mpi_struct_t *)(*type)->substruct)->old_types);
1135 }
1136
1137 /*
1138  * Create a Sub type struct to be able to serialize and unserialize it
1139  * the structure s_smpi_mpi_struct_t is derived from s_smpi_subtype which
1140  * required the functions unserialize and serialize
1141  */
1142 s_smpi_mpi_struct_t* smpi_datatype_struct_create( int* block_lengths,
1143                                                   MPI_Aint* block_indices,
1144                                                   int block_count,
1145                                                   MPI_Datatype* old_types){
1146   s_smpi_mpi_struct_t *new_t= xbt_new(s_smpi_mpi_struct_t,1);
1147   new_t->base.serialize = &serialize_struct;
1148   new_t->base.unserialize = &unserialize_struct;
1149   new_t->base.subtype_free = &free_struct;
1150  //TODO : add a custom function for each time to clean these 
1151   new_t->block_lengths= xbt_new(int, block_count);
1152   new_t->block_indices= xbt_new(MPI_Aint, block_count);
1153   new_t->old_types=  xbt_new(MPI_Datatype, block_count);
1154   int i;
1155   for(i=0;i<block_count;i++){
1156     new_t->block_lengths[i]=block_lengths[i];
1157     new_t->block_indices[i]=block_indices[i];
1158     new_t->old_types[i]=old_types[i];
1159     smpi_datatype_use(new_t->old_types[i]);
1160   }
1161   //new_t->block_lengths = block_lengths;
1162   //new_t->block_indices = block_indices;
1163   new_t->block_count = block_count;
1164   //new_t->old_types = old_types;
1165   return new_t;
1166 }
1167
1168
1169 int smpi_datatype_struct(int count, int* blocklens, MPI_Aint* indices, MPI_Datatype* old_types, MPI_Datatype* new_type)
1170 {
1171   int i;
1172   size_t size = 0;
1173   int contiguous=1;
1174   size = 0;
1175   MPI_Aint lb = 0;
1176   MPI_Aint ub = 0;
1177   if(count>0){
1178     lb=indices[0] + smpi_datatype_lb(old_types[0]);
1179     ub=indices[0] + blocklens[0]*smpi_datatype_ub(old_types[0]);
1180   }
1181   int forced_lb=0;
1182   int forced_ub=0;
1183   for(i=0; i< count; i++){
1184     if (blocklens[i]<0)
1185       return MPI_ERR_ARG;
1186     if (old_types[i]->has_subtype == 1)
1187       contiguous=0;
1188
1189     size += blocklens[i]*smpi_datatype_size(old_types[i]);
1190     if (old_types[i]==MPI_LB){
1191       lb=indices[i];
1192       forced_lb=1;
1193     }
1194     if (old_types[i]==MPI_UB){
1195       ub=indices[i];
1196       forced_ub=1;
1197     }
1198
1199     if(!forced_lb && indices[i]+smpi_datatype_lb(old_types[i])<lb) lb = indices[i];
1200     if(!forced_ub && indices[i]+blocklens[i]*smpi_datatype_ub(old_types[i])>ub) ub = indices[i]+blocklens[i]*smpi_datatype_ub(old_types[i]);
1201
1202     if ( (i< count -1) && (indices[i]+blocklens[i]*smpi_datatype_size(old_types[i]) != indices[i+1]) )contiguous=0;
1203   }
1204
1205   if(!contiguous){
1206     s_smpi_mpi_struct_t* subtype = smpi_datatype_struct_create( blocklens,
1207                                                               indices,
1208                                                               count,
1209                                                               old_types);
1210
1211     smpi_datatype_create(new_type,  size, lb, ub,1, subtype, DT_FLAG_DATA);
1212   }else{
1213     s_smpi_mpi_contiguous_t* subtype = smpi_datatype_contiguous_create( lb,
1214                                                                   size,
1215                                                                   MPI_CHAR,
1216                                                                   1);
1217     smpi_datatype_create(new_type,  size, lb, ub,1, subtype, DT_FLAG_DATA|DT_FLAG_CONTIGUOUS);
1218   }
1219   return MPI_SUCCESS;
1220 }
1221
1222 void smpi_datatype_commit(MPI_Datatype *datatype)
1223 {
1224   (*datatype)->flags=  ((*datatype)->flags | DT_FLAG_COMMITED);
1225 }
1226
1227 typedef struct s_smpi_mpi_op {
1228   MPI_User_function *func;
1229   int is_commute;
1230 } s_smpi_mpi_op_t;
1231
1232 #define MAX_OP(a, b)  (b) = (a) < (b) ? (b) : (a)
1233 #define MIN_OP(a, b)  (b) = (a) < (b) ? (a) : (b)
1234 #define SUM_OP(a, b)  (b) += (a)
1235 #define PROD_OP(a, b) (b) *= (a)
1236 #define LAND_OP(a, b) (b) = (a) && (b)
1237 #define LOR_OP(a, b)  (b) = (a) || (b)
1238 #define LXOR_OP(a, b) (b) = (!(a) && (b)) || ((a) && !(b))
1239 #define BAND_OP(a, b) (b) &= (a)
1240 #define BOR_OP(a, b)  (b) |= (a)
1241 #define BXOR_OP(a, b) (b) ^= (a)
1242 #define MAXLOC_OP(a, b)  (b) = (a.value) < (b.value) ? (b) : (a)
1243 #define MINLOC_OP(a, b)  (b) = (a.value) < (b.value) ? (a) : (b)
1244
1245 #define APPLY_FUNC(a, b, length, type, func) \
1246 {                                          \
1247   int i;                                   \
1248   type* x = (type*)(a);                    \
1249   type* y = (type*)(b);                    \
1250   for(i = 0; i < *(length); i++) {         \
1251     func(x[i], y[i]);                      \
1252   }                                        \
1253 }
1254
1255 static void max_func(void *a, void *b, int *length,
1256                      MPI_Datatype * datatype)
1257 {
1258   if (*datatype == MPI_CHAR) {
1259     APPLY_FUNC(a, b, length, char, MAX_OP);
1260   } else if (*datatype == MPI_SHORT) {
1261     APPLY_FUNC(a, b, length, short, MAX_OP);
1262   } else if (*datatype == MPI_INT) {
1263     APPLY_FUNC(a, b, length, int, MAX_OP);
1264   } else if (*datatype == MPI_LONG) {
1265     APPLY_FUNC(a, b, length, long, MAX_OP);
1266   } else if (*datatype == MPI_UNSIGNED_SHORT) {
1267     APPLY_FUNC(a, b, length, unsigned short, MAX_OP);
1268   } else if (*datatype == MPI_UNSIGNED) {
1269     APPLY_FUNC(a, b, length, unsigned int, MAX_OP);
1270   } else if (*datatype == MPI_UNSIGNED_LONG) {
1271     APPLY_FUNC(a, b, length, unsigned long, MAX_OP);
1272   } else if (*datatype == MPI_UNSIGNED_CHAR) {
1273     APPLY_FUNC(a, b, length, unsigned char, MAX_OP);
1274   } else if (*datatype == MPI_FLOAT) {
1275     APPLY_FUNC(a, b, length, float, MAX_OP);
1276   } else if (*datatype == MPI_DOUBLE) {
1277     APPLY_FUNC(a, b, length, double, MAX_OP);
1278   } else if (*datatype == MPI_LONG_DOUBLE) {
1279     APPLY_FUNC(a, b, length, long double, MAX_OP);
1280   }
1281 }
1282
1283 static void min_func(void *a, void *b, int *length,
1284                      MPI_Datatype * datatype)
1285 {
1286   if (*datatype == MPI_CHAR) {
1287     APPLY_FUNC(a, b, length, char, MIN_OP);
1288   } else if (*datatype == MPI_SHORT) {
1289     APPLY_FUNC(a, b, length, short, MIN_OP);
1290   } else if (*datatype == MPI_INT) {
1291     APPLY_FUNC(a, b, length, int, MIN_OP);
1292   } else if (*datatype == MPI_LONG) {
1293     APPLY_FUNC(a, b, length, long, MIN_OP);
1294   } else if (*datatype == MPI_UNSIGNED_SHORT) {
1295     APPLY_FUNC(a, b, length, unsigned short, MIN_OP);
1296   } else if (*datatype == MPI_UNSIGNED) {
1297     APPLY_FUNC(a, b, length, unsigned int, MIN_OP);
1298   } else if (*datatype == MPI_UNSIGNED_LONG) {
1299     APPLY_FUNC(a, b, length, unsigned long, MIN_OP);
1300   } else if (*datatype == MPI_UNSIGNED_CHAR) {
1301     APPLY_FUNC(a, b, length, unsigned char, MIN_OP);
1302   } else if (*datatype == MPI_FLOAT) {
1303     APPLY_FUNC(a, b, length, float, MIN_OP);
1304   } else if (*datatype == MPI_DOUBLE) {
1305     APPLY_FUNC(a, b, length, double, MIN_OP);
1306   } else if (*datatype == MPI_LONG_DOUBLE) {
1307     APPLY_FUNC(a, b, length, long double, MIN_OP);
1308   }
1309 }
1310
1311 static void sum_func(void *a, void *b, int *length,
1312                      MPI_Datatype * datatype)
1313 {
1314   if (*datatype == MPI_CHAR) {
1315     APPLY_FUNC(a, b, length, char, SUM_OP);
1316   } else if (*datatype == MPI_SHORT) {
1317     APPLY_FUNC(a, b, length, short, SUM_OP);
1318   } else if (*datatype == MPI_INT) {
1319     APPLY_FUNC(a, b, length, int, SUM_OP);
1320   } else if (*datatype == MPI_LONG) {
1321     APPLY_FUNC(a, b, length, long, SUM_OP);
1322   } else if (*datatype == MPI_UNSIGNED_SHORT) {
1323     APPLY_FUNC(a, b, length, unsigned short, SUM_OP);
1324   } else if (*datatype == MPI_UNSIGNED) {
1325     APPLY_FUNC(a, b, length, unsigned int, SUM_OP);
1326   } else if (*datatype == MPI_UNSIGNED_LONG) {
1327     APPLY_FUNC(a, b, length, unsigned long, SUM_OP);
1328   } else if (*datatype == MPI_UNSIGNED_CHAR) {
1329     APPLY_FUNC(a, b, length, unsigned char, SUM_OP);
1330   } else if (*datatype == MPI_FLOAT) {
1331     APPLY_FUNC(a, b, length, float, SUM_OP);
1332   } else if (*datatype == MPI_DOUBLE) {
1333     APPLY_FUNC(a, b, length, double, SUM_OP);
1334   } else if (*datatype == MPI_LONG_DOUBLE) {
1335     APPLY_FUNC(a, b, length, long double, SUM_OP);
1336   } else if (*datatype == MPI_C_FLOAT_COMPLEX) {
1337     APPLY_FUNC(a, b, length, float _Complex, SUM_OP);
1338   } else if (*datatype == MPI_C_DOUBLE_COMPLEX) {
1339     APPLY_FUNC(a, b, length, double _Complex, SUM_OP);
1340   } else if (*datatype == MPI_C_LONG_DOUBLE_COMPLEX) {
1341     APPLY_FUNC(a, b, length, long double _Complex, SUM_OP);
1342   }
1343 }
1344
1345 static void prod_func(void *a, void *b, int *length,
1346                       MPI_Datatype * datatype)
1347 {
1348   if (*datatype == MPI_CHAR) {
1349     APPLY_FUNC(a, b, length, char, PROD_OP);
1350   } else if (*datatype == MPI_SHORT) {
1351     APPLY_FUNC(a, b, length, short, PROD_OP);
1352   } else if (*datatype == MPI_INT) {
1353     APPLY_FUNC(a, b, length, int, PROD_OP);
1354   } else if (*datatype == MPI_LONG) {
1355     APPLY_FUNC(a, b, length, long, PROD_OP);
1356   } else if (*datatype == MPI_UNSIGNED_SHORT) {
1357     APPLY_FUNC(a, b, length, unsigned short, PROD_OP);
1358   } else if (*datatype == MPI_UNSIGNED) {
1359     APPLY_FUNC(a, b, length, unsigned int, PROD_OP);
1360   } else if (*datatype == MPI_UNSIGNED_LONG) {
1361     APPLY_FUNC(a, b, length, unsigned long, PROD_OP);
1362   } else if (*datatype == MPI_UNSIGNED_CHAR) {
1363     APPLY_FUNC(a, b, length, unsigned char, PROD_OP);
1364   } else if (*datatype == MPI_FLOAT) {
1365     APPLY_FUNC(a, b, length, float, PROD_OP);
1366   } else if (*datatype == MPI_DOUBLE) {
1367     APPLY_FUNC(a, b, length, double, PROD_OP);
1368   } else if (*datatype == MPI_LONG_DOUBLE) {
1369     APPLY_FUNC(a, b, length, long double, PROD_OP);
1370   } else if (*datatype == MPI_C_FLOAT_COMPLEX) {
1371     APPLY_FUNC(a, b, length, float _Complex, PROD_OP);
1372   } else if (*datatype == MPI_C_DOUBLE_COMPLEX) {
1373     APPLY_FUNC(a, b, length, double _Complex, PROD_OP);
1374   } else if (*datatype == MPI_C_LONG_DOUBLE_COMPLEX) {
1375     APPLY_FUNC(a, b, length, long double _Complex, PROD_OP);
1376   }
1377 }
1378
1379 static void land_func(void *a, void *b, int *length,
1380                       MPI_Datatype * datatype)
1381 {
1382   if (*datatype == MPI_CHAR) {
1383     APPLY_FUNC(a, b, length, char, LAND_OP);
1384   } else if (*datatype == MPI_SHORT) {
1385     APPLY_FUNC(a, b, length, short, LAND_OP);
1386   } else if (*datatype == MPI_INT) {
1387     APPLY_FUNC(a, b, length, int, LAND_OP);
1388   } else if (*datatype == MPI_LONG) {
1389     APPLY_FUNC(a, b, length, long, LAND_OP);
1390   } else if (*datatype == MPI_UNSIGNED_SHORT) {
1391     APPLY_FUNC(a, b, length, unsigned short, LAND_OP);
1392   } else if (*datatype == MPI_UNSIGNED) {
1393     APPLY_FUNC(a, b, length, unsigned int, LAND_OP);
1394   } else if (*datatype == MPI_UNSIGNED_LONG) {
1395     APPLY_FUNC(a, b, length, unsigned long, LAND_OP);
1396   } else if (*datatype == MPI_UNSIGNED_CHAR) {
1397     APPLY_FUNC(a, b, length, unsigned char, LAND_OP);
1398   } else if (*datatype == MPI_C_BOOL) {
1399     APPLY_FUNC(a, b, length, _Bool, LAND_OP);
1400   }
1401 }
1402
1403 static void lor_func(void *a, void *b, int *length,
1404                      MPI_Datatype * datatype)
1405 {
1406   if (*datatype == MPI_CHAR) {
1407     APPLY_FUNC(a, b, length, char, LOR_OP);
1408   } else if (*datatype == MPI_SHORT) {
1409     APPLY_FUNC(a, b, length, short, LOR_OP);
1410   } else if (*datatype == MPI_INT) {
1411     APPLY_FUNC(a, b, length, int, LOR_OP);
1412   } else if (*datatype == MPI_LONG) {
1413     APPLY_FUNC(a, b, length, long, LOR_OP);
1414   } else if (*datatype == MPI_UNSIGNED_SHORT) {
1415     APPLY_FUNC(a, b, length, unsigned short, LOR_OP);
1416   } else if (*datatype == MPI_UNSIGNED) {
1417     APPLY_FUNC(a, b, length, unsigned int, LOR_OP);
1418   } else if (*datatype == MPI_UNSIGNED_LONG) {
1419     APPLY_FUNC(a, b, length, unsigned long, LOR_OP);
1420   } else if (*datatype == MPI_UNSIGNED_CHAR) {
1421     APPLY_FUNC(a, b, length, unsigned char, LOR_OP);
1422   } else if (*datatype == MPI_C_BOOL) {
1423     APPLY_FUNC(a, b, length, _Bool, LOR_OP);
1424   }
1425 }
1426
1427 static void lxor_func(void *a, void *b, int *length,
1428                       MPI_Datatype * datatype)
1429 {
1430   if (*datatype == MPI_CHAR) {
1431     APPLY_FUNC(a, b, length, char, LXOR_OP);
1432   } else if (*datatype == MPI_SHORT) {
1433     APPLY_FUNC(a, b, length, short, LXOR_OP);
1434   } else if (*datatype == MPI_INT) {
1435     APPLY_FUNC(a, b, length, int, LXOR_OP);
1436   } else if (*datatype == MPI_LONG) {
1437     APPLY_FUNC(a, b, length, long, LXOR_OP);
1438   } else if (*datatype == MPI_UNSIGNED_SHORT) {
1439     APPLY_FUNC(a, b, length, unsigned short, LXOR_OP);
1440   } else if (*datatype == MPI_UNSIGNED) {
1441     APPLY_FUNC(a, b, length, unsigned int, LXOR_OP);
1442   } else if (*datatype == MPI_UNSIGNED_LONG) {
1443     APPLY_FUNC(a, b, length, unsigned long, LXOR_OP);
1444   } else if (*datatype == MPI_UNSIGNED_CHAR) {
1445     APPLY_FUNC(a, b, length, unsigned char, LXOR_OP);
1446   } else if (*datatype == MPI_C_BOOL) {
1447     APPLY_FUNC(a, b, length, _Bool, LXOR_OP);
1448   }
1449 }
1450
1451 static void band_func(void *a, void *b, int *length,
1452                       MPI_Datatype * datatype)
1453 {
1454   if (*datatype == MPI_CHAR) {
1455     APPLY_FUNC(a, b, length, char, BAND_OP);
1456   }else if (*datatype == MPI_SHORT) {
1457     APPLY_FUNC(a, b, length, short, BAND_OP);
1458   } else if (*datatype == MPI_INT) {
1459     APPLY_FUNC(a, b, length, int, BAND_OP);
1460   } else if (*datatype == MPI_LONG) {
1461     APPLY_FUNC(a, b, length, long, BAND_OP);
1462   } else if (*datatype == MPI_UNSIGNED_SHORT) {
1463     APPLY_FUNC(a, b, length, unsigned short, BAND_OP);
1464   } else if (*datatype == MPI_UNSIGNED) {
1465     APPLY_FUNC(a, b, length, unsigned int, BAND_OP);
1466   } else if (*datatype == MPI_UNSIGNED_LONG) {
1467     APPLY_FUNC(a, b, length, unsigned long, BAND_OP);
1468   } else if (*datatype == MPI_UNSIGNED_CHAR) {
1469     APPLY_FUNC(a, b, length, unsigned char, BAND_OP);
1470   } else if (*datatype == MPI_BYTE) {
1471     APPLY_FUNC(a, b, length, uint8_t, BAND_OP);
1472   }
1473 }
1474
1475 static void bor_func(void *a, void *b, int *length,
1476                      MPI_Datatype * datatype)
1477 {
1478   if (*datatype == MPI_CHAR) {
1479     APPLY_FUNC(a, b, length, char, BOR_OP);
1480   } else if (*datatype == MPI_SHORT) {
1481     APPLY_FUNC(a, b, length, short, BOR_OP);
1482   } else if (*datatype == MPI_INT) {
1483     APPLY_FUNC(a, b, length, int, BOR_OP);
1484   } else if (*datatype == MPI_LONG) {
1485     APPLY_FUNC(a, b, length, long, BOR_OP);
1486   } else if (*datatype == MPI_UNSIGNED_SHORT) {
1487     APPLY_FUNC(a, b, length, unsigned short, BOR_OP);
1488   } else if (*datatype == MPI_UNSIGNED) {
1489     APPLY_FUNC(a, b, length, unsigned int, BOR_OP);
1490   } else if (*datatype == MPI_UNSIGNED_LONG) {
1491     APPLY_FUNC(a, b, length, unsigned long, BOR_OP);
1492   } else if (*datatype == MPI_UNSIGNED_CHAR) {
1493     APPLY_FUNC(a, b, length, unsigned char, BOR_OP);
1494   } else if (*datatype == MPI_BYTE) {
1495     APPLY_FUNC(a, b, length, uint8_t, BOR_OP);
1496   }
1497 }
1498
1499 static void bxor_func(void *a, void *b, int *length,
1500                       MPI_Datatype * datatype)
1501 {
1502   if (*datatype == MPI_CHAR) {
1503     APPLY_FUNC(a, b, length, char, BXOR_OP);
1504   } else if (*datatype == MPI_SHORT) {
1505     APPLY_FUNC(a, b, length, short, BXOR_OP);
1506   } else if (*datatype == MPI_INT) {
1507     APPLY_FUNC(a, b, length, int, BXOR_OP);
1508   } else if (*datatype == MPI_LONG) {
1509     APPLY_FUNC(a, b, length, long, BXOR_OP);
1510   } else if (*datatype == MPI_UNSIGNED_SHORT) {
1511     APPLY_FUNC(a, b, length, unsigned short, BXOR_OP);
1512   } else if (*datatype == MPI_UNSIGNED) {
1513     APPLY_FUNC(a, b, length, unsigned int, BXOR_OP);
1514   } else if (*datatype == MPI_UNSIGNED_LONG) {
1515     APPLY_FUNC(a, b, length, unsigned long, BXOR_OP);
1516   } else if (*datatype == MPI_UNSIGNED_CHAR) {
1517     APPLY_FUNC(a, b, length, unsigned char, BXOR_OP);
1518   } else if (*datatype == MPI_BYTE) {
1519     APPLY_FUNC(a, b, length, uint8_t, BXOR_OP);
1520   }
1521 }
1522
1523 static void minloc_func(void *a, void *b, int *length,
1524                         MPI_Datatype * datatype)
1525 {
1526   if (*datatype == MPI_FLOAT_INT) {
1527     APPLY_FUNC(a, b, length, float_int, MINLOC_OP);
1528   } else if (*datatype == MPI_LONG_INT) {
1529     APPLY_FUNC(a, b, length, long_int, MINLOC_OP);
1530   } else if (*datatype == MPI_DOUBLE_INT) {
1531     APPLY_FUNC(a, b, length, double_int, MINLOC_OP);
1532   } else if (*datatype == MPI_SHORT_INT) {
1533     APPLY_FUNC(a, b, length, short_int, MINLOC_OP);
1534   } else if (*datatype == MPI_2LONG) {
1535     APPLY_FUNC(a, b, length, long_long, MINLOC_OP);
1536   } else if (*datatype == MPI_2INT) {
1537     APPLY_FUNC(a, b, length, int_int, MINLOC_OP);
1538   } else if (*datatype == MPI_LONG_DOUBLE_INT) {
1539     APPLY_FUNC(a, b, length, long_double_int, MINLOC_OP);
1540   } else if (*datatype == MPI_2FLOAT) {
1541     APPLY_FUNC(a, b, length, float_float, MINLOC_OP);
1542   } else if (*datatype == MPI_2DOUBLE) {
1543     APPLY_FUNC(a, b, length, double_double, MINLOC_OP);
1544   }
1545 }
1546
1547 static void maxloc_func(void *a, void *b, int *length,
1548                         MPI_Datatype * datatype)
1549 {
1550   if (*datatype == MPI_FLOAT_INT) {
1551     APPLY_FUNC(a, b, length, float_int, MAXLOC_OP);
1552   } else if (*datatype == MPI_LONG_INT) {
1553     APPLY_FUNC(a, b, length, long_int, MAXLOC_OP);
1554   } else if (*datatype == MPI_DOUBLE_INT) {
1555     APPLY_FUNC(a, b, length, double_int, MAXLOC_OP);
1556   } else if (*datatype == MPI_SHORT_INT) {
1557     APPLY_FUNC(a, b, length, short_int, MAXLOC_OP);
1558   } else if (*datatype == MPI_2LONG) {
1559     APPLY_FUNC(a, b, length, long_long, MAXLOC_OP);
1560   } else if (*datatype == MPI_2INT) {
1561     APPLY_FUNC(a, b, length, int_int, MAXLOC_OP);
1562   } else if (*datatype == MPI_LONG_DOUBLE_INT) {
1563     APPLY_FUNC(a, b, length, long_double_int, MAXLOC_OP);
1564   } else if (*datatype == MPI_2FLOAT) {
1565     APPLY_FUNC(a, b, length, float_float, MAXLOC_OP);
1566   } else if (*datatype == MPI_2DOUBLE) {
1567     APPLY_FUNC(a, b, length, double_double, MAXLOC_OP);
1568   }
1569 }
1570
1571 static void replace_func(void *a, void *b, int *length,
1572                         MPI_Datatype * datatype)
1573 {
1574   memcpy(b, a, *length * smpi_datatype_size(*datatype));
1575 }
1576
1577 #define CREATE_MPI_OP(name, func)                             \
1578   static s_smpi_mpi_op_t mpi_##name = { &(func) /* func */, TRUE }; \
1579 MPI_Op name = &mpi_##name;
1580
1581 CREATE_MPI_OP(MPI_MAX, max_func);
1582 CREATE_MPI_OP(MPI_MIN, min_func);
1583 CREATE_MPI_OP(MPI_SUM, sum_func);
1584 CREATE_MPI_OP(MPI_PROD, prod_func);
1585 CREATE_MPI_OP(MPI_LAND, land_func);
1586 CREATE_MPI_OP(MPI_LOR, lor_func);
1587 CREATE_MPI_OP(MPI_LXOR, lxor_func);
1588 CREATE_MPI_OP(MPI_BAND, band_func);
1589 CREATE_MPI_OP(MPI_BOR, bor_func);
1590 CREATE_MPI_OP(MPI_BXOR, bxor_func);
1591 CREATE_MPI_OP(MPI_MAXLOC, maxloc_func);
1592 CREATE_MPI_OP(MPI_MINLOC, minloc_func);
1593 CREATE_MPI_OP(MPI_REPLACE, replace_func);
1594
1595
1596 MPI_Op smpi_op_new(MPI_User_function * function, int commute)
1597 {
1598   MPI_Op op;
1599   op = xbt_new(s_smpi_mpi_op_t, 1);
1600   op->func = function;
1601   op-> is_commute = commute;
1602   return op;
1603 }
1604
1605 int smpi_op_is_commute(MPI_Op op)
1606 {
1607   return (op==MPI_OP_NULL) ? 1 : op-> is_commute;
1608 }
1609
1610 void smpi_op_destroy(MPI_Op op)
1611 {
1612   xbt_free(op);
1613 }
1614
1615 void smpi_op_apply(MPI_Op op, void *invec, void *inoutvec, int *len,
1616                    MPI_Datatype * datatype)
1617 {
1618   if(op==MPI_OP_NULL)
1619     return;
1620
1621   if(smpi_privatize_global_variables){ //we need to switch here, as the called function may silently touch global variables
1622     XBT_DEBUG("Applying operation, switch to the right data frame ");
1623     smpi_switch_data_segment(smpi_process_index());
1624   }
1625
1626   if(!smpi_process_get_replaying())
1627   op->func(invec, inoutvec, len, datatype);
1628 }