3 /* ddt_exchange - send/recv data described */
5 /* Copyright (c) 2003 Olivier Aumage. */
6 /* Copyright (c) 2003, 2004 Martin Quinson. */
7 /* All rights reserved. */
9 /* This program is free software; you can redistribute it and/or modify it
10 * under the terms of the license (GNU LGPL) which comes with this package. */
12 #include "gras/DataDesc/datadesc_private.h"
13 #include "gras/Transport/transport_interface.h" /* gras_trp_chunk_send/recv */
15 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(ddt_exchange,datadesc,
16 "Sending data over the network");
17 const char *gras_datadesc_cat_names[9] = {
19 "scalar", "struct", "union", "ref", "array", "ignored",
22 static gras_datadesc_type_t int_type = NULL;
23 static gras_datadesc_type_t pointer_type = NULL;
24 static _XBT_INLINE xbt_error_t gras_dd_send_int(gras_socket_t sock, int i);
25 static _XBT_INLINE xbt_error_t gras_dd_recv_int(gras_socket_t sock, int r_arch, int *i);
27 static _XBT_INLINE xbt_error_t
28 gras_dd_alloc_ref(xbt_dict_t refs, long int size,
29 char **r_ref, long int r_len,
30 char **l_ref, int detect_cycle);
32 static _XBT_INLINE int
33 gras_dd_is_r_null(char **r_ptr, long int length);
36 gras_datadesc_send_rec(gras_socket_t sock,
39 gras_datadesc_type_t type,
43 gras_datadesc_recv_rec(gras_socket_t sock,
46 gras_datadesc_type_t type,
55 static _XBT_INLINE xbt_error_t
56 gras_dd_send_int(gras_socket_t sock,int i) {
59 int_type = gras_datadesc_by_name("int");
63 DEBUG1("send_int(%d)",i);
64 return gras_trp_chunk_send(sock, (char*)&i, int_type->size[GRAS_THISARCH]);
67 static _XBT_INLINE xbt_error_t
68 gras_dd_recv_int(gras_socket_t sock, int r_arch, int *i) {
72 int_type = gras_datadesc_by_name("int");
76 if (int_type->size[GRAS_THISARCH] >= int_type->size[r_arch]) {
77 TRY(gras_trp_chunk_recv(sock, (char*)i, int_type->size[r_arch]));
78 if (r_arch != GRAS_THISARCH)
79 TRY(gras_dd_convert_elm(int_type,1,r_arch, i,i));
81 void *ptr = xbt_malloc(int_type->size[r_arch]);
83 TRY(gras_trp_chunk_recv(sock, (char*)ptr, int_type->size[r_arch]));
84 if (r_arch != GRAS_THISARCH)
85 TRY(gras_dd_convert_elm(int_type,1,r_arch, ptr,i));
88 DEBUG1("recv_int(%d)",*i);
94 * Note: here we suppose that the remote NULL is a sequence
95 * of 'length' bytes set to 0.
96 * FIXME: Check in configure?
98 static _XBT_INLINE int
99 gras_dd_is_r_null(char **r_ptr, long int length) {
102 for (i=0; i<length; i++) {
103 if ( ((unsigned char*)r_ptr) [i]) {
111 static _XBT_INLINE xbt_error_t
112 gras_dd_alloc_ref(xbt_dict_t refs,
115 long int r_len, /* pointer_type->size[r_arch] */
120 xbt_assert1(size>0,"Cannot allocate %ld bytes!", size);
121 l_data = xbt_malloc((size_t)size);
124 DEBUG2("l_data=%p, &l_data=%p",(void*)l_data,(void*)&l_data);
126 DEBUG3("alloc_ref: r_ref=%p; *r_ref=%p, r_len=%ld",
127 (void*)r_ref, (void*)(r_ref?*r_ref:NULL), r_len);
128 if (detect_cycle && r_ref && !gras_dd_is_r_null( r_ref, r_len)) {
129 void *ptr = xbt_malloc(sizeof(void *));
131 CRITICAL0("detect_cycle");
132 memcpy(ptr,l_ref, sizeof(void *));
134 DEBUG2("Insert %p under %p",*(void**)ptr, *(void**)r_ref);
137 xbt_dict_set_ext(refs,(const char *) r_ref, r_len, ptr, xbt_free_fct);
145 * Copy the data pointed by src and described by type
146 * to a new location, and store a pointer to it in dst.
149 xbt_error_t gras_datadesc_cpy(gras_datadesc_type_t type,
156 gras_datadesc_send_rec(gras_socket_t sock,
159 gras_datadesc_type_t type,
165 gras_datadesc_type_t sub_type; /* type on which we recurse */
167 VERB2("Send a %s (%s)",
168 type->name, gras_datadesc_cat_names[type->category_code]);
171 type->send(state,data);
174 switch (type->category_code) {
175 case e_gras_datadesc_type_cat_scalar:
176 TRY(gras_trp_chunk_send(sock, data, type->size[GRAS_THISARCH]));
179 case e_gras_datadesc_type_cat_struct: {
180 gras_dd_cat_struct_t struct_data;
181 gras_dd_cat_field_t field;
184 struct_data = type->category.struct_data;
185 xbt_assert1(struct_data.closed,
186 "Please call gras_datadesc_declare_struct_close on %s before sending it",
188 VERB1(">> Send all fields of the structure %s",type->name);
189 xbt_dynar_foreach(struct_data.fields, cpt, field) {
191 field_data += field->offset[GRAS_THISARCH];
193 sub_type = field->type;
196 field->pre(state,field_data);
198 VERB1("Send field %s",field->name);
199 TRY(gras_datadesc_send_rec(sock,state,refs,sub_type, field_data, detect_cycle || sub_type->cycle));
202 field->post(state,field_data);
204 VERB1("<< Sent all fields of the structure %s", type->name);
209 case e_gras_datadesc_type_cat_union: {
210 gras_dd_cat_union_t union_data;
211 gras_dd_cat_field_t field=NULL;
214 union_data = type->category.union_data;
216 xbt_assert1(union_data.closed,
217 "Please call gras_datadesc_declare_union_close on %s before sending it",
219 /* retrieve the field number */
220 field_num = union_data.selector(state, data);
222 xbt_assert1(field_num > 0,
223 "union field selector of %s gave a negative value",
226 xbt_assert3(field_num < xbt_dynar_length(union_data.fields),
227 "union field selector of %s returned %d but there is only %lu fields",
228 type->name, field_num, xbt_dynar_length(union_data.fields));
230 /* Send the field number */
231 TRY(gras_dd_send_int(sock, field_num));
233 /* Send the content */
234 field = xbt_dynar_get_as(union_data.fields, field_num, gras_dd_cat_field_t);
235 sub_type = field->type;
238 field->pre(state,data);
240 TRY(gras_datadesc_send_rec(sock,state,refs, sub_type, data, detect_cycle || sub_type->cycle));
243 field->post(state,data);
248 case e_gras_datadesc_type_cat_ref: {
249 gras_dd_cat_ref_t ref_data;
251 void **ref=(void**)data;
254 ref_data = type->category.ref_data;
256 /* Detect the referenced type and send it to peer if needed */
257 sub_type = ref_data.type;
258 if (sub_type == NULL) {
259 sub_type = (*ref_data.selector)(state,data);
260 TRY(gras_dd_send_int(sock, sub_type->code));
263 /* Send the actual value of the pointer for cycle handling */
265 pointer_type = gras_datadesc_by_name("data pointer");
266 xbt_assert(pointer_type);
269 TRY(gras_trp_chunk_send(sock, (char*)data,
270 pointer_type->size[GRAS_THISARCH]));
272 /* Send the pointed data only if not already sent */
273 if (*(void**)data == NULL) {
274 VERB0("Not sending NULL referenced data");
277 errcode = detect_cycle
278 ? xbt_dict_get_ext(refs,(char*)ref, sizeof(void*), &dummy)
280 if (errcode == mismatch_error) {
281 VERB1("Sending data referenced at %p", (void*)*ref);
283 xbt_dict_set_ext(refs, (char*)ref, sizeof(void*), ref, NULL);
284 TRY(gras_datadesc_send_rec(sock,state,refs, sub_type, *ref, detect_cycle || sub_type->cycle));
286 } else if (errcode == no_error) {
287 VERB1("Not sending data referenced at %p (already done)", (void*)*ref);
295 case e_gras_datadesc_type_cat_array: {
296 gras_dd_cat_array_t array_data;
301 array_data = type->category.array_data;
303 /* determine and send the element count */
304 count = array_data.fixed_size;
306 count = array_data.dynamic_size(state,data);
307 xbt_assert1(count >=0,
308 "Invalid (negative) array size for type %s",type->name);
309 TRY(gras_dd_send_int(sock, count));
312 /* send the content */
313 sub_type = array_data.type;
314 elm_size = sub_type->aligned_size[GRAS_THISARCH];
315 if (sub_type->category_code == e_gras_datadesc_type_cat_scalar) {
316 VERB1("Array of %ld scalars, send it in one shot",count);
317 TRY(gras_trp_chunk_send(sock, data,
318 sub_type->aligned_size[GRAS_THISARCH] * count));
319 } else if (sub_type->category_code == e_gras_datadesc_type_cat_array &&
320 sub_type->category.array_data.fixed_size > 0 &&
321 sub_type->category.array_data.type->category_code == e_gras_datadesc_type_cat_scalar) {
323 VERB1("Array of %ld fixed array of scalars, send it in one shot",count);
324 TRY(gras_trp_chunk_send(sock, data,
325 sub_type->category.array_data.type->aligned_size[GRAS_THISARCH]
326 * count * sub_type->category.array_data.fixed_size));
329 for (cpt=0; cpt<count; cpt++) {
330 TRY(gras_datadesc_send_rec(sock,state,refs, sub_type, ptr, detect_cycle || sub_type->cycle));
338 xbt_assert0(0, "Invalid type");
345 * gras_datadesc_send:
347 * Copy the data pointed by src and described by type to the socket
350 xbt_error_t gras_datadesc_send(gras_socket_t sock,
351 gras_datadesc_type_t type,
356 xbt_dict_t refs; /* all references already sent */
358 refs = xbt_dict_new();
359 state = gras_cbps_new();
361 errcode = gras_datadesc_send_rec(sock,state,refs,type,(char*)src, type->cycle);
363 xbt_dict_free(&refs);
364 gras_cbps_free(&state);
370 * gras_datadesc_recv_rec:
372 * Do the data reception job recursively.
374 * subsize used only to deal with vicious case of reference to dynamic array.
375 * This size is needed at the reference reception level (to allocate enough
376 * space) and at the array reception level (to fill enough room).
378 * Having this size passed as an argument of the recursive function is a crude
379 * hack, but I was told that working code is sometimes better than neat one ;)
382 gras_datadesc_recv_rec(gras_socket_t sock,
385 gras_datadesc_type_t type,
395 gras_datadesc_type_t sub_type;
397 VERB2("Recv a %s @%p", type->name, (void*)l_data);
400 switch (type->category_code) {
401 case e_gras_datadesc_type_cat_scalar:
402 if (type->size[GRAS_THISARCH] == type->size[r_arch]) {
403 TRY(gras_trp_chunk_recv(sock, (char*)l_data, type->size[r_arch]));
404 if (r_arch != GRAS_THISARCH)
405 TRY(gras_dd_convert_elm(type,1,r_arch, l_data,l_data));
407 void *ptr = xbt_malloc(type->size[r_arch]);
409 TRY(gras_trp_chunk_recv(sock, (char*)ptr, type->size[r_arch]));
410 if (r_arch != GRAS_THISARCH)
411 TRY(gras_dd_convert_elm(type,1,r_arch, ptr,l_data));
416 case e_gras_datadesc_type_cat_struct: {
417 gras_dd_cat_struct_t struct_data;
418 gras_dd_cat_field_t field;
420 struct_data = type->category.struct_data;
422 xbt_assert1(struct_data.closed,
423 "Please call gras_datadesc_declare_struct_close on %s before receiving it",
425 VERB1(">> Receive all fields of the structure %s",type->name);
426 xbt_dynar_foreach(struct_data.fields, cpt, field) {
427 char *field_data = l_data + field->offset[GRAS_THISARCH];
429 sub_type = field->type;
431 TRY(gras_datadesc_recv_rec(sock,state,refs, sub_type,
434 detect_cycle || sub_type->cycle));
436 VERB1("<< Received all fields of the structure %s", type->name);
441 case e_gras_datadesc_type_cat_union: {
442 gras_dd_cat_union_t union_data;
443 gras_dd_cat_field_t field=NULL;
446 union_data = type->category.union_data;
448 xbt_assert1(union_data.closed,
449 "Please call gras_datadesc_declare_union_close on %s before receiving it",
451 /* retrieve the field number */
452 TRY(gras_dd_recv_int(sock, r_arch, &field_num));
454 RAISE1(mismatch_error,
455 "Received union field for %s is negative", type->name);
456 if (field_num < xbt_dynar_length(union_data.fields))
457 RAISE3(mismatch_error,
458 "Received union field for %s is %d but there is only %lu fields",
459 type->name, field_num, xbt_dynar_length(union_data.fields));
461 /* Recv the content */
462 field = xbt_dynar_get_as(union_data.fields, field_num, gras_dd_cat_field_t);
463 sub_type = field->type;
465 TRY(gras_datadesc_recv_rec(sock,state,refs, sub_type,
468 detect_cycle || sub_type->cycle));
472 case e_gras_datadesc_type_cat_ref: {
475 gras_dd_cat_ref_t ref_data;
477 ref_data = type->category.ref_data;
479 /* Get the referenced type locally or from peer */
480 sub_type = ref_data.type;
481 if (sub_type == NULL) {
483 TRY(gras_dd_recv_int(sock, r_arch, &ref_code));
484 TRY(gras_datadesc_by_id(ref_code, &sub_type));
487 /* Get the actual value of the pointer for cycle handling */
489 pointer_type = gras_datadesc_by_name("data pointer");
490 xbt_assert(pointer_type);
493 r_ref = xbt_malloc(pointer_type->size[r_arch]);
495 TRY(gras_trp_chunk_recv(sock, (char*)r_ref,
496 pointer_type->size[r_arch]));
498 /* Receive the pointed data only if not already sent */
499 if (gras_dd_is_r_null(r_ref, pointer_type->size[r_arch])) {
500 VERB1("Not receiving data remotely referenced @%p since it's NULL",
502 *(void**)l_data = NULL;
507 errcode = detect_cycle
508 ? xbt_dict_get_ext(refs,
509 (char*)r_ref, pointer_type->size[r_arch],
513 if (errcode == mismatch_error) {
515 void *l_referenced=NULL;
517 VERB2("Receiving a ref to '%s', remotely @%p",
518 sub_type->name, *(void**)r_ref);
519 if (sub_type->category_code == e_gras_datadesc_type_cat_array) {
520 /* Damn. Reference to a dynamic array. Allocating the size for it
521 is more complicated */
522 gras_dd_cat_array_t array_data = sub_type->category.array_data;
523 gras_datadesc_type_t subsub_type;
525 subsubcount = array_data.fixed_size;
526 if (subsubcount == 0)
527 TRY(gras_dd_recv_int(sock, r_arch, &subsubcount));
529 subsub_type = array_data.type;
532 TRY(gras_dd_alloc_ref(refs,
533 subsub_type->size[GRAS_THISARCH] * subsubcount,
534 r_ref,pointer_type->size[r_arch],
535 (char**)&l_referenced,
538 TRY(gras_dd_alloc_ref(refs,sub_type->size[GRAS_THISARCH],
539 r_ref,pointer_type->size[r_arch],
540 (char**)&l_referenced,
544 TRY(gras_datadesc_recv_rec(sock,state,refs, sub_type,
545 r_arch,r_ref,pointer_type->size[r_arch],
546 (char*)l_referenced, subsubcount,
547 detect_cycle || sub_type->cycle));
548 *(void**)l_data=l_referenced;
549 VERB3("'%s' remotely referenced at %p locally at %p",
550 sub_type->name, *(void**)r_ref, l_referenced);
552 } else if (errcode == no_error) {
553 VERB2("NOT receiving data remotely referenced @%p (already done, @%p here)",
554 *(void**)r_ref, *(void**)l_ref);
556 *(void**)l_data=*l_ref;
565 case e_gras_datadesc_type_cat_array: {
566 gras_dd_cat_array_t array_data;
571 array_data = type->category.array_data;
572 /* determine element count locally, or from caller, or from peer */
573 count = array_data.fixed_size;
577 TRY(gras_dd_recv_int(sock, r_arch, &count));
579 RAISE1(mismatch_error,
580 "Invalid (=0) array size for type %s",type->name);
582 /* receive the content */
583 sub_type = array_data.type;
584 if (sub_type->category_code == e_gras_datadesc_type_cat_scalar) {
585 VERB1("Array of %d scalars, get it in one shoot", count);
586 if (sub_type->aligned_size[GRAS_THISARCH] >=
587 sub_type->aligned_size[r_arch]) {
588 TRY(gras_trp_chunk_recv(sock, (char*)l_data,
589 sub_type->aligned_size[r_arch] * count));
590 if (r_arch != GRAS_THISARCH)
591 TRY(gras_dd_convert_elm(sub_type,count,r_arch, l_data,l_data));
593 ptr = xbt_malloc(sub_type->aligned_size[r_arch] * count);
595 TRY(gras_trp_chunk_recv(sock, (char*)ptr,
596 sub_type->size[r_arch] * count));
597 if (r_arch != GRAS_THISARCH)
598 TRY(gras_dd_convert_elm(sub_type,count,r_arch, ptr,l_data));
601 } else if (sub_type->category_code == e_gras_datadesc_type_cat_array &&
602 sub_type->category.array_data.fixed_size > 0 &&
603 sub_type->category.array_data.type->category_code == e_gras_datadesc_type_cat_scalar) {
604 gras_datadesc_type_t subsub_type;
605 array_data = sub_type->category.array_data;
606 subsub_type = array_data.type;
608 VERB1("Array of %d fixed array of scalars, get it in one shot",count);
609 if (subsub_type->aligned_size[GRAS_THISARCH] >=
610 subsub_type->aligned_size[r_arch]) {
611 TRY(gras_trp_chunk_recv(sock, (char*)l_data,
612 subsub_type->aligned_size[r_arch] * count *
613 array_data.fixed_size));
614 if (r_arch != GRAS_THISARCH)
615 TRY(gras_dd_convert_elm(subsub_type,count*array_data.fixed_size,r_arch, l_data,l_data));
617 ptr = xbt_malloc(subsub_type->aligned_size[r_arch] * count*array_data.fixed_size);
619 TRY(gras_trp_chunk_recv(sock, (char*)ptr,
620 subsub_type->size[r_arch] * count*array_data.fixed_size));
621 if (r_arch != GRAS_THISARCH)
622 TRY(gras_dd_convert_elm(subsub_type,count*array_data.fixed_size,r_arch, ptr,l_data));
628 /* not scalar content, get it recursively (may contain pointers) */
629 elm_size = sub_type->aligned_size[GRAS_THISARCH];
630 VERB2("Receive a %d-long array of %s",count, sub_type->name);
633 for (cpt=0; cpt<count; cpt++) {
634 TRY(gras_datadesc_recv_rec(sock,state,refs, sub_type,
635 r_arch, NULL, 0, ptr,-1,
636 detect_cycle || sub_type->cycle));
644 xbt_assert0(0, "Invalid type");
648 type->recv(state,l_data);
654 * gras_datadesc_recv:
656 * Get an instance of the datatype described by @type from the @socket,
657 * and store a pointer to it in @dst
661 gras_datadesc_recv(gras_socket_t sock,
662 gras_datadesc_type_t type,
667 gras_cbps_t state; /* callback persistent state */
668 xbt_dict_t refs; /* all references already sent */
670 refs = xbt_dict_new();
671 state = gras_cbps_new();
673 errcode = gras_datadesc_recv_rec(sock, state, refs, type,
678 xbt_dict_free(&refs);
679 gras_cbps_free(&state);