Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Add MPI_Type*keyval and MPI_Type*attr functions
[simgrid.git] / src / smpi / smpi_mpi_dt.c
1 /* smpi_mpi_dt.c -- MPI primitives to handle datatypes                        */
2 /* FIXME: a very incomplete implementation                                    */
3
4 /* Copyright (c) 2009-2014. The SimGrid Team.
5  * All rights reserved.                                                     */
6
7 /* This program is free software; you can redistribute it and/or modify it
8  * under the terms of the license (GNU LGPL) which comes with this package. */
9
10 #include <stdio.h>
11 #include <stdlib.h>
12 #include <string.h>
13
14 #include "private.h"
15 #include "smpi_mpi_dt_private.h"
16 #include "mc/mc.h"
17 #include "xbt/replay.h"
18 #include "simgrid/modelchecker.h"
19
20 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_mpi_dt, smpi,
21                                 "Logging specific to SMPI (datatype)");
22
23 xbt_dict_t smpi_type_keyvals = NULL;
24 int type_keyval_id=0;//avoid collisions
25
26 #define CREATE_MPI_DATATYPE(name, type)       \
27   static s_smpi_mpi_datatype_t mpi_##name = { \
28     (char*) # name,                                   \
29     sizeof(type),  /* size */                 \
30     0,             /*was 1 has_subtype*/             \
31     0,             /* lb */                   \
32     sizeof(type),  /* ub = lb + size */       \
33     DT_FLAG_BASIC,  /* flags */              \
34     NULL,           /* attributes */         \
35     NULL,           /* pointer on extended struct*/ \
36   };                                          \
37 MPI_Datatype name = &mpi_##name;
38
39 #define CREATE_MPI_DATATYPE_NULL(name)       \
40   static s_smpi_mpi_datatype_t mpi_##name = { \
41     (char*) # name,                                   \
42     0,  /* size */                 \
43     0,             /*was 1 has_subtype*/             \
44     0,             /* lb */                   \
45     0,  /* ub = lb + size */       \
46     DT_FLAG_BASIC,  /* flags */              \
47     NULL,           /* attributes */         \
48     NULL           /* pointer on extended struct*/ \
49   };                                          \
50 MPI_Datatype name = &mpi_##name;
51
52 //The following are datatypes for the MPI functions MPI_MAXLOC and MPI_MINLOC.
53 typedef struct {
54   float value;
55   int index;
56 } float_int;
57 typedef struct {
58   float value;
59   float index;
60 } float_float;
61 typedef struct {
62   long value;
63   long index;
64 } long_long;
65 typedef struct {
66   double value;
67   double index;
68 } double_double;
69 typedef struct {
70   long value;
71   int index;
72 } long_int;
73 typedef struct {
74   double value;
75   int index;
76 } double_int;
77 typedef struct {
78   short value;
79   int index;
80 } short_int;
81 typedef struct {
82   int value;
83   int index;
84 } int_int;
85 typedef struct {
86   long double value;
87   int index;
88 } long_double_int;
89 typedef struct {
90   int64_t value;
91   int64_t index;
92 } integer128_t;
93 // Predefined data types
94 CREATE_MPI_DATATYPE(MPI_CHAR, char);
95 CREATE_MPI_DATATYPE(MPI_SHORT, short);
96 CREATE_MPI_DATATYPE(MPI_INT, int);
97 CREATE_MPI_DATATYPE(MPI_LONG, long);
98 CREATE_MPI_DATATYPE(MPI_LONG_LONG, long long);
99 CREATE_MPI_DATATYPE(MPI_SIGNED_CHAR, signed char);
100 CREATE_MPI_DATATYPE(MPI_UNSIGNED_CHAR, unsigned char);
101 CREATE_MPI_DATATYPE(MPI_UNSIGNED_SHORT, unsigned short);
102 CREATE_MPI_DATATYPE(MPI_UNSIGNED, unsigned int);
103 CREATE_MPI_DATATYPE(MPI_UNSIGNED_LONG, unsigned long);
104 CREATE_MPI_DATATYPE(MPI_UNSIGNED_LONG_LONG, unsigned long long);
105 CREATE_MPI_DATATYPE(MPI_FLOAT, float);
106 CREATE_MPI_DATATYPE(MPI_DOUBLE, double);
107 CREATE_MPI_DATATYPE(MPI_LONG_DOUBLE, long double);
108 CREATE_MPI_DATATYPE(MPI_WCHAR, wchar_t);
109 CREATE_MPI_DATATYPE(MPI_C_BOOL, _Bool);
110 CREATE_MPI_DATATYPE(MPI_BYTE, int8_t);
111 CREATE_MPI_DATATYPE(MPI_INT8_T, int8_t);
112 CREATE_MPI_DATATYPE(MPI_INT16_T, int16_t);
113 CREATE_MPI_DATATYPE(MPI_INT32_T, int32_t);
114 CREATE_MPI_DATATYPE(MPI_INT64_T, int64_t);
115 CREATE_MPI_DATATYPE(MPI_UINT8_T, uint8_t);
116 CREATE_MPI_DATATYPE(MPI_UINT16_T, uint16_t);
117 CREATE_MPI_DATATYPE(MPI_UINT32_T, uint32_t);
118 CREATE_MPI_DATATYPE(MPI_UINT64_T, uint64_t);
119 CREATE_MPI_DATATYPE(MPI_C_FLOAT_COMPLEX, float _Complex);
120 CREATE_MPI_DATATYPE(MPI_C_DOUBLE_COMPLEX, double _Complex);
121 CREATE_MPI_DATATYPE(MPI_C_LONG_DOUBLE_COMPLEX, long double _Complex);
122 CREATE_MPI_DATATYPE(MPI_AINT, MPI_Aint);
123 CREATE_MPI_DATATYPE(MPI_OFFSET, MPI_Offset);
124
125 CREATE_MPI_DATATYPE(MPI_FLOAT_INT, float_int);
126 CREATE_MPI_DATATYPE(MPI_LONG_INT, long_int);
127 CREATE_MPI_DATATYPE(MPI_DOUBLE_INT, double_int);
128 CREATE_MPI_DATATYPE(MPI_SHORT_INT, short_int);
129 CREATE_MPI_DATATYPE(MPI_2INT, int_int);
130 CREATE_MPI_DATATYPE(MPI_2FLOAT, float_float);
131 CREATE_MPI_DATATYPE(MPI_2DOUBLE, double_double);
132 CREATE_MPI_DATATYPE(MPI_2LONG, long_long);
133
134 CREATE_MPI_DATATYPE(MPI_REAL4, float);
135 CREATE_MPI_DATATYPE(MPI_REAL8, float);
136 CREATE_MPI_DATATYPE(MPI_REAL16, double);
137 CREATE_MPI_DATATYPE_NULL(MPI_COMPLEX8);
138 CREATE_MPI_DATATYPE_NULL(MPI_COMPLEX16);
139 CREATE_MPI_DATATYPE_NULL(MPI_COMPLEX32);
140 CREATE_MPI_DATATYPE(MPI_INTEGER1, int);
141 CREATE_MPI_DATATYPE(MPI_INTEGER2, int16_t);
142 CREATE_MPI_DATATYPE(MPI_INTEGER4, int32_t);
143 CREATE_MPI_DATATYPE(MPI_INTEGER8, int64_t);
144 CREATE_MPI_DATATYPE(MPI_INTEGER16, integer128_t);
145
146 CREATE_MPI_DATATYPE(MPI_LONG_DOUBLE_INT, long_double_int);
147
148 CREATE_MPI_DATATYPE_NULL(MPI_UB);
149 CREATE_MPI_DATATYPE_NULL(MPI_LB);
150 CREATE_MPI_DATATYPE_NULL(MPI_PACKED);
151 // Internal use only
152 CREATE_MPI_DATATYPE(MPI_PTR, void*);
153
154 /** Check if the datatype is usable for communications
155  */
156 int is_datatype_valid(MPI_Datatype datatype) {
157     return datatype != MPI_DATATYPE_NULL
158         && (datatype->flags & DT_FLAG_COMMITED);
159 }
160
161 size_t smpi_datatype_size(MPI_Datatype datatype)
162 {
163   return datatype->size;
164 }
165
166 MPI_Aint smpi_datatype_lb(MPI_Datatype datatype)
167 {
168   return datatype->lb;
169 }
170
171 MPI_Aint smpi_datatype_ub(MPI_Datatype datatype)
172 {
173   return datatype->ub;
174 }
175
176 int smpi_datatype_dup(MPI_Datatype datatype, MPI_Datatype* new_t)
177 {
178   int ret=MPI_SUCCESS;
179   *new_t= xbt_new(s_smpi_mpi_datatype_t,1);
180   memcpy(*new_t, datatype, sizeof(s_smpi_mpi_datatype_t));
181   if (datatype->has_subtype){
182     //FIXME: may copy too much information.
183     (*new_t)->substruct=xbt_malloc(sizeof(s_smpi_mpi_struct_t));
184     memcpy((*new_t)->substruct, datatype->substruct, sizeof(s_smpi_mpi_struct_t));
185   }
186   if(datatype->name)
187     (*new_t)->name = strdup(datatype->name);
188   if(datatype->attributes !=NULL){
189       (*new_t)->attributes=xbt_dict_new();
190       xbt_dict_cursor_t cursor = NULL;
191       int *key;
192       int flag;
193       void* value_in;
194       void* value_out;
195       xbt_dict_foreach(datatype->attributes, cursor, key, value_in){
196         smpi_type_key_elem elem = xbt_dict_get_or_null(smpi_type_keyvals, (const char*)key);
197         if(elem && elem->copy_fn!=MPI_NULL_COPY_FN){
198           ret = elem->copy_fn(datatype, atoi((const char*)key), NULL, value_in, &value_out, &flag );
199           if(ret!=MPI_SUCCESS){
200             *new_t=MPI_DATATYPE_NULL;
201             return ret;
202           }
203           if(flag)
204             xbt_dict_set((*new_t)->attributes, (const char*)key,value_out, NULL);
205         }
206       }
207     }
208   return ret;
209 }
210
211 int smpi_datatype_extent(MPI_Datatype datatype, MPI_Aint * lb,
212                          MPI_Aint * extent)
213 {
214   if(datatype == MPI_DATATYPE_NULL){
215     *lb=0;
216     *extent=0;
217     return MPI_SUCCESS;
218   }
219   *lb = datatype->lb;
220   *extent = datatype->ub - datatype->lb;
221   return MPI_SUCCESS;
222 }
223
224 MPI_Aint smpi_datatype_get_extent(MPI_Datatype datatype){
225   if(datatype == MPI_DATATYPE_NULL){
226     return 0;
227   }
228   return datatype->ub - datatype->lb;
229 }
230
231 void smpi_datatype_get_name(MPI_Datatype datatype, char* name, int* length){
232   *length = strlen(datatype->name);
233   strcpy(name, datatype->name);
234 }
235
236 void smpi_datatype_set_name(MPI_Datatype datatype, char* name){
237   datatype->name = strdup(name);;
238 }
239
240 int smpi_datatype_copy(void *sendbuf, int sendcount, MPI_Datatype sendtype,
241                        void *recvbuf, int recvcount, MPI_Datatype recvtype)
242 {
243   int count;
244   if(smpi_privatize_global_variables){
245     smpi_switch_data_segment(smpi_process_index());
246   }
247   /* First check if we really have something to do */
248   if (recvcount > 0 && recvbuf != sendbuf) {
249     /* FIXME: treat packed cases */
250     sendcount *= smpi_datatype_size(sendtype);
251     recvcount *= smpi_datatype_size(recvtype);
252     count = sendcount < recvcount ? sendcount : recvcount;
253
254     if(sendtype->has_subtype == 0 && recvtype->has_subtype == 0) {
255       if(!smpi_process_get_replaying()) memcpy(recvbuf, sendbuf, count);
256     }
257     else if (sendtype->has_subtype == 0)
258     {
259       s_smpi_subtype_t *subtype =  recvtype->substruct;
260       subtype->unserialize( sendbuf, recvbuf,1, subtype, MPI_REPLACE);
261     }
262     else if (recvtype->has_subtype == 0)
263     {
264       s_smpi_subtype_t *subtype =  sendtype->substruct;
265       subtype->serialize(sendbuf, recvbuf,1, subtype);
266     }else{
267       s_smpi_subtype_t *subtype =  sendtype->substruct;
268
269
270       void * buf_tmp = xbt_malloc(count);
271
272       subtype->serialize( sendbuf, buf_tmp,count/smpi_datatype_size(sendtype), subtype);
273       subtype =  recvtype->substruct;
274       subtype->unserialize( buf_tmp, recvbuf,count/smpi_datatype_size(recvtype), subtype, MPI_REPLACE);
275
276       free(buf_tmp);
277     }
278   }
279
280   return sendcount > recvcount ? MPI_ERR_TRUNCATE : MPI_SUCCESS;
281 }
282
283 /*
284  *  Copies noncontiguous data into contiguous memory.
285  *  @param contiguous_vector - output vector
286  *  @param noncontiguous_vector - input vector
287  *  @param type - pointer contening :
288  *      - stride - stride of between noncontiguous data
289  *      - block_length - the width or height of blocked matrix
290  *      - count - the number of rows of matrix
291  */
292 void serialize_vector( const void *noncontiguous_vector,
293                        void *contiguous_vector,
294                        int count,
295                        void *type)
296 {
297   s_smpi_mpi_vector_t* type_c = (s_smpi_mpi_vector_t*)type;
298   int i;
299   char* contiguous_vector_char = (char*)contiguous_vector;
300   char* noncontiguous_vector_char = (char*)noncontiguous_vector;
301
302   for (i = 0; i < type_c->block_count * count; i++) {
303       if (type_c->old_type->has_subtype == 0)
304         memcpy(contiguous_vector_char,
305                noncontiguous_vector_char, type_c->block_length * type_c->size_oldtype);
306       else
307         ((s_smpi_subtype_t*)type_c->old_type->substruct)->serialize( noncontiguous_vector_char,
308                                                                      contiguous_vector_char,
309                                                                      type_c->block_length,
310                                                                      type_c->old_type->substruct);
311
312     contiguous_vector_char += type_c->block_length*type_c->size_oldtype;
313     if((i+1)%type_c->block_count ==0)
314     noncontiguous_vector_char += type_c->block_length*smpi_datatype_get_extent(type_c->old_type);
315     else
316     noncontiguous_vector_char += type_c->block_stride*smpi_datatype_get_extent(type_c->old_type);
317   }
318 }
319
320 /*
321  *  Copies contiguous data into noncontiguous memory.
322  *  @param noncontiguous_vector - output vector
323  *  @param contiguous_vector - input vector
324  *  @param type - pointer contening :
325  *      - stride - stride of between noncontiguous data
326  *      - block_length - the width or height of blocked matrix
327  *      - count - the number of rows of matrix
328  */
329 void unserialize_vector( const void *contiguous_vector,
330                          void *noncontiguous_vector,
331                          int count,
332                          void *type,
333                          MPI_Op op)
334 {
335   s_smpi_mpi_vector_t* type_c = (s_smpi_mpi_vector_t*)type;
336   int i;
337
338   char* contiguous_vector_char = (char*)contiguous_vector;
339   char* noncontiguous_vector_char = (char*)noncontiguous_vector;
340
341   for (i = 0; i < type_c->block_count * count; i++) {
342     if (type_c->old_type->has_subtype == 0)
343       smpi_op_apply(op, contiguous_vector_char, noncontiguous_vector_char, &type_c->block_length,
344           &type_c->old_type);
345      /* memcpy(noncontiguous_vector_char,
346              contiguous_vector_char, type_c->block_length * type_c->size_oldtype);*/
347     else
348       ((s_smpi_subtype_t*)type_c->old_type->substruct)->unserialize( contiguous_vector_char,
349                                                                      noncontiguous_vector_char,
350                                                                      type_c->block_length,
351                                                                      type_c->old_type->substruct,
352                                                                      op);
353     contiguous_vector_char += type_c->block_length*type_c->size_oldtype;
354     if((i+1)%type_c->block_count ==0)
355     noncontiguous_vector_char += type_c->block_length*smpi_datatype_get_extent(type_c->old_type);
356     else
357     noncontiguous_vector_char += type_c->block_stride*smpi_datatype_get_extent(type_c->old_type);
358   }
359 }
360
361 /*
362  * Create a Sub type vector to be able to serialize and unserialize it
363  * the structure s_smpi_mpi_vector_t is derived from s_smpi_subtype which
364  * required the functions unserialize and serialize
365  *
366  */
367 s_smpi_mpi_vector_t* smpi_datatype_vector_create( int block_stride,
368                                                   int block_length,
369                                                   int block_count,
370                                                   MPI_Datatype old_type,
371                                                   int size_oldtype){
372   s_smpi_mpi_vector_t *new_t= xbt_new(s_smpi_mpi_vector_t,1);
373   new_t->base.serialize = &serialize_vector;
374   new_t->base.unserialize = &unserialize_vector;
375   new_t->base.subtype_free = &free_vector;
376   new_t->block_stride = block_stride;
377   new_t->block_length = block_length;
378   new_t->block_count = block_count;
379   smpi_datatype_use(old_type);
380   new_t->old_type = old_type;
381   new_t->size_oldtype = size_oldtype;
382   return new_t;
383 }
384
385 void smpi_datatype_create(MPI_Datatype* new_type, int size,int lb, int ub, int has_subtype,
386                           void *struct_type, int flags){
387   MPI_Datatype new_t= xbt_new(s_smpi_mpi_datatype_t,1);
388   new_t->name = NULL;
389   new_t->size = size;
390   new_t->has_subtype = size>0? has_subtype:0;
391   new_t->lb = lb;
392   new_t->ub = ub;
393   new_t->flags = flags;
394   new_t->substruct = struct_type;
395   new_t->in_use=0;
396   new_t->attributes=NULL;
397   *new_type = new_t;
398
399 #ifdef HAVE_MC
400   if(MC_is_active())
401     MC_ignore(&(new_t->in_use), sizeof(new_t->in_use));
402 #endif
403 }
404
405 void smpi_datatype_free(MPI_Datatype* type){
406   if((*type)->attributes !=NULL){
407       xbt_dict_cursor_t cursor = NULL;
408       int* key;
409       void * value;
410       int flag;
411       xbt_dict_foreach((*type)->attributes, cursor, key, value){
412         smpi_type_key_elem elem = xbt_dict_get_or_null(smpi_type_keyvals, (const char*)key);
413         if(elem)
414           elem->delete_fn(*type, atoi((const char*)key), &value, &flag);
415       }
416   }
417
418   if((*type)->flags & DT_FLAG_PREDEFINED)return;
419
420   //if still used, mark for deletion
421   if((*type)->in_use!=0){
422       (*type)->flags |=DT_FLAG_DESTROYED;
423       return;
424   }
425
426   if ((*type)->has_subtype == 1){
427     ((s_smpi_subtype_t *)(*type)->substruct)->subtype_free(type);  
428     xbt_free((*type)->substruct);
429   }
430   if ((*type)->name != NULL){
431     xbt_free((*type)->name);
432   }
433   xbt_free(*type);
434   *type = MPI_DATATYPE_NULL;
435 }
436
437 void smpi_datatype_use(MPI_Datatype type){
438   if(type)type->in_use++;
439
440 #ifdef HAVE_MC
441   if(MC_is_active())
442     MC_ignore(&(type->in_use), sizeof(type->in_use));
443 #endif
444 }
445
446
447 void smpi_datatype_unuse(MPI_Datatype type){
448   if(type && type->in_use-- == 0 && (type->flags & DT_FLAG_DESTROYED))
449     smpi_datatype_free(&type);
450
451 #ifdef HAVE_MC
452   if(MC_is_active())
453     MC_ignore(&(type->in_use), sizeof(type->in_use));
454 #endif
455 }
456
457
458
459
460 /*
461 Contiguous Implementation
462 */
463
464
465 /*
466  *  Copies noncontiguous data into contiguous memory.
467  *  @param contiguous_hvector - output hvector
468  *  @param noncontiguous_hvector - input hvector
469  *  @param type - pointer contening :
470  *      - stride - stride of between noncontiguous data, in bytes
471  *      - block_length - the width or height of blocked matrix
472  *      - count - the number of rows of matrix
473  */
474 void serialize_contiguous( const void *noncontiguous_hvector,
475                        void *contiguous_hvector,
476                        int count,
477                        void *type)
478 {
479   s_smpi_mpi_contiguous_t* type_c = (s_smpi_mpi_contiguous_t*)type;
480   char* contiguous_vector_char = (char*)contiguous_hvector;
481   char* noncontiguous_vector_char = (char*)noncontiguous_hvector+type_c->lb;
482   memcpy(contiguous_vector_char,
483            noncontiguous_vector_char, count* type_c->block_count * type_c->size_oldtype);
484 }
485 /*
486  *  Copies contiguous data into noncontiguous memory.
487  *  @param noncontiguous_vector - output hvector
488  *  @param contiguous_vector - input hvector
489  *  @param type - pointer contening :
490  *      - stride - stride of between noncontiguous data, in bytes
491  *      - block_length - the width or height of blocked matrix
492  *      - count - the number of rows of matrix
493  */
494 void unserialize_contiguous( const void *contiguous_vector,
495                          void *noncontiguous_vector,
496                          int count,
497                          void *type,
498                          MPI_Op op)
499 {
500   s_smpi_mpi_contiguous_t* type_c = (s_smpi_mpi_contiguous_t*)type;
501   char* contiguous_vector_char = (char*)contiguous_vector;
502   char* noncontiguous_vector_char = (char*)noncontiguous_vector+type_c->lb;
503   int n= count* type_c->block_count;
504   smpi_op_apply(op, contiguous_vector_char, noncontiguous_vector_char, &n,
505             &type_c->old_type);
506        /*memcpy(noncontiguous_vector_char,
507            contiguous_vector_char, count*  type_c->block_count * type_c->size_oldtype);*/
508 }
509
510 void free_contiguous(MPI_Datatype* d){
511   smpi_datatype_unuse(((s_smpi_mpi_indexed_t *)(*d)->substruct)->old_type);
512 }
513
514 /*
515  * Create a Sub type contiguous to be able to serialize and unserialize it
516  * the structure s_smpi_mpi_contiguous_t is derived from s_smpi_subtype which
517  * required the functions unserialize and serialize
518  *
519  */
520 s_smpi_mpi_contiguous_t* smpi_datatype_contiguous_create( MPI_Aint lb,
521                                                   int block_count,
522                                                   MPI_Datatype old_type,
523                                                   int size_oldtype){
524   s_smpi_mpi_contiguous_t *new_t= xbt_new(s_smpi_mpi_contiguous_t,1);
525   new_t->base.serialize = &serialize_contiguous;
526   new_t->base.unserialize = &unserialize_contiguous;
527   new_t->base.subtype_free = &free_contiguous;
528   new_t->lb = lb;
529   new_t->block_count = block_count;
530   new_t->old_type = old_type;
531   new_t->size_oldtype = size_oldtype;
532   smpi_datatype_use(old_type);
533   return new_t;
534 }
535
536
537
538
539 int smpi_datatype_contiguous(int count, MPI_Datatype old_type, MPI_Datatype* new_type, MPI_Aint lb)
540 {
541   int retval;
542   if(old_type->has_subtype){
543           //handle this case as a hvector with stride equals to the extent of the datatype
544           return smpi_datatype_hvector(count, 1, smpi_datatype_get_extent(old_type), old_type, new_type);
545   }
546   
547   s_smpi_mpi_contiguous_t* subtype = smpi_datatype_contiguous_create( lb,
548                                                                 count,
549                                                                 old_type,
550                                                                 smpi_datatype_size(old_type));
551                                                                 
552   smpi_datatype_create(new_type,
553                                           count * smpi_datatype_size(old_type),
554                                           lb,lb + count * smpi_datatype_size(old_type),
555                                           1,subtype, DT_FLAG_CONTIGUOUS);
556   retval=MPI_SUCCESS;
557   return retval;
558 }
559
560 int smpi_datatype_vector(int count, int blocklen, int stride, MPI_Datatype old_type, MPI_Datatype* new_type)
561 {
562   int retval;
563   if (blocklen<0) return MPI_ERR_ARG;
564   MPI_Aint lb = 0;
565   MPI_Aint ub = 0;
566   if(count>0){
567     lb=smpi_datatype_lb(old_type);
568     ub=((count-1)*stride+blocklen-1)*smpi_datatype_get_extent(old_type)+smpi_datatype_ub(old_type);
569   }
570   if(old_type->has_subtype || stride != blocklen){
571
572
573     s_smpi_mpi_vector_t* subtype = smpi_datatype_vector_create( stride,
574                                                                 blocklen,
575                                                                 count,
576                                                                 old_type,
577                                                                 smpi_datatype_size(old_type));
578     smpi_datatype_create(new_type,
579                          count * (blocklen) * smpi_datatype_size(old_type), lb,
580                          ub,
581                          1,
582                          subtype,
583                          DT_FLAG_VECTOR);
584     retval=MPI_SUCCESS;
585   }else{
586     /* in this situation the data are contignous thus it's not
587      * required to serialize and unserialize it*/
588     smpi_datatype_create(new_type, count * blocklen *
589                          smpi_datatype_size(old_type), 0, ((count -1) * stride + blocklen)*
590                          smpi_datatype_size(old_type),
591                          0,
592                          NULL,
593                          DT_FLAG_VECTOR|DT_FLAG_CONTIGUOUS);
594     retval=MPI_SUCCESS;
595   }
596   return retval;
597 }
598
599 void free_vector(MPI_Datatype* d){
600   smpi_datatype_unuse(((s_smpi_mpi_indexed_t *)(*d)->substruct)->old_type);
601 }
602
603 /*
604 Hvector Implementation - Vector with stride in bytes
605 */
606
607
608 /*
609  *  Copies noncontiguous data into contiguous memory.
610  *  @param contiguous_hvector - output hvector
611  *  @param noncontiguous_hvector - input hvector
612  *  @param type - pointer contening :
613  *      - stride - stride of between noncontiguous data, in bytes
614  *      - block_length - the width or height of blocked matrix
615  *      - count - the number of rows of matrix
616  */
617 void serialize_hvector( const void *noncontiguous_hvector,
618                        void *contiguous_hvector,
619                        int count,
620                        void *type)
621 {
622   s_smpi_mpi_hvector_t* type_c = (s_smpi_mpi_hvector_t*)type;
623   int i;
624   char* contiguous_vector_char = (char*)contiguous_hvector;
625   char* noncontiguous_vector_char = (char*)noncontiguous_hvector;
626
627   for (i = 0; i < type_c->block_count * count; i++) {
628     if (type_c->old_type->has_subtype == 0)
629       memcpy(contiguous_vector_char,
630            noncontiguous_vector_char, type_c->block_length * type_c->size_oldtype);
631     else
632       ((s_smpi_subtype_t*)type_c->old_type->substruct)->serialize( noncontiguous_vector_char,
633                                                                    contiguous_vector_char,
634                                                                    type_c->block_length,
635                                                                    type_c->old_type->substruct);
636
637     contiguous_vector_char += type_c->block_length*type_c->size_oldtype;
638     if((i+1)%type_c->block_count ==0)
639     noncontiguous_vector_char += type_c->block_length*type_c->size_oldtype;
640     else
641     noncontiguous_vector_char += type_c->block_stride;
642   }
643 }
644 /*
645  *  Copies contiguous data into noncontiguous memory.
646  *  @param noncontiguous_vector - output hvector
647  *  @param contiguous_vector - input hvector
648  *  @param type - pointer contening :
649  *      - stride - stride of between noncontiguous data, in bytes
650  *      - block_length - the width or height of blocked matrix
651  *      - count - the number of rows of matrix
652  */
653 void unserialize_hvector( const void *contiguous_vector,
654                          void *noncontiguous_vector,
655                          int count,
656                          void *type,
657                          MPI_Op op)
658 {
659   s_smpi_mpi_hvector_t* type_c = (s_smpi_mpi_hvector_t*)type;
660   int i;
661
662   char* contiguous_vector_char = (char*)contiguous_vector;
663   char* noncontiguous_vector_char = (char*)noncontiguous_vector;
664
665   for (i = 0; i < type_c->block_count * count; i++) {
666     if (type_c->old_type->has_subtype == 0)
667       smpi_op_apply(op, contiguous_vector_char, noncontiguous_vector_char, &type_c->block_length,
668                   &type_c->old_type);
669              /*memcpy(noncontiguous_vector_char,
670            contiguous_vector_char, type_c->block_length * type_c->size_oldtype);*/
671     else
672       ((s_smpi_subtype_t*)type_c->old_type->substruct)->unserialize( contiguous_vector_char,
673                                                                      noncontiguous_vector_char,
674                                                                      type_c->block_length,
675                                                                      type_c->old_type->substruct,
676                                                                      op);
677     contiguous_vector_char += type_c->block_length*type_c->size_oldtype;
678     if((i+1)%type_c->block_count ==0)
679     noncontiguous_vector_char += type_c->block_length*type_c->size_oldtype;
680     else
681     noncontiguous_vector_char += type_c->block_stride;
682   }
683 }
684
685 /*
686  * Create a Sub type vector to be able to serialize and unserialize it
687  * the structure s_smpi_mpi_vector_t is derived from s_smpi_subtype which
688  * required the functions unserialize and serialize
689  *
690  */
691 s_smpi_mpi_hvector_t* smpi_datatype_hvector_create( MPI_Aint block_stride,
692                                                   int block_length,
693                                                   int block_count,
694                                                   MPI_Datatype old_type,
695                                                   int size_oldtype){
696   s_smpi_mpi_hvector_t *new_t= xbt_new(s_smpi_mpi_hvector_t,1);
697   new_t->base.serialize = &serialize_hvector;
698   new_t->base.unserialize = &unserialize_hvector;
699   new_t->base.subtype_free = &free_hvector;
700   new_t->block_stride = block_stride;
701   new_t->block_length = block_length;
702   new_t->block_count = block_count;
703   new_t->old_type = old_type;
704   new_t->size_oldtype = size_oldtype;
705   smpi_datatype_use(old_type);
706   return new_t;
707 }
708
709 //do nothing for vector types
710 void free_hvector(MPI_Datatype* d){
711   smpi_datatype_unuse(((s_smpi_mpi_indexed_t *)(*d)->substruct)->old_type);
712 }
713
714 int smpi_datatype_hvector(int count, int blocklen, MPI_Aint stride, MPI_Datatype old_type, MPI_Datatype* new_type)
715 {
716   int retval;
717   if (blocklen<0) return MPI_ERR_ARG;
718   MPI_Aint lb = 0;
719   MPI_Aint ub = 0;
720   if(count>0){
721     lb=smpi_datatype_lb(old_type);
722     ub=((count-1)*stride)+(blocklen-1)*smpi_datatype_get_extent(old_type)+smpi_datatype_ub(old_type);
723   }
724   if(old_type->has_subtype || stride != blocklen*smpi_datatype_get_extent(old_type)){
725     s_smpi_mpi_hvector_t* subtype = smpi_datatype_hvector_create( stride,
726                                                                   blocklen,
727                                                                   count,
728                                                                   old_type,
729                                                                   smpi_datatype_size(old_type));
730
731     smpi_datatype_create(new_type, count * blocklen * smpi_datatype_size(old_type),
732                                                  lb,ub,
733                          1,
734                          subtype,
735                          DT_FLAG_VECTOR);
736     retval=MPI_SUCCESS;
737   }else{
738     smpi_datatype_create(new_type, count * blocklen *
739                                              smpi_datatype_size(old_type),0,count * blocklen *
740                                              smpi_datatype_size(old_type),
741                                             0,
742                                             NULL,
743                                             DT_FLAG_VECTOR|DT_FLAG_CONTIGUOUS);
744     retval=MPI_SUCCESS;
745   }
746   return retval;
747 }
748
749
750 /*
751 Indexed Implementation
752 */
753
754 /*
755  *  Copies noncontiguous data into contiguous memory.
756  *  @param contiguous_indexed - output indexed
757  *  @param noncontiguous_indexed - input indexed
758  *  @param type - pointer contening :
759  *      - block_lengths - the width or height of blocked matrix
760  *      - block_indices - indices of each data, in element
761  *      - count - the number of rows of matrix
762  */
763 void serialize_indexed( const void *noncontiguous_indexed,
764                        void *contiguous_indexed,
765                        int count,
766                        void *type)
767 {
768   s_smpi_mpi_indexed_t* type_c = (s_smpi_mpi_indexed_t*)type;
769   int i,j;
770   char* contiguous_indexed_char = (char*)contiguous_indexed;
771   char* noncontiguous_indexed_char = (char*)noncontiguous_indexed+type_c->block_indices[0] * type_c->size_oldtype;
772   for(j=0; j<count;j++){
773     for (i = 0; i < type_c->block_count; i++) {
774       if (type_c->old_type->has_subtype == 0)
775         memcpy(contiguous_indexed_char,
776                      noncontiguous_indexed_char, type_c->block_lengths[i] * type_c->size_oldtype);
777       else
778         ((s_smpi_subtype_t*)type_c->old_type->substruct)->serialize( noncontiguous_indexed_char,
779                                                                      contiguous_indexed_char,
780                                                                      type_c->block_lengths[i],
781                                                                      type_c->old_type->substruct);
782
783
784       contiguous_indexed_char += type_c->block_lengths[i]*type_c->size_oldtype;
785       if (i<type_c->block_count-1)noncontiguous_indexed_char = (char*)noncontiguous_indexed + type_c->block_indices[i+1]*smpi_datatype_get_extent(type_c->old_type);
786       else noncontiguous_indexed_char += type_c->block_lengths[i]*smpi_datatype_get_extent(type_c->old_type);
787     }
788     noncontiguous_indexed=(void*)noncontiguous_indexed_char;
789   }
790 }
791 /*
792  *  Copies contiguous data into noncontiguous memory.
793  *  @param noncontiguous_indexed - output indexed
794  *  @param contiguous_indexed - input indexed
795  *  @param type - pointer contening :
796  *      - block_lengths - the width or height of blocked matrix
797  *      - block_indices - indices of each data, in element
798  *      - count - the number of rows of matrix
799  */
800 void unserialize_indexed( const void *contiguous_indexed,
801                          void *noncontiguous_indexed,
802                          int count,
803                          void *type,
804                          MPI_Op op)
805 {
806
807   s_smpi_mpi_indexed_t* type_c = (s_smpi_mpi_indexed_t*)type;
808   int i,j;
809   char* contiguous_indexed_char = (char*)contiguous_indexed;
810   char* noncontiguous_indexed_char = (char*)noncontiguous_indexed+type_c->block_indices[0]*smpi_datatype_get_extent(type_c->old_type);
811   for(j=0; j<count;j++){
812     for (i = 0; i < type_c->block_count; i++) {
813       if (type_c->old_type->has_subtype == 0)
814         smpi_op_apply(op, contiguous_indexed_char, noncontiguous_indexed_char, &type_c->block_lengths[i],
815                     &type_c->old_type);
816                /*memcpy(noncontiguous_indexed_char ,
817              contiguous_indexed_char, type_c->block_lengths[i] * type_c->size_oldtype);*/
818       else
819         ((s_smpi_subtype_t*)type_c->old_type->substruct)->unserialize( contiguous_indexed_char,
820                                                                        noncontiguous_indexed_char,
821                                                                        type_c->block_lengths[i],
822                                                                        type_c->old_type->substruct,
823                                                                        op);
824
825       contiguous_indexed_char += type_c->block_lengths[i]*type_c->size_oldtype;
826       if (i<type_c->block_count-1)
827         noncontiguous_indexed_char = (char*)noncontiguous_indexed + type_c->block_indices[i+1]*smpi_datatype_get_extent(type_c->old_type);
828       else noncontiguous_indexed_char += type_c->block_lengths[i]*smpi_datatype_get_extent(type_c->old_type);
829     }
830     noncontiguous_indexed=(void*)noncontiguous_indexed_char;
831   }
832 }
833
834 void free_indexed(MPI_Datatype* type){
835   xbt_free(((s_smpi_mpi_indexed_t *)(*type)->substruct)->block_lengths);
836   xbt_free(((s_smpi_mpi_indexed_t *)(*type)->substruct)->block_indices);
837   smpi_datatype_unuse(((s_smpi_mpi_indexed_t *)(*type)->substruct)->old_type);
838 }
839
840 /*
841  * Create a Sub type indexed to be able to serialize and unserialize it
842  * the structure s_smpi_mpi_indexed_t is derived from s_smpi_subtype which
843  * required the functions unserialize and serialize
844  */
845 s_smpi_mpi_indexed_t* smpi_datatype_indexed_create( int* block_lengths,
846                                                   int* block_indices,
847                                                   int block_count,
848                                                   MPI_Datatype old_type,
849                                                   int size_oldtype){
850   s_smpi_mpi_indexed_t *new_t= xbt_new(s_smpi_mpi_indexed_t,1);
851   new_t->base.serialize = &serialize_indexed;
852   new_t->base.unserialize = &unserialize_indexed;
853   new_t->base.subtype_free = &free_indexed;
854  //TODO : add a custom function for each time to clean these 
855   new_t->block_lengths= xbt_new(int, block_count);
856   new_t->block_indices= xbt_new(int, block_count);
857   int i;
858   for(i=0;i<block_count;i++){
859     new_t->block_lengths[i]=block_lengths[i];
860     new_t->block_indices[i]=block_indices[i];
861   }
862   new_t->block_count = block_count;
863   smpi_datatype_use(old_type);
864   new_t->old_type = old_type;
865   new_t->size_oldtype = size_oldtype;
866   return new_t;
867 }
868
869
870 int smpi_datatype_indexed(int count, int* blocklens, int* indices, MPI_Datatype old_type, MPI_Datatype* new_type)
871 {
872   int i;
873   int retval;
874   int size = 0;
875   int contiguous=1;
876   MPI_Aint lb = 0;
877   MPI_Aint ub = 0;
878   if(count>0){
879     lb=indices[0]*smpi_datatype_get_extent(old_type);
880     ub=indices[0]*smpi_datatype_get_extent(old_type) + blocklens[0]*smpi_datatype_ub(old_type);
881   }
882
883   for(i=0; i< count; i++){
884     if   (blocklens[i]<0)
885       return MPI_ERR_ARG;
886     size += blocklens[i];
887
888     if(indices[i]*smpi_datatype_get_extent(old_type)+smpi_datatype_lb(old_type)<lb)
889         lb = indices[i]*smpi_datatype_get_extent(old_type)+smpi_datatype_lb(old_type);
890     if(indices[i]*smpi_datatype_get_extent(old_type)+blocklens[i]*smpi_datatype_ub(old_type)>ub)
891         ub = indices[i]*smpi_datatype_get_extent(old_type)+blocklens[i]*smpi_datatype_ub(old_type);
892
893     if ( (i< count -1) && (indices[i]+blocklens[i] != indices[i+1]) )contiguous=0;
894   }
895   if (old_type->has_subtype == 1)
896     contiguous=0;
897
898   if(!contiguous){
899     s_smpi_mpi_indexed_t* subtype = smpi_datatype_indexed_create( blocklens,
900                                                                   indices,
901                                                                   count,
902                                                                   old_type,
903                                                                   smpi_datatype_size(old_type));
904      smpi_datatype_create(new_type,  size *
905                          smpi_datatype_size(old_type),lb,ub,1, subtype, DT_FLAG_DATA);
906   }else{
907     s_smpi_mpi_contiguous_t* subtype = smpi_datatype_contiguous_create( lb,
908                                                                   size,
909                                                                   old_type,
910                                                                   smpi_datatype_size(old_type));
911     smpi_datatype_create(new_type,  size *
912                          smpi_datatype_size(old_type),lb,ub,1, subtype, DT_FLAG_DATA|DT_FLAG_CONTIGUOUS);
913   }
914   retval=MPI_SUCCESS;
915   return retval;
916 }
917
918
919 /*
920 Hindexed Implementation - Indexed with indices in bytes 
921 */
922
923 /*
924  *  Copies noncontiguous data into contiguous memory.
925  *  @param contiguous_hindexed - output hindexed
926  *  @param noncontiguous_hindexed - input hindexed
927  *  @param type - pointer contening :
928  *      - block_lengths - the width or height of blocked matrix
929  *      - block_indices - indices of each data, in bytes
930  *      - count - the number of rows of matrix
931  */
932 void serialize_hindexed( const void *noncontiguous_hindexed,
933                        void *contiguous_hindexed,
934                        int count,
935                        void *type)
936 {
937   s_smpi_mpi_hindexed_t* type_c = (s_smpi_mpi_hindexed_t*)type;
938   int i,j;
939   char* contiguous_hindexed_char = (char*)contiguous_hindexed;
940   char* noncontiguous_hindexed_char = (char*)noncontiguous_hindexed+ type_c->block_indices[0];
941   for(j=0; j<count;j++){
942     for (i = 0; i < type_c->block_count; i++) {
943       if (type_c->old_type->has_subtype == 0)
944         memcpy(contiguous_hindexed_char,
945                      noncontiguous_hindexed_char, type_c->block_lengths[i] * type_c->size_oldtype);
946       else
947         ((s_smpi_subtype_t*)type_c->old_type->substruct)->serialize( noncontiguous_hindexed_char,
948                                                                      contiguous_hindexed_char,
949                                                                      type_c->block_lengths[i],
950                                                                      type_c->old_type->substruct);
951
952       contiguous_hindexed_char += type_c->block_lengths[i]*type_c->size_oldtype;
953       if (i<type_c->block_count-1)noncontiguous_hindexed_char = (char*)noncontiguous_hindexed + type_c->block_indices[i+1];
954       else noncontiguous_hindexed_char += type_c->block_lengths[i]*smpi_datatype_get_extent(type_c->old_type);
955     }
956     noncontiguous_hindexed=(void*)noncontiguous_hindexed_char;
957   }
958 }
959 /*
960  *  Copies contiguous data into noncontiguous memory.
961  *  @param noncontiguous_hindexed - output hindexed
962  *  @param contiguous_hindexed - input hindexed
963  *  @param type - pointer contening :
964  *      - block_lengths - the width or height of blocked matrix
965  *      - block_indices - indices of each data, in bytes
966  *      - count - the number of rows of matrix
967  */
968 void unserialize_hindexed( const void *contiguous_hindexed,
969                          void *noncontiguous_hindexed,
970                          int count,
971                          void *type,
972                          MPI_Op op)
973 {
974   s_smpi_mpi_hindexed_t* type_c = (s_smpi_mpi_hindexed_t*)type;
975   int i,j;
976
977   char* contiguous_hindexed_char = (char*)contiguous_hindexed;
978   char* noncontiguous_hindexed_char = (char*)noncontiguous_hindexed+ type_c->block_indices[0];
979   for(j=0; j<count;j++){
980     for (i = 0; i < type_c->block_count; i++) {
981       if (type_c->old_type->has_subtype == 0)
982         smpi_op_apply(op, contiguous_hindexed_char, noncontiguous_hindexed_char, &type_c->block_lengths[i],
983                             &type_c->old_type);
984                        /*memcpy(noncontiguous_hindexed_char,
985                contiguous_hindexed_char, type_c->block_lengths[i] * type_c->size_oldtype);*/
986       else
987         ((s_smpi_subtype_t*)type_c->old_type->substruct)->unserialize( contiguous_hindexed_char,
988                                                                        noncontiguous_hindexed_char,
989                                                                        type_c->block_lengths[i],
990                                                                        type_c->old_type->substruct,
991                                                                        op);
992
993       contiguous_hindexed_char += type_c->block_lengths[i]*type_c->size_oldtype;
994       if (i<type_c->block_count-1)noncontiguous_hindexed_char = (char*)noncontiguous_hindexed + type_c->block_indices[i+1];
995       else noncontiguous_hindexed_char += type_c->block_lengths[i]*smpi_datatype_get_extent(type_c->old_type);
996     }
997     noncontiguous_hindexed=(void*)noncontiguous_hindexed_char;
998   }
999 }
1000
1001 void free_hindexed(MPI_Datatype* type){
1002   xbt_free(((s_smpi_mpi_hindexed_t *)(*type)->substruct)->block_lengths);
1003   xbt_free(((s_smpi_mpi_hindexed_t *)(*type)->substruct)->block_indices);
1004   smpi_datatype_unuse(((s_smpi_mpi_indexed_t *)(*type)->substruct)->old_type);
1005 }
1006
1007 /*
1008  * Create a Sub type hindexed to be able to serialize and unserialize it
1009  * the structure s_smpi_mpi_hindexed_t is derived from s_smpi_subtype which
1010  * required the functions unserialize and serialize
1011  */
1012 s_smpi_mpi_hindexed_t* smpi_datatype_hindexed_create( int* block_lengths,
1013                                                   MPI_Aint* block_indices,
1014                                                   int block_count,
1015                                                   MPI_Datatype old_type,
1016                                                   int size_oldtype){
1017   s_smpi_mpi_hindexed_t *new_t= xbt_new(s_smpi_mpi_hindexed_t,1);
1018   new_t->base.serialize = &serialize_hindexed;
1019   new_t->base.unserialize = &unserialize_hindexed;
1020   new_t->base.subtype_free = &free_hindexed;
1021  //TODO : add a custom function for each time to clean these 
1022   new_t->block_lengths= xbt_new(int, block_count);
1023   new_t->block_indices= xbt_new(MPI_Aint, block_count);
1024   int i;
1025   for(i=0;i<block_count;i++){
1026     new_t->block_lengths[i]=block_lengths[i];
1027     new_t->block_indices[i]=block_indices[i];
1028   }
1029   new_t->block_count = block_count;
1030   new_t->old_type = old_type;
1031   new_t->size_oldtype = size_oldtype;
1032   return new_t;
1033 }
1034
1035
1036 int smpi_datatype_hindexed(int count, int* blocklens, MPI_Aint* indices, MPI_Datatype old_type, MPI_Datatype* new_type)
1037 {
1038   int i;
1039   int retval;
1040   int size = 0;
1041   int contiguous=1;
1042   MPI_Aint lb = 0;
1043   MPI_Aint ub = 0;
1044   if(count>0){
1045     lb=indices[0] + smpi_datatype_lb(old_type);
1046     ub=indices[0] + blocklens[0]*smpi_datatype_ub(old_type);
1047   }
1048   for(i=0; i< count; i++){
1049     if   (blocklens[i]<0)
1050       return MPI_ERR_ARG;
1051     size += blocklens[i];
1052
1053     if(indices[i]+smpi_datatype_lb(old_type)<lb) lb = indices[i]+smpi_datatype_lb(old_type);
1054     if(indices[i]+blocklens[i]*smpi_datatype_ub(old_type)>ub) ub = indices[i]+blocklens[i]*smpi_datatype_ub(old_type);
1055
1056     if ( (i< count -1) && (indices[i]+blocklens[i]*smpi_datatype_size(old_type) != indices[i+1]) )contiguous=0;
1057   }
1058   if (old_type->has_subtype == 1 || lb!=0)
1059     contiguous=0;
1060
1061   if(!contiguous){
1062     s_smpi_mpi_hindexed_t* subtype = smpi_datatype_hindexed_create( blocklens,
1063                                                                   indices,
1064                                                                   count,
1065                                                                   old_type,
1066                                                                   smpi_datatype_size(old_type));
1067     smpi_datatype_create(new_type,  size * smpi_datatype_size(old_type),
1068                                                  lb,
1069                          ub
1070                          ,1, subtype, DT_FLAG_DATA);
1071   }else{
1072     s_smpi_mpi_contiguous_t* subtype = smpi_datatype_contiguous_create( lb,
1073                                                                   size,
1074                                                                   old_type,
1075                                                                   smpi_datatype_size(old_type));
1076     smpi_datatype_create(new_type,  size * smpi_datatype_size(old_type),
1077                                              0,size * smpi_datatype_size(old_type),
1078                                              1, subtype, DT_FLAG_DATA|DT_FLAG_CONTIGUOUS);
1079   }
1080   retval=MPI_SUCCESS;
1081   return retval;
1082 }
1083
1084
1085 /*
1086 struct Implementation - Indexed with indices in bytes 
1087 */
1088
1089 /*
1090  *  Copies noncontiguous data into contiguous memory.
1091  *  @param contiguous_struct - output struct
1092  *  @param noncontiguous_struct - input struct
1093  *  @param type - pointer contening :
1094  *      - stride - stride of between noncontiguous data
1095  *      - block_length - the width or height of blocked matrix
1096  *      - count - the number of rows of matrix
1097  */
1098 void serialize_struct( const void *noncontiguous_struct,
1099                        void *contiguous_struct,
1100                        int count,
1101                        void *type)
1102 {
1103   s_smpi_mpi_struct_t* type_c = (s_smpi_mpi_struct_t*)type;
1104   int i,j;
1105   char* contiguous_struct_char = (char*)contiguous_struct;
1106   char* noncontiguous_struct_char = (char*)noncontiguous_struct+ type_c->block_indices[0];
1107   for(j=0; j<count;j++){
1108     for (i = 0; i < type_c->block_count; i++) {
1109       if (type_c->old_types[i]->has_subtype == 0)
1110         memcpy(contiguous_struct_char,
1111              noncontiguous_struct_char, type_c->block_lengths[i] * smpi_datatype_size(type_c->old_types[i]));
1112       else
1113         ((s_smpi_subtype_t*)type_c->old_types[i]->substruct)->serialize( noncontiguous_struct_char,
1114                                                                          contiguous_struct_char,
1115                                                                          type_c->block_lengths[i],
1116                                                                          type_c->old_types[i]->substruct);
1117
1118
1119       contiguous_struct_char += type_c->block_lengths[i]*smpi_datatype_size(type_c->old_types[i]);
1120       if (i<type_c->block_count-1)noncontiguous_struct_char = (char*)noncontiguous_struct + type_c->block_indices[i+1];
1121       else noncontiguous_struct_char += type_c->block_lengths[i]*smpi_datatype_get_extent(type_c->old_types[i]);//let's hope this is MPI_UB ?
1122     }
1123     noncontiguous_struct=(void*)noncontiguous_struct_char;
1124   }
1125 }
1126 /*
1127  *  Copies contiguous data into noncontiguous memory.
1128  *  @param noncontiguous_struct - output struct
1129  *  @param contiguous_struct - input struct
1130  *  @param type - pointer contening :
1131  *      - stride - stride of between noncontiguous data
1132  *      - block_length - the width or height of blocked matrix
1133  *      - count - the number of rows of matrix
1134  */
1135 void unserialize_struct( const void *contiguous_struct,
1136                          void *noncontiguous_struct,
1137                          int count,
1138                          void *type,
1139                          MPI_Op op)
1140 {
1141   s_smpi_mpi_struct_t* type_c = (s_smpi_mpi_struct_t*)type;
1142   int i,j;
1143
1144   char* contiguous_struct_char = (char*)contiguous_struct;
1145   char* noncontiguous_struct_char = (char*)noncontiguous_struct+ type_c->block_indices[0];
1146   for(j=0; j<count;j++){
1147     for (i = 0; i < type_c->block_count; i++) {
1148       if (type_c->old_types[i]->has_subtype == 0)
1149         smpi_op_apply(op, contiguous_struct_char, noncontiguous_struct_char, &type_c->block_lengths[i],
1150            & type_c->old_types[i]);
1151                        /*memcpy(noncontiguous_struct_char,
1152              contiguous_struct_char, type_c->block_lengths[i] * smpi_datatype_size(type_c->old_types[i]));*/
1153       else
1154         ((s_smpi_subtype_t*)type_c->old_types[i]->substruct)->unserialize( contiguous_struct_char,
1155                                                                            noncontiguous_struct_char,
1156                                                                            type_c->block_lengths[i],
1157                                                                            type_c->old_types[i]->substruct,
1158                                                                            op);
1159
1160       contiguous_struct_char += type_c->block_lengths[i]*smpi_datatype_size(type_c->old_types[i]);
1161       if (i<type_c->block_count-1)noncontiguous_struct_char =  (char*)noncontiguous_struct + type_c->block_indices[i+1];
1162       else noncontiguous_struct_char += type_c->block_lengths[i]*smpi_datatype_get_extent(type_c->old_types[i]);
1163     }
1164     noncontiguous_struct=(void*)noncontiguous_struct_char;
1165     
1166   }
1167 }
1168
1169 void free_struct(MPI_Datatype* type){
1170   xbt_free(((s_smpi_mpi_struct_t *)(*type)->substruct)->block_lengths);
1171   xbt_free(((s_smpi_mpi_struct_t *)(*type)->substruct)->block_indices);
1172   int i=0;
1173   for (i = 0; i < ((s_smpi_mpi_struct_t *)(*type)->substruct)->block_count; i++)
1174     smpi_datatype_unuse(((s_smpi_mpi_struct_t *)(*type)->substruct)->old_types[i]);
1175   xbt_free(((s_smpi_mpi_struct_t *)(*type)->substruct)->old_types);
1176 }
1177
1178 /*
1179  * Create a Sub type struct to be able to serialize and unserialize it
1180  * the structure s_smpi_mpi_struct_t is derived from s_smpi_subtype which
1181  * required the functions unserialize and serialize
1182  */
1183 s_smpi_mpi_struct_t* smpi_datatype_struct_create( int* block_lengths,
1184                                                   MPI_Aint* block_indices,
1185                                                   int block_count,
1186                                                   MPI_Datatype* old_types){
1187   s_smpi_mpi_struct_t *new_t= xbt_new(s_smpi_mpi_struct_t,1);
1188   new_t->base.serialize = &serialize_struct;
1189   new_t->base.unserialize = &unserialize_struct;
1190   new_t->base.subtype_free = &free_struct;
1191  //TODO : add a custom function for each time to clean these 
1192   new_t->block_lengths= xbt_new(int, block_count);
1193   new_t->block_indices= xbt_new(MPI_Aint, block_count);
1194   new_t->old_types=  xbt_new(MPI_Datatype, block_count);
1195   int i;
1196   for(i=0;i<block_count;i++){
1197     new_t->block_lengths[i]=block_lengths[i];
1198     new_t->block_indices[i]=block_indices[i];
1199     new_t->old_types[i]=old_types[i];
1200     smpi_datatype_use(new_t->old_types[i]);
1201   }
1202   //new_t->block_lengths = block_lengths;
1203   //new_t->block_indices = block_indices;
1204   new_t->block_count = block_count;
1205   //new_t->old_types = old_types;
1206   return new_t;
1207 }
1208
1209
1210 int smpi_datatype_struct(int count, int* blocklens, MPI_Aint* indices, MPI_Datatype* old_types, MPI_Datatype* new_type)
1211 {
1212   int i;
1213   size_t size = 0;
1214   int contiguous=1;
1215   size = 0;
1216   MPI_Aint lb = 0;
1217   MPI_Aint ub = 0;
1218   if(count>0){
1219     lb=indices[0] + smpi_datatype_lb(old_types[0]);
1220     ub=indices[0] + blocklens[0]*smpi_datatype_ub(old_types[0]);
1221   }
1222   int forced_lb=0;
1223   int forced_ub=0;
1224   for(i=0; i< count; i++){
1225     if (blocklens[i]<0)
1226       return MPI_ERR_ARG;
1227     if (old_types[i]->has_subtype == 1)
1228       contiguous=0;
1229
1230     size += blocklens[i]*smpi_datatype_size(old_types[i]);
1231     if (old_types[i]==MPI_LB){
1232       lb=indices[i];
1233       forced_lb=1;
1234     }
1235     if (old_types[i]==MPI_UB){
1236       ub=indices[i];
1237       forced_ub=1;
1238     }
1239
1240     if(!forced_lb && indices[i]+smpi_datatype_lb(old_types[i])<lb) lb = indices[i];
1241     if(!forced_ub && indices[i]+blocklens[i]*smpi_datatype_ub(old_types[i])>ub) ub = indices[i]+blocklens[i]*smpi_datatype_ub(old_types[i]);
1242
1243     if ( (i< count -1) && (indices[i]+blocklens[i]*smpi_datatype_size(old_types[i]) != indices[i+1]) )contiguous=0;
1244   }
1245
1246   if(!contiguous){
1247     s_smpi_mpi_struct_t* subtype = smpi_datatype_struct_create( blocklens,
1248                                                               indices,
1249                                                               count,
1250                                                               old_types);
1251
1252     smpi_datatype_create(new_type,  size, lb, ub,1, subtype, DT_FLAG_DATA);
1253   }else{
1254     s_smpi_mpi_contiguous_t* subtype = smpi_datatype_contiguous_create( lb,
1255                                                                   size,
1256                                                                   MPI_CHAR,
1257                                                                   1);
1258     smpi_datatype_create(new_type,  size, lb, ub,1, subtype, DT_FLAG_DATA|DT_FLAG_CONTIGUOUS);
1259   }
1260   return MPI_SUCCESS;
1261 }
1262
1263 void smpi_datatype_commit(MPI_Datatype *datatype)
1264 {
1265   (*datatype)->flags=  ((*datatype)->flags | DT_FLAG_COMMITED);
1266 }
1267
1268 typedef struct s_smpi_mpi_op {
1269   MPI_User_function *func;
1270   int is_commute;
1271 } s_smpi_mpi_op_t;
1272
1273 #define MAX_OP(a, b)  (b) = (a) < (b) ? (b) : (a)
1274 #define MIN_OP(a, b)  (b) = (a) < (b) ? (a) : (b)
1275 #define SUM_OP(a, b)  (b) += (a)
1276 #define PROD_OP(a, b) (b) *= (a)
1277 #define LAND_OP(a, b) (b) = (a) && (b)
1278 #define LOR_OP(a, b)  (b) = (a) || (b)
1279 #define LXOR_OP(a, b) (b) = (!(a) && (b)) || ((a) && !(b))
1280 #define BAND_OP(a, b) (b) &= (a)
1281 #define BOR_OP(a, b)  (b) |= (a)
1282 #define BXOR_OP(a, b) (b) ^= (a)
1283 #define MAXLOC_OP(a, b)  (b) = (a.value) < (b.value) ? (b) : (a)
1284 #define MINLOC_OP(a, b)  (b) = (a.value) < (b.value) ? (a) : (b)
1285
1286 #define APPLY_FUNC(a, b, length, type, func) \
1287 {                                          \
1288   int i;                                   \
1289   type* x = (type*)(a);                    \
1290   type* y = (type*)(b);                    \
1291   for(i = 0; i < *(length); i++) {         \
1292     func(x[i], y[i]);                      \
1293   }                                        \
1294 }
1295
1296 static void max_func(void *a, void *b, int *length,
1297                      MPI_Datatype * datatype)
1298 {
1299   if (*datatype == MPI_CHAR) {
1300     APPLY_FUNC(a, b, length, char, MAX_OP);
1301   } else if (*datatype == MPI_SHORT) {
1302     APPLY_FUNC(a, b, length, short, MAX_OP);
1303   } else if (*datatype == MPI_INT) {
1304     APPLY_FUNC(a, b, length, int, MAX_OP);
1305   } else if (*datatype == MPI_LONG) {
1306     APPLY_FUNC(a, b, length, long, MAX_OP);
1307   } else if (*datatype == MPI_UNSIGNED_SHORT) {
1308     APPLY_FUNC(a, b, length, unsigned short, MAX_OP);
1309   } else if (*datatype == MPI_UNSIGNED) {
1310     APPLY_FUNC(a, b, length, unsigned int, MAX_OP);
1311   } else if (*datatype == MPI_UNSIGNED_LONG) {
1312     APPLY_FUNC(a, b, length, unsigned long, MAX_OP);
1313   } else if (*datatype == MPI_UNSIGNED_CHAR) {
1314     APPLY_FUNC(a, b, length, unsigned char, MAX_OP);
1315   } else if (*datatype == MPI_FLOAT) {
1316     APPLY_FUNC(a, b, length, float, MAX_OP);
1317   } else if (*datatype == MPI_DOUBLE) {
1318     APPLY_FUNC(a, b, length, double, MAX_OP);
1319   } else if (*datatype == MPI_LONG_DOUBLE) {
1320     APPLY_FUNC(a, b, length, long double, MAX_OP);
1321   }
1322 }
1323
1324 static void min_func(void *a, void *b, int *length,
1325                      MPI_Datatype * datatype)
1326 {
1327   if (*datatype == MPI_CHAR) {
1328     APPLY_FUNC(a, b, length, char, MIN_OP);
1329   } else if (*datatype == MPI_SHORT) {
1330     APPLY_FUNC(a, b, length, short, MIN_OP);
1331   } else if (*datatype == MPI_INT) {
1332     APPLY_FUNC(a, b, length, int, MIN_OP);
1333   } else if (*datatype == MPI_LONG) {
1334     APPLY_FUNC(a, b, length, long, MIN_OP);
1335   } else if (*datatype == MPI_UNSIGNED_SHORT) {
1336     APPLY_FUNC(a, b, length, unsigned short, MIN_OP);
1337   } else if (*datatype == MPI_UNSIGNED) {
1338     APPLY_FUNC(a, b, length, unsigned int, MIN_OP);
1339   } else if (*datatype == MPI_UNSIGNED_LONG) {
1340     APPLY_FUNC(a, b, length, unsigned long, MIN_OP);
1341   } else if (*datatype == MPI_UNSIGNED_CHAR) {
1342     APPLY_FUNC(a, b, length, unsigned char, MIN_OP);
1343   } else if (*datatype == MPI_FLOAT) {
1344     APPLY_FUNC(a, b, length, float, MIN_OP);
1345   } else if (*datatype == MPI_DOUBLE) {
1346     APPLY_FUNC(a, b, length, double, MIN_OP);
1347   } else if (*datatype == MPI_LONG_DOUBLE) {
1348     APPLY_FUNC(a, b, length, long double, MIN_OP);
1349   }
1350 }
1351
1352 static void sum_func(void *a, void *b, int *length,
1353                      MPI_Datatype * datatype)
1354 {
1355   if (*datatype == MPI_CHAR) {
1356     APPLY_FUNC(a, b, length, char, SUM_OP);
1357   } else if (*datatype == MPI_SHORT) {
1358     APPLY_FUNC(a, b, length, short, SUM_OP);
1359   } else if (*datatype == MPI_INT) {
1360     APPLY_FUNC(a, b, length, int, SUM_OP);
1361   } else if (*datatype == MPI_LONG) {
1362     APPLY_FUNC(a, b, length, long, SUM_OP);
1363   } else if (*datatype == MPI_UNSIGNED_SHORT) {
1364     APPLY_FUNC(a, b, length, unsigned short, SUM_OP);
1365   } else if (*datatype == MPI_UNSIGNED) {
1366     APPLY_FUNC(a, b, length, unsigned int, SUM_OP);
1367   } else if (*datatype == MPI_UNSIGNED_LONG) {
1368     APPLY_FUNC(a, b, length, unsigned long, SUM_OP);
1369   } else if (*datatype == MPI_UNSIGNED_CHAR) {
1370     APPLY_FUNC(a, b, length, unsigned char, SUM_OP);
1371   } else if (*datatype == MPI_FLOAT) {
1372     APPLY_FUNC(a, b, length, float, SUM_OP);
1373   } else if (*datatype == MPI_DOUBLE) {
1374     APPLY_FUNC(a, b, length, double, SUM_OP);
1375   } else if (*datatype == MPI_LONG_DOUBLE) {
1376     APPLY_FUNC(a, b, length, long double, SUM_OP);
1377   } else if (*datatype == MPI_C_FLOAT_COMPLEX) {
1378     APPLY_FUNC(a, b, length, float _Complex, SUM_OP);
1379   } else if (*datatype == MPI_C_DOUBLE_COMPLEX) {
1380     APPLY_FUNC(a, b, length, double _Complex, SUM_OP);
1381   } else if (*datatype == MPI_C_LONG_DOUBLE_COMPLEX) {
1382     APPLY_FUNC(a, b, length, long double _Complex, SUM_OP);
1383   }
1384 }
1385
1386 static void prod_func(void *a, void *b, int *length,
1387                       MPI_Datatype * datatype)
1388 {
1389   if (*datatype == MPI_CHAR) {
1390     APPLY_FUNC(a, b, length, char, PROD_OP);
1391   } else if (*datatype == MPI_SHORT) {
1392     APPLY_FUNC(a, b, length, short, PROD_OP);
1393   } else if (*datatype == MPI_INT) {
1394     APPLY_FUNC(a, b, length, int, PROD_OP);
1395   } else if (*datatype == MPI_LONG) {
1396     APPLY_FUNC(a, b, length, long, PROD_OP);
1397   } else if (*datatype == MPI_UNSIGNED_SHORT) {
1398     APPLY_FUNC(a, b, length, unsigned short, PROD_OP);
1399   } else if (*datatype == MPI_UNSIGNED) {
1400     APPLY_FUNC(a, b, length, unsigned int, PROD_OP);
1401   } else if (*datatype == MPI_UNSIGNED_LONG) {
1402     APPLY_FUNC(a, b, length, unsigned long, PROD_OP);
1403   } else if (*datatype == MPI_UNSIGNED_CHAR) {
1404     APPLY_FUNC(a, b, length, unsigned char, PROD_OP);
1405   } else if (*datatype == MPI_FLOAT) {
1406     APPLY_FUNC(a, b, length, float, PROD_OP);
1407   } else if (*datatype == MPI_DOUBLE) {
1408     APPLY_FUNC(a, b, length, double, PROD_OP);
1409   } else if (*datatype == MPI_LONG_DOUBLE) {
1410     APPLY_FUNC(a, b, length, long double, PROD_OP);
1411   } else if (*datatype == MPI_C_FLOAT_COMPLEX) {
1412     APPLY_FUNC(a, b, length, float _Complex, PROD_OP);
1413   } else if (*datatype == MPI_C_DOUBLE_COMPLEX) {
1414     APPLY_FUNC(a, b, length, double _Complex, PROD_OP);
1415   } else if (*datatype == MPI_C_LONG_DOUBLE_COMPLEX) {
1416     APPLY_FUNC(a, b, length, long double _Complex, PROD_OP);
1417   }
1418 }
1419
1420 static void land_func(void *a, void *b, int *length,
1421                       MPI_Datatype * datatype)
1422 {
1423   if (*datatype == MPI_CHAR) {
1424     APPLY_FUNC(a, b, length, char, LAND_OP);
1425   } else if (*datatype == MPI_SHORT) {
1426     APPLY_FUNC(a, b, length, short, LAND_OP);
1427   } else if (*datatype == MPI_INT) {
1428     APPLY_FUNC(a, b, length, int, LAND_OP);
1429   } else if (*datatype == MPI_LONG) {
1430     APPLY_FUNC(a, b, length, long, LAND_OP);
1431   } else if (*datatype == MPI_UNSIGNED_SHORT) {
1432     APPLY_FUNC(a, b, length, unsigned short, LAND_OP);
1433   } else if (*datatype == MPI_UNSIGNED) {
1434     APPLY_FUNC(a, b, length, unsigned int, LAND_OP);
1435   } else if (*datatype == MPI_UNSIGNED_LONG) {
1436     APPLY_FUNC(a, b, length, unsigned long, LAND_OP);
1437   } else if (*datatype == MPI_UNSIGNED_CHAR) {
1438     APPLY_FUNC(a, b, length, unsigned char, LAND_OP);
1439   } else if (*datatype == MPI_C_BOOL) {
1440     APPLY_FUNC(a, b, length, _Bool, LAND_OP);
1441   }
1442 }
1443
1444 static void lor_func(void *a, void *b, int *length,
1445                      MPI_Datatype * datatype)
1446 {
1447   if (*datatype == MPI_CHAR) {
1448     APPLY_FUNC(a, b, length, char, LOR_OP);
1449   } else if (*datatype == MPI_SHORT) {
1450     APPLY_FUNC(a, b, length, short, LOR_OP);
1451   } else if (*datatype == MPI_INT) {
1452     APPLY_FUNC(a, b, length, int, LOR_OP);
1453   } else if (*datatype == MPI_LONG) {
1454     APPLY_FUNC(a, b, length, long, LOR_OP);
1455   } else if (*datatype == MPI_UNSIGNED_SHORT) {
1456     APPLY_FUNC(a, b, length, unsigned short, LOR_OP);
1457   } else if (*datatype == MPI_UNSIGNED) {
1458     APPLY_FUNC(a, b, length, unsigned int, LOR_OP);
1459   } else if (*datatype == MPI_UNSIGNED_LONG) {
1460     APPLY_FUNC(a, b, length, unsigned long, LOR_OP);
1461   } else if (*datatype == MPI_UNSIGNED_CHAR) {
1462     APPLY_FUNC(a, b, length, unsigned char, LOR_OP);
1463   } else if (*datatype == MPI_C_BOOL) {
1464     APPLY_FUNC(a, b, length, _Bool, LOR_OP);
1465   }
1466 }
1467
1468 static void lxor_func(void *a, void *b, int *length,
1469                       MPI_Datatype * datatype)
1470 {
1471   if (*datatype == MPI_CHAR) {
1472     APPLY_FUNC(a, b, length, char, LXOR_OP);
1473   } else if (*datatype == MPI_SHORT) {
1474     APPLY_FUNC(a, b, length, short, LXOR_OP);
1475   } else if (*datatype == MPI_INT) {
1476     APPLY_FUNC(a, b, length, int, LXOR_OP);
1477   } else if (*datatype == MPI_LONG) {
1478     APPLY_FUNC(a, b, length, long, LXOR_OP);
1479   } else if (*datatype == MPI_UNSIGNED_SHORT) {
1480     APPLY_FUNC(a, b, length, unsigned short, LXOR_OP);
1481   } else if (*datatype == MPI_UNSIGNED) {
1482     APPLY_FUNC(a, b, length, unsigned int, LXOR_OP);
1483   } else if (*datatype == MPI_UNSIGNED_LONG) {
1484     APPLY_FUNC(a, b, length, unsigned long, LXOR_OP);
1485   } else if (*datatype == MPI_UNSIGNED_CHAR) {
1486     APPLY_FUNC(a, b, length, unsigned char, LXOR_OP);
1487   } else if (*datatype == MPI_C_BOOL) {
1488     APPLY_FUNC(a, b, length, _Bool, LXOR_OP);
1489   }
1490 }
1491
1492 static void band_func(void *a, void *b, int *length,
1493                       MPI_Datatype * datatype)
1494 {
1495   if (*datatype == MPI_CHAR) {
1496     APPLY_FUNC(a, b, length, char, BAND_OP);
1497   }else if (*datatype == MPI_SHORT) {
1498     APPLY_FUNC(a, b, length, short, BAND_OP);
1499   } else if (*datatype == MPI_INT) {
1500     APPLY_FUNC(a, b, length, int, BAND_OP);
1501   } else if (*datatype == MPI_LONG) {
1502     APPLY_FUNC(a, b, length, long, BAND_OP);
1503   } else if (*datatype == MPI_UNSIGNED_SHORT) {
1504     APPLY_FUNC(a, b, length, unsigned short, BAND_OP);
1505   } else if (*datatype == MPI_UNSIGNED) {
1506     APPLY_FUNC(a, b, length, unsigned int, BAND_OP);
1507   } else if (*datatype == MPI_UNSIGNED_LONG) {
1508     APPLY_FUNC(a, b, length, unsigned long, BAND_OP);
1509   } else if (*datatype == MPI_UNSIGNED_CHAR) {
1510     APPLY_FUNC(a, b, length, unsigned char, BAND_OP);
1511   } else if (*datatype == MPI_BYTE) {
1512     APPLY_FUNC(a, b, length, uint8_t, BAND_OP);
1513   }
1514 }
1515
1516 static void bor_func(void *a, void *b, int *length,
1517                      MPI_Datatype * datatype)
1518 {
1519   if (*datatype == MPI_CHAR) {
1520     APPLY_FUNC(a, b, length, char, BOR_OP);
1521   } else if (*datatype == MPI_SHORT) {
1522     APPLY_FUNC(a, b, length, short, BOR_OP);
1523   } else if (*datatype == MPI_INT) {
1524     APPLY_FUNC(a, b, length, int, BOR_OP);
1525   } else if (*datatype == MPI_LONG) {
1526     APPLY_FUNC(a, b, length, long, BOR_OP);
1527   } else if (*datatype == MPI_UNSIGNED_SHORT) {
1528     APPLY_FUNC(a, b, length, unsigned short, BOR_OP);
1529   } else if (*datatype == MPI_UNSIGNED) {
1530     APPLY_FUNC(a, b, length, unsigned int, BOR_OP);
1531   } else if (*datatype == MPI_UNSIGNED_LONG) {
1532     APPLY_FUNC(a, b, length, unsigned long, BOR_OP);
1533   } else if (*datatype == MPI_UNSIGNED_CHAR) {
1534     APPLY_FUNC(a, b, length, unsigned char, BOR_OP);
1535   } else if (*datatype == MPI_BYTE) {
1536     APPLY_FUNC(a, b, length, uint8_t, BOR_OP);
1537   }
1538 }
1539
1540 static void bxor_func(void *a, void *b, int *length,
1541                       MPI_Datatype * datatype)
1542 {
1543   if (*datatype == MPI_CHAR) {
1544     APPLY_FUNC(a, b, length, char, BXOR_OP);
1545   } else if (*datatype == MPI_SHORT) {
1546     APPLY_FUNC(a, b, length, short, BXOR_OP);
1547   } else if (*datatype == MPI_INT) {
1548     APPLY_FUNC(a, b, length, int, BXOR_OP);
1549   } else if (*datatype == MPI_LONG) {
1550     APPLY_FUNC(a, b, length, long, BXOR_OP);
1551   } else if (*datatype == MPI_UNSIGNED_SHORT) {
1552     APPLY_FUNC(a, b, length, unsigned short, BXOR_OP);
1553   } else if (*datatype == MPI_UNSIGNED) {
1554     APPLY_FUNC(a, b, length, unsigned int, BXOR_OP);
1555   } else if (*datatype == MPI_UNSIGNED_LONG) {
1556     APPLY_FUNC(a, b, length, unsigned long, BXOR_OP);
1557   } else if (*datatype == MPI_UNSIGNED_CHAR) {
1558     APPLY_FUNC(a, b, length, unsigned char, BXOR_OP);
1559   } else if (*datatype == MPI_BYTE) {
1560     APPLY_FUNC(a, b, length, uint8_t, BXOR_OP);
1561   }
1562 }
1563
1564 static void minloc_func(void *a, void *b, int *length,
1565                         MPI_Datatype * datatype)
1566 {
1567   if (*datatype == MPI_FLOAT_INT) {
1568     APPLY_FUNC(a, b, length, float_int, MINLOC_OP);
1569   } else if (*datatype == MPI_LONG_INT) {
1570     APPLY_FUNC(a, b, length, long_int, MINLOC_OP);
1571   } else if (*datatype == MPI_DOUBLE_INT) {
1572     APPLY_FUNC(a, b, length, double_int, MINLOC_OP);
1573   } else if (*datatype == MPI_SHORT_INT) {
1574     APPLY_FUNC(a, b, length, short_int, MINLOC_OP);
1575   } else if (*datatype == MPI_2LONG) {
1576     APPLY_FUNC(a, b, length, long_long, MINLOC_OP);
1577   } else if (*datatype == MPI_2INT) {
1578     APPLY_FUNC(a, b, length, int_int, MINLOC_OP);
1579   } else if (*datatype == MPI_LONG_DOUBLE_INT) {
1580     APPLY_FUNC(a, b, length, long_double_int, MINLOC_OP);
1581   } else if (*datatype == MPI_2FLOAT) {
1582     APPLY_FUNC(a, b, length, float_float, MINLOC_OP);
1583   } else if (*datatype == MPI_2DOUBLE) {
1584     APPLY_FUNC(a, b, length, double_double, MINLOC_OP);
1585   }
1586 }
1587
1588 static void maxloc_func(void *a, void *b, int *length,
1589                         MPI_Datatype * datatype)
1590 {
1591   if (*datatype == MPI_FLOAT_INT) {
1592     APPLY_FUNC(a, b, length, float_int, MAXLOC_OP);
1593   } else if (*datatype == MPI_LONG_INT) {
1594     APPLY_FUNC(a, b, length, long_int, MAXLOC_OP);
1595   } else if (*datatype == MPI_DOUBLE_INT) {
1596     APPLY_FUNC(a, b, length, double_int, MAXLOC_OP);
1597   } else if (*datatype == MPI_SHORT_INT) {
1598     APPLY_FUNC(a, b, length, short_int, MAXLOC_OP);
1599   } else if (*datatype == MPI_2LONG) {
1600     APPLY_FUNC(a, b, length, long_long, MAXLOC_OP);
1601   } else if (*datatype == MPI_2INT) {
1602     APPLY_FUNC(a, b, length, int_int, MAXLOC_OP);
1603   } else if (*datatype == MPI_LONG_DOUBLE_INT) {
1604     APPLY_FUNC(a, b, length, long_double_int, MAXLOC_OP);
1605   } else if (*datatype == MPI_2FLOAT) {
1606     APPLY_FUNC(a, b, length, float_float, MAXLOC_OP);
1607   } else if (*datatype == MPI_2DOUBLE) {
1608     APPLY_FUNC(a, b, length, double_double, MAXLOC_OP);
1609   }
1610 }
1611
1612 static void replace_func(void *a, void *b, int *length,
1613                         MPI_Datatype * datatype)
1614 {
1615   memcpy(b, a, *length * smpi_datatype_size(*datatype));
1616 }
1617
1618 #define CREATE_MPI_OP(name, func)                             \
1619   static s_smpi_mpi_op_t mpi_##name = { &(func) /* func */, TRUE }; \
1620 MPI_Op name = &mpi_##name;
1621
1622 CREATE_MPI_OP(MPI_MAX, max_func);
1623 CREATE_MPI_OP(MPI_MIN, min_func);
1624 CREATE_MPI_OP(MPI_SUM, sum_func);
1625 CREATE_MPI_OP(MPI_PROD, prod_func);
1626 CREATE_MPI_OP(MPI_LAND, land_func);
1627 CREATE_MPI_OP(MPI_LOR, lor_func);
1628 CREATE_MPI_OP(MPI_LXOR, lxor_func);
1629 CREATE_MPI_OP(MPI_BAND, band_func);
1630 CREATE_MPI_OP(MPI_BOR, bor_func);
1631 CREATE_MPI_OP(MPI_BXOR, bxor_func);
1632 CREATE_MPI_OP(MPI_MAXLOC, maxloc_func);
1633 CREATE_MPI_OP(MPI_MINLOC, minloc_func);
1634 CREATE_MPI_OP(MPI_REPLACE, replace_func);
1635
1636
1637 MPI_Op smpi_op_new(MPI_User_function * function, int commute)
1638 {
1639   MPI_Op op;
1640   op = xbt_new(s_smpi_mpi_op_t, 1);
1641   op->func = function;
1642   op-> is_commute = commute;
1643   return op;
1644 }
1645
1646 int smpi_op_is_commute(MPI_Op op)
1647 {
1648   return (op==MPI_OP_NULL) ? 1 : op-> is_commute;
1649 }
1650
1651 void smpi_op_destroy(MPI_Op op)
1652 {
1653   xbt_free(op);
1654 }
1655
1656 void smpi_op_apply(MPI_Op op, void *invec, void *inoutvec, int *len,
1657                    MPI_Datatype * datatype)
1658 {
1659   if(op==MPI_OP_NULL)
1660     return;
1661
1662   if(smpi_privatize_global_variables){ //we need to switch here, as the called function may silently touch global variables
1663     XBT_DEBUG("Applying operation, switch to the right data frame ");
1664     smpi_switch_data_segment(smpi_process_index());
1665   }
1666
1667   if(!smpi_process_get_replaying())
1668   op->func(invec, inoutvec, len, datatype);
1669 }
1670
1671 int smpi_type_attr_delete(MPI_Datatype type, int keyval){
1672   char* tmpkey=xbt_malloc(sizeof(int));
1673   sprintf(tmpkey, "%d", keyval);
1674   smpi_type_key_elem elem = xbt_dict_get_or_null(smpi_type_keyvals, (const char*)tmpkey);
1675   if(!elem)
1676     return MPI_ERR_ARG;
1677   if(elem->delete_fn!=MPI_NULL_DELETE_FN){
1678     void * value;
1679     int flag;
1680     if(smpi_type_attr_get(type, keyval, &value, &flag)==MPI_SUCCESS){
1681       int ret = elem->delete_fn(type, keyval, &value, &flag);
1682       if(ret!=MPI_SUCCESS) return ret;
1683     }
1684   }  
1685   if(type->attributes==NULL)
1686     return MPI_ERR_ARG;
1687
1688   xbt_dict_remove(type->attributes, (const char*)tmpkey);
1689   return MPI_SUCCESS;
1690 }
1691
1692 int smpi_type_attr_get(MPI_Datatype type, int keyval, void* attr_value, int* flag){
1693   char* tmpkey=xbt_malloc(sizeof(int));
1694   sprintf(tmpkey, "%d", keyval);
1695   smpi_type_key_elem elem = xbt_dict_get_or_null(smpi_type_keyvals, (const char*)tmpkey);
1696   if(!elem)
1697     return MPI_ERR_ARG;
1698   xbt_ex_t ex;
1699   if(type->attributes==NULL){
1700     *flag=0;
1701     return MPI_SUCCESS;
1702   }
1703   TRY {
1704   char* tmpkey=xbt_malloc(sizeof(int));
1705   sprintf(tmpkey, "%d", keyval);
1706     *(void**)attr_value = xbt_dict_get(type->attributes, (const char*)tmpkey);
1707     *flag=1;
1708   }
1709   CATCH(ex) {
1710     *flag=0;
1711     xbt_ex_free(ex);
1712   }
1713   return MPI_SUCCESS;
1714 }
1715
1716 int smpi_type_attr_put(MPI_Datatype type, int keyval, void* attr_value){
1717   if(!smpi_type_keyvals)
1718   smpi_type_keyvals = xbt_dict_new();
1719   char* tmpkey=xbt_malloc(sizeof(int));
1720   sprintf(tmpkey, "%d", keyval);
1721   smpi_type_key_elem elem = xbt_dict_get_or_null(smpi_type_keyvals, (const char*)tmpkey);
1722   if(!elem )
1723     return MPI_ERR_ARG;
1724   int flag;
1725   void* value;
1726   smpi_type_attr_get(type, keyval, &value, &flag);
1727   if(flag && elem->delete_fn!=MPI_NULL_DELETE_FN){
1728     int ret = elem->delete_fn(type, keyval, &value, &flag);
1729     if(ret!=MPI_SUCCESS) return ret;
1730   }
1731   if(type->attributes==NULL)
1732     type->attributes=xbt_dict_new();
1733
1734   xbt_dict_set(type->attributes, (const char*)tmpkey, attr_value, NULL);
1735   return MPI_SUCCESS;
1736 }
1737
1738 int smpi_type_keyval_create(MPI_Type_copy_attr_function* copy_fn, MPI_Type_delete_attr_function* delete_fn, int* keyval, void* extra_state){
1739
1740   if(!smpi_type_keyvals)
1741   smpi_type_keyvals = xbt_dict_new();
1742   
1743   smpi_type_key_elem value = (smpi_type_key_elem) xbt_new0(s_smpi_mpi_type_key_elem_t,1);
1744   
1745   value->copy_fn=copy_fn;
1746   value->delete_fn=delete_fn;
1747   
1748   *keyval = type_keyval_id;
1749   char* tmpkey=xbt_malloc(sizeof(int));
1750   sprintf(tmpkey, "%d", *keyval);
1751   xbt_dict_set(smpi_type_keyvals,(const char*)tmpkey,(void*)value, NULL);
1752   type_keyval_id++;
1753   return MPI_SUCCESS;
1754 }
1755
1756 int smpi_type_keyval_free(int* keyval){
1757   smpi_type_key_elem elem = xbt_dict_get_or_null(smpi_type_keyvals, (const char*)keyval);
1758   if(!elem)
1759     return MPI_ERR_ARG;
1760   char* tmpkey=xbt_malloc(sizeof(int));
1761   sprintf(tmpkey, "%d", *keyval);
1762   xbt_dict_remove(smpi_type_keyvals, (const char*)tmpkey);
1763   xbt_free(elem);
1764   return MPI_SUCCESS;
1765 }