3 /* gras_rl - implementation of GRAS on top of the SimGrid simulator */
5 /* Authors: Martin Quinson */
6 /* Copyright (C) 2003 the OURAGAN project. */
8 /* This program is free software; you can redistribute it and/or modify it
9 under the terms of the license (GNU LGPL) which comes with this package. */
/* Names GRAS as the default log category for this file.
 * NOTE(review): the macro comes from a project header not visible here;
 * confirm its exact effect there. */
16 GRAS_LOG_DEFAULT_CATEGORY(GRAS);
18 /* **************************************************************************
19 * Opening/Maintaining/Closing connections (private functions for both raw
20 * and regular sockets)
21 * **************************************************************************/
/*
 * Open a server socket (regular or raw, depending on `raw`) on the first
 * port of [startingPort, endingPort] that is not yet registered in the
 * current host's port table.
 *
 * On success the port is recorded in the host tables (hd->port, hd->raw,
 * hd->port2chan), a freshly malloc'ed gras_sock_t is stored in *sock and a
 * trace line is printed on stderr. When no port of the range is free, an
 * error message is printed and mismatch_error is returned.
 *
 * `bufSize` is not used in any of the lines visible in this listing.
 *
 * NOTE(review): this listing skips several original lines (return type,
 * local declarations, some braces and returns), so the exact control flow
 * between the statements below must be checked against the full file.
 */
