- if(item) {
- xbt_fifo_remove_item(h->simdata->mbox[channel],item);
- break;
- }
- }
- }
-
- if(max_duration>0) {
- if(!first_time) {
- MSG_RETURN(MSG_OK);
- }
- }
- xbt_assert2(!(h_simdata->sleeping[channel]),
- "A process (%s(%d)) is already blocked on this channel",
- h_simdata->sleeping[channel]->name,
- h_simdata->sleeping[channel]->simdata->PID);
- h_simdata->sleeping[channel] = process; /* I'm waiting. Wake me up when you're ready */
- if(max_duration>0) {
- __MSG_process_block(max_duration);
- } else {
- __MSG_process_block(-1);
- }
- h_simdata->sleeping[channel] = NULL;
- first_time = 0;
- if(surf_workstation_resource->extension_public->get_state(h_simdata->host)
- == SURF_CPU_OFF)
- MSG_RETURN(MSG_HOST_FAILURE);
- /* OK, we should both be ready now. Are you there ? */
- }
-
- DEBUG1("OK, got a task (%s)", t->name);
-
- t_simdata = t->simdata;
- /* *task = __MSG_task_copy(t); */
- *task=t;
-
- /* Transfer */
- t_simdata->using++;
-
- while(MSG_process_is_suspended(t_simdata->sender)) {
- DEBUG1("Oooups, the sender (%s) has been suspended in the meantime. Let's wait for him",
- t_simdata->sender->name);
- m_task_t task_to_wait_for = t_simdata->sender->simdata->waiting_task;
- if(__MSG_process_isBlocked(t_simdata->sender)) {
- DEBUG0("He's blocked. Let's wait for him to go in the suspended state");
- __MSG_process_unblock(t_simdata->sender);
- task_to_wait_for->simdata->using++;
- __MSG_task_wait_event(process, task_to_wait_for);
- MSG_task_destroy(task_to_wait_for);
- } else {
- DEBUG0("He's suspended. Let's wait for him to go in the resumed state");
- task_to_wait_for->simdata->using++;
- __MSG_task_wait_event(process, task_to_wait_for);
- MSG_task_destroy(task_to_wait_for);
- DEBUG0("He's resumed. He should block again. So let's free him.");
- __MSG_process_unblock(t_simdata->sender);
- break;
- }
- }
- DEBUG0("Calling SURF for communication creation");
- t_simdata->comm = surf_workstation_resource->extension_public->
- communicate(MSG_process_get_host(t_simdata->sender)->simdata->host,
- h->simdata->host, t_simdata->message_size,t_simdata->rate);
-
- surf_workstation_resource->common_public->action_set_data(t_simdata->comm,t);
-
- if(__MSG_process_isBlocked(t_simdata->sender)) {
- DEBUG1("Unblocking %s",t_simdata->sender->name);
- __MSG_process_unblock(t_simdata->sender);
- }
-
- PAJE_PROCESS_PUSH_STATE(process,"C",t);
-
- do {
- DEBUG0("Waiting for action termination");
- __MSG_task_wait_event(process, t);
- state=surf_workstation_resource->common_public->action_get_state(t_simdata->comm);
- } while (state==SURF_ACTION_RUNNING);
- DEBUG0("Action terminated");
-
- if(t->simdata->using>1) {
- xbt_fifo_unshift(msg_global->process_to_run,process);
- xbt_context_yield();
- }
-
- PAJE_PROCESS_POP_STATE(process);
- PAJE_COMM_STOP(process,t,channel);
-
- if(state == SURF_ACTION_DONE) {
- if(surf_workstation_resource->common_public->action_free(t_simdata->comm))
- t_simdata->comm = NULL;
- MSG_RETURN(MSG_OK);
- } else if(surf_workstation_resource->extension_public->get_state(h_simdata->host)
- == SURF_CPU_OFF) {
- if(surf_workstation_resource->common_public->action_free(t_simdata->comm))
- t_simdata->comm = NULL;
- MSG_RETURN(MSG_HOST_FAILURE);
- } else {
- if(surf_workstation_resource->common_public->action_free(t_simdata->comm))
- t_simdata->comm = NULL;
- MSG_RETURN(MSG_TRANSFER_FAILURE);
- }