Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
c5e93f38a5c2a61eb7be75621223a231cc9db5d0
[simgrid.git] / src / smpi / smpi_mpi_dt.c
1 /* smpi_mpi_dt.c -- MPI primitives to handle datatypes                        */
2 /* FIXME: a very incomplete implementation                                    */
3
4 /* Copyright (c) 2009-2014. The SimGrid Team.
5  * All rights reserved.                                                     */
6
7 /* This program is free software; you can redistribute it and/or modify it
8  * under the terms of the license (GNU LGPL) which comes with this package. */
9
10 #include <stdio.h>
11 #include <stdlib.h>
12 #include <string.h>
13 #include <limits.h>
14 #include "private.h"
15 #include "smpi_mpi_dt_private.h"
16 #include "mc/mc.h"
17 #include "xbt/replay.h"
18 #include "simgrid/modelchecker.h"
19
20 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_mpi_dt, smpi,
21                                 "Logging specific to SMPI (datatype)");
22
23 #define INTSIZEDCHAR (sizeof(int)*CHAR_BIT-1)/3 + 3 
24 xbt_dict_t smpi_type_keyvals = NULL;
25 int type_keyval_id=0;//avoid collisions
26
27 #define CREATE_MPI_DATATYPE(name, type)       \
28   static s_smpi_mpi_datatype_t mpi_##name = { \
29     (char*) # name,                                   \
30     sizeof(type),  /* size */                 \
31     0,             /*was 1 has_subtype*/             \
32     0,             /* lb */                   \
33     sizeof(type),  /* ub = lb + size */       \
34     DT_FLAG_BASIC,  /* flags */              \
35     NULL,           /* attributes */         \
36     NULL,           /* pointer on extended struct*/ \
37   };                                          \
38 MPI_Datatype name = &mpi_##name;
39
40 #define CREATE_MPI_DATATYPE_NULL(name)       \
41   static s_smpi_mpi_datatype_t mpi_##name = { \
42     (char*) # name,                                   \
43     0,  /* size */                 \
44     0,             /*was 1 has_subtype*/             \
45     0,             /* lb */                   \
46     0,  /* ub = lb + size */       \
47     DT_FLAG_BASIC,  /* flags */              \
48     NULL,           /* attributes */         \
49     NULL           /* pointer on extended struct*/ \
50   };                                          \
51 MPI_Datatype name = &mpi_##name;
52
53 //The following are datatypes for the MPI functions MPI_MAXLOC and MPI_MINLOC.
54 typedef struct {
55   float value;
56   int index;
57 } float_int;
58 typedef struct {
59   float value;
60   float index;
61 } float_float;
62 typedef struct {
63   long value;
64   long index;
65 } long_long;
66 typedef struct {
67   double value;
68   double index;
69 } double_double;
70 typedef struct {
71   long value;
72   int index;
73 } long_int;
74 typedef struct {
75   double value;
76   int index;
77 } double_int;
78 typedef struct {
79   short value;
80   int index;
81 } short_int;
82 typedef struct {
83   int value;
84   int index;
85 } int_int;
86 typedef struct {
87   long double value;
88   int index;
89 } long_double_int;
90 typedef struct {
91   int64_t value;
92   int64_t index;
93 } integer128_t;
94 // Predefined data types
95 CREATE_MPI_DATATYPE(MPI_CHAR, char);
96 CREATE_MPI_DATATYPE(MPI_SHORT, short);
97 CREATE_MPI_DATATYPE(MPI_INT, int);
98 CREATE_MPI_DATATYPE(MPI_LONG, long);
99 CREATE_MPI_DATATYPE(MPI_LONG_LONG, long long);
100 CREATE_MPI_DATATYPE(MPI_SIGNED_CHAR, signed char);
101 CREATE_MPI_DATATYPE(MPI_UNSIGNED_CHAR, unsigned char);
102 CREATE_MPI_DATATYPE(MPI_UNSIGNED_SHORT, unsigned short);
103 CREATE_MPI_DATATYPE(MPI_UNSIGNED, unsigned int);
104 CREATE_MPI_DATATYPE(MPI_UNSIGNED_LONG, unsigned long);
105 CREATE_MPI_DATATYPE(MPI_UNSIGNED_LONG_LONG, unsigned long long);
106 CREATE_MPI_DATATYPE(MPI_FLOAT, float);
107 CREATE_MPI_DATATYPE(MPI_DOUBLE, double);
108 CREATE_MPI_DATATYPE(MPI_LONG_DOUBLE, long double);
109 CREATE_MPI_DATATYPE(MPI_WCHAR, wchar_t);
110 CREATE_MPI_DATATYPE(MPI_C_BOOL, _Bool);
111 CREATE_MPI_DATATYPE(MPI_BYTE, int8_t);
112 CREATE_MPI_DATATYPE(MPI_INT8_T, int8_t);
113 CREATE_MPI_DATATYPE(MPI_INT16_T, int16_t);
114 CREATE_MPI_DATATYPE(MPI_INT32_T, int32_t);
115 CREATE_MPI_DATATYPE(MPI_INT64_T, int64_t);
116 CREATE_MPI_DATATYPE(MPI_UINT8_T, uint8_t);
117 CREATE_MPI_DATATYPE(MPI_UINT16_T, uint16_t);
118 CREATE_MPI_DATATYPE(MPI_UINT32_T, uint32_t);
119 CREATE_MPI_DATATYPE(MPI_UINT64_T, uint64_t);
120 CREATE_MPI_DATATYPE(MPI_C_FLOAT_COMPLEX, float _Complex);
121 CREATE_MPI_DATATYPE(MPI_C_DOUBLE_COMPLEX, double _Complex);
122 CREATE_MPI_DATATYPE(MPI_C_LONG_DOUBLE_COMPLEX, long double _Complex);
123 CREATE_MPI_DATATYPE(MPI_AINT, MPI_Aint);
124 CREATE_MPI_DATATYPE(MPI_OFFSET, MPI_Offset);
125
126 CREATE_MPI_DATATYPE(MPI_FLOAT_INT, float_int);
127 CREATE_MPI_DATATYPE(MPI_LONG_INT, long_int);
128 CREATE_MPI_DATATYPE(MPI_DOUBLE_INT, double_int);
129 CREATE_MPI_DATATYPE(MPI_SHORT_INT, short_int);
130 CREATE_MPI_DATATYPE(MPI_2INT, int_int);
131 CREATE_MPI_DATATYPE(MPI_2FLOAT, float_float);
132 CREATE_MPI_DATATYPE(MPI_2DOUBLE, double_double);
133 CREATE_MPI_DATATYPE(MPI_2LONG, long_long);
134
135 CREATE_MPI_DATATYPE(MPI_REAL4, float);
136 CREATE_MPI_DATATYPE(MPI_REAL8, float);
137 CREATE_MPI_DATATYPE(MPI_REAL16, double);
138 CREATE_MPI_DATATYPE_NULL(MPI_COMPLEX8);
139 CREATE_MPI_DATATYPE_NULL(MPI_COMPLEX16);
140 CREATE_MPI_DATATYPE_NULL(MPI_COMPLEX32);
141 CREATE_MPI_DATATYPE(MPI_INTEGER1, int);
142 CREATE_MPI_DATATYPE(MPI_INTEGER2, int16_t);
143 CREATE_MPI_DATATYPE(MPI_INTEGER4, int32_t);
144 CREATE_MPI_DATATYPE(MPI_INTEGER8, int64_t);
145 CREATE_MPI_DATATYPE(MPI_INTEGER16, integer128_t);
146
147 CREATE_MPI_DATATYPE(MPI_LONG_DOUBLE_INT, long_double_int);
148
149 CREATE_MPI_DATATYPE_NULL(MPI_UB);
150 CREATE_MPI_DATATYPE_NULL(MPI_LB);
151 CREATE_MPI_DATATYPE_NULL(MPI_PACKED);
152 // Internal use only
153 CREATE_MPI_DATATYPE(MPI_PTR, void*);
154
155 /** Check if the datatype is usable for communications
156  */
157 int is_datatype_valid(MPI_Datatype datatype) {
158     return datatype != MPI_DATATYPE_NULL
159         && (datatype->flags & DT_FLAG_COMMITED);
160 }
161
162 size_t smpi_datatype_size(MPI_Datatype datatype)
163 {
164   return datatype->size;
165 }
166
167 MPI_Aint smpi_datatype_lb(MPI_Datatype datatype)
168 {
169   return datatype->lb;
170 }
171
172 MPI_Aint smpi_datatype_ub(MPI_Datatype datatype)
173 {
174   return datatype->ub;
175 }
176
177 int smpi_datatype_dup(MPI_Datatype datatype, MPI_Datatype* new_t)
178 {
179   int ret=MPI_SUCCESS;
180   *new_t= xbt_new(s_smpi_mpi_datatype_t,1);
181   memcpy(*new_t, datatype, sizeof(s_smpi_mpi_datatype_t));
182   if (datatype->has_subtype){
183     //FIXME: may copy too much information.
184     (*new_t)->substruct=xbt_malloc(sizeof(s_smpi_mpi_struct_t));
185     memcpy((*new_t)->substruct, datatype->substruct, sizeof(s_smpi_mpi_struct_t));
186   }
187   if(datatype->name)
188     (*new_t)->name = strdup(datatype->name);
189   if(datatype->attributes !=NULL){
190       (*new_t)->attributes=xbt_dict_new();
191       xbt_dict_cursor_t cursor = NULL;
192       int *key;
193       int flag;
194       void* value_in;
195       void* value_out;
196       xbt_dict_foreach(datatype->attributes, cursor, key, value_in){
197         smpi_type_key_elem elem = xbt_dict_get_or_null(smpi_type_keyvals, (const char*)key);
198         if(elem && elem->copy_fn!=MPI_NULL_COPY_FN){
199           ret = elem->copy_fn(datatype, atoi((const char*)key), NULL, value_in, &value_out, &flag );
200           if(ret!=MPI_SUCCESS){
201             *new_t=MPI_DATATYPE_NULL;
202             return ret;
203           }
204           if(flag)
205             xbt_dict_set((*new_t)->attributes, (const char*)key,value_out, NULL);
206         }
207       }
208     }
209   return ret;
210 }
211
212 int smpi_datatype_extent(MPI_Datatype datatype, MPI_Aint * lb,
213                          MPI_Aint * extent)
214 {
215   if(datatype == MPI_DATATYPE_NULL){
216     *lb=0;
217     *extent=0;
218     return MPI_SUCCESS;
219   }
220   *lb = datatype->lb;
221   *extent = datatype->ub - datatype->lb;
222   return MPI_SUCCESS;
223 }
224
225 MPI_Aint smpi_datatype_get_extent(MPI_Datatype datatype){
226   if(datatype == MPI_DATATYPE_NULL){
227     return 0;
228   }
229   return datatype->ub - datatype->lb;
230 }
231
232 void smpi_datatype_get_name(MPI_Datatype datatype, char* name, int* length){
233   *length = strlen(datatype->name);
234   strcpy(name, datatype->name);
235 }
236
237 void smpi_datatype_set_name(MPI_Datatype datatype, char* name){
238   datatype->name = strdup(name);;
239 }
240
241 int smpi_datatype_copy(void *sendbuf, int sendcount, MPI_Datatype sendtype,
242                        void *recvbuf, int recvcount, MPI_Datatype recvtype)
243 {
244   int count;
245   if(smpi_privatize_global_variables){
246     smpi_switch_data_segment(smpi_process_index());
247   }
248   /* First check if we really have something to do */
249   if (recvcount > 0 && recvbuf != sendbuf) {
250     /* FIXME: treat packed cases */
251     sendcount *= smpi_datatype_size(sendtype);
252     recvcount *= smpi_datatype_size(recvtype);
253     count = sendcount < recvcount ? sendcount : recvcount;
254
255     if(sendtype->has_subtype == 0 && recvtype->has_subtype == 0) {
256       if(!smpi_process_get_replaying()) memcpy(recvbuf, sendbuf, count);
257     }
258     else if (sendtype->has_subtype == 0)
259     {
260       s_smpi_subtype_t *subtype =  recvtype->substruct;
261       subtype->unserialize( sendbuf, recvbuf,1, subtype, MPI_REPLACE);
262     }
263     else if (recvtype->has_subtype == 0)
264     {
265       s_smpi_subtype_t *subtype =  sendtype->substruct;
266       subtype->serialize(sendbuf, recvbuf,1, subtype);
267     }else{
268       s_smpi_subtype_t *subtype =  sendtype->substruct;
269
270
271       void * buf_tmp = xbt_malloc(count);
272
273       subtype->serialize( sendbuf, buf_tmp,count/smpi_datatype_size(sendtype), subtype);
274       subtype =  recvtype->substruct;
275       subtype->unserialize( buf_tmp, recvbuf,count/smpi_datatype_size(recvtype), subtype, MPI_REPLACE);
276
277       free(buf_tmp);
278     }
279   }
280
281   return sendcount > recvcount ? MPI_ERR_TRUNCATE : MPI_SUCCESS;
282 }
283
284 /*
285  *  Copies noncontiguous data into contiguous memory.
286  *  @param contiguous_vector - output vector
287  *  @param noncontiguous_vector - input vector
288  *  @param type - pointer contening :
289  *      - stride - stride of between noncontiguous data
290  *      - block_length - the width or height of blocked matrix
291  *      - count - the number of rows of matrix
292  */
293 void serialize_vector( const void *noncontiguous_vector,
294                        void *contiguous_vector,
295                        int count,
296                        void *type)
297 {
298   s_smpi_mpi_vector_t* type_c = (s_smpi_mpi_vector_t*)type;
299   int i;
300   char* contiguous_vector_char = (char*)contiguous_vector;
301   char* noncontiguous_vector_char = (char*)noncontiguous_vector;
302
303   for (i = 0; i < type_c->block_count * count; i++) {
304       if (type_c->old_type->has_subtype == 0)
305         memcpy(contiguous_vector_char,
306                noncontiguous_vector_char, type_c->block_length * type_c->size_oldtype);
307       else
308         ((s_smpi_subtype_t*)type_c->old_type->substruct)->serialize( noncontiguous_vector_char,
309                                                                      contiguous_vector_char,
310                                                                      type_c->block_length,
311                                                                      type_c->old_type->substruct);
312
313     contiguous_vector_char += type_c->block_length*type_c->size_oldtype;
314     if((i+1)%type_c->block_count ==0)
315     noncontiguous_vector_char += type_c->block_length*smpi_datatype_get_extent(type_c->old_type);
316     else
317     noncontiguous_vector_char += type_c->block_stride*smpi_datatype_get_extent(type_c->old_type);
318   }
319 }
320
321 /*
322  *  Copies contiguous data into noncontiguous memory.
323  *  @param noncontiguous_vector - output vector
324  *  @param contiguous_vector - input vector
325  *  @param type - pointer contening :
326  *      - stride - stride of between noncontiguous data
327  *      - block_length - the width or height of blocked matrix
328  *      - count - the number of rows of matrix
329  */
330 void unserialize_vector( const void *contiguous_vector,
331                          void *noncontiguous_vector,
332                          int count,
333                          void *type,
334                          MPI_Op op)
335 {
336   s_smpi_mpi_vector_t* type_c = (s_smpi_mpi_vector_t*)type;
337   int i;
338
339   char* contiguous_vector_char = (char*)contiguous_vector;
340   char* noncontiguous_vector_char = (char*)noncontiguous_vector;
341
342   for (i = 0; i < type_c->block_count * count; i++) {
343     if (type_c->old_type->has_subtype == 0)
344       smpi_op_apply(op, contiguous_vector_char, noncontiguous_vector_char, &type_c->block_length,
345           &type_c->old_type);
346      /* memcpy(noncontiguous_vector_char,
347              contiguous_vector_char, type_c->block_length * type_c->size_oldtype);*/
348     else
349       ((s_smpi_subtype_t*)type_c->old_type->substruct)->unserialize( contiguous_vector_char,
350                                                                      noncontiguous_vector_char,
351                                                                      type_c->block_length,
352                                                                      type_c->old_type->substruct,
353                                                                      op);
354     contiguous_vector_char += type_c->block_length*type_c->size_oldtype;
355     if((i+1)%type_c->block_count ==0)
356     noncontiguous_vector_char += type_c->block_length*smpi_datatype_get_extent(type_c->old_type);
357     else
358     noncontiguous_vector_char += type_c->block_stride*smpi_datatype_get_extent(type_c->old_type);
359   }
360 }
361
362 /*
363  * Create a Sub type vector to be able to serialize and unserialize it
364  * the structure s_smpi_mpi_vector_t is derived from s_smpi_subtype which
365  * required the functions unserialize and serialize
366  *
367  */
368 s_smpi_mpi_vector_t* smpi_datatype_vector_create( int block_stride,
369                                                   int block_length,
370                                                   int block_count,
371                                                   MPI_Datatype old_type,
372                                                   int size_oldtype){
373   s_smpi_mpi_vector_t *new_t= xbt_new(s_smpi_mpi_vector_t,1);
374   new_t->base.serialize = &serialize_vector;
375   new_t->base.unserialize = &unserialize_vector;
376   new_t->base.subtype_free = &free_vector;
377   new_t->block_stride = block_stride;
378   new_t->block_length = block_length;
379   new_t->block_count = block_count;
380   smpi_datatype_use(old_type);
381   new_t->old_type = old_type;
382   new_t->size_oldtype = size_oldtype;
383   return new_t;
384 }
385
386 void smpi_datatype_create(MPI_Datatype* new_type, int size,int lb, int ub, int has_subtype,
387                           void *struct_type, int flags){
388   MPI_Datatype new_t= xbt_new(s_smpi_mpi_datatype_t,1);
389   new_t->name = NULL;
390   new_t->size = size;
391   new_t->has_subtype = size>0? has_subtype:0;
392   new_t->lb = lb;
393   new_t->ub = ub;
394   new_t->flags = flags;
395   new_t->substruct = struct_type;
396   new_t->in_use=0;
397   new_t->attributes=NULL;
398   *new_type = new_t;
399
400 #ifdef HAVE_MC
401   if(MC_is_active())
402     MC_ignore(&(new_t->in_use), sizeof(new_t->in_use));
403 #endif
404 }
405
406 void smpi_datatype_free(MPI_Datatype* type){
407   if((*type)->attributes !=NULL){
408       xbt_dict_cursor_t cursor = NULL;
409       int* key;
410       void * value;
411       int flag;
412       xbt_dict_foreach((*type)->attributes, cursor, key, value){
413         smpi_type_key_elem elem = xbt_dict_get_or_null(smpi_type_keyvals, (const char*)key);
414         if(elem &&  elem->delete_fn)
415           elem->delete_fn(*type, atoi((const char*)key), value, &flag);
416       }
417   }
418
419   if((*type)->flags & DT_FLAG_PREDEFINED)return;
420
421   //if still used, mark for deletion
422   if((*type)->in_use!=0){
423       (*type)->flags |=DT_FLAG_DESTROYED;
424       return;
425   }
426
427   if ((*type)->has_subtype == 1){
428     ((s_smpi_subtype_t *)(*type)->substruct)->subtype_free(type);  
429     xbt_free((*type)->substruct);
430   }
431   if ((*type)->name != NULL){
432     xbt_free((*type)->name);
433   }
434   xbt_free(*type);
435   *type = MPI_DATATYPE_NULL;
436 }
437
438 void smpi_datatype_use(MPI_Datatype type){
439   if(type)type->in_use++;
440
441 #ifdef HAVE_MC
442   if(MC_is_active())
443     MC_ignore(&(type->in_use), sizeof(type->in_use));
444 #endif
445 }
446
447
448 void smpi_datatype_unuse(MPI_Datatype type){
449   if(type && type->in_use-- == 0 && (type->flags & DT_FLAG_DESTROYED))
450     smpi_datatype_free(&type);
451
452 #ifdef HAVE_MC
453   if(MC_is_active())
454     MC_ignore(&(type->in_use), sizeof(type->in_use));
455 #endif
456 }
457
458
459
460
461 /*
462 Contiguous Implementation
463 */
464
465
466 /*
467  *  Copies noncontiguous data into contiguous memory.
468  *  @param contiguous_hvector - output hvector
469  *  @param noncontiguous_hvector - input hvector
470  *  @param type - pointer contening :
471  *      - stride - stride of between noncontiguous data, in bytes
472  *      - block_length - the width or height of blocked matrix
473  *      - count - the number of rows of matrix
474  */
475 void serialize_contiguous( const void *noncontiguous_hvector,
476                        void *contiguous_hvector,
477                        int count,
478                        void *type)
479 {
480   s_smpi_mpi_contiguous_t* type_c = (s_smpi_mpi_contiguous_t*)type;
481   char* contiguous_vector_char = (char*)contiguous_hvector;
482   char* noncontiguous_vector_char = (char*)noncontiguous_hvector+type_c->lb;
483   memcpy(contiguous_vector_char,
484            noncontiguous_vector_char, count* type_c->block_count * type_c->size_oldtype);
485 }
486 /*
487  *  Copies contiguous data into noncontiguous memory.
488  *  @param noncontiguous_vector - output hvector
489  *  @param contiguous_vector - input hvector
490  *  @param type - pointer contening :
491  *      - stride - stride of between noncontiguous data, in bytes
492  *      - block_length - the width or height of blocked matrix
493  *      - count - the number of rows of matrix
494  */
495 void unserialize_contiguous( const void *contiguous_vector,
496                          void *noncontiguous_vector,
497                          int count,
498                          void *type,
499                          MPI_Op op)
500 {
501   s_smpi_mpi_contiguous_t* type_c = (s_smpi_mpi_contiguous_t*)type;
502   char* contiguous_vector_char = (char*)contiguous_vector;
503   char* noncontiguous_vector_char = (char*)noncontiguous_vector+type_c->lb;
504   int n= count* type_c->block_count;
505   smpi_op_apply(op, contiguous_vector_char, noncontiguous_vector_char, &n,
506             &type_c->old_type);
507        /*memcpy(noncontiguous_vector_char,
508            contiguous_vector_char, count*  type_c->block_count * type_c->size_oldtype);*/
509 }
510
511 void free_contiguous(MPI_Datatype* d){
512   smpi_datatype_unuse(((s_smpi_mpi_indexed_t *)(*d)->substruct)->old_type);
513 }
514
515 /*
516  * Create a Sub type contiguous to be able to serialize and unserialize it
517  * the structure s_smpi_mpi_contiguous_t is derived from s_smpi_subtype which
518  * required the functions unserialize and serialize
519  *
520  */
521 s_smpi_mpi_contiguous_t* smpi_datatype_contiguous_create( MPI_Aint lb,
522                                                   int block_count,
523                                                   MPI_Datatype old_type,
524                                                   int size_oldtype){
525   s_smpi_mpi_contiguous_t *new_t= xbt_new(s_smpi_mpi_contiguous_t,1);
526   new_t->base.serialize = &serialize_contiguous;
527   new_t->base.unserialize = &unserialize_contiguous;
528   new_t->base.subtype_free = &free_contiguous;
529   new_t->lb = lb;
530   new_t->block_count = block_count;
531   new_t->old_type = old_type;
532   new_t->size_oldtype = size_oldtype;
533   smpi_datatype_use(old_type);
534   return new_t;
535 }
536
537
538
539
540 int smpi_datatype_contiguous(int count, MPI_Datatype old_type, MPI_Datatype* new_type, MPI_Aint lb)
541 {
542   int retval;
543   if(old_type->has_subtype){
544           //handle this case as a hvector with stride equals to the extent of the datatype
545           return smpi_datatype_hvector(count, 1, smpi_datatype_get_extent(old_type), old_type, new_type);
546   }
547   
548   s_smpi_mpi_contiguous_t* subtype = smpi_datatype_contiguous_create( lb,
549                                                                 count,
550                                                                 old_type,
551                                                                 smpi_datatype_size(old_type));
552                                                                 
553   smpi_datatype_create(new_type,
554                                           count * smpi_datatype_size(old_type),
555                                           lb,lb + count * smpi_datatype_size(old_type),
556                                           1,subtype, DT_FLAG_CONTIGUOUS);
557   retval=MPI_SUCCESS;
558   return retval;
559 }
560
561 int smpi_datatype_vector(int count, int blocklen, int stride, MPI_Datatype old_type, MPI_Datatype* new_type)
562 {
563   int retval;
564   if (blocklen<0) return MPI_ERR_ARG;
565   MPI_Aint lb = 0;
566   MPI_Aint ub = 0;
567   if(count>0){
568     lb=smpi_datatype_lb(old_type);
569     ub=((count-1)*stride+blocklen-1)*smpi_datatype_get_extent(old_type)+smpi_datatype_ub(old_type);
570   }
571   if(old_type->has_subtype || stride != blocklen){
572
573
574     s_smpi_mpi_vector_t* subtype = smpi_datatype_vector_create( stride,
575                                                                 blocklen,
576                                                                 count,
577                                                                 old_type,
578                                                                 smpi_datatype_size(old_type));
579     smpi_datatype_create(new_type,
580                          count * (blocklen) * smpi_datatype_size(old_type), lb,
581                          ub,
582                          1,
583                          subtype,
584                          DT_FLAG_VECTOR);
585     retval=MPI_SUCCESS;
586   }else{
587     /* in this situation the data are contignous thus it's not
588      * required to serialize and unserialize it*/
589     smpi_datatype_create(new_type, count * blocklen *
590                          smpi_datatype_size(old_type), 0, ((count -1) * stride + blocklen)*
591                          smpi_datatype_size(old_type),
592                          0,
593                          NULL,
594                          DT_FLAG_VECTOR|DT_FLAG_CONTIGUOUS);
595     retval=MPI_SUCCESS;
596   }
597   return retval;
598 }
599
600 void free_vector(MPI_Datatype* d){
601   smpi_datatype_unuse(((s_smpi_mpi_indexed_t *)(*d)->substruct)->old_type);
602 }
603
604 /*
605 Hvector Implementation - Vector with stride in bytes
606 */
607
608
609 /*
610  *  Copies noncontiguous data into contiguous memory.
611  *  @param contiguous_hvector - output hvector
612  *  @param noncontiguous_hvector - input hvector
613  *  @param type - pointer contening :
614  *      - stride - stride of between noncontiguous data, in bytes
615  *      - block_length - the width or height of blocked matrix
616  *      - count - the number of rows of matrix
617  */
618 void serialize_hvector( const void *noncontiguous_hvector,
619                        void *contiguous_hvector,
620                        int count,
621                        void *type)
622 {
623   s_smpi_mpi_hvector_t* type_c = (s_smpi_mpi_hvector_t*)type;
624   int i;
625   char* contiguous_vector_char = (char*)contiguous_hvector;
626   char* noncontiguous_vector_char = (char*)noncontiguous_hvector;
627
628   for (i = 0; i < type_c->block_count * count; i++) {
629     if (type_c->old_type->has_subtype == 0)
630       memcpy(contiguous_vector_char,
631            noncontiguous_vector_char, type_c->block_length * type_c->size_oldtype);
632     else
633       ((s_smpi_subtype_t*)type_c->old_type->substruct)->serialize( noncontiguous_vector_char,
634                                                                    contiguous_vector_char,
635                                                                    type_c->block_length,
636                                                                    type_c->old_type->substruct);
637
638     contiguous_vector_char += type_c->block_length*type_c->size_oldtype;
639     if((i+1)%type_c->block_count ==0)
640     noncontiguous_vector_char += type_c->block_length*type_c->size_oldtype;
641     else
642     noncontiguous_vector_char += type_c->block_stride;
643   }
644 }
645 /*
646  *  Copies contiguous data into noncontiguous memory.
647  *  @param noncontiguous_vector - output hvector
648  *  @param contiguous_vector - input hvector
649  *  @param type - pointer contening :
650  *      - stride - stride of between noncontiguous data, in bytes
651  *      - block_length - the width or height of blocked matrix
652  *      - count - the number of rows of matrix
653  */
654 void unserialize_hvector( const void *contiguous_vector,
655                          void *noncontiguous_vector,
656                          int count,
657                          void *type,
658                          MPI_Op op)
659 {
660   s_smpi_mpi_hvector_t* type_c = (s_smpi_mpi_hvector_t*)type;
661   int i;
662
663   char* contiguous_vector_char = (char*)contiguous_vector;
664   char* noncontiguous_vector_char = (char*)noncontiguous_vector;
665
666   for (i = 0; i < type_c->block_count * count; i++) {
667     if (type_c->old_type->has_subtype == 0)
668       smpi_op_apply(op, contiguous_vector_char, noncontiguous_vector_char, &type_c->block_length,
669                   &type_c->old_type);
670              /*memcpy(noncontiguous_vector_char,
671            contiguous_vector_char, type_c->block_length * type_c->size_oldtype);*/
672     else
673       ((s_smpi_subtype_t*)type_c->old_type->substruct)->unserialize( contiguous_vector_char,
674                                                                      noncontiguous_vector_char,
675                                                                      type_c->block_length,
676                                                                      type_c->old_type->substruct,
677                                                                      op);
678     contiguous_vector_char += type_c->block_length*type_c->size_oldtype;
679     if((i+1)%type_c->block_count ==0)
680     noncontiguous_vector_char += type_c->block_length*type_c->size_oldtype;
681     else
682     noncontiguous_vector_char += type_c->block_stride;
683   }
684 }
685
686 /*
687  * Create a Sub type vector to be able to serialize and unserialize it
688  * the structure s_smpi_mpi_vector_t is derived from s_smpi_subtype which
689  * required the functions unserialize and serialize
690  *
691  */
692 s_smpi_mpi_hvector_t* smpi_datatype_hvector_create( MPI_Aint block_stride,
693                                                   int block_length,
694                                                   int block_count,
695                                                   MPI_Datatype old_type,
696                                                   int size_oldtype){
697   s_smpi_mpi_hvector_t *new_t= xbt_new(s_smpi_mpi_hvector_t,1);
698   new_t->base.serialize = &serialize_hvector;
699   new_t->base.unserialize = &unserialize_hvector;
700   new_t->base.subtype_free = &free_hvector;
701   new_t->block_stride = block_stride;
702   new_t->block_length = block_length;
703   new_t->block_count = block_count;
704   new_t->old_type = old_type;
705   new_t->size_oldtype = size_oldtype;
706   smpi_datatype_use(old_type);
707   return new_t;
708 }
709
710 //do nothing for vector types
711 void free_hvector(MPI_Datatype* d){
712   smpi_datatype_unuse(((s_smpi_mpi_indexed_t *)(*d)->substruct)->old_type);
713 }
714
715 int smpi_datatype_hvector(int count, int blocklen, MPI_Aint stride, MPI_Datatype old_type, MPI_Datatype* new_type)
716 {
717   int retval;
718   if (blocklen<0) return MPI_ERR_ARG;
719   MPI_Aint lb = 0;
720   MPI_Aint ub = 0;
721   if(count>0){
722     lb=smpi_datatype_lb(old_type);
723     ub=((count-1)*stride)+(blocklen-1)*smpi_datatype_get_extent(old_type)+smpi_datatype_ub(old_type);
724   }
725   if(old_type->has_subtype || stride != blocklen*smpi_datatype_get_extent(old_type)){
726     s_smpi_mpi_hvector_t* subtype = smpi_datatype_hvector_create( stride,
727                                                                   blocklen,
728                                                                   count,
729                                                                   old_type,
730                                                                   smpi_datatype_size(old_type));
731
732     smpi_datatype_create(new_type, count * blocklen * smpi_datatype_size(old_type),
733                                                  lb,ub,
734                          1,
735                          subtype,
736                          DT_FLAG_VECTOR);
737     retval=MPI_SUCCESS;
738   }else{
739     smpi_datatype_create(new_type, count * blocklen *
740                                              smpi_datatype_size(old_type),0,count * blocklen *
741                                              smpi_datatype_size(old_type),
742                                             0,
743                                             NULL,
744                                             DT_FLAG_VECTOR|DT_FLAG_CONTIGUOUS);
745     retval=MPI_SUCCESS;
746   }
747   return retval;
748 }
749
750
751 /*
752 Indexed Implementation
753 */
754
755 /*
756  *  Copies noncontiguous data into contiguous memory.
757  *  @param contiguous_indexed - output indexed
758  *  @param noncontiguous_indexed - input indexed
759  *  @param type - pointer contening :
760  *      - block_lengths - the width or height of blocked matrix
761  *      - block_indices - indices of each data, in element
762  *      - count - the number of rows of matrix
763  */
764 void serialize_indexed( const void *noncontiguous_indexed,
765                        void *contiguous_indexed,
766                        int count,
767                        void *type)
768 {
769   s_smpi_mpi_indexed_t* type_c = (s_smpi_mpi_indexed_t*)type;
770   int i,j;
771   char* contiguous_indexed_char = (char*)contiguous_indexed;
772   char* noncontiguous_indexed_char = (char*)noncontiguous_indexed+type_c->block_indices[0] * type_c->size_oldtype;
773   for(j=0; j<count;j++){
774     for (i = 0; i < type_c->block_count; i++) {
775       if (type_c->old_type->has_subtype == 0)
776         memcpy(contiguous_indexed_char,
777                      noncontiguous_indexed_char, type_c->block_lengths[i] * type_c->size_oldtype);
778       else
779         ((s_smpi_subtype_t*)type_c->old_type->substruct)->serialize( noncontiguous_indexed_char,
780                                                                      contiguous_indexed_char,
781                                                                      type_c->block_lengths[i],
782                                                                      type_c->old_type->substruct);
783
784
785       contiguous_indexed_char += type_c->block_lengths[i]*type_c->size_oldtype;
786       if (i<type_c->block_count-1)noncontiguous_indexed_char = (char*)noncontiguous_indexed + type_c->block_indices[i+1]*smpi_datatype_get_extent(type_c->old_type);
787       else noncontiguous_indexed_char += type_c->block_lengths[i]*smpi_datatype_get_extent(type_c->old_type);
788     }
789     noncontiguous_indexed=(void*)noncontiguous_indexed_char;
790   }
791 }
792 /*
793  *  Copies contiguous data into noncontiguous memory.
794  *  @param noncontiguous_indexed - output indexed
795  *  @param contiguous_indexed - input indexed
796  *  @param type - pointer contening :
797  *      - block_lengths - the width or height of blocked matrix
798  *      - block_indices - indices of each data, in element
799  *      - count - the number of rows of matrix
800  */
801 void unserialize_indexed( const void *contiguous_indexed,
802                          void *noncontiguous_indexed,
803                          int count,
804                          void *type,
805                          MPI_Op op)
806 {
807
808   s_smpi_mpi_indexed_t* type_c = (s_smpi_mpi_indexed_t*)type;
809   int i,j;
810   char* contiguous_indexed_char = (char*)contiguous_indexed;
811   char* noncontiguous_indexed_char = (char*)noncontiguous_indexed+type_c->block_indices[0]*smpi_datatype_get_extent(type_c->old_type);
812   for(j=0; j<count;j++){
813     for (i = 0; i < type_c->block_count; i++) {
814       if (type_c->old_type->has_subtype == 0)
815         smpi_op_apply(op, contiguous_indexed_char, noncontiguous_indexed_char, &type_c->block_lengths[i],
816                     &type_c->old_type);
817                /*memcpy(noncontiguous_indexed_char ,
818              contiguous_indexed_char, type_c->block_lengths[i] * type_c->size_oldtype);*/
819       else
820         ((s_smpi_subtype_t*)type_c->old_type->substruct)->unserialize( contiguous_indexed_char,
821                                                                        noncontiguous_indexed_char,
822                                                                        type_c->block_lengths[i],
823                                                                        type_c->old_type->substruct,
824                                                                        op);
825
826       contiguous_indexed_char += type_c->block_lengths[i]*type_c->size_oldtype;
827       if (i<type_c->block_count-1)
828         noncontiguous_indexed_char = (char*)noncontiguous_indexed + type_c->block_indices[i+1]*smpi_datatype_get_extent(type_c->old_type);
829       else noncontiguous_indexed_char += type_c->block_lengths[i]*smpi_datatype_get_extent(type_c->old_type);
830     }
831     noncontiguous_indexed=(void*)noncontiguous_indexed_char;
832   }
833 }
834
835 void free_indexed(MPI_Datatype* type){
836   xbt_free(((s_smpi_mpi_indexed_t *)(*type)->substruct)->block_lengths);
837   xbt_free(((s_smpi_mpi_indexed_t *)(*type)->substruct)->block_indices);
838   smpi_datatype_unuse(((s_smpi_mpi_indexed_t *)(*type)->substruct)->old_type);
839 }
840
841 /*
842  * Create a Sub type indexed to be able to serialize and unserialize it
843  * the structure s_smpi_mpi_indexed_t is derived from s_smpi_subtype which
844  * required the functions unserialize and serialize
845  */
846 s_smpi_mpi_indexed_t* smpi_datatype_indexed_create( int* block_lengths,
847                                                   int* block_indices,
848                                                   int block_count,
849                                                   MPI_Datatype old_type,
850                                                   int size_oldtype){
851   s_smpi_mpi_indexed_t *new_t= xbt_new(s_smpi_mpi_indexed_t,1);
852   new_t->base.serialize = &serialize_indexed;
853   new_t->base.unserialize = &unserialize_indexed;
854   new_t->base.subtype_free = &free_indexed;
855  //TODO : add a custom function for each time to clean these 
856   new_t->block_lengths= xbt_new(int, block_count);
857   new_t->block_indices= xbt_new(int, block_count);
858   int i;
859   for(i=0;i<block_count;i++){
860     new_t->block_lengths[i]=block_lengths[i];
861     new_t->block_indices[i]=block_indices[i];
862   }
863   new_t->block_count = block_count;
864   smpi_datatype_use(old_type);
865   new_t->old_type = old_type;
866   new_t->size_oldtype = size_oldtype;
867   return new_t;
868 }
869
870
871 int smpi_datatype_indexed(int count, int* blocklens, int* indices, MPI_Datatype old_type, MPI_Datatype* new_type)
872 {
873   int i;
874   int retval;
875   int size = 0;
876   int contiguous=1;
877   MPI_Aint lb = 0;
878   MPI_Aint ub = 0;
879   if(count>0){
880     lb=indices[0]*smpi_datatype_get_extent(old_type);
881     ub=indices[0]*smpi_datatype_get_extent(old_type) + blocklens[0]*smpi_datatype_ub(old_type);
882   }
883
884   for(i=0; i< count; i++){
885     if   (blocklens[i]<0)
886       return MPI_ERR_ARG;
887     size += blocklens[i];
888
889     if(indices[i]*smpi_datatype_get_extent(old_type)+smpi_datatype_lb(old_type)<lb)
890         lb = indices[i]*smpi_datatype_get_extent(old_type)+smpi_datatype_lb(old_type);
891     if(indices[i]*smpi_datatype_get_extent(old_type)+blocklens[i]*smpi_datatype_ub(old_type)>ub)
892         ub = indices[i]*smpi_datatype_get_extent(old_type)+blocklens[i]*smpi_datatype_ub(old_type);
893
894     if ( (i< count -1) && (indices[i]+blocklens[i] != indices[i+1]) )contiguous=0;
895   }
896   if (old_type->has_subtype == 1)
897     contiguous=0;
898
899   if(!contiguous){
900     s_smpi_mpi_indexed_t* subtype = smpi_datatype_indexed_create( blocklens,
901                                                                   indices,
902                                                                   count,
903                                                                   old_type,
904                                                                   smpi_datatype_size(old_type));
905      smpi_datatype_create(new_type,  size *
906                          smpi_datatype_size(old_type),lb,ub,1, subtype, DT_FLAG_DATA);
907   }else{
908     s_smpi_mpi_contiguous_t* subtype = smpi_datatype_contiguous_create( lb,
909                                                                   size,
910                                                                   old_type,
911                                                                   smpi_datatype_size(old_type));
912     smpi_datatype_create(new_type,  size *
913                          smpi_datatype_size(old_type),lb,ub,1, subtype, DT_FLAG_DATA|DT_FLAG_CONTIGUOUS);
914   }
915   retval=MPI_SUCCESS;
916   return retval;
917 }
918
919
920 /*
921 Hindexed Implementation - Indexed with indices in bytes 
922 */
923
924 /*
925  *  Copies noncontiguous data into contiguous memory.
926  *  @param contiguous_hindexed - output hindexed
927  *  @param noncontiguous_hindexed - input hindexed
928  *  @param type - pointer contening :
929  *      - block_lengths - the width or height of blocked matrix
930  *      - block_indices - indices of each data, in bytes
931  *      - count - the number of rows of matrix
932  */
933 void serialize_hindexed( const void *noncontiguous_hindexed,
934                        void *contiguous_hindexed,
935                        int count,
936                        void *type)
937 {
938   s_smpi_mpi_hindexed_t* type_c = (s_smpi_mpi_hindexed_t*)type;
939   int i,j;
940   char* contiguous_hindexed_char = (char*)contiguous_hindexed;
941   char* noncontiguous_hindexed_char = (char*)noncontiguous_hindexed+ type_c->block_indices[0];
942   for(j=0; j<count;j++){
943     for (i = 0; i < type_c->block_count; i++) {
944       if (type_c->old_type->has_subtype == 0)
945         memcpy(contiguous_hindexed_char,
946                      noncontiguous_hindexed_char, type_c->block_lengths[i] * type_c->size_oldtype);
947       else
948         ((s_smpi_subtype_t*)type_c->old_type->substruct)->serialize( noncontiguous_hindexed_char,
949                                                                      contiguous_hindexed_char,
950                                                                      type_c->block_lengths[i],
951                                                                      type_c->old_type->substruct);
952
953       contiguous_hindexed_char += type_c->block_lengths[i]*type_c->size_oldtype;
954       if (i<type_c->block_count-1)noncontiguous_hindexed_char = (char*)noncontiguous_hindexed + type_c->block_indices[i+1];
955       else noncontiguous_hindexed_char += type_c->block_lengths[i]*smpi_datatype_get_extent(type_c->old_type);
956     }
957     noncontiguous_hindexed=(void*)noncontiguous_hindexed_char;
958   }
959 }
960 /*
961  *  Copies contiguous data into noncontiguous memory.
962  *  @param noncontiguous_hindexed - output hindexed
963  *  @param contiguous_hindexed - input hindexed
964  *  @param type - pointer contening :
965  *      - block_lengths - the width or height of blocked matrix
966  *      - block_indices - indices of each data, in bytes
967  *      - count - the number of rows of matrix
968  */
969 void unserialize_hindexed( const void *contiguous_hindexed,
970                          void *noncontiguous_hindexed,
971                          int count,
972                          void *type,
973                          MPI_Op op)
974 {
975   s_smpi_mpi_hindexed_t* type_c = (s_smpi_mpi_hindexed_t*)type;
976   int i,j;
977
978   char* contiguous_hindexed_char = (char*)contiguous_hindexed;
979   char* noncontiguous_hindexed_char = (char*)noncontiguous_hindexed+ type_c->block_indices[0];
980   for(j=0; j<count;j++){
981     for (i = 0; i < type_c->block_count; i++) {
982       if (type_c->old_type->has_subtype == 0)
983         smpi_op_apply(op, contiguous_hindexed_char, noncontiguous_hindexed_char, &type_c->block_lengths[i],
984                             &type_c->old_type);
985                        /*memcpy(noncontiguous_hindexed_char,
986                contiguous_hindexed_char, type_c->block_lengths[i] * type_c->size_oldtype);*/
987       else
988         ((s_smpi_subtype_t*)type_c->old_type->substruct)->unserialize( contiguous_hindexed_char,
989                                                                        noncontiguous_hindexed_char,
990                                                                        type_c->block_lengths[i],
991                                                                        type_c->old_type->substruct,
992                                                                        op);
993
994       contiguous_hindexed_char += type_c->block_lengths[i]*type_c->size_oldtype;
995       if (i<type_c->block_count-1)noncontiguous_hindexed_char = (char*)noncontiguous_hindexed + type_c->block_indices[i+1];
996       else noncontiguous_hindexed_char += type_c->block_lengths[i]*smpi_datatype_get_extent(type_c->old_type);
997     }
998     noncontiguous_hindexed=(void*)noncontiguous_hindexed_char;
999   }
1000 }
1001
1002 void free_hindexed(MPI_Datatype* type){
1003   xbt_free(((s_smpi_mpi_hindexed_t *)(*type)->substruct)->block_lengths);
1004   xbt_free(((s_smpi_mpi_hindexed_t *)(*type)->substruct)->block_indices);
1005   smpi_datatype_unuse(((s_smpi_mpi_indexed_t *)(*type)->substruct)->old_type);
1006 }
1007
1008 /*
1009  * Create a Sub type hindexed to be able to serialize and unserialize it
1010  * the structure s_smpi_mpi_hindexed_t is derived from s_smpi_subtype which
1011  * required the functions unserialize and serialize
1012  */
1013 s_smpi_mpi_hindexed_t* smpi_datatype_hindexed_create( int* block_lengths,
1014                                                   MPI_Aint* block_indices,
1015                                                   int block_count,
1016                                                   MPI_Datatype old_type,
1017                                                   int size_oldtype){
1018   s_smpi_mpi_hindexed_t *new_t= xbt_new(s_smpi_mpi_hindexed_t,1);
1019   new_t->base.serialize = &serialize_hindexed;
1020   new_t->base.unserialize = &unserialize_hindexed;
1021   new_t->base.subtype_free = &free_hindexed;
1022  //TODO : add a custom function for each time to clean these 
1023   new_t->block_lengths= xbt_new(int, block_count);
1024   new_t->block_indices= xbt_new(MPI_Aint, block_count);
1025   int i;
1026   for(i=0;i<block_count;i++){
1027     new_t->block_lengths[i]=block_lengths[i];
1028     new_t->block_indices[i]=block_indices[i];
1029   }
1030   new_t->block_count = block_count;
1031   new_t->old_type = old_type;
1032   new_t->size_oldtype = size_oldtype;
1033   return new_t;
1034 }
1035
1036
1037 int smpi_datatype_hindexed(int count, int* blocklens, MPI_Aint* indices, MPI_Datatype old_type, MPI_Datatype* new_type)
1038 {
1039   int i;
1040   int retval;
1041   int size = 0;
1042   int contiguous=1;
1043   MPI_Aint lb = 0;
1044   MPI_Aint ub = 0;
1045   if(count>0){
1046     lb=indices[0] + smpi_datatype_lb(old_type);
1047     ub=indices[0] + blocklens[0]*smpi_datatype_ub(old_type);
1048   }
1049   for(i=0; i< count; i++){
1050     if   (blocklens[i]<0)
1051       return MPI_ERR_ARG;
1052     size += blocklens[i];
1053
1054     if(indices[i]+smpi_datatype_lb(old_type)<lb) lb = indices[i]+smpi_datatype_lb(old_type);
1055     if(indices[i]+blocklens[i]*smpi_datatype_ub(old_type)>ub) ub = indices[i]+blocklens[i]*smpi_datatype_ub(old_type);
1056
1057     if ( (i< count -1) && (indices[i]+blocklens[i]*smpi_datatype_size(old_type) != indices[i+1]) )contiguous=0;
1058   }
1059   if (old_type->has_subtype == 1 || lb!=0)
1060     contiguous=0;
1061
1062   if(!contiguous){
1063     s_smpi_mpi_hindexed_t* subtype = smpi_datatype_hindexed_create( blocklens,
1064                                                                   indices,
1065                                                                   count,
1066                                                                   old_type,
1067                                                                   smpi_datatype_size(old_type));
1068     smpi_datatype_create(new_type,  size * smpi_datatype_size(old_type),
1069                                                  lb,
1070                          ub
1071                          ,1, subtype, DT_FLAG_DATA);
1072   }else{
1073     s_smpi_mpi_contiguous_t* subtype = smpi_datatype_contiguous_create( lb,
1074                                                                   size,
1075                                                                   old_type,
1076                                                                   smpi_datatype_size(old_type));
1077     smpi_datatype_create(new_type,  size * smpi_datatype_size(old_type),
1078                                              0,size * smpi_datatype_size(old_type),
1079                                              1, subtype, DT_FLAG_DATA|DT_FLAG_CONTIGUOUS);
1080   }
1081   retval=MPI_SUCCESS;
1082   return retval;
1083 }
1084
1085
1086 /*
1087 struct Implementation - Indexed with indices in bytes 
1088 */
1089
1090 /*
1091  *  Copies noncontiguous data into contiguous memory.
1092  *  @param contiguous_struct - output struct
1093  *  @param noncontiguous_struct - input struct
1094  *  @param type - pointer contening :
1095  *      - stride - stride of between noncontiguous data
1096  *      - block_length - the width or height of blocked matrix
1097  *      - count - the number of rows of matrix
1098  */
1099 void serialize_struct( const void *noncontiguous_struct,
1100                        void *contiguous_struct,
1101                        int count,
1102                        void *type)
1103 {
1104   s_smpi_mpi_struct_t* type_c = (s_smpi_mpi_struct_t*)type;
1105   int i,j;
1106   char* contiguous_struct_char = (char*)contiguous_struct;
1107   char* noncontiguous_struct_char = (char*)noncontiguous_struct+ type_c->block_indices[0];
1108   for(j=0; j<count;j++){
1109     for (i = 0; i < type_c->block_count; i++) {
1110       if (type_c->old_types[i]->has_subtype == 0)
1111         memcpy(contiguous_struct_char,
1112              noncontiguous_struct_char, type_c->block_lengths[i] * smpi_datatype_size(type_c->old_types[i]));
1113       else
1114         ((s_smpi_subtype_t*)type_c->old_types[i]->substruct)->serialize( noncontiguous_struct_char,
1115                                                                          contiguous_struct_char,
1116                                                                          type_c->block_lengths[i],
1117                                                                          type_c->old_types[i]->substruct);
1118
1119
1120       contiguous_struct_char += type_c->block_lengths[i]*smpi_datatype_size(type_c->old_types[i]);
1121       if (i<type_c->block_count-1)noncontiguous_struct_char = (char*)noncontiguous_struct + type_c->block_indices[i+1];
1122       else noncontiguous_struct_char += type_c->block_lengths[i]*smpi_datatype_get_extent(type_c->old_types[i]);//let's hope this is MPI_UB ?
1123     }
1124     noncontiguous_struct=(void*)noncontiguous_struct_char;
1125   }
1126 }
1127 /*
1128  *  Copies contiguous data into noncontiguous memory.
1129  *  @param noncontiguous_struct - output struct
1130  *  @param contiguous_struct - input struct
1131  *  @param type - pointer contening :
1132  *      - stride - stride of between noncontiguous data
1133  *      - block_length - the width or height of blocked matrix
1134  *      - count - the number of rows of matrix
1135  */
1136 void unserialize_struct( const void *contiguous_struct,
1137                          void *noncontiguous_struct,
1138                          int count,
1139                          void *type,
1140                          MPI_Op op)
1141 {
1142   s_smpi_mpi_struct_t* type_c = (s_smpi_mpi_struct_t*)type;
1143   int i,j;
1144
1145   char* contiguous_struct_char = (char*)contiguous_struct;
1146   char* noncontiguous_struct_char = (char*)noncontiguous_struct+ type_c->block_indices[0];
1147   for(j=0; j<count;j++){
1148     for (i = 0; i < type_c->block_count; i++) {
1149       if (type_c->old_types[i]->has_subtype == 0)
1150         smpi_op_apply(op, contiguous_struct_char, noncontiguous_struct_char, &type_c->block_lengths[i],
1151            & type_c->old_types[i]);
1152                        /*memcpy(noncontiguous_struct_char,
1153              contiguous_struct_char, type_c->block_lengths[i] * smpi_datatype_size(type_c->old_types[i]));*/
1154       else
1155         ((s_smpi_subtype_t*)type_c->old_types[i]->substruct)->unserialize( contiguous_struct_char,
1156                                                                            noncontiguous_struct_char,
1157                                                                            type_c->block_lengths[i],
1158                                                                            type_c->old_types[i]->substruct,
1159                                                                            op);
1160
1161       contiguous_struct_char += type_c->block_lengths[i]*smpi_datatype_size(type_c->old_types[i]);
1162       if (i<type_c->block_count-1)noncontiguous_struct_char =  (char*)noncontiguous_struct + type_c->block_indices[i+1];
1163       else noncontiguous_struct_char += type_c->block_lengths[i]*smpi_datatype_get_extent(type_c->old_types[i]);
1164     }
1165     noncontiguous_struct=(void*)noncontiguous_struct_char;
1166     
1167   }
1168 }
1169
1170 void free_struct(MPI_Datatype* type){
1171   xbt_free(((s_smpi_mpi_struct_t *)(*type)->substruct)->block_lengths);
1172   xbt_free(((s_smpi_mpi_struct_t *)(*type)->substruct)->block_indices);
1173   int i=0;
1174   for (i = 0; i < ((s_smpi_mpi_struct_t *)(*type)->substruct)->block_count; i++)
1175     smpi_datatype_unuse(((s_smpi_mpi_struct_t *)(*type)->substruct)->old_types[i]);
1176   xbt_free(((s_smpi_mpi_struct_t *)(*type)->substruct)->old_types);
1177 }
1178
1179 /*
1180  * Create a Sub type struct to be able to serialize and unserialize it
1181  * the structure s_smpi_mpi_struct_t is derived from s_smpi_subtype which
1182  * required the functions unserialize and serialize
1183  */
1184 s_smpi_mpi_struct_t* smpi_datatype_struct_create( int* block_lengths,
1185                                                   MPI_Aint* block_indices,
1186                                                   int block_count,
1187                                                   MPI_Datatype* old_types){
1188   s_smpi_mpi_struct_t *new_t= xbt_new(s_smpi_mpi_struct_t,1);
1189   new_t->base.serialize = &serialize_struct;
1190   new_t->base.unserialize = &unserialize_struct;
1191   new_t->base.subtype_free = &free_struct;
1192  //TODO : add a custom function for each time to clean these 
1193   new_t->block_lengths= xbt_new(int, block_count);
1194   new_t->block_indices= xbt_new(MPI_Aint, block_count);
1195   new_t->old_types=  xbt_new(MPI_Datatype, block_count);
1196   int i;
1197   for(i=0;i<block_count;i++){
1198     new_t->block_lengths[i]=block_lengths[i];
1199     new_t->block_indices[i]=block_indices[i];
1200     new_t->old_types[i]=old_types[i];
1201     smpi_datatype_use(new_t->old_types[i]);
1202   }
1203   //new_t->block_lengths = block_lengths;
1204   //new_t->block_indices = block_indices;
1205   new_t->block_count = block_count;
1206   //new_t->old_types = old_types;
1207   return new_t;
1208 }
1209
1210
1211 int smpi_datatype_struct(int count, int* blocklens, MPI_Aint* indices, MPI_Datatype* old_types, MPI_Datatype* new_type)
1212 {
1213   int i;
1214   size_t size = 0;
1215   int contiguous=1;
1216   size = 0;
1217   MPI_Aint lb = 0;
1218   MPI_Aint ub = 0;
1219   if(count>0){
1220     lb=indices[0] + smpi_datatype_lb(old_types[0]);
1221     ub=indices[0] + blocklens[0]*smpi_datatype_ub(old_types[0]);
1222   }
1223   int forced_lb=0;
1224   int forced_ub=0;
1225   for(i=0; i< count; i++){
1226     if (blocklens[i]<0)
1227       return MPI_ERR_ARG;
1228     if (old_types[i]->has_subtype == 1)
1229       contiguous=0;
1230
1231     size += blocklens[i]*smpi_datatype_size(old_types[i]);
1232     if (old_types[i]==MPI_LB){
1233       lb=indices[i];
1234       forced_lb=1;
1235     }
1236     if (old_types[i]==MPI_UB){
1237       ub=indices[i];
1238       forced_ub=1;
1239     }
1240
1241     if(!forced_lb && indices[i]+smpi_datatype_lb(old_types[i])<lb) lb = indices[i];
1242     if(!forced_ub && indices[i]+blocklens[i]*smpi_datatype_ub(old_types[i])>ub) ub = indices[i]+blocklens[i]*smpi_datatype_ub(old_types[i]);
1243
1244     if ( (i< count -1) && (indices[i]+blocklens[i]*smpi_datatype_size(old_types[i]) != indices[i+1]) )contiguous=0;
1245   }
1246
1247   if(!contiguous){
1248     s_smpi_mpi_struct_t* subtype = smpi_datatype_struct_create( blocklens,
1249                                                               indices,
1250                                                               count,
1251                                                               old_types);
1252
1253     smpi_datatype_create(new_type,  size, lb, ub,1, subtype, DT_FLAG_DATA);
1254   }else{
1255     s_smpi_mpi_contiguous_t* subtype = smpi_datatype_contiguous_create( lb,
1256                                                                   size,
1257                                                                   MPI_CHAR,
1258                                                                   1);
1259     smpi_datatype_create(new_type,  size, lb, ub,1, subtype, DT_FLAG_DATA|DT_FLAG_CONTIGUOUS);
1260   }
1261   return MPI_SUCCESS;
1262 }
1263
1264 void smpi_datatype_commit(MPI_Datatype *datatype)
1265 {
1266   (*datatype)->flags=  ((*datatype)->flags | DT_FLAG_COMMITED);
1267 }
1268
1269 typedef struct s_smpi_mpi_op {
1270   MPI_User_function *func;
1271   int is_commute;
1272 } s_smpi_mpi_op_t;
1273
1274 #define MAX_OP(a, b)  (b) = (a) < (b) ? (b) : (a)
1275 #define MIN_OP(a, b)  (b) = (a) < (b) ? (a) : (b)
1276 #define SUM_OP(a, b)  (b) += (a)
1277 #define PROD_OP(a, b) (b) *= (a)
1278 #define LAND_OP(a, b) (b) = (a) && (b)
1279 #define LOR_OP(a, b)  (b) = (a) || (b)
1280 #define LXOR_OP(a, b) (b) = (!(a) && (b)) || ((a) && !(b))
1281 #define BAND_OP(a, b) (b) &= (a)
1282 #define BOR_OP(a, b)  (b) |= (a)
1283 #define BXOR_OP(a, b) (b) ^= (a)
1284 #define MAXLOC_OP(a, b)  (b) = (a.value) < (b.value) ? (b) : (a)
1285 #define MINLOC_OP(a, b)  (b) = (a.value) < (b.value) ? (a) : (b)
1286
1287 #define APPLY_FUNC(a, b, length, type, func) \
1288 {                                          \
1289   int i;                                   \
1290   type* x = (type*)(a);                    \
1291   type* y = (type*)(b);                    \
1292   for(i = 0; i < *(length); i++) {         \
1293     func(x[i], y[i]);                      \
1294   }                                        \
1295 }
1296
1297 static void max_func(void *a, void *b, int *length,
1298                      MPI_Datatype * datatype)
1299 {
1300   if (*datatype == MPI_CHAR) {
1301     APPLY_FUNC(a, b, length, char, MAX_OP);
1302   } else if (*datatype == MPI_SHORT) {
1303     APPLY_FUNC(a, b, length, short, MAX_OP);
1304   } else if (*datatype == MPI_INT) {
1305     APPLY_FUNC(a, b, length, int, MAX_OP);
1306   } else if (*datatype == MPI_LONG) {
1307     APPLY_FUNC(a, b, length, long, MAX_OP);
1308   } else if (*datatype == MPI_UNSIGNED_SHORT) {
1309     APPLY_FUNC(a, b, length, unsigned short, MAX_OP);
1310   } else if (*datatype == MPI_UNSIGNED) {
1311     APPLY_FUNC(a, b, length, unsigned int, MAX_OP);
1312   } else if (*datatype == MPI_UNSIGNED_LONG) {
1313     APPLY_FUNC(a, b, length, unsigned long, MAX_OP);
1314   } else if (*datatype == MPI_UNSIGNED_CHAR) {
1315     APPLY_FUNC(a, b, length, unsigned char, MAX_OP);
1316   } else if (*datatype == MPI_FLOAT) {
1317     APPLY_FUNC(a, b, length, float, MAX_OP);
1318   } else if (*datatype == MPI_DOUBLE) {
1319     APPLY_FUNC(a, b, length, double, MAX_OP);
1320   } else if (*datatype == MPI_LONG_DOUBLE) {
1321     APPLY_FUNC(a, b, length, long double, MAX_OP);
1322   }
1323 }
1324
1325 static void min_func(void *a, void *b, int *length,
1326                      MPI_Datatype * datatype)
1327 {
1328   if (*datatype == MPI_CHAR) {
1329     APPLY_FUNC(a, b, length, char, MIN_OP);
1330   } else if (*datatype == MPI_SHORT) {
1331     APPLY_FUNC(a, b, length, short, MIN_OP);
1332   } else if (*datatype == MPI_INT) {
1333     APPLY_FUNC(a, b, length, int, MIN_OP);
1334   } else if (*datatype == MPI_LONG) {
1335     APPLY_FUNC(a, b, length, long, MIN_OP);
1336   } else if (*datatype == MPI_UNSIGNED_SHORT) {
1337     APPLY_FUNC(a, b, length, unsigned short, MIN_OP);
1338   } else if (*datatype == MPI_UNSIGNED) {
1339     APPLY_FUNC(a, b, length, unsigned int, MIN_OP);
1340   } else if (*datatype == MPI_UNSIGNED_LONG) {
1341     APPLY_FUNC(a, b, length, unsigned long, MIN_OP);
1342   } else if (*datatype == MPI_UNSIGNED_CHAR) {
1343     APPLY_FUNC(a, b, length, unsigned char, MIN_OP);
1344   } else if (*datatype == MPI_FLOAT) {
1345     APPLY_FUNC(a, b, length, float, MIN_OP);
1346   } else if (*datatype == MPI_DOUBLE) {
1347     APPLY_FUNC(a, b, length, double, MIN_OP);
1348   } else if (*datatype == MPI_LONG_DOUBLE) {
1349     APPLY_FUNC(a, b, length, long double, MIN_OP);
1350   }
1351 }
1352
1353 static void sum_func(void *a, void *b, int *length,
1354                      MPI_Datatype * datatype)
1355 {
1356   if (*datatype == MPI_CHAR) {
1357     APPLY_FUNC(a, b, length, char, SUM_OP);
1358   } else if (*datatype == MPI_SHORT) {
1359     APPLY_FUNC(a, b, length, short, SUM_OP);
1360   } else if (*datatype == MPI_INT) {
1361     APPLY_FUNC(a, b, length, int, SUM_OP);
1362   } else if (*datatype == MPI_LONG) {
1363     APPLY_FUNC(a, b, length, long, SUM_OP);
1364   } else if (*datatype == MPI_UNSIGNED_SHORT) {
1365     APPLY_FUNC(a, b, length, unsigned short, SUM_OP);
1366   } else if (*datatype == MPI_UNSIGNED) {
1367     APPLY_FUNC(a, b, length, unsigned int, SUM_OP);
1368   } else if (*datatype == MPI_UNSIGNED_LONG) {
1369     APPLY_FUNC(a, b, length, unsigned long, SUM_OP);
1370   } else if (*datatype == MPI_UNSIGNED_CHAR) {
1371     APPLY_FUNC(a, b, length, unsigned char, SUM_OP);
1372   } else if (*datatype == MPI_FLOAT) {
1373     APPLY_FUNC(a, b, length, float, SUM_OP);
1374   } else if (*datatype == MPI_DOUBLE) {
1375     APPLY_FUNC(a, b, length, double, SUM_OP);
1376   } else if (*datatype == MPI_LONG_DOUBLE) {
1377     APPLY_FUNC(a, b, length, long double, SUM_OP);
1378   } else if (*datatype == MPI_C_FLOAT_COMPLEX) {
1379     APPLY_FUNC(a, b, length, float _Complex, SUM_OP);
1380   } else if (*datatype == MPI_C_DOUBLE_COMPLEX) {
1381     APPLY_FUNC(a, b, length, double _Complex, SUM_OP);
1382   } else if (*datatype == MPI_C_LONG_DOUBLE_COMPLEX) {
1383     APPLY_FUNC(a, b, length, long double _Complex, SUM_OP);
1384   }
1385 }
1386
1387 static void prod_func(void *a, void *b, int *length,
1388                       MPI_Datatype * datatype)
1389 {
1390   if (*datatype == MPI_CHAR) {
1391     APPLY_FUNC(a, b, length, char, PROD_OP);
1392   } else if (*datatype == MPI_SHORT) {
1393     APPLY_FUNC(a, b, length, short, PROD_OP);
1394   } else if (*datatype == MPI_INT) {
1395     APPLY_FUNC(a, b, length, int, PROD_OP);
1396   } else if (*datatype == MPI_LONG) {
1397     APPLY_FUNC(a, b, length, long, PROD_OP);
1398   } else if (*datatype == MPI_UNSIGNED_SHORT) {
1399     APPLY_FUNC(a, b, length, unsigned short, PROD_OP);
1400   } else if (*datatype == MPI_UNSIGNED) {
1401     APPLY_FUNC(a, b, length, unsigned int, PROD_OP);
1402   } else if (*datatype == MPI_UNSIGNED_LONG) {
1403     APPLY_FUNC(a, b, length, unsigned long, PROD_OP);
1404   } else if (*datatype == MPI_UNSIGNED_CHAR) {
1405     APPLY_FUNC(a, b, length, unsigned char, PROD_OP);
1406   } else if (*datatype == MPI_FLOAT) {
1407     APPLY_FUNC(a, b, length, float, PROD_OP);
1408   } else if (*datatype == MPI_DOUBLE) {
1409     APPLY_FUNC(a, b, length, double, PROD_OP);
1410   } else if (*datatype == MPI_LONG_DOUBLE) {
1411     APPLY_FUNC(a, b, length, long double, PROD_OP);
1412   } else if (*datatype == MPI_C_FLOAT_COMPLEX) {
1413     APPLY_FUNC(a, b, length, float _Complex, PROD_OP);
1414   } else if (*datatype == MPI_C_DOUBLE_COMPLEX) {
1415     APPLY_FUNC(a, b, length, double _Complex, PROD_OP);
1416   } else if (*datatype == MPI_C_LONG_DOUBLE_COMPLEX) {
1417     APPLY_FUNC(a, b, length, long double _Complex, PROD_OP);
1418   }
1419 }
1420
1421 static void land_func(void *a, void *b, int *length,
1422                       MPI_Datatype * datatype)
1423 {
1424   if (*datatype == MPI_CHAR) {
1425     APPLY_FUNC(a, b, length, char, LAND_OP);
1426   } else if (*datatype == MPI_SHORT) {
1427     APPLY_FUNC(a, b, length, short, LAND_OP);
1428   } else if (*datatype == MPI_INT) {
1429     APPLY_FUNC(a, b, length, int, LAND_OP);
1430   } else if (*datatype == MPI_LONG) {
1431     APPLY_FUNC(a, b, length, long, LAND_OP);
1432   } else if (*datatype == MPI_UNSIGNED_SHORT) {
1433     APPLY_FUNC(a, b, length, unsigned short, LAND_OP);
1434   } else if (*datatype == MPI_UNSIGNED) {
1435     APPLY_FUNC(a, b, length, unsigned int, LAND_OP);
1436   } else if (*datatype == MPI_UNSIGNED_LONG) {
1437     APPLY_FUNC(a, b, length, unsigned long, LAND_OP);
1438   } else if (*datatype == MPI_UNSIGNED_CHAR) {
1439     APPLY_FUNC(a, b, length, unsigned char, LAND_OP);
1440   } else if (*datatype == MPI_C_BOOL) {
1441     APPLY_FUNC(a, b, length, _Bool, LAND_OP);
1442   }
1443 }
1444
1445 static void lor_func(void *a, void *b, int *length,
1446                      MPI_Datatype * datatype)
1447 {
1448   if (*datatype == MPI_CHAR) {
1449     APPLY_FUNC(a, b, length, char, LOR_OP);
1450   } else if (*datatype == MPI_SHORT) {
1451     APPLY_FUNC(a, b, length, short, LOR_OP);
1452   } else if (*datatype == MPI_INT) {
1453     APPLY_FUNC(a, b, length, int, LOR_OP);
1454   } else if (*datatype == MPI_LONG) {
1455     APPLY_FUNC(a, b, length, long, LOR_OP);
1456   } else if (*datatype == MPI_UNSIGNED_SHORT) {
1457     APPLY_FUNC(a, b, length, unsigned short, LOR_OP);
1458   } else if (*datatype == MPI_UNSIGNED) {
1459     APPLY_FUNC(a, b, length, unsigned int, LOR_OP);
1460   } else if (*datatype == MPI_UNSIGNED_LONG) {
1461     APPLY_FUNC(a, b, length, unsigned long, LOR_OP);
1462   } else if (*datatype == MPI_UNSIGNED_CHAR) {
1463     APPLY_FUNC(a, b, length, unsigned char, LOR_OP);
1464   } else if (*datatype == MPI_C_BOOL) {
1465     APPLY_FUNC(a, b, length, _Bool, LOR_OP);
1466   }
1467 }
1468
1469 static void lxor_func(void *a, void *b, int *length,
1470                       MPI_Datatype * datatype)
1471 {
1472   if (*datatype == MPI_CHAR) {
1473     APPLY_FUNC(a, b, length, char, LXOR_OP);
1474   } else if (*datatype == MPI_SHORT) {
1475     APPLY_FUNC(a, b, length, short, LXOR_OP);
1476   } else if (*datatype == MPI_INT) {
1477     APPLY_FUNC(a, b, length, int, LXOR_OP);
1478   } else if (*datatype == MPI_LONG) {
1479     APPLY_FUNC(a, b, length, long, LXOR_OP);
1480   } else if (*datatype == MPI_UNSIGNED_SHORT) {
1481     APPLY_FUNC(a, b, length, unsigned short, LXOR_OP);
1482   } else if (*datatype == MPI_UNSIGNED) {
1483     APPLY_FUNC(a, b, length, unsigned int, LXOR_OP);
1484   } else if (*datatype == MPI_UNSIGNED_LONG) {
1485     APPLY_FUNC(a, b, length, unsigned long, LXOR_OP);
1486   } else if (*datatype == MPI_UNSIGNED_CHAR) {
1487     APPLY_FUNC(a, b, length, unsigned char, LXOR_OP);
1488   } else if (*datatype == MPI_C_BOOL) {
1489     APPLY_FUNC(a, b, length, _Bool, LXOR_OP);
1490   }
1491 }
1492
1493 static void band_func(void *a, void *b, int *length,
1494                       MPI_Datatype * datatype)
1495 {
1496   if (*datatype == MPI_CHAR) {
1497     APPLY_FUNC(a, b, length, char, BAND_OP);
1498   }else if (*datatype == MPI_SHORT) {
1499     APPLY_FUNC(a, b, length, short, BAND_OP);
1500   } else if (*datatype == MPI_INT) {
1501     APPLY_FUNC(a, b, length, int, BAND_OP);
1502   } else if (*datatype == MPI_LONG) {
1503     APPLY_FUNC(a, b, length, long, BAND_OP);
1504   } else if (*datatype == MPI_UNSIGNED_SHORT) {
1505     APPLY_FUNC(a, b, length, unsigned short, BAND_OP);
1506   } else if (*datatype == MPI_UNSIGNED) {
1507     APPLY_FUNC(a, b, length, unsigned int, BAND_OP);
1508   } else if (*datatype == MPI_UNSIGNED_LONG) {
1509     APPLY_FUNC(a, b, length, unsigned long, BAND_OP);
1510   } else if (*datatype == MPI_UNSIGNED_CHAR) {
1511     APPLY_FUNC(a, b, length, unsigned char, BAND_OP);
1512   } else if (*datatype == MPI_BYTE) {
1513     APPLY_FUNC(a, b, length, uint8_t, BAND_OP);
1514   }
1515 }
1516
1517 static void bor_func(void *a, void *b, int *length,
1518                      MPI_Datatype * datatype)
1519 {
1520   if (*datatype == MPI_CHAR) {
1521     APPLY_FUNC(a, b, length, char, BOR_OP);
1522   } else if (*datatype == MPI_SHORT) {
1523     APPLY_FUNC(a, b, length, short, BOR_OP);
1524   } else if (*datatype == MPI_INT) {
1525     APPLY_FUNC(a, b, length, int, BOR_OP);
1526   } else if (*datatype == MPI_LONG) {
1527     APPLY_FUNC(a, b, length, long, BOR_OP);
1528   } else if (*datatype == MPI_UNSIGNED_SHORT) {
1529     APPLY_FUNC(a, b, length, unsigned short, BOR_OP);
1530   } else if (*datatype == MPI_UNSIGNED) {
1531     APPLY_FUNC(a, b, length, unsigned int, BOR_OP);
1532   } else if (*datatype == MPI_UNSIGNED_LONG) {
1533     APPLY_FUNC(a, b, length, unsigned long, BOR_OP);
1534   } else if (*datatype == MPI_UNSIGNED_CHAR) {
1535     APPLY_FUNC(a, b, length, unsigned char, BOR_OP);
1536   } else if (*datatype == MPI_BYTE) {
1537     APPLY_FUNC(a, b, length, uint8_t, BOR_OP);
1538   }
1539 }
1540
1541 static void bxor_func(void *a, void *b, int *length,
1542                       MPI_Datatype * datatype)
1543 {
1544   if (*datatype == MPI_CHAR) {
1545     APPLY_FUNC(a, b, length, char, BXOR_OP);
1546   } else if (*datatype == MPI_SHORT) {
1547     APPLY_FUNC(a, b, length, short, BXOR_OP);
1548   } else if (*datatype == MPI_INT) {
1549     APPLY_FUNC(a, b, length, int, BXOR_OP);
1550   } else if (*datatype == MPI_LONG) {
1551     APPLY_FUNC(a, b, length, long, BXOR_OP);
1552   } else if (*datatype == MPI_UNSIGNED_SHORT) {
1553     APPLY_FUNC(a, b, length, unsigned short, BXOR_OP);
1554   } else if (*datatype == MPI_UNSIGNED) {
1555     APPLY_FUNC(a, b, length, unsigned int, BXOR_OP);
1556   } else if (*datatype == MPI_UNSIGNED_LONG) {
1557     APPLY_FUNC(a, b, length, unsigned long, BXOR_OP);
1558   } else if (*datatype == MPI_UNSIGNED_CHAR) {
1559     APPLY_FUNC(a, b, length, unsigned char, BXOR_OP);
1560   } else if (*datatype == MPI_BYTE) {
1561     APPLY_FUNC(a, b, length, uint8_t, BXOR_OP);
1562   }
1563 }
1564
1565 static void minloc_func(void *a, void *b, int *length,
1566                         MPI_Datatype * datatype)
1567 {
1568   if (*datatype == MPI_FLOAT_INT) {
1569     APPLY_FUNC(a, b, length, float_int, MINLOC_OP);
1570   } else if (*datatype == MPI_LONG_INT) {
1571     APPLY_FUNC(a, b, length, long_int, MINLOC_OP);
1572   } else if (*datatype == MPI_DOUBLE_INT) {
1573     APPLY_FUNC(a, b, length, double_int, MINLOC_OP);
1574   } else if (*datatype == MPI_SHORT_INT) {
1575     APPLY_FUNC(a, b, length, short_int, MINLOC_OP);
1576   } else if (*datatype == MPI_2LONG) {
1577     APPLY_FUNC(a, b, length, long_long, MINLOC_OP);
1578   } else if (*datatype == MPI_2INT) {
1579     APPLY_FUNC(a, b, length, int_int, MINLOC_OP);
1580   } else if (*datatype == MPI_LONG_DOUBLE_INT) {
1581     APPLY_FUNC(a, b, length, long_double_int, MINLOC_OP);
1582   } else if (*datatype == MPI_2FLOAT) {
1583     APPLY_FUNC(a, b, length, float_float, MINLOC_OP);
1584   } else if (*datatype == MPI_2DOUBLE) {
1585     APPLY_FUNC(a, b, length, double_double, MINLOC_OP);
1586   }
1587 }
1588
1589 static void maxloc_func(void *a, void *b, int *length,
1590                         MPI_Datatype * datatype)
1591 {
1592   if (*datatype == MPI_FLOAT_INT) {
1593     APPLY_FUNC(a, b, length, float_int, MAXLOC_OP);
1594   } else if (*datatype == MPI_LONG_INT) {
1595     APPLY_FUNC(a, b, length, long_int, MAXLOC_OP);
1596   } else if (*datatype == MPI_DOUBLE_INT) {
1597     APPLY_FUNC(a, b, length, double_int, MAXLOC_OP);
1598   } else if (*datatype == MPI_SHORT_INT) {
1599     APPLY_FUNC(a, b, length, short_int, MAXLOC_OP);
1600   } else if (*datatype == MPI_2LONG) {
1601     APPLY_FUNC(a, b, length, long_long, MAXLOC_OP);
1602   } else if (*datatype == MPI_2INT) {
1603     APPLY_FUNC(a, b, length, int_int, MAXLOC_OP);
1604   } else if (*datatype == MPI_LONG_DOUBLE_INT) {
1605     APPLY_FUNC(a, b, length, long_double_int, MAXLOC_OP);
1606   } else if (*datatype == MPI_2FLOAT) {
1607     APPLY_FUNC(a, b, length, float_float, MAXLOC_OP);
1608   } else if (*datatype == MPI_2DOUBLE) {
1609     APPLY_FUNC(a, b, length, double_double, MAXLOC_OP);
1610   }
1611 }
1612
1613 static void replace_func(void *a, void *b, int *length,
1614                         MPI_Datatype * datatype)
1615 {
1616   memcpy(b, a, *length * smpi_datatype_size(*datatype));
1617 }
1618
1619 #define CREATE_MPI_OP(name, func)                             \
1620   static s_smpi_mpi_op_t mpi_##name = { &(func) /* func */, TRUE }; \
1621 MPI_Op name = &mpi_##name;
1622
1623 CREATE_MPI_OP(MPI_MAX, max_func);
1624 CREATE_MPI_OP(MPI_MIN, min_func);
1625 CREATE_MPI_OP(MPI_SUM, sum_func);
1626 CREATE_MPI_OP(MPI_PROD, prod_func);
1627 CREATE_MPI_OP(MPI_LAND, land_func);
1628 CREATE_MPI_OP(MPI_LOR, lor_func);
1629 CREATE_MPI_OP(MPI_LXOR, lxor_func);
1630 CREATE_MPI_OP(MPI_BAND, band_func);
1631 CREATE_MPI_OP(MPI_BOR, bor_func);
1632 CREATE_MPI_OP(MPI_BXOR, bxor_func);
1633 CREATE_MPI_OP(MPI_MAXLOC, maxloc_func);
1634 CREATE_MPI_OP(MPI_MINLOC, minloc_func);
1635 CREATE_MPI_OP(MPI_REPLACE, replace_func);
1636
1637
1638 MPI_Op smpi_op_new(MPI_User_function * function, int commute)
1639 {
1640   MPI_Op op;
1641   op = xbt_new(s_smpi_mpi_op_t, 1);
1642   op->func = function;
1643   op-> is_commute = commute;
1644   return op;
1645 }
1646
1647 int smpi_op_is_commute(MPI_Op op)
1648 {
1649   return (op==MPI_OP_NULL) ? 1 : op-> is_commute;
1650 }
1651
1652 void smpi_op_destroy(MPI_Op op)
1653 {
1654   xbt_free(op);
1655 }
1656
1657 void smpi_op_apply(MPI_Op op, void *invec, void *inoutvec, int *len,
1658                    MPI_Datatype * datatype)
1659 {
1660   if(op==MPI_OP_NULL)
1661     return;
1662
1663   if(smpi_privatize_global_variables){ //we need to switch here, as the called function may silently touch global variables
1664     XBT_DEBUG("Applying operation, switch to the right data frame ");
1665     smpi_switch_data_segment(smpi_process_index());
1666   }
1667
1668   if(!smpi_process_get_replaying())
1669   op->func(invec, inoutvec, len, datatype);
1670 }
1671
1672 int smpi_type_attr_delete(MPI_Datatype type, int keyval){
1673   char* tmpkey=xbt_malloc(INTSIZEDCHAR);
1674   sprintf(tmpkey, "%d", keyval);
1675   smpi_type_key_elem elem = xbt_dict_get_or_null(smpi_type_keyvals, (const char*)tmpkey);
1676   if(!elem)
1677     return MPI_ERR_ARG;
1678   if(elem->delete_fn!=MPI_NULL_DELETE_FN){
1679     void * value;
1680     int flag;
1681     if(smpi_type_attr_get(type, keyval, &value, &flag)==MPI_SUCCESS){
1682       int ret = elem->delete_fn(type, keyval, value, &flag);
1683       if(ret!=MPI_SUCCESS) return ret;
1684     }
1685   }  
1686   if(type->attributes==NULL)
1687     return MPI_ERR_ARG;
1688
1689   xbt_dict_remove(type->attributes, (const char*)tmpkey);
1690   xbt_free(tmpkey);
1691   return MPI_SUCCESS;
1692 }
1693
1694 int smpi_type_attr_get(MPI_Datatype type, int keyval, void* attr_value, int* flag){
1695   char* tmpkey=xbt_malloc(INTSIZEDCHAR);
1696   sprintf(tmpkey, "%d", keyval);
1697   smpi_type_key_elem elem = xbt_dict_get_or_null(smpi_type_keyvals, (const char*)tmpkey);
1698   if(!elem)
1699     return MPI_ERR_ARG;
1700   xbt_ex_t ex;
1701   if(type->attributes==NULL){
1702     *flag=0;
1703     return MPI_SUCCESS;
1704   }
1705   TRY {
1706     *(void**)attr_value = xbt_dict_get(type->attributes, (const char*)tmpkey);
1707     *flag=1;
1708   }
1709   CATCH(ex) {
1710     *flag=0;
1711     xbt_ex_free(ex);
1712   }
1713   xbt_free(tmpkey);
1714   return MPI_SUCCESS;
1715 }
1716
1717 int smpi_type_attr_put(MPI_Datatype type, int keyval, void* attr_value){
1718   if(!smpi_type_keyvals)
1719   smpi_type_keyvals = xbt_dict_new();
1720   char* tmpkey=xbt_malloc(INTSIZEDCHAR);
1721   sprintf(tmpkey, "%d", keyval);
1722   smpi_type_key_elem elem = xbt_dict_get_or_null(smpi_type_keyvals, (const char*)tmpkey);
1723   if(!elem )
1724     return MPI_ERR_ARG;
1725   int flag;
1726   void* value;
1727   smpi_type_attr_get(type, keyval, &value, &flag);
1728   if(flag && elem->delete_fn!=MPI_NULL_DELETE_FN){
1729     int ret = elem->delete_fn(type, keyval, value, &flag);
1730     if(ret!=MPI_SUCCESS) return ret;
1731   }
1732   if(type->attributes==NULL)
1733     type->attributes=xbt_dict_new();
1734
1735   xbt_dict_set(type->attributes, (const char*)tmpkey, attr_value, NULL);
1736   xbt_free(tmpkey);
1737   return MPI_SUCCESS;
1738 }
1739
1740 int smpi_type_keyval_create(MPI_Type_copy_attr_function* copy_fn, MPI_Type_delete_attr_function* delete_fn, int* keyval, void* extra_state){
1741
1742   if(!smpi_type_keyvals)
1743   smpi_type_keyvals = xbt_dict_new();
1744   
1745   smpi_type_key_elem value = (smpi_type_key_elem) xbt_new0(s_smpi_mpi_type_key_elem_t,1);
1746   
1747   value->copy_fn=copy_fn;
1748   value->delete_fn=delete_fn;
1749   
1750   *keyval = type_keyval_id;
1751   char* tmpkey=xbt_malloc(INTSIZEDCHAR);
1752   sprintf(tmpkey, "%d", *keyval);
1753   xbt_dict_set(smpi_type_keyvals,(const char*)tmpkey,(void*)value, NULL);
1754   type_keyval_id++;
1755   xbt_free(tmpkey);
1756   return MPI_SUCCESS;
1757 }
1758
1759 int smpi_type_keyval_free(int* keyval){
1760   smpi_type_key_elem elem = xbt_dict_get_or_null(smpi_type_keyvals, (const char*)keyval);
1761   if(!elem)
1762     return MPI_ERR_ARG;
1763   char* tmpkey=xbt_malloc(INTSIZEDCHAR);
1764   sprintf(tmpkey, "%d", *keyval);
1765   xbt_dict_remove(smpi_type_keyvals, (const char*)tmpkey);
1766   xbt_free(elem);
1767   xbt_free(tmpkey);
1768   return MPI_SUCCESS;
1769 }