Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Add MPI_Info_* support.
[simgrid.git] / src / smpi / smpi_mpi_dt.c
1 /* smpi_mpi_dt.c -- MPI primitives to handle datatypes                        */
2 /* FIXME: a very incomplete implementation                                    */
3
4 /* Copyright (c) 2009-2014. The SimGrid Team.
5  * All rights reserved.                                                     */
6
7 /* This program is free software; you can redistribute it and/or modify it
8  * under the terms of the license (GNU LGPL) which comes with this package. */
9
10 #include <stdio.h>
11 #include <stdlib.h>
12 #include <string.h>
13 #include <limits.h>
14 #include "private.h"
15 #include "smpi_mpi_dt_private.h"
16 #include "mc/mc.h"
17 #include "xbt/replay.h"
18 #include "simgrid/modelchecker.h"
19
20 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_mpi_dt, smpi,
21                                 "Logging specific to SMPI (datatype)");
22
23 #define INTSIZEDCHAR (sizeof(int)*CHAR_BIT-1)/3 + 3 
24 xbt_dict_t smpi_type_keyvals = NULL;
25 int type_keyval_id=0;//avoid collisions
26
27 #define CREATE_MPI_DATATYPE(name, type)       \
28   static s_smpi_mpi_datatype_t mpi_##name = { \
29     (char*) # name,                                   \
30     sizeof(type),  /* size */                 \
31     0,             /*was 1 has_subtype*/             \
32     0,             /* lb */                   \
33     sizeof(type),  /* ub = lb + size */       \
34     DT_FLAG_BASIC,  /* flags */              \
35     NULL,           /* attributes */         \
36     NULL,           /* pointer on extended struct*/ \
37   };                                          \
38 MPI_Datatype name = &mpi_##name;
39
40 #define CREATE_MPI_DATATYPE_NULL(name)       \
41   static s_smpi_mpi_datatype_t mpi_##name = { \
42     (char*) # name,                                   \
43     0,  /* size */                 \
44     0,             /*was 1 has_subtype*/             \
45     0,             /* lb */                   \
46     0,  /* ub = lb + size */       \
47     DT_FLAG_BASIC,  /* flags */              \
48     NULL,           /* attributes */         \
49     NULL           /* pointer on extended struct*/ \
50   };                                          \
51 MPI_Datatype name = &mpi_##name;
52
53 //The following are datatypes for the MPI functions MPI_MAXLOC and MPI_MINLOC.
54 typedef struct {
55   float value;
56   int index;
57 } float_int;
58 typedef struct {
59   float value;
60   float index;
61 } float_float;
62 typedef struct {
63   long value;
64   long index;
65 } long_long;
66 typedef struct {
67   double value;
68   double index;
69 } double_double;
70 typedef struct {
71   long value;
72   int index;
73 } long_int;
74 typedef struct {
75   double value;
76   int index;
77 } double_int;
78 typedef struct {
79   short value;
80   int index;
81 } short_int;
82 typedef struct {
83   int value;
84   int index;
85 } int_int;
86 typedef struct {
87   long double value;
88   int index;
89 } long_double_int;
90 typedef struct {
91   int64_t value;
92   int64_t index;
93 } integer128_t;
94 // Predefined data types
95 CREATE_MPI_DATATYPE(MPI_CHAR, char);
96 CREATE_MPI_DATATYPE(MPI_SHORT, short);
97 CREATE_MPI_DATATYPE(MPI_INT, int);
98 CREATE_MPI_DATATYPE(MPI_LONG, long);
99 CREATE_MPI_DATATYPE(MPI_LONG_LONG, long long);
100 CREATE_MPI_DATATYPE(MPI_SIGNED_CHAR, signed char);
101 CREATE_MPI_DATATYPE(MPI_UNSIGNED_CHAR, unsigned char);
102 CREATE_MPI_DATATYPE(MPI_UNSIGNED_SHORT, unsigned short);
103 CREATE_MPI_DATATYPE(MPI_UNSIGNED, unsigned int);
104 CREATE_MPI_DATATYPE(MPI_UNSIGNED_LONG, unsigned long);
105 CREATE_MPI_DATATYPE(MPI_UNSIGNED_LONG_LONG, unsigned long long);
106 CREATE_MPI_DATATYPE(MPI_FLOAT, float);
107 CREATE_MPI_DATATYPE(MPI_DOUBLE, double);
108 CREATE_MPI_DATATYPE(MPI_LONG_DOUBLE, long double);
109 CREATE_MPI_DATATYPE(MPI_WCHAR, wchar_t);
110 CREATE_MPI_DATATYPE(MPI_C_BOOL, _Bool);
111 CREATE_MPI_DATATYPE(MPI_BYTE, int8_t);
112 CREATE_MPI_DATATYPE(MPI_INT8_T, int8_t);
113 CREATE_MPI_DATATYPE(MPI_INT16_T, int16_t);
114 CREATE_MPI_DATATYPE(MPI_INT32_T, int32_t);
115 CREATE_MPI_DATATYPE(MPI_INT64_T, int64_t);
116 CREATE_MPI_DATATYPE(MPI_UINT8_T, uint8_t);
117 CREATE_MPI_DATATYPE(MPI_UINT16_T, uint16_t);
118 CREATE_MPI_DATATYPE(MPI_UINT32_T, uint32_t);
119 CREATE_MPI_DATATYPE(MPI_UINT64_T, uint64_t);
120 CREATE_MPI_DATATYPE(MPI_C_FLOAT_COMPLEX, float _Complex);
121 CREATE_MPI_DATATYPE(MPI_C_DOUBLE_COMPLEX, double _Complex);
122 CREATE_MPI_DATATYPE(MPI_C_LONG_DOUBLE_COMPLEX, long double _Complex);
123 CREATE_MPI_DATATYPE(MPI_AINT, MPI_Aint);
124 CREATE_MPI_DATATYPE(MPI_OFFSET, MPI_Offset);
125
126 CREATE_MPI_DATATYPE(MPI_FLOAT_INT, float_int);
127 CREATE_MPI_DATATYPE(MPI_LONG_INT, long_int);
128 CREATE_MPI_DATATYPE(MPI_DOUBLE_INT, double_int);
129 CREATE_MPI_DATATYPE(MPI_SHORT_INT, short_int);
130 CREATE_MPI_DATATYPE(MPI_2INT, int_int);
131 CREATE_MPI_DATATYPE(MPI_2FLOAT, float_float);
132 CREATE_MPI_DATATYPE(MPI_2DOUBLE, double_double);
133 CREATE_MPI_DATATYPE(MPI_2LONG, long_long);
134
135 CREATE_MPI_DATATYPE(MPI_REAL, float);
136 CREATE_MPI_DATATYPE(MPI_REAL4, float);
137 CREATE_MPI_DATATYPE(MPI_REAL8, float);
138 CREATE_MPI_DATATYPE(MPI_REAL16, double);
139 CREATE_MPI_DATATYPE_NULL(MPI_COMPLEX8);
140 CREATE_MPI_DATATYPE_NULL(MPI_COMPLEX16);
141 CREATE_MPI_DATATYPE_NULL(MPI_COMPLEX32);
142 CREATE_MPI_DATATYPE(MPI_INTEGER1, int);
143 CREATE_MPI_DATATYPE(MPI_INTEGER2, int16_t);
144 CREATE_MPI_DATATYPE(MPI_INTEGER4, int32_t);
145 CREATE_MPI_DATATYPE(MPI_INTEGER8, int64_t);
146 CREATE_MPI_DATATYPE(MPI_INTEGER16, integer128_t);
147
148 CREATE_MPI_DATATYPE(MPI_LONG_DOUBLE_INT, long_double_int);
149
150 CREATE_MPI_DATATYPE_NULL(MPI_UB);
151 CREATE_MPI_DATATYPE_NULL(MPI_LB);
152 CREATE_MPI_DATATYPE_NULL(MPI_PACKED);
153 // Internal use only
154 CREATE_MPI_DATATYPE(MPI_PTR, void*);
155
156 /** Check if the datatype is usable for communications
157  */
158 int is_datatype_valid(MPI_Datatype datatype) {
159     return datatype != MPI_DATATYPE_NULL
160         && (datatype->flags & DT_FLAG_COMMITED);
161 }
162
163 size_t smpi_datatype_size(MPI_Datatype datatype)
164 {
165   return datatype->size;
166 }
167
168 MPI_Aint smpi_datatype_lb(MPI_Datatype datatype)
169 {
170   return datatype->lb;
171 }
172
173 MPI_Aint smpi_datatype_ub(MPI_Datatype datatype)
174 {
175   return datatype->ub;
176 }
177
178 int smpi_datatype_dup(MPI_Datatype datatype, MPI_Datatype* new_t)
179 {
180   int ret=MPI_SUCCESS;
181   *new_t= xbt_new(s_smpi_mpi_datatype_t,1);
182   memcpy(*new_t, datatype, sizeof(s_smpi_mpi_datatype_t));
183   if (datatype->has_subtype){
184     //FIXME: may copy too much information.
185     (*new_t)->substruct=xbt_malloc(sizeof(s_smpi_mpi_struct_t));
186     memcpy((*new_t)->substruct, datatype->substruct, sizeof(s_smpi_mpi_struct_t));
187   }
188   if(datatype->name)
189     (*new_t)->name = strdup(datatype->name);
190   if(datatype->attributes !=NULL){
191       (*new_t)->attributes=xbt_dict_new();
192       xbt_dict_cursor_t cursor = NULL;
193       int *key;
194       int flag;
195       void* value_in;
196       void* value_out;
197       xbt_dict_foreach(datatype->attributes, cursor, key, value_in){
198         smpi_type_key_elem elem = xbt_dict_get_or_null(smpi_type_keyvals, (const char*)key);
199         if(elem && elem->copy_fn!=MPI_NULL_COPY_FN){
200           ret = elem->copy_fn(datatype, atoi((const char*)key), NULL, value_in, &value_out, &flag );
201           if(ret!=MPI_SUCCESS){
202             *new_t=MPI_DATATYPE_NULL;
203             return ret;
204           }
205           if(flag)
206             xbt_dict_set((*new_t)->attributes, (const char*)key,value_out, NULL);
207         }
208       }
209     }
210   return ret;
211 }
212
213 int smpi_datatype_extent(MPI_Datatype datatype, MPI_Aint * lb,
214                          MPI_Aint * extent)
215 {
216   if(datatype == MPI_DATATYPE_NULL){
217     *lb=0;
218     *extent=0;
219     return MPI_SUCCESS;
220   }
221   *lb = datatype->lb;
222   *extent = datatype->ub - datatype->lb;
223   return MPI_SUCCESS;
224 }
225
226 MPI_Aint smpi_datatype_get_extent(MPI_Datatype datatype){
227   if(datatype == MPI_DATATYPE_NULL){
228     return 0;
229   }
230   return datatype->ub - datatype->lb;
231 }
232
233 void smpi_datatype_get_name(MPI_Datatype datatype, char* name, int* length){
234   *length = strlen(datatype->name);
235   strcpy(name, datatype->name);
236 }
237
238 void smpi_datatype_set_name(MPI_Datatype datatype, char* name){
239   datatype->name = strdup(name);;
240 }
241
242 int smpi_datatype_copy(void *sendbuf, int sendcount, MPI_Datatype sendtype,
243                        void *recvbuf, int recvcount, MPI_Datatype recvtype)
244 {
245   int count;
246   if(smpi_privatize_global_variables){
247     smpi_switch_data_segment(smpi_process_index());
248   }
249   /* First check if we really have something to do */
250   if (recvcount > 0 && recvbuf != sendbuf) {
251     /* FIXME: treat packed cases */
252     sendcount *= smpi_datatype_size(sendtype);
253     recvcount *= smpi_datatype_size(recvtype);
254     count = sendcount < recvcount ? sendcount : recvcount;
255
256     if(sendtype->has_subtype == 0 && recvtype->has_subtype == 0) {
257       if(!smpi_process_get_replaying()) memcpy(recvbuf, sendbuf, count);
258     }
259     else if (sendtype->has_subtype == 0)
260     {
261       s_smpi_subtype_t *subtype =  recvtype->substruct;
262       subtype->unserialize( sendbuf, recvbuf,1, subtype, MPI_REPLACE);
263     }
264     else if (recvtype->has_subtype == 0)
265     {
266       s_smpi_subtype_t *subtype =  sendtype->substruct;
267       subtype->serialize(sendbuf, recvbuf,1, subtype);
268     }else{
269       s_smpi_subtype_t *subtype =  sendtype->substruct;
270
271
272       void * buf_tmp = xbt_malloc(count);
273
274       subtype->serialize( sendbuf, buf_tmp,count/smpi_datatype_size(sendtype), subtype);
275       subtype =  recvtype->substruct;
276       subtype->unserialize( buf_tmp, recvbuf,count/smpi_datatype_size(recvtype), subtype, MPI_REPLACE);
277
278       free(buf_tmp);
279     }
280   }
281
282   return sendcount > recvcount ? MPI_ERR_TRUNCATE : MPI_SUCCESS;
283 }
284
285 /*
286  *  Copies noncontiguous data into contiguous memory.
287  *  @param contiguous_vector - output vector
288  *  @param noncontiguous_vector - input vector
289  *  @param type - pointer contening :
290  *      - stride - stride of between noncontiguous data
291  *      - block_length - the width or height of blocked matrix
292  *      - count - the number of rows of matrix
293  */
294 void serialize_vector( const void *noncontiguous_vector,
295                        void *contiguous_vector,
296                        int count,
297                        void *type)
298 {
299   s_smpi_mpi_vector_t* type_c = (s_smpi_mpi_vector_t*)type;
300   int i;
301   char* contiguous_vector_char = (char*)contiguous_vector;
302   char* noncontiguous_vector_char = (char*)noncontiguous_vector;
303
304   for (i = 0; i < type_c->block_count * count; i++) {
305       if (type_c->old_type->has_subtype == 0)
306         memcpy(contiguous_vector_char,
307                noncontiguous_vector_char, type_c->block_length * type_c->size_oldtype);
308       else
309         ((s_smpi_subtype_t*)type_c->old_type->substruct)->serialize( noncontiguous_vector_char,
310                                                                      contiguous_vector_char,
311                                                                      type_c->block_length,
312                                                                      type_c->old_type->substruct);
313
314     contiguous_vector_char += type_c->block_length*type_c->size_oldtype;
315     if((i+1)%type_c->block_count ==0)
316     noncontiguous_vector_char += type_c->block_length*smpi_datatype_get_extent(type_c->old_type);
317     else
318     noncontiguous_vector_char += type_c->block_stride*smpi_datatype_get_extent(type_c->old_type);
319   }
320 }
321
322 /*
323  *  Copies contiguous data into noncontiguous memory.
324  *  @param noncontiguous_vector - output vector
325  *  @param contiguous_vector - input vector
326  *  @param type - pointer contening :
327  *      - stride - stride of between noncontiguous data
328  *      - block_length - the width or height of blocked matrix
329  *      - count - the number of rows of matrix
330  */
331 void unserialize_vector( const void *contiguous_vector,
332                          void *noncontiguous_vector,
333                          int count,
334                          void *type,
335                          MPI_Op op)
336 {
337   s_smpi_mpi_vector_t* type_c = (s_smpi_mpi_vector_t*)type;
338   int i;
339
340   char* contiguous_vector_char = (char*)contiguous_vector;
341   char* noncontiguous_vector_char = (char*)noncontiguous_vector;
342
343   for (i = 0; i < type_c->block_count * count; i++) {
344     if (type_c->old_type->has_subtype == 0)
345       smpi_op_apply(op, contiguous_vector_char, noncontiguous_vector_char, &type_c->block_length,
346           &type_c->old_type);
347      /* memcpy(noncontiguous_vector_char,
348              contiguous_vector_char, type_c->block_length * type_c->size_oldtype);*/
349     else
350       ((s_smpi_subtype_t*)type_c->old_type->substruct)->unserialize( contiguous_vector_char,
351                                                                      noncontiguous_vector_char,
352                                                                      type_c->block_length,
353                                                                      type_c->old_type->substruct,
354                                                                      op);
355     contiguous_vector_char += type_c->block_length*type_c->size_oldtype;
356     if((i+1)%type_c->block_count ==0)
357     noncontiguous_vector_char += type_c->block_length*smpi_datatype_get_extent(type_c->old_type);
358     else
359     noncontiguous_vector_char += type_c->block_stride*smpi_datatype_get_extent(type_c->old_type);
360   }
361 }
362
363 /*
364  * Create a Sub type vector to be able to serialize and unserialize it
365  * the structure s_smpi_mpi_vector_t is derived from s_smpi_subtype which
366  * required the functions unserialize and serialize
367  *
368  */
369 s_smpi_mpi_vector_t* smpi_datatype_vector_create( int block_stride,
370                                                   int block_length,
371                                                   int block_count,
372                                                   MPI_Datatype old_type,
373                                                   int size_oldtype){
374   s_smpi_mpi_vector_t *new_t= xbt_new(s_smpi_mpi_vector_t,1);
375   new_t->base.serialize = &serialize_vector;
376   new_t->base.unserialize = &unserialize_vector;
377   new_t->base.subtype_free = &free_vector;
378   new_t->block_stride = block_stride;
379   new_t->block_length = block_length;
380   new_t->block_count = block_count;
381   smpi_datatype_use(old_type);
382   new_t->old_type = old_type;
383   new_t->size_oldtype = size_oldtype;
384   return new_t;
385 }
386
387 void smpi_datatype_create(MPI_Datatype* new_type, int size,int lb, int ub, int has_subtype,
388                           void *struct_type, int flags){
389   MPI_Datatype new_t= xbt_new(s_smpi_mpi_datatype_t,1);
390   new_t->name = NULL;
391   new_t->size = size;
392   new_t->has_subtype = size>0? has_subtype:0;
393   new_t->lb = lb;
394   new_t->ub = ub;
395   new_t->flags = flags;
396   new_t->substruct = struct_type;
397   new_t->in_use=0;
398   new_t->attributes=NULL;
399   *new_type = new_t;
400
401 #ifdef HAVE_MC
402   if(MC_is_active())
403     MC_ignore(&(new_t->in_use), sizeof(new_t->in_use));
404 #endif
405 }
406
407 void smpi_datatype_free(MPI_Datatype* type){
408   if((*type)->attributes !=NULL){
409       xbt_dict_cursor_t cursor = NULL;
410       int* key;
411       void * value;
412       int flag;
413       xbt_dict_foreach((*type)->attributes, cursor, key, value){
414         smpi_type_key_elem elem = xbt_dict_get_or_null(smpi_type_keyvals, (const char*)key);
415         if(elem &&  elem->delete_fn)
416           elem->delete_fn(*type, atoi((const char*)key), value, &flag);
417       }
418   }
419
420   if((*type)->flags & DT_FLAG_PREDEFINED)return;
421
422   //if still used, mark for deletion
423   if((*type)->in_use!=0){
424       (*type)->flags |=DT_FLAG_DESTROYED;
425       return;
426   }
427
428   if ((*type)->has_subtype == 1){
429     ((s_smpi_subtype_t *)(*type)->substruct)->subtype_free(type);  
430     xbt_free((*type)->substruct);
431   }
432   if ((*type)->name != NULL){
433     xbt_free((*type)->name);
434   }
435   xbt_free(*type);
436   *type = MPI_DATATYPE_NULL;
437 }
438
439 void smpi_datatype_use(MPI_Datatype type){
440   if(type)type->in_use++;
441
442 #ifdef HAVE_MC
443   if(MC_is_active())
444     MC_ignore(&(type->in_use), sizeof(type->in_use));
445 #endif
446 }
447
448
449 void smpi_datatype_unuse(MPI_Datatype type){
450   if(type && type->in_use-- == 0 && (type->flags & DT_FLAG_DESTROYED))
451     smpi_datatype_free(&type);
452
453 #ifdef HAVE_MC
454   if(MC_is_active())
455     MC_ignore(&(type->in_use), sizeof(type->in_use));
456 #endif
457 }
458
459
460
461
462 /*
463 Contiguous Implementation
464 */
465
466
467 /*
468  *  Copies noncontiguous data into contiguous memory.
469  *  @param contiguous_hvector - output hvector
470  *  @param noncontiguous_hvector - input hvector
471  *  @param type - pointer contening :
472  *      - stride - stride of between noncontiguous data, in bytes
473  *      - block_length - the width or height of blocked matrix
474  *      - count - the number of rows of matrix
475  */
476 void serialize_contiguous( const void *noncontiguous_hvector,
477                        void *contiguous_hvector,
478                        int count,
479                        void *type)
480 {
481   s_smpi_mpi_contiguous_t* type_c = (s_smpi_mpi_contiguous_t*)type;
482   char* contiguous_vector_char = (char*)contiguous_hvector;
483   char* noncontiguous_vector_char = (char*)noncontiguous_hvector+type_c->lb;
484   memcpy(contiguous_vector_char,
485            noncontiguous_vector_char, count* type_c->block_count * type_c->size_oldtype);
486 }
487 /*
488  *  Copies contiguous data into noncontiguous memory.
489  *  @param noncontiguous_vector - output hvector
490  *  @param contiguous_vector - input hvector
491  *  @param type - pointer contening :
492  *      - stride - stride of between noncontiguous data, in bytes
493  *      - block_length - the width or height of blocked matrix
494  *      - count - the number of rows of matrix
495  */
496 void unserialize_contiguous( const void *contiguous_vector,
497                          void *noncontiguous_vector,
498                          int count,
499                          void *type,
500                          MPI_Op op)
501 {
502   s_smpi_mpi_contiguous_t* type_c = (s_smpi_mpi_contiguous_t*)type;
503   char* contiguous_vector_char = (char*)contiguous_vector;
504   char* noncontiguous_vector_char = (char*)noncontiguous_vector+type_c->lb;
505   int n= count* type_c->block_count;
506   smpi_op_apply(op, contiguous_vector_char, noncontiguous_vector_char, &n,
507             &type_c->old_type);
508        /*memcpy(noncontiguous_vector_char,
509            contiguous_vector_char, count*  type_c->block_count * type_c->size_oldtype);*/
510 }
511
512 void free_contiguous(MPI_Datatype* d){
513   smpi_datatype_unuse(((s_smpi_mpi_indexed_t *)(*d)->substruct)->old_type);
514 }
515
516 /*
517  * Create a Sub type contiguous to be able to serialize and unserialize it
518  * the structure s_smpi_mpi_contiguous_t is derived from s_smpi_subtype which
519  * required the functions unserialize and serialize
520  *
521  */
522 s_smpi_mpi_contiguous_t* smpi_datatype_contiguous_create( MPI_Aint lb,
523                                                   int block_count,
524                                                   MPI_Datatype old_type,
525                                                   int size_oldtype){
526   s_smpi_mpi_contiguous_t *new_t= xbt_new(s_smpi_mpi_contiguous_t,1);
527   new_t->base.serialize = &serialize_contiguous;
528   new_t->base.unserialize = &unserialize_contiguous;
529   new_t->base.subtype_free = &free_contiguous;
530   new_t->lb = lb;
531   new_t->block_count = block_count;
532   new_t->old_type = old_type;
533   new_t->size_oldtype = size_oldtype;
534   smpi_datatype_use(old_type);
535   return new_t;
536 }
537
538
539
540
541 int smpi_datatype_contiguous(int count, MPI_Datatype old_type, MPI_Datatype* new_type, MPI_Aint lb)
542 {
543   int retval;
544   if(old_type->has_subtype){
545           //handle this case as a hvector with stride equals to the extent of the datatype
546           return smpi_datatype_hvector(count, 1, smpi_datatype_get_extent(old_type), old_type, new_type);
547   }
548   
549   s_smpi_mpi_contiguous_t* subtype = smpi_datatype_contiguous_create( lb,
550                                                                 count,
551                                                                 old_type,
552                                                                 smpi_datatype_size(old_type));
553                                                                 
554   smpi_datatype_create(new_type,
555                                           count * smpi_datatype_size(old_type),
556                                           lb,lb + count * smpi_datatype_size(old_type),
557                                           1,subtype, DT_FLAG_CONTIGUOUS);
558   retval=MPI_SUCCESS;
559   return retval;
560 }
561
562 int smpi_datatype_vector(int count, int blocklen, int stride, MPI_Datatype old_type, MPI_Datatype* new_type)
563 {
564   int retval;
565   if (blocklen<0) return MPI_ERR_ARG;
566   MPI_Aint lb = 0;
567   MPI_Aint ub = 0;
568   if(count>0){
569     lb=smpi_datatype_lb(old_type);
570     ub=((count-1)*stride+blocklen-1)*smpi_datatype_get_extent(old_type)+smpi_datatype_ub(old_type);
571   }
572   if(old_type->has_subtype || stride != blocklen){
573
574
575     s_smpi_mpi_vector_t* subtype = smpi_datatype_vector_create( stride,
576                                                                 blocklen,
577                                                                 count,
578                                                                 old_type,
579                                                                 smpi_datatype_size(old_type));
580     smpi_datatype_create(new_type,
581                          count * (blocklen) * smpi_datatype_size(old_type), lb,
582                          ub,
583                          1,
584                          subtype,
585                          DT_FLAG_VECTOR);
586     retval=MPI_SUCCESS;
587   }else{
588     /* in this situation the data are contignous thus it's not
589      * required to serialize and unserialize it*/
590     smpi_datatype_create(new_type, count * blocklen *
591                          smpi_datatype_size(old_type), 0, ((count -1) * stride + blocklen)*
592                          smpi_datatype_size(old_type),
593                          0,
594                          NULL,
595                          DT_FLAG_VECTOR|DT_FLAG_CONTIGUOUS);
596     retval=MPI_SUCCESS;
597   }
598   return retval;
599 }
600
601 void free_vector(MPI_Datatype* d){
602   smpi_datatype_unuse(((s_smpi_mpi_indexed_t *)(*d)->substruct)->old_type);
603 }
604
605 /*
606 Hvector Implementation - Vector with stride in bytes
607 */
608
609
610 /*
611  *  Copies noncontiguous data into contiguous memory.
612  *  @param contiguous_hvector - output hvector
613  *  @param noncontiguous_hvector - input hvector
614  *  @param type - pointer contening :
615  *      - stride - stride of between noncontiguous data, in bytes
616  *      - block_length - the width or height of blocked matrix
617  *      - count - the number of rows of matrix
618  */
619 void serialize_hvector( const void *noncontiguous_hvector,
620                        void *contiguous_hvector,
621                        int count,
622                        void *type)
623 {
624   s_smpi_mpi_hvector_t* type_c = (s_smpi_mpi_hvector_t*)type;
625   int i;
626   char* contiguous_vector_char = (char*)contiguous_hvector;
627   char* noncontiguous_vector_char = (char*)noncontiguous_hvector;
628
629   for (i = 0; i < type_c->block_count * count; i++) {
630     if (type_c->old_type->has_subtype == 0)
631       memcpy(contiguous_vector_char,
632            noncontiguous_vector_char, type_c->block_length * type_c->size_oldtype);
633     else
634       ((s_smpi_subtype_t*)type_c->old_type->substruct)->serialize( noncontiguous_vector_char,
635                                                                    contiguous_vector_char,
636                                                                    type_c->block_length,
637                                                                    type_c->old_type->substruct);
638
639     contiguous_vector_char += type_c->block_length*type_c->size_oldtype;
640     if((i+1)%type_c->block_count ==0)
641     noncontiguous_vector_char += type_c->block_length*type_c->size_oldtype;
642     else
643     noncontiguous_vector_char += type_c->block_stride;
644   }
645 }
646 /*
647  *  Copies contiguous data into noncontiguous memory.
648  *  @param noncontiguous_vector - output hvector
649  *  @param contiguous_vector - input hvector
650  *  @param type - pointer contening :
651  *      - stride - stride of between noncontiguous data, in bytes
652  *      - block_length - the width or height of blocked matrix
653  *      - count - the number of rows of matrix
654  */
655 void unserialize_hvector( const void *contiguous_vector,
656                          void *noncontiguous_vector,
657                          int count,
658                          void *type,
659                          MPI_Op op)
660 {
661   s_smpi_mpi_hvector_t* type_c = (s_smpi_mpi_hvector_t*)type;
662   int i;
663
664   char* contiguous_vector_char = (char*)contiguous_vector;
665   char* noncontiguous_vector_char = (char*)noncontiguous_vector;
666
667   for (i = 0; i < type_c->block_count * count; i++) {
668     if (type_c->old_type->has_subtype == 0)
669       smpi_op_apply(op, contiguous_vector_char, noncontiguous_vector_char, &type_c->block_length,
670                   &type_c->old_type);
671              /*memcpy(noncontiguous_vector_char,
672            contiguous_vector_char, type_c->block_length * type_c->size_oldtype);*/
673     else
674       ((s_smpi_subtype_t*)type_c->old_type->substruct)->unserialize( contiguous_vector_char,
675                                                                      noncontiguous_vector_char,
676                                                                      type_c->block_length,
677                                                                      type_c->old_type->substruct,
678                                                                      op);
679     contiguous_vector_char += type_c->block_length*type_c->size_oldtype;
680     if((i+1)%type_c->block_count ==0)
681     noncontiguous_vector_char += type_c->block_length*type_c->size_oldtype;
682     else
683     noncontiguous_vector_char += type_c->block_stride;
684   }
685 }
686
687 /*
688  * Create a Sub type vector to be able to serialize and unserialize it
689  * the structure s_smpi_mpi_vector_t is derived from s_smpi_subtype which
690  * required the functions unserialize and serialize
691  *
692  */
693 s_smpi_mpi_hvector_t* smpi_datatype_hvector_create( MPI_Aint block_stride,
694                                                   int block_length,
695                                                   int block_count,
696                                                   MPI_Datatype old_type,
697                                                   int size_oldtype){
698   s_smpi_mpi_hvector_t *new_t= xbt_new(s_smpi_mpi_hvector_t,1);
699   new_t->base.serialize = &serialize_hvector;
700   new_t->base.unserialize = &unserialize_hvector;
701   new_t->base.subtype_free = &free_hvector;
702   new_t->block_stride = block_stride;
703   new_t->block_length = block_length;
704   new_t->block_count = block_count;
705   new_t->old_type = old_type;
706   new_t->size_oldtype = size_oldtype;
707   smpi_datatype_use(old_type);
708   return new_t;
709 }
710
711 //do nothing for vector types
712 void free_hvector(MPI_Datatype* d){
713   smpi_datatype_unuse(((s_smpi_mpi_indexed_t *)(*d)->substruct)->old_type);
714 }
715
716 int smpi_datatype_hvector(int count, int blocklen, MPI_Aint stride, MPI_Datatype old_type, MPI_Datatype* new_type)
717 {
718   int retval;
719   if (blocklen<0) return MPI_ERR_ARG;
720   MPI_Aint lb = 0;
721   MPI_Aint ub = 0;
722   if(count>0){
723     lb=smpi_datatype_lb(old_type);
724     ub=((count-1)*stride)+(blocklen-1)*smpi_datatype_get_extent(old_type)+smpi_datatype_ub(old_type);
725   }
726   if(old_type->has_subtype || stride != blocklen*smpi_datatype_get_extent(old_type)){
727     s_smpi_mpi_hvector_t* subtype = smpi_datatype_hvector_create( stride,
728                                                                   blocklen,
729                                                                   count,
730                                                                   old_type,
731                                                                   smpi_datatype_size(old_type));
732
733     smpi_datatype_create(new_type, count * blocklen * smpi_datatype_size(old_type),
734                                                  lb,ub,
735                          1,
736                          subtype,
737                          DT_FLAG_VECTOR);
738     retval=MPI_SUCCESS;
739   }else{
740     smpi_datatype_create(new_type, count * blocklen *
741                                              smpi_datatype_size(old_type),0,count * blocklen *
742                                              smpi_datatype_size(old_type),
743                                             0,
744                                             NULL,
745                                             DT_FLAG_VECTOR|DT_FLAG_CONTIGUOUS);
746     retval=MPI_SUCCESS;
747   }
748   return retval;
749 }
750
751
752 /*
753 Indexed Implementation
754 */
755
756 /*
757  *  Copies noncontiguous data into contiguous memory.
758  *  @param contiguous_indexed - output indexed
759  *  @param noncontiguous_indexed - input indexed
760  *  @param type - pointer contening :
761  *      - block_lengths - the width or height of blocked matrix
762  *      - block_indices - indices of each data, in element
763  *      - count - the number of rows of matrix
764  */
765 void serialize_indexed( const void *noncontiguous_indexed,
766                        void *contiguous_indexed,
767                        int count,
768                        void *type)
769 {
770   s_smpi_mpi_indexed_t* type_c = (s_smpi_mpi_indexed_t*)type;
771   int i,j;
772   char* contiguous_indexed_char = (char*)contiguous_indexed;
773   char* noncontiguous_indexed_char = (char*)noncontiguous_indexed+type_c->block_indices[0] * type_c->size_oldtype;
774   for(j=0; j<count;j++){
775     for (i = 0; i < type_c->block_count; i++) {
776       if (type_c->old_type->has_subtype == 0)
777         memcpy(contiguous_indexed_char,
778                      noncontiguous_indexed_char, type_c->block_lengths[i] * type_c->size_oldtype);
779       else
780         ((s_smpi_subtype_t*)type_c->old_type->substruct)->serialize( noncontiguous_indexed_char,
781                                                                      contiguous_indexed_char,
782                                                                      type_c->block_lengths[i],
783                                                                      type_c->old_type->substruct);
784
785
786       contiguous_indexed_char += type_c->block_lengths[i]*type_c->size_oldtype;
787       if (i<type_c->block_count-1)noncontiguous_indexed_char = (char*)noncontiguous_indexed + type_c->block_indices[i+1]*smpi_datatype_get_extent(type_c->old_type);
788       else noncontiguous_indexed_char += type_c->block_lengths[i]*smpi_datatype_get_extent(type_c->old_type);
789     }
790     noncontiguous_indexed=(void*)noncontiguous_indexed_char;
791   }
792 }
793 /*
794  *  Copies contiguous data into noncontiguous memory.
795  *  @param noncontiguous_indexed - output indexed
796  *  @param contiguous_indexed - input indexed
797  *  @param type - pointer contening :
798  *      - block_lengths - the width or height of blocked matrix
799  *      - block_indices - indices of each data, in element
800  *      - count - the number of rows of matrix
801  */
802 void unserialize_indexed( const void *contiguous_indexed,
803                          void *noncontiguous_indexed,
804                          int count,
805                          void *type,
806                          MPI_Op op)
807 {
808
809   s_smpi_mpi_indexed_t* type_c = (s_smpi_mpi_indexed_t*)type;
810   int i,j;
811   char* contiguous_indexed_char = (char*)contiguous_indexed;
812   char* noncontiguous_indexed_char = (char*)noncontiguous_indexed+type_c->block_indices[0]*smpi_datatype_get_extent(type_c->old_type);
813   for(j=0; j<count;j++){
814     for (i = 0; i < type_c->block_count; i++) {
815       if (type_c->old_type->has_subtype == 0)
816         smpi_op_apply(op, contiguous_indexed_char, noncontiguous_indexed_char, &type_c->block_lengths[i],
817                     &type_c->old_type);
818                /*memcpy(noncontiguous_indexed_char ,
819              contiguous_indexed_char, type_c->block_lengths[i] * type_c->size_oldtype);*/
820       else
821         ((s_smpi_subtype_t*)type_c->old_type->substruct)->unserialize( contiguous_indexed_char,
822                                                                        noncontiguous_indexed_char,
823                                                                        type_c->block_lengths[i],
824                                                                        type_c->old_type->substruct,
825                                                                        op);
826
827       contiguous_indexed_char += type_c->block_lengths[i]*type_c->size_oldtype;
828       if (i<type_c->block_count-1)
829         noncontiguous_indexed_char = (char*)noncontiguous_indexed + type_c->block_indices[i+1]*smpi_datatype_get_extent(type_c->old_type);
830       else noncontiguous_indexed_char += type_c->block_lengths[i]*smpi_datatype_get_extent(type_c->old_type);
831     }
832     noncontiguous_indexed=(void*)noncontiguous_indexed_char;
833   }
834 }
835
836 void free_indexed(MPI_Datatype* type){
837   xbt_free(((s_smpi_mpi_indexed_t *)(*type)->substruct)->block_lengths);
838   xbt_free(((s_smpi_mpi_indexed_t *)(*type)->substruct)->block_indices);
839   smpi_datatype_unuse(((s_smpi_mpi_indexed_t *)(*type)->substruct)->old_type);
840 }
841
842 /*
843  * Create a Sub type indexed to be able to serialize and unserialize it
844  * the structure s_smpi_mpi_indexed_t is derived from s_smpi_subtype which
845  * required the functions unserialize and serialize
846  */
847 s_smpi_mpi_indexed_t* smpi_datatype_indexed_create( int* block_lengths,
848                                                   int* block_indices,
849                                                   int block_count,
850                                                   MPI_Datatype old_type,
851                                                   int size_oldtype){
852   s_smpi_mpi_indexed_t *new_t= xbt_new(s_smpi_mpi_indexed_t,1);
853   new_t->base.serialize = &serialize_indexed;
854   new_t->base.unserialize = &unserialize_indexed;
855   new_t->base.subtype_free = &free_indexed;
856  //TODO : add a custom function for each time to clean these 
857   new_t->block_lengths= xbt_new(int, block_count);
858   new_t->block_indices= xbt_new(int, block_count);
859   int i;
860   for(i=0;i<block_count;i++){
861     new_t->block_lengths[i]=block_lengths[i];
862     new_t->block_indices[i]=block_indices[i];
863   }
864   new_t->block_count = block_count;
865   smpi_datatype_use(old_type);
866   new_t->old_type = old_type;
867   new_t->size_oldtype = size_oldtype;
868   return new_t;
869 }
870
871
872 int smpi_datatype_indexed(int count, int* blocklens, int* indices, MPI_Datatype old_type, MPI_Datatype* new_type)
873 {
874   int i;
875   int retval;
876   int size = 0;
877   int contiguous=1;
878   MPI_Aint lb = 0;
879   MPI_Aint ub = 0;
880   if(count>0){
881     lb=indices[0]*smpi_datatype_get_extent(old_type);
882     ub=indices[0]*smpi_datatype_get_extent(old_type) + blocklens[0]*smpi_datatype_ub(old_type);
883   }
884
885   for(i=0; i< count; i++){
886     if   (blocklens[i]<0)
887       return MPI_ERR_ARG;
888     size += blocklens[i];
889
890     if(indices[i]*smpi_datatype_get_extent(old_type)+smpi_datatype_lb(old_type)<lb)
891         lb = indices[i]*smpi_datatype_get_extent(old_type)+smpi_datatype_lb(old_type);
892     if(indices[i]*smpi_datatype_get_extent(old_type)+blocklens[i]*smpi_datatype_ub(old_type)>ub)
893         ub = indices[i]*smpi_datatype_get_extent(old_type)+blocklens[i]*smpi_datatype_ub(old_type);
894
895     if ( (i< count -1) && (indices[i]+blocklens[i] != indices[i+1]) )contiguous=0;
896   }
897   if (old_type->has_subtype == 1)
898     contiguous=0;
899
900   if(!contiguous){
901     s_smpi_mpi_indexed_t* subtype = smpi_datatype_indexed_create( blocklens,
902                                                                   indices,
903                                                                   count,
904                                                                   old_type,
905                                                                   smpi_datatype_size(old_type));
906      smpi_datatype_create(new_type,  size *
907                          smpi_datatype_size(old_type),lb,ub,1, subtype, DT_FLAG_DATA);
908   }else{
909     s_smpi_mpi_contiguous_t* subtype = smpi_datatype_contiguous_create( lb,
910                                                                   size,
911                                                                   old_type,
912                                                                   smpi_datatype_size(old_type));
913     smpi_datatype_create(new_type,  size *
914                          smpi_datatype_size(old_type),lb,ub,1, subtype, DT_FLAG_DATA|DT_FLAG_CONTIGUOUS);
915   }
916   retval=MPI_SUCCESS;
917   return retval;
918 }
919
920
921 /*
922 Hindexed Implementation - Indexed with indices in bytes 
923 */
924
925 /*
926  *  Copies noncontiguous data into contiguous memory.
927  *  @param contiguous_hindexed - output hindexed
928  *  @param noncontiguous_hindexed - input hindexed
929  *  @param type - pointer contening :
930  *      - block_lengths - the width or height of blocked matrix
931  *      - block_indices - indices of each data, in bytes
932  *      - count - the number of rows of matrix
933  */
934 void serialize_hindexed( const void *noncontiguous_hindexed,
935                        void *contiguous_hindexed,
936                        int count,
937                        void *type)
938 {
939   s_smpi_mpi_hindexed_t* type_c = (s_smpi_mpi_hindexed_t*)type;
940   int i,j;
941   char* contiguous_hindexed_char = (char*)contiguous_hindexed;
942   char* noncontiguous_hindexed_char = (char*)noncontiguous_hindexed+ type_c->block_indices[0];
943   for(j=0; j<count;j++){
944     for (i = 0; i < type_c->block_count; i++) {
945       if (type_c->old_type->has_subtype == 0)
946         memcpy(contiguous_hindexed_char,
947                      noncontiguous_hindexed_char, type_c->block_lengths[i] * type_c->size_oldtype);
948       else
949         ((s_smpi_subtype_t*)type_c->old_type->substruct)->serialize( noncontiguous_hindexed_char,
950                                                                      contiguous_hindexed_char,
951                                                                      type_c->block_lengths[i],
952                                                                      type_c->old_type->substruct);
953
954       contiguous_hindexed_char += type_c->block_lengths[i]*type_c->size_oldtype;
955       if (i<type_c->block_count-1)noncontiguous_hindexed_char = (char*)noncontiguous_hindexed + type_c->block_indices[i+1];
956       else noncontiguous_hindexed_char += type_c->block_lengths[i]*smpi_datatype_get_extent(type_c->old_type);
957     }
958     noncontiguous_hindexed=(void*)noncontiguous_hindexed_char;
959   }
960 }
961 /*
962  *  Copies contiguous data into noncontiguous memory.
963  *  @param noncontiguous_hindexed - output hindexed
964  *  @param contiguous_hindexed - input hindexed
965  *  @param type - pointer contening :
966  *      - block_lengths - the width or height of blocked matrix
967  *      - block_indices - indices of each data, in bytes
968  *      - count - the number of rows of matrix
969  */
970 void unserialize_hindexed( const void *contiguous_hindexed,
971                          void *noncontiguous_hindexed,
972                          int count,
973                          void *type,
974                          MPI_Op op)
975 {
976   s_smpi_mpi_hindexed_t* type_c = (s_smpi_mpi_hindexed_t*)type;
977   int i,j;
978
979   char* contiguous_hindexed_char = (char*)contiguous_hindexed;
980   char* noncontiguous_hindexed_char = (char*)noncontiguous_hindexed+ type_c->block_indices[0];
981   for(j=0; j<count;j++){
982     for (i = 0; i < type_c->block_count; i++) {
983       if (type_c->old_type->has_subtype == 0)
984         smpi_op_apply(op, contiguous_hindexed_char, noncontiguous_hindexed_char, &type_c->block_lengths[i],
985                             &type_c->old_type);
986                        /*memcpy(noncontiguous_hindexed_char,
987                contiguous_hindexed_char, type_c->block_lengths[i] * type_c->size_oldtype);*/
988       else
989         ((s_smpi_subtype_t*)type_c->old_type->substruct)->unserialize( contiguous_hindexed_char,
990                                                                        noncontiguous_hindexed_char,
991                                                                        type_c->block_lengths[i],
992                                                                        type_c->old_type->substruct,
993                                                                        op);
994
995       contiguous_hindexed_char += type_c->block_lengths[i]*type_c->size_oldtype;
996       if (i<type_c->block_count-1)noncontiguous_hindexed_char = (char*)noncontiguous_hindexed + type_c->block_indices[i+1];
997       else noncontiguous_hindexed_char += type_c->block_lengths[i]*smpi_datatype_get_extent(type_c->old_type);
998     }
999     noncontiguous_hindexed=(void*)noncontiguous_hindexed_char;
1000   }
1001 }
1002
1003 void free_hindexed(MPI_Datatype* type){
1004   xbt_free(((s_smpi_mpi_hindexed_t *)(*type)->substruct)->block_lengths);
1005   xbt_free(((s_smpi_mpi_hindexed_t *)(*type)->substruct)->block_indices);
1006   smpi_datatype_unuse(((s_smpi_mpi_indexed_t *)(*type)->substruct)->old_type);
1007 }
1008
1009 /*
1010  * Create a Sub type hindexed to be able to serialize and unserialize it
1011  * the structure s_smpi_mpi_hindexed_t is derived from s_smpi_subtype which
1012  * required the functions unserialize and serialize
1013  */
1014 s_smpi_mpi_hindexed_t* smpi_datatype_hindexed_create( int* block_lengths,
1015                                                   MPI_Aint* block_indices,
1016                                                   int block_count,
1017                                                   MPI_Datatype old_type,
1018                                                   int size_oldtype){
1019   s_smpi_mpi_hindexed_t *new_t= xbt_new(s_smpi_mpi_hindexed_t,1);
1020   new_t->base.serialize = &serialize_hindexed;
1021   new_t->base.unserialize = &unserialize_hindexed;
1022   new_t->base.subtype_free = &free_hindexed;
1023  //TODO : add a custom function for each time to clean these 
1024   new_t->block_lengths= xbt_new(int, block_count);
1025   new_t->block_indices= xbt_new(MPI_Aint, block_count);
1026   int i;
1027   for(i=0;i<block_count;i++){
1028     new_t->block_lengths[i]=block_lengths[i];
1029     new_t->block_indices[i]=block_indices[i];
1030   }
1031   new_t->block_count = block_count;
1032   new_t->old_type = old_type;
1033   new_t->size_oldtype = size_oldtype;
1034   return new_t;
1035 }
1036
1037
1038 int smpi_datatype_hindexed(int count, int* blocklens, MPI_Aint* indices, MPI_Datatype old_type, MPI_Datatype* new_type)
1039 {
1040   int i;
1041   int retval;
1042   int size = 0;
1043   int contiguous=1;
1044   MPI_Aint lb = 0;
1045   MPI_Aint ub = 0;
1046   if(count>0){
1047     lb=indices[0] + smpi_datatype_lb(old_type);
1048     ub=indices[0] + blocklens[0]*smpi_datatype_ub(old_type);
1049   }
1050   for(i=0; i< count; i++){
1051     if   (blocklens[i]<0)
1052       return MPI_ERR_ARG;
1053     size += blocklens[i];
1054
1055     if(indices[i]+smpi_datatype_lb(old_type)<lb) lb = indices[i]+smpi_datatype_lb(old_type);
1056     if(indices[i]+blocklens[i]*smpi_datatype_ub(old_type)>ub) ub = indices[i]+blocklens[i]*smpi_datatype_ub(old_type);
1057
1058     if ( (i< count -1) && (indices[i]+blocklens[i]*smpi_datatype_size(old_type) != indices[i+1]) )contiguous=0;
1059   }
1060   if (old_type->has_subtype == 1 || lb!=0)
1061     contiguous=0;
1062
1063   if(!contiguous){
1064     s_smpi_mpi_hindexed_t* subtype = smpi_datatype_hindexed_create( blocklens,
1065                                                                   indices,
1066                                                                   count,
1067                                                                   old_type,
1068                                                                   smpi_datatype_size(old_type));
1069     smpi_datatype_create(new_type,  size * smpi_datatype_size(old_type),
1070                                                  lb,
1071                          ub
1072                          ,1, subtype, DT_FLAG_DATA);
1073   }else{
1074     s_smpi_mpi_contiguous_t* subtype = smpi_datatype_contiguous_create( lb,
1075                                                                   size,
1076                                                                   old_type,
1077                                                                   smpi_datatype_size(old_type));
1078     smpi_datatype_create(new_type,  size * smpi_datatype_size(old_type),
1079                                              0,size * smpi_datatype_size(old_type),
1080                                              1, subtype, DT_FLAG_DATA|DT_FLAG_CONTIGUOUS);
1081   }
1082   retval=MPI_SUCCESS;
1083   return retval;
1084 }
1085
1086
1087 /*
1088 struct Implementation - Indexed with indices in bytes 
1089 */
1090
1091 /*
1092  *  Copies noncontiguous data into contiguous memory.
1093  *  @param contiguous_struct - output struct
1094  *  @param noncontiguous_struct - input struct
1095  *  @param type - pointer contening :
1096  *      - stride - stride of between noncontiguous data
1097  *      - block_length - the width or height of blocked matrix
1098  *      - count - the number of rows of matrix
1099  */
1100 void serialize_struct( const void *noncontiguous_struct,
1101                        void *contiguous_struct,
1102                        int count,
1103                        void *type)
1104 {
1105   s_smpi_mpi_struct_t* type_c = (s_smpi_mpi_struct_t*)type;
1106   int i,j;
1107   char* contiguous_struct_char = (char*)contiguous_struct;
1108   char* noncontiguous_struct_char = (char*)noncontiguous_struct+ type_c->block_indices[0];
1109   for(j=0; j<count;j++){
1110     for (i = 0; i < type_c->block_count; i++) {
1111       if (type_c->old_types[i]->has_subtype == 0)
1112         memcpy(contiguous_struct_char,
1113              noncontiguous_struct_char, type_c->block_lengths[i] * smpi_datatype_size(type_c->old_types[i]));
1114       else
1115         ((s_smpi_subtype_t*)type_c->old_types[i]->substruct)->serialize( noncontiguous_struct_char,
1116                                                                          contiguous_struct_char,
1117                                                                          type_c->block_lengths[i],
1118                                                                          type_c->old_types[i]->substruct);
1119
1120
1121       contiguous_struct_char += type_c->block_lengths[i]*smpi_datatype_size(type_c->old_types[i]);
1122       if (i<type_c->block_count-1)noncontiguous_struct_char = (char*)noncontiguous_struct + type_c->block_indices[i+1];
1123       else noncontiguous_struct_char += type_c->block_lengths[i]*smpi_datatype_get_extent(type_c->old_types[i]);//let's hope this is MPI_UB ?
1124     }
1125     noncontiguous_struct=(void*)noncontiguous_struct_char;
1126   }
1127 }
1128 /*
1129  *  Copies contiguous data into noncontiguous memory.
1130  *  @param noncontiguous_struct - output struct
1131  *  @param contiguous_struct - input struct
1132  *  @param type - pointer contening :
1133  *      - stride - stride of between noncontiguous data
1134  *      - block_length - the width or height of blocked matrix
1135  *      - count - the number of rows of matrix
1136  */
1137 void unserialize_struct( const void *contiguous_struct,
1138                          void *noncontiguous_struct,
1139                          int count,
1140                          void *type,
1141                          MPI_Op op)
1142 {
1143   s_smpi_mpi_struct_t* type_c = (s_smpi_mpi_struct_t*)type;
1144   int i,j;
1145
1146   char* contiguous_struct_char = (char*)contiguous_struct;
1147   char* noncontiguous_struct_char = (char*)noncontiguous_struct+ type_c->block_indices[0];
1148   for(j=0; j<count;j++){
1149     for (i = 0; i < type_c->block_count; i++) {
1150       if (type_c->old_types[i]->has_subtype == 0)
1151         smpi_op_apply(op, contiguous_struct_char, noncontiguous_struct_char, &type_c->block_lengths[i],
1152            & type_c->old_types[i]);
1153                        /*memcpy(noncontiguous_struct_char,
1154              contiguous_struct_char, type_c->block_lengths[i] * smpi_datatype_size(type_c->old_types[i]));*/
1155       else
1156         ((s_smpi_subtype_t*)type_c->old_types[i]->substruct)->unserialize( contiguous_struct_char,
1157                                                                            noncontiguous_struct_char,
1158                                                                            type_c->block_lengths[i],
1159                                                                            type_c->old_types[i]->substruct,
1160                                                                            op);
1161
1162       contiguous_struct_char += type_c->block_lengths[i]*smpi_datatype_size(type_c->old_types[i]);
1163       if (i<type_c->block_count-1)noncontiguous_struct_char =  (char*)noncontiguous_struct + type_c->block_indices[i+1];
1164       else noncontiguous_struct_char += type_c->block_lengths[i]*smpi_datatype_get_extent(type_c->old_types[i]);
1165     }
1166     noncontiguous_struct=(void*)noncontiguous_struct_char;
1167     
1168   }
1169 }
1170
1171 void free_struct(MPI_Datatype* type){
1172   xbt_free(((s_smpi_mpi_struct_t *)(*type)->substruct)->block_lengths);
1173   xbt_free(((s_smpi_mpi_struct_t *)(*type)->substruct)->block_indices);
1174   int i=0;
1175   for (i = 0; i < ((s_smpi_mpi_struct_t *)(*type)->substruct)->block_count; i++)
1176     smpi_datatype_unuse(((s_smpi_mpi_struct_t *)(*type)->substruct)->old_types[i]);
1177   xbt_free(((s_smpi_mpi_struct_t *)(*type)->substruct)->old_types);
1178 }
1179
1180 /*
1181  * Create a Sub type struct to be able to serialize and unserialize it
1182  * the structure s_smpi_mpi_struct_t is derived from s_smpi_subtype which
1183  * required the functions unserialize and serialize
1184  */
1185 s_smpi_mpi_struct_t* smpi_datatype_struct_create( int* block_lengths,
1186                                                   MPI_Aint* block_indices,
1187                                                   int block_count,
1188                                                   MPI_Datatype* old_types){
1189   s_smpi_mpi_struct_t *new_t= xbt_new(s_smpi_mpi_struct_t,1);
1190   new_t->base.serialize = &serialize_struct;
1191   new_t->base.unserialize = &unserialize_struct;
1192   new_t->base.subtype_free = &free_struct;
1193  //TODO : add a custom function for each time to clean these 
1194   new_t->block_lengths= xbt_new(int, block_count);
1195   new_t->block_indices= xbt_new(MPI_Aint, block_count);
1196   new_t->old_types=  xbt_new(MPI_Datatype, block_count);
1197   int i;
1198   for(i=0;i<block_count;i++){
1199     new_t->block_lengths[i]=block_lengths[i];
1200     new_t->block_indices[i]=block_indices[i];
1201     new_t->old_types[i]=old_types[i];
1202     smpi_datatype_use(new_t->old_types[i]);
1203   }
1204   //new_t->block_lengths = block_lengths;
1205   //new_t->block_indices = block_indices;
1206   new_t->block_count = block_count;
1207   //new_t->old_types = old_types;
1208   return new_t;
1209 }
1210
1211
1212 int smpi_datatype_struct(int count, int* blocklens, MPI_Aint* indices, MPI_Datatype* old_types, MPI_Datatype* new_type)
1213 {
1214   int i;
1215   size_t size = 0;
1216   int contiguous=1;
1217   size = 0;
1218   MPI_Aint lb = 0;
1219   MPI_Aint ub = 0;
1220   if(count>0){
1221     lb=indices[0] + smpi_datatype_lb(old_types[0]);
1222     ub=indices[0] + blocklens[0]*smpi_datatype_ub(old_types[0]);
1223   }
1224   int forced_lb=0;
1225   int forced_ub=0;
1226   for(i=0; i< count; i++){
1227     if (blocklens[i]<0)
1228       return MPI_ERR_ARG;
1229     if (old_types[i]->has_subtype == 1)
1230       contiguous=0;
1231
1232     size += blocklens[i]*smpi_datatype_size(old_types[i]);
1233     if (old_types[i]==MPI_LB){
1234       lb=indices[i];
1235       forced_lb=1;
1236     }
1237     if (old_types[i]==MPI_UB){
1238       ub=indices[i];
1239       forced_ub=1;
1240     }
1241
1242     if(!forced_lb && indices[i]+smpi_datatype_lb(old_types[i])<lb) lb = indices[i];
1243     if(!forced_ub && indices[i]+blocklens[i]*smpi_datatype_ub(old_types[i])>ub) ub = indices[i]+blocklens[i]*smpi_datatype_ub(old_types[i]);
1244
1245     if ( (i< count -1) && (indices[i]+blocklens[i]*smpi_datatype_size(old_types[i]) != indices[i+1]) )contiguous=0;
1246   }
1247
1248   if(!contiguous){
1249     s_smpi_mpi_struct_t* subtype = smpi_datatype_struct_create( blocklens,
1250                                                               indices,
1251                                                               count,
1252                                                               old_types);
1253
1254     smpi_datatype_create(new_type,  size, lb, ub,1, subtype, DT_FLAG_DATA);
1255   }else{
1256     s_smpi_mpi_contiguous_t* subtype = smpi_datatype_contiguous_create( lb,
1257                                                                   size,
1258                                                                   MPI_CHAR,
1259                                                                   1);
1260     smpi_datatype_create(new_type,  size, lb, ub,1, subtype, DT_FLAG_DATA|DT_FLAG_CONTIGUOUS);
1261   }
1262   return MPI_SUCCESS;
1263 }
1264
1265 void smpi_datatype_commit(MPI_Datatype *datatype)
1266 {
1267   (*datatype)->flags=  ((*datatype)->flags | DT_FLAG_COMMITED);
1268 }
1269
1270 typedef struct s_smpi_mpi_op {
1271   MPI_User_function *func;
1272   int is_commute;
1273 } s_smpi_mpi_op_t;
1274
1275 #define MAX_OP(a, b)  (b) = (a) < (b) ? (b) : (a)
1276 #define MIN_OP(a, b)  (b) = (a) < (b) ? (a) : (b)
1277 #define SUM_OP(a, b)  (b) += (a)
1278 #define PROD_OP(a, b) (b) *= (a)
1279 #define LAND_OP(a, b) (b) = (a) && (b)
1280 #define LOR_OP(a, b)  (b) = (a) || (b)
1281 #define LXOR_OP(a, b) (b) = (!(a) && (b)) || ((a) && !(b))
1282 #define BAND_OP(a, b) (b) &= (a)
1283 #define BOR_OP(a, b)  (b) |= (a)
1284 #define BXOR_OP(a, b) (b) ^= (a)
1285 #define MAXLOC_OP(a, b)  (b) = (a.value) < (b.value) ? (b) : (a)
1286 #define MINLOC_OP(a, b)  (b) = (a.value) < (b.value) ? (a) : (b)
1287
1288 #define APPLY_FUNC(a, b, length, type, func) \
1289 {                                          \
1290   int i;                                   \
1291   type* x = (type*)(a);                    \
1292   type* y = (type*)(b);                    \
1293   for(i = 0; i < *(length); i++) {         \
1294     func(x[i], y[i]);                      \
1295   }                                        \
1296 }
1297
1298 static void max_func(void *a, void *b, int *length,
1299                      MPI_Datatype * datatype)
1300 {
1301   if (*datatype == MPI_CHAR) {
1302     APPLY_FUNC(a, b, length, char, MAX_OP);
1303   } else if (*datatype == MPI_SHORT) {
1304     APPLY_FUNC(a, b, length, short, MAX_OP);
1305   } else if (*datatype == MPI_INT) {
1306     APPLY_FUNC(a, b, length, int, MAX_OP);
1307   } else if (*datatype == MPI_LONG) {
1308     APPLY_FUNC(a, b, length, long, MAX_OP);
1309   } else if (*datatype == MPI_UNSIGNED_SHORT) {
1310     APPLY_FUNC(a, b, length, unsigned short, MAX_OP);
1311   } else if (*datatype == MPI_UNSIGNED) {
1312     APPLY_FUNC(a, b, length, unsigned int, MAX_OP);
1313   } else if (*datatype == MPI_UNSIGNED_LONG) {
1314     APPLY_FUNC(a, b, length, unsigned long, MAX_OP);
1315   } else if (*datatype == MPI_UNSIGNED_CHAR) {
1316     APPLY_FUNC(a, b, length, unsigned char, MAX_OP);
1317   } else if (*datatype == MPI_FLOAT) {
1318     APPLY_FUNC(a, b, length, float, MAX_OP);
1319   } else if (*datatype == MPI_DOUBLE) {
1320     APPLY_FUNC(a, b, length, double, MAX_OP);
1321   } else if (*datatype == MPI_LONG_DOUBLE) {
1322     APPLY_FUNC(a, b, length, long double, MAX_OP);
1323   }
1324 }
1325
1326 static void min_func(void *a, void *b, int *length,
1327                      MPI_Datatype * datatype)
1328 {
1329   if (*datatype == MPI_CHAR) {
1330     APPLY_FUNC(a, b, length, char, MIN_OP);
1331   } else if (*datatype == MPI_SHORT) {
1332     APPLY_FUNC(a, b, length, short, MIN_OP);
1333   } else if (*datatype == MPI_INT) {
1334     APPLY_FUNC(a, b, length, int, MIN_OP);
1335   } else if (*datatype == MPI_LONG) {
1336     APPLY_FUNC(a, b, length, long, MIN_OP);
1337   } else if (*datatype == MPI_UNSIGNED_SHORT) {
1338     APPLY_FUNC(a, b, length, unsigned short, MIN_OP);
1339   } else if (*datatype == MPI_UNSIGNED) {
1340     APPLY_FUNC(a, b, length, unsigned int, MIN_OP);
1341   } else if (*datatype == MPI_UNSIGNED_LONG) {
1342     APPLY_FUNC(a, b, length, unsigned long, MIN_OP);
1343   } else if (*datatype == MPI_UNSIGNED_CHAR) {
1344     APPLY_FUNC(a, b, length, unsigned char, MIN_OP);
1345   } else if (*datatype == MPI_FLOAT) {
1346     APPLY_FUNC(a, b, length, float, MIN_OP);
1347   } else if (*datatype == MPI_DOUBLE) {
1348     APPLY_FUNC(a, b, length, double, MIN_OP);
1349   } else if (*datatype == MPI_LONG_DOUBLE) {
1350     APPLY_FUNC(a, b, length, long double, MIN_OP);
1351   }
1352 }
1353
1354 static void sum_func(void *a, void *b, int *length,
1355                      MPI_Datatype * datatype)
1356 {
1357   if (*datatype == MPI_CHAR) {
1358     APPLY_FUNC(a, b, length, char, SUM_OP);
1359   } else if (*datatype == MPI_SHORT) {
1360     APPLY_FUNC(a, b, length, short, SUM_OP);
1361   } else if (*datatype == MPI_INT) {
1362     APPLY_FUNC(a, b, length, int, SUM_OP);
1363   } else if (*datatype == MPI_LONG) {
1364     APPLY_FUNC(a, b, length, long, SUM_OP);
1365   } else if (*datatype == MPI_UNSIGNED_SHORT) {
1366     APPLY_FUNC(a, b, length, unsigned short, SUM_OP);
1367   } else if (*datatype == MPI_UNSIGNED) {
1368     APPLY_FUNC(a, b, length, unsigned int, SUM_OP);
1369   } else if (*datatype == MPI_UNSIGNED_LONG) {
1370     APPLY_FUNC(a, b, length, unsigned long, SUM_OP);
1371   } else if (*datatype == MPI_UNSIGNED_CHAR) {
1372     APPLY_FUNC(a, b, length, unsigned char, SUM_OP);
1373   } else if (*datatype == MPI_FLOAT) {
1374     APPLY_FUNC(a, b, length, float, SUM_OP);
1375   } else if (*datatype == MPI_DOUBLE) {
1376     APPLY_FUNC(a, b, length, double, SUM_OP);
1377   } else if (*datatype == MPI_LONG_DOUBLE) {
1378     APPLY_FUNC(a, b, length, long double, SUM_OP);
1379   } else if (*datatype == MPI_C_FLOAT_COMPLEX) {
1380     APPLY_FUNC(a, b, length, float _Complex, SUM_OP);
1381   } else if (*datatype == MPI_C_DOUBLE_COMPLEX) {
1382     APPLY_FUNC(a, b, length, double _Complex, SUM_OP);
1383   } else if (*datatype == MPI_C_LONG_DOUBLE_COMPLEX) {
1384     APPLY_FUNC(a, b, length, long double _Complex, SUM_OP);
1385   }
1386 }
1387
1388 static void prod_func(void *a, void *b, int *length,
1389                       MPI_Datatype * datatype)
1390 {
1391   if (*datatype == MPI_CHAR) {
1392     APPLY_FUNC(a, b, length, char, PROD_OP);
1393   } else if (*datatype == MPI_SHORT) {
1394     APPLY_FUNC(a, b, length, short, PROD_OP);
1395   } else if (*datatype == MPI_INT) {
1396     APPLY_FUNC(a, b, length, int, PROD_OP);
1397   } else if (*datatype == MPI_LONG) {
1398     APPLY_FUNC(a, b, length, long, PROD_OP);
1399   } else if (*datatype == MPI_UNSIGNED_SHORT) {
1400     APPLY_FUNC(a, b, length, unsigned short, PROD_OP);
1401   } else if (*datatype == MPI_UNSIGNED) {
1402     APPLY_FUNC(a, b, length, unsigned int, PROD_OP);
1403   } else if (*datatype == MPI_UNSIGNED_LONG) {
1404     APPLY_FUNC(a, b, length, unsigned long, PROD_OP);
1405   } else if (*datatype == MPI_UNSIGNED_CHAR) {
1406     APPLY_FUNC(a, b, length, unsigned char, PROD_OP);
1407   } else if (*datatype == MPI_FLOAT) {
1408     APPLY_FUNC(a, b, length, float, PROD_OP);
1409   } else if (*datatype == MPI_DOUBLE) {
1410     APPLY_FUNC(a, b, length, double, PROD_OP);
1411   } else if (*datatype == MPI_LONG_DOUBLE) {
1412     APPLY_FUNC(a, b, length, long double, PROD_OP);
1413   } else if (*datatype == MPI_C_FLOAT_COMPLEX) {
1414     APPLY_FUNC(a, b, length, float _Complex, PROD_OP);
1415   } else if (*datatype == MPI_C_DOUBLE_COMPLEX) {
1416     APPLY_FUNC(a, b, length, double _Complex, PROD_OP);
1417   } else if (*datatype == MPI_C_LONG_DOUBLE_COMPLEX) {
1418     APPLY_FUNC(a, b, length, long double _Complex, PROD_OP);
1419   }
1420 }
1421
1422 static void land_func(void *a, void *b, int *length,
1423                       MPI_Datatype * datatype)
1424 {
1425   if (*datatype == MPI_CHAR) {
1426     APPLY_FUNC(a, b, length, char, LAND_OP);
1427   } else if (*datatype == MPI_SHORT) {
1428     APPLY_FUNC(a, b, length, short, LAND_OP);
1429   } else if (*datatype == MPI_INT) {
1430     APPLY_FUNC(a, b, length, int, LAND_OP);
1431   } else if (*datatype == MPI_LONG) {
1432     APPLY_FUNC(a, b, length, long, LAND_OP);
1433   } else if (*datatype == MPI_UNSIGNED_SHORT) {
1434     APPLY_FUNC(a, b, length, unsigned short, LAND_OP);
1435   } else if (*datatype == MPI_UNSIGNED) {
1436     APPLY_FUNC(a, b, length, unsigned int, LAND_OP);
1437   } else if (*datatype == MPI_UNSIGNED_LONG) {
1438     APPLY_FUNC(a, b, length, unsigned long, LAND_OP);
1439   } else if (*datatype == MPI_UNSIGNED_CHAR) {
1440     APPLY_FUNC(a, b, length, unsigned char, LAND_OP);
1441   } else if (*datatype == MPI_C_BOOL) {
1442     APPLY_FUNC(a, b, length, _Bool, LAND_OP);
1443   }
1444 }
1445
1446 static void lor_func(void *a, void *b, int *length,
1447                      MPI_Datatype * datatype)
1448 {
1449   if (*datatype == MPI_CHAR) {
1450     APPLY_FUNC(a, b, length, char, LOR_OP);
1451   } else if (*datatype == MPI_SHORT) {
1452     APPLY_FUNC(a, b, length, short, LOR_OP);
1453   } else if (*datatype == MPI_INT) {
1454     APPLY_FUNC(a, b, length, int, LOR_OP);
1455   } else if (*datatype == MPI_LONG) {
1456     APPLY_FUNC(a, b, length, long, LOR_OP);
1457   } else if (*datatype == MPI_UNSIGNED_SHORT) {
1458     APPLY_FUNC(a, b, length, unsigned short, LOR_OP);
1459   } else if (*datatype == MPI_UNSIGNED) {
1460     APPLY_FUNC(a, b, length, unsigned int, LOR_OP);
1461   } else if (*datatype == MPI_UNSIGNED_LONG) {
1462     APPLY_FUNC(a, b, length, unsigned long, LOR_OP);
1463   } else if (*datatype == MPI_UNSIGNED_CHAR) {
1464     APPLY_FUNC(a, b, length, unsigned char, LOR_OP);
1465   } else if (*datatype == MPI_C_BOOL) {
1466     APPLY_FUNC(a, b, length, _Bool, LOR_OP);
1467   }
1468 }
1469
1470 static void lxor_func(void *a, void *b, int *length,
1471                       MPI_Datatype * datatype)
1472 {
1473   if (*datatype == MPI_CHAR) {
1474     APPLY_FUNC(a, b, length, char, LXOR_OP);
1475   } else if (*datatype == MPI_SHORT) {
1476     APPLY_FUNC(a, b, length, short, LXOR_OP);
1477   } else if (*datatype == MPI_INT) {
1478     APPLY_FUNC(a, b, length, int, LXOR_OP);
1479   } else if (*datatype == MPI_LONG) {
1480     APPLY_FUNC(a, b, length, long, LXOR_OP);
1481   } else if (*datatype == MPI_UNSIGNED_SHORT) {
1482     APPLY_FUNC(a, b, length, unsigned short, LXOR_OP);
1483   } else if (*datatype == MPI_UNSIGNED) {
1484     APPLY_FUNC(a, b, length, unsigned int, LXOR_OP);
1485   } else if (*datatype == MPI_UNSIGNED_LONG) {
1486     APPLY_FUNC(a, b, length, unsigned long, LXOR_OP);
1487   } else if (*datatype == MPI_UNSIGNED_CHAR) {
1488     APPLY_FUNC(a, b, length, unsigned char, LXOR_OP);
1489   } else if (*datatype == MPI_C_BOOL) {
1490     APPLY_FUNC(a, b, length, _Bool, LXOR_OP);
1491   }
1492 }
1493
1494 static void band_func(void *a, void *b, int *length,
1495                       MPI_Datatype * datatype)
1496 {
1497   if (*datatype == MPI_CHAR) {
1498     APPLY_FUNC(a, b, length, char, BAND_OP);
1499   }else if (*datatype == MPI_SHORT) {
1500     APPLY_FUNC(a, b, length, short, BAND_OP);
1501   } else if (*datatype == MPI_INT) {
1502     APPLY_FUNC(a, b, length, int, BAND_OP);
1503   } else if (*datatype == MPI_LONG) {
1504     APPLY_FUNC(a, b, length, long, BAND_OP);
1505   } else if (*datatype == MPI_UNSIGNED_SHORT) {
1506     APPLY_FUNC(a, b, length, unsigned short, BAND_OP);
1507   } else if (*datatype == MPI_UNSIGNED) {
1508     APPLY_FUNC(a, b, length, unsigned int, BAND_OP);
1509   } else if (*datatype == MPI_UNSIGNED_LONG) {
1510     APPLY_FUNC(a, b, length, unsigned long, BAND_OP);
1511   } else if (*datatype == MPI_UNSIGNED_CHAR) {
1512     APPLY_FUNC(a, b, length, unsigned char, BAND_OP);
1513   } else if (*datatype == MPI_BYTE) {
1514     APPLY_FUNC(a, b, length, uint8_t, BAND_OP);
1515   }
1516 }
1517
1518 static void bor_func(void *a, void *b, int *length,
1519                      MPI_Datatype * datatype)
1520 {
1521   if (*datatype == MPI_CHAR) {
1522     APPLY_FUNC(a, b, length, char, BOR_OP);
1523   } else if (*datatype == MPI_SHORT) {
1524     APPLY_FUNC(a, b, length, short, BOR_OP);
1525   } else if (*datatype == MPI_INT) {
1526     APPLY_FUNC(a, b, length, int, BOR_OP);
1527   } else if (*datatype == MPI_LONG) {
1528     APPLY_FUNC(a, b, length, long, BOR_OP);
1529   } else if (*datatype == MPI_UNSIGNED_SHORT) {
1530     APPLY_FUNC(a, b, length, unsigned short, BOR_OP);
1531   } else if (*datatype == MPI_UNSIGNED) {
1532     APPLY_FUNC(a, b, length, unsigned int, BOR_OP);
1533   } else if (*datatype == MPI_UNSIGNED_LONG) {
1534     APPLY_FUNC(a, b, length, unsigned long, BOR_OP);
1535   } else if (*datatype == MPI_UNSIGNED_CHAR) {
1536     APPLY_FUNC(a, b, length, unsigned char, BOR_OP);
1537   } else if (*datatype == MPI_BYTE) {
1538     APPLY_FUNC(a, b, length, uint8_t, BOR_OP);
1539   }
1540 }
1541
1542 static void bxor_func(void *a, void *b, int *length,
1543                       MPI_Datatype * datatype)
1544 {
1545   if (*datatype == MPI_CHAR) {
1546     APPLY_FUNC(a, b, length, char, BXOR_OP);
1547   } else if (*datatype == MPI_SHORT) {
1548     APPLY_FUNC(a, b, length, short, BXOR_OP);
1549   } else if (*datatype == MPI_INT) {
1550     APPLY_FUNC(a, b, length, int, BXOR_OP);
1551   } else if (*datatype == MPI_LONG) {
1552     APPLY_FUNC(a, b, length, long, BXOR_OP);
1553   } else if (*datatype == MPI_UNSIGNED_SHORT) {
1554     APPLY_FUNC(a, b, length, unsigned short, BXOR_OP);
1555   } else if (*datatype == MPI_UNSIGNED) {
1556     APPLY_FUNC(a, b, length, unsigned int, BXOR_OP);
1557   } else if (*datatype == MPI_UNSIGNED_LONG) {
1558     APPLY_FUNC(a, b, length, unsigned long, BXOR_OP);
1559   } else if (*datatype == MPI_UNSIGNED_CHAR) {
1560     APPLY_FUNC(a, b, length, unsigned char, BXOR_OP);
1561   } else if (*datatype == MPI_BYTE) {
1562     APPLY_FUNC(a, b, length, uint8_t, BXOR_OP);
1563   }
1564 }
1565
1566 static void minloc_func(void *a, void *b, int *length,
1567                         MPI_Datatype * datatype)
1568 {
1569   if (*datatype == MPI_FLOAT_INT) {
1570     APPLY_FUNC(a, b, length, float_int, MINLOC_OP);
1571   } else if (*datatype == MPI_LONG_INT) {
1572     APPLY_FUNC(a, b, length, long_int, MINLOC_OP);
1573   } else if (*datatype == MPI_DOUBLE_INT) {
1574     APPLY_FUNC(a, b, length, double_int, MINLOC_OP);
1575   } else if (*datatype == MPI_SHORT_INT) {
1576     APPLY_FUNC(a, b, length, short_int, MINLOC_OP);
1577   } else if (*datatype == MPI_2LONG) {
1578     APPLY_FUNC(a, b, length, long_long, MINLOC_OP);
1579   } else if (*datatype == MPI_2INT) {
1580     APPLY_FUNC(a, b, length, int_int, MINLOC_OP);
1581   } else if (*datatype == MPI_LONG_DOUBLE_INT) {
1582     APPLY_FUNC(a, b, length, long_double_int, MINLOC_OP);
1583   } else if (*datatype == MPI_2FLOAT) {
1584     APPLY_FUNC(a, b, length, float_float, MINLOC_OP);
1585   } else if (*datatype == MPI_2DOUBLE) {
1586     APPLY_FUNC(a, b, length, double_double, MINLOC_OP);
1587   }
1588 }
1589
1590 static void maxloc_func(void *a, void *b, int *length,
1591                         MPI_Datatype * datatype)
1592 {
1593   if (*datatype == MPI_FLOAT_INT) {
1594     APPLY_FUNC(a, b, length, float_int, MAXLOC_OP);
1595   } else if (*datatype == MPI_LONG_INT) {
1596     APPLY_FUNC(a, b, length, long_int, MAXLOC_OP);
1597   } else if (*datatype == MPI_DOUBLE_INT) {
1598     APPLY_FUNC(a, b, length, double_int, MAXLOC_OP);
1599   } else if (*datatype == MPI_SHORT_INT) {
1600     APPLY_FUNC(a, b, length, short_int, MAXLOC_OP);
1601   } else if (*datatype == MPI_2LONG) {
1602     APPLY_FUNC(a, b, length, long_long, MAXLOC_OP);
1603   } else if (*datatype == MPI_2INT) {
1604     APPLY_FUNC(a, b, length, int_int, MAXLOC_OP);
1605   } else if (*datatype == MPI_LONG_DOUBLE_INT) {
1606     APPLY_FUNC(a, b, length, long_double_int, MAXLOC_OP);
1607   } else if (*datatype == MPI_2FLOAT) {
1608     APPLY_FUNC(a, b, length, float_float, MAXLOC_OP);
1609   } else if (*datatype == MPI_2DOUBLE) {
1610     APPLY_FUNC(a, b, length, double_double, MAXLOC_OP);
1611   }
1612 }
1613
1614 static void replace_func(void *a, void *b, int *length,
1615                         MPI_Datatype * datatype)
1616 {
1617   memcpy(b, a, *length * smpi_datatype_size(*datatype));
1618 }
1619
1620 #define CREATE_MPI_OP(name, func)                             \
1621   static s_smpi_mpi_op_t mpi_##name = { &(func) /* func */, TRUE }; \
1622 MPI_Op name = &mpi_##name;
1623
1624 CREATE_MPI_OP(MPI_MAX, max_func);
1625 CREATE_MPI_OP(MPI_MIN, min_func);
1626 CREATE_MPI_OP(MPI_SUM, sum_func);
1627 CREATE_MPI_OP(MPI_PROD, prod_func);
1628 CREATE_MPI_OP(MPI_LAND, land_func);
1629 CREATE_MPI_OP(MPI_LOR, lor_func);
1630 CREATE_MPI_OP(MPI_LXOR, lxor_func);
1631 CREATE_MPI_OP(MPI_BAND, band_func);
1632 CREATE_MPI_OP(MPI_BOR, bor_func);
1633 CREATE_MPI_OP(MPI_BXOR, bxor_func);
1634 CREATE_MPI_OP(MPI_MAXLOC, maxloc_func);
1635 CREATE_MPI_OP(MPI_MINLOC, minloc_func);
1636 CREATE_MPI_OP(MPI_REPLACE, replace_func);
1637
1638
1639 MPI_Op smpi_op_new(MPI_User_function * function, int commute)
1640 {
1641   MPI_Op op;
1642   op = xbt_new(s_smpi_mpi_op_t, 1);
1643   op->func = function;
1644   op-> is_commute = commute;
1645   return op;
1646 }
1647
1648 int smpi_op_is_commute(MPI_Op op)
1649 {
1650   return (op==MPI_OP_NULL) ? 1 : op-> is_commute;
1651 }
1652
1653 void smpi_op_destroy(MPI_Op op)
1654 {
1655   xbt_free(op);
1656 }
1657
1658 void smpi_op_apply(MPI_Op op, void *invec, void *inoutvec, int *len,
1659                    MPI_Datatype * datatype)
1660 {
1661   if(op==MPI_OP_NULL)
1662     return;
1663
1664   if(smpi_privatize_global_variables){ //we need to switch here, as the called function may silently touch global variables
1665     XBT_DEBUG("Applying operation, switch to the right data frame ");
1666     smpi_switch_data_segment(smpi_process_index());
1667   }
1668
1669   if(!smpi_process_get_replaying())
1670   op->func(invec, inoutvec, len, datatype);
1671 }
1672
1673 int smpi_type_attr_delete(MPI_Datatype type, int keyval){
1674   char* tmpkey=xbt_malloc(INTSIZEDCHAR);
1675   sprintf(tmpkey, "%d", keyval);
1676   smpi_type_key_elem elem = xbt_dict_get_or_null(smpi_type_keyvals, (const char*)tmpkey);
1677   if(!elem)
1678     return MPI_ERR_ARG;
1679   if(elem->delete_fn!=MPI_NULL_DELETE_FN){
1680     void * value;
1681     int flag;
1682     if(smpi_type_attr_get(type, keyval, &value, &flag)==MPI_SUCCESS){
1683       int ret = elem->delete_fn(type, keyval, value, &flag);
1684       if(ret!=MPI_SUCCESS) return ret;
1685     }
1686   }  
1687   if(type->attributes==NULL)
1688     return MPI_ERR_ARG;
1689
1690   xbt_dict_remove(type->attributes, (const char*)tmpkey);
1691   xbt_free(tmpkey);
1692   return MPI_SUCCESS;
1693 }
1694
1695 int smpi_type_attr_get(MPI_Datatype type, int keyval, void* attr_value, int* flag){
1696   char* tmpkey=xbt_malloc(INTSIZEDCHAR);
1697   sprintf(tmpkey, "%d", keyval);
1698   smpi_type_key_elem elem = xbt_dict_get_or_null(smpi_type_keyvals, (const char*)tmpkey);
1699   if(!elem)
1700     return MPI_ERR_ARG;
1701   xbt_ex_t ex;
1702   if(type->attributes==NULL){
1703     *flag=0;
1704     return MPI_SUCCESS;
1705   }
1706   TRY {
1707     *(void**)attr_value = xbt_dict_get(type->attributes, (const char*)tmpkey);
1708     *flag=1;
1709   }
1710   CATCH(ex) {
1711     *flag=0;
1712     xbt_ex_free(ex);
1713   }
1714   xbt_free(tmpkey);
1715   return MPI_SUCCESS;
1716 }
1717
1718 int smpi_type_attr_put(MPI_Datatype type, int keyval, void* attr_value){
1719   if(!smpi_type_keyvals)
1720   smpi_type_keyvals = xbt_dict_new();
1721   char* tmpkey=xbt_malloc(INTSIZEDCHAR);
1722   sprintf(tmpkey, "%d", keyval);
1723   smpi_type_key_elem elem = xbt_dict_get_or_null(smpi_type_keyvals, (const char*)tmpkey);
1724   if(!elem )
1725     return MPI_ERR_ARG;
1726   int flag;
1727   void* value;
1728   smpi_type_attr_get(type, keyval, &value, &flag);
1729   if(flag && elem->delete_fn!=MPI_NULL_DELETE_FN){
1730     int ret = elem->delete_fn(type, keyval, value, &flag);
1731     if(ret!=MPI_SUCCESS) return ret;
1732   }
1733   if(type->attributes==NULL)
1734     type->attributes=xbt_dict_new();
1735
1736   xbt_dict_set(type->attributes, (const char*)tmpkey, attr_value, NULL);
1737   xbt_free(tmpkey);
1738   return MPI_SUCCESS;
1739 }
1740
1741 int smpi_type_keyval_create(MPI_Type_copy_attr_function* copy_fn, MPI_Type_delete_attr_function* delete_fn, int* keyval, void* extra_state){
1742
1743   if(!smpi_type_keyvals)
1744   smpi_type_keyvals = xbt_dict_new();
1745   
1746   smpi_type_key_elem value = (smpi_type_key_elem) xbt_new0(s_smpi_mpi_type_key_elem_t,1);
1747   
1748   value->copy_fn=copy_fn;
1749   value->delete_fn=delete_fn;
1750   
1751   *keyval = type_keyval_id;
1752   char* tmpkey=xbt_malloc(INTSIZEDCHAR);
1753   sprintf(tmpkey, "%d", *keyval);
1754   xbt_dict_set(smpi_type_keyvals,(const char*)tmpkey,(void*)value, NULL);
1755   type_keyval_id++;
1756   xbt_free(tmpkey);
1757   return MPI_SUCCESS;
1758 }
1759
1760 int smpi_type_keyval_free(int* keyval){
1761   char* tmpkey=xbt_malloc(INTSIZEDCHAR);
1762   sprintf(tmpkey, "%d", *keyval);
1763   smpi_type_key_elem elem = xbt_dict_get_or_null(smpi_type_keyvals, (const char*)tmpkey);
1764   if(!elem){
1765     xbt_free(tmpkey);
1766     return MPI_ERR_ARG;
1767   }
1768   xbt_dict_remove(smpi_type_keyvals, (const char*)tmpkey);
1769   xbt_free(elem);
1770   xbt_free(tmpkey);
1771   return MPI_SUCCESS;
1772 }