Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
8746df92a693f8a8595613c0a3d438ac680a0f1e
[simgrid.git] / src / smpi / smpi_mpi_dt.c
1 /* smpi_mpi_dt.c -- MPI primitives to handle datatypes                        */
2 /* FIXME: a very incomplete implementation                                    */
3
4 /* Copyright (c) 2009-2014. The SimGrid Team.
5  * All rights reserved.                                                     */
6
7 /* This program is free software; you can redistribute it and/or modify it
8  * under the terms of the license (GNU LGPL) which comes with this package. */
9
10 #include <stdio.h>
11 #include <stdlib.h>
12 #include <string.h>
13 #include <limits.h>
14 #include "private.h"
15 #include "smpi_mpi_dt_private.h"
16 #include "mc/mc.h"
17 #include "xbt/replay.h"
18 #include "simgrid/modelchecker.h"
19
20 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_mpi_dt, smpi,
21                                 "Logging specific to SMPI (datatype)");
22
23 xbt_dict_t smpi_type_keyvals = NULL;
24 int type_keyval_id=0;//avoid collisions
25
26 #define CREATE_MPI_DATATYPE(name, type)               \
27   static s_smpi_mpi_datatype_t mpi_##name = {         \
28     (char*) # name,                                   \
29     sizeof(type),   /* size */                        \
30     0,              /*was 1 has_subtype*/             \
31     0,              /* lb */                          \
32     sizeof(type),   /* ub = lb + size */              \
33     DT_FLAG_BASIC,  /* flags */                       \
34     NULL,           /* attributes */                  \
35     NULL,           /* pointer on extended struct*/   \
36     0               /* in_use counter */              \
37   };                                                  \
38 MPI_Datatype name = &mpi_##name;
39
40 #define CREATE_MPI_DATATYPE_NULL(name)                \
41   static s_smpi_mpi_datatype_t mpi_##name = {         \
42     (char*) # name,                                   \
43     0,              /* size */                        \
44     0,              /* was 1 has_subtype*/            \
45     0,              /* lb */                          \
46     0,              /* ub = lb + size */              \
47     DT_FLAG_BASIC,  /* flags */                       \
48     NULL,           /* attributes */                  \
49     NULL,           /* pointer on extended struct*/   \
50     0               /* in_use counter */              \
51   };                                                  \
52 MPI_Datatype name = &mpi_##name;
53
54 //The following are datatypes for the MPI functions MPI_MAXLOC and MPI_MINLOC.
55 typedef struct {
56   float value;
57   int index;
58 } float_int;
59 typedef struct {
60   float value;
61   float index;
62 } float_float;
63 typedef struct {
64   long value;
65   long index;
66 } long_long;
67 typedef struct {
68   double value;
69   double index;
70 } double_double;
71 typedef struct {
72   long value;
73   int index;
74 } long_int;
75 typedef struct {
76   double value;
77   int index;
78 } double_int;
79 typedef struct {
80   short value;
81   int index;
82 } short_int;
83 typedef struct {
84   int value;
85   int index;
86 } int_int;
87 typedef struct {
88   long double value;
89   int index;
90 } long_double_int;
91 typedef struct {
92   int64_t value;
93   int64_t index;
94 } integer128_t;
95 // Predefined data types
96 CREATE_MPI_DATATYPE(MPI_CHAR, char);
97 CREATE_MPI_DATATYPE(MPI_SHORT, short);
98 CREATE_MPI_DATATYPE(MPI_INT, int);
99 CREATE_MPI_DATATYPE(MPI_LONG, long);
100 CREATE_MPI_DATATYPE(MPI_LONG_LONG, long long);
101 CREATE_MPI_DATATYPE(MPI_SIGNED_CHAR, signed char);
102 CREATE_MPI_DATATYPE(MPI_UNSIGNED_CHAR, unsigned char);
103 CREATE_MPI_DATATYPE(MPI_UNSIGNED_SHORT, unsigned short);
104 CREATE_MPI_DATATYPE(MPI_UNSIGNED, unsigned int);
105 CREATE_MPI_DATATYPE(MPI_UNSIGNED_LONG, unsigned long);
106 CREATE_MPI_DATATYPE(MPI_UNSIGNED_LONG_LONG, unsigned long long);
107 CREATE_MPI_DATATYPE(MPI_FLOAT, float);
108 CREATE_MPI_DATATYPE(MPI_DOUBLE, double);
109 CREATE_MPI_DATATYPE(MPI_LONG_DOUBLE, long double);
110 CREATE_MPI_DATATYPE(MPI_WCHAR, wchar_t);
111 CREATE_MPI_DATATYPE(MPI_C_BOOL, _Bool);
112 CREATE_MPI_DATATYPE(MPI_BYTE, int8_t);
113 CREATE_MPI_DATATYPE(MPI_INT8_T, int8_t);
114 CREATE_MPI_DATATYPE(MPI_INT16_T, int16_t);
115 CREATE_MPI_DATATYPE(MPI_INT32_T, int32_t);
116 CREATE_MPI_DATATYPE(MPI_INT64_T, int64_t);
117 CREATE_MPI_DATATYPE(MPI_UINT8_T, uint8_t);
118 CREATE_MPI_DATATYPE(MPI_UINT16_T, uint16_t);
119 CREATE_MPI_DATATYPE(MPI_UINT32_T, uint32_t);
120 CREATE_MPI_DATATYPE(MPI_UINT64_T, uint64_t);
121 CREATE_MPI_DATATYPE(MPI_C_FLOAT_COMPLEX, float _Complex);
122 CREATE_MPI_DATATYPE(MPI_C_DOUBLE_COMPLEX, double _Complex);
123 CREATE_MPI_DATATYPE(MPI_C_LONG_DOUBLE_COMPLEX, long double _Complex);
124 CREATE_MPI_DATATYPE(MPI_AINT, MPI_Aint);
125 CREATE_MPI_DATATYPE(MPI_OFFSET, MPI_Offset);
126
127 CREATE_MPI_DATATYPE(MPI_FLOAT_INT, float_int);
128 CREATE_MPI_DATATYPE(MPI_LONG_INT, long_int);
129 CREATE_MPI_DATATYPE(MPI_DOUBLE_INT, double_int);
130 CREATE_MPI_DATATYPE(MPI_SHORT_INT, short_int);
131 CREATE_MPI_DATATYPE(MPI_2INT, int_int);
132 CREATE_MPI_DATATYPE(MPI_2FLOAT, float_float);
133 CREATE_MPI_DATATYPE(MPI_2DOUBLE, double_double);
134 CREATE_MPI_DATATYPE(MPI_2LONG, long_long);
135
136 CREATE_MPI_DATATYPE(MPI_REAL, float);
137 CREATE_MPI_DATATYPE(MPI_REAL4, float);
138 CREATE_MPI_DATATYPE(MPI_REAL8, float);
139 CREATE_MPI_DATATYPE(MPI_REAL16, double);
140 CREATE_MPI_DATATYPE_NULL(MPI_COMPLEX8);
141 CREATE_MPI_DATATYPE_NULL(MPI_COMPLEX16);
142 CREATE_MPI_DATATYPE_NULL(MPI_COMPLEX32);
143 CREATE_MPI_DATATYPE(MPI_INTEGER1, int);
144 CREATE_MPI_DATATYPE(MPI_INTEGER2, int16_t);
145 CREATE_MPI_DATATYPE(MPI_INTEGER4, int32_t);
146 CREATE_MPI_DATATYPE(MPI_INTEGER8, int64_t);
147 CREATE_MPI_DATATYPE(MPI_INTEGER16, integer128_t);
148
149 CREATE_MPI_DATATYPE(MPI_LONG_DOUBLE_INT, long_double_int);
150
151 CREATE_MPI_DATATYPE_NULL(MPI_UB);
152 CREATE_MPI_DATATYPE_NULL(MPI_LB);
153 CREATE_MPI_DATATYPE(MPI_PACKED, char);
154 // Internal use only
155 CREATE_MPI_DATATYPE(MPI_PTR, void*);
156
157 /** Check if the datatype is usable for communications
158  */
159 int is_datatype_valid(MPI_Datatype datatype) {
160     return datatype != MPI_DATATYPE_NULL
161         && (datatype->flags & DT_FLAG_COMMITED);
162 }
163
164 size_t smpi_datatype_size(MPI_Datatype datatype)
165 {
166   return datatype->size;
167 }
168
169 MPI_Aint smpi_datatype_lb(MPI_Datatype datatype)
170 {
171   return datatype->lb;
172 }
173
174 MPI_Aint smpi_datatype_ub(MPI_Datatype datatype)
175 {
176   return datatype->ub;
177 }
178
179 int smpi_datatype_dup(MPI_Datatype datatype, MPI_Datatype* new_t)
180 {
181   int ret=MPI_SUCCESS;
182   *new_t= xbt_new(s_smpi_mpi_datatype_t,1);
183   memcpy(*new_t, datatype, sizeof(s_smpi_mpi_datatype_t));
184   if (datatype->has_subtype){
185     //FIXME: may copy too much information.
186     (*new_t)->substruct=xbt_malloc(sizeof(s_smpi_mpi_struct_t));
187     memcpy((*new_t)->substruct, datatype->substruct, sizeof(s_smpi_mpi_struct_t));
188   }
189   if(datatype->name)
190     (*new_t)->name = strdup(datatype->name);
191   if(datatype->attributes !=NULL){
192       (*new_t)->attributes=xbt_dict_new();
193       xbt_dict_cursor_t cursor = NULL;
194       int *key;
195       int flag;
196       void* value_in;
197       void* value_out;
198       xbt_dict_foreach(datatype->attributes, cursor, key, value_in){
199         smpi_type_key_elem elem = xbt_dict_get_or_null_ext(smpi_type_keyvals,  (const char*)key, sizeof(int));
200         if(elem && elem->copy_fn!=MPI_NULL_COPY_FN){
201           ret = elem->copy_fn(datatype, *key, NULL, value_in, &value_out, &flag );
202           if(ret!=MPI_SUCCESS){
203             *new_t=MPI_DATATYPE_NULL;
204             return ret;
205           }
206           if(flag)
207             xbt_dict_set_ext((*new_t)->attributes, (const char*)key, sizeof(int),value_out, NULL);
208         }
209       }
210     }
211   return ret;
212 }
213
214 int smpi_datatype_extent(MPI_Datatype datatype, MPI_Aint * lb,
215                          MPI_Aint * extent)
216 {
217   if(datatype == MPI_DATATYPE_NULL){
218     *lb=0;
219     *extent=0;
220     return MPI_SUCCESS;
221   }
222   *lb = datatype->lb;
223   *extent = datatype->ub - datatype->lb;
224   return MPI_SUCCESS;
225 }
226
227 MPI_Aint smpi_datatype_get_extent(MPI_Datatype datatype){
228   if(datatype == MPI_DATATYPE_NULL){
229     return 0;
230   }
231   return datatype->ub - datatype->lb;
232 }
233
234 void smpi_datatype_get_name(MPI_Datatype datatype, char* name, int* length){
235   *length = strlen(datatype->name);
236   strcpy(name, datatype->name);
237 }
238
239 void smpi_datatype_set_name(MPI_Datatype datatype, char* name){
240   datatype->name = strdup(name);;
241 }
242
243 int smpi_datatype_copy(void *sendbuf, int sendcount, MPI_Datatype sendtype,
244                        void *recvbuf, int recvcount, MPI_Datatype recvtype)
245 {
246   int count;
247   if(smpi_privatize_global_variables){
248     smpi_switch_data_segment(smpi_process_index());
249   }
250   /* First check if we really have something to do */
251   if (recvcount > 0 && recvbuf != sendbuf) {
252     /* FIXME: treat packed cases */
253     sendcount *= smpi_datatype_size(sendtype);
254     recvcount *= smpi_datatype_size(recvtype);
255     count = sendcount < recvcount ? sendcount : recvcount;
256
257     if(sendtype->has_subtype == 0 && recvtype->has_subtype == 0) {
258       if(!smpi_process_get_replaying()) memcpy(recvbuf, sendbuf, count);
259     }
260     else if (sendtype->has_subtype == 0)
261     {
262       s_smpi_subtype_t *subtype =  recvtype->substruct;
263       subtype->unserialize( sendbuf, recvbuf, recvcount/smpi_datatype_size(recvtype), subtype, MPI_REPLACE);
264     }
265     else if (recvtype->has_subtype == 0)
266     {
267       s_smpi_subtype_t *subtype =  sendtype->substruct;
268       subtype->serialize(sendbuf, recvbuf, sendcount/smpi_datatype_size(sendtype), subtype);
269     }else{
270       s_smpi_subtype_t *subtype =  sendtype->substruct;
271
272
273       void * buf_tmp = xbt_malloc(count);
274
275       subtype->serialize( sendbuf, buf_tmp,count/smpi_datatype_size(sendtype), subtype);
276       subtype =  recvtype->substruct;
277       subtype->unserialize( buf_tmp, recvbuf,count/smpi_datatype_size(recvtype), subtype, MPI_REPLACE);
278
279       free(buf_tmp);
280     }
281   }
282
283   return sendcount > recvcount ? MPI_ERR_TRUNCATE : MPI_SUCCESS;
284 }
285
286 /*
287  *  Copies noncontiguous data into contiguous memory.
288  *  @param contiguous_vector - output vector
289  *  @param noncontiguous_vector - input vector
290  *  @param type - pointer contening :
291  *      - stride - stride of between noncontiguous data
292  *      - block_length - the width or height of blocked matrix
293  *      - count - the number of rows of matrix
294  */
295 void serialize_vector( const void *noncontiguous_vector,
296                        void *contiguous_vector,
297                        int count,
298                        void *type)
299 {
300   s_smpi_mpi_vector_t* type_c = (s_smpi_mpi_vector_t*)type;
301   int i;
302   char* contiguous_vector_char = (char*)contiguous_vector;
303   char* noncontiguous_vector_char = (char*)noncontiguous_vector;
304
305   for (i = 0; i < type_c->block_count * count; i++) {
306       if (type_c->old_type->has_subtype == 0)
307         memcpy(contiguous_vector_char,
308                noncontiguous_vector_char, type_c->block_length * type_c->size_oldtype);
309       else
310         ((s_smpi_subtype_t*)type_c->old_type->substruct)->serialize( noncontiguous_vector_char,
311                                                                      contiguous_vector_char,
312                                                                      type_c->block_length,
313                                                                      type_c->old_type->substruct);
314
315     contiguous_vector_char += type_c->block_length*type_c->size_oldtype;
316     if((i+1)%type_c->block_count ==0)
317     noncontiguous_vector_char += type_c->block_length*smpi_datatype_get_extent(type_c->old_type);
318     else
319     noncontiguous_vector_char += type_c->block_stride*smpi_datatype_get_extent(type_c->old_type);
320   }
321 }
322
323 /*
324  *  Copies contiguous data into noncontiguous memory.
325  *  @param noncontiguous_vector - output vector
326  *  @param contiguous_vector - input vector
327  *  @param type - pointer contening :
328  *      - stride - stride of between noncontiguous data
329  *      - block_length - the width or height of blocked matrix
330  *      - count - the number of rows of matrix
331  */
332 void unserialize_vector( const void *contiguous_vector,
333                          void *noncontiguous_vector,
334                          int count,
335                          void *type,
336                          MPI_Op op)
337 {
338   s_smpi_mpi_vector_t* type_c = (s_smpi_mpi_vector_t*)type;
339   int i;
340
341   char* contiguous_vector_char = (char*)contiguous_vector;
342   char* noncontiguous_vector_char = (char*)noncontiguous_vector;
343
344   for (i = 0; i < type_c->block_count * count; i++) {
345     if (type_c->old_type->has_subtype == 0)
346       smpi_op_apply(op, contiguous_vector_char, noncontiguous_vector_char, &type_c->block_length,
347           &type_c->old_type);
348      /* memcpy(noncontiguous_vector_char,
349              contiguous_vector_char, type_c->block_length * type_c->size_oldtype);*/
350     else
351       ((s_smpi_subtype_t*)type_c->old_type->substruct)->unserialize( contiguous_vector_char,
352                                                                      noncontiguous_vector_char,
353                                                                      type_c->block_length,
354                                                                      type_c->old_type->substruct,
355                                                                      op);
356     contiguous_vector_char += type_c->block_length*type_c->size_oldtype;
357     if((i+1)%type_c->block_count ==0)
358     noncontiguous_vector_char += type_c->block_length*smpi_datatype_get_extent(type_c->old_type);
359     else
360     noncontiguous_vector_char += type_c->block_stride*smpi_datatype_get_extent(type_c->old_type);
361   }
362 }
363
364 /*
365  * Create a Sub type vector to be able to serialize and unserialize it
366  * the structure s_smpi_mpi_vector_t is derived from s_smpi_subtype which
367  * required the functions unserialize and serialize
368  *
369  */
370 s_smpi_mpi_vector_t* smpi_datatype_vector_create( int block_stride,
371                                                   int block_length,
372                                                   int block_count,
373                                                   MPI_Datatype old_type,
374                                                   int size_oldtype){
375   s_smpi_mpi_vector_t *new_t= xbt_new(s_smpi_mpi_vector_t,1);
376   new_t->base.serialize = &serialize_vector;
377   new_t->base.unserialize = &unserialize_vector;
378   new_t->base.subtype_free = &free_vector;
379   new_t->block_stride = block_stride;
380   new_t->block_length = block_length;
381   new_t->block_count = block_count;
382   smpi_datatype_use(old_type);
383   new_t->old_type = old_type;
384   new_t->size_oldtype = size_oldtype;
385   return new_t;
386 }
387
388 void smpi_datatype_create(MPI_Datatype* new_type, int size,int lb, int ub, int has_subtype,
389                           void *struct_type, int flags){
390   MPI_Datatype new_t= xbt_new(s_smpi_mpi_datatype_t,1);
391   new_t->name = NULL;
392   new_t->size = size;
393   new_t->has_subtype = size>0? has_subtype:0;
394   new_t->lb = lb;
395   new_t->ub = ub;
396   new_t->flags = flags;
397   new_t->substruct = struct_type;
398   new_t->in_use=0;
399   new_t->attributes=NULL;
400   *new_type = new_t;
401
402 #ifdef HAVE_MC
403   if(MC_is_active())
404     MC_ignore(&(new_t->in_use), sizeof(new_t->in_use));
405 #endif
406 }
407
408 void smpi_datatype_free(MPI_Datatype* type){
409   if((*type)->attributes !=NULL){
410       xbt_dict_cursor_t cursor = NULL;
411       int* key;
412       void * value;
413       int flag;
414       xbt_dict_foreach((*type)->attributes, cursor, key, value){
415         smpi_type_key_elem elem = xbt_dict_get_or_null_ext(smpi_type_keyvals, (const char*)key, sizeof(int));
416         if(elem &&  elem->delete_fn)
417           elem->delete_fn(*type,*key, value, &flag);
418       }
419   }
420
421   if((*type)->flags & DT_FLAG_PREDEFINED)return;
422
423   //if still used, mark for deletion
424   if((*type)->in_use!=0){
425       (*type)->flags |=DT_FLAG_DESTROYED;
426       return;
427   }
428
429   if ((*type)->has_subtype == 1){
430     ((s_smpi_subtype_t *)(*type)->substruct)->subtype_free(type);  
431     xbt_free((*type)->substruct);
432   }
433   if ((*type)->name != NULL){
434     xbt_free((*type)->name);
435   }
436   xbt_free(*type);
437   *type = MPI_DATATYPE_NULL;
438 }
439
440 void smpi_datatype_use(MPI_Datatype type){
441   if(type)type->in_use++;
442
443 #ifdef HAVE_MC
444   if(MC_is_active())
445     MC_ignore(&(type->in_use), sizeof(type->in_use));
446 #endif
447 }
448
449
450 void smpi_datatype_unuse(MPI_Datatype type){
451   if (type->in_use > 0)
452     type->in_use--;
453
454   if(type && type->in_use == 0 && (type->flags & DT_FLAG_DESTROYED))
455     smpi_datatype_free(&type);
456
457 #ifdef HAVE_MC
458   if(MC_is_active())
459     MC_ignore(&(type->in_use), sizeof(type->in_use));
460 #endif
461 }
462
463
464
465
466 /*
467 Contiguous Implementation
468 */
469
470
471 /*
472  *  Copies noncontiguous data into contiguous memory.
473  *  @param contiguous_hvector - output hvector
474  *  @param noncontiguous_hvector - input hvector
475  *  @param type - pointer contening :
476  *      - stride - stride of between noncontiguous data, in bytes
477  *      - block_length - the width or height of blocked matrix
478  *      - count - the number of rows of matrix
479  */
480 void serialize_contiguous( const void *noncontiguous_hvector,
481                        void *contiguous_hvector,
482                        int count,
483                        void *type)
484 {
485   s_smpi_mpi_contiguous_t* type_c = (s_smpi_mpi_contiguous_t*)type;
486   char* contiguous_vector_char = (char*)contiguous_hvector;
487   char* noncontiguous_vector_char = (char*)noncontiguous_hvector+type_c->lb;
488   memcpy(contiguous_vector_char,
489            noncontiguous_vector_char, count* type_c->block_count * type_c->size_oldtype);
490 }
491 /*
492  *  Copies contiguous data into noncontiguous memory.
493  *  @param noncontiguous_vector - output hvector
494  *  @param contiguous_vector - input hvector
495  *  @param type - pointer contening :
496  *      - stride - stride of between noncontiguous data, in bytes
497  *      - block_length - the width or height of blocked matrix
498  *      - count - the number of rows of matrix
499  */
500 void unserialize_contiguous( const void *contiguous_vector,
501                          void *noncontiguous_vector,
502                          int count,
503                          void *type,
504                          MPI_Op op)
505 {
506   s_smpi_mpi_contiguous_t* type_c = (s_smpi_mpi_contiguous_t*)type;
507   char* contiguous_vector_char = (char*)contiguous_vector;
508   char* noncontiguous_vector_char = (char*)noncontiguous_vector+type_c->lb;
509   int n= count* type_c->block_count;
510   smpi_op_apply(op, contiguous_vector_char, noncontiguous_vector_char, &n,
511             &type_c->old_type);
512        /*memcpy(noncontiguous_vector_char,
513            contiguous_vector_char, count*  type_c->block_count * type_c->size_oldtype);*/
514 }
515
516 void free_contiguous(MPI_Datatype* d){
517   smpi_datatype_unuse(((s_smpi_mpi_indexed_t *)(*d)->substruct)->old_type);
518 }
519
520 /*
521  * Create a Sub type contiguous to be able to serialize and unserialize it
522  * the structure s_smpi_mpi_contiguous_t is derived from s_smpi_subtype which
523  * required the functions unserialize and serialize
524  *
525  */
526 s_smpi_mpi_contiguous_t* smpi_datatype_contiguous_create( MPI_Aint lb,
527                                                   int block_count,
528                                                   MPI_Datatype old_type,
529                                                   int size_oldtype){
530   s_smpi_mpi_contiguous_t *new_t= xbt_new(s_smpi_mpi_contiguous_t,1);
531   new_t->base.serialize = &serialize_contiguous;
532   new_t->base.unserialize = &unserialize_contiguous;
533   new_t->base.subtype_free = &free_contiguous;
534   new_t->lb = lb;
535   new_t->block_count = block_count;
536   new_t->old_type = old_type;
537   new_t->size_oldtype = size_oldtype;
538   smpi_datatype_use(old_type);
539   return new_t;
540 }
541
542
543
544
545 int smpi_datatype_contiguous(int count, MPI_Datatype old_type, MPI_Datatype* new_type, MPI_Aint lb)
546 {
547   int retval;
548   if(old_type->has_subtype){
549           //handle this case as a hvector with stride equals to the extent of the datatype
550           return smpi_datatype_hvector(count, 1, smpi_datatype_get_extent(old_type), old_type, new_type);
551   }
552   
553   s_smpi_mpi_contiguous_t* subtype = smpi_datatype_contiguous_create( lb,
554                                                                 count,
555                                                                 old_type,
556                                                                 smpi_datatype_size(old_type));
557                                                                 
558   smpi_datatype_create(new_type,
559                                           count * smpi_datatype_size(old_type),
560                                           lb,lb + count * smpi_datatype_size(old_type),
561                                           1,subtype, DT_FLAG_CONTIGUOUS);
562   retval=MPI_SUCCESS;
563   return retval;
564 }
565
566 int smpi_datatype_vector(int count, int blocklen, int stride, MPI_Datatype old_type, MPI_Datatype* new_type)
567 {
568   int retval;
569   if (blocklen<0) return MPI_ERR_ARG;
570   MPI_Aint lb = 0;
571   MPI_Aint ub = 0;
572   if(count>0){
573     lb=smpi_datatype_lb(old_type);
574     ub=((count-1)*stride+blocklen-1)*smpi_datatype_get_extent(old_type)+smpi_datatype_ub(old_type);
575   }
576   if(old_type->has_subtype || stride != blocklen){
577
578
579     s_smpi_mpi_vector_t* subtype = smpi_datatype_vector_create( stride,
580                                                                 blocklen,
581                                                                 count,
582                                                                 old_type,
583                                                                 smpi_datatype_size(old_type));
584     smpi_datatype_create(new_type,
585                          count * (blocklen) * smpi_datatype_size(old_type), lb,
586                          ub,
587                          1,
588                          subtype,
589                          DT_FLAG_VECTOR);
590     retval=MPI_SUCCESS;
591   }else{
592     /* in this situation the data are contignous thus it's not
593      * required to serialize and unserialize it*/
594     smpi_datatype_create(new_type, count * blocklen *
595                          smpi_datatype_size(old_type), 0, ((count -1) * stride + blocklen)*
596                          smpi_datatype_size(old_type),
597                          0,
598                          NULL,
599                          DT_FLAG_VECTOR|DT_FLAG_CONTIGUOUS);
600     retval=MPI_SUCCESS;
601   }
602   return retval;
603 }
604
605 void free_vector(MPI_Datatype* d){
606   smpi_datatype_unuse(((s_smpi_mpi_indexed_t *)(*d)->substruct)->old_type);
607 }
608
609 /*
610 Hvector Implementation - Vector with stride in bytes
611 */
612
613
614 /*
615  *  Copies noncontiguous data into contiguous memory.
616  *  @param contiguous_hvector - output hvector
617  *  @param noncontiguous_hvector - input hvector
618  *  @param type - pointer contening :
619  *      - stride - stride of between noncontiguous data, in bytes
620  *      - block_length - the width or height of blocked matrix
621  *      - count - the number of rows of matrix
622  */
623 void serialize_hvector( const void *noncontiguous_hvector,
624                        void *contiguous_hvector,
625                        int count,
626                        void *type)
627 {
628   s_smpi_mpi_hvector_t* type_c = (s_smpi_mpi_hvector_t*)type;
629   int i;
630   char* contiguous_vector_char = (char*)contiguous_hvector;
631   char* noncontiguous_vector_char = (char*)noncontiguous_hvector;
632
633   for (i = 0; i < type_c->block_count * count; i++) {
634     if (type_c->old_type->has_subtype == 0)
635       memcpy(contiguous_vector_char,
636            noncontiguous_vector_char, type_c->block_length * type_c->size_oldtype);
637     else
638       ((s_smpi_subtype_t*)type_c->old_type->substruct)->serialize( noncontiguous_vector_char,
639                                                                    contiguous_vector_char,
640                                                                    type_c->block_length,
641                                                                    type_c->old_type->substruct);
642
643     contiguous_vector_char += type_c->block_length*type_c->size_oldtype;
644     if((i+1)%type_c->block_count ==0)
645     noncontiguous_vector_char += type_c->block_length*type_c->size_oldtype;
646     else
647     noncontiguous_vector_char += type_c->block_stride;
648   }
649 }
650 /*
651  *  Copies contiguous data into noncontiguous memory.
652  *  @param noncontiguous_vector - output hvector
653  *  @param contiguous_vector - input hvector
654  *  @param type - pointer contening :
655  *      - stride - stride of between noncontiguous data, in bytes
656  *      - block_length - the width or height of blocked matrix
657  *      - count - the number of rows of matrix
658  */
659 void unserialize_hvector( const void *contiguous_vector,
660                          void *noncontiguous_vector,
661                          int count,
662                          void *type,
663                          MPI_Op op)
664 {
665   s_smpi_mpi_hvector_t* type_c = (s_smpi_mpi_hvector_t*)type;
666   int i;
667
668   char* contiguous_vector_char = (char*)contiguous_vector;
669   char* noncontiguous_vector_char = (char*)noncontiguous_vector;
670
671   for (i = 0; i < type_c->block_count * count; i++) {
672     if (type_c->old_type->has_subtype == 0)
673       smpi_op_apply(op, contiguous_vector_char, noncontiguous_vector_char, &type_c->block_length,
674                   &type_c->old_type);
675              /*memcpy(noncontiguous_vector_char,
676            contiguous_vector_char, type_c->block_length * type_c->size_oldtype);*/
677     else
678       ((s_smpi_subtype_t*)type_c->old_type->substruct)->unserialize( contiguous_vector_char,
679                                                                      noncontiguous_vector_char,
680                                                                      type_c->block_length,
681                                                                      type_c->old_type->substruct,
682                                                                      op);
683     contiguous_vector_char += type_c->block_length*type_c->size_oldtype;
684     if((i+1)%type_c->block_count ==0)
685     noncontiguous_vector_char += type_c->block_length*type_c->size_oldtype;
686     else
687     noncontiguous_vector_char += type_c->block_stride;
688   }
689 }
690
691 /*
692  * Create a Sub type vector to be able to serialize and unserialize it
693  * the structure s_smpi_mpi_vector_t is derived from s_smpi_subtype which
694  * required the functions unserialize and serialize
695  *
696  */
697 s_smpi_mpi_hvector_t* smpi_datatype_hvector_create( MPI_Aint block_stride,
698                                                   int block_length,
699                                                   int block_count,
700                                                   MPI_Datatype old_type,
701                                                   int size_oldtype){
702   s_smpi_mpi_hvector_t *new_t= xbt_new(s_smpi_mpi_hvector_t,1);
703   new_t->base.serialize = &serialize_hvector;
704   new_t->base.unserialize = &unserialize_hvector;
705   new_t->base.subtype_free = &free_hvector;
706   new_t->block_stride = block_stride;
707   new_t->block_length = block_length;
708   new_t->block_count = block_count;
709   new_t->old_type = old_type;
710   new_t->size_oldtype = size_oldtype;
711   smpi_datatype_use(old_type);
712   return new_t;
713 }
714
715 //do nothing for vector types
716 void free_hvector(MPI_Datatype* d){
717   smpi_datatype_unuse(((s_smpi_mpi_indexed_t *)(*d)->substruct)->old_type);
718 }
719
720 int smpi_datatype_hvector(int count, int blocklen, MPI_Aint stride, MPI_Datatype old_type, MPI_Datatype* new_type)
721 {
722   int retval;
723   if (blocklen<0) return MPI_ERR_ARG;
724   MPI_Aint lb = 0;
725   MPI_Aint ub = 0;
726   if(count>0){
727     lb=smpi_datatype_lb(old_type);
728     ub=((count-1)*stride)+(blocklen-1)*smpi_datatype_get_extent(old_type)+smpi_datatype_ub(old_type);
729   }
730   if(old_type->has_subtype || stride != blocklen*smpi_datatype_get_extent(old_type)){
731     s_smpi_mpi_hvector_t* subtype = smpi_datatype_hvector_create( stride,
732                                                                   blocklen,
733                                                                   count,
734                                                                   old_type,
735                                                                   smpi_datatype_size(old_type));
736
737     smpi_datatype_create(new_type, count * blocklen * smpi_datatype_size(old_type),
738                                                  lb,ub,
739                          1,
740                          subtype,
741                          DT_FLAG_VECTOR);
742     retval=MPI_SUCCESS;
743   }else{
744     smpi_datatype_create(new_type, count * blocklen *
745                                              smpi_datatype_size(old_type),0,count * blocklen *
746                                              smpi_datatype_size(old_type),
747                                             0,
748                                             NULL,
749                                             DT_FLAG_VECTOR|DT_FLAG_CONTIGUOUS);
750     retval=MPI_SUCCESS;
751   }
752   return retval;
753 }
754
755
756 /*
757 Indexed Implementation
758 */
759
760 /*
761  *  Copies noncontiguous data into contiguous memory.
762  *  @param contiguous_indexed - output indexed
763  *  @param noncontiguous_indexed - input indexed
764  *  @param type - pointer contening :
765  *      - block_lengths - the width or height of blocked matrix
766  *      - block_indices - indices of each data, in element
767  *      - count - the number of rows of matrix
768  */
769 void serialize_indexed( const void *noncontiguous_indexed,
770                        void *contiguous_indexed,
771                        int count,
772                        void *type)
773 {
774   s_smpi_mpi_indexed_t* type_c = (s_smpi_mpi_indexed_t*)type;
775   int i,j;
776   char* contiguous_indexed_char = (char*)contiguous_indexed;
777   char* noncontiguous_indexed_char = (char*)noncontiguous_indexed+type_c->block_indices[0] * type_c->size_oldtype;
778   for(j=0; j<count;j++){
779     for (i = 0; i < type_c->block_count; i++) {
780       if (type_c->old_type->has_subtype == 0)
781         memcpy(contiguous_indexed_char,
782                      noncontiguous_indexed_char, type_c->block_lengths[i] * type_c->size_oldtype);
783       else
784         ((s_smpi_subtype_t*)type_c->old_type->substruct)->serialize( noncontiguous_indexed_char,
785                                                                      contiguous_indexed_char,
786                                                                      type_c->block_lengths[i],
787                                                                      type_c->old_type->substruct);
788
789
790       contiguous_indexed_char += type_c->block_lengths[i]*type_c->size_oldtype;
791       if (i<type_c->block_count-1)noncontiguous_indexed_char = (char*)noncontiguous_indexed + type_c->block_indices[i+1]*smpi_datatype_get_extent(type_c->old_type);
792       else noncontiguous_indexed_char += type_c->block_lengths[i]*smpi_datatype_get_extent(type_c->old_type);
793     }
794     noncontiguous_indexed=(void*)noncontiguous_indexed_char;
795   }
796 }
797 /*
798  *  Copies contiguous data into noncontiguous memory.
799  *  @param noncontiguous_indexed - output indexed
800  *  @param contiguous_indexed - input indexed
801  *  @param type - pointer contening :
802  *      - block_lengths - the width or height of blocked matrix
803  *      - block_indices - indices of each data, in element
804  *      - count - the number of rows of matrix
805  */
806 void unserialize_indexed( const void *contiguous_indexed,
807                          void *noncontiguous_indexed,
808                          int count,
809                          void *type,
810                          MPI_Op op)
811 {
812
813   s_smpi_mpi_indexed_t* type_c = (s_smpi_mpi_indexed_t*)type;
814   int i,j;
815   char* contiguous_indexed_char = (char*)contiguous_indexed;
816   char* noncontiguous_indexed_char = (char*)noncontiguous_indexed+type_c->block_indices[0]*smpi_datatype_get_extent(type_c->old_type);
817   for(j=0; j<count;j++){
818     for (i = 0; i < type_c->block_count; i++) {
819       if (type_c->old_type->has_subtype == 0)
820         smpi_op_apply(op, contiguous_indexed_char, noncontiguous_indexed_char, &type_c->block_lengths[i],
821                     &type_c->old_type);
822                /*memcpy(noncontiguous_indexed_char ,
823              contiguous_indexed_char, type_c->block_lengths[i] * type_c->size_oldtype);*/
824       else
825         ((s_smpi_subtype_t*)type_c->old_type->substruct)->unserialize( contiguous_indexed_char,
826                                                                        noncontiguous_indexed_char,
827                                                                        type_c->block_lengths[i],
828                                                                        type_c->old_type->substruct,
829                                                                        op);
830
831       contiguous_indexed_char += type_c->block_lengths[i]*type_c->size_oldtype;
832       if (i<type_c->block_count-1)
833         noncontiguous_indexed_char = (char*)noncontiguous_indexed + type_c->block_indices[i+1]*smpi_datatype_get_extent(type_c->old_type);
834       else noncontiguous_indexed_char += type_c->block_lengths[i]*smpi_datatype_get_extent(type_c->old_type);
835     }
836     noncontiguous_indexed=(void*)noncontiguous_indexed_char;
837   }
838 }
839
840 void free_indexed(MPI_Datatype* type){
841   xbt_free(((s_smpi_mpi_indexed_t *)(*type)->substruct)->block_lengths);
842   xbt_free(((s_smpi_mpi_indexed_t *)(*type)->substruct)->block_indices);
843   smpi_datatype_unuse(((s_smpi_mpi_indexed_t *)(*type)->substruct)->old_type);
844 }
845
846 /*
847  * Create a Sub type indexed to be able to serialize and unserialize it
848  * the structure s_smpi_mpi_indexed_t is derived from s_smpi_subtype which
849  * required the functions unserialize and serialize
850  */
851 s_smpi_mpi_indexed_t* smpi_datatype_indexed_create( int* block_lengths,
852                                                   int* block_indices,
853                                                   int block_count,
854                                                   MPI_Datatype old_type,
855                                                   int size_oldtype){
856   s_smpi_mpi_indexed_t *new_t= xbt_new(s_smpi_mpi_indexed_t,1);
857   new_t->base.serialize = &serialize_indexed;
858   new_t->base.unserialize = &unserialize_indexed;
859   new_t->base.subtype_free = &free_indexed;
860  //TODO : add a custom function for each time to clean these 
861   new_t->block_lengths= xbt_new(int, block_count);
862   new_t->block_indices= xbt_new(int, block_count);
863   int i;
864   for(i=0;i<block_count;i++){
865     new_t->block_lengths[i]=block_lengths[i];
866     new_t->block_indices[i]=block_indices[i];
867   }
868   new_t->block_count = block_count;
869   smpi_datatype_use(old_type);
870   new_t->old_type = old_type;
871   new_t->size_oldtype = size_oldtype;
872   return new_t;
873 }
874
875
876 int smpi_datatype_indexed(int count, int* blocklens, int* indices, MPI_Datatype old_type, MPI_Datatype* new_type)
877 {
878   int i;
879   int retval;
880   int size = 0;
881   int contiguous=1;
882   MPI_Aint lb = 0;
883   MPI_Aint ub = 0;
884   if(count>0){
885     lb=indices[0]*smpi_datatype_get_extent(old_type);
886     ub=indices[0]*smpi_datatype_get_extent(old_type) + blocklens[0]*smpi_datatype_ub(old_type);
887   }
888
889   for(i=0; i< count; i++){
890     if   (blocklens[i]<0)
891       return MPI_ERR_ARG;
892     size += blocklens[i];
893
894     if(indices[i]*smpi_datatype_get_extent(old_type)+smpi_datatype_lb(old_type)<lb)
895         lb = indices[i]*smpi_datatype_get_extent(old_type)+smpi_datatype_lb(old_type);
896     if(indices[i]*smpi_datatype_get_extent(old_type)+blocklens[i]*smpi_datatype_ub(old_type)>ub)
897         ub = indices[i]*smpi_datatype_get_extent(old_type)+blocklens[i]*smpi_datatype_ub(old_type);
898
899     if ( (i< count -1) && (indices[i]+blocklens[i] != indices[i+1]) )contiguous=0;
900   }
901   if (old_type->has_subtype == 1)
902     contiguous=0;
903
904   if(!contiguous){
905     s_smpi_mpi_indexed_t* subtype = smpi_datatype_indexed_create( blocklens,
906                                                                   indices,
907                                                                   count,
908                                                                   old_type,
909                                                                   smpi_datatype_size(old_type));
910      smpi_datatype_create(new_type,  size *
911                          smpi_datatype_size(old_type),lb,ub,1, subtype, DT_FLAG_DATA);
912   }else{
913     s_smpi_mpi_contiguous_t* subtype = smpi_datatype_contiguous_create( lb,
914                                                                   size,
915                                                                   old_type,
916                                                                   smpi_datatype_size(old_type));
917     smpi_datatype_create(new_type,  size *
918                          smpi_datatype_size(old_type),lb,ub,1, subtype, DT_FLAG_DATA|DT_FLAG_CONTIGUOUS);
919   }
920   retval=MPI_SUCCESS;
921   return retval;
922 }
923
924
925 /*
926 Hindexed Implementation - Indexed with indices in bytes 
927 */
928
929 /*
930  *  Copies noncontiguous data into contiguous memory.
931  *  @param contiguous_hindexed - output hindexed
932  *  @param noncontiguous_hindexed - input hindexed
933  *  @param type - pointer contening :
934  *      - block_lengths - the width or height of blocked matrix
935  *      - block_indices - indices of each data, in bytes
936  *      - count - the number of rows of matrix
937  */
938 void serialize_hindexed( const void *noncontiguous_hindexed,
939                        void *contiguous_hindexed,
940                        int count,
941                        void *type)
942 {
943   s_smpi_mpi_hindexed_t* type_c = (s_smpi_mpi_hindexed_t*)type;
944   int i,j;
945   char* contiguous_hindexed_char = (char*)contiguous_hindexed;
946   char* noncontiguous_hindexed_char = (char*)noncontiguous_hindexed+ type_c->block_indices[0];
947   for(j=0; j<count;j++){
948     for (i = 0; i < type_c->block_count; i++) {
949       if (type_c->old_type->has_subtype == 0)
950         memcpy(contiguous_hindexed_char,
951                      noncontiguous_hindexed_char, type_c->block_lengths[i] * type_c->size_oldtype);
952       else
953         ((s_smpi_subtype_t*)type_c->old_type->substruct)->serialize( noncontiguous_hindexed_char,
954                                                                      contiguous_hindexed_char,
955                                                                      type_c->block_lengths[i],
956                                                                      type_c->old_type->substruct);
957
958       contiguous_hindexed_char += type_c->block_lengths[i]*type_c->size_oldtype;
959       if (i<type_c->block_count-1)noncontiguous_hindexed_char = (char*)noncontiguous_hindexed + type_c->block_indices[i+1];
960       else noncontiguous_hindexed_char += type_c->block_lengths[i]*smpi_datatype_get_extent(type_c->old_type);
961     }
962     noncontiguous_hindexed=(void*)noncontiguous_hindexed_char;
963   }
964 }
965 /*
966  *  Copies contiguous data into noncontiguous memory.
967  *  @param noncontiguous_hindexed - output hindexed
968  *  @param contiguous_hindexed - input hindexed
969  *  @param type - pointer contening :
970  *      - block_lengths - the width or height of blocked matrix
971  *      - block_indices - indices of each data, in bytes
972  *      - count - the number of rows of matrix
973  */
974 void unserialize_hindexed( const void *contiguous_hindexed,
975                          void *noncontiguous_hindexed,
976                          int count,
977                          void *type,
978                          MPI_Op op)
979 {
980   s_smpi_mpi_hindexed_t* type_c = (s_smpi_mpi_hindexed_t*)type;
981   int i,j;
982
983   char* contiguous_hindexed_char = (char*)contiguous_hindexed;
984   char* noncontiguous_hindexed_char = (char*)noncontiguous_hindexed+ type_c->block_indices[0];
985   for(j=0; j<count;j++){
986     for (i = 0; i < type_c->block_count; i++) {
987       if (type_c->old_type->has_subtype == 0)
988         smpi_op_apply(op, contiguous_hindexed_char, noncontiguous_hindexed_char, &type_c->block_lengths[i],
989                             &type_c->old_type);
990                        /*memcpy(noncontiguous_hindexed_char,
991                contiguous_hindexed_char, type_c->block_lengths[i] * type_c->size_oldtype);*/
992       else
993         ((s_smpi_subtype_t*)type_c->old_type->substruct)->unserialize( contiguous_hindexed_char,
994                                                                        noncontiguous_hindexed_char,
995                                                                        type_c->block_lengths[i],
996                                                                        type_c->old_type->substruct,
997                                                                        op);
998
999       contiguous_hindexed_char += type_c->block_lengths[i]*type_c->size_oldtype;
1000       if (i<type_c->block_count-1)noncontiguous_hindexed_char = (char*)noncontiguous_hindexed + type_c->block_indices[i+1];
1001       else noncontiguous_hindexed_char += type_c->block_lengths[i]*smpi_datatype_get_extent(type_c->old_type);
1002     }
1003     noncontiguous_hindexed=(void*)noncontiguous_hindexed_char;
1004   }
1005 }
1006
1007 void free_hindexed(MPI_Datatype* type){
1008   xbt_free(((s_smpi_mpi_hindexed_t *)(*type)->substruct)->block_lengths);
1009   xbt_free(((s_smpi_mpi_hindexed_t *)(*type)->substruct)->block_indices);
1010   smpi_datatype_unuse(((s_smpi_mpi_indexed_t *)(*type)->substruct)->old_type);
1011 }
1012
1013 /*
1014  * Create a Sub type hindexed to be able to serialize and unserialize it
1015  * the structure s_smpi_mpi_hindexed_t is derived from s_smpi_subtype which
1016  * required the functions unserialize and serialize
1017  */
1018 s_smpi_mpi_hindexed_t* smpi_datatype_hindexed_create( int* block_lengths,
1019                                                   MPI_Aint* block_indices,
1020                                                   int block_count,
1021                                                   MPI_Datatype old_type,
1022                                                   int size_oldtype){
1023   s_smpi_mpi_hindexed_t *new_t= xbt_new(s_smpi_mpi_hindexed_t,1);
1024   new_t->base.serialize = &serialize_hindexed;
1025   new_t->base.unserialize = &unserialize_hindexed;
1026   new_t->base.subtype_free = &free_hindexed;
1027  //TODO : add a custom function for each time to clean these 
1028   new_t->block_lengths= xbt_new(int, block_count);
1029   new_t->block_indices= xbt_new(MPI_Aint, block_count);
1030   int i;
1031   for(i=0;i<block_count;i++){
1032     new_t->block_lengths[i]=block_lengths[i];
1033     new_t->block_indices[i]=block_indices[i];
1034   }
1035   new_t->block_count = block_count;
1036   new_t->old_type = old_type;
1037   new_t->size_oldtype = size_oldtype;
1038   return new_t;
1039 }
1040
1041
1042 int smpi_datatype_hindexed(int count, int* blocklens, MPI_Aint* indices, MPI_Datatype old_type, MPI_Datatype* new_type)
1043 {
1044   int i;
1045   int retval;
1046   int size = 0;
1047   int contiguous=1;
1048   MPI_Aint lb = 0;
1049   MPI_Aint ub = 0;
1050   if(count>0){
1051     lb=indices[0] + smpi_datatype_lb(old_type);
1052     ub=indices[0] + blocklens[0]*smpi_datatype_ub(old_type);
1053   }
1054   for(i=0; i< count; i++){
1055     if   (blocklens[i]<0)
1056       return MPI_ERR_ARG;
1057     size += blocklens[i];
1058
1059     if(indices[i]+smpi_datatype_lb(old_type)<lb) lb = indices[i]+smpi_datatype_lb(old_type);
1060     if(indices[i]+blocklens[i]*smpi_datatype_ub(old_type)>ub) ub = indices[i]+blocklens[i]*smpi_datatype_ub(old_type);
1061
1062     if ( (i< count -1) && (indices[i]+blocklens[i]*smpi_datatype_size(old_type) != indices[i+1]) )contiguous=0;
1063   }
1064   if (old_type->has_subtype == 1 || lb!=0)
1065     contiguous=0;
1066
1067   if(!contiguous){
1068     s_smpi_mpi_hindexed_t* subtype = smpi_datatype_hindexed_create( blocklens,
1069                                                                   indices,
1070                                                                   count,
1071                                                                   old_type,
1072                                                                   smpi_datatype_size(old_type));
1073     smpi_datatype_create(new_type,  size * smpi_datatype_size(old_type),
1074                                                  lb,
1075                          ub
1076                          ,1, subtype, DT_FLAG_DATA);
1077   }else{
1078     s_smpi_mpi_contiguous_t* subtype = smpi_datatype_contiguous_create( lb,
1079                                                                   size,
1080                                                                   old_type,
1081                                                                   smpi_datatype_size(old_type));
1082     smpi_datatype_create(new_type,  size * smpi_datatype_size(old_type),
1083                                              0,size * smpi_datatype_size(old_type),
1084                                              1, subtype, DT_FLAG_DATA|DT_FLAG_CONTIGUOUS);
1085   }
1086   retval=MPI_SUCCESS;
1087   return retval;
1088 }
1089
1090
1091 /*
1092 struct Implementation - Indexed with indices in bytes 
1093 */
1094
1095 /*
1096  *  Copies noncontiguous data into contiguous memory.
1097  *  @param contiguous_struct - output struct
1098  *  @param noncontiguous_struct - input struct
1099  *  @param type - pointer contening :
1100  *      - stride - stride of between noncontiguous data
1101  *      - block_length - the width or height of blocked matrix
1102  *      - count - the number of rows of matrix
1103  */
1104 void serialize_struct( const void *noncontiguous_struct,
1105                        void *contiguous_struct,
1106                        int count,
1107                        void *type)
1108 {
1109   s_smpi_mpi_struct_t* type_c = (s_smpi_mpi_struct_t*)type;
1110   int i,j;
1111   char* contiguous_struct_char = (char*)contiguous_struct;
1112   char* noncontiguous_struct_char = (char*)noncontiguous_struct+ type_c->block_indices[0];
1113   for(j=0; j<count;j++){
1114     for (i = 0; i < type_c->block_count; i++) {
1115       if (type_c->old_types[i]->has_subtype == 0)
1116         memcpy(contiguous_struct_char,
1117              noncontiguous_struct_char, type_c->block_lengths[i] * smpi_datatype_size(type_c->old_types[i]));
1118       else
1119         ((s_smpi_subtype_t*)type_c->old_types[i]->substruct)->serialize( noncontiguous_struct_char,
1120                                                                          contiguous_struct_char,
1121                                                                          type_c->block_lengths[i],
1122                                                                          type_c->old_types[i]->substruct);
1123
1124
1125       contiguous_struct_char += type_c->block_lengths[i]*smpi_datatype_size(type_c->old_types[i]);
1126       if (i<type_c->block_count-1)noncontiguous_struct_char = (char*)noncontiguous_struct + type_c->block_indices[i+1];
1127       else noncontiguous_struct_char += type_c->block_lengths[i]*smpi_datatype_get_extent(type_c->old_types[i]);//let's hope this is MPI_UB ?
1128     }
1129     noncontiguous_struct=(void*)noncontiguous_struct_char;
1130   }
1131 }
1132 /*
1133  *  Copies contiguous data into noncontiguous memory.
1134  *  @param noncontiguous_struct - output struct
1135  *  @param contiguous_struct - input struct
1136  *  @param type - pointer contening :
1137  *      - stride - stride of between noncontiguous data
1138  *      - block_length - the width or height of blocked matrix
1139  *      - count - the number of rows of matrix
1140  */
1141 void unserialize_struct( const void *contiguous_struct,
1142                          void *noncontiguous_struct,
1143                          int count,
1144                          void *type,
1145                          MPI_Op op)
1146 {
1147   s_smpi_mpi_struct_t* type_c = (s_smpi_mpi_struct_t*)type;
1148   int i,j;
1149
1150   char* contiguous_struct_char = (char*)contiguous_struct;
1151   char* noncontiguous_struct_char = (char*)noncontiguous_struct+ type_c->block_indices[0];
1152   for(j=0; j<count;j++){
1153     for (i = 0; i < type_c->block_count; i++) {
1154       if (type_c->old_types[i]->has_subtype == 0)
1155         smpi_op_apply(op, contiguous_struct_char, noncontiguous_struct_char, &type_c->block_lengths[i],
1156            & type_c->old_types[i]);
1157                        /*memcpy(noncontiguous_struct_char,
1158              contiguous_struct_char, type_c->block_lengths[i] * smpi_datatype_size(type_c->old_types[i]));*/
1159       else
1160         ((s_smpi_subtype_t*)type_c->old_types[i]->substruct)->unserialize( contiguous_struct_char,
1161                                                                            noncontiguous_struct_char,
1162                                                                            type_c->block_lengths[i],
1163                                                                            type_c->old_types[i]->substruct,
1164                                                                            op);
1165
1166       contiguous_struct_char += type_c->block_lengths[i]*smpi_datatype_size(type_c->old_types[i]);
1167       if (i<type_c->block_count-1)noncontiguous_struct_char =  (char*)noncontiguous_struct + type_c->block_indices[i+1];
1168       else noncontiguous_struct_char += type_c->block_lengths[i]*smpi_datatype_get_extent(type_c->old_types[i]);
1169     }
1170     noncontiguous_struct=(void*)noncontiguous_struct_char;
1171     
1172   }
1173 }
1174
1175 void free_struct(MPI_Datatype* type){
1176   xbt_free(((s_smpi_mpi_struct_t *)(*type)->substruct)->block_lengths);
1177   xbt_free(((s_smpi_mpi_struct_t *)(*type)->substruct)->block_indices);
1178   int i=0;
1179   for (i = 0; i < ((s_smpi_mpi_struct_t *)(*type)->substruct)->block_count; i++)
1180     smpi_datatype_unuse(((s_smpi_mpi_struct_t *)(*type)->substruct)->old_types[i]);
1181   xbt_free(((s_smpi_mpi_struct_t *)(*type)->substruct)->old_types);
1182 }
1183
1184 /*
1185  * Create a Sub type struct to be able to serialize and unserialize it
1186  * the structure s_smpi_mpi_struct_t is derived from s_smpi_subtype which
1187  * required the functions unserialize and serialize
1188  */
1189 s_smpi_mpi_struct_t* smpi_datatype_struct_create( int* block_lengths,
1190                                                   MPI_Aint* block_indices,
1191                                                   int block_count,
1192                                                   MPI_Datatype* old_types){
1193   s_smpi_mpi_struct_t *new_t= xbt_new(s_smpi_mpi_struct_t,1);
1194   new_t->base.serialize = &serialize_struct;
1195   new_t->base.unserialize = &unserialize_struct;
1196   new_t->base.subtype_free = &free_struct;
1197  //TODO : add a custom function for each time to clean these 
1198   new_t->block_lengths= xbt_new(int, block_count);
1199   new_t->block_indices= xbt_new(MPI_Aint, block_count);
1200   new_t->old_types=  xbt_new(MPI_Datatype, block_count);
1201   int i;
1202   for(i=0;i<block_count;i++){
1203     new_t->block_lengths[i]=block_lengths[i];
1204     new_t->block_indices[i]=block_indices[i];
1205     new_t->old_types[i]=old_types[i];
1206     smpi_datatype_use(new_t->old_types[i]);
1207   }
1208   //new_t->block_lengths = block_lengths;
1209   //new_t->block_indices = block_indices;
1210   new_t->block_count = block_count;
1211   //new_t->old_types = old_types;
1212   return new_t;
1213 }
1214
1215
1216 int smpi_datatype_struct(int count, int* blocklens, MPI_Aint* indices, MPI_Datatype* old_types, MPI_Datatype* new_type)
1217 {
1218   int i;
1219   size_t size = 0;
1220   int contiguous=1;
1221   size = 0;
1222   MPI_Aint lb = 0;
1223   MPI_Aint ub = 0;
1224   if(count>0){
1225     lb=indices[0] + smpi_datatype_lb(old_types[0]);
1226     ub=indices[0] + blocklens[0]*smpi_datatype_ub(old_types[0]);
1227   }
1228   int forced_lb=0;
1229   int forced_ub=0;
1230   for(i=0; i< count; i++){
1231     if (blocklens[i]<0)
1232       return MPI_ERR_ARG;
1233     if (old_types[i]->has_subtype == 1)
1234       contiguous=0;
1235
1236     size += blocklens[i]*smpi_datatype_size(old_types[i]);
1237     if (old_types[i]==MPI_LB){
1238       lb=indices[i];
1239       forced_lb=1;
1240     }
1241     if (old_types[i]==MPI_UB){
1242       ub=indices[i];
1243       forced_ub=1;
1244     }
1245
1246     if(!forced_lb && indices[i]+smpi_datatype_lb(old_types[i])<lb) lb = indices[i];
1247     if(!forced_ub && indices[i]+blocklens[i]*smpi_datatype_ub(old_types[i])>ub) ub = indices[i]+blocklens[i]*smpi_datatype_ub(old_types[i]);
1248
1249     if ( (i< count -1) && (indices[i]+blocklens[i]*smpi_datatype_size(old_types[i]) != indices[i+1]) )contiguous=0;
1250   }
1251
1252   if(!contiguous){
1253     s_smpi_mpi_struct_t* subtype = smpi_datatype_struct_create( blocklens,
1254                                                               indices,
1255                                                               count,
1256                                                               old_types);
1257
1258     smpi_datatype_create(new_type,  size, lb, ub,1, subtype, DT_FLAG_DATA);
1259   }else{
1260     s_smpi_mpi_contiguous_t* subtype = smpi_datatype_contiguous_create( lb,
1261                                                                   size,
1262                                                                   MPI_CHAR,
1263                                                                   1);
1264     smpi_datatype_create(new_type,  size, lb, ub,1, subtype, DT_FLAG_DATA|DT_FLAG_CONTIGUOUS);
1265   }
1266   return MPI_SUCCESS;
1267 }
1268
1269 void smpi_datatype_commit(MPI_Datatype *datatype)
1270 {
1271   (*datatype)->flags=  ((*datatype)->flags | DT_FLAG_COMMITED);
1272 }
1273
1274 typedef struct s_smpi_mpi_op {
1275   MPI_User_function *func;
1276   int is_commute;
1277 } s_smpi_mpi_op_t;
1278
1279 #define MAX_OP(a, b)  (b) = (a) < (b) ? (b) : (a)
1280 #define MIN_OP(a, b)  (b) = (a) < (b) ? (a) : (b)
1281 #define SUM_OP(a, b)  (b) += (a)
1282 #define PROD_OP(a, b) (b) *= (a)
1283 #define LAND_OP(a, b) (b) = (a) && (b)
1284 #define LOR_OP(a, b)  (b) = (a) || (b)
1285 #define LXOR_OP(a, b) (b) = (!(a) && (b)) || ((a) && !(b))
1286 #define BAND_OP(a, b) (b) &= (a)
1287 #define BOR_OP(a, b)  (b) |= (a)
1288 #define BXOR_OP(a, b) (b) ^= (a)
1289 #define MAXLOC_OP(a, b)  (b) = (a.value) < (b.value) ? (b) : (a)
1290 #define MINLOC_OP(a, b)  (b) = (a.value) < (b.value) ? (a) : (b)
1291
1292 #define APPLY_FUNC(a, b, length, type, func) \
1293 {                                          \
1294   int i;                                   \
1295   type* x = (type*)(a);                    \
1296   type* y = (type*)(b);                    \
1297   for(i = 0; i < *(length); i++) {         \
1298     func(x[i], y[i]);                      \
1299   }                                        \
1300 }
1301
1302 static void max_func(void *a, void *b, int *length,
1303                      MPI_Datatype * datatype)
1304 {
1305   if (*datatype == MPI_CHAR) {
1306     APPLY_FUNC(a, b, length, char, MAX_OP);
1307   } else if (*datatype == MPI_SHORT) {
1308     APPLY_FUNC(a, b, length, short, MAX_OP);
1309   } else if (*datatype == MPI_INT) {
1310     APPLY_FUNC(a, b, length, int, MAX_OP);
1311   } else if (*datatype == MPI_LONG) {
1312     APPLY_FUNC(a, b, length, long, MAX_OP);
1313   } else if (*datatype == MPI_UNSIGNED_SHORT) {
1314     APPLY_FUNC(a, b, length, unsigned short, MAX_OP);
1315   } else if (*datatype == MPI_UNSIGNED) {
1316     APPLY_FUNC(a, b, length, unsigned int, MAX_OP);
1317   } else if (*datatype == MPI_UNSIGNED_LONG) {
1318     APPLY_FUNC(a, b, length, unsigned long, MAX_OP);
1319   } else if (*datatype == MPI_UNSIGNED_CHAR) {
1320     APPLY_FUNC(a, b, length, unsigned char, MAX_OP);
1321   } else if (*datatype == MPI_FLOAT) {
1322     APPLY_FUNC(a, b, length, float, MAX_OP);
1323   } else if (*datatype == MPI_DOUBLE) {
1324     APPLY_FUNC(a, b, length, double, MAX_OP);
1325   } else if (*datatype == MPI_LONG_DOUBLE) {
1326     APPLY_FUNC(a, b, length, long double, MAX_OP);
1327   }
1328 }
1329
1330 static void min_func(void *a, void *b, int *length,
1331                      MPI_Datatype * datatype)
1332 {
1333   if (*datatype == MPI_CHAR) {
1334     APPLY_FUNC(a, b, length, char, MIN_OP);
1335   } else if (*datatype == MPI_SHORT) {
1336     APPLY_FUNC(a, b, length, short, MIN_OP);
1337   } else if (*datatype == MPI_INT) {
1338     APPLY_FUNC(a, b, length, int, MIN_OP);
1339   } else if (*datatype == MPI_LONG) {
1340     APPLY_FUNC(a, b, length, long, MIN_OP);
1341   } else if (*datatype == MPI_UNSIGNED_SHORT) {
1342     APPLY_FUNC(a, b, length, unsigned short, MIN_OP);
1343   } else if (*datatype == MPI_UNSIGNED) {
1344     APPLY_FUNC(a, b, length, unsigned int, MIN_OP);
1345   } else if (*datatype == MPI_UNSIGNED_LONG) {
1346     APPLY_FUNC(a, b, length, unsigned long, MIN_OP);
1347   } else if (*datatype == MPI_UNSIGNED_CHAR) {
1348     APPLY_FUNC(a, b, length, unsigned char, MIN_OP);
1349   } else if (*datatype == MPI_FLOAT) {
1350     APPLY_FUNC(a, b, length, float, MIN_OP);
1351   } else if (*datatype == MPI_DOUBLE) {
1352     APPLY_FUNC(a, b, length, double, MIN_OP);
1353   } else if (*datatype == MPI_LONG_DOUBLE) {
1354     APPLY_FUNC(a, b, length, long double, MIN_OP);
1355   }
1356 }
1357
1358 static void sum_func(void *a, void *b, int *length,
1359                      MPI_Datatype * datatype)
1360 {
1361   if (*datatype == MPI_CHAR) {
1362     APPLY_FUNC(a, b, length, char, SUM_OP);
1363   } else if (*datatype == MPI_SHORT) {
1364     APPLY_FUNC(a, b, length, short, SUM_OP);
1365   } else if (*datatype == MPI_INT) {
1366     APPLY_FUNC(a, b, length, int, SUM_OP);
1367   } else if (*datatype == MPI_LONG) {
1368     APPLY_FUNC(a, b, length, long, SUM_OP);
1369   } else if (*datatype == MPI_UNSIGNED_SHORT) {
1370     APPLY_FUNC(a, b, length, unsigned short, SUM_OP);
1371   } else if (*datatype == MPI_UNSIGNED) {
1372     APPLY_FUNC(a, b, length, unsigned int, SUM_OP);
1373   } else if (*datatype == MPI_UNSIGNED_LONG) {
1374     APPLY_FUNC(a, b, length, unsigned long, SUM_OP);
1375   } else if (*datatype == MPI_UNSIGNED_CHAR) {
1376     APPLY_FUNC(a, b, length, unsigned char, SUM_OP);
1377   } else if (*datatype == MPI_FLOAT) {
1378     APPLY_FUNC(a, b, length, float, SUM_OP);
1379   } else if (*datatype == MPI_DOUBLE) {
1380     APPLY_FUNC(a, b, length, double, SUM_OP);
1381   } else if (*datatype == MPI_LONG_DOUBLE) {
1382     APPLY_FUNC(a, b, length, long double, SUM_OP);
1383   } else if (*datatype == MPI_C_FLOAT_COMPLEX) {
1384     APPLY_FUNC(a, b, length, float _Complex, SUM_OP);
1385   } else if (*datatype == MPI_C_DOUBLE_COMPLEX) {
1386     APPLY_FUNC(a, b, length, double _Complex, SUM_OP);
1387   } else if (*datatype == MPI_C_LONG_DOUBLE_COMPLEX) {
1388     APPLY_FUNC(a, b, length, long double _Complex, SUM_OP);
1389   }
1390 }
1391
1392 static void prod_func(void *a, void *b, int *length,
1393                       MPI_Datatype * datatype)
1394 {
1395   if (*datatype == MPI_CHAR) {
1396     APPLY_FUNC(a, b, length, char, PROD_OP);
1397   } else if (*datatype == MPI_SHORT) {
1398     APPLY_FUNC(a, b, length, short, PROD_OP);
1399   } else if (*datatype == MPI_INT) {
1400     APPLY_FUNC(a, b, length, int, PROD_OP);
1401   } else if (*datatype == MPI_LONG) {
1402     APPLY_FUNC(a, b, length, long, PROD_OP);
1403   } else if (*datatype == MPI_UNSIGNED_SHORT) {
1404     APPLY_FUNC(a, b, length, unsigned short, PROD_OP);
1405   } else if (*datatype == MPI_UNSIGNED) {
1406     APPLY_FUNC(a, b, length, unsigned int, PROD_OP);
1407   } else if (*datatype == MPI_UNSIGNED_LONG) {
1408     APPLY_FUNC(a, b, length, unsigned long, PROD_OP);
1409   } else if (*datatype == MPI_UNSIGNED_CHAR) {
1410     APPLY_FUNC(a, b, length, unsigned char, PROD_OP);
1411   } else if (*datatype == MPI_FLOAT) {
1412     APPLY_FUNC(a, b, length, float, PROD_OP);
1413   } else if (*datatype == MPI_DOUBLE) {
1414     APPLY_FUNC(a, b, length, double, PROD_OP);
1415   } else if (*datatype == MPI_LONG_DOUBLE) {
1416     APPLY_FUNC(a, b, length, long double, PROD_OP);
1417   } else if (*datatype == MPI_C_FLOAT_COMPLEX) {
1418     APPLY_FUNC(a, b, length, float _Complex, PROD_OP);
1419   } else if (*datatype == MPI_C_DOUBLE_COMPLEX) {
1420     APPLY_FUNC(a, b, length, double _Complex, PROD_OP);
1421   } else if (*datatype == MPI_C_LONG_DOUBLE_COMPLEX) {
1422     APPLY_FUNC(a, b, length, long double _Complex, PROD_OP);
1423   }
1424 }
1425
1426 static void land_func(void *a, void *b, int *length,
1427                       MPI_Datatype * datatype)
1428 {
1429   if (*datatype == MPI_CHAR) {
1430     APPLY_FUNC(a, b, length, char, LAND_OP);
1431   } else if (*datatype == MPI_SHORT) {
1432     APPLY_FUNC(a, b, length, short, LAND_OP);
1433   } else if (*datatype == MPI_INT) {
1434     APPLY_FUNC(a, b, length, int, LAND_OP);
1435   } else if (*datatype == MPI_LONG) {
1436     APPLY_FUNC(a, b, length, long, LAND_OP);
1437   } else if (*datatype == MPI_UNSIGNED_SHORT) {
1438     APPLY_FUNC(a, b, length, unsigned short, LAND_OP);
1439   } else if (*datatype == MPI_UNSIGNED) {
1440     APPLY_FUNC(a, b, length, unsigned int, LAND_OP);
1441   } else if (*datatype == MPI_UNSIGNED_LONG) {
1442     APPLY_FUNC(a, b, length, unsigned long, LAND_OP);
1443   } else if (*datatype == MPI_UNSIGNED_CHAR) {
1444     APPLY_FUNC(a, b, length, unsigned char, LAND_OP);
1445   } else if (*datatype == MPI_C_BOOL) {
1446     APPLY_FUNC(a, b, length, _Bool, LAND_OP);
1447   }
1448 }
1449
1450 static void lor_func(void *a, void *b, int *length,
1451                      MPI_Datatype * datatype)
1452 {
1453   if (*datatype == MPI_CHAR) {
1454     APPLY_FUNC(a, b, length, char, LOR_OP);
1455   } else if (*datatype == MPI_SHORT) {
1456     APPLY_FUNC(a, b, length, short, LOR_OP);
1457   } else if (*datatype == MPI_INT) {
1458     APPLY_FUNC(a, b, length, int, LOR_OP);
1459   } else if (*datatype == MPI_LONG) {
1460     APPLY_FUNC(a, b, length, long, LOR_OP);
1461   } else if (*datatype == MPI_UNSIGNED_SHORT) {
1462     APPLY_FUNC(a, b, length, unsigned short, LOR_OP);
1463   } else if (*datatype == MPI_UNSIGNED) {
1464     APPLY_FUNC(a, b, length, unsigned int, LOR_OP);
1465   } else if (*datatype == MPI_UNSIGNED_LONG) {
1466     APPLY_FUNC(a, b, length, unsigned long, LOR_OP);
1467   } else if (*datatype == MPI_UNSIGNED_CHAR) {
1468     APPLY_FUNC(a, b, length, unsigned char, LOR_OP);
1469   } else if (*datatype == MPI_C_BOOL) {
1470     APPLY_FUNC(a, b, length, _Bool, LOR_OP);
1471   }
1472 }
1473
1474 static void lxor_func(void *a, void *b, int *length,
1475                       MPI_Datatype * datatype)
1476 {
1477   if (*datatype == MPI_CHAR) {
1478     APPLY_FUNC(a, b, length, char, LXOR_OP);
1479   } else if (*datatype == MPI_SHORT) {
1480     APPLY_FUNC(a, b, length, short, LXOR_OP);
1481   } else if (*datatype == MPI_INT) {
1482     APPLY_FUNC(a, b, length, int, LXOR_OP);
1483   } else if (*datatype == MPI_LONG) {
1484     APPLY_FUNC(a, b, length, long, LXOR_OP);
1485   } else if (*datatype == MPI_UNSIGNED_SHORT) {
1486     APPLY_FUNC(a, b, length, unsigned short, LXOR_OP);
1487   } else if (*datatype == MPI_UNSIGNED) {
1488     APPLY_FUNC(a, b, length, unsigned int, LXOR_OP);
1489   } else if (*datatype == MPI_UNSIGNED_LONG) {
1490     APPLY_FUNC(a, b, length, unsigned long, LXOR_OP);
1491   } else if (*datatype == MPI_UNSIGNED_CHAR) {
1492     APPLY_FUNC(a, b, length, unsigned char, LXOR_OP);
1493   } else if (*datatype == MPI_C_BOOL) {
1494     APPLY_FUNC(a, b, length, _Bool, LXOR_OP);
1495   }
1496 }
1497
1498 static void band_func(void *a, void *b, int *length,
1499                       MPI_Datatype * datatype)
1500 {
1501   if (*datatype == MPI_CHAR) {
1502     APPLY_FUNC(a, b, length, char, BAND_OP);
1503   }else if (*datatype == MPI_SHORT) {
1504     APPLY_FUNC(a, b, length, short, BAND_OP);
1505   } else if (*datatype == MPI_INT) {
1506     APPLY_FUNC(a, b, length, int, BAND_OP);
1507   } else if (*datatype == MPI_LONG) {
1508     APPLY_FUNC(a, b, length, long, BAND_OP);
1509   } else if (*datatype == MPI_UNSIGNED_SHORT) {
1510     APPLY_FUNC(a, b, length, unsigned short, BAND_OP);
1511   } else if (*datatype == MPI_UNSIGNED) {
1512     APPLY_FUNC(a, b, length, unsigned int, BAND_OP);
1513   } else if (*datatype == MPI_UNSIGNED_LONG) {
1514     APPLY_FUNC(a, b, length, unsigned long, BAND_OP);
1515   } else if (*datatype == MPI_UNSIGNED_CHAR) {
1516     APPLY_FUNC(a, b, length, unsigned char, BAND_OP);
1517   } else if (*datatype == MPI_BYTE) {
1518     APPLY_FUNC(a, b, length, uint8_t, BAND_OP);
1519   }
1520 }
1521
1522 static void bor_func(void *a, void *b, int *length,
1523                      MPI_Datatype * datatype)
1524 {
1525   if (*datatype == MPI_CHAR) {
1526     APPLY_FUNC(a, b, length, char, BOR_OP);
1527   } else if (*datatype == MPI_SHORT) {
1528     APPLY_FUNC(a, b, length, short, BOR_OP);
1529   } else if (*datatype == MPI_INT) {
1530     APPLY_FUNC(a, b, length, int, BOR_OP);
1531   } else if (*datatype == MPI_LONG) {
1532     APPLY_FUNC(a, b, length, long, BOR_OP);
1533   } else if (*datatype == MPI_UNSIGNED_SHORT) {
1534     APPLY_FUNC(a, b, length, unsigned short, BOR_OP);
1535   } else if (*datatype == MPI_UNSIGNED) {
1536     APPLY_FUNC(a, b, length, unsigned int, BOR_OP);
1537   } else if (*datatype == MPI_UNSIGNED_LONG) {
1538     APPLY_FUNC(a, b, length, unsigned long, BOR_OP);
1539   } else if (*datatype == MPI_UNSIGNED_CHAR) {
1540     APPLY_FUNC(a, b, length, unsigned char, BOR_OP);
1541   } else if (*datatype == MPI_BYTE) {
1542     APPLY_FUNC(a, b, length, uint8_t, BOR_OP);
1543   }
1544 }
1545
1546 static void bxor_func(void *a, void *b, int *length,
1547                       MPI_Datatype * datatype)
1548 {
1549   if (*datatype == MPI_CHAR) {
1550     APPLY_FUNC(a, b, length, char, BXOR_OP);
1551   } else if (*datatype == MPI_SHORT) {
1552     APPLY_FUNC(a, b, length, short, BXOR_OP);
1553   } else if (*datatype == MPI_INT) {
1554     APPLY_FUNC(a, b, length, int, BXOR_OP);
1555   } else if (*datatype == MPI_LONG) {
1556     APPLY_FUNC(a, b, length, long, BXOR_OP);
1557   } else if (*datatype == MPI_UNSIGNED_SHORT) {
1558     APPLY_FUNC(a, b, length, unsigned short, BXOR_OP);
1559   } else if (*datatype == MPI_UNSIGNED) {
1560     APPLY_FUNC(a, b, length, unsigned int, BXOR_OP);
1561   } else if (*datatype == MPI_UNSIGNED_LONG) {
1562     APPLY_FUNC(a, b, length, unsigned long, BXOR_OP);
1563   } else if (*datatype == MPI_UNSIGNED_CHAR) {
1564     APPLY_FUNC(a, b, length, unsigned char, BXOR_OP);
1565   } else if (*datatype == MPI_BYTE) {
1566     APPLY_FUNC(a, b, length, uint8_t, BXOR_OP);
1567   }
1568 }
1569
1570 static void minloc_func(void *a, void *b, int *length,
1571                         MPI_Datatype * datatype)
1572 {
1573   if (*datatype == MPI_FLOAT_INT) {
1574     APPLY_FUNC(a, b, length, float_int, MINLOC_OP);
1575   } else if (*datatype == MPI_LONG_INT) {
1576     APPLY_FUNC(a, b, length, long_int, MINLOC_OP);
1577   } else if (*datatype == MPI_DOUBLE_INT) {
1578     APPLY_FUNC(a, b, length, double_int, MINLOC_OP);
1579   } else if (*datatype == MPI_SHORT_INT) {
1580     APPLY_FUNC(a, b, length, short_int, MINLOC_OP);
1581   } else if (*datatype == MPI_2LONG) {
1582     APPLY_FUNC(a, b, length, long_long, MINLOC_OP);
1583   } else if (*datatype == MPI_2INT) {
1584     APPLY_FUNC(a, b, length, int_int, MINLOC_OP);
1585   } else if (*datatype == MPI_LONG_DOUBLE_INT) {
1586     APPLY_FUNC(a, b, length, long_double_int, MINLOC_OP);
1587   } else if (*datatype == MPI_2FLOAT) {
1588     APPLY_FUNC(a, b, length, float_float, MINLOC_OP);
1589   } else if (*datatype == MPI_2DOUBLE) {
1590     APPLY_FUNC(a, b, length, double_double, MINLOC_OP);
1591   }
1592 }
1593
1594 static void maxloc_func(void *a, void *b, int *length,
1595                         MPI_Datatype * datatype)
1596 {
1597   if (*datatype == MPI_FLOAT_INT) {
1598     APPLY_FUNC(a, b, length, float_int, MAXLOC_OP);
1599   } else if (*datatype == MPI_LONG_INT) {
1600     APPLY_FUNC(a, b, length, long_int, MAXLOC_OP);
1601   } else if (*datatype == MPI_DOUBLE_INT) {
1602     APPLY_FUNC(a, b, length, double_int, MAXLOC_OP);
1603   } else if (*datatype == MPI_SHORT_INT) {
1604     APPLY_FUNC(a, b, length, short_int, MAXLOC_OP);
1605   } else if (*datatype == MPI_2LONG) {
1606     APPLY_FUNC(a, b, length, long_long, MAXLOC_OP);
1607   } else if (*datatype == MPI_2INT) {
1608     APPLY_FUNC(a, b, length, int_int, MAXLOC_OP);
1609   } else if (*datatype == MPI_LONG_DOUBLE_INT) {
1610     APPLY_FUNC(a, b, length, long_double_int, MAXLOC_OP);
1611   } else if (*datatype == MPI_2FLOAT) {
1612     APPLY_FUNC(a, b, length, float_float, MAXLOC_OP);
1613   } else if (*datatype == MPI_2DOUBLE) {
1614     APPLY_FUNC(a, b, length, double_double, MAXLOC_OP);
1615   }
1616 }
1617
1618 static void replace_func(void *a, void *b, int *length,
1619                         MPI_Datatype * datatype)
1620 {
1621   memcpy(b, a, *length * smpi_datatype_size(*datatype));
1622 }
1623
1624 #define CREATE_MPI_OP(name, func)                             \
1625   static s_smpi_mpi_op_t mpi_##name = { &(func) /* func */, TRUE }; \
1626 MPI_Op name = &mpi_##name;
1627
1628 CREATE_MPI_OP(MPI_MAX, max_func);
1629 CREATE_MPI_OP(MPI_MIN, min_func);
1630 CREATE_MPI_OP(MPI_SUM, sum_func);
1631 CREATE_MPI_OP(MPI_PROD, prod_func);
1632 CREATE_MPI_OP(MPI_LAND, land_func);
1633 CREATE_MPI_OP(MPI_LOR, lor_func);
1634 CREATE_MPI_OP(MPI_LXOR, lxor_func);
1635 CREATE_MPI_OP(MPI_BAND, band_func);
1636 CREATE_MPI_OP(MPI_BOR, bor_func);
1637 CREATE_MPI_OP(MPI_BXOR, bxor_func);
1638 CREATE_MPI_OP(MPI_MAXLOC, maxloc_func);
1639 CREATE_MPI_OP(MPI_MINLOC, minloc_func);
1640 CREATE_MPI_OP(MPI_REPLACE, replace_func);
1641
1642
1643 MPI_Op smpi_op_new(MPI_User_function * function, int commute)
1644 {
1645   MPI_Op op;
1646   op = xbt_new(s_smpi_mpi_op_t, 1);
1647   op->func = function;
1648   op-> is_commute = commute;
1649   return op;
1650 }
1651
1652 int smpi_op_is_commute(MPI_Op op)
1653 {
1654   return (op==MPI_OP_NULL) ? 1 : op-> is_commute;
1655 }
1656
1657 void smpi_op_destroy(MPI_Op op)
1658 {
1659   xbt_free(op);
1660 }
1661
1662 void smpi_op_apply(MPI_Op op, void *invec, void *inoutvec, int *len,
1663                    MPI_Datatype * datatype)
1664 {
1665   if(op==MPI_OP_NULL)
1666     return;
1667
1668   if(smpi_privatize_global_variables){ //we need to switch here, as the called function may silently touch global variables
1669     XBT_DEBUG("Applying operation, switch to the right data frame ");
1670     smpi_switch_data_segment(smpi_process_index());
1671   }
1672
1673   if(!smpi_process_get_replaying())
1674   op->func(invec, inoutvec, len, datatype);
1675 }
1676
1677 int smpi_type_attr_delete(MPI_Datatype type, int keyval){
1678   smpi_type_key_elem elem = xbt_dict_get_or_null_ext(smpi_type_keyvals, (const char*)&keyval, sizeof(int));
1679   if(!elem)
1680     return MPI_ERR_ARG;
1681   if(elem->delete_fn!=MPI_NULL_DELETE_FN){
1682     void * value;
1683     int flag;
1684     if(smpi_type_attr_get(type, keyval, &value, &flag)==MPI_SUCCESS){
1685       int ret = elem->delete_fn(type, keyval, value, &flag);
1686       if(ret!=MPI_SUCCESS) return ret;
1687     }
1688   }  
1689   if(type->attributes==NULL)
1690     return MPI_ERR_ARG;
1691
1692   xbt_dict_remove_ext(type->attributes, (const char*)&keyval, sizeof(int));
1693   return MPI_SUCCESS;
1694 }
1695
1696 int smpi_type_attr_get(MPI_Datatype type, int keyval, void* attr_value, int* flag){
1697   smpi_type_key_elem elem = xbt_dict_get_or_null_ext(smpi_type_keyvals, (const char*)&keyval, sizeof(int));
1698   if(!elem)
1699     return MPI_ERR_ARG;
1700   xbt_ex_t ex;
1701   if(type->attributes==NULL){
1702     *flag=0;
1703     return MPI_SUCCESS;
1704   }
1705   TRY {
1706     *(void**)attr_value = xbt_dict_get_ext(type->attributes, (const char*)&keyval, sizeof(int));
1707     *flag=1;
1708   }
1709   CATCH(ex) {
1710     *flag=0;
1711     xbt_ex_free(ex);
1712   }
1713   return MPI_SUCCESS;
1714 }
1715
1716 int smpi_type_attr_put(MPI_Datatype type, int keyval, void* attr_value){
1717   if(!smpi_type_keyvals)
1718   smpi_type_keyvals = xbt_dict_new();
1719   smpi_type_key_elem elem = xbt_dict_get_or_null_ext(smpi_type_keyvals, (const char*)&keyval, sizeof(int));
1720   if(!elem )
1721     return MPI_ERR_ARG;
1722   int flag;
1723   void* value;
1724   smpi_type_attr_get(type, keyval, &value, &flag);
1725   if(flag && elem->delete_fn!=MPI_NULL_DELETE_FN){
1726     int ret = elem->delete_fn(type, keyval, value, &flag);
1727     if(ret!=MPI_SUCCESS) return ret;
1728   }
1729   if(type->attributes==NULL)
1730     type->attributes=xbt_dict_new();
1731
1732   xbt_dict_set_ext(type->attributes, (const char*)&keyval, sizeof(int), attr_value, NULL);
1733   return MPI_SUCCESS;
1734 }
1735
1736 int smpi_type_keyval_create(MPI_Type_copy_attr_function* copy_fn, MPI_Type_delete_attr_function* delete_fn, int* keyval, void* extra_state){
1737
1738   if(!smpi_type_keyvals)
1739   smpi_type_keyvals = xbt_dict_new();
1740   
1741   smpi_type_key_elem value = (smpi_type_key_elem) xbt_new0(s_smpi_mpi_type_key_elem_t,1);
1742   
1743   value->copy_fn=copy_fn;
1744   value->delete_fn=delete_fn;
1745   
1746   *keyval = type_keyval_id;
1747   xbt_dict_set_ext(smpi_type_keyvals,(const char*)keyval, sizeof(int),(void*)value, NULL);
1748   type_keyval_id++;
1749   return MPI_SUCCESS;
1750 }
1751
1752 int smpi_type_keyval_free(int* keyval){
1753   smpi_type_key_elem elem = xbt_dict_get_or_null_ext(smpi_type_keyvals, (const char*)keyval, sizeof(int));
1754   if(!elem){
1755     return MPI_ERR_ARG;
1756   }
1757   xbt_dict_remove_ext(smpi_type_keyvals, (const char*)keyval, sizeof(int));
1758   xbt_free(elem);
1759   return MPI_SUCCESS;
1760 }
1761
1762 int smpi_mpi_pack(void* inbuf, int incount, MPI_Datatype type, void* outbuf, int outcount, int* position, MPI_Comm comm){
1763   size_t size = smpi_datatype_size(type);
1764   if (outcount - *position < incount*size)
1765     return MPI_ERR_BUFFER;
1766   smpi_datatype_copy(inbuf, incount, type,
1767                    (char*)outbuf + *position, outcount, MPI_CHAR);
1768   *position += incount * size;
1769   return MPI_SUCCESS;
1770 }
1771
1772 int smpi_mpi_unpack(void* inbuf, int insize, int* position, void* outbuf, int outcount, MPI_Datatype type, MPI_Comm comm){
1773   size_t size = smpi_datatype_size(type);
1774   if (outcount*size> insize)
1775     return MPI_ERR_BUFFER;
1776   smpi_datatype_copy((char*)inbuf + *position, insize, MPI_CHAR,
1777                    outbuf, outcount, type);
1778   *position += outcount * size;
1779   return MPI_SUCCESS;
1780 }
1781