Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Add new entry in Release_Notes.
[simgrid.git] / teshsuite / smpi / mpich3-test / io / i_aggregation1.c
1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
2 /*
3  *  (C) 2014 by Argonne National Laboratory.
4  *      See COPYRIGHT in top-level directory.
5  */
6
7 /* Test case from John Bent (ROMIO req #835)
8  * Aggregation code was not handling certain access patterns when collective
9  * buffering forced */
10
11 /* Uses nonblocking collective I/O.*/
12
13 #include <unistd.h>
14 #include <stdlib.h>
15 #include <mpi.h>
16 #include <stdio.h>
17 #include <string.h>
18
19 #define NUM_OBJS 4
20 #define OBJ_SIZE 1048576
21
22 extern char *optarg;
23 extern int optind, opterr, optopt;
24
25
26 char *prog = NULL;
27 int debug = 0;
28
29 static void Usage(int line)
30 {
31     int rank;
32     MPI_Comm_rank(MPI_COMM_WORLD, &rank);
33     if (rank == 0) {
34         fprintf(stderr,
35                 "Usage (line %d): %s [-d] [-h] -f filename\n"
36                 "\t-d for debugging\n"
37                 "\t-h to turn on the hints to force collective aggregation\n", line, prog);
38     }
39     exit(0);
40 }
41
42 static void fatal_error(int mpi_ret, MPI_Status * mpi_stat, const char *msg)
43 {
44     fprintf(stderr, "Fatal error %s: %d\n", msg, mpi_ret);
45     MPI_Abort(MPI_COMM_WORLD, -1);
46 }
47
48 static void print_hints(int rank, MPI_File * mfh)
49 {
50     MPI_Info info;
51     int nkeys;
52     int i, dummy_int;
53     char key[1024];
54     char value[1024];
55
56     MPI_Barrier(MPI_COMM_WORLD);
57     if (rank == 0) {
58         MPI_File_get_info(*mfh, &info);
59         MPI_Info_get_nkeys(info, &nkeys);
60
61         printf("HINTS:\n");
62         for (i = 0; i < nkeys; i++) {
63             MPI_Info_get_nthkey(info, i, key);
64             printf("%35s -> ", key);
65             MPI_Info_get(info, key, 1024, value, &dummy_int);
66             printf("%s\n", value);
67         }
68         MPI_Info_free(&info);
69     }
70     MPI_Barrier(MPI_COMM_WORLD);
71 }
72
73 static void fill_buffer(char *buffer, int bufsize, int rank, MPI_Offset offset)
74 {
75     memset((void *) buffer, 0, bufsize);
76     snprintf(buffer, bufsize, "Hello from %d at %lld\n", rank, offset);
77 }
78
79 static MPI_Offset get_offset(int rank, int num_objs, int obj_size, int which_obj)
80 {
81     MPI_Offset offset;
82     offset = (MPI_Offset) rank *num_objs * obj_size + which_obj * obj_size;
83     return offset;
84 }
85
86 static void write_file(char *target, int rank, MPI_Info * info)
87 {
88     MPI_File wfh;
89     MPI_Request *request;
90     MPI_Status *mpi_stat;
91     int mpi_ret;
92     int i;
93     char **buffer;
94
95     request = (MPI_Request *) malloc(NUM_OBJS * sizeof(MPI_Request));
96     mpi_stat = (MPI_Status *) malloc(NUM_OBJS * sizeof(MPI_Status));
97     buffer = (char **) malloc(NUM_OBJS * sizeof(char *));
98
99     if (debug)
100         printf("%d writing file %s\n", rank, target);
101
102     if ((mpi_ret = MPI_File_open(MPI_COMM_WORLD, target,
103                                  MPI_MODE_WRONLY | MPI_MODE_CREATE, *info, &wfh))
104         != MPI_SUCCESS) {
105         fatal_error(mpi_ret, NULL, "open for write");
106     }
107
108     /* nonblocking collective write */
109     for (i = 0; i < NUM_OBJS; i++) {
110         MPI_Offset offset = get_offset(rank, NUM_OBJS, OBJ_SIZE, i);
111         buffer[i] = (char *) malloc(OBJ_SIZE);
112         fill_buffer(buffer[i], OBJ_SIZE, rank, offset);
113         if (debug)
114             printf("%s", buffer[i]);
115         if ((mpi_ret = MPI_File_iwrite_at_all(wfh, offset, buffer[i], OBJ_SIZE,
116                                               MPI_CHAR, &request[i]))
117             != MPI_SUCCESS) {
118             fatal_error(mpi_ret, NULL, "write");
119         }
120     }
121
122     if (debug)
123         print_hints(rank, &wfh);
124
125     MPI_Waitall(NUM_OBJS, request, mpi_stat);
126
127     if ((mpi_ret = MPI_File_close(&wfh)) != MPI_SUCCESS) {
128         fatal_error(mpi_ret, NULL, "close for write");
129     }
130     if (debug)
131         printf("%d wrote file %s\n", rank, target);
132
133     for (i = 0; i < NUM_OBJS; i++)
134         free(buffer[i]);
135     free(buffer);
136     free(mpi_stat);
137     free(request);
138 }
139
140 static int reduce_corruptions(int corrupt_blocks)
141 {
142     int mpi_ret;
143     int sum;
144     if ((mpi_ret = MPI_Reduce(&corrupt_blocks, &sum, 1, MPI_INT, MPI_SUM, 0,
145                               MPI_COMM_WORLD)) != MPI_SUCCESS) {
146         fatal_error(mpi_ret, NULL, "MPI_Reduce");
147     }
148     return sum;
149 }
150
151 static void read_file(char *target, int rank, MPI_Info * info, int *corrupt_blocks)
152 {
153     MPI_File rfh;
154     MPI_Offset *offset;
155     MPI_Request *request;
156     MPI_Status *mpi_stat;
157     int mpi_ret;
158     int i;
159     char **buffer;
160     char **verify_buf = NULL;
161
162     offset = (MPI_Offset *) malloc(NUM_OBJS * sizeof(MPI_Offset));
163     request = (MPI_Request *) malloc(NUM_OBJS * sizeof(MPI_Request));
164     mpi_stat = (MPI_Status *) malloc(NUM_OBJS * sizeof(MPI_Status));
165     buffer = (char **) malloc(NUM_OBJS * sizeof(char *));
166     verify_buf = (char **) malloc(NUM_OBJS * sizeof(char *));
167
168     if (debug)
169         printf("%d reading file %s\n", rank, target);
170
171     if ((mpi_ret = MPI_File_open(MPI_COMM_WORLD, target, MPI_MODE_RDONLY,
172                                  *info, &rfh)) != MPI_SUCCESS) {
173         fatal_error(mpi_ret, NULL, "open for read");
174     }
175
176     /* nonblocking collective read */
177     for (i = 0; i < NUM_OBJS; i++) {
178         offset[i] = get_offset(rank, NUM_OBJS, OBJ_SIZE, i);
179         buffer[i] = (char *) malloc(OBJ_SIZE);
180         verify_buf[i] = (char *) malloc(OBJ_SIZE);
181         fill_buffer(verify_buf[i], OBJ_SIZE, rank, offset[i]);
182         if (debug)
183             printf("Expecting %s", verify_buf[i]);
184         if ((mpi_ret = MPI_File_iread_at_all(rfh, offset[i], buffer[i],
185                                              OBJ_SIZE, MPI_CHAR, &request[i]))
186             != MPI_SUCCESS) {
187             fatal_error(mpi_ret, NULL, "read");
188         }
189     }
190
191     MPI_Waitall(NUM_OBJS, request, mpi_stat);
192
193     /* verification */
194     for (i = 0; i < NUM_OBJS; i++) {
195         if (memcmp(verify_buf[i], buffer[i], OBJ_SIZE) != 0) {
196             (*corrupt_blocks)++;
197             printf("Corruption at %lld\n", offset[i]);
198             if (debug) {
199                 printf("\tExpecting %s\n" "\tRecieved  %s\n", verify_buf[i], buffer[i]);
200             }
201         }
202     }
203
204     if ((mpi_ret = MPI_File_close(&rfh)) != MPI_SUCCESS) {
205         fatal_error(mpi_ret, NULL, "close for read");
206     }
207
208     for (i = 0; i < NUM_OBJS; i++) {
209         free(verify_buf[i]);
210         free(buffer[i]);
211     }
212     free(verify_buf);
213     free(buffer);
214     free(mpi_stat);
215     free(request);
216     free(offset);
217 }
218
219 static void set_hints(MPI_Info * info)
220 {
221     MPI_Info_set(*info, "romio_cb_write", "enable");
222     MPI_Info_set(*info, "romio_no_indep_rw", "1");
223     MPI_Info_set(*info, "cb_nodes", "1");
224     MPI_Info_set(*info, "cb_buffer_size", "4194304");
225 }
226
227 /*
228 void
229 set_hints(MPI_Info *info, char *hints) {
230     char *delimiter = " ";
231     char *hints_cp  = strdup(hints);
232     char *key = strtok(hints_cp, delimiter);
233     char *val;
234     while (key) {
235         val = strtok(NULL, delimiter);
236         if (debug) printf("HINT: %s = %s\n", key, val);
237         if (! val) {
238             Usage(__LINE__);
239         }
240         MPI_Info_set(*info, key, val);
241         key = strtok(NULL, delimiter);
242     }
243     free(hints_cp);
244 }
245 */
246
247 int main(int argc, char *argv[])
248 {
249     int nproc = 1, rank = 0;
250     char *target = NULL;
251     int c;
252     MPI_Info info;
253     int mpi_ret;
254     int corrupt_blocks = 0;
255
256     MPI_Init(&argc, &argv);
257     MPI_Comm_size(MPI_COMM_WORLD, &nproc);
258     MPI_Comm_rank(MPI_COMM_WORLD, &rank);
259
260     if ((mpi_ret = MPI_Info_create(&info)) != MPI_SUCCESS) {
261         if (rank == 0)
262             fatal_error(mpi_ret, NULL, "MPI_info_create.\n");
263     }
264
265     prog = strdup(argv[0]);
266
267     if (argc > 1) {
268         while ((c = getopt(argc, argv, "df:h")) != EOF) {
269             switch (c) {
270             case 'd':
271                 debug = 1;
272                 break;
273             case 'f':
274                 target = strdup(optarg);
275                 break;
276             case 'h':
277                 set_hints(&info);
278                 break;
279             default:
280                 Usage(__LINE__);
281             }
282         }
283         if (!target) {
284             Usage(__LINE__);
285         }
286     }
287     else {
288         target = (char*)"testfile";
289         set_hints(&info);
290     }
291
292     write_file(target, rank, &info);
293     read_file(target, rank, &info, &corrupt_blocks);
294
295     corrupt_blocks = reduce_corruptions(corrupt_blocks);
296     if (rank == 0) {
297         if (corrupt_blocks == 0) {
298             fprintf(stdout, " No Errors\n");
299         }
300         else {
301             fprintf(stdout, "%d/%d blocks corrupt\n", corrupt_blocks, nproc * NUM_OBJS);
302         }
303     }
304     MPI_Info_free(&info);
305
306     MPI_Finalize();
307     free(prog);
308     exit(0);
309 }