23 _gras_sock_server_open(unsigned short startingPort, unsigned short endingPort,
24 int raw, unsigned int bufSize, /* OUT */ gras_sock_t **sock) {
/* NOTE(review): "hd=hd=" assigns hd twice; the duplicate looks like a typo. */
26 gras_hostdata_t *hd=hd=(gras_hostdata_t *)MSG_host_get_data(MSG_host_self());
27 grasProcessData_t *pd=(grasProcessData_t *)MSG_process_get_data(MSG_process_self());
29 const char *host=MSG_host_get_name(MSG_host_self());
/* Both the host data and the process data must have been set up before. */
31 gras_assert0(hd,"HostData=NULL !! Please run grasInit on each process\n");
32 gras_assert0(pd,"ProcessData=NULL !! Please run grasInit on each process\n");
34 for (port=startingPort ; port <= endingPort ; port++) {
/* Empty-bodied scan: leaves i on the entry holding `port`, or at portLen. */
35 for (i=0; i<hd->portLen && hd->port[i] != port; i++);
36 if (i<hd->portLen && hd->port[i] == port)
39 /* port not used so far. Do it */
40 if (i == hd->portLen) {
41 /* need to enlarge the tables */
/* NOTE(review): each realloc result overwrites the only pointer to the old
 * table, so a failed realloc loses that table; the PANIC message below
 * acknowledges this. Keeping the old pointer in a temporary would avoid it. */
43 hd->port2chan=(int*)realloc(hd->port2chan,hd->portLen*sizeof(int));
44 hd->port =(int*)realloc(hd->port ,hd->portLen*sizeof(int));
45 hd->raw =(int*)realloc(hd->raw ,hd->portLen*sizeof(int));
47 hd->port2chan=(int*)malloc(hd->portLen*sizeof(int));
48 hd->port =(int*)malloc(hd->portLen*sizeof(int));
49 hd->raw =(int*)malloc(hd->portLen*sizeof(int));
51 if (!hd->port2chan || !hd->port || !hd->raw) {
52 fprintf(stderr,"GRAS: PANIC: A malloc error did lose all ports attribution on this host\n");
/* Raw ports are mapped to the process' raw channel, others to the regular one. */
57 hd->port2chan[ i ]=raw ? pd->rawChan : pd->chan;
61 /* Create the socket */
62 if (!(*sock=(gras_sock_t*)malloc(sizeof(gras_sock_t)))) {
63 fprintf(stderr,"GRAS: openServerSocket: out of memory\n");
67 (*sock)->server_sock = 1;
68 (*sock)->raw_sock = raw;
69 (*sock)->from_PID = -1;
70 (*sock)->to_PID = MSG_process_self_PID();
71 (*sock)->to_host = MSG_host_self();
72 (*sock)->to_port = port;
/* NOTE(review): to_chan is always pd->chan here, even when `raw` is set,
 * whereas port2chan[i] above uses pd->rawChan for raw ports; confirm that
 * this difference is intended. */
73 (*sock)->to_chan = pd->chan;
76 fprintf(stderr,"GRAS: '%s' (%d) ears on %s:%d%s (%p).\n",
77 MSG_process_get_name(MSG_process_self()), MSG_process_self_PID(),
78 host,port,raw? " (mode RAW)":"",*sock);
82 /* if we go out of the previous for loop, that's that we didn't find any
86 "GRAS: can't find an empty port between %d and %d to open a socket on host %s\n.",
87 startingPort,endingPort,host);
88 return mismatch_error;
/*
 * Open a client socket (regular or raw, depending on `raw`) towards
 * `host`:`port`.
 *
 * The peer host is looked up by name, then its port table is searched for
 * `port`, and the mode registered for that port must match `raw`. Each of
 * these checks prints a diagnostic on stderr and returns mismatch_error on
 * failure. On success a freshly malloc'ed gras_sock_t describing the peer
 * is stored in *sock and a trace line is printed on stderr.
 *
 * `bufSize` is not used in any of the lines visible in this listing.
 *
 * NOTE(review): `port` is a (signed) short here while the server side takes
 * unsigned short ports and the host table stores ints; ports above 32767
 * presumably cannot be matched through this function. Confirm.
 */
92 _gras_sock_client_open(const char *host, short port, int raw, unsigned int bufSize,
93 /* OUT */ gras_sock_t **sock) {
98 /* make sure this socket will reach someone */
99 if (!(peer=MSG_get_host_by_name(host))) {
100 fprintf(stderr,"GRAS: can't connect to %s: no such host.\n",host);
101 return mismatch_error;
103 if (!(hd=(gras_hostdata_t *)MSG_host_get_data(peer))) {
104 fprintf(stderr,"GRAS: can't connect to %s: no process on this host.\n",host);
105 return mismatch_error;
/* Empty-bodied scan: leaves i on the entry holding `port`, or at portLen. */
107 for (i=0; i<hd->portLen && port != hd->port[i]; i++);
108 if (i == hd->portLen) {
109 fprintf(stderr,"GRAS: can't connect to %s:%d, no process listen on this port.\n",host,port);
110 return mismatch_error;
/* The requested mode must be the one the listener registered for this port. */
113 if (hd->raw[i] && !raw) {
114 fprintf(stderr,"GRAS: can't connect to %s:%d in regular mode, the process listen in raw mode on this port.\n",host,port);
115 return mismatch_error;
117 if (!hd->raw[i] && raw) {
118 fprintf(stderr,"GRAS: can't connect to %s:%d in raw mode, the process listen in regular mode on this port.\n",host,port);
119 return mismatch_error;
123 /* Create the socket */
124 if (!(*sock=(gras_sock_t*)malloc(sizeof(gras_sock_t)))) {
125 fprintf(stderr,"GRAS: openClientSocket: out of memory\n");
129 (*sock)->server_sock = 0;
130 (*sock)->raw_sock = raw;
131 (*sock)->from_PID = MSG_process_self_PID();
/* The peer PID is read from the host's proc table, indexed by the channel
 * that the port is mapped to. */
132 (*sock)->to_PID = hd->proc[ hd->port2chan[i] ];
133 (*sock)->to_host = peer;
134 (*sock)->to_port = port;
135 (*sock)->to_chan = hd->port2chan[i];
138 fprintf(stderr,"GRAS: %s (PID %d) connects in %s mode to %s:%d (to_PID=%d).\n",
139 MSG_process_get_name(MSG_process_self()), MSG_process_self_PID(),
140 raw?"RAW":"regular",host,port,(*sock)->to_PID);
/*
 * Close a socket. `raw` tells which public API was used (non-zero for the
 * raw-socket one) and must agree with sd->raw_sock, otherwise a diagnostic
 * is printed on stderr.
 *
 * A NULL `sd` is accepted and yields no_error. For a server socket, the
 * entry holding sd->to_port is removed from the current host's port, raw
 * and port2chan tables by shifting the following entries down one slot.
 *
 * NOTE(review): the statements following each diagnostic, the update of
 * hd->portLen and the release of `sd` are not visible in this listing.
 */
145 gras_error_t _gras_sock_close(int raw, gras_sock_t *sd) {
146 gras_hostdata_t *hd=(gras_hostdata_t *)MSG_host_get_data(MSG_host_self());
149 gras_assert0(hd,"HostData=NULL !! Please run grasInit on each process\n");
151 if (!sd) return no_error;
152 if (raw && !sd->raw_sock) {
153 fprintf(stderr,"GRAS: gras_rawsock_close: Was passed a regular socket. Please use gras_sock_close()\n");
155 if (!raw && sd->raw_sock) {
156 fprintf(stderr,"GRAS: grasSockClose: Was passed a raw socket. Please use gras_rawsock_close()\n");
158 if (sd->server_sock) {
159 /* server mode socket. Unregister it from 'OS' tables */
161 i<hd->portLen && sd->to_port != hd->port[i];
164 if (i==hd->portLen) {
165 fprintf(stderr,"GRAS: closeSocket: The host does not know this server socket.\n");
/* Drop entry i from the three parallel tables: the (portLen - i - 1)
 * entries after it are moved down by one; memmove handles the overlap. */
167 memmove(&(hd->port[i]), &(hd->port[i+1]), (hd->portLen -i -1) * sizeof(int));
168 memmove(&(hd->raw[i]), &(hd->raw[i+1]), (hd->portLen -i -1) * sizeof(int));
169 memmove(&(hd->port2chan[i]), &(hd->port2chan[i+1]), (hd->portLen -i -1) * sizeof(int));
177 /* **************************************************************************
178 * Creating/Using regular sockets
179 * **************************************************************************/
/* Public entry point for regular server sockets: forwards to
 * _gras_sock_server_open() with raw=0 and bufSize=0, and returns its result. */
181 gras_sock_server_open(unsigned short startingPort, unsigned short endingPort,
182 /* OUT */ gras_sock_t **sock) {
183 return _gras_sock_server_open(startingPort,endingPort,0,0,sock);
/* Public entry point for regular client sockets: forwards to
 * _gras_sock_client_open() with raw=0 and bufSize=0, and returns its result. */
187 gras_sock_client_open(const char *host, short port,
188 /* OUT */ gras_sock_t **sock) {
189 return _gras_sock_client_open(host,port,0,0,sock);
/* Close a regular socket: forwards to _gras_sock_close() with raw=0. */
192 gras_error_t gras_sock_close(gras_sock_t *sd) {
193 return _gras_sock_close(0,sd);
/* Port accessor for server sockets. Returns -1 when `sd` is NULL or is not
 * a server socket; the value returned otherwise is on a line not visible in
 * this listing. */
197 gras_sock_get_my_port(gras_sock_t *sd) {
198 if (!sd || !sd->server_sock) return -1;
/* Port accessor for client sockets. Returns -1 when `sd` is NULL or is a
 * server socket; the value returned otherwise is on a line not visible in
 * this listing. */
203 gras_sock_get_peer_port(gras_sock_t *sd) {
204 if (!sd || sd->server_sock) return -1;
/*
 * Return the name of the host running the process designated by
 * sd->to_PID.
 *
 * Returns NULL when `sd` is NULL. When no process matches the PID, a
 * diagnostic is printed on stderr and the literal
 * "(dead or unknown host)" is returned.
 *
 * NOTE(review): both return statements cast to plain char*; the string
 * literal, and presumably the host name as well, must not be modified or
 * freed by the caller.
 */
209 gras_sock_get_peer_name(gras_sock_t *sd) {
212 if (!sd) return NULL;
213 if ((proc=MSG_process_from_PID(sd->to_PID))) {
214 return (char*) MSG_host_get_name(MSG_process_get_host(proc));
216 fprintf(stderr,"GRAS: try to access hostname of unknown process %d\n",sd->to_PID);
217 return (char*) "(dead or unknown host)";
220 /* **************************************************************************
221 * Creating/Using raw sockets
222 * **************************************************************************/
/* Public entry point for raw server sockets: forwards to
 * _gras_sock_server_open() with raw=1, passing `bufSize` through and
 * casting the raw socket handle to the regular socket type. */
223 gras_error_t gras_rawsock_server_open(unsigned short startingPort, unsigned short endingPort,
224 unsigned int bufSize, gras_rawsock_t **sock) {
225 return _gras_sock_server_open(startingPort,endingPort,1,bufSize,(gras_sock_t**)sock);
/* Public entry point for raw client sockets: forwards to
 * _gras_sock_client_open() with raw=1, passing `bufSize` through and
 * casting the raw socket handle to the regular socket type. */
228 gras_error_t gras_rawsock_client_open(const char *host, short port,
229 unsigned int bufSize, gras_rawsock_t **sock) {
230 return _gras_sock_client_open(host,port,1,bufSize,(gras_sock_t**)sock);
/* Close a raw socket: forwards to _gras_sock_close() with raw=1. */
233 gras_error_t gras_rawsock_close(gras_rawsock_t *sd) {
234 return _gras_sock_close(1,(gras_sock_t*)sd);
/* Port accessor for raw sockets. Returns -1 when `sd` is NULL or is not a
 * server socket; the value returned otherwise is on a line not visible in
 * this listing.
 *
 * NOTE(review): gras_sock_get_peer_port() rejects server sockets
 * (sd->server_sock), whereas this "peer port" accessor rejects non-server
 * ones (!sd->server_sock), like gras_sock_get_my_port(). The condition
 * looks inverted; confirm against the callers. */
238 gras_rawsock_get_peer_port(gras_rawsock_t *sd) {
239 if (!sd || !sd->server_sock) return -1;
/*
 * Send raw data over `sock`: one MSG task per chunk of `msgSize` bytes
 * until `expSize` bytes are covered. Each task is named
 * "Raw data[<counter>]", carries no data pointer, and is put on
 * sock->to_host / sock->to_chan.
 *
 * Returns unknown_error as soon as a MSG_task_put() fails; the success
 * return is on a line not visible in this listing.
 *
 * NOTE(review): if msgSize is 0 while expSize is not, bytesTotal never
 * grows and the loop presumably never ends. Confirm that callers cannot
 * pass 0.
 */
244 gras_rawsock_send(gras_rawsock_t *sock, unsigned int expSize, unsigned int msgSize) {
245 unsigned int bytesTotal;
/* Static, so task names keep increasing across calls. */
246 static unsigned int count=0;
250 gras_assert0(sock->raw_sock,"Asked to send raw data on a regular socket\n");
253 bytesTotal < expSize;
254 bytesTotal += msgSize) {
/* NOTE(review): `count` is unsigned but formatted with %d, and sprintf is
 * unbounded; the size of `name` is declared on a line not visible here. */
256 sprintf(name,"Raw data[%d]",count++);
/* The third argument is the chunk size converted from bytes to MiB. */
258 task=MSG_task_create(name,0,((double)msgSize)/(1024.0*1024.0),NULL);
260 fprintf(stderr, "%f:%s: gras_rawsock_send(%f %s -> %s) BEGIN\n",
262 MSG_process_get_name(MSG_process_self()),
263 ((double)msgSize)/(1024.0*1024.0),
264 MSG_host_get_name( MSG_host_self()),
265 MSG_host_get_name( sock->to_host));
267 if (MSG_task_put(task, sock->to_host,sock->to_chan) != MSG_OK) {
268 fprintf(stderr,"GRAS: msgSend: Problem during the MSG_task_put()\n");
269 return unknown_error;
271 /*fprintf(stderr, "%f:%s: gras_rawsock_send(%f -> %s) END\n",
273 MSG_process_get_name(MSG_process_self()),
274 ((double)msgSize)/(1024.0*1024.0),
275 MSG_host_get_name( sock->to_host));*/
/*
 * Receive raw data: for each chunk of `msgSize` bytes up to `expSize`,
 * poll the calling process' raw channel (pd->rawChan) until a task is
 * available, then get and destroy it. Between polls the process sleeps
 * 0.0001.
 *
 * Returns unknown_error when MSG_task_get() or MSG_task_destroy() fails,
 * and timeout_error when more than `timeout` has elapsed since the call
 * started; the success return is on a line not visible in this listing.
 *
 * NOTE(review): in the visible lines `sock` is only used by the assertion;
 * the channel that is read comes from the process data, not from the
 * socket. `pd` is dereferenced without a visible NULL check.
 */
281 gras_rawsock_recv(gras_rawsock_t *sock, unsigned int expSize, unsigned int msgSize,
282 unsigned int timeout) {
283 grasProcessData_t *pd=(grasProcessData_t *)MSG_process_get_data(MSG_process_self());
284 unsigned int bytesTotal;
288 gras_assert0(sock->raw_sock,"Asked to receive raw data on a regular socket\n");
/* Taken once: the timeout covers the whole transfer, not each chunk. */
290 startTime=gras_time();
292 bytesTotal < expSize;
293 bytesTotal += msgSize) {
297 fprintf(stderr, "%f:%s: gras_rawsock_recv() BEGIN\n",
299 MSG_process_get_name(MSG_process_self()));
/* Non-blocking probe first, so the timeout below can be honoured. */
302 if (MSG_task_Iprobe((m_channel_t) pd->rawChan)) {
303 if (MSG_task_get(&task, (m_channel_t) pd->rawChan) != MSG_OK) {
304 fprintf(stderr,"GRAS: Error in MSG_task_get()\n");
305 return unknown_error;
/* The task carries no payload worth keeping; only its arrival matters. */
307 if (MSG_task_destroy(task) != MSG_OK) {
308 fprintf(stderr,"GRAS: Error in MSG_task_destroy()\n");
309 return unknown_error;
312 fprintf(stderr, "%f:%s: gras_rawsock_recv() END\n",
314 MSG_process_get_name(MSG_process_self()));
318 MSG_process_sleep(0.0001);
320 } while (gras_time() - startTime < timeout);
/* NOTE(review): this test relies on elapsed time only, and is strict (>)
 * while the loop condition above is strict (<); confirm the behaviour when
 * the elapsed time equals `timeout` exactly. */
322 if (gras_time() - startTime > timeout)
323 return timeout_error;
329 /* **************************************************************************
330 * Actually exchanging messages
331 * **************************************************************************/
/*
 * Receive one regular message on the calling process' channel (pd->chan).
 *
 * Polls the channel; when a task is available it is fetched, its data
 * pointer is stored in *msg, a dump of the message is printed on stderr,
 * and the task is destroyed. Between polls the process sleeps 1. Polling
 * goes on while the elapsed time is below `timeOut` or a task is still
 * pending on the channel; after that, timeout_error is returned.
 *
 * Returns unknown_error when MSG_task_get() or MSG_task_destroy() fails.
 *
 * NOTE(review): the rest of the parameter list (including `timeOut`) and
 * the success return are on lines not visible in this listing.
 */
334 grasMsgRecv(gras_msg_t **msg,
337 double startTime=gras_time();
338 grasProcessData_t *pd=grasProcessDataGet();
342 if (MSG_task_Iprobe((m_channel_t) pd->chan)) {
343 if (MSG_task_get(&task, (m_channel_t) pd->chan) != MSG_OK) {
344 fprintf(stderr,"GRAS: Error in MSG_task_get()\n");
345 return unknown_error;
/* The message travels as the data pointer attached to the task. */
348 *msg=(gras_msg_t*)MSG_task_get_data(task);
352 gras_msg_t *__dm_=*msg;
354 fprintf(stderr, "grasMsgRecv(%s) = %d seq (",
355 __dm_->entry->name, __dm_->entry->seqCount );
357 for (i=0; i<__dm_->entry->seqCount; i++) {
358 fprintf(stderr,"%d elem {",__dm_->dataCount[i]);
/* NOTE(review): the inner loop prints data[i] on every iteration; j is
 * not used in the printed expression, so the same pointer is repeated. */
359 for (j=0; j<__dm_->dataCount[i]; j++) {
360 fprintf(stderr,"%p; ",__dm_->data[i]);
362 fprintf(stderr,"},");
364 fprintf(stderr, ")\n");
/* Only the task wrapper is destroyed; *msg keeps pointing to the data. */
368 if (MSG_task_destroy(task) != MSG_OK) {
369 fprintf(stderr,"GRAS: Error in MSG_task_destroy()\n");
370 return unknown_error;
375 MSG_process_sleep(1);
377 } while (gras_time()-startTime < timeOut || MSG_task_Iprobe((m_channel_t) pd->chan));
378 return timeout_error;
/*
 * Send a regular message over `sd`.
 *
 * An "answer" socket describing how to reach the sender (own PID, own
 * host, own regular channel) is allocated. The message is wrapped in a MSG
 * task named "<entry name>[<counter>]", whose third creation argument is
 * the header's dataSize converted from bytes to MiB, and the task is put
 * on sd->to_host / sd->to_chan. With freeDirective == free_never, a copy
 * made by gras_msg_copy() is sent instead of the caller's message.
 *
 * Returns unknown_error when MSG_task_put() fails.
 *
 * NOTE(review): part of the parameter list, the local declarations, the
 * handling of a failed malloc, the place where `answer` is attached to the
 * message, and the success return are on lines not visible in this
 * listing.
 */
382 gras_msg_send(gras_sock_t *sd,
384 e_gras_free_directive_t freeDirective) {
386 grasProcessData_t *pd=grasProcessDataGet();
393 /* arg validity checks */
/* NOTE(review): the assertion checks `msg` while the code below reads
 * `_msg`; which one is the parameter cannot be told from the visible
 * lines. Confirm that the checked pointer is the one dereferenced. */
394 gras_assert0(msg,"Trying to send NULL message");
395 gras_assert0(sd ,"Trying to send over a NULL socket");
397 if (!(answer=(gras_sock_t*)malloc(sizeof(gras_sock_t)))) {
400 answer->server_sock=0;
402 answer->from_PID = sd->to_PID;
403 answer->to_PID = MSG_process_self_PID();
404 answer->to_host = MSG_host_self();
406 answer->to_chan = pd->chan;
/* NOTE(review): sprintf is unbounded; the size of `name` is declared on a
 * line not visible here. */
408 sprintf(name,"%s[%d]",_msg->entry->name,count++);
409 /* if freeDirective == free_never, we have to build a copy of the message */
410 if (freeDirective == free_never) {
411 msg=gras_msg_copy(_msg);
418 fprintf(stderr,"Send %s with answer(%p)=",msg->entry->name,msg->sock);
419 fprintf(stderr,"(server=%d,raw=%d,fromPID=%d,toPID=%d,toHost=%p,toPort=%d,toChan=%d)\n",
420 msg->sock->server_sock,msg->sock->raw_sock,msg->sock->from_PID,
421 msg->sock->to_PID,msg->sock->to_host,msg->sock->to_port,msg->sock->to_chan);
422 fprintf(stderr,"Send over %p=(server=%d,raw=%d,fromPID=%d,toPID=%d,toHost=%p,toPort=%d,toChan=%d)\n",
423 sd,sd->server_sock,sd->raw_sock,sd->from_PID,sd->to_PID,sd->to_host,sd->to_port,sd->to_chan);
429 gras_msg_t *__dm_=msg;
431 fprintf(stderr, "gras_msg_send(%s) = %d seq (",
432 __dm_->entry->name, __dm_->entry->seqCount );
434 for (i=0; i<__dm_->entry->seqCount; i++) {
435 fprintf(stderr,"%d elem {",__dm_->dataCount[i]);
/* NOTE(review): the inner loop prints data[i] on every iteration; j is
 * not used in the printed expression, so the same pointer is repeated. */
436 for (j=0; j<__dm_->dataCount[i]; j++) {
437 fprintf(stderr,"%p; ",__dm_->data[i]);
439 fprintf(stderr,"},");
441 fprintf(stderr, ")\n");
/* The message itself is attached to the task as its data pointer. */
446 task=MSG_task_create(name,0,((double)msg->header->dataSize)/(1024.0*1024.0),msg);
447 if (MSG_task_put(task, sd->to_host,sd->to_chan) != MSG_OK) {
448 fprintf(stderr,"GRAS: msgSend: Problem during the MSG_task_put()\n");
449 return unknown_error;
/* Allocate an uninitialized gras_sock_t with malloc().
 * Returns NULL when the allocation fails. */
454 gras_sock_t *gras_sock_new(void) {
455 return malloc(sizeof(gras_sock_t));
/* Release the socket `s`.
 * NOTE(review): the body is on lines not visible in this listing;
 * presumably the counterpart of gras_sock_new(). Confirm. */
458 void grasSockFree(gras_sock_t *s) {
462 /* **************************************************************************
464 * **************************************************************************/
/* Return the data pointer attached to the calling MSG process, cast to
 * grasProcessData_t*. No NULL check is done here. */
465 grasProcessData_t *grasProcessDataGet() {
466 return (grasProcessData_t *)MSG_process_get_data(MSG_process_self());
469 /* **************************************************************************
470 * Wrappers over OS functions
471 * **************************************************************************/
/* Returns the clock value given by MSG_getClock().
 * NOTE(review): the enclosing function header is not visible in this
 * listing; presumably this is the body of gras_time(). Confirm. */
473 return MSG_getClock();
/* Put the calling process to sleep for `sec` seconds plus `usec`
 * microseconds, passed to MSG_process_sleep() as a single value in
 * seconds. */
476 void gras_sleep(unsigned long sec, unsigned long usec) {
477 MSG_process_sleep((double)sec + ((double)usec)/1000000);