Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
bug with MPI_Type_contiguous
[simgrid.git] / src / smpi / smpi_mpi_dt.c
1 /* smpi_mpi_dt.c -- MPI primitives to handle datatypes                        */
2 /* FIXME: a very incomplete implementation                                    */
3
4 /* Copyright (c) 2009, 2010. The SimGrid Team.
5  * All rights reserved.                                                     */
6
7 /* This program is free software; you can redistribute it and/or modify it
8  * under the terms of the license (GNU LGPL) which comes with this package. */
9
10 #include <stdio.h>
11 #include <stdlib.h>
12 #include <string.h>
13
14 #include "private.h"
15 #include "smpi_mpi_dt_private.h"
16
17 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_mpi_dt, smpi,
18                                 "Logging specific to SMPI (datatype)");
19
20 #define CREATE_MPI_DATATYPE(name, type)       \
21   static s_smpi_mpi_datatype_t mpi_##name = { \
22     sizeof(type),  /* size */                 \
23     0,             /*was 1 has_subtype*/             \
24     0,             /* lb */                   \
25     sizeof(type),  /* ub = lb + size */       \
26     DT_FLAG_BASIC,  /* flags */              \
27     NULL           /* pointer on extended struct*/ \
28   };                                          \
29 MPI_Datatype name = &mpi_##name;
30
31
32 //The following are datatypes for the MPI functions MPI_MAXLOC and MPI_MINLOC.
33 typedef struct {
34   float value;
35   int index;
36 } float_int;
37 typedef struct {
38   long value;
39   int index;
40 } long_int;
41 typedef struct {
42   double value;
43   int index;
44 } double_int;
45 typedef struct {
46   short value;
47   int index;
48 } short_int;
49 typedef struct {
50   int value;
51   int index;
52 } int_int;
53 typedef struct {
54   long double value;
55   int index;
56 } long_double_int;
57
58 // Predefined data types
59 CREATE_MPI_DATATYPE(MPI_CHAR, char);
60 CREATE_MPI_DATATYPE(MPI_SHORT, short);
61 CREATE_MPI_DATATYPE(MPI_INT, int);
62 CREATE_MPI_DATATYPE(MPI_LONG, long);
63 CREATE_MPI_DATATYPE(MPI_LONG_LONG, long long);
64 CREATE_MPI_DATATYPE(MPI_SIGNED_CHAR, signed char);
65 CREATE_MPI_DATATYPE(MPI_UNSIGNED_CHAR, unsigned char);
66 CREATE_MPI_DATATYPE(MPI_UNSIGNED_SHORT, unsigned short);
67 CREATE_MPI_DATATYPE(MPI_UNSIGNED, unsigned int);
68 CREATE_MPI_DATATYPE(MPI_UNSIGNED_LONG, unsigned long);
69 CREATE_MPI_DATATYPE(MPI_UNSIGNED_LONG_LONG, unsigned long long);
70 CREATE_MPI_DATATYPE(MPI_FLOAT, float);
71 CREATE_MPI_DATATYPE(MPI_DOUBLE, double);
72 CREATE_MPI_DATATYPE(MPI_LONG_DOUBLE, long double);
73 CREATE_MPI_DATATYPE(MPI_WCHAR, wchar_t);
74 CREATE_MPI_DATATYPE(MPI_C_BOOL, _Bool);
75 CREATE_MPI_DATATYPE(MPI_INT8_T, int8_t);
76 CREATE_MPI_DATATYPE(MPI_INT16_T, int16_t);
77 CREATE_MPI_DATATYPE(MPI_INT32_T, int32_t);
78 CREATE_MPI_DATATYPE(MPI_INT64_T, int64_t);
79 CREATE_MPI_DATATYPE(MPI_UINT8_T, uint8_t);
80 CREATE_MPI_DATATYPE(MPI_UINT16_T, uint16_t);
81 CREATE_MPI_DATATYPE(MPI_UINT32_T, uint32_t);
82 CREATE_MPI_DATATYPE(MPI_UINT64_T, uint64_t);
83 CREATE_MPI_DATATYPE(MPI_C_FLOAT_COMPLEX, float _Complex);
84 CREATE_MPI_DATATYPE(MPI_C_DOUBLE_COMPLEX, double _Complex);
85 CREATE_MPI_DATATYPE(MPI_C_LONG_DOUBLE_COMPLEX, long double _Complex);
86 CREATE_MPI_DATATYPE(MPI_AINT, MPI_Aint);
87 CREATE_MPI_DATATYPE(MPI_OFFSET, MPI_Offset);
88
89 CREATE_MPI_DATATYPE(MPI_FLOAT_INT, float_int);
90 CREATE_MPI_DATATYPE(MPI_LONG_INT, long_int);
91 CREATE_MPI_DATATYPE(MPI_DOUBLE_INT, double_int);
92 CREATE_MPI_DATATYPE(MPI_SHORT_INT, short_int);
93 CREATE_MPI_DATATYPE(MPI_2INT, int_int);
94 CREATE_MPI_DATATYPE(MPI_LONG_DOUBLE_INT, long_double_int);
95
96 // Internal use only
97 CREATE_MPI_DATATYPE(MPI_PTR, void*);
98
99
100 size_t smpi_datatype_size(MPI_Datatype datatype)
101 {
102   return datatype->size;
103 }
104
105
106
107 MPI_Aint smpi_datatype_lb(MPI_Datatype datatype)
108 {
109   return datatype->lb;
110 }
111
112 MPI_Aint smpi_datatype_ub(MPI_Datatype datatype)
113 {
114   return datatype->ub;
115 }
116
117 int smpi_datatype_extent(MPI_Datatype datatype, MPI_Aint * lb,
118                          MPI_Aint * extent)
119 {
120   int retval;
121
122   if ((datatype->flags & DT_FLAG_COMMITED) != DT_FLAG_COMMITED) {
123     retval = MPI_ERR_TYPE;
124   } else {
125     *lb = datatype->lb;
126     *extent = datatype->ub - datatype->lb;
127     retval = MPI_SUCCESS;
128   }
129   return retval;
130 }
131
132 int smpi_datatype_copy(void *sendbuf, int sendcount, MPI_Datatype sendtype,
133                        void *recvbuf, int recvcount, MPI_Datatype recvtype)
134 {
135   int retval, count;
136
137   /* First check if we really have something to do */
138   if (recvcount == 0) {
139     retval = sendcount == 0 ? MPI_SUCCESS : MPI_ERR_TRUNCATE;
140   } else {
141     /* FIXME: treat packed cases */
142     sendcount *= smpi_datatype_size(sendtype);
143     recvcount *= smpi_datatype_size(recvtype);
144     count = sendcount < recvcount ? sendcount : recvcount;
145
146     if(sendtype->has_subtype == 0 && recvtype->has_subtype == 0) {
147       memcpy(recvbuf, sendbuf, count);
148     }
149     else if (sendtype->has_subtype == 0)
150     {
151       s_smpi_subtype_t *subtype =  recvtype->substruct;
152       subtype->unserialize( sendbuf, recvbuf,1, subtype);
153     }
154     else if (recvtype->has_subtype == 0)
155     {
156       s_smpi_subtype_t *subtype =  sendtype->substruct;
157       subtype->serialize(sendbuf, recvbuf,1, subtype);
158     }else{
159       s_smpi_subtype_t *subtype =  sendtype->substruct;
160
161       s_smpi_mpi_vector_t* type_c = (s_smpi_mpi_vector_t*)sendtype;
162
163       void * buf_tmp = malloc(count * type_c->size_oldtype);
164
165       subtype->serialize( sendbuf, buf_tmp,1, subtype);
166       subtype =  recvtype->substruct;
167       subtype->unserialize(recvbuf, buf_tmp,1, subtype);
168
169       free(buf_tmp);
170     }
171     retval = sendcount > recvcount ? MPI_ERR_TRUNCATE : MPI_SUCCESS;
172   }
173
174   return retval;
175 }
176
177 /*
178  *  Copies noncontiguous data into contiguous memory.
179  *  @param contiguous_vector - output vector
180  *  @param noncontiguous_vector - input vector
181  *  @param type - pointer contening :
182  *      - stride - stride of between noncontiguous data
183  *      - block_length - the width or height of blocked matrix
184  *      - count - the number of rows of matrix
185  */
186 void serialize_vector( const void *noncontiguous_vector,
187                        void *contiguous_vector,
188                        size_t count,
189                        void *type)
190 {
191   s_smpi_mpi_vector_t* type_c = (s_smpi_mpi_vector_t*)type;
192   int i;
193   char* contiguous_vector_char = (char*)contiguous_vector;
194   char* noncontiguous_vector_char = (char*)noncontiguous_vector;
195
196   for (i = 0; i < type_c->block_count * count; i++) {
197     memcpy(contiguous_vector_char,
198            noncontiguous_vector_char, type_c->block_length * type_c->size_oldtype);
199
200     contiguous_vector_char += type_c->block_length*type_c->size_oldtype;
201     noncontiguous_vector_char += type_c->block_stride*type_c->size_oldtype;
202   }
203 }
204
205 /*
206  *  Copies contiguous data into noncontiguous memory.
207  *  @param noncontiguous_vector - output vector
208  *  @param contiguous_vector - input vector
209  *  @param type - pointer contening :
210  *      - stride - stride of between noncontiguous data
211  *      - block_length - the width or height of blocked matrix
212  *      - count - the number of rows of matrix
213  */
214 void unserialize_vector( const void *contiguous_vector,
215                          void *noncontiguous_vector,
216                          size_t count,
217                          void *type)
218 {
219   s_smpi_mpi_vector_t* type_c = (s_smpi_mpi_vector_t*)type;
220   int i;
221
222   char* contiguous_vector_char = (char*)contiguous_vector;
223   char* noncontiguous_vector_char = (char*)noncontiguous_vector;
224
225   for (i = 0; i < type_c->block_count * count; i++) {
226     memcpy(noncontiguous_vector_char,
227            contiguous_vector_char, type_c->block_length * type_c->size_oldtype);
228
229     contiguous_vector_char += type_c->block_length*type_c->size_oldtype;
230     noncontiguous_vector_char += type_c->block_stride*type_c->size_oldtype;
231   }
232 }
233
234 /*
235  * Create a Sub type vector to be able to serialize and unserialize it
236  * the structre s_smpi_mpi_vector_t is derived from s_smpi_subtype which
237  * required the functions unserialize and serialize
238  *
239  */
240 s_smpi_mpi_vector_t* smpi_datatype_vector_create( int block_stride,
241                                                   int block_length,
242                                                   int block_count,
243                                                   MPI_Datatype old_type,
244                                                   int size_oldtype){
245   s_smpi_mpi_vector_t *new_t= xbt_new(s_smpi_mpi_vector_t,1);
246   new_t->base.serialize = &serialize_vector;
247   new_t->base.unserialize = &unserialize_vector;
248   new_t->block_stride = block_stride;
249   new_t->block_length = block_length;
250   new_t->block_count = block_count;
251   new_t->old_type = old_type;
252   new_t->size_oldtype = size_oldtype;
253   return new_t;
254 }
255
256 void smpi_datatype_create(MPI_Datatype* new_type, int size, int has_subtype,
257                           void *struct_type, int flags){
258   MPI_Datatype new_t= xbt_new(s_smpi_mpi_datatype_t,1);
259   new_t->size = size;
260   new_t->has_subtype = has_subtype;
261   new_t->lb = 0;
262   new_t->ub = size;
263   new_t->flags = flags;
264   new_t->substruct = struct_type;
265   *new_type = new_t;
266 }
267
268 void smpi_datatype_free(MPI_Datatype* type){
269   xbt_free(*type);
270 }
271
272 int smpi_datatype_contiguous(int count, MPI_Datatype old_type, MPI_Datatype* new_type)
273 {
274   int retval;
275   if ((old_type->flags & DT_FLAG_COMMITED) != DT_FLAG_COMMITED) {
276     retval = MPI_ERR_TYPE;
277   } else {
278     smpi_datatype_create(new_type, count *
279                          smpi_datatype_size(old_type),0,NULL, DT_FLAG_CONTIGUOUS);
280     retval=MPI_SUCCESS;
281   }
282   return retval;
283 }
284
285 int smpi_datatype_vector(int count, int blocklen, int stride, MPI_Datatype old_type, MPI_Datatype* new_type)
286 {
287   int retval;
288   if (blocklen<=0) return MPI_ERR_ARG;
289   if ((old_type->flags & DT_FLAG_COMMITED) != DT_FLAG_COMMITED) {
290     retval = MPI_ERR_TYPE;
291   } else {
292     if(stride != blocklen){
293 if (old_type->has_subtype == 1)
294       XBT_WARN("vector contains a complex type - not yet handled");
295       s_smpi_mpi_vector_t* subtype = smpi_datatype_vector_create( stride,
296                                                                   blocklen,
297                                                                   count,
298                                                                   old_type,
299                                                                   smpi_datatype_size(old_type));
300
301       smpi_datatype_create(new_type, count * (blocklen) *
302                            smpi_datatype_size(old_type),
303                            1,
304                            subtype,
305                            DT_FLAG_VECTOR);
306       retval=MPI_SUCCESS;
307     }else{
308       /* in this situation the data are contignous thus it's not
309        * required to serialize and unserialize it*/
310       smpi_datatype_create(new_type, count * blocklen *
311                            smpi_datatype_size(old_type),
312                            0,
313                            NULL,
314                            DT_FLAG_VECTOR|DT_FLAG_CONTIGUOUS);
315       retval=MPI_SUCCESS;
316     }
317   }
318   return retval;
319 }
320
321
322
323 /*
324 Hvector Implementation - Vector with stride in bytes
325 */
326
327
328 /*
329  *  Copies noncontiguous data into contiguous memory.
330  *  @param contiguous_hvector - output hvector
331  *  @param noncontiguous_hvector - input hvector
332  *  @param type - pointer contening :
333  *      - stride - stride of between noncontiguous data, in bytes
334  *      - block_length - the width or height of blocked matrix
335  *      - count - the number of rows of matrix
336  */
337 void serialize_hvector( const void *noncontiguous_hvector,
338                        void *contiguous_hvector,
339                        size_t count,
340                        void *type)
341 {
342   s_smpi_mpi_hvector_t* type_c = (s_smpi_mpi_hvector_t*)type;
343   int i;
344   char* contiguous_vector_char = (char*)contiguous_hvector;
345   char* noncontiguous_vector_char = (char*)noncontiguous_hvector;
346
347   for (i = 0; i < type_c->block_count * count; i++) {
348     memcpy(contiguous_vector_char,
349            noncontiguous_vector_char, type_c->block_length * type_c->size_oldtype);
350
351     contiguous_vector_char += type_c->block_length*type_c->size_oldtype;
352     noncontiguous_vector_char += type_c->block_stride;
353   }
354 }
355 /*
356  *  Copies contiguous data into noncontiguous memory.
357  *  @param noncontiguous_vector - output hvector
358  *  @param contiguous_vector - input hvector
359  *  @param type - pointer contening :
360  *      - stride - stride of between noncontiguous data, in bytes
361  *      - block_length - the width or height of blocked matrix
362  *      - count - the number of rows of matrix
363  */
364 void unserialize_hvector( const void *contiguous_vector,
365                          void *noncontiguous_vector,
366                          size_t count,
367                          void *type)
368 {
369   s_smpi_mpi_hvector_t* type_c = (s_smpi_mpi_hvector_t*)type;
370   int i;
371
372   char* contiguous_vector_char = (char*)contiguous_vector;
373   char* noncontiguous_vector_char = (char*)noncontiguous_vector;
374
375   for (i = 0; i < type_c->block_count * count; i++) {
376     memcpy(noncontiguous_vector_char,
377            contiguous_vector_char, type_c->block_length * type_c->size_oldtype);
378
379     contiguous_vector_char += type_c->block_length*type_c->size_oldtype;
380     noncontiguous_vector_char += type_c->block_stride;
381   }
382 }
383
384 /*
385  * Create a Sub type vector to be able to serialize and unserialize it
386  * the structre s_smpi_mpi_vector_t is derived from s_smpi_subtype which
387  * required the functions unserialize and serialize
388  *
389  */
390 s_smpi_mpi_hvector_t* smpi_datatype_hvector_create( MPI_Aint block_stride,
391                                                   int block_length,
392                                                   int block_count,
393                                                   MPI_Datatype old_type,
394                                                   int size_oldtype){
395   s_smpi_mpi_hvector_t *new_t= xbt_new(s_smpi_mpi_hvector_t,1);
396   new_t->base.serialize = &serialize_hvector;
397   new_t->base.unserialize = &unserialize_hvector;
398   new_t->block_stride = block_stride;
399   new_t->block_length = block_length;
400   new_t->block_count = block_count;
401   new_t->old_type = old_type;
402   new_t->size_oldtype = size_oldtype;
403   return new_t;
404 }
405
406 int smpi_datatype_hvector(int count, int blocklen, MPI_Aint stride, MPI_Datatype old_type, MPI_Datatype* new_type)
407 {
408   int retval;
409   if (blocklen<=0) return MPI_ERR_ARG;
410   if ((old_type->flags & DT_FLAG_COMMITED) != DT_FLAG_COMMITED) {
411     retval = MPI_ERR_TYPE;
412   } else {
413 if (old_type->has_subtype == 1)
414       XBT_WARN("hvector contains a complex type - not yet handled");
415     if(stride != blocklen*smpi_datatype_size(old_type)){
416       s_smpi_mpi_hvector_t* subtype = smpi_datatype_hvector_create( stride,
417                                                                     blocklen,
418                                                                     count,
419                                                                     old_type,
420                                                                     smpi_datatype_size(old_type));
421
422       smpi_datatype_create(new_type, count * blocklen *
423                            smpi_datatype_size(old_type),
424                            1,
425                            subtype,
426                            DT_FLAG_VECTOR);
427       retval=MPI_SUCCESS;
428     }else{
429       smpi_datatype_create(new_type, count * blocklen *
430                                                smpi_datatype_size(old_type),
431                                               0,
432                                               NULL,
433                                               DT_FLAG_VECTOR|DT_FLAG_CONTIGUOUS);
434       retval=MPI_SUCCESS;
435     }
436   }
437   return retval;
438 }
439
440
441 /*
442 Indexed Implementation
443 */
444
445 /*
446  *  Copies noncontiguous data into contiguous memory.
447  *  @param contiguous_indexed - output indexed
448  *  @param noncontiguous_indexed - input indexed
449  *  @param type - pointer contening :
450  *      - block_lengths - the width or height of blocked matrix
451  *      - block_indices - indices of each data, in element
452  *      - count - the number of rows of matrix
453  */
454 void serialize_indexed( const void *noncontiguous_indexed,
455                        void *contiguous_indexed,
456                        size_t count,
457                        void *type)
458 {
459   s_smpi_mpi_indexed_t* type_c = (s_smpi_mpi_indexed_t*)type;
460   int i;
461   char* contiguous_indexed_char = (char*)contiguous_indexed;
462   char* noncontiguous_indexed_char = (char*)noncontiguous_indexed;
463
464   for (i = 0; i < type_c->block_count * count; i++) {
465     memcpy(contiguous_indexed_char,
466            noncontiguous_indexed_char, type_c->block_lengths[i] * type_c->size_oldtype);
467
468     contiguous_indexed_char += type_c->block_lengths[i]*type_c->size_oldtype;
469     noncontiguous_indexed_char = (char*)noncontiguous_indexed + type_c->block_indices[i+1]*type_c->size_oldtype;
470   }
471 }
472 /*
473  *  Copies contiguous data into noncontiguous memory.
474  *  @param noncontiguous_indexed - output indexed
475  *  @param contiguous_indexed - input indexed
476  *  @param type - pointer contening :
477  *      - block_lengths - the width or height of blocked matrix
478  *      - block_indices - indices of each data, in element
479  *      - count - the number of rows of matrix
480  */
481 void unserialize_indexed( const void *contiguous_indexed,
482                          void *noncontiguous_indexed,
483                          size_t count,
484                          void *type)
485 {
486   s_smpi_mpi_indexed_t* type_c = (s_smpi_mpi_indexed_t*)type;
487   int i;
488
489   char* contiguous_indexed_char = (char*)contiguous_indexed;
490   char* noncontiguous_indexed_char = (char*)noncontiguous_indexed;
491
492   for (i = 0; i < type_c->block_count * count; i++) {
493     memcpy(noncontiguous_indexed_char,
494            contiguous_indexed_char, type_c->block_lengths[i] * type_c->size_oldtype);
495
496     contiguous_indexed_char += type_c->block_lengths[i]*type_c->size_oldtype;
497     noncontiguous_indexed_char = (char*)noncontiguous_indexed + type_c->block_indices[i+1]*type_c->size_oldtype;
498   }
499 }
500
501 /*
502  * Create a Sub type indexed to be able to serialize and unserialize it
503  * the structre s_smpi_mpi_indexed_t is derived from s_smpi_subtype which
504  * required the functions unserialize and serialize
505  */
506 s_smpi_mpi_indexed_t* smpi_datatype_indexed_create( int* block_lengths,
507                                                   int* block_indices,
508                                                   int block_count,
509                                                   MPI_Datatype old_type,
510                                                   int size_oldtype){
511   s_smpi_mpi_indexed_t *new_t= xbt_new(s_smpi_mpi_indexed_t,1);
512   new_t->base.serialize = &serialize_indexed;
513   new_t->base.unserialize = &unserialize_indexed;
514  //FIXME : copy those or assume they won't be freed ? 
515   new_t->block_lengths = block_lengths;
516   new_t->block_indices = block_indices;
517   new_t->block_count = block_count;
518   new_t->old_type = old_type;
519   new_t->size_oldtype = size_oldtype;
520   return new_t;
521 }
522
523
524 int smpi_datatype_indexed(int count, int* blocklens, int* indices, MPI_Datatype old_type, MPI_Datatype* new_type)
525 {
526   int i;
527   int retval;
528   int size = 0;
529   int contiguous=1;
530   for(i=0; i< count; i++){
531     if   (blocklens[i]<=0)
532       return MPI_ERR_ARG;
533     size += blocklens[i];
534
535     if ( (i< count -1) && (indices[i]+blocklens[i] != indices[i+1]) )contiguous=0;
536   }
537   if ((old_type->flags & DT_FLAG_COMMITED) != DT_FLAG_COMMITED) {
538     retval = MPI_ERR_TYPE;
539   } else {
540
541     if (old_type->has_subtype == 1)
542       XBT_WARN("indexed contains a complex type - not yet handled");
543
544     if(!contiguous){
545       s_smpi_mpi_indexed_t* subtype = smpi_datatype_indexed_create( blocklens,
546                                                                     indices,
547                                                                     count,
548                                                                     old_type,
549                                                                     smpi_datatype_size(old_type));
550
551       smpi_datatype_create(new_type,  size *
552                            smpi_datatype_size(old_type),1, subtype, DT_FLAG_DATA);
553 }else{
554       smpi_datatype_create(new_type,  size *
555                            smpi_datatype_size(old_type),0, NULL, DT_FLAG_DATA|DT_FLAG_CONTIGUOUS);
556 }
557     retval=MPI_SUCCESS;
558   }
559   return retval;
560 }
561
562
563 /*
564 Hindexed Implementation - Indexed with indices in bytes 
565 */
566
567 /*
568  *  Copies noncontiguous data into contiguous memory.
569  *  @param contiguous_hindexed - output hindexed
570  *  @param noncontiguous_hindexed - input hindexed
571  *  @param type - pointer contening :
572  *      - block_lengths - the width or height of blocked matrix
573  *      - block_indices - indices of each data, in bytes
574  *      - count - the number of rows of matrix
575  */
576 void serialize_hindexed( const void *noncontiguous_hindexed,
577                        void *contiguous_hindexed,
578                        size_t count,
579                        void *type)
580 {
581   s_smpi_mpi_hindexed_t* type_c = (s_smpi_mpi_hindexed_t*)type;
582   int i;
583   char* contiguous_hindexed_char = (char*)contiguous_hindexed;
584   char* noncontiguous_hindexed_char = (char*)noncontiguous_hindexed;
585
586   for (i = 0; i < type_c->block_count * count; i++) {
587     memcpy(contiguous_hindexed_char,
588            noncontiguous_hindexed_char, type_c->block_lengths[i] * type_c->size_oldtype);
589
590     contiguous_hindexed_char += type_c->block_lengths[i]*type_c->size_oldtype;
591     noncontiguous_hindexed_char = (char*)noncontiguous_hindexed + type_c->block_indices[i+1];
592   }
593 }
594 /*
595  *  Copies contiguous data into noncontiguous memory.
596  *  @param noncontiguous_hindexed - output hindexed
597  *  @param contiguous_hindexed - input hindexed
598  *  @param type - pointer contening :
599  *      - block_lengths - the width or height of blocked matrix
600  *      - block_indices - indices of each data, in bytes
601  *      - count - the number of rows of matrix
602  */
603 void unserialize_hindexed( const void *contiguous_hindexed,
604                          void *noncontiguous_hindexed,
605                          size_t count,
606                          void *type)
607 {
608   s_smpi_mpi_hindexed_t* type_c = (s_smpi_mpi_hindexed_t*)type;
609   int i;
610
611   char* contiguous_hindexed_char = (char*)contiguous_hindexed;
612   char* noncontiguous_hindexed_char = (char*)noncontiguous_hindexed;
613
614   for (i = 0; i < type_c->block_count * count; i++) {
615     memcpy(noncontiguous_hindexed_char,
616            contiguous_hindexed_char, type_c->block_lengths[i] * type_c->size_oldtype);
617
618     contiguous_hindexed_char += type_c->block_lengths[i]*type_c->size_oldtype;
619     noncontiguous_hindexed_char = (char*)noncontiguous_hindexed + type_c->block_indices[i+1];
620   }
621 }
622
623 /*
624  * Create a Sub type hindexed to be able to serialize and unserialize it
625  * the structre s_smpi_mpi_hindexed_t is derived from s_smpi_subtype which
626  * required the functions unserialize and serialize
627  */
628 s_smpi_mpi_hindexed_t* smpi_datatype_hindexed_create( int* block_lengths,
629                                                   MPI_Aint* block_indices,
630                                                   int block_count,
631                                                   MPI_Datatype old_type,
632                                                   int size_oldtype){
633   s_smpi_mpi_hindexed_t *new_t= xbt_new(s_smpi_mpi_hindexed_t,1);
634   new_t->base.serialize = &serialize_hindexed;
635   new_t->base.unserialize = &unserialize_hindexed;
636  //FIXME : copy those or assume they won't be freed ? 
637   new_t->block_lengths = block_lengths;
638   new_t->block_indices = block_indices;
639   new_t->block_count = block_count;
640   new_t->old_type = old_type;
641   new_t->size_oldtype = size_oldtype;
642   return new_t;
643 }
644
645
646 int smpi_datatype_hindexed(int count, int* blocklens, MPI_Aint* indices, MPI_Datatype old_type, MPI_Datatype* new_type)
647 {
648   int i;
649   int retval;
650   int size = 0;
651   int contiguous=1;
652   for(i=0; i< count; i++){
653     if   (blocklens[i]<=0)
654       return MPI_ERR_ARG;
655     size += blocklens[i];
656
657
658     if ( (i< count -1) && (indices[i]+blocklens[i]*smpi_datatype_size(old_type) != indices[i+1]) )contiguous=0;
659   }
660   if ((old_type->flags & DT_FLAG_COMMITED) != DT_FLAG_COMMITED) {
661     retval = MPI_ERR_TYPE;
662   } else {
663     if (old_type->has_subtype == 1)
664       XBT_WARN("hindexed contains a complex type - not yet handled");
665
666     if(!contiguous){
667       s_smpi_mpi_hindexed_t* subtype = smpi_datatype_hindexed_create( blocklens,
668                                                                     indices,
669                                                                     count,
670                                                                     old_type,
671                                                                     smpi_datatype_size(old_type));
672
673       smpi_datatype_create(new_type,  size *
674                            smpi_datatype_size(old_type),1, subtype, DT_FLAG_DATA);
675     }else{
676       smpi_datatype_create(new_type,  size *
677                            smpi_datatype_size(old_type),0, NULL, DT_FLAG_DATA|DT_FLAG_CONTIGUOUS);
678     }
679     retval=MPI_SUCCESS;
680   }
681   return retval;
682 }
683
684
685 /*
686 struct Implementation - Indexed with indices in bytes 
687 */
688
689 /*
690  *  Copies noncontiguous data into contiguous memory.
691  *  @param contiguous_struct - output struct
692  *  @param noncontiguous_struct - input struct
693  *  @param type - pointer contening :
694  *      - stride - stride of between noncontiguous data
695  *      - block_length - the width or height of blocked matrix
696  *      - count - the number of rows of matrix
697  */
698 void serialize_struct( const void *noncontiguous_struct,
699                        void *contiguous_struct,
700                        size_t count,
701                        void *type)
702 {
703   s_smpi_mpi_struct_t* type_c = (s_smpi_mpi_struct_t*)type;
704   int i;
705   char* contiguous_struct_char = (char*)contiguous_struct;
706   char* noncontiguous_struct_char = (char*)noncontiguous_struct;
707
708   for (i = 0; i < type_c->block_count * count; i++) {
709     memcpy(contiguous_struct_char,
710            noncontiguous_struct_char, type_c->block_lengths[i] * smpi_datatype_size(type_c->old_types[i]));
711     contiguous_struct_char += type_c->block_lengths[i]*smpi_datatype_size(type_c->old_types[i]);
712     noncontiguous_struct_char = (char*)noncontiguous_struct + type_c->block_indices[i+1];
713   }
714 }
715 /*
716  *  Copies contiguous data into noncontiguous memory.
717  *  @param noncontiguous_struct - output struct
718  *  @param contiguous_struct - input struct
719  *  @param type - pointer contening :
720  *      - stride - stride of between noncontiguous data
721  *      - block_length - the width or height of blocked matrix
722  *      - count - the number of rows of matrix
723  */
724 void unserialize_struct( const void *contiguous_struct,
725                          void *noncontiguous_struct,
726                          size_t count,
727                          void *type)
728 {
729   s_smpi_mpi_struct_t* type_c = (s_smpi_mpi_struct_t*)type;
730   int i;
731
732   char* contiguous_struct_char = (char*)contiguous_struct;
733   char* noncontiguous_struct_char = (char*)noncontiguous_struct;
734
735   for (i = 0; i < type_c->block_count * count; i++) {
736     memcpy(noncontiguous_struct_char,
737            contiguous_struct_char, type_c->block_lengths[i] * smpi_datatype_size(type_c->old_types[i]));
738     contiguous_struct_char += type_c->block_lengths[i]*smpi_datatype_size(type_c->old_types[i]);
739     noncontiguous_struct_char = (char*)noncontiguous_struct + type_c->block_indices[i+1];
740   }
741 }
742
743 /*
744  * Create a Sub type struct to be able to serialize and unserialize it
745  * the structre s_smpi_mpi_struct_t is derived from s_smpi_subtype which
746  * required the functions unserialize and serialize
747  */
748 s_smpi_mpi_struct_t* smpi_datatype_struct_create( int* block_lengths,
749                                                   MPI_Aint* block_indices,
750                                                   int block_count,
751                                                   MPI_Datatype* old_types){
752   s_smpi_mpi_struct_t *new_t= xbt_new(s_smpi_mpi_struct_t,1);
753   new_t->base.serialize = &serialize_struct;
754   new_t->base.unserialize = &unserialize_struct;
755  //FIXME : copy those or assume they won't be freed ? 
756   new_t->block_lengths = block_lengths;
757   new_t->block_indices = block_indices;
758   new_t->block_count = block_count;
759   new_t->old_types = old_types;
760   return new_t;
761 }
762
763
764 int smpi_datatype_struct(int count, int* blocklens, MPI_Aint* indices, MPI_Datatype* old_types, MPI_Datatype* new_type)
765 {
766   int i;
767   size_t size = 0;
768   int contiguous=1;
769   size = 0;
770   for(i=0; i< count; i++){
771     if (blocklens[i]<=0)
772       return MPI_ERR_ARG;
773     if ((old_types[i]->flags & DT_FLAG_COMMITED) != DT_FLAG_COMMITED)
774       return MPI_ERR_TYPE;
775     if (old_types[i]->has_subtype == 1)
776       XBT_WARN("Struct contains a complex type - not yet handled");
777     size += blocklens[i]*smpi_datatype_size(old_types[i]);
778
779     if ( (i< count -1) && (indices[i]+blocklens[i]*smpi_datatype_size(old_types[i]) != indices[i+1]) )contiguous=0;
780   }
781
782   if(!contiguous){
783     s_smpi_mpi_struct_t* subtype = smpi_datatype_struct_create( blocklens,
784                                                               indices,
785                                                               count,
786                                                               old_types);
787
788     smpi_datatype_create(new_type,  size ,1, subtype, DT_FLAG_DATA);
789   }else{
790     smpi_datatype_create(new_type,  size,0, NULL, DT_FLAG_DATA|DT_FLAG_CONTIGUOUS);
791   }
792   return MPI_SUCCESS;
793 }
794
795 void smpi_datatype_commit(MPI_Datatype *datatype)
796 {
797   (*datatype)->flags=  ((*datatype)->flags | DT_FLAG_COMMITED);
798 }
799
800 typedef struct s_smpi_mpi_op {
801   MPI_User_function *func;
802 } s_smpi_mpi_op_t;
803
804 #define MAX_OP(a, b)  (b) = (a) < (b) ? (b) : (a)
805 #define MIN_OP(a, b)  (b) = (a) < (b) ? (a) : (b)
806 #define SUM_OP(a, b)  (b) += (a)
807 #define PROD_OP(a, b) (b) *= (a)
808 #define LAND_OP(a, b) (b) = (a) && (b)
809 #define LOR_OP(a, b)  (b) = (a) || (b)
810 #define LXOR_OP(a, b) (b) = (!(a) && (b)) || ((a) && !(b))
811 #define BAND_OP(a, b) (b) &= (a)
812 #define BOR_OP(a, b)  (b) |= (a)
813 #define BXOR_OP(a, b) (b) ^= (a)
814 #define MAXLOC_OP(a, b)  (b) = (a.value) < (b.value) ? (b) : (a)
815 #define MINLOC_OP(a, b)  (b) = (a.value) < (b.value) ? (a) : (b)
816 //TODO : MINLOC & MAXLOC
817
818 #define APPLY_FUNC(a, b, length, type, func) \
819 {                                          \
820   int i;                                   \
821   type* x = (type*)(a);                    \
822   type* y = (type*)(b);                    \
823   for(i = 0; i < *(length); i++) {         \
824     func(x[i], y[i]);                      \
825   }                                        \
826 }
827
828 static void max_func(void *a, void *b, int *length,
829                      MPI_Datatype * datatype)
830 {
831   if (*datatype == MPI_CHAR) {
832     APPLY_FUNC(a, b, length, char, MAX_OP);
833   } else if (*datatype == MPI_SHORT) {
834     APPLY_FUNC(a, b, length, short, MAX_OP);
835   } else if (*datatype == MPI_INT) {
836     APPLY_FUNC(a, b, length, int, MAX_OP);
837   } else if (*datatype == MPI_LONG) {
838     APPLY_FUNC(a, b, length, long, MAX_OP);
839   } else if (*datatype == MPI_UNSIGNED_SHORT) {
840     APPLY_FUNC(a, b, length, unsigned short, MAX_OP);
841   } else if (*datatype == MPI_UNSIGNED) {
842     APPLY_FUNC(a, b, length, unsigned int, MAX_OP);
843   } else if (*datatype == MPI_UNSIGNED_LONG) {
844     APPLY_FUNC(a, b, length, unsigned long, MAX_OP);
845   } else if (*datatype == MPI_FLOAT) {
846     APPLY_FUNC(a, b, length, float, MAX_OP);
847   } else if (*datatype == MPI_DOUBLE) {
848     APPLY_FUNC(a, b, length, double, MAX_OP);
849   } else if (*datatype == MPI_LONG_DOUBLE) {
850     APPLY_FUNC(a, b, length, long double, MAX_OP);
851   }
852 }
853
854 static void min_func(void *a, void *b, int *length,
855                      MPI_Datatype * datatype)
856 {
857   if (*datatype == MPI_CHAR) {
858     APPLY_FUNC(a, b, length, char, MIN_OP);
859   } else if (*datatype == MPI_SHORT) {
860     APPLY_FUNC(a, b, length, short, MIN_OP);
861   } else if (*datatype == MPI_INT) {
862     APPLY_FUNC(a, b, length, int, MIN_OP);
863   } else if (*datatype == MPI_LONG) {
864     APPLY_FUNC(a, b, length, long, MIN_OP);
865   } else if (*datatype == MPI_UNSIGNED_SHORT) {
866     APPLY_FUNC(a, b, length, unsigned short, MIN_OP);
867   } else if (*datatype == MPI_UNSIGNED) {
868     APPLY_FUNC(a, b, length, unsigned int, MIN_OP);
869   } else if (*datatype == MPI_UNSIGNED_LONG) {
870     APPLY_FUNC(a, b, length, unsigned long, MIN_OP);
871   } else if (*datatype == MPI_FLOAT) {
872     APPLY_FUNC(a, b, length, float, MIN_OP);
873   } else if (*datatype == MPI_DOUBLE) {
874     APPLY_FUNC(a, b, length, double, MIN_OP);
875   } else if (*datatype == MPI_LONG_DOUBLE) {
876     APPLY_FUNC(a, b, length, long double, MIN_OP);
877   }
878 }
879
880 static void sum_func(void *a, void *b, int *length,
881                      MPI_Datatype * datatype)
882 {
883   if (*datatype == MPI_CHAR) {
884     APPLY_FUNC(a, b, length, char, SUM_OP);
885   } else if (*datatype == MPI_SHORT) {
886     APPLY_FUNC(a, b, length, short, SUM_OP);
887   } else if (*datatype == MPI_INT) {
888     APPLY_FUNC(a, b, length, int, SUM_OP);
889   } else if (*datatype == MPI_LONG) {
890     APPLY_FUNC(a, b, length, long, SUM_OP);
891   } else if (*datatype == MPI_UNSIGNED_SHORT) {
892     APPLY_FUNC(a, b, length, unsigned short, SUM_OP);
893   } else if (*datatype == MPI_UNSIGNED) {
894     APPLY_FUNC(a, b, length, unsigned int, SUM_OP);
895   } else if (*datatype == MPI_UNSIGNED_LONG) {
896     APPLY_FUNC(a, b, length, unsigned long, SUM_OP);
897   } else if (*datatype == MPI_FLOAT) {
898     APPLY_FUNC(a, b, length, float, SUM_OP);
899   } else if (*datatype == MPI_DOUBLE) {
900     APPLY_FUNC(a, b, length, double, SUM_OP);
901   } else if (*datatype == MPI_LONG_DOUBLE) {
902     APPLY_FUNC(a, b, length, long double, SUM_OP);
903   } else if (*datatype == MPI_C_FLOAT_COMPLEX) {
904     APPLY_FUNC(a, b, length, float _Complex, SUM_OP);
905   } else if (*datatype == MPI_C_DOUBLE_COMPLEX) {
906     APPLY_FUNC(a, b, length, double _Complex, SUM_OP);
907   } else if (*datatype == MPI_C_LONG_DOUBLE_COMPLEX) {
908     APPLY_FUNC(a, b, length, long double _Complex, SUM_OP);
909   }
910 }
911
912 static void prod_func(void *a, void *b, int *length,
913                       MPI_Datatype * datatype)
914 {
915   if (*datatype == MPI_CHAR) {
916     APPLY_FUNC(a, b, length, char, PROD_OP);
917   } else if (*datatype == MPI_SHORT) {
918     APPLY_FUNC(a, b, length, short, PROD_OP);
919   } else if (*datatype == MPI_INT) {
920     APPLY_FUNC(a, b, length, int, PROD_OP);
921   } else if (*datatype == MPI_LONG) {
922     APPLY_FUNC(a, b, length, long, PROD_OP);
923   } else if (*datatype == MPI_UNSIGNED_SHORT) {
924     APPLY_FUNC(a, b, length, unsigned short, PROD_OP);
925   } else if (*datatype == MPI_UNSIGNED) {
926     APPLY_FUNC(a, b, length, unsigned int, PROD_OP);
927   } else if (*datatype == MPI_UNSIGNED_LONG) {
928     APPLY_FUNC(a, b, length, unsigned long, PROD_OP);
929   } else if (*datatype == MPI_FLOAT) {
930     APPLY_FUNC(a, b, length, float, PROD_OP);
931   } else if (*datatype == MPI_DOUBLE) {
932     APPLY_FUNC(a, b, length, double, PROD_OP);
933   } else if (*datatype == MPI_LONG_DOUBLE) {
934     APPLY_FUNC(a, b, length, long double, PROD_OP);
935   } else if (*datatype == MPI_C_FLOAT_COMPLEX) {
936     APPLY_FUNC(a, b, length, float _Complex, PROD_OP);
937   } else if (*datatype == MPI_C_DOUBLE_COMPLEX) {
938     APPLY_FUNC(a, b, length, double _Complex, PROD_OP);
939   } else if (*datatype == MPI_C_LONG_DOUBLE_COMPLEX) {
940     APPLY_FUNC(a, b, length, long double _Complex, PROD_OP);
941   }
942 }
943
944 static void land_func(void *a, void *b, int *length,
945                       MPI_Datatype * datatype)
946 {
947   if (*datatype == MPI_CHAR) {
948     APPLY_FUNC(a, b, length, char, LAND_OP);
949   } else if (*datatype == MPI_SHORT) {
950     APPLY_FUNC(a, b, length, short, LAND_OP);
951   } else if (*datatype == MPI_INT) {
952     APPLY_FUNC(a, b, length, int, LAND_OP);
953   } else if (*datatype == MPI_LONG) {
954     APPLY_FUNC(a, b, length, long, LAND_OP);
955   } else if (*datatype == MPI_UNSIGNED_SHORT) {
956     APPLY_FUNC(a, b, length, unsigned short, LAND_OP);
957   } else if (*datatype == MPI_UNSIGNED) {
958     APPLY_FUNC(a, b, length, unsigned int, LAND_OP);
959   } else if (*datatype == MPI_UNSIGNED_LONG) {
960     APPLY_FUNC(a, b, length, unsigned long, LAND_OP);
961   } else if (*datatype == MPI_C_BOOL) {
962     APPLY_FUNC(a, b, length, _Bool, LAND_OP);
963   }
964 }
965
966 static void lor_func(void *a, void *b, int *length,
967                      MPI_Datatype * datatype)
968 {
969   if (*datatype == MPI_CHAR) {
970     APPLY_FUNC(a, b, length, char, LOR_OP);
971   } else if (*datatype == MPI_SHORT) {
972     APPLY_FUNC(a, b, length, short, LOR_OP);
973   } else if (*datatype == MPI_INT) {
974     APPLY_FUNC(a, b, length, int, LOR_OP);
975   } else if (*datatype == MPI_LONG) {
976     APPLY_FUNC(a, b, length, long, LOR_OP);
977   } else if (*datatype == MPI_UNSIGNED_SHORT) {
978     APPLY_FUNC(a, b, length, unsigned short, LOR_OP);
979   } else if (*datatype == MPI_UNSIGNED) {
980     APPLY_FUNC(a, b, length, unsigned int, LOR_OP);
981   } else if (*datatype == MPI_UNSIGNED_LONG) {
982     APPLY_FUNC(a, b, length, unsigned long, LOR_OP);
983   } else if (*datatype == MPI_C_BOOL) {
984     APPLY_FUNC(a, b, length, _Bool, LOR_OP);
985   }
986 }
987
988 static void lxor_func(void *a, void *b, int *length,
989                       MPI_Datatype * datatype)
990 {
991   if (*datatype == MPI_CHAR) {
992     APPLY_FUNC(a, b, length, char, LXOR_OP);
993   } else if (*datatype == MPI_SHORT) {
994     APPLY_FUNC(a, b, length, short, LXOR_OP);
995   } else if (*datatype == MPI_INT) {
996     APPLY_FUNC(a, b, length, int, LXOR_OP);
997   } else if (*datatype == MPI_LONG) {
998     APPLY_FUNC(a, b, length, long, LXOR_OP);
999   } else if (*datatype == MPI_UNSIGNED_SHORT) {
1000     APPLY_FUNC(a, b, length, unsigned short, LXOR_OP);
1001   } else if (*datatype == MPI_UNSIGNED) {
1002     APPLY_FUNC(a, b, length, unsigned int, LXOR_OP);
1003   } else if (*datatype == MPI_UNSIGNED_LONG) {
1004     APPLY_FUNC(a, b, length, unsigned long, LXOR_OP);
1005   } else if (*datatype == MPI_C_BOOL) {
1006     APPLY_FUNC(a, b, length, _Bool, LXOR_OP);
1007   }
1008 }
1009
1010 static void band_func(void *a, void *b, int *length,
1011                       MPI_Datatype * datatype)
1012 {
1013   if (*datatype == MPI_CHAR) {
1014     APPLY_FUNC(a, b, length, char, BAND_OP);
1015   }
1016   if (*datatype == MPI_SHORT) {
1017     APPLY_FUNC(a, b, length, short, BAND_OP);
1018   } else if (*datatype == MPI_INT) {
1019     APPLY_FUNC(a, b, length, int, BAND_OP);
1020   } else if (*datatype == MPI_LONG) {
1021     APPLY_FUNC(a, b, length, long, BAND_OP);
1022   } else if (*datatype == MPI_UNSIGNED_SHORT) {
1023     APPLY_FUNC(a, b, length, unsigned short, BAND_OP);
1024   } else if (*datatype == MPI_UNSIGNED) {
1025     APPLY_FUNC(a, b, length, unsigned int, BAND_OP);
1026   } else if (*datatype == MPI_UNSIGNED_LONG) {
1027     APPLY_FUNC(a, b, length, unsigned long, BAND_OP);
1028   } else if (*datatype == MPI_BYTE) {
1029     APPLY_FUNC(a, b, length, uint8_t, BAND_OP);
1030   }
1031 }
1032
1033 static void bor_func(void *a, void *b, int *length,
1034                      MPI_Datatype * datatype)
1035 {
1036   if (*datatype == MPI_CHAR) {
1037     APPLY_FUNC(a, b, length, char, BOR_OP);
1038   } else if (*datatype == MPI_SHORT) {
1039     APPLY_FUNC(a, b, length, short, BOR_OP);
1040   } else if (*datatype == MPI_INT) {
1041     APPLY_FUNC(a, b, length, int, BOR_OP);
1042   } else if (*datatype == MPI_LONG) {
1043     APPLY_FUNC(a, b, length, long, BOR_OP);
1044   } else if (*datatype == MPI_UNSIGNED_SHORT) {
1045     APPLY_FUNC(a, b, length, unsigned short, BOR_OP);
1046   } else if (*datatype == MPI_UNSIGNED) {
1047     APPLY_FUNC(a, b, length, unsigned int, BOR_OP);
1048   } else if (*datatype == MPI_UNSIGNED_LONG) {
1049     APPLY_FUNC(a, b, length, unsigned long, BOR_OP);
1050   } else if (*datatype == MPI_BYTE) {
1051     APPLY_FUNC(a, b, length, uint8_t, BOR_OP);
1052   }
1053 }
1054
1055 static void bxor_func(void *a, void *b, int *length,
1056                       MPI_Datatype * datatype)
1057 {
1058   if (*datatype == MPI_CHAR) {
1059     APPLY_FUNC(a, b, length, char, BXOR_OP);
1060   } else if (*datatype == MPI_SHORT) {
1061     APPLY_FUNC(a, b, length, short, BXOR_OP);
1062   } else if (*datatype == MPI_INT) {
1063     APPLY_FUNC(a, b, length, int, BXOR_OP);
1064   } else if (*datatype == MPI_LONG) {
1065     APPLY_FUNC(a, b, length, long, BXOR_OP);
1066   } else if (*datatype == MPI_UNSIGNED_SHORT) {
1067     APPLY_FUNC(a, b, length, unsigned short, BXOR_OP);
1068   } else if (*datatype == MPI_UNSIGNED) {
1069     APPLY_FUNC(a, b, length, unsigned int, BXOR_OP);
1070   } else if (*datatype == MPI_UNSIGNED_LONG) {
1071     APPLY_FUNC(a, b, length, unsigned long, BXOR_OP);
1072   } else if (*datatype == MPI_BYTE) {
1073     APPLY_FUNC(a, b, length, uint8_t, BXOR_OP);
1074   }
1075 }
1076
1077 static void minloc_func(void *a, void *b, int *length,
1078                         MPI_Datatype * datatype)
1079 {
1080   if (*datatype == MPI_FLOAT_INT) {
1081     APPLY_FUNC(a, b, length, float_int, MINLOC_OP);
1082   } else if (*datatype == MPI_LONG_INT) {
1083     APPLY_FUNC(a, b, length, long_int, MINLOC_OP);
1084   } else if (*datatype == MPI_DOUBLE_INT) {
1085     APPLY_FUNC(a, b, length, double_int, MINLOC_OP);
1086   } else if (*datatype == MPI_SHORT_INT) {
1087     APPLY_FUNC(a, b, length, short_int, MINLOC_OP);
1088   } else if (*datatype == MPI_2INT) {
1089     APPLY_FUNC(a, b, length, int_int, MINLOC_OP);
1090   } else if (*datatype == MPI_LONG_DOUBLE_INT) {
1091     APPLY_FUNC(a, b, length, long_double_int, MINLOC_OP);
1092   }
1093 }
1094
1095 static void maxloc_func(void *a, void *b, int *length,
1096                         MPI_Datatype * datatype)
1097 {
1098   if (*datatype == MPI_FLOAT_INT) {
1099     APPLY_FUNC(a, b, length, float_int, MAXLOC_OP);
1100   } else if (*datatype == MPI_LONG_INT) {
1101     APPLY_FUNC(a, b, length, long_int, MAXLOC_OP);
1102   } else if (*datatype == MPI_DOUBLE_INT) {
1103     APPLY_FUNC(a, b, length, double_int, MAXLOC_OP);
1104   } else if (*datatype == MPI_SHORT_INT) {
1105     APPLY_FUNC(a, b, length, short_int, MAXLOC_OP);
1106   } else if (*datatype == MPI_2INT) {
1107     APPLY_FUNC(a, b, length, int_int, MAXLOC_OP);
1108   } else if (*datatype == MPI_LONG_DOUBLE_INT) {
1109     APPLY_FUNC(a, b, length, long_double_int, MAXLOC_OP);
1110   }
1111 }
1112
1113
1114 #define CREATE_MPI_OP(name, func)                             \
1115   static s_smpi_mpi_op_t mpi_##name = { &(func) /* func */ }; \
1116 MPI_Op name = &mpi_##name;
1117
1118 CREATE_MPI_OP(MPI_MAX, max_func);
1119 CREATE_MPI_OP(MPI_MIN, min_func);
1120 CREATE_MPI_OP(MPI_SUM, sum_func);
1121 CREATE_MPI_OP(MPI_PROD, prod_func);
1122 CREATE_MPI_OP(MPI_LAND, land_func);
1123 CREATE_MPI_OP(MPI_LOR, lor_func);
1124 CREATE_MPI_OP(MPI_LXOR, lxor_func);
1125 CREATE_MPI_OP(MPI_BAND, band_func);
1126 CREATE_MPI_OP(MPI_BOR, bor_func);
1127 CREATE_MPI_OP(MPI_BXOR, bxor_func);
1128 CREATE_MPI_OP(MPI_MAXLOC, maxloc_func);
1129 CREATE_MPI_OP(MPI_MINLOC, minloc_func);
1130
1131 MPI_Op smpi_op_new(MPI_User_function * function, int commute)
1132 {
1133   MPI_Op op;
1134
1135   //FIXME: add commute param
1136   op = xbt_new(s_smpi_mpi_op_t, 1);
1137   op->func = function;
1138   return op;
1139 }
1140
1141 void smpi_op_destroy(MPI_Op op)
1142 {
1143   xbt_free(op);
1144 }
1145
1146 void smpi_op_apply(MPI_Op op, void *invec, void *inoutvec, int *len,
1147                    MPI_Datatype * datatype)
1148 {
1149   op->func(invec, inoutvec, len, datatype);
1150 }