Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
923193b576d3ac2c62fc995f8fb9432ba280b0f7
[simgrid.git] / src / smpi / smpi_mpi_dt.c
1 /* smpi_mpi_dt.c -- MPI primitives to handle datatypes                        */
2 /* FIXME: a very incomplete implementation                                    */
3
4 /* Copyright (c) 2009-2014. The SimGrid Team.
5  * All rights reserved.                                                     */
6
7 /* This program is free software; you can redistribute it and/or modify it
8  * under the terms of the license (GNU LGPL) which comes with this package. */
9
10 #include <stdio.h>
11 #include <stdlib.h>
12 #include <string.h>
13
14 #include "private.h"
15 #include "smpi_mpi_dt_private.h"
16 #include "mc/mc.h"
17 #include "xbt/replay.h"
18 #include "simgrid/modelchecker.h"
19
20 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_mpi_dt, smpi,
21                                 "Logging specific to SMPI (datatype)");
22
23 #define CREATE_MPI_DATATYPE(name, type)       \
24   static s_smpi_mpi_datatype_t mpi_##name = { \
25     (char*) # name,                                   \
26     sizeof(type),  /* size */                 \
27     0,             /*was 1 has_subtype*/             \
28     0,             /* lb */                   \
29     sizeof(type),  /* ub = lb + size */       \
30     DT_FLAG_BASIC,  /* flags */              \
31     NULL           /* pointer on extended struct*/ \
32   };                                          \
33 MPI_Datatype name = &mpi_##name;
34
35 #define CREATE_MPI_DATATYPE_NULL(name)       \
36   static s_smpi_mpi_datatype_t mpi_##name = { \
37     (char*) # name,                                   \
38     0,  /* size */                 \
39     0,             /*was 1 has_subtype*/             \
40     0,             /* lb */                   \
41     0,  /* ub = lb + size */       \
42     DT_FLAG_BASIC,  /* flags */              \
43     NULL           /* pointer on extended struct*/ \
44   };                                          \
45 MPI_Datatype name = &mpi_##name;
46
47 //The following are datatypes for the MPI functions MPI_MAXLOC and MPI_MINLOC.
48 typedef struct {
49   float value;
50   int index;
51 } float_int;
52 typedef struct {
53   float value;
54   float index;
55 } float_float;
56 typedef struct {
57   long value;
58   long index;
59 } long_long;
60 typedef struct {
61   double value;
62   double index;
63 } double_double;
64 typedef struct {
65   long value;
66   int index;
67 } long_int;
68 typedef struct {
69   double value;
70   int index;
71 } double_int;
72 typedef struct {
73   short value;
74   int index;
75 } short_int;
76 typedef struct {
77   int value;
78   int index;
79 } int_int;
80 typedef struct {
81   long double value;
82   int index;
83 } long_double_int;
84 typedef struct {
85   int64_t value;
86   int64_t index;
87 } integer128_t;
88 // Predefined data types
89 CREATE_MPI_DATATYPE(MPI_CHAR, char);
90 CREATE_MPI_DATATYPE(MPI_SHORT, short);
91 CREATE_MPI_DATATYPE(MPI_INT, int);
92 CREATE_MPI_DATATYPE(MPI_LONG, long);
93 CREATE_MPI_DATATYPE(MPI_LONG_LONG, long long);
94 CREATE_MPI_DATATYPE(MPI_SIGNED_CHAR, signed char);
95 CREATE_MPI_DATATYPE(MPI_UNSIGNED_CHAR, unsigned char);
96 CREATE_MPI_DATATYPE(MPI_UNSIGNED_SHORT, unsigned short);
97 CREATE_MPI_DATATYPE(MPI_UNSIGNED, unsigned int);
98 CREATE_MPI_DATATYPE(MPI_UNSIGNED_LONG, unsigned long);
99 CREATE_MPI_DATATYPE(MPI_UNSIGNED_LONG_LONG, unsigned long long);
100 CREATE_MPI_DATATYPE(MPI_FLOAT, float);
101 CREATE_MPI_DATATYPE(MPI_DOUBLE, double);
102 CREATE_MPI_DATATYPE(MPI_LONG_DOUBLE, long double);
103 CREATE_MPI_DATATYPE(MPI_WCHAR, wchar_t);
104 CREATE_MPI_DATATYPE(MPI_C_BOOL, _Bool);
105 CREATE_MPI_DATATYPE(MPI_BYTE, int8_t);
106 CREATE_MPI_DATATYPE(MPI_INT8_T, int8_t);
107 CREATE_MPI_DATATYPE(MPI_INT16_T, int16_t);
108 CREATE_MPI_DATATYPE(MPI_INT32_T, int32_t);
109 CREATE_MPI_DATATYPE(MPI_INT64_T, int64_t);
110 CREATE_MPI_DATATYPE(MPI_UINT8_T, uint8_t);
111 CREATE_MPI_DATATYPE(MPI_UINT16_T, uint16_t);
112 CREATE_MPI_DATATYPE(MPI_UINT32_T, uint32_t);
113 CREATE_MPI_DATATYPE(MPI_UINT64_T, uint64_t);
114 CREATE_MPI_DATATYPE(MPI_C_FLOAT_COMPLEX, float _Complex);
115 CREATE_MPI_DATATYPE(MPI_C_DOUBLE_COMPLEX, double _Complex);
116 CREATE_MPI_DATATYPE(MPI_C_LONG_DOUBLE_COMPLEX, long double _Complex);
117 CREATE_MPI_DATATYPE(MPI_AINT, MPI_Aint);
118 CREATE_MPI_DATATYPE(MPI_OFFSET, MPI_Offset);
119
120 CREATE_MPI_DATATYPE(MPI_FLOAT_INT, float_int);
121 CREATE_MPI_DATATYPE(MPI_LONG_INT, long_int);
122 CREATE_MPI_DATATYPE(MPI_DOUBLE_INT, double_int);
123 CREATE_MPI_DATATYPE(MPI_SHORT_INT, short_int);
124 CREATE_MPI_DATATYPE(MPI_2INT, int_int);
125 CREATE_MPI_DATATYPE(MPI_2FLOAT, float_float);
126 CREATE_MPI_DATATYPE(MPI_2DOUBLE, double_double);
127 CREATE_MPI_DATATYPE(MPI_2LONG, long_long);
128
129 CREATE_MPI_DATATYPE(MPI_REAL4, float);
130 CREATE_MPI_DATATYPE(MPI_REAL8, float);
131 CREATE_MPI_DATATYPE(MPI_REAL16, double);
132 CREATE_MPI_DATATYPE_NULL(MPI_COMPLEX8);
133 CREATE_MPI_DATATYPE_NULL(MPI_COMPLEX16);
134 CREATE_MPI_DATATYPE_NULL(MPI_COMPLEX32);
135 CREATE_MPI_DATATYPE(MPI_INTEGER1, int);
136 CREATE_MPI_DATATYPE(MPI_INTEGER2, int16_t);
137 CREATE_MPI_DATATYPE(MPI_INTEGER4, int32_t);
138 CREATE_MPI_DATATYPE(MPI_INTEGER8, int64_t);
139 CREATE_MPI_DATATYPE(MPI_INTEGER16, integer128_t);
140
141 CREATE_MPI_DATATYPE(MPI_LONG_DOUBLE_INT, long_double_int);
142
143 CREATE_MPI_DATATYPE_NULL(MPI_UB);
144 CREATE_MPI_DATATYPE_NULL(MPI_LB);
145 CREATE_MPI_DATATYPE_NULL(MPI_PACKED);
146 // Internal use only
147 CREATE_MPI_DATATYPE(MPI_PTR, void*);
148
149 /** Check if the datatype is usable for communications
150  */
151 int is_datatype_valid(MPI_Datatype datatype) {
152     return datatype != MPI_DATATYPE_NULL
153         && (datatype->flags & DT_FLAG_COMMITED);
154 }
155
156 size_t smpi_datatype_size(MPI_Datatype datatype)
157 {
158   return datatype->size;
159 }
160
161 MPI_Aint smpi_datatype_lb(MPI_Datatype datatype)
162 {
163   return datatype->lb;
164 }
165
166 MPI_Aint smpi_datatype_ub(MPI_Datatype datatype)
167 {
168   return datatype->ub;
169 }
170
171 MPI_Datatype smpi_datatype_dup(MPI_Datatype datatype)
172 {
173   MPI_Datatype new_t= xbt_new(s_smpi_mpi_datatype_t,1);
174   memcpy(new_t, datatype, sizeof(s_smpi_mpi_datatype_t));
175   if (datatype->has_subtype)
176     memcpy(new_t->substruct, datatype->substruct, sizeof(s_smpi_subtype_t));
177   if(datatype->name)
178     new_t->name = strdup(datatype->name);
179   return new_t;
180 }
181
182 int smpi_datatype_extent(MPI_Datatype datatype, MPI_Aint * lb,
183                          MPI_Aint * extent)
184 {
185   *lb = datatype->lb;
186   *extent = datatype->ub - datatype->lb;
187   return MPI_SUCCESS;
188 }
189
190 MPI_Aint smpi_datatype_get_extent(MPI_Datatype datatype){
191   return datatype->ub - datatype->lb;
192 }
193
194 void smpi_datatype_get_name(MPI_Datatype datatype, char* name, int* length){
195   *length = strlen(datatype->name);
196   strcpy(name, datatype->name);
197 }
198
199 void smpi_datatype_set_name(MPI_Datatype datatype, char* name){
200   datatype->name = strdup(name);;
201 }
202
203 int smpi_datatype_copy(void *sendbuf, int sendcount, MPI_Datatype sendtype,
204                        void *recvbuf, int recvcount, MPI_Datatype recvtype)
205 {
206   int count;
207   if(smpi_privatize_global_variables){
208     switch_data_segment(smpi_process_index());
209   }
210   /* First check if we really have something to do */
211   if (recvcount > 0 && recvbuf != sendbuf) {
212     /* FIXME: treat packed cases */
213     sendcount *= smpi_datatype_size(sendtype);
214     recvcount *= smpi_datatype_size(recvtype);
215     count = sendcount < recvcount ? sendcount : recvcount;
216
217     if(sendtype->has_subtype == 0 && recvtype->has_subtype == 0) {
218       if(!_xbt_replay_is_active()) memcpy(recvbuf, sendbuf, count);
219     }
220     else if (sendtype->has_subtype == 0)
221     {
222       s_smpi_subtype_t *subtype =  recvtype->substruct;
223       subtype->unserialize( sendbuf, recvbuf,1, subtype, MPI_REPLACE);
224     }
225     else if (recvtype->has_subtype == 0)
226     {
227       s_smpi_subtype_t *subtype =  sendtype->substruct;
228       subtype->serialize(sendbuf, recvbuf,1, subtype);
229     }else{
230       s_smpi_subtype_t *subtype =  sendtype->substruct;
231
232
233       void * buf_tmp = xbt_malloc(count);
234
235       subtype->serialize( sendbuf, buf_tmp,count/smpi_datatype_size(sendtype), subtype);
236       subtype =  recvtype->substruct;
237       subtype->unserialize( buf_tmp, recvbuf,count/smpi_datatype_size(recvtype), subtype, MPI_REPLACE);
238
239       free(buf_tmp);
240     }
241   }
242
243   return sendcount > recvcount ? MPI_ERR_TRUNCATE : MPI_SUCCESS;
244 }
245
246 /*
247  *  Copies noncontiguous data into contiguous memory.
248  *  @param contiguous_vector - output vector
249  *  @param noncontiguous_vector - input vector
250  *  @param type - pointer contening :
251  *      - stride - stride of between noncontiguous data
252  *      - block_length - the width or height of blocked matrix
253  *      - count - the number of rows of matrix
254  */
255 void serialize_vector( const void *noncontiguous_vector,
256                        void *contiguous_vector,
257                        int count,
258                        void *type)
259 {
260   s_smpi_mpi_vector_t* type_c = (s_smpi_mpi_vector_t*)type;
261   int i;
262   char* contiguous_vector_char = (char*)contiguous_vector;
263   char* noncontiguous_vector_char = (char*)noncontiguous_vector;
264
265   for (i = 0; i < type_c->block_count * count; i++) {
266       if (type_c->old_type->has_subtype == 0)
267         memcpy(contiguous_vector_char,
268                noncontiguous_vector_char, type_c->block_length * type_c->size_oldtype);
269       else
270         ((s_smpi_subtype_t*)type_c->old_type->substruct)->serialize( noncontiguous_vector_char,
271                                                                      contiguous_vector_char,
272                                                                      type_c->block_length,
273                                                                      type_c->old_type->substruct);
274
275     contiguous_vector_char += type_c->block_length*type_c->size_oldtype;
276     if((i+1)%type_c->block_count ==0)
277     noncontiguous_vector_char += type_c->block_length*smpi_datatype_get_extent(type_c->old_type);
278     else
279     noncontiguous_vector_char += type_c->block_stride*smpi_datatype_get_extent(type_c->old_type);
280   }
281 }
282
283 /*
284  *  Copies contiguous data into noncontiguous memory.
285  *  @param noncontiguous_vector - output vector
286  *  @param contiguous_vector - input vector
287  *  @param type - pointer contening :
288  *      - stride - stride of between noncontiguous data
289  *      - block_length - the width or height of blocked matrix
290  *      - count - the number of rows of matrix
291  */
292 void unserialize_vector( const void *contiguous_vector,
293                          void *noncontiguous_vector,
294                          int count,
295                          void *type,
296                          MPI_Op op)
297 {
298   s_smpi_mpi_vector_t* type_c = (s_smpi_mpi_vector_t*)type;
299   int i;
300
301   char* contiguous_vector_char = (char*)contiguous_vector;
302   char* noncontiguous_vector_char = (char*)noncontiguous_vector;
303
304   for (i = 0; i < type_c->block_count * count; i++) {
305     if (type_c->old_type->has_subtype == 0)
306       smpi_op_apply(op, contiguous_vector_char, noncontiguous_vector_char, &type_c->block_length,
307           &type_c->old_type);
308      /* memcpy(noncontiguous_vector_char,
309              contiguous_vector_char, type_c->block_length * type_c->size_oldtype);*/
310     else
311       ((s_smpi_subtype_t*)type_c->old_type->substruct)->unserialize( contiguous_vector_char,
312                                                                      noncontiguous_vector_char,
313                                                                      type_c->block_length,
314                                                                      type_c->old_type->substruct,
315                                                                      op);
316     contiguous_vector_char += type_c->block_length*type_c->size_oldtype;
317     if((i+1)%type_c->block_count ==0)
318     noncontiguous_vector_char += type_c->block_length*smpi_datatype_get_extent(type_c->old_type);
319     else
320     noncontiguous_vector_char += type_c->block_stride*smpi_datatype_get_extent(type_c->old_type);
321   }
322 }
323
324 /*
325  * Create a Sub type vector to be able to serialize and unserialize it
326  * the structure s_smpi_mpi_vector_t is derived from s_smpi_subtype which
327  * required the functions unserialize and serialize
328  *
329  */
330 s_smpi_mpi_vector_t* smpi_datatype_vector_create( int block_stride,
331                                                   int block_length,
332                                                   int block_count,
333                                                   MPI_Datatype old_type,
334                                                   int size_oldtype){
335   s_smpi_mpi_vector_t *new_t= xbt_new(s_smpi_mpi_vector_t,1);
336   new_t->base.serialize = &serialize_vector;
337   new_t->base.unserialize = &unserialize_vector;
338   new_t->base.subtype_free = &free_vector;
339   new_t->block_stride = block_stride;
340   new_t->block_length = block_length;
341   new_t->block_count = block_count;
342   smpi_datatype_use(old_type);
343   new_t->old_type = old_type;
344   new_t->size_oldtype = size_oldtype;
345   return new_t;
346 }
347
348 void smpi_datatype_create(MPI_Datatype* new_type, int size,int lb, int ub, int has_subtype,
349                           void *struct_type, int flags){
350   MPI_Datatype new_t= xbt_new(s_smpi_mpi_datatype_t,1);
351   new_t->name = NULL;
352   new_t->size = size;
353   new_t->has_subtype = size>0? has_subtype:0;
354   new_t->lb = lb;
355   new_t->ub = ub;
356   new_t->flags = flags;
357   new_t->substruct = struct_type;
358   new_t->in_use=0;
359   *new_type = new_t;
360
361 #ifdef HAVE_MC
362   if(MC_is_active())
363     MC_ignore(&(new_t->in_use), sizeof(new_t->in_use));
364 #endif
365 }
366
367 void smpi_datatype_free(MPI_Datatype* type){
368
369   if((*type)->flags & DT_FLAG_PREDEFINED)return;
370
371   //if still used, mark for deletion
372   if((*type)->in_use!=0){
373       (*type)->flags |=DT_FLAG_DESTROYED;
374       return;
375   }
376
377   if ((*type)->has_subtype == 1){
378     ((s_smpi_subtype_t *)(*type)->substruct)->subtype_free(type);  
379     xbt_free((*type)->substruct);
380   }
381   if ((*type)->name != NULL){
382     xbt_free((*type)->name);
383   }
384   xbt_free(*type);
385   *type = MPI_DATATYPE_NULL;
386 }
387
388 void smpi_datatype_use(MPI_Datatype type){
389   if(type)type->in_use++;
390
391 #ifdef HAVE_MC
392   if(MC_is_active())
393     MC_ignore(&(type->in_use), sizeof(type->in_use));
394 #endif
395 }
396
397
398 void smpi_datatype_unuse(MPI_Datatype type){
399   if(type && type->in_use-- == 0 && (type->flags & DT_FLAG_DESTROYED))
400     smpi_datatype_free(&type);
401
402 #ifdef HAVE_MC
403   if(MC_is_active())
404     MC_ignore(&(type->in_use), sizeof(type->in_use));
405 #endif
406 }
407
408
409
410
411 /*
412 Contiguous Implementation
413 */
414
415
416 /*
417  *  Copies noncontiguous data into contiguous memory.
418  *  @param contiguous_hvector - output hvector
419  *  @param noncontiguous_hvector - input hvector
420  *  @param type - pointer contening :
421  *      - stride - stride of between noncontiguous data, in bytes
422  *      - block_length - the width or height of blocked matrix
423  *      - count - the number of rows of matrix
424  */
425 void serialize_contiguous( const void *noncontiguous_hvector,
426                        void *contiguous_hvector,
427                        int count,
428                        void *type)
429 {
430   s_smpi_mpi_contiguous_t* type_c = (s_smpi_mpi_contiguous_t*)type;
431   char* contiguous_vector_char = (char*)contiguous_hvector;
432   char* noncontiguous_vector_char = (char*)noncontiguous_hvector+type_c->lb;
433   memcpy(contiguous_vector_char,
434            noncontiguous_vector_char, count* type_c->block_count * type_c->size_oldtype);
435 }
436 /*
437  *  Copies contiguous data into noncontiguous memory.
438  *  @param noncontiguous_vector - output hvector
439  *  @param contiguous_vector - input hvector
440  *  @param type - pointer contening :
441  *      - stride - stride of between noncontiguous data, in bytes
442  *      - block_length - the width or height of blocked matrix
443  *      - count - the number of rows of matrix
444  */
445 void unserialize_contiguous( const void *contiguous_vector,
446                          void *noncontiguous_vector,
447                          int count,
448                          void *type,
449                          MPI_Op op)
450 {
451   s_smpi_mpi_contiguous_t* type_c = (s_smpi_mpi_contiguous_t*)type;
452   char* contiguous_vector_char = (char*)contiguous_vector;
453   char* noncontiguous_vector_char = (char*)noncontiguous_vector+type_c->lb;
454   int n= count* type_c->block_count;
455   smpi_op_apply(op, contiguous_vector_char, noncontiguous_vector_char, &n,
456             &type_c->old_type);
457        /*memcpy(noncontiguous_vector_char,
458            contiguous_vector_char, count*  type_c->block_count * type_c->size_oldtype);*/
459 }
460
461 void free_contiguous(MPI_Datatype* d){
462   smpi_datatype_unuse(((s_smpi_mpi_indexed_t *)(*d)->substruct)->old_type);
463 }
464
465 /*
466  * Create a Sub type contiguous to be able to serialize and unserialize it
467  * the structure s_smpi_mpi_contiguous_t is derived from s_smpi_subtype which
468  * required the functions unserialize and serialize
469  *
470  */
471 s_smpi_mpi_contiguous_t* smpi_datatype_contiguous_create( MPI_Aint lb,
472                                                   int block_count,
473                                                   MPI_Datatype old_type,
474                                                   int size_oldtype){
475   s_smpi_mpi_contiguous_t *new_t= xbt_new(s_smpi_mpi_contiguous_t,1);
476   new_t->base.serialize = &serialize_contiguous;
477   new_t->base.unserialize = &unserialize_contiguous;
478   new_t->base.subtype_free = &free_contiguous;
479   new_t->lb = lb;
480   new_t->block_count = block_count;
481   new_t->old_type = old_type;
482   new_t->size_oldtype = size_oldtype;
483   smpi_datatype_use(old_type);
484   return new_t;
485 }
486
487
488
489
490 int smpi_datatype_contiguous(int count, MPI_Datatype old_type, MPI_Datatype* new_type, MPI_Aint lb)
491 {
492   int retval;
493   if(old_type->has_subtype){
494           //handle this case as a hvector with stride equals to the extent of the datatype
495           return smpi_datatype_hvector(count, 1, smpi_datatype_get_extent(old_type), old_type, new_type);
496   }
497   
498   s_smpi_mpi_contiguous_t* subtype = smpi_datatype_contiguous_create( lb,
499                                                                 count,
500                                                                 old_type,
501                                                                 smpi_datatype_size(old_type));
502                                                                 
503   smpi_datatype_create(new_type,
504                                           count * smpi_datatype_size(old_type),
505                                           lb,lb + count * smpi_datatype_size(old_type),
506                                           1,subtype, DT_FLAG_CONTIGUOUS);
507   retval=MPI_SUCCESS;
508   return retval;
509 }
510
511 int smpi_datatype_vector(int count, int blocklen, int stride, MPI_Datatype old_type, MPI_Datatype* new_type)
512 {
513   int retval;
514   if (blocklen<0) return MPI_ERR_ARG;
515   MPI_Aint lb = 0;
516   MPI_Aint ub = 0;
517   if(count>0){
518     lb=smpi_datatype_lb(old_type);
519     ub=((count-1)*stride+blocklen-1)*smpi_datatype_get_extent(old_type)+smpi_datatype_ub(old_type);
520   }
521   if(old_type->has_subtype || stride != blocklen){
522
523
524     s_smpi_mpi_vector_t* subtype = smpi_datatype_vector_create( stride,
525                                                                 blocklen,
526                                                                 count,
527                                                                 old_type,
528                                                                 smpi_datatype_size(old_type));
529     smpi_datatype_create(new_type,
530                          count * (blocklen) * smpi_datatype_size(old_type), lb,
531                          ub,
532                          1,
533                          subtype,
534                          DT_FLAG_VECTOR);
535     retval=MPI_SUCCESS;
536   }else{
537     /* in this situation the data are contignous thus it's not
538      * required to serialize and unserialize it*/
539     smpi_datatype_create(new_type, count * blocklen *
540                          smpi_datatype_size(old_type), 0, ((count -1) * stride + blocklen)*
541                          smpi_datatype_size(old_type),
542                          0,
543                          NULL,
544                          DT_FLAG_VECTOR|DT_FLAG_CONTIGUOUS);
545     retval=MPI_SUCCESS;
546   }
547   return retval;
548 }
549
550 void free_vector(MPI_Datatype* d){
551   smpi_datatype_unuse(((s_smpi_mpi_indexed_t *)(*d)->substruct)->old_type);
552 }
553
554 /*
555 Hvector Implementation - Vector with stride in bytes
556 */
557
558
559 /*
560  *  Copies noncontiguous data into contiguous memory.
561  *  @param contiguous_hvector - output hvector
562  *  @param noncontiguous_hvector - input hvector
563  *  @param type - pointer contening :
564  *      - stride - stride of between noncontiguous data, in bytes
565  *      - block_length - the width or height of blocked matrix
566  *      - count - the number of rows of matrix
567  */
568 void serialize_hvector( const void *noncontiguous_hvector,
569                        void *contiguous_hvector,
570                        int count,
571                        void *type)
572 {
573   s_smpi_mpi_hvector_t* type_c = (s_smpi_mpi_hvector_t*)type;
574   int i;
575   char* contiguous_vector_char = (char*)contiguous_hvector;
576   char* noncontiguous_vector_char = (char*)noncontiguous_hvector;
577
578   for (i = 0; i < type_c->block_count * count; i++) {
579     if (type_c->old_type->has_subtype == 0)
580       memcpy(contiguous_vector_char,
581            noncontiguous_vector_char, type_c->block_length * type_c->size_oldtype);
582     else
583       ((s_smpi_subtype_t*)type_c->old_type->substruct)->serialize( noncontiguous_vector_char,
584                                                                    contiguous_vector_char,
585                                                                    type_c->block_length,
586                                                                    type_c->old_type->substruct);
587
588     contiguous_vector_char += type_c->block_length*type_c->size_oldtype;
589     if((i+1)%type_c->block_count ==0)
590     noncontiguous_vector_char += type_c->block_length*type_c->size_oldtype;
591     else
592     noncontiguous_vector_char += type_c->block_stride;
593   }
594 }
595 /*
596  *  Copies contiguous data into noncontiguous memory.
597  *  @param noncontiguous_vector - output hvector
598  *  @param contiguous_vector - input hvector
599  *  @param type - pointer contening :
600  *      - stride - stride of between noncontiguous data, in bytes
601  *      - block_length - the width or height of blocked matrix
602  *      - count - the number of rows of matrix
603  */
604 void unserialize_hvector( const void *contiguous_vector,
605                          void *noncontiguous_vector,
606                          int count,
607                          void *type,
608                          MPI_Op op)
609 {
610   s_smpi_mpi_hvector_t* type_c = (s_smpi_mpi_hvector_t*)type;
611   int i;
612
613   char* contiguous_vector_char = (char*)contiguous_vector;
614   char* noncontiguous_vector_char = (char*)noncontiguous_vector;
615
616   for (i = 0; i < type_c->block_count * count; i++) {
617     if (type_c->old_type->has_subtype == 0)
618       smpi_op_apply(op, contiguous_vector_char, noncontiguous_vector_char, &type_c->block_length,
619                   &type_c->old_type);
620              /*memcpy(noncontiguous_vector_char,
621            contiguous_vector_char, type_c->block_length * type_c->size_oldtype);*/
622     else
623       ((s_smpi_subtype_t*)type_c->old_type->substruct)->unserialize( contiguous_vector_char,
624                                                                      noncontiguous_vector_char,
625                                                                      type_c->block_length,
626                                                                      type_c->old_type->substruct,
627                                                                      op);
628     contiguous_vector_char += type_c->block_length*type_c->size_oldtype;
629     if((i+1)%type_c->block_count ==0)
630     noncontiguous_vector_char += type_c->block_length*type_c->size_oldtype;
631     else
632     noncontiguous_vector_char += type_c->block_stride;
633   }
634 }
635
636 /*
637  * Create a Sub type vector to be able to serialize and unserialize it
638  * the structure s_smpi_mpi_vector_t is derived from s_smpi_subtype which
639  * required the functions unserialize and serialize
640  *
641  */
642 s_smpi_mpi_hvector_t* smpi_datatype_hvector_create( MPI_Aint block_stride,
643                                                   int block_length,
644                                                   int block_count,
645                                                   MPI_Datatype old_type,
646                                                   int size_oldtype){
647   s_smpi_mpi_hvector_t *new_t= xbt_new(s_smpi_mpi_hvector_t,1);
648   new_t->base.serialize = &serialize_hvector;
649   new_t->base.unserialize = &unserialize_hvector;
650   new_t->base.subtype_free = &free_hvector;
651   new_t->block_stride = block_stride;
652   new_t->block_length = block_length;
653   new_t->block_count = block_count;
654   new_t->old_type = old_type;
655   new_t->size_oldtype = size_oldtype;
656   smpi_datatype_use(old_type);
657   return new_t;
658 }
659
660 //do nothing for vector types
661 void free_hvector(MPI_Datatype* d){
662   smpi_datatype_unuse(((s_smpi_mpi_indexed_t *)(*d)->substruct)->old_type);
663 }
664
665 int smpi_datatype_hvector(int count, int blocklen, MPI_Aint stride, MPI_Datatype old_type, MPI_Datatype* new_type)
666 {
667   int retval;
668   if (blocklen<0) return MPI_ERR_ARG;
669   MPI_Aint lb = 0;
670   MPI_Aint ub = 0;
671   if(count>0){
672     lb=smpi_datatype_lb(old_type);
673     ub=((count-1)*stride)+(blocklen-1)*smpi_datatype_get_extent(old_type)+smpi_datatype_ub(old_type);
674   }
675   if(old_type->has_subtype || stride != blocklen*smpi_datatype_get_extent(old_type)){
676     s_smpi_mpi_hvector_t* subtype = smpi_datatype_hvector_create( stride,
677                                                                   blocklen,
678                                                                   count,
679                                                                   old_type,
680                                                                   smpi_datatype_size(old_type));
681
682     smpi_datatype_create(new_type, count * blocklen * smpi_datatype_size(old_type),
683                                                  lb,ub,
684                          1,
685                          subtype,
686                          DT_FLAG_VECTOR);
687     retval=MPI_SUCCESS;
688   }else{
689     smpi_datatype_create(new_type, count * blocklen *
690                                              smpi_datatype_size(old_type),0,count * blocklen *
691                                              smpi_datatype_size(old_type),
692                                             0,
693                                             NULL,
694                                             DT_FLAG_VECTOR|DT_FLAG_CONTIGUOUS);
695     retval=MPI_SUCCESS;
696   }
697   return retval;
698 }
699
700
701 /*
702 Indexed Implementation
703 */
704
705 /*
706  *  Copies noncontiguous data into contiguous memory.
707  *  @param contiguous_indexed - output indexed
708  *  @param noncontiguous_indexed - input indexed
709  *  @param type - pointer contening :
710  *      - block_lengths - the width or height of blocked matrix
711  *      - block_indices - indices of each data, in element
712  *      - count - the number of rows of matrix
713  */
714 void serialize_indexed( const void *noncontiguous_indexed,
715                        void *contiguous_indexed,
716                        int count,
717                        void *type)
718 {
719   s_smpi_mpi_indexed_t* type_c = (s_smpi_mpi_indexed_t*)type;
720   int i,j;
721   char* contiguous_indexed_char = (char*)contiguous_indexed;
722   char* noncontiguous_indexed_char = (char*)noncontiguous_indexed+type_c->block_indices[0] * type_c->size_oldtype;
723   for(j=0; j<count;j++){
724     for (i = 0; i < type_c->block_count; i++) {
725       if (type_c->old_type->has_subtype == 0)
726         memcpy(contiguous_indexed_char,
727                      noncontiguous_indexed_char, type_c->block_lengths[i] * type_c->size_oldtype);
728       else
729         ((s_smpi_subtype_t*)type_c->old_type->substruct)->serialize( noncontiguous_indexed_char,
730                                                                      contiguous_indexed_char,
731                                                                      type_c->block_lengths[i],
732                                                                      type_c->old_type->substruct);
733
734
735       contiguous_indexed_char += type_c->block_lengths[i]*type_c->size_oldtype;
736       if (i<type_c->block_count-1)noncontiguous_indexed_char = (char*)noncontiguous_indexed + type_c->block_indices[i+1]*smpi_datatype_get_extent(type_c->old_type);
737       else noncontiguous_indexed_char += type_c->block_lengths[i]*smpi_datatype_get_extent(type_c->old_type);
738     }
739     noncontiguous_indexed=(void*)noncontiguous_indexed_char;
740   }
741 }
742 /*
743  *  Copies contiguous data into noncontiguous memory.
744  *  @param noncontiguous_indexed - output indexed
745  *  @param contiguous_indexed - input indexed
746  *  @param type - pointer contening :
747  *      - block_lengths - the width or height of blocked matrix
748  *      - block_indices - indices of each data, in element
749  *      - count - the number of rows of matrix
750  */
751 void unserialize_indexed( const void *contiguous_indexed,
752                          void *noncontiguous_indexed,
753                          int count,
754                          void *type,
755                          MPI_Op op)
756 {
757
758   s_smpi_mpi_indexed_t* type_c = (s_smpi_mpi_indexed_t*)type;
759   int i,j;
760   char* contiguous_indexed_char = (char*)contiguous_indexed;
761   char* noncontiguous_indexed_char = (char*)noncontiguous_indexed+type_c->block_indices[0]*smpi_datatype_get_extent(type_c->old_type);
762   for(j=0; j<count;j++){
763     for (i = 0; i < type_c->block_count; i++) {
764       if (type_c->old_type->has_subtype == 0)
765         smpi_op_apply(op, contiguous_indexed_char, noncontiguous_indexed_char, &type_c->block_lengths[i],
766                     &type_c->old_type);
767                /*memcpy(noncontiguous_indexed_char ,
768              contiguous_indexed_char, type_c->block_lengths[i] * type_c->size_oldtype);*/
769       else
770         ((s_smpi_subtype_t*)type_c->old_type->substruct)->unserialize( contiguous_indexed_char,
771                                                                        noncontiguous_indexed_char,
772                                                                        type_c->block_lengths[i],
773                                                                        type_c->old_type->substruct,
774                                                                        op);
775
776       contiguous_indexed_char += type_c->block_lengths[i]*type_c->size_oldtype;
777       if (i<type_c->block_count-1)
778         noncontiguous_indexed_char = (char*)noncontiguous_indexed + type_c->block_indices[i+1]*smpi_datatype_get_extent(type_c->old_type);
779       else noncontiguous_indexed_char += type_c->block_lengths[i]*smpi_datatype_get_extent(type_c->old_type);
780     }
781     noncontiguous_indexed=(void*)noncontiguous_indexed_char;
782   }
783 }
784
785 void free_indexed(MPI_Datatype* type){
786   xbt_free(((s_smpi_mpi_indexed_t *)(*type)->substruct)->block_lengths);
787   xbt_free(((s_smpi_mpi_indexed_t *)(*type)->substruct)->block_indices);
788   smpi_datatype_unuse(((s_smpi_mpi_indexed_t *)(*type)->substruct)->old_type);
789 }
790
791 /*
792  * Create a Sub type indexed to be able to serialize and unserialize it
793  * the structure s_smpi_mpi_indexed_t is derived from s_smpi_subtype which
794  * required the functions unserialize and serialize
795  */
796 s_smpi_mpi_indexed_t* smpi_datatype_indexed_create( int* block_lengths,
797                                                   int* block_indices,
798                                                   int block_count,
799                                                   MPI_Datatype old_type,
800                                                   int size_oldtype){
801   s_smpi_mpi_indexed_t *new_t= xbt_new(s_smpi_mpi_indexed_t,1);
802   new_t->base.serialize = &serialize_indexed;
803   new_t->base.unserialize = &unserialize_indexed;
804   new_t->base.subtype_free = &free_indexed;
805  //TODO : add a custom function for each time to clean these 
806   new_t->block_lengths= xbt_new(int, block_count);
807   new_t->block_indices= xbt_new(int, block_count);
808   int i;
809   for(i=0;i<block_count;i++){
810     new_t->block_lengths[i]=block_lengths[i];
811     new_t->block_indices[i]=block_indices[i];
812   }
813   new_t->block_count = block_count;
814   smpi_datatype_use(old_type);
815   new_t->old_type = old_type;
816   new_t->size_oldtype = size_oldtype;
817   return new_t;
818 }
819
820
821 int smpi_datatype_indexed(int count, int* blocklens, int* indices, MPI_Datatype old_type, MPI_Datatype* new_type)
822 {
823   int i;
824   int retval;
825   int size = 0;
826   int contiguous=1;
827   MPI_Aint lb = 0;
828   MPI_Aint ub = 0;
829   if(count>0){
830     lb=indices[0]*smpi_datatype_get_extent(old_type);
831     ub=indices[0]*smpi_datatype_get_extent(old_type) + blocklens[0]*smpi_datatype_ub(old_type);
832   }
833
834   for(i=0; i< count; i++){
835     if   (blocklens[i]<0)
836       return MPI_ERR_ARG;
837     size += blocklens[i];
838
839     if(indices[i]*smpi_datatype_get_extent(old_type)+smpi_datatype_lb(old_type)<lb)
840         lb = indices[i]*smpi_datatype_get_extent(old_type)+smpi_datatype_lb(old_type);
841     if(indices[i]*smpi_datatype_get_extent(old_type)+blocklens[i]*smpi_datatype_ub(old_type)>ub)
842         ub = indices[i]*smpi_datatype_get_extent(old_type)+blocklens[i]*smpi_datatype_ub(old_type);
843
844     if ( (i< count -1) && (indices[i]+blocklens[i] != indices[i+1]) )contiguous=0;
845   }
846   if (old_type->has_subtype == 1)
847     contiguous=0;
848
849   if(!contiguous){
850     s_smpi_mpi_indexed_t* subtype = smpi_datatype_indexed_create( blocklens,
851                                                                   indices,
852                                                                   count,
853                                                                   old_type,
854                                                                   smpi_datatype_size(old_type));
855      smpi_datatype_create(new_type,  size *
856                          smpi_datatype_size(old_type),lb,ub,1, subtype, DT_FLAG_DATA);
857   }else{
858     s_smpi_mpi_contiguous_t* subtype = smpi_datatype_contiguous_create( lb,
859                                                                   size,
860                                                                   old_type,
861                                                                   smpi_datatype_size(old_type));
862     smpi_datatype_create(new_type,  size *
863                          smpi_datatype_size(old_type),lb,ub,1, subtype, DT_FLAG_DATA|DT_FLAG_CONTIGUOUS);
864   }
865   retval=MPI_SUCCESS;
866   return retval;
867 }
868
869
870 /*
871 Hindexed Implementation - Indexed with indices in bytes 
872 */
873
874 /*
875  *  Copies noncontiguous data into contiguous memory.
876  *  @param contiguous_hindexed - output hindexed
877  *  @param noncontiguous_hindexed - input hindexed
878  *  @param type - pointer contening :
879  *      - block_lengths - the width or height of blocked matrix
880  *      - block_indices - indices of each data, in bytes
881  *      - count - the number of rows of matrix
882  */
883 void serialize_hindexed( const void *noncontiguous_hindexed,
884                        void *contiguous_hindexed,
885                        int count,
886                        void *type)
887 {
888   s_smpi_mpi_hindexed_t* type_c = (s_smpi_mpi_hindexed_t*)type;
889   int i,j;
890   char* contiguous_hindexed_char = (char*)contiguous_hindexed;
891   char* noncontiguous_hindexed_char = (char*)noncontiguous_hindexed+ type_c->block_indices[0];
892   for(j=0; j<count;j++){
893     for (i = 0; i < type_c->block_count; i++) {
894       if (type_c->old_type->has_subtype == 0)
895         memcpy(contiguous_hindexed_char,
896                      noncontiguous_hindexed_char, type_c->block_lengths[i] * type_c->size_oldtype);
897       else
898         ((s_smpi_subtype_t*)type_c->old_type->substruct)->serialize( noncontiguous_hindexed_char,
899                                                                      contiguous_hindexed_char,
900                                                                      type_c->block_lengths[i],
901                                                                      type_c->old_type->substruct);
902
903       contiguous_hindexed_char += type_c->block_lengths[i]*type_c->size_oldtype;
904       if (i<type_c->block_count-1)noncontiguous_hindexed_char = (char*)noncontiguous_hindexed + type_c->block_indices[i+1];
905       else noncontiguous_hindexed_char += type_c->block_lengths[i]*smpi_datatype_get_extent(type_c->old_type);
906     }
907     noncontiguous_hindexed=(void*)noncontiguous_hindexed_char;
908   }
909 }
910 /*
911  *  Copies contiguous data into noncontiguous memory.
912  *  @param noncontiguous_hindexed - output hindexed
913  *  @param contiguous_hindexed - input hindexed
914  *  @param type - pointer contening :
915  *      - block_lengths - the width or height of blocked matrix
916  *      - block_indices - indices of each data, in bytes
917  *      - count - the number of rows of matrix
918  */
919 void unserialize_hindexed( const void *contiguous_hindexed,
920                          void *noncontiguous_hindexed,
921                          int count,
922                          void *type,
923                          MPI_Op op)
924 {
925   s_smpi_mpi_hindexed_t* type_c = (s_smpi_mpi_hindexed_t*)type;
926   int i,j;
927
928   char* contiguous_hindexed_char = (char*)contiguous_hindexed;
929   char* noncontiguous_hindexed_char = (char*)noncontiguous_hindexed+ type_c->block_indices[0];
930   for(j=0; j<count;j++){
931     for (i = 0; i < type_c->block_count; i++) {
932       if (type_c->old_type->has_subtype == 0)
933         smpi_op_apply(op, contiguous_hindexed_char, noncontiguous_hindexed_char, &type_c->block_lengths[i],
934                             &type_c->old_type);
935                        /*memcpy(noncontiguous_hindexed_char,
936                contiguous_hindexed_char, type_c->block_lengths[i] * type_c->size_oldtype);*/
937       else
938         ((s_smpi_subtype_t*)type_c->old_type->substruct)->unserialize( contiguous_hindexed_char,
939                                                                        noncontiguous_hindexed_char,
940                                                                        type_c->block_lengths[i],
941                                                                        type_c->old_type->substruct,
942                                                                        op);
943
944       contiguous_hindexed_char += type_c->block_lengths[i]*type_c->size_oldtype;
945       if (i<type_c->block_count-1)noncontiguous_hindexed_char = (char*)noncontiguous_hindexed + type_c->block_indices[i+1];
946       else noncontiguous_hindexed_char += type_c->block_lengths[i]*smpi_datatype_get_extent(type_c->old_type);
947     }
948     noncontiguous_hindexed=(void*)noncontiguous_hindexed_char;
949   }
950 }
951
952 void free_hindexed(MPI_Datatype* type){
953   xbt_free(((s_smpi_mpi_hindexed_t *)(*type)->substruct)->block_lengths);
954   xbt_free(((s_smpi_mpi_hindexed_t *)(*type)->substruct)->block_indices);
955   smpi_datatype_unuse(((s_smpi_mpi_indexed_t *)(*type)->substruct)->old_type);
956 }
957
958 /*
959  * Create a Sub type hindexed to be able to serialize and unserialize it
960  * the structure s_smpi_mpi_hindexed_t is derived from s_smpi_subtype which
961  * required the functions unserialize and serialize
962  */
963 s_smpi_mpi_hindexed_t* smpi_datatype_hindexed_create( int* block_lengths,
964                                                   MPI_Aint* block_indices,
965                                                   int block_count,
966                                                   MPI_Datatype old_type,
967                                                   int size_oldtype){
968   s_smpi_mpi_hindexed_t *new_t= xbt_new(s_smpi_mpi_hindexed_t,1);
969   new_t->base.serialize = &serialize_hindexed;
970   new_t->base.unserialize = &unserialize_hindexed;
971   new_t->base.subtype_free = &free_hindexed;
972  //TODO : add a custom function for each time to clean these 
973   new_t->block_lengths= xbt_new(int, block_count);
974   new_t->block_indices= xbt_new(MPI_Aint, block_count);
975   int i;
976   for(i=0;i<block_count;i++){
977     new_t->block_lengths[i]=block_lengths[i];
978     new_t->block_indices[i]=block_indices[i];
979   }
980   new_t->block_count = block_count;
981   new_t->old_type = old_type;
982   new_t->size_oldtype = size_oldtype;
983   return new_t;
984 }
985
986
987 int smpi_datatype_hindexed(int count, int* blocklens, MPI_Aint* indices, MPI_Datatype old_type, MPI_Datatype* new_type)
988 {
989   int i;
990   int retval;
991   int size = 0;
992   int contiguous=1;
993   MPI_Aint lb = 0;
994   MPI_Aint ub = 0;
995   if(count>0){
996     lb=indices[0] + smpi_datatype_lb(old_type);
997     ub=indices[0] + blocklens[0]*smpi_datatype_ub(old_type);
998   }
999   for(i=0; i< count; i++){
1000     if   (blocklens[i]<0)
1001       return MPI_ERR_ARG;
1002     size += blocklens[i];
1003
1004     if(indices[i]+smpi_datatype_lb(old_type)<lb) lb = indices[i]+smpi_datatype_lb(old_type);
1005     if(indices[i]+blocklens[i]*smpi_datatype_ub(old_type)>ub) ub = indices[i]+blocklens[i]*smpi_datatype_ub(old_type);
1006
1007     if ( (i< count -1) && (indices[i]+blocklens[i]*smpi_datatype_size(old_type) != indices[i+1]) )contiguous=0;
1008   }
1009   if (old_type->has_subtype == 1 || lb!=0)
1010     contiguous=0;
1011
1012   if(!contiguous){
1013     s_smpi_mpi_hindexed_t* subtype = smpi_datatype_hindexed_create( blocklens,
1014                                                                   indices,
1015                                                                   count,
1016                                                                   old_type,
1017                                                                   smpi_datatype_size(old_type));
1018     smpi_datatype_create(new_type,  size * smpi_datatype_size(old_type),
1019                                                  lb,
1020                          ub
1021                          ,1, subtype, DT_FLAG_DATA);
1022   }else{
1023     s_smpi_mpi_contiguous_t* subtype = smpi_datatype_contiguous_create( lb,
1024                                                                   size,
1025                                                                   old_type,
1026                                                                   smpi_datatype_size(old_type));
1027     smpi_datatype_create(new_type,  size * smpi_datatype_size(old_type),
1028                                              0,size * smpi_datatype_size(old_type),
1029                                              1, subtype, DT_FLAG_DATA|DT_FLAG_CONTIGUOUS);
1030   }
1031   retval=MPI_SUCCESS;
1032   return retval;
1033 }
1034
1035
1036 /*
1037 struct Implementation - Indexed with indices in bytes 
1038 */
1039
1040 /*
1041  *  Copies noncontiguous data into contiguous memory.
1042  *  @param contiguous_struct - output struct
1043  *  @param noncontiguous_struct - input struct
1044  *  @param type - pointer contening :
1045  *      - stride - stride of between noncontiguous data
1046  *      - block_length - the width or height of blocked matrix
1047  *      - count - the number of rows of matrix
1048  */
1049 void serialize_struct( const void *noncontiguous_struct,
1050                        void *contiguous_struct,
1051                        int count,
1052                        void *type)
1053 {
1054   s_smpi_mpi_struct_t* type_c = (s_smpi_mpi_struct_t*)type;
1055   int i,j;
1056   char* contiguous_struct_char = (char*)contiguous_struct;
1057   char* noncontiguous_struct_char = (char*)noncontiguous_struct+ type_c->block_indices[0];
1058   for(j=0; j<count;j++){
1059     for (i = 0; i < type_c->block_count; i++) {
1060       if (type_c->old_types[i]->has_subtype == 0)
1061         memcpy(contiguous_struct_char,
1062              noncontiguous_struct_char, type_c->block_lengths[i] * smpi_datatype_size(type_c->old_types[i]));
1063       else
1064         ((s_smpi_subtype_t*)type_c->old_types[i]->substruct)->serialize( noncontiguous_struct_char,
1065                                                                          contiguous_struct_char,
1066                                                                          type_c->block_lengths[i],
1067                                                                          type_c->old_types[i]->substruct);
1068
1069
1070       contiguous_struct_char += type_c->block_lengths[i]*smpi_datatype_size(type_c->old_types[i]);
1071       if (i<type_c->block_count-1)noncontiguous_struct_char = (char*)noncontiguous_struct + type_c->block_indices[i+1];
1072       else noncontiguous_struct_char += type_c->block_lengths[i]*smpi_datatype_get_extent(type_c->old_types[i]);//let's hope this is MPI_UB ?
1073     }
1074     noncontiguous_struct=(void*)noncontiguous_struct_char;
1075   }
1076 }
1077 /*
1078  *  Copies contiguous data into noncontiguous memory.
1079  *  @param noncontiguous_struct - output struct
1080  *  @param contiguous_struct - input struct
1081  *  @param type - pointer contening :
1082  *      - stride - stride of between noncontiguous data
1083  *      - block_length - the width or height of blocked matrix
1084  *      - count - the number of rows of matrix
1085  */
1086 void unserialize_struct( const void *contiguous_struct,
1087                          void *noncontiguous_struct,
1088                          int count,
1089                          void *type,
1090                          MPI_Op op)
1091 {
1092   s_smpi_mpi_struct_t* type_c = (s_smpi_mpi_struct_t*)type;
1093   int i,j;
1094
1095   char* contiguous_struct_char = (char*)contiguous_struct;
1096   char* noncontiguous_struct_char = (char*)noncontiguous_struct+ type_c->block_indices[0];
1097   for(j=0; j<count;j++){
1098     for (i = 0; i < type_c->block_count; i++) {
1099       if (type_c->old_types[i]->has_subtype == 0)
1100         smpi_op_apply(op, contiguous_struct_char, noncontiguous_struct_char, &type_c->block_lengths[i],
1101            & type_c->old_types[i]);
1102                        /*memcpy(noncontiguous_struct_char,
1103              contiguous_struct_char, type_c->block_lengths[i] * smpi_datatype_size(type_c->old_types[i]));*/
1104       else
1105         ((s_smpi_subtype_t*)type_c->old_types[i]->substruct)->unserialize( contiguous_struct_char,
1106                                                                            noncontiguous_struct_char,
1107                                                                            type_c->block_lengths[i],
1108                                                                            type_c->old_types[i]->substruct,
1109                                                                            op);
1110
1111       contiguous_struct_char += type_c->block_lengths[i]*smpi_datatype_size(type_c->old_types[i]);
1112       if (i<type_c->block_count-1)noncontiguous_struct_char =  (char*)noncontiguous_struct + type_c->block_indices[i+1];
1113       else noncontiguous_struct_char += type_c->block_lengths[i]*smpi_datatype_get_extent(type_c->old_types[i]);
1114     }
1115     noncontiguous_struct=(void*)noncontiguous_struct_char;
1116     
1117   }
1118 }
1119
1120 void free_struct(MPI_Datatype* type){
1121   xbt_free(((s_smpi_mpi_struct_t *)(*type)->substruct)->block_lengths);
1122   xbt_free(((s_smpi_mpi_struct_t *)(*type)->substruct)->block_indices);
1123   int i=0;
1124   for (i = 0; i < ((s_smpi_mpi_struct_t *)(*type)->substruct)->block_count; i++)
1125     smpi_datatype_unuse(((s_smpi_mpi_struct_t *)(*type)->substruct)->old_types[i]);
1126   xbt_free(((s_smpi_mpi_struct_t *)(*type)->substruct)->old_types);
1127 }
1128
1129 /*
1130  * Create a Sub type struct to be able to serialize and unserialize it
1131  * the structure s_smpi_mpi_struct_t is derived from s_smpi_subtype which
1132  * required the functions unserialize and serialize
1133  */
1134 s_smpi_mpi_struct_t* smpi_datatype_struct_create( int* block_lengths,
1135                                                   MPI_Aint* block_indices,
1136                                                   int block_count,
1137                                                   MPI_Datatype* old_types){
1138   s_smpi_mpi_struct_t *new_t= xbt_new(s_smpi_mpi_struct_t,1);
1139   new_t->base.serialize = &serialize_struct;
1140   new_t->base.unserialize = &unserialize_struct;
1141   new_t->base.subtype_free = &free_struct;
1142  //TODO : add a custom function for each time to clean these 
1143   new_t->block_lengths= xbt_new(int, block_count);
1144   new_t->block_indices= xbt_new(MPI_Aint, block_count);
1145   new_t->old_types=  xbt_new(MPI_Datatype, block_count);
1146   int i;
1147   for(i=0;i<block_count;i++){
1148     new_t->block_lengths[i]=block_lengths[i];
1149     new_t->block_indices[i]=block_indices[i];
1150     new_t->old_types[i]=old_types[i];
1151     smpi_datatype_use(new_t->old_types[i]);
1152   }
1153   //new_t->block_lengths = block_lengths;
1154   //new_t->block_indices = block_indices;
1155   new_t->block_count = block_count;
1156   //new_t->old_types = old_types;
1157   return new_t;
1158 }
1159
1160
1161 int smpi_datatype_struct(int count, int* blocklens, MPI_Aint* indices, MPI_Datatype* old_types, MPI_Datatype* new_type)
1162 {
1163   int i;
1164   size_t size = 0;
1165   int contiguous=1;
1166   size = 0;
1167   MPI_Aint lb = 0;
1168   MPI_Aint ub = 0;
1169   if(count>0){
1170     lb=indices[0] + smpi_datatype_lb(old_types[0]);
1171     ub=indices[0] + blocklens[0]*smpi_datatype_ub(old_types[0]);
1172   }
1173   int forced_lb=0;
1174   int forced_ub=0;
1175   for(i=0; i< count; i++){
1176     if (blocklens[i]<0)
1177       return MPI_ERR_ARG;
1178     if (old_types[i]->has_subtype == 1)
1179       contiguous=0;
1180
1181     size += blocklens[i]*smpi_datatype_size(old_types[i]);
1182     if (old_types[i]==MPI_LB){
1183       lb=indices[i];
1184       forced_lb=1;
1185     }
1186     if (old_types[i]==MPI_UB){
1187       ub=indices[i];
1188       forced_ub=1;
1189     }
1190
1191     if(!forced_lb && indices[i]+smpi_datatype_lb(old_types[i])<lb) lb = indices[i];
1192     if(!forced_ub && indices[i]+blocklens[i]*smpi_datatype_ub(old_types[i])>ub) ub = indices[i]+blocklens[i]*smpi_datatype_ub(old_types[i]);
1193
1194     if ( (i< count -1) && (indices[i]+blocklens[i]*smpi_datatype_size(old_types[i]) != indices[i+1]) )contiguous=0;
1195   }
1196
1197   if(!contiguous){
1198     s_smpi_mpi_struct_t* subtype = smpi_datatype_struct_create( blocklens,
1199                                                               indices,
1200                                                               count,
1201                                                               old_types);
1202
1203     smpi_datatype_create(new_type,  size, lb, ub,1, subtype, DT_FLAG_DATA);
1204   }else{
1205     s_smpi_mpi_contiguous_t* subtype = smpi_datatype_contiguous_create( lb,
1206                                                                   size,
1207                                                                   MPI_CHAR,
1208                                                                   1);
1209     smpi_datatype_create(new_type,  size, lb, ub,1, subtype, DT_FLAG_DATA|DT_FLAG_CONTIGUOUS);
1210   }
1211   return MPI_SUCCESS;
1212 }
1213
1214 void smpi_datatype_commit(MPI_Datatype *datatype)
1215 {
1216   (*datatype)->flags=  ((*datatype)->flags | DT_FLAG_COMMITED);
1217 }
1218
1219 typedef struct s_smpi_mpi_op {
1220   MPI_User_function *func;
1221   int is_commute;
1222 } s_smpi_mpi_op_t;
1223
1224 #define MAX_OP(a, b)  (b) = (a) < (b) ? (b) : (a)
1225 #define MIN_OP(a, b)  (b) = (a) < (b) ? (a) : (b)
1226 #define SUM_OP(a, b)  (b) += (a)
1227 #define PROD_OP(a, b) (b) *= (a)
1228 #define LAND_OP(a, b) (b) = (a) && (b)
1229 #define LOR_OP(a, b)  (b) = (a) || (b)
1230 #define LXOR_OP(a, b) (b) = (!(a) && (b)) || ((a) && !(b))
1231 #define BAND_OP(a, b) (b) &= (a)
1232 #define BOR_OP(a, b)  (b) |= (a)
1233 #define BXOR_OP(a, b) (b) ^= (a)
1234 #define MAXLOC_OP(a, b)  (b) = (a.value) < (b.value) ? (b) : (a)
1235 #define MINLOC_OP(a, b)  (b) = (a.value) < (b.value) ? (a) : (b)
1236
1237 #define APPLY_FUNC(a, b, length, type, func) \
1238 {                                          \
1239   int i;                                   \
1240   type* x = (type*)(a);                    \
1241   type* y = (type*)(b);                    \
1242   for(i = 0; i < *(length); i++) {         \
1243     func(x[i], y[i]);                      \
1244   }                                        \
1245 }
1246
1247 static void max_func(void *a, void *b, int *length,
1248                      MPI_Datatype * datatype)
1249 {
1250   if (*datatype == MPI_CHAR) {
1251     APPLY_FUNC(a, b, length, char, MAX_OP);
1252   } else if (*datatype == MPI_SHORT) {
1253     APPLY_FUNC(a, b, length, short, MAX_OP);
1254   } else if (*datatype == MPI_INT) {
1255     APPLY_FUNC(a, b, length, int, MAX_OP);
1256   } else if (*datatype == MPI_LONG) {
1257     APPLY_FUNC(a, b, length, long, MAX_OP);
1258   } else if (*datatype == MPI_UNSIGNED_SHORT) {
1259     APPLY_FUNC(a, b, length, unsigned short, MAX_OP);
1260   } else if (*datatype == MPI_UNSIGNED) {
1261     APPLY_FUNC(a, b, length, unsigned int, MAX_OP);
1262   } else if (*datatype == MPI_UNSIGNED_LONG) {
1263     APPLY_FUNC(a, b, length, unsigned long, MAX_OP);
1264   } else if (*datatype == MPI_FLOAT) {
1265     APPLY_FUNC(a, b, length, float, MAX_OP);
1266   } else if (*datatype == MPI_DOUBLE) {
1267     APPLY_FUNC(a, b, length, double, MAX_OP);
1268   } else if (*datatype == MPI_LONG_DOUBLE) {
1269     APPLY_FUNC(a, b, length, long double, MAX_OP);
1270   }
1271 }
1272
1273 static void min_func(void *a, void *b, int *length,
1274                      MPI_Datatype * datatype)
1275 {
1276   if (*datatype == MPI_CHAR) {
1277     APPLY_FUNC(a, b, length, char, MIN_OP);
1278   } else if (*datatype == MPI_SHORT) {
1279     APPLY_FUNC(a, b, length, short, MIN_OP);
1280   } else if (*datatype == MPI_INT) {
1281     APPLY_FUNC(a, b, length, int, MIN_OP);
1282   } else if (*datatype == MPI_LONG) {
1283     APPLY_FUNC(a, b, length, long, MIN_OP);
1284   } else if (*datatype == MPI_UNSIGNED_SHORT) {
1285     APPLY_FUNC(a, b, length, unsigned short, MIN_OP);
1286   } else if (*datatype == MPI_UNSIGNED) {
1287     APPLY_FUNC(a, b, length, unsigned int, MIN_OP);
1288   } else if (*datatype == MPI_UNSIGNED_LONG) {
1289     APPLY_FUNC(a, b, length, unsigned long, MIN_OP);
1290   } else if (*datatype == MPI_FLOAT) {
1291     APPLY_FUNC(a, b, length, float, MIN_OP);
1292   } else if (*datatype == MPI_DOUBLE) {
1293     APPLY_FUNC(a, b, length, double, MIN_OP);
1294   } else if (*datatype == MPI_LONG_DOUBLE) {
1295     APPLY_FUNC(a, b, length, long double, MIN_OP);
1296   }
1297 }
1298
1299 static void sum_func(void *a, void *b, int *length,
1300                      MPI_Datatype * datatype)
1301 {
1302   if (*datatype == MPI_CHAR) {
1303     APPLY_FUNC(a, b, length, char, SUM_OP);
1304   } else if (*datatype == MPI_SHORT) {
1305     APPLY_FUNC(a, b, length, short, SUM_OP);
1306   } else if (*datatype == MPI_INT) {
1307     APPLY_FUNC(a, b, length, int, SUM_OP);
1308   } else if (*datatype == MPI_LONG) {
1309     APPLY_FUNC(a, b, length, long, SUM_OP);
1310   } else if (*datatype == MPI_UNSIGNED_SHORT) {
1311     APPLY_FUNC(a, b, length, unsigned short, SUM_OP);
1312   } else if (*datatype == MPI_UNSIGNED) {
1313     APPLY_FUNC(a, b, length, unsigned int, SUM_OP);
1314   } else if (*datatype == MPI_UNSIGNED_LONG) {
1315     APPLY_FUNC(a, b, length, unsigned long, SUM_OP);
1316   } else if (*datatype == MPI_FLOAT) {
1317     APPLY_FUNC(a, b, length, float, SUM_OP);
1318   } else if (*datatype == MPI_DOUBLE) {
1319     APPLY_FUNC(a, b, length, double, SUM_OP);
1320   } else if (*datatype == MPI_LONG_DOUBLE) {
1321     APPLY_FUNC(a, b, length, long double, SUM_OP);
1322   } else if (*datatype == MPI_C_FLOAT_COMPLEX) {
1323     APPLY_FUNC(a, b, length, float _Complex, SUM_OP);
1324   } else if (*datatype == MPI_C_DOUBLE_COMPLEX) {
1325     APPLY_FUNC(a, b, length, double _Complex, SUM_OP);
1326   } else if (*datatype == MPI_C_LONG_DOUBLE_COMPLEX) {
1327     APPLY_FUNC(a, b, length, long double _Complex, SUM_OP);
1328   }
1329 }
1330
1331 static void prod_func(void *a, void *b, int *length,
1332                       MPI_Datatype * datatype)
1333 {
1334   if (*datatype == MPI_CHAR) {
1335     APPLY_FUNC(a, b, length, char, PROD_OP);
1336   } else if (*datatype == MPI_SHORT) {
1337     APPLY_FUNC(a, b, length, short, PROD_OP);
1338   } else if (*datatype == MPI_INT) {
1339     APPLY_FUNC(a, b, length, int, PROD_OP);
1340   } else if (*datatype == MPI_LONG) {
1341     APPLY_FUNC(a, b, length, long, PROD_OP);
1342   } else if (*datatype == MPI_UNSIGNED_SHORT) {
1343     APPLY_FUNC(a, b, length, unsigned short, PROD_OP);
1344   } else if (*datatype == MPI_UNSIGNED) {
1345     APPLY_FUNC(a, b, length, unsigned int, PROD_OP);
1346   } else if (*datatype == MPI_UNSIGNED_LONG) {
1347     APPLY_FUNC(a, b, length, unsigned long, PROD_OP);
1348   } else if (*datatype == MPI_FLOAT) {
1349     APPLY_FUNC(a, b, length, float, PROD_OP);
1350   } else if (*datatype == MPI_DOUBLE) {
1351     APPLY_FUNC(a, b, length, double, PROD_OP);
1352   } else if (*datatype == MPI_LONG_DOUBLE) {
1353     APPLY_FUNC(a, b, length, long double, PROD_OP);
1354   } else if (*datatype == MPI_C_FLOAT_COMPLEX) {
1355     APPLY_FUNC(a, b, length, float _Complex, PROD_OP);
1356   } else if (*datatype == MPI_C_DOUBLE_COMPLEX) {
1357     APPLY_FUNC(a, b, length, double _Complex, PROD_OP);
1358   } else if (*datatype == MPI_C_LONG_DOUBLE_COMPLEX) {
1359     APPLY_FUNC(a, b, length, long double _Complex, PROD_OP);
1360   }
1361 }
1362
1363 static void land_func(void *a, void *b, int *length,
1364                       MPI_Datatype * datatype)
1365 {
1366   if (*datatype == MPI_CHAR) {
1367     APPLY_FUNC(a, b, length, char, LAND_OP);
1368   } else if (*datatype == MPI_SHORT) {
1369     APPLY_FUNC(a, b, length, short, LAND_OP);
1370   } else if (*datatype == MPI_INT) {
1371     APPLY_FUNC(a, b, length, int, LAND_OP);
1372   } else if (*datatype == MPI_LONG) {
1373     APPLY_FUNC(a, b, length, long, LAND_OP);
1374   } else if (*datatype == MPI_UNSIGNED_SHORT) {
1375     APPLY_FUNC(a, b, length, unsigned short, LAND_OP);
1376   } else if (*datatype == MPI_UNSIGNED) {
1377     APPLY_FUNC(a, b, length, unsigned int, LAND_OP);
1378   } else if (*datatype == MPI_UNSIGNED_LONG) {
1379     APPLY_FUNC(a, b, length, unsigned long, LAND_OP);
1380   } else if (*datatype == MPI_C_BOOL) {
1381     APPLY_FUNC(a, b, length, _Bool, LAND_OP);
1382   }
1383 }
1384
1385 static void lor_func(void *a, void *b, int *length,
1386                      MPI_Datatype * datatype)
1387 {
1388   if (*datatype == MPI_CHAR) {
1389     APPLY_FUNC(a, b, length, char, LOR_OP);
1390   } else if (*datatype == MPI_SHORT) {
1391     APPLY_FUNC(a, b, length, short, LOR_OP);
1392   } else if (*datatype == MPI_INT) {
1393     APPLY_FUNC(a, b, length, int, LOR_OP);
1394   } else if (*datatype == MPI_LONG) {
1395     APPLY_FUNC(a, b, length, long, LOR_OP);
1396   } else if (*datatype == MPI_UNSIGNED_SHORT) {
1397     APPLY_FUNC(a, b, length, unsigned short, LOR_OP);
1398   } else if (*datatype == MPI_UNSIGNED) {
1399     APPLY_FUNC(a, b, length, unsigned int, LOR_OP);
1400   } else if (*datatype == MPI_UNSIGNED_LONG) {
1401     APPLY_FUNC(a, b, length, unsigned long, LOR_OP);
1402   } else if (*datatype == MPI_C_BOOL) {
1403     APPLY_FUNC(a, b, length, _Bool, LOR_OP);
1404   }
1405 }
1406
1407 static void lxor_func(void *a, void *b, int *length,
1408                       MPI_Datatype * datatype)
1409 {
1410   if (*datatype == MPI_CHAR) {
1411     APPLY_FUNC(a, b, length, char, LXOR_OP);
1412   } else if (*datatype == MPI_SHORT) {
1413     APPLY_FUNC(a, b, length, short, LXOR_OP);
1414   } else if (*datatype == MPI_INT) {
1415     APPLY_FUNC(a, b, length, int, LXOR_OP);
1416   } else if (*datatype == MPI_LONG) {
1417     APPLY_FUNC(a, b, length, long, LXOR_OP);
1418   } else if (*datatype == MPI_UNSIGNED_SHORT) {
1419     APPLY_FUNC(a, b, length, unsigned short, LXOR_OP);
1420   } else if (*datatype == MPI_UNSIGNED) {
1421     APPLY_FUNC(a, b, length, unsigned int, LXOR_OP);
1422   } else if (*datatype == MPI_UNSIGNED_LONG) {
1423     APPLY_FUNC(a, b, length, unsigned long, LXOR_OP);
1424   } else if (*datatype == MPI_C_BOOL) {
1425     APPLY_FUNC(a, b, length, _Bool, LXOR_OP);
1426   }
1427 }
1428
1429 static void band_func(void *a, void *b, int *length,
1430                       MPI_Datatype * datatype)
1431 {
1432   if (*datatype == MPI_CHAR) {
1433     APPLY_FUNC(a, b, length, char, BAND_OP);
1434   }
1435   if (*datatype == MPI_SHORT) {
1436     APPLY_FUNC(a, b, length, short, BAND_OP);
1437   } else if (*datatype == MPI_INT) {
1438     APPLY_FUNC(a, b, length, int, BAND_OP);
1439   } else if (*datatype == MPI_LONG) {
1440     APPLY_FUNC(a, b, length, long, BAND_OP);
1441   } else if (*datatype == MPI_UNSIGNED_SHORT) {
1442     APPLY_FUNC(a, b, length, unsigned short, BAND_OP);
1443   } else if (*datatype == MPI_UNSIGNED) {
1444     APPLY_FUNC(a, b, length, unsigned int, BAND_OP);
1445   } else if (*datatype == MPI_UNSIGNED_LONG) {
1446     APPLY_FUNC(a, b, length, unsigned long, BAND_OP);
1447   } else if (*datatype == MPI_BYTE) {
1448     APPLY_FUNC(a, b, length, uint8_t, BAND_OP);
1449   }
1450 }
1451
1452 static void bor_func(void *a, void *b, int *length,
1453                      MPI_Datatype * datatype)
1454 {
1455   if (*datatype == MPI_CHAR) {
1456     APPLY_FUNC(a, b, length, char, BOR_OP);
1457   } else if (*datatype == MPI_SHORT) {
1458     APPLY_FUNC(a, b, length, short, BOR_OP);
1459   } else if (*datatype == MPI_INT) {
1460     APPLY_FUNC(a, b, length, int, BOR_OP);
1461   } else if (*datatype == MPI_LONG) {
1462     APPLY_FUNC(a, b, length, long, BOR_OP);
1463   } else if (*datatype == MPI_UNSIGNED_SHORT) {
1464     APPLY_FUNC(a, b, length, unsigned short, BOR_OP);
1465   } else if (*datatype == MPI_UNSIGNED) {
1466     APPLY_FUNC(a, b, length, unsigned int, BOR_OP);
1467   } else if (*datatype == MPI_UNSIGNED_LONG) {
1468     APPLY_FUNC(a, b, length, unsigned long, BOR_OP);
1469   } else if (*datatype == MPI_BYTE) {
1470     APPLY_FUNC(a, b, length, uint8_t, BOR_OP);
1471   }
1472 }
1473
1474 static void bxor_func(void *a, void *b, int *length,
1475                       MPI_Datatype * datatype)
1476 {
1477   if (*datatype == MPI_CHAR) {
1478     APPLY_FUNC(a, b, length, char, BXOR_OP);
1479   } else if (*datatype == MPI_SHORT) {
1480     APPLY_FUNC(a, b, length, short, BXOR_OP);
1481   } else if (*datatype == MPI_INT) {
1482     APPLY_FUNC(a, b, length, int, BXOR_OP);
1483   } else if (*datatype == MPI_LONG) {
1484     APPLY_FUNC(a, b, length, long, BXOR_OP);
1485   } else if (*datatype == MPI_UNSIGNED_SHORT) {
1486     APPLY_FUNC(a, b, length, unsigned short, BXOR_OP);
1487   } else if (*datatype == MPI_UNSIGNED) {
1488     APPLY_FUNC(a, b, length, unsigned int, BXOR_OP);
1489   } else if (*datatype == MPI_UNSIGNED_LONG) {
1490     APPLY_FUNC(a, b, length, unsigned long, BXOR_OP);
1491   } else if (*datatype == MPI_BYTE) {
1492     APPLY_FUNC(a, b, length, uint8_t, BXOR_OP);
1493   }
1494 }
1495
1496 static void minloc_func(void *a, void *b, int *length,
1497                         MPI_Datatype * datatype)
1498 {
1499   if (*datatype == MPI_FLOAT_INT) {
1500     APPLY_FUNC(a, b, length, float_int, MINLOC_OP);
1501   } else if (*datatype == MPI_LONG_INT) {
1502     APPLY_FUNC(a, b, length, long_int, MINLOC_OP);
1503   } else if (*datatype == MPI_DOUBLE_INT) {
1504     APPLY_FUNC(a, b, length, double_int, MINLOC_OP);
1505   } else if (*datatype == MPI_SHORT_INT) {
1506     APPLY_FUNC(a, b, length, short_int, MINLOC_OP);
1507   } else if (*datatype == MPI_2LONG) {
1508     APPLY_FUNC(a, b, length, long_long, MINLOC_OP);
1509   } else if (*datatype == MPI_2INT) {
1510     APPLY_FUNC(a, b, length, int_int, MINLOC_OP);
1511   } else if (*datatype == MPI_LONG_DOUBLE_INT) {
1512     APPLY_FUNC(a, b, length, long_double_int, MINLOC_OP);
1513   } else if (*datatype == MPI_2FLOAT) {
1514     APPLY_FUNC(a, b, length, float_float, MINLOC_OP);
1515   } else if (*datatype == MPI_2DOUBLE) {
1516     APPLY_FUNC(a, b, length, double_double, MINLOC_OP);
1517   }
1518 }
1519
1520 static void maxloc_func(void *a, void *b, int *length,
1521                         MPI_Datatype * datatype)
1522 {
1523   if (*datatype == MPI_FLOAT_INT) {
1524     APPLY_FUNC(a, b, length, float_int, MAXLOC_OP);
1525   } else if (*datatype == MPI_LONG_INT) {
1526     APPLY_FUNC(a, b, length, long_int, MAXLOC_OP);
1527   } else if (*datatype == MPI_DOUBLE_INT) {
1528     APPLY_FUNC(a, b, length, double_int, MAXLOC_OP);
1529   } else if (*datatype == MPI_SHORT_INT) {
1530     APPLY_FUNC(a, b, length, short_int, MAXLOC_OP);
1531   } else if (*datatype == MPI_2LONG) {
1532     APPLY_FUNC(a, b, length, long_long, MAXLOC_OP);
1533   } else if (*datatype == MPI_2INT) {
1534     APPLY_FUNC(a, b, length, int_int, MAXLOC_OP);
1535   } else if (*datatype == MPI_LONG_DOUBLE_INT) {
1536     APPLY_FUNC(a, b, length, long_double_int, MAXLOC_OP);
1537   } else if (*datatype == MPI_2FLOAT) {
1538     APPLY_FUNC(a, b, length, float_float, MAXLOC_OP);
1539   } else if (*datatype == MPI_2DOUBLE) {
1540     APPLY_FUNC(a, b, length, double_double, MAXLOC_OP);
1541   }
1542 }
1543
1544 static void replace_func(void *a, void *b, int *length,
1545                         MPI_Datatype * datatype)
1546 {
1547   memcpy(b, a, *length * smpi_datatype_size(*datatype));
1548 }
1549
1550 #define CREATE_MPI_OP(name, func)                             \
1551   static s_smpi_mpi_op_t mpi_##name = { &(func) /* func */, TRUE }; \
1552 MPI_Op name = &mpi_##name;
1553
1554 CREATE_MPI_OP(MPI_MAX, max_func);
1555 CREATE_MPI_OP(MPI_MIN, min_func);
1556 CREATE_MPI_OP(MPI_SUM, sum_func);
1557 CREATE_MPI_OP(MPI_PROD, prod_func);
1558 CREATE_MPI_OP(MPI_LAND, land_func);
1559 CREATE_MPI_OP(MPI_LOR, lor_func);
1560 CREATE_MPI_OP(MPI_LXOR, lxor_func);
1561 CREATE_MPI_OP(MPI_BAND, band_func);
1562 CREATE_MPI_OP(MPI_BOR, bor_func);
1563 CREATE_MPI_OP(MPI_BXOR, bxor_func);
1564 CREATE_MPI_OP(MPI_MAXLOC, maxloc_func);
1565 CREATE_MPI_OP(MPI_MINLOC, minloc_func);
1566 CREATE_MPI_OP(MPI_REPLACE, replace_func);
1567
1568
1569 MPI_Op smpi_op_new(MPI_User_function * function, int commute)
1570 {
1571   MPI_Op op;
1572   op = xbt_new(s_smpi_mpi_op_t, 1);
1573   op->func = function;
1574   op-> is_commute = commute;
1575   return op;
1576 }
1577
1578 int smpi_op_is_commute(MPI_Op op)
1579 {
1580   return (op==MPI_OP_NULL) ? 1 : op-> is_commute;
1581 }
1582
1583 void smpi_op_destroy(MPI_Op op)
1584 {
1585   xbt_free(op);
1586 }
1587
1588 void smpi_op_apply(MPI_Op op, void *invec, void *inoutvec, int *len,
1589                    MPI_Datatype * datatype)
1590 {
1591   if(smpi_privatize_global_variables){ //we need to switch here, as the called function may silently touch global variables
1592     XBT_VERB("Applying operation, switch to the right data frame ");
1593     switch_data_segment(smpi_process_index());
1594   }
1595
1596   if(!_xbt_replay_is_active())
1597   op->func(invec, inoutvec, len, datatype);
1598 }