static int master(int argc, char *argv[])
{
- int slaves_count = 0;
- msg_host_t *slaves = NULL;
- int number_of_tasks = 0;
- double task_comp_size = 0;
- double task_comm_size = 0;
int i;
- XBT_ATTRIB_UNUSED int read;
-
- read = sscanf(argv[1], "%d", &number_of_tasks);
- xbt_assert(read, "Invalid argument %s\n", argv[1]);
- read = sscanf(argv[2], "%lg", &task_comp_size);
- xbt_assert(read, "Invalid argument %s\n", argv[2]);
- read = sscanf(argv[3], "%lg", &task_comm_size);
- xbt_assert(read, "Invalid argument %s\n", argv[3]);
-
- { /* Process organization */
- slaves_count = argc - 4;
- slaves = xbt_new0(msg_host_t, slaves_count);
-
- for (i = 4; i < argc; i++) {
- slaves[i - 4] = MSG_host_by_name(argv[i]);
- if (slaves[i - 4] == NULL) {
- XBT_INFO("Unknown host %s. Stopping Now! ", argv[i]);
- abort();
- }
- }
- }
- XBT_INFO("Got %d slave(s) :", slaves_count);
- for (i = 0; i < slaves_count; i++)
- XBT_INFO("%s", MSG_host_get_name(slaves[i]));
+ long number_of_tasks = xbt_str_parse_int(argv[1], "Invalid amount of tasks: %s");
+ double task_comp_size = xbt_str_parse_double(argv[2], "Invalid computational size: %s");
+ double task_comm_size = xbt_str_parse_double(argv[3], "Invalid communication size: %s");
+ long workers_count = xbt_str_parse_int(argv[4], "Invalid amount of workers: %s");
- XBT_INFO("Got %d task to process :", number_of_tasks);
+ XBT_INFO("Got %ld workers and %ld tasks to process", workers_count, number_of_tasks);
for (i = 0; i < number_of_tasks; i++) {
+ char mailbox[256];
+ sprintf(mailbox, "worker-%ld", i % workers_count);
+
msg_task_t task = MSG_task_create("Task", task_comp_size, task_comm_size, xbt_new0(double, 1));
*((double *) task->data) = MSG_get_clock();
- msg_error_t a = MSG_task_send_with_timeout(task,MSG_host_get_name(slaves[i % slaves_count]),10.0);
+ msg_error_t a = MSG_task_send_with_timeout(task,mailbox,10.0);
if (a == MSG_OK) {
XBT_INFO("Send completed");
XBT_INFO("Gloups. The cpu on which I'm running just turned off!. See you!");
free(task->data);
MSG_task_destroy(task);
- free(slaves);
return 0;
} else if (a == MSG_TRANSFER_FAILURE) {
- XBT_INFO("Mmh. Something went wrong with '%s'. Nevermind. Let's keep going!",
- MSG_host_get_name(slaves[i % slaves_count]));
+ XBT_INFO("Mmh. Something went wrong with '%s'. Nevermind. Let's keep going!", mailbox);
free(task->data);
MSG_task_destroy(task);
} else if (a == MSG_TIMEOUT) {
- XBT_INFO ("Mmh. Got timeouted while speaking to '%s'. Nevermind. Let's keep going!",
- MSG_host_get_name(slaves[i % slaves_count]));
+ XBT_INFO ("Mmh. Got timeouted while speaking to '%s'. Nevermind. Let's keep going!", mailbox);
free(task->data);
MSG_task_destroy(task);
} else {
}
XBT_INFO("All tasks have been dispatched. Let's tell everybody the computation is over.");
- for (i = 0; i < slaves_count; i++) {
+ for (i = 0; i < workers_count; i++) {
+ char mailbox[256];
+ sprintf(mailbox, "worker-%ld", i % workers_count);
msg_task_t task = MSG_task_create("finalize", 0, 0, FINALIZE);
- int a = MSG_task_send_with_timeout(task,MSG_host_get_name(slaves[i]),1.0);
+ int a = MSG_task_send_with_timeout(task,mailbox,1.0);
if (a == MSG_OK)
continue;
if (a == MSG_HOST_FAILURE) {
XBT_INFO("Gloups. The cpu on which I'm running just turned off!. See you!");
MSG_task_destroy(task);
- free(slaves);
return 0;
} else if (a == MSG_TRANSFER_FAILURE) {
- XBT_INFO("Mmh. Can't reach '%s'! Nevermind. Let's keep going!", MSG_host_get_name(slaves[i]));
+ XBT_INFO("Mmh. Can't reach '%s'! Nevermind. Let's keep going!", mailbox);
MSG_task_destroy(task);
} else if (a == MSG_TIMEOUT) {
- XBT_INFO("Mmh. Got timeouted while speaking to '%s'. Nevermind. Let's keep going!",
- MSG_host_get_name(slaves[i % slaves_count]));
+ XBT_INFO("Mmh. Got timeouted while speaking to '%s'. Nevermind. Let's keep going!", mailbox);
MSG_task_destroy(task);
} else {
XBT_INFO("Hey ?! What's up ? ");
- xbt_die("Unexpected behavior with '%s': %d", MSG_host_get_name(slaves[i]), a);
+ xbt_die("Unexpected behavior with '%s': %d", mailbox, a);
}
}
XBT_INFO("Goodbye now!");
- free(slaves);
return 0;
}
-static int slave(int argc, char *argv[])
+static int worker(int argc, char *argv[])
{
+ msg_task_t task = NULL;
+ char mailbox[80];
+
+ long id= xbt_str_parse_int(argv[1], "Invalid argument %s");
+
+ sprintf(mailbox, "worker-%ld", id);
+
while (1) {
- msg_task_t task = NULL;
int a;
double time1, time2;
time1 = MSG_get_clock();
- a = MSG_task_receive( &(task), MSG_host_get_name(MSG_host_self()) );
+ a = MSG_task_receive( &(task), mailbox);
time2 = MSG_get_clock();
if (a == MSG_OK) {
XBT_INFO("Received \"%s\"", MSG_task_get_name(task));
if (MSG_task_get_data(task) == FINALIZE) {
MSG_task_destroy(task);
+ task = NULL;
break;
}
if (time1 < *((double *) task->data))
XBT_INFO("\"%s\" done", MSG_task_get_name(task));
free(task->data);
MSG_task_destroy(task);
+ task = NULL;
} else if (a == MSG_HOST_FAILURE) {
XBT_INFO("Gloups. The cpu on which I'm running just turned off!. See you!");
free(task->data);
MSG_task_destroy(task);
+ task = NULL;
return 0;
} else {
XBT_INFO("Hey ?! What's up ? ");
MSG_create_environment(argv[1]);
MSG_function_register("master", master);
- MSG_function_register("slave", slave);
+ MSG_function_register("worker", worker);
MSG_launch_application(argv[2]);
res = MSG_main();