--- /dev/null
+/* $Id: masterslave_forwarder.c 5108 2007-12-04 16:29:05Z mquinson $ */\r
+\r
+/* Copyright (c) 2002,2003,2004 Arnaud Legrand. All rights reserved. */\r
+\r
+/* This program is free software; you can redistribute it and/or modify it\r
+ * under the terms of the license (GNU LGPL) which comes with this package. */\r
+\r
+#include <stdio.h>\r
+#include "msg/msg.h" /* Yeah! If you want to use msg, you need to include msg/msg.h */\r
+#include "xbt/sysdep.h" /* calloc, printf */\r
+\r
+/* Create a log channel to have nice outputs. */\r
+#include "xbt/log.h"\r
+#include "xbt/asserts.h"\r
+XBT_LOG_NEW_DEFAULT_CATEGORY(msg_test,"Messages specific for this msg example");\r
+\r
+\r
+typedef enum { /* channel identifiers used by this example */\r
+ PORT_22 = 0, /* first channel; not referenced by the code visible in this file */\r
+ MAX_CHANNEL /* enumerator count; passed to MSG_set_channel_number() in test_all() */\r
+} channel_t;\r
+\r
+int master(int argc, char *argv[]); /* creates the tasks and dispatches them to its aliases */\r
+int slave(int argc, char *argv[]); /* executes received tasks until a "finalize" task arrives */\r
+int forwarder(int argc, char *argv[]); /* relays received tasks to its aliases */\r
+MSG_error_t test_all(const char *platform_file, const char *application_file); /* sets up and runs the simulation */\r
+\r
+\r
+#define FINALIZE ((void*)221297) /* sentinel attached as data to "finalize" tasks; forwarder() compares task data against it to know when to stop */\r
+\r
+/** Emitter function */\r
+int master(int argc, char *argv[])\r
+{\r
+ int alias_count = 0;\r
+ char** aliases = NULL;\r
+ m_task_t *todo = NULL;\r
+ int number_of_tasks = 0;\r
+ double task_comp_size = 0;\r
+ double task_comm_size = 0;\r
+\r
+\r
+ int i;\r
+\r
+ xbt_assert1(sscanf(argv[1],"%d", &number_of_tasks),"Invalid argument %s\n",argv[1]);\r
+ xbt_assert1(sscanf(argv[2],"%lg", &task_comp_size),"Invalid argument %s\n",argv[2]);\r
+ xbt_assert1(sscanf(argv[3],"%lg", &task_comm_size),"Invalid argument %s\n",argv[3]);\r
+\r
+ {\r
+ /* Task creation */\r
+ char sprintf_buffer[64];\r
+\r
+ todo = calloc(number_of_tasks, sizeof(m_task_t));\r
+\r
+ for (i = 0; i < number_of_tasks; i++) \r
+ {\r
+ sprintf(sprintf_buffer, "Task_%d", i);\r
+ todo[i] = MSG_task_create(sprintf_buffer, task_comp_size, task_comm_size, NULL);\r
+ }\r
+ }\r
+\r
+ { \r
+ /* Process organisation */\r
+ alias_count = argc - 4;\r
+ aliases = (char**) calloc(alias_count, sizeof(char*));\r
+\r
+ for(i = 4; i < argc; i++) \r
+ {\r
+ aliases[i-4] = strdup(argv[i]);\r
+ }\r
+ }\r
+\r
+ INFO2("Got %d aliases and %d tasks to process", alias_count,number_of_tasks);\r
+ \r
+ for (i = 0; i < alias_count; i++)\r
+ DEBUG1("%s", aliases[i]);\r
+\r
+ for (i = 0; i < number_of_tasks; i++) \r
+ {\r
+ INFO2("Sending \"%s\" to \"%s\"",todo[i]->name,aliases[i % alias_count]);\r
+ \r
+ if(!strcmp(MSG_host_get_name(MSG_host_self()), aliases[i % alias_count])) \r
+ {\r
+ INFO0("Hey ! It's me ! :)");\r
+ }\r
+\r
+ MSG_task_send(todo[i], aliases[i % alias_count]);\r
+ INFO0("Sent");\r
+ }\r
+\r
+ INFO0("All tasks have been dispatched. Let's tell everybody the computation is over.");\r
+ \r
+ for (i = 0; i < alias_count; i++) \r
+ MSG_task_send(MSG_task_create("finalize", 0, 0, FINALIZE),aliases[i]);\r
+\r
+ INFO0("Goodbye now!");\r
+ \r
+ for(i = 0; i < alias_count; i++)\r
+ free(aliases[i]);\r
+ \r
+ free(aliases);\r
+ free(todo);\r
+ return 0;\r
+} /* end_of_master */\r
+\r
+/**\r
+ * Receiver function.\r
+ *\r
+ * Repeatedly receives a task on the alias named after the current host,\r
+ * executes it and destroys it. A task named "finalize" is destroyed without\r
+ * being executed and ends the loop. Always returns 0.\r
+ */\r
+int slave(int argc, char *argv[])\r
+{\r
+  m_task_t incoming;\r
+  int status;\r
+  int finished = 0;\r
+\r
+  do {\r
+    incoming = NULL;\r
+    status = MSG_task_receive(&incoming, MSG_host_get_name(MSG_host_self()));\r
+    xbt_assert0(status == MSG_OK, "MSG_task_receive failed");\r
+\r
+    INFO1("Received \"%s\"", MSG_task_get_name(incoming));\r
+\r
+    if (strcmp(MSG_task_get_name(incoming),"finalize") == 0) {\r
+      /* Termination request: nothing to execute. */\r
+      finished = 1;\r
+    } else {\r
+      INFO1("Processing \"%s\"", MSG_task_get_name(incoming));\r
+      MSG_task_execute(incoming);\r
+      INFO1("\"%s\" done", MSG_task_get_name(incoming));\r
+    }\r
+\r
+    /* The received task is destroyed in both cases. */\r
+    MSG_task_destroy(incoming);\r
+  } while (!finished);\r
+\r
+  INFO0("I'm done. See you!");\r
+  return 0;\r
+} /* end_of_slave */\r
+\r
+/**\r
+ * Forwarder function.\r
+ *\r
+ * argv[1..] are the aliases to forward to. Every task received on the alias\r
+ * named after the current host is sent on to one of them, in round-robin\r
+ * order. When a task whose data is FINALIZE arrives, a fresh "finalize" task\r
+ * is sent to every alias, the received one is destroyed and the loop ends.\r
+ * Always returns 0.\r
+ */\r
+int forwarder(int argc, char *argv[])\r
+{\r
+  int i;\r
+  int next = 0; /* round-robin cursor, kept separate from the loop index */\r
+  int alias_count;\r
+  char **aliases;\r
+\r
+  { /* Process organisation */\r
+    alias_count = (argc > 1) ? argc - 1 : 0;\r
+    aliases = calloc(alias_count, sizeof *aliases);\r
+    /* calloc(0, ...) may legitimately return NULL. */\r
+    xbt_assert0(aliases != NULL || alias_count == 0, "Cannot allocate the alias table");\r
+\r
+    for (i = 0; i < alias_count; i++) {\r
+      aliases[i] = strdup(argv[i + 1]);\r
+      xbt_assert1(aliases[i] != NULL, "Cannot duplicate alias %s", argv[i + 1]);\r
+    }\r
+  }\r
+\r
+  while (1) {\r
+    m_task_t task = NULL;\r
+    int a;\r
+\r
+    a = MSG_task_receive(&(task),MSG_host_get_name(MSG_host_self()));\r
+\r
+    if (a == MSG_OK) {\r
+      INFO1("Received \"%s\"", MSG_task_get_name(task));\r
+\r
+      if (MSG_task_get_data(task) == FINALIZE) {\r
+        INFO0("All tasks have been dispatched. Let's tell everybody the computation is over.");\r
+\r
+        for (i = 0; i < alias_count; i++) {\r
+          MSG_task_send(MSG_task_create("finalize", 0, 0, FINALIZE),aliases[i]);\r
+        }\r
+\r
+        MSG_task_destroy(task);\r
+        break;\r
+      }\r
+\r
+      /* The receiver is picked with "next % alias_count": refuse to divide by zero. */\r
+      xbt_assert0(alias_count > 0, "No alias given: nobody to forward the task to");\r
+\r
+      INFO2("Sending \"%s\" to \"%s\"",MSG_task_get_name(task),aliases[next % alias_count]);\r
+      MSG_task_send(task, aliases[next % alias_count]);\r
+      next++;\r
+    } else {\r
+      INFO0("Hey ?! What's up ? ");\r
+      xbt_assert0(0,"Unexpected behavior");\r
+    }\r
+  }\r
+\r
+  for (i = 0; i < alias_count; i++) {\r
+    free(aliases[i]);\r
+  }\r
+\r
+  /* Release the table itself too, as master() does. */\r
+  free(aliases);\r
+\r
+  INFO0("I'm done. See you!");\r
+  return 0;\r
+\r
+} /* end_of_forwarder */\r
+\r
+/**\r
+ * Test function.\r
+ *\r
+ * Configures the simulation from platform_file, registers the "master",\r
+ * "slave" and "forwarder" functions, launches the application described by\r
+ * application_file and runs it. Logs the simulation clock afterwards and\r
+ * returns the value of MSG_main().\r
+ */\r
+MSG_error_t test_all(const char *platform_file,const char *application_file)\r
+{\r
+  MSG_error_t outcome;\r
+\r
+  /* Simulation setting */\r
+  MSG_set_channel_number(MAX_CHANNEL);\r
+  MSG_paje_output("msg_test.trace");\r
+  MSG_create_environment(platform_file);\r
+\r
+  /* Application deployment */\r
+  MSG_function_register("master", master);\r
+  MSG_function_register("slave", slave);\r
+  MSG_function_register("forwarder", forwarder);\r
+  MSG_launch_application(application_file);\r
+\r
+  /* Run, then report the clock reached by the simulation. */\r
+  outcome = MSG_main();\r
+  INFO1("Simulation time %g",MSG_get_clock());\r
+\r
+  return outcome;\r
+} /* end_of_test_all */\r
+\r
+\r
+/**\r
+ * Main function.\r
+ *\r
+ * Expects a platform file and a deployment file on the command line (after\r
+ * MSG_global_init() has processed the arguments). Prints a usage message and\r
+ * exits with status 1 when they are missing. Otherwise runs test_all() and\r
+ * returns 0 if it reported MSG_OK, 1 if not.\r
+ */\r
+int main(int argc, char *argv[])\r
+{\r
+  MSG_error_t status;\r
+\r
+  MSG_global_init(&argc,argv);\r
+\r
+  if (argc < 3) {\r
+    printf ("Usage: %s platform_file deployment_file\n",argv[0]);\r
+    printf ("example: %s msg_platform.xml msg_deployment.xml\n",argv[0]);\r
+    exit(1);\r
+  }\r
+\r
+  status = test_all(argv[1],argv[2]);\r
+  MSG_clean();\r
+\r
+  return (status == MSG_OK) ? 0 : 1;\r
+} /* end_of_main */\r