Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Properly fix the case where the user provides an empty hostfile
[simgrid.git] / src / smpi / smpi_mpi_dt.c
1 /* smpi_mpi_dt.c -- MPI primitives to handle datatypes                        */
2 /* FIXME: a very incomplete implementation                                    */
3
4 /* Copyright (c) 2009-2014. The SimGrid Team.
5  * All rights reserved.                                                     */
6
7 /* This program is free software; you can redistribute it and/or modify it
8  * under the terms of the license (GNU LGPL) which comes with this package. */
9
10 #include <stdio.h>
11 #include <stdlib.h>
12 #include <string.h>
13
14 #include "private.h"
15 #include "smpi_mpi_dt_private.h"
16 #include "mc/mc.h"
17 #include "xbt/replay.h"
18 #include "simgrid/modelchecker.h"
19
20 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_mpi_dt, smpi,
21                                 "Logging specific to SMPI (datatype)");
22
23 #define CREATE_MPI_DATATYPE(name, type)       \
24   static s_smpi_mpi_datatype_t mpi_##name = { \
25     sizeof(type),  /* size */                 \
26     0,             /*was 1 has_subtype*/             \
27     0,             /* lb */                   \
28     sizeof(type),  /* ub = lb + size */       \
29     DT_FLAG_BASIC,  /* flags */              \
30     NULL           /* pointer on extended struct*/ \
31   };                                          \
32 MPI_Datatype name = &mpi_##name;
33
34 #define CREATE_MPI_DATATYPE_NULL(name)       \
35   static s_smpi_mpi_datatype_t mpi_##name = { \
36     0,  /* size */                 \
37     0,             /*was 1 has_subtype*/             \
38     0,             /* lb */                   \
39     0,  /* ub = lb + size */       \
40     DT_FLAG_BASIC,  /* flags */              \
41     NULL           /* pointer on extended struct*/ \
42   };                                          \
43 MPI_Datatype name = &mpi_##name;
44
45 //The following are datatypes for the MPI functions MPI_MAXLOC and MPI_MINLOC.
46 typedef struct {
47   float value;
48   int index;
49 } float_int;
50 typedef struct {
51   float value;
52   float index;
53 } float_float;
54 typedef struct {
55   double value;
56   double index;
57 } double_double;
58 typedef struct {
59   long value;
60   int index;
61 } long_int;
62 typedef struct {
63   double value;
64   int index;
65 } double_int;
66 typedef struct {
67   short value;
68   int index;
69 } short_int;
70 typedef struct {
71   int value;
72   int index;
73 } int_int;
74 typedef struct {
75   long double value;
76   int index;
77 } long_double_int;
78
79 // Predefined data types
80 CREATE_MPI_DATATYPE(MPI_CHAR, char);
81 CREATE_MPI_DATATYPE(MPI_SHORT, short);
82 CREATE_MPI_DATATYPE(MPI_INT, int);
83 CREATE_MPI_DATATYPE(MPI_LONG, long);
84 CREATE_MPI_DATATYPE(MPI_LONG_LONG, long long);
85 CREATE_MPI_DATATYPE(MPI_SIGNED_CHAR, signed char);
86 CREATE_MPI_DATATYPE(MPI_UNSIGNED_CHAR, unsigned char);
87 CREATE_MPI_DATATYPE(MPI_UNSIGNED_SHORT, unsigned short);
88 CREATE_MPI_DATATYPE(MPI_UNSIGNED, unsigned int);
89 CREATE_MPI_DATATYPE(MPI_UNSIGNED_LONG, unsigned long);
90 CREATE_MPI_DATATYPE(MPI_UNSIGNED_LONG_LONG, unsigned long long);
91 CREATE_MPI_DATATYPE(MPI_FLOAT, float);
92 CREATE_MPI_DATATYPE(MPI_DOUBLE, double);
93 CREATE_MPI_DATATYPE(MPI_LONG_DOUBLE, long double);
94 CREATE_MPI_DATATYPE(MPI_WCHAR, wchar_t);
95 CREATE_MPI_DATATYPE(MPI_C_BOOL, _Bool);
96 CREATE_MPI_DATATYPE(MPI_INT8_T, int8_t);
97 CREATE_MPI_DATATYPE(MPI_INT16_T, int16_t);
98 CREATE_MPI_DATATYPE(MPI_INT32_T, int32_t);
99 CREATE_MPI_DATATYPE(MPI_INT64_T, int64_t);
100 CREATE_MPI_DATATYPE(MPI_UINT8_T, uint8_t);
101 CREATE_MPI_DATATYPE(MPI_UINT16_T, uint16_t);
102 CREATE_MPI_DATATYPE(MPI_UINT32_T, uint32_t);
103 CREATE_MPI_DATATYPE(MPI_UINT64_T, uint64_t);
104 CREATE_MPI_DATATYPE(MPI_C_FLOAT_COMPLEX, float _Complex);
105 CREATE_MPI_DATATYPE(MPI_C_DOUBLE_COMPLEX, double _Complex);
106 CREATE_MPI_DATATYPE(MPI_C_LONG_DOUBLE_COMPLEX, long double _Complex);
107 CREATE_MPI_DATATYPE(MPI_AINT, MPI_Aint);
108 CREATE_MPI_DATATYPE(MPI_OFFSET, MPI_Offset);
109
110 CREATE_MPI_DATATYPE(MPI_FLOAT_INT, float_int);
111 CREATE_MPI_DATATYPE(MPI_LONG_INT, long_int);
112 CREATE_MPI_DATATYPE(MPI_DOUBLE_INT, double_int);
113 CREATE_MPI_DATATYPE(MPI_SHORT_INT, short_int);
114 CREATE_MPI_DATATYPE(MPI_2INT, int_int);
115 CREATE_MPI_DATATYPE(MPI_2FLOAT, float_float);
116 CREATE_MPI_DATATYPE(MPI_2DOUBLE, double_double);
117
118 CREATE_MPI_DATATYPE(MPI_LONG_DOUBLE_INT, long_double_int);
119
120 CREATE_MPI_DATATYPE_NULL(MPI_UB);
121 CREATE_MPI_DATATYPE_NULL(MPI_LB);
122 CREATE_MPI_DATATYPE_NULL(MPI_PACKED);
123 // Internal use only
124 CREATE_MPI_DATATYPE(MPI_PTR, void*);
125
126 /** Check if the datatype is usable for communications
127  */
128 int is_datatype_valid(MPI_Datatype datatype) {
129     return datatype != MPI_DATATYPE_NULL
130         && (datatype->flags & DT_FLAG_COMMITED);
131 }
132
133 size_t smpi_datatype_size(MPI_Datatype datatype)
134 {
135   return datatype->size;
136 }
137
138 MPI_Aint smpi_datatype_lb(MPI_Datatype datatype)
139 {
140   return datatype->lb;
141 }
142
143 MPI_Aint smpi_datatype_ub(MPI_Datatype datatype)
144 {
145   return datatype->ub;
146 }
147
148 MPI_Datatype smpi_datatype_dup(MPI_Datatype datatype)
149 {
150   MPI_Datatype new_t= xbt_new(s_smpi_mpi_datatype_t,1);
151   memcpy(new_t, datatype, sizeof(s_smpi_mpi_datatype_t));
152   if (datatype->has_subtype)
153     memcpy(new_t->substruct, datatype->substruct, sizeof(s_smpi_subtype_t));
154   return new_t;
155 }
156
157 int smpi_datatype_extent(MPI_Datatype datatype, MPI_Aint * lb,
158                          MPI_Aint * extent)
159 {
160   *lb = datatype->lb;
161   *extent = datatype->ub - datatype->lb;
162   return MPI_SUCCESS;
163 }
164
165 MPI_Aint smpi_datatype_get_extent(MPI_Datatype datatype){
166   return datatype->ub - datatype->lb;
167 }
168
169 int smpi_datatype_copy(void *sendbuf, int sendcount, MPI_Datatype sendtype,
170                        void *recvbuf, int recvcount, MPI_Datatype recvtype)
171 {
172   int count;
173   if(smpi_privatize_global_variables){
174     switch_data_segment(smpi_process_index());
175   }
176   /* First check if we really have something to do */
177   if (recvcount > 0 && recvbuf != sendbuf) {
178     /* FIXME: treat packed cases */
179     sendcount *= smpi_datatype_size(sendtype);
180     recvcount *= smpi_datatype_size(recvtype);
181     count = sendcount < recvcount ? sendcount : recvcount;
182
183     if(sendtype->has_subtype == 0 && recvtype->has_subtype == 0) {
184       if(!_xbt_replay_is_active()) memcpy(recvbuf, sendbuf, count);
185     }
186     else if (sendtype->has_subtype == 0)
187     {
188       s_smpi_subtype_t *subtype =  recvtype->substruct;
189       subtype->unserialize( sendbuf, recvbuf,1, subtype, MPI_REPLACE);
190     }
191     else if (recvtype->has_subtype == 0)
192     {
193       s_smpi_subtype_t *subtype =  sendtype->substruct;
194       subtype->serialize(sendbuf, recvbuf,1, subtype);
195     }else{
196       s_smpi_subtype_t *subtype =  sendtype->substruct;
197
198
199       void * buf_tmp = xbt_malloc(count);
200
201       subtype->serialize( sendbuf, buf_tmp,count/smpi_datatype_size(sendtype), subtype);
202       subtype =  recvtype->substruct;
203       subtype->unserialize( buf_tmp, recvbuf,count/smpi_datatype_size(recvtype), subtype, MPI_REPLACE);
204
205       free(buf_tmp);
206     }
207   }
208
209   return sendcount > recvcount ? MPI_ERR_TRUNCATE : MPI_SUCCESS;
210 }
211
212 /*
213  *  Copies noncontiguous data into contiguous memory.
214  *  @param contiguous_vector - output vector
215  *  @param noncontiguous_vector - input vector
216  *  @param type - pointer contening :
217  *      - stride - stride of between noncontiguous data
218  *      - block_length - the width or height of blocked matrix
219  *      - count - the number of rows of matrix
220  */
221 void serialize_vector( const void *noncontiguous_vector,
222                        void *contiguous_vector,
223                        int count,
224                        void *type)
225 {
226   s_smpi_mpi_vector_t* type_c = (s_smpi_mpi_vector_t*)type;
227   int i;
228   char* contiguous_vector_char = (char*)contiguous_vector;
229   char* noncontiguous_vector_char = (char*)noncontiguous_vector;
230
231   for (i = 0; i < type_c->block_count * count; i++) {
232       if (type_c->old_type->has_subtype == 0)
233         memcpy(contiguous_vector_char,
234                noncontiguous_vector_char, type_c->block_length * type_c->size_oldtype);
235       else
236         ((s_smpi_subtype_t*)type_c->old_type->substruct)->serialize( noncontiguous_vector_char,
237                                                                      contiguous_vector_char,
238                                                                      type_c->block_length,
239                                                                      type_c->old_type->substruct);
240
241     contiguous_vector_char += type_c->block_length*type_c->size_oldtype;
242     if((i+1)%type_c->block_count ==0)
243     noncontiguous_vector_char += type_c->block_length*smpi_datatype_get_extent(type_c->old_type);
244     else
245     noncontiguous_vector_char += type_c->block_stride*smpi_datatype_get_extent(type_c->old_type);
246   }
247 }
248
249 /*
250  *  Copies contiguous data into noncontiguous memory.
251  *  @param noncontiguous_vector - output vector
252  *  @param contiguous_vector - input vector
253  *  @param type - pointer contening :
254  *      - stride - stride of between noncontiguous data
255  *      - block_length - the width or height of blocked matrix
256  *      - count - the number of rows of matrix
257  */
258 void unserialize_vector( const void *contiguous_vector,
259                          void *noncontiguous_vector,
260                          int count,
261                          void *type,
262                          MPI_Op op)
263 {
264   s_smpi_mpi_vector_t* type_c = (s_smpi_mpi_vector_t*)type;
265   int i;
266
267   char* contiguous_vector_char = (char*)contiguous_vector;
268   char* noncontiguous_vector_char = (char*)noncontiguous_vector;
269
270   for (i = 0; i < type_c->block_count * count; i++) {
271     if (type_c->old_type->has_subtype == 0)
272       smpi_op_apply(op, contiguous_vector_char, noncontiguous_vector_char, &type_c->block_length,
273           &type_c->old_type);
274      /* memcpy(noncontiguous_vector_char,
275              contiguous_vector_char, type_c->block_length * type_c->size_oldtype);*/
276     else
277       ((s_smpi_subtype_t*)type_c->old_type->substruct)->unserialize( contiguous_vector_char,
278                                                                      noncontiguous_vector_char,
279                                                                      type_c->block_length,
280                                                                      type_c->old_type->substruct,
281                                                                      op);
282     contiguous_vector_char += type_c->block_length*type_c->size_oldtype;
283     if((i+1)%type_c->block_count ==0)
284     noncontiguous_vector_char += type_c->block_length*smpi_datatype_get_extent(type_c->old_type);
285     else
286     noncontiguous_vector_char += type_c->block_stride*smpi_datatype_get_extent(type_c->old_type);
287   }
288 }
289
290 /*
291  * Create a Sub type vector to be able to serialize and unserialize it
292  * the structure s_smpi_mpi_vector_t is derived from s_smpi_subtype which
293  * required the functions unserialize and serialize
294  *
295  */
296 s_smpi_mpi_vector_t* smpi_datatype_vector_create( int block_stride,
297                                                   int block_length,
298                                                   int block_count,
299                                                   MPI_Datatype old_type,
300                                                   int size_oldtype){
301   s_smpi_mpi_vector_t *new_t= xbt_new(s_smpi_mpi_vector_t,1);
302   new_t->base.serialize = &serialize_vector;
303   new_t->base.unserialize = &unserialize_vector;
304   new_t->base.subtype_free = &free_vector;
305   new_t->block_stride = block_stride;
306   new_t->block_length = block_length;
307   new_t->block_count = block_count;
308   smpi_datatype_use(old_type);
309   new_t->old_type = old_type;
310   new_t->size_oldtype = size_oldtype;
311   return new_t;
312 }
313
314 void smpi_datatype_create(MPI_Datatype* new_type, int size,int lb, int ub, int has_subtype,
315                           void *struct_type, int flags){
316   MPI_Datatype new_t= xbt_new(s_smpi_mpi_datatype_t,1);
317   new_t->size = size;
318   new_t->has_subtype = size>0? has_subtype:0;
319   new_t->lb = lb;
320   new_t->ub = ub;
321   new_t->flags = flags;
322   new_t->substruct = struct_type;
323   new_t->in_use=0;
324   *new_type = new_t;
325
326 #ifdef HAVE_MC
327   if(MC_is_active())
328     MC_ignore(&(new_t->in_use), sizeof(new_t->in_use));
329 #endif
330 }
331
332 void smpi_datatype_free(MPI_Datatype* type){
333
334   if((*type)->flags & DT_FLAG_PREDEFINED)return;
335
336   //if still used, mark for deletion
337   if((*type)->in_use!=0){
338       (*type)->flags |=DT_FLAG_DESTROYED;
339       return;
340   }
341
342   if ((*type)->has_subtype == 1){
343     ((s_smpi_subtype_t *)(*type)->substruct)->subtype_free(type);  
344     xbt_free((*type)->substruct);
345   }
346   xbt_free(*type);
347   *type = MPI_DATATYPE_NULL;
348 }
349
350 void smpi_datatype_use(MPI_Datatype type){
351   if(type)type->in_use++;
352
353 #ifdef HAVE_MC
354   if(MC_is_active())
355     MC_ignore(&(type->in_use), sizeof(type->in_use));
356 #endif
357 }
358
359
360 void smpi_datatype_unuse(MPI_Datatype type){
361   if(type && type->in_use-- == 0 && (type->flags & DT_FLAG_DESTROYED))
362     smpi_datatype_free(&type);
363
364 #ifdef HAVE_MC
365   if(MC_is_active())
366     MC_ignore(&(type->in_use), sizeof(type->in_use));
367 #endif
368 }
369
370
371
372
373 /*
374 Contiguous Implementation
375 */
376
377
378 /*
379  *  Copies noncontiguous data into contiguous memory.
380  *  @param contiguous_hvector - output hvector
381  *  @param noncontiguous_hvector - input hvector
382  *  @param type - pointer contening :
383  *      - stride - stride of between noncontiguous data, in bytes
384  *      - block_length - the width or height of blocked matrix
385  *      - count - the number of rows of matrix
386  */
387 void serialize_contiguous( const void *noncontiguous_hvector,
388                        void *contiguous_hvector,
389                        int count,
390                        void *type)
391 {
392   s_smpi_mpi_contiguous_t* type_c = (s_smpi_mpi_contiguous_t*)type;
393   char* contiguous_vector_char = (char*)contiguous_hvector;
394   char* noncontiguous_vector_char = (char*)noncontiguous_hvector+type_c->lb;
395   memcpy(contiguous_vector_char,
396            noncontiguous_vector_char, count* type_c->block_count * type_c->size_oldtype);
397 }
398 /*
399  *  Copies contiguous data into noncontiguous memory.
400  *  @param noncontiguous_vector - output hvector
401  *  @param contiguous_vector - input hvector
402  *  @param type - pointer contening :
403  *      - stride - stride of between noncontiguous data, in bytes
404  *      - block_length - the width or height of blocked matrix
405  *      - count - the number of rows of matrix
406  */
407 void unserialize_contiguous( const void *contiguous_vector,
408                          void *noncontiguous_vector,
409                          int count,
410                          void *type,
411                          MPI_Op op)
412 {
413   s_smpi_mpi_contiguous_t* type_c = (s_smpi_mpi_contiguous_t*)type;
414   char* contiguous_vector_char = (char*)contiguous_vector;
415   char* noncontiguous_vector_char = (char*)noncontiguous_vector+type_c->lb;
416   int n= count* type_c->block_count;
417   smpi_op_apply(op, contiguous_vector_char, noncontiguous_vector_char, &n,
418             &type_c->old_type);
419        /*memcpy(noncontiguous_vector_char,
420            contiguous_vector_char, count*  type_c->block_count * type_c->size_oldtype);*/
421 }
422
423 void free_contiguous(MPI_Datatype* d){
424   smpi_datatype_unuse(((s_smpi_mpi_indexed_t *)(*d)->substruct)->old_type);
425 }
426
427 /*
428  * Create a Sub type contiguous to be able to serialize and unserialize it
429  * the structure s_smpi_mpi_contiguous_t is derived from s_smpi_subtype which
430  * required the functions unserialize and serialize
431  *
432  */
433 s_smpi_mpi_contiguous_t* smpi_datatype_contiguous_create( MPI_Aint lb,
434                                                   int block_count,
435                                                   MPI_Datatype old_type,
436                                                   int size_oldtype){
437   s_smpi_mpi_contiguous_t *new_t= xbt_new(s_smpi_mpi_contiguous_t,1);
438   new_t->base.serialize = &serialize_contiguous;
439   new_t->base.unserialize = &unserialize_contiguous;
440   new_t->base.subtype_free = &free_contiguous;
441   new_t->lb = lb;
442   new_t->block_count = block_count;
443   new_t->old_type = old_type;
444   new_t->size_oldtype = size_oldtype;
445   smpi_datatype_use(old_type);
446   return new_t;
447 }
448
449
450
451
452 int smpi_datatype_contiguous(int count, MPI_Datatype old_type, MPI_Datatype* new_type, MPI_Aint lb)
453 {
454   int retval;
455   if(old_type->has_subtype){
456           //handle this case as a hvector with stride equals to the extent of the datatype
457           return smpi_datatype_hvector(count, 1, smpi_datatype_get_extent(old_type), old_type, new_type);
458   }
459   
460   s_smpi_mpi_contiguous_t* subtype = smpi_datatype_contiguous_create( lb,
461                                                                 count,
462                                                                 old_type,
463                                                                 smpi_datatype_size(old_type));
464                                                                 
465   smpi_datatype_create(new_type,
466                                           count * smpi_datatype_size(old_type),
467                                           lb,lb + count * smpi_datatype_size(old_type),
468                                           1,subtype, DT_FLAG_CONTIGUOUS);
469   retval=MPI_SUCCESS;
470   return retval;
471 }
472
473 int smpi_datatype_vector(int count, int blocklen, int stride, MPI_Datatype old_type, MPI_Datatype* new_type)
474 {
475   int retval;
476   if (blocklen<0) return MPI_ERR_ARG;
477   MPI_Aint lb = 0;
478   MPI_Aint ub = 0;
479   if(count>0){
480     lb=smpi_datatype_lb(old_type);
481     ub=((count-1)*stride+blocklen-1)*smpi_datatype_get_extent(old_type)+smpi_datatype_ub(old_type);
482   }
483   if(old_type->has_subtype || stride != blocklen){
484
485
486     s_smpi_mpi_vector_t* subtype = smpi_datatype_vector_create( stride,
487                                                                 blocklen,
488                                                                 count,
489                                                                 old_type,
490                                                                 smpi_datatype_size(old_type));
491     smpi_datatype_create(new_type,
492                          count * (blocklen) * smpi_datatype_size(old_type), lb,
493                          ub,
494                          1,
495                          subtype,
496                          DT_FLAG_VECTOR);
497     retval=MPI_SUCCESS;
498   }else{
499     /* in this situation the data are contignous thus it's not
500      * required to serialize and unserialize it*/
501     smpi_datatype_create(new_type, count * blocklen *
502                          smpi_datatype_size(old_type), 0, ((count -1) * stride + blocklen)*
503                          smpi_datatype_size(old_type),
504                          0,
505                          NULL,
506                          DT_FLAG_VECTOR|DT_FLAG_CONTIGUOUS);
507     retval=MPI_SUCCESS;
508   }
509   return retval;
510 }
511
512 void free_vector(MPI_Datatype* d){
513   smpi_datatype_unuse(((s_smpi_mpi_indexed_t *)(*d)->substruct)->old_type);
514 }
515
516 /*
517 Hvector Implementation - Vector with stride in bytes
518 */
519
520
521 /*
522  *  Copies noncontiguous data into contiguous memory.
523  *  @param contiguous_hvector - output hvector
524  *  @param noncontiguous_hvector - input hvector
525  *  @param type - pointer contening :
526  *      - stride - stride of between noncontiguous data, in bytes
527  *      - block_length - the width or height of blocked matrix
528  *      - count - the number of rows of matrix
529  */
530 void serialize_hvector( const void *noncontiguous_hvector,
531                        void *contiguous_hvector,
532                        int count,
533                        void *type)
534 {
535   s_smpi_mpi_hvector_t* type_c = (s_smpi_mpi_hvector_t*)type;
536   int i;
537   char* contiguous_vector_char = (char*)contiguous_hvector;
538   char* noncontiguous_vector_char = (char*)noncontiguous_hvector;
539
540   for (i = 0; i < type_c->block_count * count; i++) {
541     if (type_c->old_type->has_subtype == 0)
542       memcpy(contiguous_vector_char,
543            noncontiguous_vector_char, type_c->block_length * type_c->size_oldtype);
544     else
545       ((s_smpi_subtype_t*)type_c->old_type->substruct)->serialize( noncontiguous_vector_char,
546                                                                    contiguous_vector_char,
547                                                                    type_c->block_length,
548                                                                    type_c->old_type->substruct);
549
550     contiguous_vector_char += type_c->block_length*type_c->size_oldtype;
551     if((i+1)%type_c->block_count ==0)
552     noncontiguous_vector_char += type_c->block_length*type_c->size_oldtype;
553     else
554     noncontiguous_vector_char += type_c->block_stride;
555   }
556 }
557 /*
558  *  Copies contiguous data into noncontiguous memory.
559  *  @param noncontiguous_vector - output hvector
560  *  @param contiguous_vector - input hvector
561  *  @param type - pointer contening :
562  *      - stride - stride of between noncontiguous data, in bytes
563  *      - block_length - the width or height of blocked matrix
564  *      - count - the number of rows of matrix
565  */
566 void unserialize_hvector( const void *contiguous_vector,
567                          void *noncontiguous_vector,
568                          int count,
569                          void *type,
570                          MPI_Op op)
571 {
572   s_smpi_mpi_hvector_t* type_c = (s_smpi_mpi_hvector_t*)type;
573   int i;
574
575   char* contiguous_vector_char = (char*)contiguous_vector;
576   char* noncontiguous_vector_char = (char*)noncontiguous_vector;
577
578   for (i = 0; i < type_c->block_count * count; i++) {
579     if (type_c->old_type->has_subtype == 0)
580       smpi_op_apply(op, contiguous_vector_char, noncontiguous_vector_char, &type_c->block_length,
581                   &type_c->old_type);
582              /*memcpy(noncontiguous_vector_char,
583            contiguous_vector_char, type_c->block_length * type_c->size_oldtype);*/
584     else
585       ((s_smpi_subtype_t*)type_c->old_type->substruct)->unserialize( contiguous_vector_char,
586                                                                      noncontiguous_vector_char,
587                                                                      type_c->block_length,
588                                                                      type_c->old_type->substruct,
589                                                                      op);
590     contiguous_vector_char += type_c->block_length*type_c->size_oldtype;
591     if((i+1)%type_c->block_count ==0)
592     noncontiguous_vector_char += type_c->block_length*type_c->size_oldtype;
593     else
594     noncontiguous_vector_char += type_c->block_stride;
595   }
596 }
597
598 /*
599  * Create a Sub type vector to be able to serialize and unserialize it
600  * the structure s_smpi_mpi_vector_t is derived from s_smpi_subtype which
601  * required the functions unserialize and serialize
602  *
603  */
604 s_smpi_mpi_hvector_t* smpi_datatype_hvector_create( MPI_Aint block_stride,
605                                                   int block_length,
606                                                   int block_count,
607                                                   MPI_Datatype old_type,
608                                                   int size_oldtype){
609   s_smpi_mpi_hvector_t *new_t= xbt_new(s_smpi_mpi_hvector_t,1);
610   new_t->base.serialize = &serialize_hvector;
611   new_t->base.unserialize = &unserialize_hvector;
612   new_t->base.subtype_free = &free_hvector;
613   new_t->block_stride = block_stride;
614   new_t->block_length = block_length;
615   new_t->block_count = block_count;
616   new_t->old_type = old_type;
617   new_t->size_oldtype = size_oldtype;
618   smpi_datatype_use(old_type);
619   return new_t;
620 }
621
622 //do nothing for vector types
623 void free_hvector(MPI_Datatype* d){
624   smpi_datatype_unuse(((s_smpi_mpi_indexed_t *)(*d)->substruct)->old_type);
625 }
626
627 int smpi_datatype_hvector(int count, int blocklen, MPI_Aint stride, MPI_Datatype old_type, MPI_Datatype* new_type)
628 {
629   int retval;
630   if (blocklen<0) return MPI_ERR_ARG;
631   MPI_Aint lb = 0;
632   MPI_Aint ub = 0;
633   if(count>0){
634     lb=smpi_datatype_lb(old_type);
635     ub=((count-1)*stride)+(blocklen-1)*smpi_datatype_get_extent(old_type)+smpi_datatype_ub(old_type);
636   }
637   if(old_type->has_subtype || stride != blocklen*smpi_datatype_get_extent(old_type)){
638     s_smpi_mpi_hvector_t* subtype = smpi_datatype_hvector_create( stride,
639                                                                   blocklen,
640                                                                   count,
641                                                                   old_type,
642                                                                   smpi_datatype_size(old_type));
643
644     smpi_datatype_create(new_type, count * blocklen * smpi_datatype_size(old_type),
645                                                  lb,ub,
646                          1,
647                          subtype,
648                          DT_FLAG_VECTOR);
649     retval=MPI_SUCCESS;
650   }else{
651     smpi_datatype_create(new_type, count * blocklen *
652                                              smpi_datatype_size(old_type),0,count * blocklen *
653                                              smpi_datatype_size(old_type),
654                                             0,
655                                             NULL,
656                                             DT_FLAG_VECTOR|DT_FLAG_CONTIGUOUS);
657     retval=MPI_SUCCESS;
658   }
659   return retval;
660 }
661
662
663 /*
664 Indexed Implementation
665 */
666
667 /*
668  *  Copies noncontiguous data into contiguous memory.
669  *  @param contiguous_indexed - output indexed
670  *  @param noncontiguous_indexed - input indexed
671  *  @param type - pointer contening :
672  *      - block_lengths - the width or height of blocked matrix
673  *      - block_indices - indices of each data, in element
674  *      - count - the number of rows of matrix
675  */
676 void serialize_indexed( const void *noncontiguous_indexed,
677                        void *contiguous_indexed,
678                        int count,
679                        void *type)
680 {
681   s_smpi_mpi_indexed_t* type_c = (s_smpi_mpi_indexed_t*)type;
682   int i,j;
683   char* contiguous_indexed_char = (char*)contiguous_indexed;
684   char* noncontiguous_indexed_char = (char*)noncontiguous_indexed+type_c->block_indices[0] * type_c->size_oldtype;
685   for(j=0; j<count;j++){
686     for (i = 0; i < type_c->block_count; i++) {
687       if (type_c->old_type->has_subtype == 0)
688         memcpy(contiguous_indexed_char,
689                      noncontiguous_indexed_char, type_c->block_lengths[i] * type_c->size_oldtype);
690       else
691         ((s_smpi_subtype_t*)type_c->old_type->substruct)->serialize( noncontiguous_indexed_char,
692                                                                      contiguous_indexed_char,
693                                                                      type_c->block_lengths[i],
694                                                                      type_c->old_type->substruct);
695
696
697       contiguous_indexed_char += type_c->block_lengths[i]*type_c->size_oldtype;
698       if (i<type_c->block_count-1)noncontiguous_indexed_char = (char*)noncontiguous_indexed + type_c->block_indices[i+1]*smpi_datatype_get_extent(type_c->old_type);
699       else noncontiguous_indexed_char += type_c->block_lengths[i]*smpi_datatype_get_extent(type_c->old_type);
700     }
701     noncontiguous_indexed=(void*)noncontiguous_indexed_char;
702   }
703 }
704 /*
705  *  Copies contiguous data into noncontiguous memory.
706  *  @param noncontiguous_indexed - output indexed
707  *  @param contiguous_indexed - input indexed
708  *  @param type - pointer contening :
709  *      - block_lengths - the width or height of blocked matrix
710  *      - block_indices - indices of each data, in element
711  *      - count - the number of rows of matrix
712  */
713 void unserialize_indexed( const void *contiguous_indexed,
714                          void *noncontiguous_indexed,
715                          int count,
716                          void *type,
717                          MPI_Op op)
718 {
719
720   s_smpi_mpi_indexed_t* type_c = (s_smpi_mpi_indexed_t*)type;
721   int i,j;
722   char* contiguous_indexed_char = (char*)contiguous_indexed;
723   char* noncontiguous_indexed_char = (char*)noncontiguous_indexed+type_c->block_indices[0]*smpi_datatype_get_extent(type_c->old_type);
724   for(j=0; j<count;j++){
725     for (i = 0; i < type_c->block_count; i++) {
726       if (type_c->old_type->has_subtype == 0)
727         smpi_op_apply(op, contiguous_indexed_char, noncontiguous_indexed_char, &type_c->block_lengths[i],
728                     &type_c->old_type);
729                /*memcpy(noncontiguous_indexed_char ,
730              contiguous_indexed_char, type_c->block_lengths[i] * type_c->size_oldtype);*/
731       else
732         ((s_smpi_subtype_t*)type_c->old_type->substruct)->unserialize( contiguous_indexed_char,
733                                                                        noncontiguous_indexed_char,
734                                                                        type_c->block_lengths[i],
735                                                                        type_c->old_type->substruct,
736                                                                        op);
737
738       contiguous_indexed_char += type_c->block_lengths[i]*type_c->size_oldtype;
739       if (i<type_c->block_count-1)
740         noncontiguous_indexed_char = (char*)noncontiguous_indexed + type_c->block_indices[i+1]*smpi_datatype_get_extent(type_c->old_type);
741       else noncontiguous_indexed_char += type_c->block_lengths[i]*smpi_datatype_get_extent(type_c->old_type);
742     }
743     noncontiguous_indexed=(void*)noncontiguous_indexed_char;
744   }
745 }
746
747 void free_indexed(MPI_Datatype* type){
748   xbt_free(((s_smpi_mpi_indexed_t *)(*type)->substruct)->block_lengths);
749   xbt_free(((s_smpi_mpi_indexed_t *)(*type)->substruct)->block_indices);
750   smpi_datatype_unuse(((s_smpi_mpi_indexed_t *)(*type)->substruct)->old_type);
751 }
752
753 /*
754  * Create a Sub type indexed to be able to serialize and unserialize it
755  * the structure s_smpi_mpi_indexed_t is derived from s_smpi_subtype which
756  * required the functions unserialize and serialize
757  */
758 s_smpi_mpi_indexed_t* smpi_datatype_indexed_create( int* block_lengths,
759                                                   int* block_indices,
760                                                   int block_count,
761                                                   MPI_Datatype old_type,
762                                                   int size_oldtype){
763   s_smpi_mpi_indexed_t *new_t= xbt_new(s_smpi_mpi_indexed_t,1);
764   new_t->base.serialize = &serialize_indexed;
765   new_t->base.unserialize = &unserialize_indexed;
766   new_t->base.subtype_free = &free_indexed;
767  //TODO : add a custom function for each time to clean these 
768   new_t->block_lengths= xbt_new(int, block_count);
769   new_t->block_indices= xbt_new(int, block_count);
770   int i;
771   for(i=0;i<block_count;i++){
772     new_t->block_lengths[i]=block_lengths[i];
773     new_t->block_indices[i]=block_indices[i];
774   }
775   new_t->block_count = block_count;
776   smpi_datatype_use(old_type);
777   new_t->old_type = old_type;
778   new_t->size_oldtype = size_oldtype;
779   return new_t;
780 }
781
782
783 int smpi_datatype_indexed(int count, int* blocklens, int* indices, MPI_Datatype old_type, MPI_Datatype* new_type)
784 {
785   int i;
786   int retval;
787   int size = 0;
788   int contiguous=1;
789   MPI_Aint lb = 0;
790   MPI_Aint ub = 0;
791   if(count>0){
792     lb=indices[0]*smpi_datatype_get_extent(old_type);
793     ub=indices[0]*smpi_datatype_get_extent(old_type) + blocklens[0]*smpi_datatype_ub(old_type);
794   }
795
796   for(i=0; i< count; i++){
797     if   (blocklens[i]<0)
798       return MPI_ERR_ARG;
799     size += blocklens[i];
800
801     if(indices[i]*smpi_datatype_get_extent(old_type)+smpi_datatype_lb(old_type)<lb)
802         lb = indices[i]*smpi_datatype_get_extent(old_type)+smpi_datatype_lb(old_type);
803     if(indices[i]*smpi_datatype_get_extent(old_type)+blocklens[i]*smpi_datatype_ub(old_type)>ub)
804         ub = indices[i]*smpi_datatype_get_extent(old_type)+blocklens[i]*smpi_datatype_ub(old_type);
805
806     if ( (i< count -1) && (indices[i]+blocklens[i] != indices[i+1]) )contiguous=0;
807   }
808   if (old_type->has_subtype == 1)
809     contiguous=0;
810
811   if(!contiguous){
812     s_smpi_mpi_indexed_t* subtype = smpi_datatype_indexed_create( blocklens,
813                                                                   indices,
814                                                                   count,
815                                                                   old_type,
816                                                                   smpi_datatype_size(old_type));
817      smpi_datatype_create(new_type,  size *
818                          smpi_datatype_size(old_type),lb,ub,1, subtype, DT_FLAG_DATA);
819   }else{
820     s_smpi_mpi_contiguous_t* subtype = smpi_datatype_contiguous_create( lb,
821                                                                   size,
822                                                                   old_type,
823                                                                   smpi_datatype_size(old_type));
824     smpi_datatype_create(new_type,  size *
825                          smpi_datatype_size(old_type),lb,ub,1, subtype, DT_FLAG_DATA|DT_FLAG_CONTIGUOUS);
826   }
827   retval=MPI_SUCCESS;
828   return retval;
829 }
830
831
832 /*
833 Hindexed Implementation - Indexed with indices in bytes 
834 */
835
836 /*
837  *  Copies noncontiguous data into contiguous memory.
838  *  @param contiguous_hindexed - output hindexed
839  *  @param noncontiguous_hindexed - input hindexed
840  *  @param type - pointer contening :
841  *      - block_lengths - the width or height of blocked matrix
842  *      - block_indices - indices of each data, in bytes
843  *      - count - the number of rows of matrix
844  */
845 void serialize_hindexed( const void *noncontiguous_hindexed,
846                        void *contiguous_hindexed,
847                        int count,
848                        void *type)
849 {
850   s_smpi_mpi_hindexed_t* type_c = (s_smpi_mpi_hindexed_t*)type;
851   int i,j;
852   char* contiguous_hindexed_char = (char*)contiguous_hindexed;
853   char* noncontiguous_hindexed_char = (char*)noncontiguous_hindexed+ type_c->block_indices[0];
854   for(j=0; j<count;j++){
855     for (i = 0; i < type_c->block_count; i++) {
856       if (type_c->old_type->has_subtype == 0)
857         memcpy(contiguous_hindexed_char,
858                      noncontiguous_hindexed_char, type_c->block_lengths[i] * type_c->size_oldtype);
859       else
860         ((s_smpi_subtype_t*)type_c->old_type->substruct)->serialize( noncontiguous_hindexed_char,
861                                                                      contiguous_hindexed_char,
862                                                                      type_c->block_lengths[i],
863                                                                      type_c->old_type->substruct);
864
865       contiguous_hindexed_char += type_c->block_lengths[i]*type_c->size_oldtype;
866       if (i<type_c->block_count-1)noncontiguous_hindexed_char = (char*)noncontiguous_hindexed + type_c->block_indices[i+1];
867       else noncontiguous_hindexed_char += type_c->block_lengths[i]*smpi_datatype_get_extent(type_c->old_type);
868     }
869     noncontiguous_hindexed=(void*)noncontiguous_hindexed_char;
870   }
871 }
872 /*
873  *  Copies contiguous data into noncontiguous memory.
874  *  @param noncontiguous_hindexed - output hindexed
875  *  @param contiguous_hindexed - input hindexed
876  *  @param type - pointer contening :
877  *      - block_lengths - the width or height of blocked matrix
878  *      - block_indices - indices of each data, in bytes
879  *      - count - the number of rows of matrix
880  */
881 void unserialize_hindexed( const void *contiguous_hindexed,
882                          void *noncontiguous_hindexed,
883                          int count,
884                          void *type,
885                          MPI_Op op)
886 {
887   s_smpi_mpi_hindexed_t* type_c = (s_smpi_mpi_hindexed_t*)type;
888   int i,j;
889
890   char* contiguous_hindexed_char = (char*)contiguous_hindexed;
891   char* noncontiguous_hindexed_char = (char*)noncontiguous_hindexed+ type_c->block_indices[0];
892   for(j=0; j<count;j++){
893     for (i = 0; i < type_c->block_count; i++) {
894       if (type_c->old_type->has_subtype == 0)
895         smpi_op_apply(op, contiguous_hindexed_char, noncontiguous_hindexed_char, &type_c->block_lengths[i],
896                             &type_c->old_type);
897                        /*memcpy(noncontiguous_hindexed_char,
898                contiguous_hindexed_char, type_c->block_lengths[i] * type_c->size_oldtype);*/
899       else
900         ((s_smpi_subtype_t*)type_c->old_type->substruct)->unserialize( contiguous_hindexed_char,
901                                                                        noncontiguous_hindexed_char,
902                                                                        type_c->block_lengths[i],
903                                                                        type_c->old_type->substruct,
904                                                                        op);
905
906       contiguous_hindexed_char += type_c->block_lengths[i]*type_c->size_oldtype;
907       if (i<type_c->block_count-1)noncontiguous_hindexed_char = (char*)noncontiguous_hindexed + type_c->block_indices[i+1];
908       else noncontiguous_hindexed_char += type_c->block_lengths[i]*smpi_datatype_get_extent(type_c->old_type);
909     }
910     noncontiguous_hindexed=(void*)noncontiguous_hindexed_char;
911   }
912 }
913
914 void free_hindexed(MPI_Datatype* type){
915   xbt_free(((s_smpi_mpi_hindexed_t *)(*type)->substruct)->block_lengths);
916   xbt_free(((s_smpi_mpi_hindexed_t *)(*type)->substruct)->block_indices);
917   smpi_datatype_unuse(((s_smpi_mpi_indexed_t *)(*type)->substruct)->old_type);
918 }
919
920 /*
921  * Create a Sub type hindexed to be able to serialize and unserialize it
922  * the structure s_smpi_mpi_hindexed_t is derived from s_smpi_subtype which
923  * required the functions unserialize and serialize
924  */
925 s_smpi_mpi_hindexed_t* smpi_datatype_hindexed_create( int* block_lengths,
926                                                   MPI_Aint* block_indices,
927                                                   int block_count,
928                                                   MPI_Datatype old_type,
929                                                   int size_oldtype){
930   s_smpi_mpi_hindexed_t *new_t= xbt_new(s_smpi_mpi_hindexed_t,1);
931   new_t->base.serialize = &serialize_hindexed;
932   new_t->base.unserialize = &unserialize_hindexed;
933   new_t->base.subtype_free = &free_hindexed;
934  //TODO : add a custom function for each time to clean these 
935   new_t->block_lengths= xbt_new(int, block_count);
936   new_t->block_indices= xbt_new(MPI_Aint, block_count);
937   int i;
938   for(i=0;i<block_count;i++){
939     new_t->block_lengths[i]=block_lengths[i];
940     new_t->block_indices[i]=block_indices[i];
941   }
942   new_t->block_count = block_count;
943   new_t->old_type = old_type;
944   new_t->size_oldtype = size_oldtype;
945   return new_t;
946 }
947
948
949 int smpi_datatype_hindexed(int count, int* blocklens, MPI_Aint* indices, MPI_Datatype old_type, MPI_Datatype* new_type)
950 {
951   int i;
952   int retval;
953   int size = 0;
954   int contiguous=1;
955   MPI_Aint lb = 0;
956   MPI_Aint ub = 0;
957   if(count>0){
958     lb=indices[0] + smpi_datatype_lb(old_type);
959     ub=indices[0] + blocklens[0]*smpi_datatype_ub(old_type);
960   }
961   for(i=0; i< count; i++){
962     if   (blocklens[i]<0)
963       return MPI_ERR_ARG;
964     size += blocklens[i];
965
966     if(indices[i]+smpi_datatype_lb(old_type)<lb) lb = indices[i]+smpi_datatype_lb(old_type);
967     if(indices[i]+blocklens[i]*smpi_datatype_ub(old_type)>ub) ub = indices[i]+blocklens[i]*smpi_datatype_ub(old_type);
968
969     if ( (i< count -1) && (indices[i]+blocklens[i]*smpi_datatype_size(old_type) != indices[i+1]) )contiguous=0;
970   }
971   if (old_type->has_subtype == 1 || lb!=0)
972     contiguous=0;
973
974   if(!contiguous){
975     s_smpi_mpi_hindexed_t* subtype = smpi_datatype_hindexed_create( blocklens,
976                                                                   indices,
977                                                                   count,
978                                                                   old_type,
979                                                                   smpi_datatype_size(old_type));
980     smpi_datatype_create(new_type,  size * smpi_datatype_size(old_type),
981                                                  lb,
982                          ub
983                          ,1, subtype, DT_FLAG_DATA);
984   }else{
985     s_smpi_mpi_contiguous_t* subtype = smpi_datatype_contiguous_create( lb,
986                                                                   size,
987                                                                   old_type,
988                                                                   smpi_datatype_size(old_type));
989     smpi_datatype_create(new_type,  size * smpi_datatype_size(old_type),
990                                              0,size * smpi_datatype_size(old_type),
991                                              1, subtype, DT_FLAG_DATA|DT_FLAG_CONTIGUOUS);
992   }
993   retval=MPI_SUCCESS;
994   return retval;
995 }
996
997
998 /*
999 struct Implementation - Indexed with indices in bytes 
1000 */
1001
1002 /*
1003  *  Copies noncontiguous data into contiguous memory.
1004  *  @param contiguous_struct - output struct
1005  *  @param noncontiguous_struct - input struct
1006  *  @param type - pointer contening :
1007  *      - stride - stride of between noncontiguous data
1008  *      - block_length - the width or height of blocked matrix
1009  *      - count - the number of rows of matrix
1010  */
1011 void serialize_struct( const void *noncontiguous_struct,
1012                        void *contiguous_struct,
1013                        int count,
1014                        void *type)
1015 {
1016   s_smpi_mpi_struct_t* type_c = (s_smpi_mpi_struct_t*)type;
1017   int i,j;
1018   char* contiguous_struct_char = (char*)contiguous_struct;
1019   char* noncontiguous_struct_char = (char*)noncontiguous_struct+ type_c->block_indices[0];
1020   for(j=0; j<count;j++){
1021     for (i = 0; i < type_c->block_count; i++) {
1022       if (type_c->old_types[i]->has_subtype == 0)
1023         memcpy(contiguous_struct_char,
1024              noncontiguous_struct_char, type_c->block_lengths[i] * smpi_datatype_size(type_c->old_types[i]));
1025       else
1026         ((s_smpi_subtype_t*)type_c->old_types[i]->substruct)->serialize( noncontiguous_struct_char,
1027                                                                          contiguous_struct_char,
1028                                                                          type_c->block_lengths[i],
1029                                                                          type_c->old_types[i]->substruct);
1030
1031
1032       contiguous_struct_char += type_c->block_lengths[i]*smpi_datatype_size(type_c->old_types[i]);
1033       if (i<type_c->block_count-1)noncontiguous_struct_char = (char*)noncontiguous_struct + type_c->block_indices[i+1];
1034       else noncontiguous_struct_char += type_c->block_lengths[i]*smpi_datatype_get_extent(type_c->old_types[i]);//let's hope this is MPI_UB ?
1035     }
1036     noncontiguous_struct=(void*)noncontiguous_struct_char;
1037   }
1038 }
1039 /*
1040  *  Copies contiguous data into noncontiguous memory.
1041  *  @param noncontiguous_struct - output struct
1042  *  @param contiguous_struct - input struct
1043  *  @param type - pointer contening :
1044  *      - stride - stride of between noncontiguous data
1045  *      - block_length - the width or height of blocked matrix
1046  *      - count - the number of rows of matrix
1047  */
1048 void unserialize_struct( const void *contiguous_struct,
1049                          void *noncontiguous_struct,
1050                          int count,
1051                          void *type,
1052                          MPI_Op op)
1053 {
1054   s_smpi_mpi_struct_t* type_c = (s_smpi_mpi_struct_t*)type;
1055   int i,j;
1056
1057   char* contiguous_struct_char = (char*)contiguous_struct;
1058   char* noncontiguous_struct_char = (char*)noncontiguous_struct+ type_c->block_indices[0];
1059   for(j=0; j<count;j++){
1060     for (i = 0; i < type_c->block_count; i++) {
1061       if (type_c->old_types[i]->has_subtype == 0)
1062         smpi_op_apply(op, contiguous_struct_char, noncontiguous_struct_char, &type_c->block_lengths[i],
1063            & type_c->old_types[i]);
1064                        /*memcpy(noncontiguous_struct_char,
1065              contiguous_struct_char, type_c->block_lengths[i] * smpi_datatype_size(type_c->old_types[i]));*/
1066       else
1067         ((s_smpi_subtype_t*)type_c->old_types[i]->substruct)->unserialize( contiguous_struct_char,
1068                                                                            noncontiguous_struct_char,
1069                                                                            type_c->block_lengths[i],
1070                                                                            type_c->old_types[i]->substruct,
1071                                                                            op);
1072
1073       contiguous_struct_char += type_c->block_lengths[i]*smpi_datatype_size(type_c->old_types[i]);
1074       if (i<type_c->block_count-1)noncontiguous_struct_char =  (char*)noncontiguous_struct + type_c->block_indices[i+1];
1075       else noncontiguous_struct_char += type_c->block_lengths[i]*smpi_datatype_get_extent(type_c->old_types[i]);
1076     }
1077     noncontiguous_struct=(void*)noncontiguous_struct_char;
1078     
1079   }
1080 }
1081
1082 void free_struct(MPI_Datatype* type){
1083   xbt_free(((s_smpi_mpi_struct_t *)(*type)->substruct)->block_lengths);
1084   xbt_free(((s_smpi_mpi_struct_t *)(*type)->substruct)->block_indices);
1085   int i=0;
1086   for (i = 0; i < ((s_smpi_mpi_struct_t *)(*type)->substruct)->block_count; i++)
1087     smpi_datatype_unuse(((s_smpi_mpi_struct_t *)(*type)->substruct)->old_types[i]);
1088   xbt_free(((s_smpi_mpi_struct_t *)(*type)->substruct)->old_types);
1089 }
1090
1091 /*
1092  * Create a Sub type struct to be able to serialize and unserialize it
1093  * the structure s_smpi_mpi_struct_t is derived from s_smpi_subtype which
1094  * required the functions unserialize and serialize
1095  */
1096 s_smpi_mpi_struct_t* smpi_datatype_struct_create( int* block_lengths,
1097                                                   MPI_Aint* block_indices,
1098                                                   int block_count,
1099                                                   MPI_Datatype* old_types){
1100   s_smpi_mpi_struct_t *new_t= xbt_new(s_smpi_mpi_struct_t,1);
1101   new_t->base.serialize = &serialize_struct;
1102   new_t->base.unserialize = &unserialize_struct;
1103   new_t->base.subtype_free = &free_struct;
1104  //TODO : add a custom function for each time to clean these 
1105   new_t->block_lengths= xbt_new(int, block_count);
1106   new_t->block_indices= xbt_new(MPI_Aint, block_count);
1107   new_t->old_types=  xbt_new(MPI_Datatype, block_count);
1108   int i;
1109   for(i=0;i<block_count;i++){
1110     new_t->block_lengths[i]=block_lengths[i];
1111     new_t->block_indices[i]=block_indices[i];
1112     new_t->old_types[i]=old_types[i];
1113     smpi_datatype_use(new_t->old_types[i]);
1114   }
1115   //new_t->block_lengths = block_lengths;
1116   //new_t->block_indices = block_indices;
1117   new_t->block_count = block_count;
1118   //new_t->old_types = old_types;
1119   return new_t;
1120 }
1121
1122
1123 int smpi_datatype_struct(int count, int* blocklens, MPI_Aint* indices, MPI_Datatype* old_types, MPI_Datatype* new_type)
1124 {
1125   int i;
1126   size_t size = 0;
1127   int contiguous=1;
1128   size = 0;
1129   MPI_Aint lb = 0;
1130   MPI_Aint ub = 0;
1131   if(count>0){
1132     lb=indices[0] + smpi_datatype_lb(old_types[0]);
1133     ub=indices[0] + blocklens[0]*smpi_datatype_ub(old_types[0]);
1134   }
1135   int forced_lb=0;
1136   int forced_ub=0;
1137   for(i=0; i< count; i++){
1138     if (blocklens[i]<0)
1139       return MPI_ERR_ARG;
1140     if (old_types[i]->has_subtype == 1)
1141       contiguous=0;
1142
1143     size += blocklens[i]*smpi_datatype_size(old_types[i]);
1144     if (old_types[i]==MPI_LB){
1145       lb=indices[i];
1146       forced_lb=1;
1147     }
1148     if (old_types[i]==MPI_UB){
1149       ub=indices[i];
1150       forced_ub=1;
1151     }
1152
1153     if(!forced_lb && indices[i]+smpi_datatype_lb(old_types[i])<lb) lb = indices[i];
1154     if(!forced_ub && indices[i]+blocklens[i]*smpi_datatype_ub(old_types[i])>ub) ub = indices[i]+blocklens[i]*smpi_datatype_ub(old_types[i]);
1155
1156     if ( (i< count -1) && (indices[i]+blocklens[i]*smpi_datatype_size(old_types[i]) != indices[i+1]) )contiguous=0;
1157   }
1158
1159   if(!contiguous){
1160     s_smpi_mpi_struct_t* subtype = smpi_datatype_struct_create( blocklens,
1161                                                               indices,
1162                                                               count,
1163                                                               old_types);
1164
1165     smpi_datatype_create(new_type,  size, lb, ub,1, subtype, DT_FLAG_DATA);
1166   }else{
1167     s_smpi_mpi_contiguous_t* subtype = smpi_datatype_contiguous_create( lb,
1168                                                                   size,
1169                                                                   MPI_CHAR,
1170                                                                   1);
1171     smpi_datatype_create(new_type,  size, lb, ub,1, subtype, DT_FLAG_DATA|DT_FLAG_CONTIGUOUS);
1172   }
1173   return MPI_SUCCESS;
1174 }
1175
1176 void smpi_datatype_commit(MPI_Datatype *datatype)
1177 {
1178   (*datatype)->flags=  ((*datatype)->flags | DT_FLAG_COMMITED);
1179 }
1180
1181 typedef struct s_smpi_mpi_op {
1182   MPI_User_function *func;
1183   int is_commute;
1184 } s_smpi_mpi_op_t;
1185
1186 #define MAX_OP(a, b)  (b) = (a) < (b) ? (b) : (a)
1187 #define MIN_OP(a, b)  (b) = (a) < (b) ? (a) : (b)
1188 #define SUM_OP(a, b)  (b) += (a)
1189 #define PROD_OP(a, b) (b) *= (a)
1190 #define LAND_OP(a, b) (b) = (a) && (b)
1191 #define LOR_OP(a, b)  (b) = (a) || (b)
1192 #define LXOR_OP(a, b) (b) = (!(a) && (b)) || ((a) && !(b))
1193 #define BAND_OP(a, b) (b) &= (a)
1194 #define BOR_OP(a, b)  (b) |= (a)
1195 #define BXOR_OP(a, b) (b) ^= (a)
1196 #define MAXLOC_OP(a, b)  (b) = (a.value) < (b.value) ? (b) : (a)
1197 #define MINLOC_OP(a, b)  (b) = (a.value) < (b.value) ? (a) : (b)
1198
1199 #define APPLY_FUNC(a, b, length, type, func) \
1200 {                                          \
1201   int i;                                   \
1202   type* x = (type*)(a);                    \
1203   type* y = (type*)(b);                    \
1204   for(i = 0; i < *(length); i++) {         \
1205     func(x[i], y[i]);                      \
1206   }                                        \
1207 }
1208
1209 static void max_func(void *a, void *b, int *length,
1210                      MPI_Datatype * datatype)
1211 {
1212   if (*datatype == MPI_CHAR) {
1213     APPLY_FUNC(a, b, length, char, MAX_OP);
1214   } else if (*datatype == MPI_SHORT) {
1215     APPLY_FUNC(a, b, length, short, MAX_OP);
1216   } else if (*datatype == MPI_INT) {
1217     APPLY_FUNC(a, b, length, int, MAX_OP);
1218   } else if (*datatype == MPI_LONG) {
1219     APPLY_FUNC(a, b, length, long, MAX_OP);
1220   } else if (*datatype == MPI_UNSIGNED_SHORT) {
1221     APPLY_FUNC(a, b, length, unsigned short, MAX_OP);
1222   } else if (*datatype == MPI_UNSIGNED) {
1223     APPLY_FUNC(a, b, length, unsigned int, MAX_OP);
1224   } else if (*datatype == MPI_UNSIGNED_LONG) {
1225     APPLY_FUNC(a, b, length, unsigned long, MAX_OP);
1226   } else if (*datatype == MPI_FLOAT) {
1227     APPLY_FUNC(a, b, length, float, MAX_OP);
1228   } else if (*datatype == MPI_DOUBLE) {
1229     APPLY_FUNC(a, b, length, double, MAX_OP);
1230   } else if (*datatype == MPI_LONG_DOUBLE) {
1231     APPLY_FUNC(a, b, length, long double, MAX_OP);
1232   }
1233 }
1234
1235 static void min_func(void *a, void *b, int *length,
1236                      MPI_Datatype * datatype)
1237 {
1238   if (*datatype == MPI_CHAR) {
1239     APPLY_FUNC(a, b, length, char, MIN_OP);
1240   } else if (*datatype == MPI_SHORT) {
1241     APPLY_FUNC(a, b, length, short, MIN_OP);
1242   } else if (*datatype == MPI_INT) {
1243     APPLY_FUNC(a, b, length, int, MIN_OP);
1244   } else if (*datatype == MPI_LONG) {
1245     APPLY_FUNC(a, b, length, long, MIN_OP);
1246   } else if (*datatype == MPI_UNSIGNED_SHORT) {
1247     APPLY_FUNC(a, b, length, unsigned short, MIN_OP);
1248   } else if (*datatype == MPI_UNSIGNED) {
1249     APPLY_FUNC(a, b, length, unsigned int, MIN_OP);
1250   } else if (*datatype == MPI_UNSIGNED_LONG) {
1251     APPLY_FUNC(a, b, length, unsigned long, MIN_OP);
1252   } else if (*datatype == MPI_FLOAT) {
1253     APPLY_FUNC(a, b, length, float, MIN_OP);
1254   } else if (*datatype == MPI_DOUBLE) {
1255     APPLY_FUNC(a, b, length, double, MIN_OP);
1256   } else if (*datatype == MPI_LONG_DOUBLE) {
1257     APPLY_FUNC(a, b, length, long double, MIN_OP);
1258   }
1259 }
1260
1261 static void sum_func(void *a, void *b, int *length,
1262                      MPI_Datatype * datatype)
1263 {
1264   if (*datatype == MPI_CHAR) {
1265     APPLY_FUNC(a, b, length, char, SUM_OP);
1266   } else if (*datatype == MPI_SHORT) {
1267     APPLY_FUNC(a, b, length, short, SUM_OP);
1268   } else if (*datatype == MPI_INT) {
1269     APPLY_FUNC(a, b, length, int, SUM_OP);
1270   } else if (*datatype == MPI_LONG) {
1271     APPLY_FUNC(a, b, length, long, SUM_OP);
1272   } else if (*datatype == MPI_UNSIGNED_SHORT) {
1273     APPLY_FUNC(a, b, length, unsigned short, SUM_OP);
1274   } else if (*datatype == MPI_UNSIGNED) {
1275     APPLY_FUNC(a, b, length, unsigned int, SUM_OP);
1276   } else if (*datatype == MPI_UNSIGNED_LONG) {
1277     APPLY_FUNC(a, b, length, unsigned long, SUM_OP);
1278   } else if (*datatype == MPI_FLOAT) {
1279     APPLY_FUNC(a, b, length, float, SUM_OP);
1280   } else if (*datatype == MPI_DOUBLE) {
1281     APPLY_FUNC(a, b, length, double, SUM_OP);
1282   } else if (*datatype == MPI_LONG_DOUBLE) {
1283     APPLY_FUNC(a, b, length, long double, SUM_OP);
1284   } else if (*datatype == MPI_C_FLOAT_COMPLEX) {
1285     APPLY_FUNC(a, b, length, float _Complex, SUM_OP);
1286   } else if (*datatype == MPI_C_DOUBLE_COMPLEX) {
1287     APPLY_FUNC(a, b, length, double _Complex, SUM_OP);
1288   } else if (*datatype == MPI_C_LONG_DOUBLE_COMPLEX) {
1289     APPLY_FUNC(a, b, length, long double _Complex, SUM_OP);
1290   }
1291 }
1292
1293 static void prod_func(void *a, void *b, int *length,
1294                       MPI_Datatype * datatype)
1295 {
1296   if (*datatype == MPI_CHAR) {
1297     APPLY_FUNC(a, b, length, char, PROD_OP);
1298   } else if (*datatype == MPI_SHORT) {
1299     APPLY_FUNC(a, b, length, short, PROD_OP);
1300   } else if (*datatype == MPI_INT) {
1301     APPLY_FUNC(a, b, length, int, PROD_OP);
1302   } else if (*datatype == MPI_LONG) {
1303     APPLY_FUNC(a, b, length, long, PROD_OP);
1304   } else if (*datatype == MPI_UNSIGNED_SHORT) {
1305     APPLY_FUNC(a, b, length, unsigned short, PROD_OP);
1306   } else if (*datatype == MPI_UNSIGNED) {
1307     APPLY_FUNC(a, b, length, unsigned int, PROD_OP);
1308   } else if (*datatype == MPI_UNSIGNED_LONG) {
1309     APPLY_FUNC(a, b, length, unsigned long, PROD_OP);
1310   } else if (*datatype == MPI_FLOAT) {
1311     APPLY_FUNC(a, b, length, float, PROD_OP);
1312   } else if (*datatype == MPI_DOUBLE) {
1313     APPLY_FUNC(a, b, length, double, PROD_OP);
1314   } else if (*datatype == MPI_LONG_DOUBLE) {
1315     APPLY_FUNC(a, b, length, long double, PROD_OP);
1316   } else if (*datatype == MPI_C_FLOAT_COMPLEX) {
1317     APPLY_FUNC(a, b, length, float _Complex, PROD_OP);
1318   } else if (*datatype == MPI_C_DOUBLE_COMPLEX) {
1319     APPLY_FUNC(a, b, length, double _Complex, PROD_OP);
1320   } else if (*datatype == MPI_C_LONG_DOUBLE_COMPLEX) {
1321     APPLY_FUNC(a, b, length, long double _Complex, PROD_OP);
1322   }
1323 }
1324
1325 static void land_func(void *a, void *b, int *length,
1326                       MPI_Datatype * datatype)
1327 {
1328   if (*datatype == MPI_CHAR) {
1329     APPLY_FUNC(a, b, length, char, LAND_OP);
1330   } else if (*datatype == MPI_SHORT) {
1331     APPLY_FUNC(a, b, length, short, LAND_OP);
1332   } else if (*datatype == MPI_INT) {
1333     APPLY_FUNC(a, b, length, int, LAND_OP);
1334   } else if (*datatype == MPI_LONG) {
1335     APPLY_FUNC(a, b, length, long, LAND_OP);
1336   } else if (*datatype == MPI_UNSIGNED_SHORT) {
1337     APPLY_FUNC(a, b, length, unsigned short, LAND_OP);
1338   } else if (*datatype == MPI_UNSIGNED) {
1339     APPLY_FUNC(a, b, length, unsigned int, LAND_OP);
1340   } else if (*datatype == MPI_UNSIGNED_LONG) {
1341     APPLY_FUNC(a, b, length, unsigned long, LAND_OP);
1342   } else if (*datatype == MPI_C_BOOL) {
1343     APPLY_FUNC(a, b, length, _Bool, LAND_OP);
1344   }
1345 }
1346
1347 static void lor_func(void *a, void *b, int *length,
1348                      MPI_Datatype * datatype)
1349 {
1350   if (*datatype == MPI_CHAR) {
1351     APPLY_FUNC(a, b, length, char, LOR_OP);
1352   } else if (*datatype == MPI_SHORT) {
1353     APPLY_FUNC(a, b, length, short, LOR_OP);
1354   } else if (*datatype == MPI_INT) {
1355     APPLY_FUNC(a, b, length, int, LOR_OP);
1356   } else if (*datatype == MPI_LONG) {
1357     APPLY_FUNC(a, b, length, long, LOR_OP);
1358   } else if (*datatype == MPI_UNSIGNED_SHORT) {
1359     APPLY_FUNC(a, b, length, unsigned short, LOR_OP);
1360   } else if (*datatype == MPI_UNSIGNED) {
1361     APPLY_FUNC(a, b, length, unsigned int, LOR_OP);
1362   } else if (*datatype == MPI_UNSIGNED_LONG) {
1363     APPLY_FUNC(a, b, length, unsigned long, LOR_OP);
1364   } else if (*datatype == MPI_C_BOOL) {
1365     APPLY_FUNC(a, b, length, _Bool, LOR_OP);
1366   }
1367 }
1368
1369 static void lxor_func(void *a, void *b, int *length,
1370                       MPI_Datatype * datatype)
1371 {
1372   if (*datatype == MPI_CHAR) {
1373     APPLY_FUNC(a, b, length, char, LXOR_OP);
1374   } else if (*datatype == MPI_SHORT) {
1375     APPLY_FUNC(a, b, length, short, LXOR_OP);
1376   } else if (*datatype == MPI_INT) {
1377     APPLY_FUNC(a, b, length, int, LXOR_OP);
1378   } else if (*datatype == MPI_LONG) {
1379     APPLY_FUNC(a, b, length, long, LXOR_OP);
1380   } else if (*datatype == MPI_UNSIGNED_SHORT) {
1381     APPLY_FUNC(a, b, length, unsigned short, LXOR_OP);
1382   } else if (*datatype == MPI_UNSIGNED) {
1383     APPLY_FUNC(a, b, length, unsigned int, LXOR_OP);
1384   } else if (*datatype == MPI_UNSIGNED_LONG) {
1385     APPLY_FUNC(a, b, length, unsigned long, LXOR_OP);
1386   } else if (*datatype == MPI_C_BOOL) {
1387     APPLY_FUNC(a, b, length, _Bool, LXOR_OP);
1388   }
1389 }
1390
1391 static void band_func(void *a, void *b, int *length,
1392                       MPI_Datatype * datatype)
1393 {
1394   if (*datatype == MPI_CHAR) {
1395     APPLY_FUNC(a, b, length, char, BAND_OP);
1396   }
1397   if (*datatype == MPI_SHORT) {
1398     APPLY_FUNC(a, b, length, short, BAND_OP);
1399   } else if (*datatype == MPI_INT) {
1400     APPLY_FUNC(a, b, length, int, BAND_OP);
1401   } else if (*datatype == MPI_LONG) {
1402     APPLY_FUNC(a, b, length, long, BAND_OP);
1403   } else if (*datatype == MPI_UNSIGNED_SHORT) {
1404     APPLY_FUNC(a, b, length, unsigned short, BAND_OP);
1405   } else if (*datatype == MPI_UNSIGNED) {
1406     APPLY_FUNC(a, b, length, unsigned int, BAND_OP);
1407   } else if (*datatype == MPI_UNSIGNED_LONG) {
1408     APPLY_FUNC(a, b, length, unsigned long, BAND_OP);
1409   } else if (*datatype == MPI_BYTE) {
1410     APPLY_FUNC(a, b, length, uint8_t, BAND_OP);
1411   }
1412 }
1413
1414 static void bor_func(void *a, void *b, int *length,
1415                      MPI_Datatype * datatype)
1416 {
1417   if (*datatype == MPI_CHAR) {
1418     APPLY_FUNC(a, b, length, char, BOR_OP);
1419   } else if (*datatype == MPI_SHORT) {
1420     APPLY_FUNC(a, b, length, short, BOR_OP);
1421   } else if (*datatype == MPI_INT) {
1422     APPLY_FUNC(a, b, length, int, BOR_OP);
1423   } else if (*datatype == MPI_LONG) {
1424     APPLY_FUNC(a, b, length, long, BOR_OP);
1425   } else if (*datatype == MPI_UNSIGNED_SHORT) {
1426     APPLY_FUNC(a, b, length, unsigned short, BOR_OP);
1427   } else if (*datatype == MPI_UNSIGNED) {
1428     APPLY_FUNC(a, b, length, unsigned int, BOR_OP);
1429   } else if (*datatype == MPI_UNSIGNED_LONG) {
1430     APPLY_FUNC(a, b, length, unsigned long, BOR_OP);
1431   } else if (*datatype == MPI_BYTE) {
1432     APPLY_FUNC(a, b, length, uint8_t, BOR_OP);
1433   }
1434 }
1435
1436 static void bxor_func(void *a, void *b, int *length,
1437                       MPI_Datatype * datatype)
1438 {
1439   if (*datatype == MPI_CHAR) {
1440     APPLY_FUNC(a, b, length, char, BXOR_OP);
1441   } else if (*datatype == MPI_SHORT) {
1442     APPLY_FUNC(a, b, length, short, BXOR_OP);
1443   } else if (*datatype == MPI_INT) {
1444     APPLY_FUNC(a, b, length, int, BXOR_OP);
1445   } else if (*datatype == MPI_LONG) {
1446     APPLY_FUNC(a, b, length, long, BXOR_OP);
1447   } else if (*datatype == MPI_UNSIGNED_SHORT) {
1448     APPLY_FUNC(a, b, length, unsigned short, BXOR_OP);
1449   } else if (*datatype == MPI_UNSIGNED) {
1450     APPLY_FUNC(a, b, length, unsigned int, BXOR_OP);
1451   } else if (*datatype == MPI_UNSIGNED_LONG) {
1452     APPLY_FUNC(a, b, length, unsigned long, BXOR_OP);
1453   } else if (*datatype == MPI_BYTE) {
1454     APPLY_FUNC(a, b, length, uint8_t, BXOR_OP);
1455   }
1456 }
1457
1458 static void minloc_func(void *a, void *b, int *length,
1459                         MPI_Datatype * datatype)
1460 {
1461   if (*datatype == MPI_FLOAT_INT) {
1462     APPLY_FUNC(a, b, length, float_int, MINLOC_OP);
1463   } else if (*datatype == MPI_LONG_INT) {
1464     APPLY_FUNC(a, b, length, long_int, MINLOC_OP);
1465   } else if (*datatype == MPI_DOUBLE_INT) {
1466     APPLY_FUNC(a, b, length, double_int, MINLOC_OP);
1467   } else if (*datatype == MPI_SHORT_INT) {
1468     APPLY_FUNC(a, b, length, short_int, MINLOC_OP);
1469   } else if (*datatype == MPI_2INT) {
1470     APPLY_FUNC(a, b, length, int_int, MINLOC_OP);
1471   } else if (*datatype == MPI_LONG_DOUBLE_INT) {
1472     APPLY_FUNC(a, b, length, long_double_int, MINLOC_OP);
1473   } else if (*datatype == MPI_2FLOAT) {
1474     APPLY_FUNC(a, b, length, float_float, MINLOC_OP);
1475   } else if (*datatype == MPI_2DOUBLE) {
1476     APPLY_FUNC(a, b, length, double_double, MINLOC_OP);
1477   }
1478 }
1479
1480 static void maxloc_func(void *a, void *b, int *length,
1481                         MPI_Datatype * datatype)
1482 {
1483   if (*datatype == MPI_FLOAT_INT) {
1484     APPLY_FUNC(a, b, length, float_int, MAXLOC_OP);
1485   } else if (*datatype == MPI_LONG_INT) {
1486     APPLY_FUNC(a, b, length, long_int, MAXLOC_OP);
1487   } else if (*datatype == MPI_DOUBLE_INT) {
1488     APPLY_FUNC(a, b, length, double_int, MAXLOC_OP);
1489   } else if (*datatype == MPI_SHORT_INT) {
1490     APPLY_FUNC(a, b, length, short_int, MAXLOC_OP);
1491   } else if (*datatype == MPI_2INT) {
1492     APPLY_FUNC(a, b, length, int_int, MAXLOC_OP);
1493   } else if (*datatype == MPI_LONG_DOUBLE_INT) {
1494     APPLY_FUNC(a, b, length, long_double_int, MAXLOC_OP);
1495   } else if (*datatype == MPI_2FLOAT) {
1496     APPLY_FUNC(a, b, length, float_float, MAXLOC_OP);
1497   } else if (*datatype == MPI_2DOUBLE) {
1498     APPLY_FUNC(a, b, length, double_double, MAXLOC_OP);
1499   }
1500 }
1501
1502 static void replace_func(void *a, void *b, int *length,
1503                         MPI_Datatype * datatype)
1504 {
1505   memcpy(b, a, *length * smpi_datatype_size(*datatype));
1506 }
1507
1508 #define CREATE_MPI_OP(name, func)                             \
1509   static s_smpi_mpi_op_t mpi_##name = { &(func) /* func */, TRUE }; \
1510 MPI_Op name = &mpi_##name;
1511
1512 CREATE_MPI_OP(MPI_MAX, max_func);
1513 CREATE_MPI_OP(MPI_MIN, min_func);
1514 CREATE_MPI_OP(MPI_SUM, sum_func);
1515 CREATE_MPI_OP(MPI_PROD, prod_func);
1516 CREATE_MPI_OP(MPI_LAND, land_func);
1517 CREATE_MPI_OP(MPI_LOR, lor_func);
1518 CREATE_MPI_OP(MPI_LXOR, lxor_func);
1519 CREATE_MPI_OP(MPI_BAND, band_func);
1520 CREATE_MPI_OP(MPI_BOR, bor_func);
1521 CREATE_MPI_OP(MPI_BXOR, bxor_func);
1522 CREATE_MPI_OP(MPI_MAXLOC, maxloc_func);
1523 CREATE_MPI_OP(MPI_MINLOC, minloc_func);
1524 CREATE_MPI_OP(MPI_REPLACE, replace_func);
1525
1526
1527 MPI_Op smpi_op_new(MPI_User_function * function, int commute)
1528 {
1529   MPI_Op op;
1530   op = xbt_new(s_smpi_mpi_op_t, 1);
1531   op->func = function;
1532   op-> is_commute = commute;
1533   return op;
1534 }
1535
1536 int smpi_op_is_commute(MPI_Op op)
1537 {
1538   return (op==MPI_OP_NULL) ? 1 : op-> is_commute;
1539 }
1540
1541 void smpi_op_destroy(MPI_Op op)
1542 {
1543   xbt_free(op);
1544 }
1545
1546 void smpi_op_apply(MPI_Op op, void *invec, void *inoutvec, int *len,
1547                    MPI_Datatype * datatype)
1548 {
1549   if(smpi_privatize_global_variables){ //we need to switch here, as the called function may silently touch global variables
1550     XBT_VERB("Applying operation, switch to the right data frame ");
1551     switch_data_segment(smpi_process_index());
1552   }
1553
1554   if(!_xbt_replay_is_active())
1555   op->func(invec, inoutvec, len, datatype);
1556 }