Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
add better support for MPI datatypes extent values (to correct behavior of gather...
[simgrid.git] / src / smpi / smpi_mpi_dt.c
1 /* smpi_mpi_dt.c -- MPI primitives to handle datatypes                        */
2 /* FIXME: a very incomplete implementation                                    */
3
4 /* Copyright (c) 2009, 2010. The SimGrid Team.
5  * All rights reserved.                                                     */
6
7 /* This program is free software; you can redistribute it and/or modify it
8  * under the terms of the license (GNU LGPL) which comes with this package. */
9
10 #include <stdio.h>
11 #include <stdlib.h>
12 #include <string.h>
13
14 #include "private.h"
15 #include "smpi_mpi_dt_private.h"
16
17 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_mpi_dt, smpi,
18                                 "Logging specific to SMPI (datatype)");
19
20 #define CREATE_MPI_DATATYPE(name, type)       \
21   static s_smpi_mpi_datatype_t mpi_##name = { \
22     sizeof(type),  /* size */                 \
23     0,             /*was 1 has_subtype*/             \
24     0,             /* lb */                   \
25     sizeof(type),  /* ub = lb + size */       \
26     DT_FLAG_BASIC,  /* flags */              \
27     NULL           /* pointer on extended struct*/ \
28   };                                          \
29 MPI_Datatype name = &mpi_##name;
30
31 #define CREATE_MPI_DATATYPE_NULL(name)       \
32   static s_smpi_mpi_datatype_t mpi_##name = { \
33     0,  /* size */                 \
34     0,             /*was 1 has_subtype*/             \
35     0,             /* lb */                   \
36     0,  /* ub = lb + size */       \
37     DT_FLAG_BASIC,  /* flags */              \
38     NULL           /* pointer on extended struct*/ \
39   };                                          \
40 MPI_Datatype name = &mpi_##name;
41
42 //The following are datatypes for the MPI functions MPI_MAXLOC and MPI_MINLOC.
43 typedef struct {
44   float value;
45   int index;
46 } float_int;
47 typedef struct {
48   long value;
49   int index;
50 } long_int;
51 typedef struct {
52   double value;
53   int index;
54 } double_int;
55 typedef struct {
56   short value;
57   int index;
58 } short_int;
59 typedef struct {
60   int value;
61   int index;
62 } int_int;
63 typedef struct {
64   long double value;
65   int index;
66 } long_double_int;
67
68 // Predefined data types
69 CREATE_MPI_DATATYPE(MPI_CHAR, char);
70 CREATE_MPI_DATATYPE(MPI_SHORT, short);
71 CREATE_MPI_DATATYPE(MPI_INT, int);
72 CREATE_MPI_DATATYPE(MPI_LONG, long);
73 CREATE_MPI_DATATYPE(MPI_LONG_LONG, long long);
74 CREATE_MPI_DATATYPE(MPI_SIGNED_CHAR, signed char);
75 CREATE_MPI_DATATYPE(MPI_UNSIGNED_CHAR, unsigned char);
76 CREATE_MPI_DATATYPE(MPI_UNSIGNED_SHORT, unsigned short);
77 CREATE_MPI_DATATYPE(MPI_UNSIGNED, unsigned int);
78 CREATE_MPI_DATATYPE(MPI_UNSIGNED_LONG, unsigned long);
79 CREATE_MPI_DATATYPE(MPI_UNSIGNED_LONG_LONG, unsigned long long);
80 CREATE_MPI_DATATYPE(MPI_FLOAT, float);
81 CREATE_MPI_DATATYPE(MPI_DOUBLE, double);
82 CREATE_MPI_DATATYPE(MPI_LONG_DOUBLE, long double);
83 CREATE_MPI_DATATYPE(MPI_WCHAR, wchar_t);
84 CREATE_MPI_DATATYPE(MPI_C_BOOL, _Bool);
85 CREATE_MPI_DATATYPE(MPI_INT8_T, int8_t);
86 CREATE_MPI_DATATYPE(MPI_INT16_T, int16_t);
87 CREATE_MPI_DATATYPE(MPI_INT32_T, int32_t);
88 CREATE_MPI_DATATYPE(MPI_INT64_T, int64_t);
89 CREATE_MPI_DATATYPE(MPI_UINT8_T, uint8_t);
90 CREATE_MPI_DATATYPE(MPI_UINT16_T, uint16_t);
91 CREATE_MPI_DATATYPE(MPI_UINT32_T, uint32_t);
92 CREATE_MPI_DATATYPE(MPI_UINT64_T, uint64_t);
93 CREATE_MPI_DATATYPE(MPI_C_FLOAT_COMPLEX, float _Complex);
94 CREATE_MPI_DATATYPE(MPI_C_DOUBLE_COMPLEX, double _Complex);
95 CREATE_MPI_DATATYPE(MPI_C_LONG_DOUBLE_COMPLEX, long double _Complex);
96 CREATE_MPI_DATATYPE(MPI_AINT, MPI_Aint);
97 CREATE_MPI_DATATYPE(MPI_OFFSET, MPI_Offset);
98
99 CREATE_MPI_DATATYPE(MPI_FLOAT_INT, float_int);
100 CREATE_MPI_DATATYPE(MPI_LONG_INT, long_int);
101 CREATE_MPI_DATATYPE(MPI_DOUBLE_INT, double_int);
102 CREATE_MPI_DATATYPE(MPI_SHORT_INT, short_int);
103 CREATE_MPI_DATATYPE(MPI_2INT, int_int);
104 CREATE_MPI_DATATYPE(MPI_LONG_DOUBLE_INT, long_double_int);
105
106 CREATE_MPI_DATATYPE_NULL(MPI_UB);
107 CREATE_MPI_DATATYPE_NULL(MPI_LB);
108 // Internal use only
109 CREATE_MPI_DATATYPE(MPI_PTR, void*);
110
111
112 size_t smpi_datatype_size(MPI_Datatype datatype)
113 {
114   return datatype->size;
115 }
116
117
118
119 MPI_Aint smpi_datatype_lb(MPI_Datatype datatype)
120 {
121   return datatype->lb;
122 }
123
124 MPI_Aint smpi_datatype_ub(MPI_Datatype datatype)
125 {
126   return datatype->ub;
127 }
128
129 int smpi_datatype_extent(MPI_Datatype datatype, MPI_Aint * lb,
130                          MPI_Aint * extent)
131 {
132   int retval;
133
134   if ((datatype->flags & DT_FLAG_COMMITED) != DT_FLAG_COMMITED) {
135     retval = MPI_ERR_TYPE;
136   } else {
137     *lb = datatype->lb;
138     *extent = datatype->ub - datatype->lb;
139     retval = MPI_SUCCESS;
140   }
141   return retval;
142 }
143
144 int smpi_datatype_copy(void *sendbuf, int sendcount, MPI_Datatype sendtype,
145                        void *recvbuf, int recvcount, MPI_Datatype recvtype)
146 {
147   int retval, count;
148
149   /* First check if we really have something to do */
150   if (recvcount == 0) {
151     retval = sendcount == 0 ? MPI_SUCCESS : MPI_ERR_TRUNCATE;
152   } else {
153     /* FIXME: treat packed cases */
154     sendcount *= smpi_datatype_size(sendtype);
155     recvcount *= smpi_datatype_size(recvtype);
156     count = sendcount < recvcount ? sendcount : recvcount;
157
158     if(sendtype->has_subtype == 0 && recvtype->has_subtype == 0) {
159       memcpy(recvbuf, sendbuf, count);
160     }
161     else if (sendtype->has_subtype == 0)
162     {
163       s_smpi_subtype_t *subtype =  recvtype->substruct;
164       subtype->unserialize( sendbuf, recvbuf,1, subtype);
165     }
166     else if (recvtype->has_subtype == 0)
167     {
168       s_smpi_subtype_t *subtype =  sendtype->substruct;
169       subtype->serialize(sendbuf, recvbuf,1, subtype);
170     }else{
171       s_smpi_subtype_t *subtype =  sendtype->substruct;
172
173       s_smpi_mpi_vector_t* type_c = (s_smpi_mpi_vector_t*)sendtype;
174
175       void * buf_tmp = malloc(count * type_c->size_oldtype);
176
177       subtype->serialize( sendbuf, buf_tmp,1, subtype);
178       subtype =  recvtype->substruct;
179       subtype->unserialize(recvbuf, buf_tmp,1, subtype);
180
181       free(buf_tmp);
182     }
183     retval = sendcount > recvcount ? MPI_ERR_TRUNCATE : MPI_SUCCESS;
184   }
185
186   return retval;
187 }
188
189 /*
190  *  Copies noncontiguous data into contiguous memory.
191  *  @param contiguous_vector - output vector
192  *  @param noncontiguous_vector - input vector
193  *  @param type - pointer contening :
194  *      - stride - stride of between noncontiguous data
195  *      - block_length - the width or height of blocked matrix
196  *      - count - the number of rows of matrix
197  */
198 void serialize_vector( const void *noncontiguous_vector,
199                        void *contiguous_vector,
200                        size_t count,
201                        void *type)
202 {
203   s_smpi_mpi_vector_t* type_c = (s_smpi_mpi_vector_t*)type;
204   int i;
205   char* contiguous_vector_char = (char*)contiguous_vector;
206   char* noncontiguous_vector_char = (char*)noncontiguous_vector;
207
208   for (i = 0; i < type_c->block_count * count; i++) {
209     memcpy(contiguous_vector_char,
210            noncontiguous_vector_char, type_c->block_length * type_c->size_oldtype);
211
212     contiguous_vector_char += type_c->block_length*type_c->size_oldtype;
213     noncontiguous_vector_char += type_c->block_stride*type_c->size_oldtype;
214   }
215 }
216
217 /*
218  *  Copies contiguous data into noncontiguous memory.
219  *  @param noncontiguous_vector - output vector
220  *  @param contiguous_vector - input vector
221  *  @param type - pointer contening :
222  *      - stride - stride of between noncontiguous data
223  *      - block_length - the width or height of blocked matrix
224  *      - count - the number of rows of matrix
225  */
226 void unserialize_vector( const void *contiguous_vector,
227                          void *noncontiguous_vector,
228                          size_t count,
229                          void *type)
230 {
231   s_smpi_mpi_vector_t* type_c = (s_smpi_mpi_vector_t*)type;
232   int i;
233
234   char* contiguous_vector_char = (char*)contiguous_vector;
235   char* noncontiguous_vector_char = (char*)noncontiguous_vector;
236
237   for (i = 0; i < type_c->block_count * count; i++) {
238     memcpy(noncontiguous_vector_char,
239            contiguous_vector_char, type_c->block_length * type_c->size_oldtype);
240
241     contiguous_vector_char += type_c->block_length*type_c->size_oldtype;
242     noncontiguous_vector_char += type_c->block_stride*type_c->size_oldtype;
243   }
244 }
245
246 /*
247  * Create a Sub type vector to be able to serialize and unserialize it
248  * the structure s_smpi_mpi_vector_t is derived from s_smpi_subtype which
249  * required the functions unserialize and serialize
250  *
251  */
252 s_smpi_mpi_vector_t* smpi_datatype_vector_create( int block_stride,
253                                                   int block_length,
254                                                   int block_count,
255                                                   MPI_Datatype old_type,
256                                                   int size_oldtype){
257   s_smpi_mpi_vector_t *new_t= xbt_new(s_smpi_mpi_vector_t,1);
258   new_t->base.serialize = &serialize_vector;
259   new_t->base.unserialize = &unserialize_vector;
260   new_t->base.subtype_free = &free_vector;
261   new_t->block_stride = block_stride;
262   new_t->block_length = block_length;
263   new_t->block_count = block_count;
264   new_t->old_type = old_type;
265   new_t->size_oldtype = size_oldtype;
266   return new_t;
267 }
268
269 void smpi_datatype_create(MPI_Datatype* new_type, int size,int extent, int has_subtype,
270                           void *struct_type, int flags){
271   MPI_Datatype new_t= xbt_new(s_smpi_mpi_datatype_t,1);
272   new_t->size = size;
273   new_t->has_subtype = has_subtype;
274   new_t->lb = 0;
275   new_t->ub = extent;
276   new_t->flags = flags;
277   new_t->substruct = struct_type;
278   *new_type = new_t;
279 }
280
281 void smpi_datatype_free(MPI_Datatype* type){
282   if ((*type)->has_subtype == 1){
283     ((s_smpi_subtype_t *)(*type)->substruct)->subtype_free(type);  
284   }
285   xbt_free(*type);
286 }
287
288 int smpi_datatype_contiguous(int count, MPI_Datatype old_type, MPI_Datatype* new_type)
289 {
290   int retval;
291   if ((old_type->flags & DT_FLAG_COMMITED) != DT_FLAG_COMMITED) {
292     retval = MPI_ERR_TYPE;
293   } else {
294     smpi_datatype_create(new_type, count *
295                          smpi_datatype_size(old_type),count *
296                          smpi_datatype_size(old_type),0,NULL, DT_FLAG_CONTIGUOUS);
297     retval=MPI_SUCCESS;
298   }
299   return retval;
300 }
301
302 int smpi_datatype_vector(int count, int blocklen, int stride, MPI_Datatype old_type, MPI_Datatype* new_type)
303 {
304   int retval;
305   if (blocklen<=0) return MPI_ERR_ARG;
306   if ((old_type->flags & DT_FLAG_COMMITED) != DT_FLAG_COMMITED) {
307     retval = MPI_ERR_TYPE;
308   } else {
309     if(stride != blocklen){
310 if (old_type->has_subtype == 1)
311       XBT_WARN("vector contains a complex type - not yet handled");
312       s_smpi_mpi_vector_t* subtype = smpi_datatype_vector_create( stride,
313                                                                   blocklen,
314                                                                   count,
315                                                                   old_type,
316                                                                   smpi_datatype_size(old_type));
317
318       smpi_datatype_create(new_type, count * (blocklen) *
319                            smpi_datatype_size(old_type),
320                           ((count -1) * stride + blocklen) * smpi_datatype_size(old_type),
321                            1,
322                            subtype,
323                            DT_FLAG_VECTOR);
324       retval=MPI_SUCCESS;
325     }else{
326       /* in this situation the data are contignous thus it's not
327        * required to serialize and unserialize it*/
328       smpi_datatype_create(new_type, count * blocklen *
329                            smpi_datatype_size(old_type), ((count -1) * stride + blocklen)*
330                            smpi_datatype_size(old_type),
331                            0,
332                            NULL,
333                            DT_FLAG_VECTOR|DT_FLAG_CONTIGUOUS);
334       retval=MPI_SUCCESS;
335     }
336   }
337   return retval;
338 }
339
340 void free_vector(MPI_Datatype* d){
341 }
342
343 /*
344 Hvector Implementation - Vector with stride in bytes
345 */
346
347
348 /*
349  *  Copies noncontiguous data into contiguous memory.
350  *  @param contiguous_hvector - output hvector
351  *  @param noncontiguous_hvector - input hvector
352  *  @param type - pointer contening :
353  *      - stride - stride of between noncontiguous data, in bytes
354  *      - block_length - the width or height of blocked matrix
355  *      - count - the number of rows of matrix
356  */
357 void serialize_hvector( const void *noncontiguous_hvector,
358                        void *contiguous_hvector,
359                        size_t count,
360                        void *type)
361 {
362   s_smpi_mpi_hvector_t* type_c = (s_smpi_mpi_hvector_t*)type;
363   int i;
364   char* contiguous_vector_char = (char*)contiguous_hvector;
365   char* noncontiguous_vector_char = (char*)noncontiguous_hvector;
366
367   for (i = 0; i < type_c->block_count * count; i++) {
368     memcpy(contiguous_vector_char,
369            noncontiguous_vector_char, type_c->block_length * type_c->size_oldtype);
370
371     contiguous_vector_char += type_c->block_length*type_c->size_oldtype;
372     noncontiguous_vector_char += type_c->block_stride;
373   }
374 }
375 /*
376  *  Copies contiguous data into noncontiguous memory.
377  *  @param noncontiguous_vector - output hvector
378  *  @param contiguous_vector - input hvector
379  *  @param type - pointer contening :
380  *      - stride - stride of between noncontiguous data, in bytes
381  *      - block_length - the width or height of blocked matrix
382  *      - count - the number of rows of matrix
383  */
384 void unserialize_hvector( const void *contiguous_vector,
385                          void *noncontiguous_vector,
386                          size_t count,
387                          void *type)
388 {
389   s_smpi_mpi_hvector_t* type_c = (s_smpi_mpi_hvector_t*)type;
390   int i;
391
392   char* contiguous_vector_char = (char*)contiguous_vector;
393   char* noncontiguous_vector_char = (char*)noncontiguous_vector;
394
395   for (i = 0; i < type_c->block_count * count; i++) {
396     memcpy(noncontiguous_vector_char,
397            contiguous_vector_char, type_c->block_length * type_c->size_oldtype);
398
399     contiguous_vector_char += type_c->block_length*type_c->size_oldtype;
400     noncontiguous_vector_char += type_c->block_stride;
401   }
402 }
403
404 /*
405  * Create a Sub type vector to be able to serialize and unserialize it
406  * the structure s_smpi_mpi_vector_t is derived from s_smpi_subtype which
407  * required the functions unserialize and serialize
408  *
409  */
410 s_smpi_mpi_hvector_t* smpi_datatype_hvector_create( MPI_Aint block_stride,
411                                                   int block_length,
412                                                   int block_count,
413                                                   MPI_Datatype old_type,
414                                                   int size_oldtype){
415   s_smpi_mpi_hvector_t *new_t= xbt_new(s_smpi_mpi_hvector_t,1);
416   new_t->base.serialize = &serialize_hvector;
417   new_t->base.unserialize = &unserialize_hvector;
418   new_t->base.subtype_free = &free_hvector;
419   new_t->block_stride = block_stride;
420   new_t->block_length = block_length;
421   new_t->block_count = block_count;
422   new_t->old_type = old_type;
423   new_t->size_oldtype = size_oldtype;
424   return new_t;
425 }
426
427 //do nothing for vector types
428 void free_hvector(MPI_Datatype* d){
429 }
430
431 int smpi_datatype_hvector(int count, int blocklen, MPI_Aint stride, MPI_Datatype old_type, MPI_Datatype* new_type)
432 {
433   int retval;
434   if (blocklen<=0) return MPI_ERR_ARG;
435   if ((old_type->flags & DT_FLAG_COMMITED) != DT_FLAG_COMMITED) {
436     retval = MPI_ERR_TYPE;
437   } else {
438 if (old_type->has_subtype == 1)
439       XBT_WARN("hvector contains a complex type - not yet handled");
440     if(stride != blocklen*smpi_datatype_size(old_type)){
441       s_smpi_mpi_hvector_t* subtype = smpi_datatype_hvector_create( stride,
442                                                                     blocklen,
443                                                                     count,
444                                                                     old_type,
445                                                                     smpi_datatype_size(old_type));
446
447       smpi_datatype_create(new_type, count * blocklen *
448                            smpi_datatype_size(old_type), (count-1) * stride + blocklen *
449                            smpi_datatype_size(old_type),
450                            1,
451                            subtype,
452                            DT_FLAG_VECTOR);
453       retval=MPI_SUCCESS;
454     }else{
455       smpi_datatype_create(new_type, count * blocklen *
456                                                smpi_datatype_size(old_type),count * blocklen *
457                                                smpi_datatype_size(old_type),
458                                               0,
459                                               NULL,
460                                               DT_FLAG_VECTOR|DT_FLAG_CONTIGUOUS);
461       retval=MPI_SUCCESS;
462     }
463   }
464   return retval;
465 }
466
467
468 /*
469 Indexed Implementation
470 */
471
472 /*
473  *  Copies noncontiguous data into contiguous memory.
474  *  @param contiguous_indexed - output indexed
475  *  @param noncontiguous_indexed - input indexed
476  *  @param type - pointer contening :
477  *      - block_lengths - the width or height of blocked matrix
478  *      - block_indices - indices of each data, in element
479  *      - count - the number of rows of matrix
480  */
481 void serialize_indexed( const void *noncontiguous_indexed,
482                        void *contiguous_indexed,
483                        size_t count,
484                        void *type)
485 {
486   s_smpi_mpi_indexed_t* type_c = (s_smpi_mpi_indexed_t*)type;
487   int i,j;
488   char* contiguous_indexed_char = (char*)contiguous_indexed;
489   char* noncontiguous_indexed_char = (char*)noncontiguous_indexed;
490   for(j=0; j<count;j++){
491     for (i = 0; i < type_c->block_count; i++) {
492       memcpy(contiguous_indexed_char,
493              noncontiguous_indexed_char, type_c->block_lengths[i] * type_c->size_oldtype);
494
495       contiguous_indexed_char += type_c->block_lengths[i]*type_c->size_oldtype;
496       if (i<type_c->block_count-1)noncontiguous_indexed_char = (char*)noncontiguous_indexed + type_c->block_indices[i+1]*type_c->size_oldtype;
497       else noncontiguous_indexed_char += type_c->block_lengths[i]*type_c->size_oldtype;
498     }
499     noncontiguous_indexed=(void*)noncontiguous_indexed_char;
500   }
501 }
502 /*
503  *  Copies contiguous data into noncontiguous memory.
504  *  @param noncontiguous_indexed - output indexed
505  *  @param contiguous_indexed - input indexed
506  *  @param type - pointer contening :
507  *      - block_lengths - the width or height of blocked matrix
508  *      - block_indices - indices of each data, in element
509  *      - count - the number of rows of matrix
510  */
511 void unserialize_indexed( const void *contiguous_indexed,
512                          void *noncontiguous_indexed,
513                          size_t count,
514                          void *type)
515 {
516   s_smpi_mpi_indexed_t* type_c = (s_smpi_mpi_indexed_t*)type;
517   int i,j;
518
519   char* contiguous_indexed_char = (char*)contiguous_indexed;
520   char* noncontiguous_indexed_char = (char*)noncontiguous_indexed;
521   for(j=0; j<count;j++){
522     for (i = 0; i < type_c->block_count; i++) {
523       memcpy(noncontiguous_indexed_char,
524              contiguous_indexed_char, type_c->block_lengths[i] * type_c->size_oldtype);
525
526       contiguous_indexed_char += type_c->block_lengths[i]*type_c->size_oldtype;
527       if (i<type_c->block_count-1)noncontiguous_indexed_char = (char*)noncontiguous_indexed + type_c->block_indices[i+1]*type_c->size_oldtype;
528       else noncontiguous_indexed_char += type_c->block_lengths[i]*type_c->size_oldtype;
529     }
530     noncontiguous_indexed=(void*)noncontiguous_indexed_char;
531   }
532 }
533
534 void free_indexed(MPI_Datatype* type){
535   xbt_free(((s_smpi_mpi_indexed_t *)(*type)->substruct)->block_lengths);
536   xbt_free(((s_smpi_mpi_indexed_t *)(*type)->substruct)->block_indices);
537 }
538
539 /*
540  * Create a Sub type indexed to be able to serialize and unserialize it
541  * the structure s_smpi_mpi_indexed_t is derived from s_smpi_subtype which
542  * required the functions unserialize and serialize
543  */
544 s_smpi_mpi_indexed_t* smpi_datatype_indexed_create( int* block_lengths,
545                                                   int* block_indices,
546                                                   int block_count,
547                                                   MPI_Datatype old_type,
548                                                   int size_oldtype){
549   s_smpi_mpi_indexed_t *new_t= xbt_new(s_smpi_mpi_indexed_t,1);
550   new_t->base.serialize = &serialize_indexed;
551   new_t->base.unserialize = &unserialize_indexed;
552   new_t->base.subtype_free = &free_indexed;
553  //TODO : add a custom function for each time to clean these 
554   new_t->block_lengths= xbt_new(int, block_count);
555   new_t->block_indices= xbt_new(int, block_count);
556   int i;
557   for(i=0;i<block_count;i++){
558     new_t->block_lengths[i]=block_lengths[i];
559     new_t->block_indices[i]=block_indices[i];
560   }
561   new_t->block_count = block_count;
562   new_t->old_type = old_type;
563   new_t->size_oldtype = size_oldtype;
564   return new_t;
565 }
566
567
568 int smpi_datatype_indexed(int count, int* blocklens, int* indices, MPI_Datatype old_type, MPI_Datatype* new_type)
569 {
570   int i;
571   int retval;
572   int size = 0;
573   int contiguous=1;
574   for(i=0; i< count; i++){
575     if   (blocklens[i]<=0)
576       return MPI_ERR_ARG;
577     size += blocklens[i];
578
579     if ( (i< count -1) && (indices[i]+blocklens[i] != indices[i+1]) )contiguous=0;
580   }
581   if ((old_type->flags & DT_FLAG_COMMITED) != DT_FLAG_COMMITED) {
582     retval = MPI_ERR_TYPE;
583   } else {
584
585     if (old_type->has_subtype == 1)
586       XBT_WARN("indexed contains a complex type - not yet handled");
587
588     if(!contiguous){
589       s_smpi_mpi_indexed_t* subtype = smpi_datatype_indexed_create( blocklens,
590                                                                     indices,
591                                                                     count,
592                                                                     old_type,
593                                                                     smpi_datatype_size(old_type));
594
595       smpi_datatype_create(new_type,  size *
596                            smpi_datatype_size(old_type),(indices[count-1]+blocklens[count-1])*smpi_datatype_size(old_type),1, subtype, DT_FLAG_DATA);
597 }else{
598       smpi_datatype_create(new_type,  size *
599                            smpi_datatype_size(old_type),size *
600                            smpi_datatype_size(old_type),0, NULL, DT_FLAG_DATA|DT_FLAG_CONTIGUOUS);
601 }
602     retval=MPI_SUCCESS;
603   }
604   return retval;
605 }
606
607
608 /*
609 Hindexed Implementation - Indexed with indices in bytes 
610 */
611
612 /*
613  *  Copies noncontiguous data into contiguous memory.
614  *  @param contiguous_hindexed - output hindexed
615  *  @param noncontiguous_hindexed - input hindexed
616  *  @param type - pointer contening :
617  *      - block_lengths - the width or height of blocked matrix
618  *      - block_indices - indices of each data, in bytes
619  *      - count - the number of rows of matrix
620  */
621 void serialize_hindexed( const void *noncontiguous_hindexed,
622                        void *contiguous_hindexed,
623                        size_t count,
624                        void *type)
625 {
626   s_smpi_mpi_hindexed_t* type_c = (s_smpi_mpi_hindexed_t*)type;
627   int i,j;
628   char* contiguous_hindexed_char = (char*)contiguous_hindexed;
629   char* noncontiguous_hindexed_char = (char*)noncontiguous_hindexed;
630   for(j=0; j<count;j++){
631     for (i = 0; i < type_c->block_count; i++) {
632       memcpy(contiguous_hindexed_char,
633              noncontiguous_hindexed_char, type_c->block_lengths[i] * type_c->size_oldtype);
634
635       contiguous_hindexed_char += type_c->block_lengths[i]*type_c->size_oldtype;
636       if (i<type_c->block_count-1)noncontiguous_hindexed_char = (char*)noncontiguous_hindexed + type_c->block_indices[i+1];
637       else noncontiguous_hindexed_char += type_c->block_lengths[i]*type_c->size_oldtype;
638     }
639     noncontiguous_hindexed=(void*)noncontiguous_hindexed_char;
640   }
641 }
642 /*
643  *  Copies contiguous data into noncontiguous memory.
644  *  @param noncontiguous_hindexed - output hindexed
645  *  @param contiguous_hindexed - input hindexed
646  *  @param type - pointer contening :
647  *      - block_lengths - the width or height of blocked matrix
648  *      - block_indices - indices of each data, in bytes
649  *      - count - the number of rows of matrix
650  */
651 void unserialize_hindexed( const void *contiguous_hindexed,
652                          void *noncontiguous_hindexed,
653                          size_t count,
654                          void *type)
655 {
656   s_smpi_mpi_hindexed_t* type_c = (s_smpi_mpi_hindexed_t*)type;
657   int i,j;
658
659   char* contiguous_hindexed_char = (char*)contiguous_hindexed;
660   char* noncontiguous_hindexed_char = (char*)noncontiguous_hindexed;
661   for(j=0; j<count;j++){
662     for (i = 0; i < type_c->block_count; i++) {
663       memcpy(noncontiguous_hindexed_char,
664              contiguous_hindexed_char, type_c->block_lengths[i] * type_c->size_oldtype);
665
666       contiguous_hindexed_char += type_c->block_lengths[i]*type_c->size_oldtype;
667       if (i<type_c->block_count-1)noncontiguous_hindexed_char = (char*)noncontiguous_hindexed + type_c->block_indices[i+1];
668       else noncontiguous_hindexed_char += type_c->block_lengths[i]*type_c->size_oldtype;
669     }
670     noncontiguous_hindexed=(void*)noncontiguous_hindexed_char;
671   }
672 }
673
674 void free_hindexed(MPI_Datatype* type){
675   xbt_free(((s_smpi_mpi_hindexed_t *)(*type)->substruct)->block_lengths);
676   xbt_free(((s_smpi_mpi_hindexed_t *)(*type)->substruct)->block_indices);
677 }
678
679 /*
680  * Create a Sub type hindexed to be able to serialize and unserialize it
681  * the structure s_smpi_mpi_hindexed_t is derived from s_smpi_subtype which
682  * required the functions unserialize and serialize
683  */
684 s_smpi_mpi_hindexed_t* smpi_datatype_hindexed_create( int* block_lengths,
685                                                   MPI_Aint* block_indices,
686                                                   int block_count,
687                                                   MPI_Datatype old_type,
688                                                   int size_oldtype){
689   s_smpi_mpi_hindexed_t *new_t= xbt_new(s_smpi_mpi_hindexed_t,1);
690   new_t->base.serialize = &serialize_hindexed;
691   new_t->base.unserialize = &unserialize_hindexed;
692   new_t->base.subtype_free = &free_hindexed;
693  //TODO : add a custom function for each time to clean these 
694   new_t->block_lengths= xbt_new(int, block_count);
695   new_t->block_indices= xbt_new(MPI_Aint, block_count);
696   int i;
697   for(i=0;i<block_count;i++){
698     new_t->block_lengths[i]=block_lengths[i];
699     new_t->block_indices[i]=block_indices[i];
700   }
701   new_t->block_count = block_count;
702   new_t->old_type = old_type;
703   new_t->size_oldtype = size_oldtype;
704   return new_t;
705 }
706
707
708 int smpi_datatype_hindexed(int count, int* blocklens, MPI_Aint* indices, MPI_Datatype old_type, MPI_Datatype* new_type)
709 {
710   int i;
711   int retval;
712   int size = 0;
713   int contiguous=1;
714   for(i=0; i< count; i++){
715     if   (blocklens[i]<=0)
716       return MPI_ERR_ARG;
717     size += blocklens[i];
718
719
720     if ( (i< count -1) && (indices[i]+blocklens[i]*smpi_datatype_size(old_type) != indices[i+1]) )contiguous=0;
721   }
722   if ((old_type->flags & DT_FLAG_COMMITED) != DT_FLAG_COMMITED) {
723     retval = MPI_ERR_TYPE;
724   } else {
725     if (old_type->has_subtype == 1)
726       XBT_WARN("hindexed contains a complex type - not yet handled");
727
728     if(!contiguous){
729       s_smpi_mpi_hindexed_t* subtype = smpi_datatype_hindexed_create( blocklens,
730                                                                     indices,
731                                                                     count,
732                                                                     old_type,
733                                                                     smpi_datatype_size(old_type));
734
735       smpi_datatype_create(new_type,  size *
736                            smpi_datatype_size(old_type),indices[count-1]+blocklens[count-1]*smpi_datatype_size(old_type)
737                            ,1, subtype, DT_FLAG_DATA);
738     }else{
739       smpi_datatype_create(new_type,  size *
740                            smpi_datatype_size(old_type),size *
741                            smpi_datatype_size(old_type),0, NULL, DT_FLAG_DATA|DT_FLAG_CONTIGUOUS);
742     }
743     retval=MPI_SUCCESS;
744   }
745   return retval;
746 }
747
748
749 /*
750 struct Implementation - Indexed with indices in bytes 
751 */
752
753 /*
754  *  Copies noncontiguous data into contiguous memory.
755  *  @param contiguous_struct - output struct
756  *  @param noncontiguous_struct - input struct
757  *  @param type - pointer contening :
758  *      - stride - stride of between noncontiguous data
759  *      - block_length - the width or height of blocked matrix
760  *      - count - the number of rows of matrix
761  */
762 void serialize_struct( const void *noncontiguous_struct,
763                        void *contiguous_struct,
764                        size_t count,
765                        void *type)
766 {
767   s_smpi_mpi_struct_t* type_c = (s_smpi_mpi_struct_t*)type;
768   int i,j;
769   char* contiguous_struct_char = (char*)contiguous_struct;
770   char* noncontiguous_struct_char = (char*)noncontiguous_struct;
771   for(j=0; j<count;j++){
772     for (i = 0; i < type_c->block_count; i++) {
773       memcpy(contiguous_struct_char,
774              noncontiguous_struct_char, type_c->block_lengths[i] * smpi_datatype_size(type_c->old_types[i]));
775       contiguous_struct_char += type_c->block_lengths[i]*smpi_datatype_size(type_c->old_types[i]);
776       if (i<type_c->block_count-1)noncontiguous_struct_char = (char*)noncontiguous_struct + type_c->block_indices[i+1];
777       else noncontiguous_struct_char += type_c->block_lengths[i]*smpi_datatype_size(type_c->old_types[i]);//let's hope this is MPI_UB ?
778     }
779     noncontiguous_struct=(void*)noncontiguous_struct_char;
780   }
781 }
782 /*
783  *  Copies contiguous data into noncontiguous memory.
784  *  @param noncontiguous_struct - output struct
785  *  @param contiguous_struct - input struct
786  *  @param type - pointer contening :
787  *      - stride - stride of between noncontiguous data
788  *      - block_length - the width or height of blocked matrix
789  *      - count - the number of rows of matrix
790  */
791 void unserialize_struct( const void *contiguous_struct,
792                          void *noncontiguous_struct,
793                          size_t count,
794                          void *type)
795 {
796   s_smpi_mpi_struct_t* type_c = (s_smpi_mpi_struct_t*)type;
797   int i,j;
798
799   char* contiguous_struct_char = (char*)contiguous_struct;
800   char* noncontiguous_struct_char = (char*)noncontiguous_struct;
801   for(j=0; j<count;j++){
802     for (i = 0; i < type_c->block_count; i++) {
803       memcpy(noncontiguous_struct_char,
804              contiguous_struct_char, type_c->block_lengths[i] * smpi_datatype_size(type_c->old_types[i]));
805       contiguous_struct_char += type_c->block_lengths[i]*smpi_datatype_size(type_c->old_types[i]);
806       if (i<type_c->block_count-1)noncontiguous_struct_char =  (char*)noncontiguous_struct + type_c->block_indices[i+1];
807       else noncontiguous_struct_char += type_c->block_lengths[i]*smpi_datatype_size(type_c->old_types[i]);
808     }
809     noncontiguous_struct=(void*)noncontiguous_struct_char;
810     
811   }
812 }
813
814 void free_struct(MPI_Datatype* type){
815   xbt_free(((s_smpi_mpi_struct_t *)(*type)->substruct)->block_lengths);
816   xbt_free(((s_smpi_mpi_struct_t *)(*type)->substruct)->block_indices);
817   xbt_free(((s_smpi_mpi_struct_t *)(*type)->substruct)->old_types);
818 }
819
820 /*
821  * Create a Sub type struct to be able to serialize and unserialize it
822  * the structure s_smpi_mpi_struct_t is derived from s_smpi_subtype which
823  * required the functions unserialize and serialize
824  */
825 s_smpi_mpi_struct_t* smpi_datatype_struct_create( int* block_lengths,
826                                                   MPI_Aint* block_indices,
827                                                   int block_count,
828                                                   MPI_Datatype* old_types){
829   s_smpi_mpi_struct_t *new_t= xbt_new(s_smpi_mpi_struct_t,1);
830   new_t->base.serialize = &serialize_struct;
831   new_t->base.unserialize = &unserialize_struct;
832   new_t->base.subtype_free = &free_struct;
833  //TODO : add a custom function for each time to clean these 
834   new_t->block_lengths= xbt_new(int, block_count);
835   new_t->block_indices= xbt_new(MPI_Aint, block_count);
836   new_t->old_types=  xbt_new(MPI_Datatype, block_count);
837   int i;
838   for(i=0;i<block_count;i++){
839     new_t->block_lengths[i]=block_lengths[i];
840     new_t->block_indices[i]=block_indices[i];
841     new_t->old_types[i]=old_types[i];
842   }
843   //new_t->block_lengths = block_lengths;
844   //new_t->block_indices = block_indices;
845   new_t->block_count = block_count;
846   //new_t->old_types = old_types;
847   return new_t;
848 }
849
850
851 int smpi_datatype_struct(int count, int* blocklens, MPI_Aint* indices, MPI_Datatype* old_types, MPI_Datatype* new_type)
852 {
853   int i;
854   size_t size = 0;
855   int contiguous=1;
856   size = 0;
857   for(i=0; i< count; i++){
858     if (blocklens[i]<=0)
859       return MPI_ERR_ARG;
860     if ((old_types[i]->flags & DT_FLAG_COMMITED) != DT_FLAG_COMMITED)
861       return MPI_ERR_TYPE;
862     if (old_types[i]->has_subtype == 1)
863       XBT_WARN("Struct contains a complex type - not yet handled");
864     size += blocklens[i]*smpi_datatype_size(old_types[i]);
865
866     if ( (i< count -1) && (indices[i]+blocklens[i]*smpi_datatype_size(old_types[i]) != indices[i+1]) )contiguous=0;
867   }
868
869   if(!contiguous){
870     s_smpi_mpi_struct_t* subtype = smpi_datatype_struct_create( blocklens,
871                                                               indices,
872                                                               count,
873                                                               old_types);
874
875     smpi_datatype_create(new_type,  size, indices[count-1] + blocklens[count-1]*smpi_datatype_size(old_types[count-1]),1, subtype, DT_FLAG_DATA);
876   }else{
877     smpi_datatype_create(new_type,  size, indices[count-1] + blocklens[count-1]*smpi_datatype_size(old_types[count-1]),0, NULL, DT_FLAG_DATA|DT_FLAG_CONTIGUOUS);
878   }
879   return MPI_SUCCESS;
880 }
881
882 void smpi_datatype_commit(MPI_Datatype *datatype)
883 {
884   (*datatype)->flags=  ((*datatype)->flags | DT_FLAG_COMMITED);
885 }
886
887 typedef struct s_smpi_mpi_op {
888   MPI_User_function *func;
889 } s_smpi_mpi_op_t;
890
891 #define MAX_OP(a, b)  (b) = (a) < (b) ? (b) : (a)
892 #define MIN_OP(a, b)  (b) = (a) < (b) ? (a) : (b)
893 #define SUM_OP(a, b)  (b) += (a)
894 #define PROD_OP(a, b) (b) *= (a)
895 #define LAND_OP(a, b) (b) = (a) && (b)
896 #define LOR_OP(a, b)  (b) = (a) || (b)
897 #define LXOR_OP(a, b) (b) = (!(a) && (b)) || ((a) && !(b))
898 #define BAND_OP(a, b) (b) &= (a)
899 #define BOR_OP(a, b)  (b) |= (a)
900 #define BXOR_OP(a, b) (b) ^= (a)
901 #define MAXLOC_OP(a, b)  (b) = (a.value) < (b.value) ? (b) : (a)
902 #define MINLOC_OP(a, b)  (b) = (a.value) < (b.value) ? (a) : (b)
903 //TODO : MINLOC & MAXLOC
904
905 #define APPLY_FUNC(a, b, length, type, func) \
906 {                                          \
907   int i;                                   \
908   type* x = (type*)(a);                    \
909   type* y = (type*)(b);                    \
910   for(i = 0; i < *(length); i++) {         \
911     func(x[i], y[i]);                      \
912   }                                        \
913 }
914
915 static void max_func(void *a, void *b, int *length,
916                      MPI_Datatype * datatype)
917 {
918   if (*datatype == MPI_CHAR) {
919     APPLY_FUNC(a, b, length, char, MAX_OP);
920   } else if (*datatype == MPI_SHORT) {
921     APPLY_FUNC(a, b, length, short, MAX_OP);
922   } else if (*datatype == MPI_INT) {
923     APPLY_FUNC(a, b, length, int, MAX_OP);
924   } else if (*datatype == MPI_LONG) {
925     APPLY_FUNC(a, b, length, long, MAX_OP);
926   } else if (*datatype == MPI_UNSIGNED_SHORT) {
927     APPLY_FUNC(a, b, length, unsigned short, MAX_OP);
928   } else if (*datatype == MPI_UNSIGNED) {
929     APPLY_FUNC(a, b, length, unsigned int, MAX_OP);
930   } else if (*datatype == MPI_UNSIGNED_LONG) {
931     APPLY_FUNC(a, b, length, unsigned long, MAX_OP);
932   } else if (*datatype == MPI_FLOAT) {
933     APPLY_FUNC(a, b, length, float, MAX_OP);
934   } else if (*datatype == MPI_DOUBLE) {
935     APPLY_FUNC(a, b, length, double, MAX_OP);
936   } else if (*datatype == MPI_LONG_DOUBLE) {
937     APPLY_FUNC(a, b, length, long double, MAX_OP);
938   }
939 }
940
941 static void min_func(void *a, void *b, int *length,
942                      MPI_Datatype * datatype)
943 {
944   if (*datatype == MPI_CHAR) {
945     APPLY_FUNC(a, b, length, char, MIN_OP);
946   } else if (*datatype == MPI_SHORT) {
947     APPLY_FUNC(a, b, length, short, MIN_OP);
948   } else if (*datatype == MPI_INT) {
949     APPLY_FUNC(a, b, length, int, MIN_OP);
950   } else if (*datatype == MPI_LONG) {
951     APPLY_FUNC(a, b, length, long, MIN_OP);
952   } else if (*datatype == MPI_UNSIGNED_SHORT) {
953     APPLY_FUNC(a, b, length, unsigned short, MIN_OP);
954   } else if (*datatype == MPI_UNSIGNED) {
955     APPLY_FUNC(a, b, length, unsigned int, MIN_OP);
956   } else if (*datatype == MPI_UNSIGNED_LONG) {
957     APPLY_FUNC(a, b, length, unsigned long, MIN_OP);
958   } else if (*datatype == MPI_FLOAT) {
959     APPLY_FUNC(a, b, length, float, MIN_OP);
960   } else if (*datatype == MPI_DOUBLE) {
961     APPLY_FUNC(a, b, length, double, MIN_OP);
962   } else if (*datatype == MPI_LONG_DOUBLE) {
963     APPLY_FUNC(a, b, length, long double, MIN_OP);
964   }
965 }
966
967 static void sum_func(void *a, void *b, int *length,
968                      MPI_Datatype * datatype)
969 {
970   if (*datatype == MPI_CHAR) {
971     APPLY_FUNC(a, b, length, char, SUM_OP);
972   } else if (*datatype == MPI_SHORT) {
973     APPLY_FUNC(a, b, length, short, SUM_OP);
974   } else if (*datatype == MPI_INT) {
975     APPLY_FUNC(a, b, length, int, SUM_OP);
976   } else if (*datatype == MPI_LONG) {
977     APPLY_FUNC(a, b, length, long, SUM_OP);
978   } else if (*datatype == MPI_UNSIGNED_SHORT) {
979     APPLY_FUNC(a, b, length, unsigned short, SUM_OP);
980   } else if (*datatype == MPI_UNSIGNED) {
981     APPLY_FUNC(a, b, length, unsigned int, SUM_OP);
982   } else if (*datatype == MPI_UNSIGNED_LONG) {
983     APPLY_FUNC(a, b, length, unsigned long, SUM_OP);
984   } else if (*datatype == MPI_FLOAT) {
985     APPLY_FUNC(a, b, length, float, SUM_OP);
986   } else if (*datatype == MPI_DOUBLE) {
987     APPLY_FUNC(a, b, length, double, SUM_OP);
988   } else if (*datatype == MPI_LONG_DOUBLE) {
989     APPLY_FUNC(a, b, length, long double, SUM_OP);
990   } else if (*datatype == MPI_C_FLOAT_COMPLEX) {
991     APPLY_FUNC(a, b, length, float _Complex, SUM_OP);
992   } else if (*datatype == MPI_C_DOUBLE_COMPLEX) {
993     APPLY_FUNC(a, b, length, double _Complex, SUM_OP);
994   } else if (*datatype == MPI_C_LONG_DOUBLE_COMPLEX) {
995     APPLY_FUNC(a, b, length, long double _Complex, SUM_OP);
996   }
997 }
998
999 static void prod_func(void *a, void *b, int *length,
1000                       MPI_Datatype * datatype)
1001 {
1002   if (*datatype == MPI_CHAR) {
1003     APPLY_FUNC(a, b, length, char, PROD_OP);
1004   } else if (*datatype == MPI_SHORT) {
1005     APPLY_FUNC(a, b, length, short, PROD_OP);
1006   } else if (*datatype == MPI_INT) {
1007     APPLY_FUNC(a, b, length, int, PROD_OP);
1008   } else if (*datatype == MPI_LONG) {
1009     APPLY_FUNC(a, b, length, long, PROD_OP);
1010   } else if (*datatype == MPI_UNSIGNED_SHORT) {
1011     APPLY_FUNC(a, b, length, unsigned short, PROD_OP);
1012   } else if (*datatype == MPI_UNSIGNED) {
1013     APPLY_FUNC(a, b, length, unsigned int, PROD_OP);
1014   } else if (*datatype == MPI_UNSIGNED_LONG) {
1015     APPLY_FUNC(a, b, length, unsigned long, PROD_OP);
1016   } else if (*datatype == MPI_FLOAT) {
1017     APPLY_FUNC(a, b, length, float, PROD_OP);
1018   } else if (*datatype == MPI_DOUBLE) {
1019     APPLY_FUNC(a, b, length, double, PROD_OP);
1020   } else if (*datatype == MPI_LONG_DOUBLE) {
1021     APPLY_FUNC(a, b, length, long double, PROD_OP);
1022   } else if (*datatype == MPI_C_FLOAT_COMPLEX) {
1023     APPLY_FUNC(a, b, length, float _Complex, PROD_OP);
1024   } else if (*datatype == MPI_C_DOUBLE_COMPLEX) {
1025     APPLY_FUNC(a, b, length, double _Complex, PROD_OP);
1026   } else if (*datatype == MPI_C_LONG_DOUBLE_COMPLEX) {
1027     APPLY_FUNC(a, b, length, long double _Complex, PROD_OP);
1028   }
1029 }
1030
1031 static void land_func(void *a, void *b, int *length,
1032                       MPI_Datatype * datatype)
1033 {
1034   if (*datatype == MPI_CHAR) {
1035     APPLY_FUNC(a, b, length, char, LAND_OP);
1036   } else if (*datatype == MPI_SHORT) {
1037     APPLY_FUNC(a, b, length, short, LAND_OP);
1038   } else if (*datatype == MPI_INT) {
1039     APPLY_FUNC(a, b, length, int, LAND_OP);
1040   } else if (*datatype == MPI_LONG) {
1041     APPLY_FUNC(a, b, length, long, LAND_OP);
1042   } else if (*datatype == MPI_UNSIGNED_SHORT) {
1043     APPLY_FUNC(a, b, length, unsigned short, LAND_OP);
1044   } else if (*datatype == MPI_UNSIGNED) {
1045     APPLY_FUNC(a, b, length, unsigned int, LAND_OP);
1046   } else if (*datatype == MPI_UNSIGNED_LONG) {
1047     APPLY_FUNC(a, b, length, unsigned long, LAND_OP);
1048   } else if (*datatype == MPI_C_BOOL) {
1049     APPLY_FUNC(a, b, length, _Bool, LAND_OP);
1050   }
1051 }
1052
1053 static void lor_func(void *a, void *b, int *length,
1054                      MPI_Datatype * datatype)
1055 {
1056   if (*datatype == MPI_CHAR) {
1057     APPLY_FUNC(a, b, length, char, LOR_OP);
1058   } else if (*datatype == MPI_SHORT) {
1059     APPLY_FUNC(a, b, length, short, LOR_OP);
1060   } else if (*datatype == MPI_INT) {
1061     APPLY_FUNC(a, b, length, int, LOR_OP);
1062   } else if (*datatype == MPI_LONG) {
1063     APPLY_FUNC(a, b, length, long, LOR_OP);
1064   } else if (*datatype == MPI_UNSIGNED_SHORT) {
1065     APPLY_FUNC(a, b, length, unsigned short, LOR_OP);
1066   } else if (*datatype == MPI_UNSIGNED) {
1067     APPLY_FUNC(a, b, length, unsigned int, LOR_OP);
1068   } else if (*datatype == MPI_UNSIGNED_LONG) {
1069     APPLY_FUNC(a, b, length, unsigned long, LOR_OP);
1070   } else if (*datatype == MPI_C_BOOL) {
1071     APPLY_FUNC(a, b, length, _Bool, LOR_OP);
1072   }
1073 }
1074
1075 static void lxor_func(void *a, void *b, int *length,
1076                       MPI_Datatype * datatype)
1077 {
1078   if (*datatype == MPI_CHAR) {
1079     APPLY_FUNC(a, b, length, char, LXOR_OP);
1080   } else if (*datatype == MPI_SHORT) {
1081     APPLY_FUNC(a, b, length, short, LXOR_OP);
1082   } else if (*datatype == MPI_INT) {
1083     APPLY_FUNC(a, b, length, int, LXOR_OP);
1084   } else if (*datatype == MPI_LONG) {
1085     APPLY_FUNC(a, b, length, long, LXOR_OP);
1086   } else if (*datatype == MPI_UNSIGNED_SHORT) {
1087     APPLY_FUNC(a, b, length, unsigned short, LXOR_OP);
1088   } else if (*datatype == MPI_UNSIGNED) {
1089     APPLY_FUNC(a, b, length, unsigned int, LXOR_OP);
1090   } else if (*datatype == MPI_UNSIGNED_LONG) {
1091     APPLY_FUNC(a, b, length, unsigned long, LXOR_OP);
1092   } else if (*datatype == MPI_C_BOOL) {
1093     APPLY_FUNC(a, b, length, _Bool, LXOR_OP);
1094   }
1095 }
1096
1097 static void band_func(void *a, void *b, int *length,
1098                       MPI_Datatype * datatype)
1099 {
1100   if (*datatype == MPI_CHAR) {
1101     APPLY_FUNC(a, b, length, char, BAND_OP);
1102   }
1103   if (*datatype == MPI_SHORT) {
1104     APPLY_FUNC(a, b, length, short, BAND_OP);
1105   } else if (*datatype == MPI_INT) {
1106     APPLY_FUNC(a, b, length, int, BAND_OP);
1107   } else if (*datatype == MPI_LONG) {
1108     APPLY_FUNC(a, b, length, long, BAND_OP);
1109   } else if (*datatype == MPI_UNSIGNED_SHORT) {
1110     APPLY_FUNC(a, b, length, unsigned short, BAND_OP);
1111   } else if (*datatype == MPI_UNSIGNED) {
1112     APPLY_FUNC(a, b, length, unsigned int, BAND_OP);
1113   } else if (*datatype == MPI_UNSIGNED_LONG) {
1114     APPLY_FUNC(a, b, length, unsigned long, BAND_OP);
1115   } else if (*datatype == MPI_BYTE) {
1116     APPLY_FUNC(a, b, length, uint8_t, BAND_OP);
1117   }
1118 }
1119
1120 static void bor_func(void *a, void *b, int *length,
1121                      MPI_Datatype * datatype)
1122 {
1123   if (*datatype == MPI_CHAR) {
1124     APPLY_FUNC(a, b, length, char, BOR_OP);
1125   } else if (*datatype == MPI_SHORT) {
1126     APPLY_FUNC(a, b, length, short, BOR_OP);
1127   } else if (*datatype == MPI_INT) {
1128     APPLY_FUNC(a, b, length, int, BOR_OP);
1129   } else if (*datatype == MPI_LONG) {
1130     APPLY_FUNC(a, b, length, long, BOR_OP);
1131   } else if (*datatype == MPI_UNSIGNED_SHORT) {
1132     APPLY_FUNC(a, b, length, unsigned short, BOR_OP);
1133   } else if (*datatype == MPI_UNSIGNED) {
1134     APPLY_FUNC(a, b, length, unsigned int, BOR_OP);
1135   } else if (*datatype == MPI_UNSIGNED_LONG) {
1136     APPLY_FUNC(a, b, length, unsigned long, BOR_OP);
1137   } else if (*datatype == MPI_BYTE) {
1138     APPLY_FUNC(a, b, length, uint8_t, BOR_OP);
1139   }
1140 }
1141
1142 static void bxor_func(void *a, void *b, int *length,
1143                       MPI_Datatype * datatype)
1144 {
1145   if (*datatype == MPI_CHAR) {
1146     APPLY_FUNC(a, b, length, char, BXOR_OP);
1147   } else if (*datatype == MPI_SHORT) {
1148     APPLY_FUNC(a, b, length, short, BXOR_OP);
1149   } else if (*datatype == MPI_INT) {
1150     APPLY_FUNC(a, b, length, int, BXOR_OP);
1151   } else if (*datatype == MPI_LONG) {
1152     APPLY_FUNC(a, b, length, long, BXOR_OP);
1153   } else if (*datatype == MPI_UNSIGNED_SHORT) {
1154     APPLY_FUNC(a, b, length, unsigned short, BXOR_OP);
1155   } else if (*datatype == MPI_UNSIGNED) {
1156     APPLY_FUNC(a, b, length, unsigned int, BXOR_OP);
1157   } else if (*datatype == MPI_UNSIGNED_LONG) {
1158     APPLY_FUNC(a, b, length, unsigned long, BXOR_OP);
1159   } else if (*datatype == MPI_BYTE) {
1160     APPLY_FUNC(a, b, length, uint8_t, BXOR_OP);
1161   }
1162 }
1163
1164 static void minloc_func(void *a, void *b, int *length,
1165                         MPI_Datatype * datatype)
1166 {
1167   if (*datatype == MPI_FLOAT_INT) {
1168     APPLY_FUNC(a, b, length, float_int, MINLOC_OP);
1169   } else if (*datatype == MPI_LONG_INT) {
1170     APPLY_FUNC(a, b, length, long_int, MINLOC_OP);
1171   } else if (*datatype == MPI_DOUBLE_INT) {
1172     APPLY_FUNC(a, b, length, double_int, MINLOC_OP);
1173   } else if (*datatype == MPI_SHORT_INT) {
1174     APPLY_FUNC(a, b, length, short_int, MINLOC_OP);
1175   } else if (*datatype == MPI_2INT) {
1176     APPLY_FUNC(a, b, length, int_int, MINLOC_OP);
1177   } else if (*datatype == MPI_LONG_DOUBLE_INT) {
1178     APPLY_FUNC(a, b, length, long_double_int, MINLOC_OP);
1179   }
1180 }
1181
1182 static void maxloc_func(void *a, void *b, int *length,
1183                         MPI_Datatype * datatype)
1184 {
1185   if (*datatype == MPI_FLOAT_INT) {
1186     APPLY_FUNC(a, b, length, float_int, MAXLOC_OP);
1187   } else if (*datatype == MPI_LONG_INT) {
1188     APPLY_FUNC(a, b, length, long_int, MAXLOC_OP);
1189   } else if (*datatype == MPI_DOUBLE_INT) {
1190     APPLY_FUNC(a, b, length, double_int, MAXLOC_OP);
1191   } else if (*datatype == MPI_SHORT_INT) {
1192     APPLY_FUNC(a, b, length, short_int, MAXLOC_OP);
1193   } else if (*datatype == MPI_2INT) {
1194     APPLY_FUNC(a, b, length, int_int, MAXLOC_OP);
1195   } else if (*datatype == MPI_LONG_DOUBLE_INT) {
1196     APPLY_FUNC(a, b, length, long_double_int, MAXLOC_OP);
1197   }
1198 }
1199
1200
1201 #define CREATE_MPI_OP(name, func)                             \
1202   static s_smpi_mpi_op_t mpi_##name = { &(func) /* func */ }; \
1203 MPI_Op name = &mpi_##name;
1204
1205 CREATE_MPI_OP(MPI_MAX, max_func);
1206 CREATE_MPI_OP(MPI_MIN, min_func);
1207 CREATE_MPI_OP(MPI_SUM, sum_func);
1208 CREATE_MPI_OP(MPI_PROD, prod_func);
1209 CREATE_MPI_OP(MPI_LAND, land_func);
1210 CREATE_MPI_OP(MPI_LOR, lor_func);
1211 CREATE_MPI_OP(MPI_LXOR, lxor_func);
1212 CREATE_MPI_OP(MPI_BAND, band_func);
1213 CREATE_MPI_OP(MPI_BOR, bor_func);
1214 CREATE_MPI_OP(MPI_BXOR, bxor_func);
1215 CREATE_MPI_OP(MPI_MAXLOC, maxloc_func);
1216 CREATE_MPI_OP(MPI_MINLOC, minloc_func);
1217
1218 MPI_Op smpi_op_new(MPI_User_function * function, int commute)
1219 {
1220   MPI_Op op;
1221
1222   //FIXME: add commute param
1223   op = xbt_new(s_smpi_mpi_op_t, 1);
1224   op->func = function;
1225   return op;
1226 }
1227
1228 void smpi_op_destroy(MPI_Op op)
1229 {
1230   xbt_free(op);
1231 }
1232
1233 void smpi_op_apply(MPI_Op op, void *invec, void *inoutvec, int *len,
1234                    MPI_Datatype * datatype)
1235 {
1236   op->func(invec, inoutvec, len, datatype);
1237 }