realized. SMPI doesn't work at the moment, sorry. Check out the previous
version if you want to try it.
git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/simgrid/simgrid/trunk@3603 48e7efb5-ca39-0410-a469-dd3cf9ba447f
int id;
int size;
int barrier;
int id;
int size;
int barrier;
- m_host_t *hosts;
- m_process_t *processes;
+ smx_host_t *hosts;
+ smx_process_t *processes;
};
typedef struct smpi_mpi_communicator_t smpi_mpi_communicator_t;
typedef smpi_mpi_communicator_t *MPI_Comm;
};
typedef struct smpi_mpi_communicator_t smpi_mpi_communicator_t;
typedef smpi_mpi_communicator_t *MPI_Comm;
#define MPI_DOUBLE (&smpi_mpi_double)
struct smpi_waitlist_node_t {
#define MPI_DOUBLE (&smpi_mpi_double)
struct smpi_waitlist_node_t {
struct smpi_waitlist_node_t *next;
};
typedef struct smpi_waitlist_node_t smpi_waitlist_node_t;
struct smpi_waitlist_node_t *next;
};
typedef struct smpi_waitlist_node_t smpi_waitlist_node_t;
int smpi_receiver(int argc, char *argv[]);
// smpi functions
int smpi_receiver(int argc, char *argv[]);
// smpi functions
-int smpi_comm_rank(smpi_mpi_communicator_t *comm, m_host_t host);
+int smpi_comm_rank(smpi_mpi_communicator_t *comm, smx_host_t host);
void smpi_isend(smpi_mpi_request_t*);
void smpi_irecv(smpi_mpi_request_t*);
void smpi_barrier(smpi_mpi_communicator_t *comm);
void smpi_isend(smpi_mpi_request_t*);
void smpi_irecv(smpi_mpi_request_t*);
void smpi_barrier(smpi_mpi_communicator_t *comm);
#include <stdio.h>
#include <sys/time.h>
#include "msg/msg.h"
#include <stdio.h>
#include <sys/time.h>
#include "msg/msg.h"
+#include "simix/simix.h"
#include "xbt/sysdep.h"
#include "xbt/xbt_portability.h"
#include "smpi.h"
#include "xbt/sysdep.h"
#include "xbt/xbt_portability.h"
#include "smpi.h"
smpi_received_t **smpi_received = NULL;
smpi_received_t **smpi_last_received = NULL;
smpi_received_t **smpi_received = NULL;
smpi_received_t **smpi_last_received = NULL;
-m_process_t *smpi_sender_processes = NULL;
-m_process_t *smpi_receiver_processes = NULL;
+smx_process_t *smpi_sender_processes = NULL;
+smx_process_t *smpi_receiver_processes = NULL;
int smpi_running_hosts = 0;
int smpi_running_hosts = 0;
void smpi_mpi_init() {
int i;
int size, rank;
void smpi_mpi_init() {
int i;
int size, rank;
- m_host_t *hosts;
- m_host_t host;
+ smx_host_t *hosts;
+ smx_host_t host;
double duration;
m_task_t mtask;
double duration;
m_task_t mtask;
smpi_running_hosts++;
// initialize some local variables
smpi_running_hosts++;
// initialize some local variables
- size = MSG_get_host_number();
- host = MSG_host_self();
- hosts = MSG_get_host_table();
+ size = SIMIX_host_get_number();
+ host = SIMIX_host_self();
+ hosts = SIMIX_host_get_table();
for(i = 0; i < size && host != hosts[i]; i++);
rank = i;
for(i = 0; i < size && host != hosts[i]; i++);
rank = i;
smpi_mpi_comm_world.barrier = 0;
smpi_mpi_comm_world.hosts = hosts;
smpi_mpi_comm_world.processes = xbt_malloc(sizeof(m_process_t) * size);
smpi_mpi_comm_world.barrier = 0;
smpi_mpi_comm_world.hosts = hosts;
smpi_mpi_comm_world.processes = xbt_malloc(sizeof(m_process_t) * size);
- smpi_mpi_comm_world.processes[0] = MSG_process_self();
+ smpi_mpi_comm_world.processes[0] = SIMIX_process_self();
// mpi datatypes
smpi_mpi_byte.size = (size_t)1;
// mpi datatypes
smpi_mpi_byte.size = (size_t)1;
mtask = (m_task_t)0;
MSG_task_get(&mtask, MPI_PORT);
MSG_task_destroy(mtask);
mtask = (m_task_t)0;
MSG_task_get(&mtask, MPI_PORT);
MSG_task_destroy(mtask);
- smpi_mpi_comm_world.processes[rank] = MSG_process_self();
+ smpi_mpi_comm_world.processes[rank] = SIMIX_process_self();
}
// now that mpi_comm_world_processes is set, it's safe to set a barrier
}
// now that mpi_comm_world_processes is set, it's safe to set a barrier
smpi_running_hosts--;
if (0 <= smpi_running_hosts) {
for(i = 0; i < smpi_mpi_comm_world.size; i++) {
smpi_running_hosts--;
if (0 <= smpi_running_hosts) {
for(i = 0; i < smpi_mpi_comm_world.size; i++) {
- if(MSG_process_is_suspended(smpi_sender_processes[i])) {
- MSG_process_resume(smpi_sender_processes[i]);
+ if(SIMIX_process_is_suspended(smpi_sender_processes[i])) {
+ SIMIX_process_resume(smpi_sender_processes[i]);
- if(MSG_process_is_suspended(smpi_receiver_processes[i])) {
- MSG_process_resume(smpi_receiver_processes[i]);
+ if(SIMIX_process_is_suspended(smpi_receiver_processes[i])) {
+ SIMIX_process_resume(smpi_receiver_processes[i]);
request->next = NULL;
current = request->waitlist;
while(NULL != current) {
request->next = NULL;
current = request->waitlist;
while(NULL != current) {
- if(MSG_process_is_suspended(current->process)) {
- MSG_process_resume(current->process);
+ if(SIMIX_process_is_suspended(current->process)) {
+ SIMIX_process_resume(current->process);
}
next = current->next;
xbt_free(current);
}
next = current->next;
xbt_free(current);
}
int smpi_host_rank_self() {
}
int smpi_host_rank_self() {
- return smpi_comm_rank(&smpi_mpi_comm_world, MSG_host_self());
+ return smpi_comm_rank(&smpi_mpi_comm_world, SIMIX_host_self());
}
void smpi_isend(smpi_mpi_request_t *sendreq) {
}
void smpi_isend(smpi_mpi_request_t *sendreq) {
smpi_last_pending_send_requests[rank]->next = sendreq;
}
smpi_last_pending_send_requests[rank] = sendreq;
smpi_last_pending_send_requests[rank]->next = sendreq;
}
smpi_last_pending_send_requests[rank] = sendreq;
- if (MSG_process_is_suspended(smpi_sender_processes[rank])) {
- MSG_process_resume(smpi_sender_processes[rank]);
+ if (SIMIX_process_is_suspended(smpi_sender_processes[rank])) {
+ SIMIX_process_resume(smpi_sender_processes[rank]);
int i;
comm->barrier++;
if(comm->barrier < comm->size) {
int i;
comm->barrier++;
if(comm->barrier < comm->size) {
- MSG_process_suspend(MSG_process_self());
+ SIMIX_process_suspend(SIMIX_process_self());
} else {
comm->barrier = 0;
for(i = 0; i < comm->size; i++) {
} else {
comm->barrier = 0;
for(i = 0; i < comm->size; i++) {
- if (MSG_process_is_suspended(comm->processes[i])) {
- MSG_process_resume(comm->processes[i]);
+ if (SIMIX_process_is_suspended(comm->processes[i])) {
+ SIMIX_process_resume(comm->processes[i]);
-int smpi_comm_rank(smpi_mpi_communicator_t *comm, m_host_t host) {
+int smpi_comm_rank(smpi_mpi_communicator_t *comm, smx_host_t host) {
int i;
for(i = 0; i < comm->size && host != comm->hosts[i]; i++);
if (i >= comm->size) i = -1;
int i;
for(i = 0; i < comm->size && host != comm->hosts[i]; i++);
if (i >= comm->size) i = -1;
}
smpi_last_pending_recv_requests[rank] = recvreq;
smpi_match_requests(rank);
}
smpi_last_pending_recv_requests[rank] = recvreq;
smpi_match_requests(rank);
- if (MSG_process_is_suspended(smpi_receiver_processes[rank])) {
- MSG_process_resume(smpi_receiver_processes[rank]);
+ if (SIMIX_process_is_suspended(smpi_receiver_processes[rank])) {
+ SIMIX_process_resume(smpi_receiver_processes[rank]);
if (NULL != request) {
if (!request->completed) {
waitnode = xbt_malloc(sizeof(smpi_waitlist_node_t));
if (NULL != request) {
if (!request->completed) {
waitnode = xbt_malloc(sizeof(smpi_waitlist_node_t));
- waitnode->process = MSG_process_self();
+ waitnode->process = SIMIX_process_self();
waitnode->next = NULL;
if (NULL == request->waitlist) {
request->waitlist = waitnode;
waitnode->next = NULL;
if (NULL == request->waitlist) {
request->waitlist = waitnode;
for(current = request->waitlist; NULL != current->next; current = current->next);
current->next = waitnode;
}
for(current = request->waitlist; NULL != current->next; current = current->next);
current->next = waitnode;
}
- MSG_process_suspend(waitnode->process);
+ SIMIX_process_suspend(waitnode->process);
}
if (NULL != status && MPI_STATUS_IGNORE != status) {
status->MPI_SOURCE = request->src;
}
if (NULL != status && MPI_STATUS_IGNORE != status) {
status->MPI_SOURCE = request->src;
}
int smpi_sender(int argc, char *argv[]) {
}
int smpi_sender(int argc, char *argv[]) {
char taskname[50];
size_t dsize;
void *data;
char taskname[50];
size_t dsize;
void *data;
m_task_t mtask;
int rank, fc, ft;
smpi_mpi_request_t *sendreq;
m_task_t mtask;
int rank, fc, ft;
smpi_mpi_request_t *sendreq;
- process = MSG_process_self();
+ process = SIMIX_process_self();
// wait for init
mtask = (m_task_t)0;
// wait for init
mtask = (m_task_t)0;
// send task
#ifdef DEBUG
// send task
#ifdef DEBUG
- printf("host %s attempting to send to host %s\n", MSG_host_get_name(MSG_host_self()), MSG_host_get_name(dhost));
+ printf("host %s attempting to send to host %s\n", SIMIX_host_get_name(SIMIX_host_self()), SIMIX_host_get_name(dhost));
#endif
MSG_task_put(mtask, dhost, MPI_PORT);
#endif
MSG_task_put(mtask, dhost, MPI_PORT);
- MSG_process_suspend(process);
+ SIMIX_process_suspend(process);
}
}
return 0;
}
int smpi_receiver(int argc, char **argv) {
}
}
return 0;
}
int smpi_receiver(int argc, char **argv) {
m_task_t mtask;
smpi_received_t *received;
int rank;
smpi_mpi_request_t *recvreq;
m_task_t mtask;
smpi_received_t *received;
int rank;
smpi_mpi_request_t *recvreq;
- process = MSG_process_self();
+ process = SIMIX_process_self();
// wait for init
mtask = (m_task_t)0;
// wait for init
mtask = (m_task_t)0;
#ifdef DEBUG
printf("host %s waiting to receive from anyone, but first in queue is (%d,%d,%d).\n",
#ifdef DEBUG
printf("host %s waiting to receive from anyone, but first in queue is (%d,%d,%d).\n",
- MSG_host_get_name(MSG_host_self()), recvreq->src, recvreq->dst, recvreq->tag);
+ SIMIX_host_get_name(SIMIX_host_self()), recvreq->src, recvreq->dst, recvreq->tag);
#endif
MSG_task_get(&mtask, MPI_PORT);
#endif
MSG_task_get(&mtask, MPI_PORT);
smpi_match_requests(rank);
} else {
smpi_match_requests(rank);
} else {
- MSG_process_suspend(process);
+ SIMIX_process_suspend(process);
if (NULL == tv) {
retval = -1;
} else {
if (NULL == tv) {
retval = -1;
} else {
+ now = SIMIX_get_clock();
tv->tv_sec = now;
tv->tv_usec = ((now - (double)tv->tv_sec) * 1000000.0);
}
tv->tv_sec = now;
tv->tv_usec = ((now - (double)tv->tv_sec) * 1000000.0);
}
void smpi_exit(int status) {
smpi_bench_end();
smpi_running_hosts--;
void smpi_exit(int status) {
smpi_bench_end();
smpi_running_hosts--;
- MSG_process_kill(MSG_process_self());
+ SIMIX_process_kill(SIMIX_process_self());