* All rights reserved. */
/* This program is free software; you can redistribute it and/or modify it
- * under the terms of the license (GNU LGPL) which comes with this package. */
+ * under the terms of the license (GNU LGPL) which comes with this package. */
#include "private.h"
#include "smpi_mpi_dt_private.h"
#include "mc/mc.h"
+#include "xbt/replay.h"
#include "surf/surf.h"
#include "simix/smx_private.h"
#include "simgrid/sg_config.h"
MPI_Comm comm_self;
void *data; /* user data */
int index;
- int initialized;
+ char state;
int sampling; /* inside an SMPI_SAMPLE_ block? */
} s_smpi_process_data_t;
void smpi_process_destroy(void)
{
int index = smpi_process_index();
- process_data[index]->index = -100;
+ process_data[index]->state = SMPI_FINALIZED; /* record finalization in the state field; the stored index is no longer overwritten with a sentinel */
XBT_DEBUG("<%d> Process left the game", index);
}
*/
void smpi_process_finalize(void)
{
- // wait for all pending asynchronous comms to finish
- while (SIMIX_process_has_pending_comms(SIMIX_process_self())) {
- simcall_process_sleep(0.01);
+ int i;
+ int size = smpi_comm_size(MPI_COMM_WORLD);
+ int rank = smpi_comm_rank(MPI_COMM_WORLD);
+ /* Two-phase handshake through rank 0 on COLL_TAG_BARRIER: every non-root rank sends a zero-length message to rank 0, then blocks receiving the zero-length reply. */
+ if (rank > 0) {
+ smpi_mpi_ssend (NULL, 0, MPI_BYTE, 0,
+ COLL_TAG_BARRIER,
+ MPI_COMM_WORLD);
+ smpi_mpi_recv (NULL, 0, MPI_BYTE, 0,
+ COLL_TAG_BARRIER,
+ MPI_COMM_WORLD, MPI_STATUS_IGNORE);
+ }
+ /* Rank 0 first receives one message per other rank, then sends one reply to each of them. */
+ else {
+ MPI_Request* requests;
+ requests = (MPI_Request*)malloc( size * sizeof(MPI_Request) ); /* size slots are allocated, but slot 0 is never used: both loops below start at 1 */
+ for (i = 1; i < size; ++i) {
+ requests[i] = smpi_mpi_irecv(NULL, 0, MPI_BYTE, MPI_ANY_SOURCE,
+ COLL_TAG_BARRIER, MPI_COMM_WORLD
+ );
+ }
+ smpi_mpi_waitall( size-1, requests+1, MPI_STATUSES_IGNORE ); /* waits on the size-1 receive requests posted above (requests[1..size-1]) */
+ for (i = 1; i < size; ++i) {
+ requests[i] = smpi_mpi_issend(NULL, 0, MPI_BYTE, i,
+ COLL_TAG_BARRIER,
+ MPI_COMM_WORLD
+ );
+ }
+ smpi_mpi_waitall( size-1, requests+1, MPI_STATUSES_IGNORE ); /* waits on the size-1 send requests before the array is released */
+ free( requests );
}
+
}
/**
*/
int smpi_process_finalized()
{
- return (smpi_process_index() == -100);
- // If finalized, this value has been set to -100;
+ int index = smpi_process_index();
+ if (index != MPI_UNDEFINED) /* process_data is only indexed when the index is not MPI_UNDEFINED */
+ return (process_data[index]->state == SMPI_FINALIZED); /* non-zero only when the stored state is SMPI_FINALIZED */
+ else
+ return 0; /* index is MPI_UNDEFINED: reported as not finalized */
}
/**
int smpi_process_initialized(void)
{
int index = smpi_process_index();
- return ((index != -100) && (index != MPI_UNDEFINED)
- && (process_data[index]->initialized));
+ return ( (index != MPI_UNDEFINED) /* short-circuit: process_data is only indexed when the index is not MPI_UNDEFINED */
+ && (process_data[index]->state == SMPI_INITIALIZED)); /* non-zero only while the stored state equals SMPI_INITIALIZED */
}
/**
void smpi_process_mark_as_initialized(void)
{
int index = smpi_process_index();
- if ((index != -100) && (index != MPI_UNDEFINED))
- process_data[index]->initialized = 1;
+ if ((index != MPI_UNDEFINED) && (process_data[index]->state != SMPI_FINALIZED)) /* only mark a known process that is not already finalized; the state is compared directly (no leading '!', which would compare a 0/1 value instead) */
+ process_data[index]->state = SMPI_INITIALIZED; /* switch the process to the initialized state */
}
void *buff, size_t buff_size)
{
XBT_DEBUG("Copy the data over");
+ if(_xbt_replay_is_active()) return;
memcpy(comm->comm.dst_buff, buff, buff_size);
if (comm->comm.detached) {
// if this is a detached send, the source buffer was duplicated by SMPI
if (MC_is_active())
MC_ignore_heap(process_data[i]->timer, xbt_os_timer_size());
process_data[i]->comm_self = MPI_COMM_NULL;
- process_data[i]->initialized = 0;
+ process_data[i]->state = SMPI_UNINITIALIZED;
process_data[i]->sampling = 0;
}
group = smpi_group_new(process_count);
{
smpi_process_init(&argc, &argv);
user_main_();
- //xbt_die("Should not be in this smpi_simulated_main");
return 0;
}