3 /* gras_rl - implementation of GRAS on top of the SimGrid simulator */
5 /* Authors: Martin Quinson */
6 /* Copyright (C) 2003 the OURAGAN project. */
8 /* This program is free software; you can redistribute it and/or modify it
9 under the terms of the license (GNU LGPL) which comes with this package. */
/* NOTE(review): presumably selects GRAS as the default log category for the
   code below -- confirm against the definition of GRAS_LOG_DEFAULT_CATEGORY. */
16 GRAS_LOG_DEFAULT_CATEGORY(GRAS);
/* Per-process initialisation: builds the process bookkeeping structure,
   attaches it to the current MSG process, and claims channel slots for this
   process in the host-wide proc[] table. */
/* Data currently attached to the host returned by MSG_host_self(). */
20 gras_hostdata_t *hd=(gras_hostdata_t *)MSG_host_get_data(MSG_host_self());
21 grasProcessData_t *pd;
/* Allocate the structure that is attached to the current process below. */
24 if (!(pd=(grasProcessData_t *)malloc(sizeof(grasProcessData_t)))) {
25 fprintf(stderr,"grasInit: out of memory\n");
/* The message queue and the grasCblList both start out empty. */
28 pd->grasMsgQueueLen=0;
29 pd->grasMsgQueue = NULL;
31 pd->grasCblListLen = 0;
32 pd->grasCblList = NULL;
/* Make pd retrievable later through MSG_process_get_data(). */
34 if (MSG_process_set_data(MSG_process_self(),(void*)pd) != MSG_OK) {
/* Allocate a host-wide structure. NOTE(review): this assigns over the hd
   fetched at the top; presumably it is only reached when the host has no
   data yet -- confirm. */
39 if (!(hd=(gras_hostdata_t *)malloc(sizeof(gras_hostdata_t)))) {
40 fprintf(stderr,"grasInit: out of memory\n");
/* Visit every one of the MAX_CHANNEL slots. */
46 for (i=0; i<MAX_CHANNEL; i++) {
/* Attach hd to the current host so that later calls can fetch it. */
50 if (MSG_host_set_data(MSG_host_self(),(void*)hd) != MSG_OK) {
55 /* take a free channel for this process */
/* Empty-bodied scan: i stops on the first slot whose proc[] entry is zero,
   or reaches MAX_CHANNEL when every slot is taken. */
56 for (i=0; i<MAX_CHANNEL && hd->proc[i]; i++);
57 if (i == MAX_CHANNEL) {
59 "GRAS: Can't add a new process on %s, because all channel are already in use. Please increase MAX CHANNEL (which is %d for now) and recompile GRAS\n.",
60 MSG_host_get_name(MSG_host_self()),MAX_CHANNEL);
/* Claim the slot by storing the PID of this process in it. */
64 hd->proc[ i ] = MSG_process_self_PID();
66 /* take a free channel for this process */
/* Same scan a second time. Assuming PIDs are never zero (TODO confirm), the
   slot claimed above is now skipped, so a second, distinct slot is taken.
   NOTE(review): presumably one slot backs pd->chan and the other
   pd->rawChan, both of which are used elsewhere in this file -- confirm. */
67 for (i=0; i<MAX_CHANNEL && hd->proc[i]; i++);
68 if (i == MAX_CHANNEL) {
70 "GRAS: Can't add a new process on %s, because all channel are already in use. Please increase MAX CHANNEL (which is %d for now) and recompile GRAS\n.",
71 MSG_host_get_name(MSG_host_self()),MAX_CHANNEL);
75 hd->proc[ i ] = MSG_process_self_PID();
/* Trace the creation on stderr. */
78 fprintf(stderr,"GRAS: Creating process '%s' (%d)\n",
79 MSG_process_get_name(MSG_process_self()),MSG_process_self_PID());
85 gras_process_finalize() {
/* Tear-down counterpart of the process initialisation: warns about messages
   still queued, looks for this PID in the host proc[] table, and removes the
   host port entries mapped to the regular channel of this process. */
86 gras_hostdata_t *hd=(gras_hostdata_t *)MSG_host_get_data(MSG_host_self());
87 grasProcessData_t *pd=(grasProcessData_t *)MSG_process_get_data(MSG_process_self());
88 int myPID=MSG_process_self_PID();
/* Both the host data and the process data must have been attached before. */
91 gras_assert0(hd && pd,"Run gras_process_init!!\n");
94 fprintf(stderr,"GRAS: Finalizing process '%s' (%d)\n",
95 MSG_process_get_name(MSG_process_self()),MSG_process_self_PID());
/* A non-empty queue at this point means some messages were never handled. */
96 if (pd->grasMsgQueueLen) {
97 fprintf(stderr,"GRAS: Warning: process %d terminated, but some queued messages where not handled\n",MSG_process_self_PID());
/* Look for the channel slot(s) that hold the PID of this process. */
100 for (i=0; i< MAX_CHANNEL; i++)
101 if (myPID == hd->proc[i])
/* Remove every port entry mapped to pd->chan by shifting the tail of the
   tables down by one element.
   NOTE(review): only port[] and port2chan[] are shifted here, while
   _gras_sock_close() also shifts raw[]; entries mapped to pd->rawChan are
   not matched by this test either -- confirm that raw[] stays aligned. */
104 for (i=0; i<hd->portLen; i++) {
105 if (hd->port2chan[ i ] == pd->chan) {
106 memmove(&(hd->port[i]), &(hd->port[i+1]), (hd->portLen -i -1) * sizeof(int));
107 memmove(&(hd->port2chan[i]), &(hd->port2chan[i+1]), (hd->portLen -i -1) * sizeof(int));
109 i--; /* counter the effect of the i++ at the end of the iteration */
115 /* **************************************************************************
116 * Opening/Maintaining/Closing connections (private functions for both raw
117 * and regular sockets)
118 * **************************************************************************/
120 _gras_sock_server_open(unsigned short startingPort, unsigned short endingPort,
121 int raw, unsigned int bufSize, /* OUT */ gras_sock_t **sock) {
/* Open a server socket on the first port of [startingPort, endingPort] that
   is not yet listed in the port table of the current host.
     raw   non-zero for a raw socket: stored in raw_sock and selects
           pd->rawChan instead of pd->chan for the port-to-channel mapping
     sock  OUT: receives the newly malloc-ed socket
   Returns mismatch_error when every port of the range is already taken.
   NOTE(review): bufSize does not appear to be used here -- confirm. */
/* NOTE(review): the doubled hd=hd= looks like a typo; one assignment is enough. */
123 gras_hostdata_t *hd=hd=(gras_hostdata_t *)MSG_host_get_data(MSG_host_self());
124 grasProcessData_t *pd=(grasProcessData_t *)MSG_process_get_data(MSG_process_self());
126 const char *host=MSG_host_get_name(MSG_host_self());
128 gras_assert0(hd,"HostData=NULL !! Please run grasInit on each process\n");
129 gras_assert0(pd,"ProcessData=NULL !! Please run grasInit on each process\n");
/* Try each candidate port in increasing order.
   NOTE(review): if port is an unsigned short and endingPort is USHRT_MAX,
   port <= endingPort can never become false -- check the type of port. */
131 for (port=startingPort ; port <= endingPort ; port++) {
/* Empty-bodied scan: i ends on the table entry equal to port, or on portLen. */
132 for (i=0; i<hd->portLen && hd->port[i] != port; i++);
/* This candidate is already registered on the host. */
133 if (i<hd->portLen && hd->port[i] == port)
136 /* port not used so far. Do it */
137 if (i == hd->portLen) {
138 /* need to enlarge the tables */
/* NOTE(review): each realloc result is stored straight back into the pointer
   it was given, so a failed realloc loses the previous table; the PANIC
   message below reports exactly that situation. */
140 hd->port2chan=(int*)realloc(hd->port2chan,hd->portLen*sizeof(int));
141 hd->port =(int*)realloc(hd->port ,hd->portLen*sizeof(int));
142 hd->raw =(int*)realloc(hd->raw ,hd->portLen*sizeof(int));
144 hd->port2chan=(int*)malloc(hd->portLen*sizeof(int));
145 hd->port =(int*)malloc(hd->portLen*sizeof(int));
146 hd->raw =(int*)malloc(hd->portLen*sizeof(int));
148 if (!hd->port2chan || !hd->port || !hd->raw) {
149 fprintf(stderr,"GRAS: PANIC: A malloc error did lose all ports attribution on this host\n");
/* Bind the new table entry to the raw or to the regular channel of this process. */
154 hd->port2chan[ i ]=raw ? pd->rawChan : pd->chan;
158 /* Create the socket */
159 if (!(*sock=(gras_sock_t*)malloc(sizeof(gras_sock_t)))) {
160 fprintf(stderr,"GRAS: openServerSocket: out of memory\n");
/* A server socket has no sender (from_PID is -1) and points at this process. */
164 (*sock)->server_sock = 1;
165 (*sock)->raw_sock = raw;
166 (*sock)->from_PID = -1;
167 (*sock)->to_PID = MSG_process_self_PID();
168 (*sock)->to_host = MSG_host_self();
169 (*sock)->to_port = port;
/* NOTE(review): to_chan is pd->chan even when raw is set, whereas port2chan[]
   above uses pd->rawChan for raw sockets -- confirm this is intended. */
170 (*sock)->to_chan = pd->chan;
173 fprintf(stderr,"GRAS: '%s' (%d) ears on %s:%d%s (%p).\n",
174 MSG_process_get_name(MSG_process_self()), MSG_process_self_PID(),
175 host,port,raw? " (mode RAW)":"",*sock);
179 /* falling out of the previous for loop means that no suitable port
180 number was found in the requested range */
183 "GRAS: can't find an empty port between %d and %d to open a socket on host %s\n.",
184 startingPort,endingPort,host);
185 return mismatch_error;
189 _gras_sock_client_open(const char *host, short port, int raw, unsigned int bufSize,
190 /* OUT */ gras_sock_t **sock) {
/* Build a client socket towards host:port. Before allocating anything it
   checks that the host exists, that host data is attached to it, that the
   port is listed in its port table, and that the listening side uses the
   same mode (raw or regular) as requested; each failed check prints a
   message and returns mismatch_error.
     sock  OUT: receives the newly malloc-ed socket
   NOTE(review): bufSize does not appear to be used here -- confirm.
   NOTE(review): port is a signed short here but an unsigned short in
   _gras_sock_server_open -- confirm ports above SHRT_MAX are not expected. */
195 /* make sure this socket will reach someone */
196 if (!(peer=MSG_get_host_by_name(host))) {
197 fprintf(stderr,"GRAS: can't connect to %s: no such host.\n",host);
198 return mismatch_error;
200 if (!(hd=(gras_hostdata_t *)MSG_host_get_data(peer))) {
201 fprintf(stderr,"GRAS: can't connect to %s: no process on this host.\n",host);
202 return mismatch_error;
/* Empty-bodied scan: i ends on the entry of the peer port table that equals
   port, or on portLen when the port is not registered. */
204 for (i=0; i<hd->portLen && port != hd->port[i]; i++);
205 if (i == hd->portLen) {
206 fprintf(stderr,"GRAS: can't connect to %s:%d, no process listen on this port.\n",host,port);
207 return mismatch_error;
/* The mode of the listening side (hd->raw[i]) must match the requested one. */
210 if (hd->raw[i] && !raw) {
211 fprintf(stderr,"GRAS: can't connect to %s:%d in regular mode, the process listen in raw mode on this port.\n",host,port);
212 return mismatch_error;
214 if (!hd->raw[i] && raw) {
215 fprintf(stderr,"GRAS: can't connect to %s:%d in raw mode, the process listen in regular mode on this port.\n",host,port);
216 return mismatch_error;
220 /* Create the socket */
221 if (!(*sock=(gras_sock_t*)malloc(sizeof(gras_sock_t)))) {
222 fprintf(stderr,"GRAS: openClientSocket: out of memory\n");
226 (*sock)->server_sock = 0;
227 (*sock)->raw_sock = raw;
228 (*sock)->from_PID = MSG_process_self_PID();
/* The peer PID is found in two steps: port entry -> channel -> proc[] slot. */
229 (*sock)->to_PID = hd->proc[ hd->port2chan[i] ];
230 (*sock)->to_host = peer;
231 (*sock)->to_port = port;
232 (*sock)->to_chan = hd->port2chan[i];
235 fprintf(stderr,"GRAS: %s (PID %d) connects in %s mode to %s:%d (to_PID=%d).\n",
236 MSG_process_get_name(MSG_process_self()), MSG_process_self_PID(),
237 raw?"RAW":"regular",host,port,(*sock)->to_PID);
/* Close a socket on behalf of the raw (raw != 0) or regular (raw == 0) API.
   A NULL sd is accepted and yields no_error. A socket whose raw_sock flag
   does not match the API used triggers a message on stderr. For a server
   socket, the matching entry is removed from the port[], raw[] and
   port2chan[] tables of the current host. */
242 gras_error_t _gras_sock_close(int raw, gras_sock_t *sd) {
243 gras_hostdata_t *hd=(gras_hostdata_t *)MSG_host_get_data(MSG_host_self());
246 gras_assert0(hd,"HostData=NULL !! Please run grasInit on each process\n");
248 if (!sd) return no_error;
/* Raw API called with a regular socket. */
249 if (raw && !sd->raw_sock) {
250 fprintf(stderr,"GRAS: gras_rawsock_close: Was passed a regular socket. Please use gras_sock_close()\n");
/* Regular API called with a raw socket. */
252 if (!raw && sd->raw_sock) {
253 fprintf(stderr,"GRAS: grasSockClose: Was passed a raw socket. Please use gras_rawsock_close()\n");
255 if (sd->server_sock) {
256 /* server mode socket. Un register it from 'OS' tables */
/* Scan condition: advance while the entry differs from the port of sd. */
258 i<hd->portLen && sd->to_port != hd->port[i];
261 if (i==hd->portLen) {
262 fprintf(stderr,"GRAS: closeSocket: The host does not know this server socket.\n");
/* Close the gap at index i in all three parallel tables. */
264 memmove(&(hd->port[i]), &(hd->port[i+1]), (hd->portLen -i -1) * sizeof(int));
265 memmove(&(hd->raw[i]), &(hd->raw[i+1]), (hd->portLen -i -1) * sizeof(int));
266 memmove(&(hd->port2chan[i]), &(hd->port2chan[i+1]), (hd->portLen -i -1) * sizeof(int));
274 /* **************************************************************************
275 * Creating/Using regular sockets
276 * **************************************************************************/
278 gras_sock_server_open(unsigned short startingPort, unsigned short endingPort,
279 /* OUT */ gras_sock_t **sock) {
/* Regular-socket flavour: forwards to _gras_sock_server_open() with raw=0
   and bufSize=0, and returns its result unchanged. */
280 return _gras_sock_server_open(startingPort,endingPort,0,0,sock);
284 gras_sock_client_open(const char *host, short port,
285 /* OUT */ gras_sock_t **sock) {
/* Regular-socket flavour: forwards to _gras_sock_client_open() with raw=0
   and bufSize=0, and returns its result unchanged. */
286 return _gras_sock_client_open(host,port,0,0,sock);
/* Close a regular socket: forwards to _gras_sock_close() with raw=0. */
289 gras_error_t gras_sock_close(gras_sock_t *sd) {
290 return _gras_sock_close(0,sd);
294 gras_sock_get_my_port(gras_sock_t *sd) {
/* Only server sockets are accepted: a NULL sd or a socket whose server_sock
   flag is zero yields -1. */
295 if (!sd || !sd->server_sock) return -1;
300 gras_sock_get_peer_port(gras_sock_t *sd) {
/* Only client sockets are accepted: a NULL sd or a socket whose server_sock
   flag is set yields -1. */
301 if (!sd || sd->server_sock) return -1;
306 gras_sock_get_peer_name(gras_sock_t *sd) {
/* Name of the host running the process designated by sd->to_PID.
   Returns NULL for a NULL sd, and a fixed placeholder string (after a
   message on stderr) when no process is found for that PID. */
309 if (!sd) return NULL;
310 if ((proc=MSG_process_from_PID(sd->to_PID))) {
/* NOTE(review): the (char*) cast presumably drops a const qualifier, since
   MSG_host_get_name() is stored into a const char * elsewhere in this file;
   callers should not write through the result -- confirm. */
311 return (char*) MSG_host_get_name(MSG_process_get_host(proc));
313 fprintf(stderr,"GRAS: try to access hostname of unknown process %d\n",sd->to_PID);
314 return (char*) "(dead or unknown host)";
317 /* **************************************************************************
318 * Creating/Using raw sockets
319 * **************************************************************************/
/* Raw-socket flavour of the server open: forwards to
   _gras_sock_server_open() with raw=1, passing bufSize through and casting
   the OUT pointer from gras_rawsock_t ** to gras_sock_t **. */
320 gras_error_t gras_rawsock_server_open(unsigned short startingPort, unsigned short endingPort,
321 unsigned int bufSize, gras_rawsock_t **sock) {
322 return _gras_sock_server_open(startingPort,endingPort,1,bufSize,(gras_sock_t**)sock);
/* Raw-socket flavour of the client open: forwards to
   _gras_sock_client_open() with raw=1, passing bufSize through and casting
   the OUT pointer from gras_rawsock_t ** to gras_sock_t **. */
325 gras_error_t gras_rawsock_client_open(const char *host, short port,
326 unsigned int bufSize, gras_rawsock_t **sock) {
327 return _gras_sock_client_open(host,port,1,bufSize,(gras_sock_t**)sock);
/* Close a raw socket: forwards to _gras_sock_close() with raw=1 after
   casting sd to gras_sock_t *. */
330 gras_error_t gras_rawsock_close(gras_rawsock_t *sd) {
331 return _gras_sock_close(1,(gras_sock_t*)sd);
335 gras_rawsock_get_peer_port(gras_rawsock_t *sd) {
/* A NULL sd or a socket whose server_sock flag is zero yields -1.
   NOTE(review): gras_sock_get_peer_port() applies the opposite test (it
   rejects sockets whose server_sock flag is set); the test used here is the
   one of gras_sock_get_my_port() -- confirm which one is intended. */
336 if (!sd || !sd->server_sock) return -1;
341 gras_rawsock_send(gras_rawsock_t *sock, unsigned int expSize, unsigned int msgSize) {
/* Send raw data as a series of MSG tasks, one per msgSize step, until
   bytesTotal reaches expSize. Each task carries no payload (NULL data) and is
   put on sock->to_host / sock->to_chan. Returns unknown_error as soon as one
   MSG_task_put() fails.
   NOTE(review): with msgSize == 0 bytesTotal never advances -- confirm that
   callers never pass 0. */
342 unsigned int bytesTotal;
/* Shared across calls: gives every task created here a distinct name. */
343 static unsigned int count=0;
347 gras_assert0(sock->raw_sock,"Asked to send raw data on a regular socket\n");
350 bytesTotal < expSize;
351 bytesTotal += msgSize) {
/* NOTE(review): count is unsigned but formatted with %d, and the size of
   name is not checked by sprintf -- confirm the buffer is large enough. */
353 sprintf(name,"Raw data[%d]",count++);
/* The third argument is msgSize divided by 1024*1024. */
355 task=MSG_task_create(name,0,((double)msgSize)/(1024.0*1024.0),NULL);
357 fprintf(stderr, "%f:%s: gras_rawsock_send(%f %s -> %s) BEGIN\n",
359 MSG_process_get_name(MSG_process_self()),
360 ((double)msgSize)/(1024.0*1024.0),
361 MSG_host_get_name( MSG_host_self()),
362 MSG_host_get_name( sock->to_host));
364 if (MSG_task_put(task, sock->to_host,sock->to_chan) != MSG_OK) {
365 fprintf(stderr,"GRAS: msgSend: Problem during the MSG_task_put()\n");
366 return unknown_error;
368 /*fprintf(stderr, "%f:%s: gras_rawsock_send(%f -> %s) END\n",
370 MSG_process_get_name(MSG_process_self()),
371 ((double)msgSize)/(1024.0*1024.0),
372 MSG_host_get_name( sock->to_host));*/
378 gras_rawsock_recv(gras_rawsock_t *sock, unsigned int expSize, unsigned int msgSize,
379 unsigned int timeout) {
/* Receive raw data: for each msgSize step up to expSize, poll the raw channel
   of the current process until a task shows up, fetch it and destroy it.
   Returns unknown_error when MSG_task_get() or MSG_task_destroy() fails and
   timeout_error once more than timeout has elapsed since startTime.
   NOTE(review): sock is only used by the assertion below; the channel that
   is polled comes from pd->rawChan -- confirm this is intended. */
380 grasProcessData_t *pd=(grasProcessData_t *)MSG_process_get_data(MSG_process_self());
381 unsigned int bytesTotal;
385 gras_assert0(sock->raw_sock,"Asked to receive raw data on a regular socket\n");
/* Taken once, ahead of the chunk loop: the timeout is measured from here. */
387 startTime=gras_time();
389 bytesTotal < expSize;
390 bytesTotal += msgSize) {
394 fprintf(stderr, "%f:%s: gras_rawsock_recv() BEGIN\n",
396 MSG_process_get_name(MSG_process_self()));
/* Probe first, so MSG_task_get() is only called when a task is pending. */
399 if (MSG_task_Iprobe((m_channel_t) pd->rawChan)) {
400 if (MSG_task_get(&task, (m_channel_t) pd->rawChan) != MSG_OK) {
401 fprintf(stderr,"GRAS: Error in MSG_task_get()\n");
402 return unknown_error;
/* The task is discarded right away; nothing is read from it here. */
404 if (MSG_task_destroy(task) != MSG_OK) {
405 fprintf(stderr,"GRAS: Error in MSG_task_destroy()\n");
406 return unknown_error;
409 fprintf(stderr, "%f:%s: gras_rawsock_recv() END\n",
411 MSG_process_get_name(MSG_process_self()));
/* Nothing pending: wait 0.0001 before probing again. */
415 MSG_process_sleep(0.0001);
417 } while (gras_time() - startTime < timeout);
419 if (gras_time() - startTime > timeout)
420 return timeout_error;
426 /* **************************************************************************
427 * Actually exchanging messages
428 * **************************************************************************/
431 grasMsgRecv(gras_msg_t **msg,
/* Receive one message on the regular channel of the current process: polls
   pd->chan, and when a task is pending fetches it, stores its data pointer
   in *msg and destroys the task. Sleeps 1 between polls. The loop goes on
   while the elapsed time is below timeOut or a task is still pending;
   afterwards timeout_error is returned. unknown_error is returned when
   MSG_task_get() or MSG_task_destroy() fails. */
434 double startTime=gras_time();
435 grasProcessData_t *pd=grasProcessDataGet();
439 if (MSG_task_Iprobe((m_channel_t) pd->chan)) {
440 if (MSG_task_get(&task, (m_channel_t) pd->chan) != MSG_OK) {
441 fprintf(stderr,"GRAS: Error in MSG_task_get()\n");
442 return unknown_error;
/* The message travels as the data pointer of the task. */
445 *msg=(gras_msg_t*)MSG_task_get_data(task);
/* Dump of the message layout on stderr. */
449 gras_msg_t *__dm_=*msg;
451 fprintf(stderr, "grasMsgRecv(%s) = %d seq (",
452 __dm_->entry->name, __dm_->entry->seqCount );
454 for (i=0; i<__dm_->entry->seqCount; i++) {
455 fprintf(stderr,"%d elem {",__dm_->dataCount[i]);
/* NOTE(review): the inner loop prints data[i] on every j iteration, so the
   same pointer is printed dataCount[i] times. */
456 for (j=0; j<__dm_->dataCount[i]; j++) {
457 fprintf(stderr,"%p; ",__dm_->data[i]);
459 fprintf(stderr,"},");
461 fprintf(stderr, ")\n");
465 if (MSG_task_destroy(task) != MSG_OK) {
466 fprintf(stderr,"GRAS: Error in MSG_task_destroy()\n");
467 return unknown_error;
472 MSG_process_sleep(1);
474 } while (gras_time()-startTime < timeOut || MSG_task_Iprobe((m_channel_t) pd->chan));
475 return timeout_error;
479 gras_msg_send(gras_sock_t *sd,
481 e_gras_free_directive_t freeDirective) {
/* Send a message over sd as one MSG task. An answer socket describing the
   sender (this process, this host, pd->chan) is allocated, the task is named
   after the message entry plus a counter, and the message itself is the data
   pointer of the task. When freeDirective is free_never a copy made by
   gras_msg_copy() is used instead. Returns unknown_error when
   MSG_task_put() fails. */
483 grasProcessData_t *pd=grasProcessDataGet();
490 /* arg validity checks */
/* NOTE(review): this assertion tests msg while the task name below is built
   from _msg; confirm which of the two holds the pointer supplied by the
   caller at this point. */
491 gras_assert0(msg,"Trying to send NULL message");
492 gras_assert0(sd ,"Trying to send over a NULL socket");
494 if (!(answer=(gras_sock_t*)malloc(sizeof(gras_sock_t)))) {
/* The answer socket points back at the sender: its to_* fields designate
   the current process, host and regular channel. */
497 answer->server_sock=0;
499 answer->from_PID = sd->to_PID;
500 answer->to_PID = MSG_process_self_PID();
501 answer->to_host = MSG_host_self();
503 answer->to_chan = pd->chan;
/* NOTE(review): sprintf does not check the size of name -- confirm the
   buffer is large enough for the longest entry name. */
505 sprintf(name,"%s[%d]",_msg->entry->name,count++);
506 /* if freeDirective == free_never, we have to build a copy of the message */
507 if (freeDirective == free_never) {
508 msg=gras_msg_copy(_msg);
/* Trace of the answer socket and of the socket used for sending. */
515 fprintf(stderr,"Send %s with answer(%p)=",msg->entry->name,msg->sock);
516 fprintf(stderr,"(server=%d,raw=%d,fromPID=%d,toPID=%d,toHost=%p,toPort=%d,toChan=%d)\n",
517 msg->sock->server_sock,msg->sock->raw_sock,msg->sock->from_PID,
518 msg->sock->to_PID,msg->sock->to_host,msg->sock->to_port,msg->sock->to_chan);
519 fprintf(stderr,"Send over %p=(server=%d,raw=%d,fromPID=%d,toPID=%d,toHost=%p,toPort=%d,toChan=%d)\n",
520 sd,sd->server_sock,sd->raw_sock,sd->from_PID,sd->to_PID,sd->to_host,sd->to_port,sd->to_chan);
/* Dump of the message layout on stderr. */
526 gras_msg_t *__dm_=msg;
528 fprintf(stderr, "gras_msg_send(%s) = %d seq (",
529 __dm_->entry->name, __dm_->entry->seqCount );
531 for (i=0; i<__dm_->entry->seqCount; i++) {
532 fprintf(stderr,"%d elem {",__dm_->dataCount[i]);
/* NOTE(review): the inner loop prints data[i] on every j iteration, so the
   same pointer is printed dataCount[i] times. */
533 for (j=0; j<__dm_->dataCount[i]; j++) {
534 fprintf(stderr,"%p; ",__dm_->data[i]);
536 fprintf(stderr,"},");
538 fprintf(stderr, ")\n");
/* The third argument is header->dataSize divided by 1024*1024; the message
   pointer is attached as the task data. */
543 task=MSG_task_create(name,0,((double)msg->header->dataSize)/(1024.0*1024.0),msg);
544 if (MSG_task_put(task, sd->to_host,sd->to_chan) != MSG_OK) {
545 fprintf(stderr,"GRAS: msgSend: Problem during the MSG_task_put()\n");
546 return unknown_error;
/* Allocate one gras_sock_t with malloc() and return it as is: the fields are
   not initialised here, and the result is NULL when malloc() fails. */
551 gras_sock_t *gras_sock_new(void) {
552 return malloc(sizeof(gras_sock_t));
/* NOTE(review): presumably the release counterpart of gras_sock_new() for
   the socket s -- confirm against the function body. */
555 void grasSockFree(gras_sock_t *s) {
559 /* **************************************************************************
561 * **************************************************************************/
/* Return the data pointer attached to the current MSG process, cast to
   grasProcessData_t *. No NULL check is done here. */
562 grasProcessData_t *grasProcessDataGet() {
563 return (grasProcessData_t *)MSG_process_get_data(MSG_process_self());
566 /* **************************************************************************
567 * Wrappers over OS functions
568 * **************************************************************************/
/* The value comes straight from MSG_getClock(). NOTE(review): presumably the
   clock of the simulation rather than wall-clock time -- confirm. */
570 return MSG_getClock();
/* Sleep for sec seconds plus usec microseconds: both parts are combined into
   one fractional number of seconds handed to MSG_process_sleep(). */
573 void gras_sleep(unsigned long sec, unsigned long usec) {
574 MSG_process_sleep((double)sec + ((double)usec)/1000000);