+#include "xbt/virtu.h"
+#include "mc/mc.h"
+#include "xbt/replay.h"
+#include <errno.h>
+#include "simix/smx_private.h"
+#include "surf/surf.h"
+#include "simgrid/sg_config.h"
+
+
+XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_base, smpi, "Logging specific to SMPI (base)");
+
+
+/**
+ * Matching callback given to simcall_comm_irecv() by smpi_mpi_start().
+ *
+ * @param a       the receive request (MPI_Request) acting as the reference
+ * @param b       the candidate send request (MPI_Request)
+ * @param ignored unused
+ * @return 1 when both source and tag match, 0 otherwise. MPI_ANY_SOURCE and
+ *         MPI_ANY_TAG on the receive request act as wildcards.
+ *
+ * On a match, the receive request is updated: real_src / real_tag are filled
+ * in when a wildcard was used, truncated is raised when the receive's
+ * real_size is smaller than the send's, and a detached send is recorded in
+ * detached_sender.
+ */
+static int match_recv(void* a, void* b, smx_action_t ignored) {
+  MPI_Request ref = (MPI_Request)a;
+  MPI_Request req = (MPI_Request)b;
+
+  // Validate both requests first: the debug trace below dereferences them
+  xbt_assert(ref, "Cannot match recv against null reference");
+  xbt_assert(req, "Cannot match recv against null request");
+  XBT_DEBUG("Trying to match a recv of src %d against %d, tag %d against %d",ref->src,req->src, ref->tag, req->tag);
+
+  if((ref->src == MPI_ANY_SOURCE || req->src == ref->src)
+     && (ref->tag == MPI_ANY_TAG || req->tag == ref->tag)){
+    //we match, we can transfer some values
+    // FIXME : move this to the copy function ?
+    if(ref->src == MPI_ANY_SOURCE)
+      ref->real_src = req->src;
+    if(ref->tag == MPI_ANY_TAG)
+      ref->real_tag = req->tag;
+    if(ref->real_size < req->real_size)
+      ref->truncated = 1;
+    if(req->detached==1){
+      ref->detached_sender=req; //tie the sender to the receiver, as it is detached and has to be freed in the receiver
+    }
+    return 1;
+  }
+  return 0;
+}
+
+/**
+ * Matching callback given to simcall_comm_isend() by smpi_mpi_start().
+ *
+ * @param a       the send request (MPI_Request) acting as the reference
+ * @param b       the candidate receive request (MPI_Request)
+ * @param ignored unused
+ * @return 1 when both source and tag match, 0 otherwise. MPI_ANY_SOURCE and
+ *         MPI_ANY_TAG on the receive request act as wildcards.
+ *
+ * On a match, the receive request is updated: real_src / real_tag are filled
+ * in when a wildcard was used, truncated is raised when the receive's
+ * real_size is smaller than the send's, and a detached send is recorded in
+ * detached_sender.
+ */
+static int match_send(void* a, void* b,smx_action_t ignored) {
+  MPI_Request ref = (MPI_Request)a;
+  MPI_Request req = (MPI_Request)b;
+
+  // Validate both requests first: the debug trace below dereferences them
+  xbt_assert(ref, "Cannot match send against null reference");
+  xbt_assert(req, "Cannot match send against null request");
+  XBT_DEBUG("Trying to match a send of src %d against %d, tag %d against %d",ref->src,req->src, ref->tag, req->tag);
+
+  if((req->src == MPI_ANY_SOURCE || req->src == ref->src)
+     && (req->tag == MPI_ANY_TAG || req->tag == ref->tag)){
+    if(req->src == MPI_ANY_SOURCE)
+      req->real_src = ref->src;
+    if(req->tag == MPI_ANY_TAG)
+      req->real_tag = ref->tag;
+    if(req->real_size < ref->real_size)
+      req->truncated = 1;
+    if(ref->detached==1){
+      req->detached_sender=ref; //tie the sender to the receiver, as it is detached and has to be freed in the receiver
+    }
+    return 1;
+  }
+  return 0;
+}
+
+
+typedef struct s_smpi_factor *smpi_factor_t; // pointer alias for the entry type declared just below
+typedef struct s_smpi_factor {
+ long factor; // sort key used by factor_cmp(); smpi_os()/smpi_or() compare the message size against it
+ int nb_values; // number of slots of values[] filled in by parse_factor()
+ double values[4];// capacity arbitrarily set to 4; smpi_os()/smpi_or() read values[0] and values[1]
+} s_smpi_factor_t;
+xbt_dynar_t smpi_os_values = NULL; // built on the first call to smpi_os() from the "smpi/os" configuration string
+xbt_dynar_t smpi_or_values = NULL; // built on the first call to smpi_or() from the "smpi/or" configuration string
+
+
+// Methods used to parse and store the values for timing injections in smpi
+// These are taken from surf/network.c and generalized to have more factors
+// These methods should be merged with those in surf/network.c (moved somewhere in xbt ?)
+
+/**
+ * Comparison callback ordering s_smpi_factor_t entries by ascending factor;
+ * parse_factor() passes it to xbt_dynar_sort().
+ *
+ * @param pa pointer to the first s_smpi_factor_t
+ * @param pb pointer to the second s_smpi_factor_t
+ * @return a negative, null or positive value when the factor of pa is
+ *         respectively lower than, equal to or greater than the factor of pb
+ */
+static int factor_cmp(const void *pa, const void *pb)
+{
+  const s_smpi_factor_t *lhs = (const s_smpi_factor_t *) pa;
+  const s_smpi_factor_t *rhs = (const s_smpi_factor_t *) pb;
+
+  // A sort comparator must be consistent in both directions: returning only
+  // 0 or 1 would report "equal" for a < b but "greater" for b > a.
+  return (lhs->factor > rhs->factor) - (lhs->factor < rhs->factor);
+}
+
+
+/**
+ * Parses a factor description string into a dynar of s_smpi_factor_t sorted
+ * by ascending factor.
+ *
+ * The string is split on ';' into entries, and each entry is split on ':'.
+ * The first field of an entry is the factor (read with atol), the following
+ * ones are its values (read with atof). An entry must have between 2 and 5
+ * fields, i.e. one factor and at most as many values as values[] can hold;
+ * otherwise the function aborts through xbt_die().
+ *
+ * Values that an entry does not specify are set to 0.0, so that readers of
+ * values[0] and values[1] never see uninitialized or stale data.
+ *
+ * @param smpi_coef_string the string to parse
+ * @return a newly created dynar of s_smpi_factor_t
+ */
+static xbt_dynar_t parse_factor(const char *smpi_coef_string)
+{
+  char *value = NULL;
+  unsigned int iter = 0;
+  s_smpi_factor_t fact;
+  unsigned long i = 0;
+  xbt_dynar_t smpi_factor, radical_elements, radical_elements2 = NULL;
+
+  smpi_factor = xbt_dynar_new(sizeof(s_smpi_factor_t), NULL);
+  radical_elements = xbt_str_split(smpi_coef_string, ";");
+  xbt_dynar_foreach(radical_elements, iter, value) {
+    // Start every entry from a clean slate: nothing may leak from the
+    // previous entry, and unspecified values must read as 0.0
+    fact.factor = 0;
+    fact.nb_values = 0;
+    for (i = 0; i < sizeof(fact.values) / sizeof(fact.values[0]); i++)
+      fact.values[i] = 0.0;
+
+    radical_elements2 = xbt_str_split(value, ":");
+    if (xbt_dynar_length(radical_elements2) < 2 || xbt_dynar_length(radical_elements2) > 5)
+      xbt_die("Malformed radical for smpi factor!");
+    for (i = 0; i < xbt_dynar_length(radical_elements2); i++) {
+      if (i == 0) {
+        fact.factor = atol(xbt_dynar_get_as(radical_elements2, i, char *));
+      } else {
+        fact.values[fact.nb_values] = atof(xbt_dynar_get_as(radical_elements2, i, char *));
+        fact.nb_values++;
+      }
+    }
+
+    xbt_dynar_push_as(smpi_factor, s_smpi_factor_t, fact);
+    XBT_DEBUG("smpi_factor:\t%ld : %d values, first: %f", fact.factor, fact.nb_values ,fact.values[0]);
+    xbt_dynar_free(&radical_elements2);
+  }
+  xbt_dynar_free(&radical_elements);
+  iter=0;
+  xbt_dynar_sort(smpi_factor, &factor_cmp);
+  // Trace the table again, now in sorted order
+  xbt_dynar_foreach(smpi_factor, iter, fact) {
+    XBT_DEBUG("smpi_factor:\t%ld : %d values, first: %f", fact.factor, fact.nb_values ,fact.values[0]);
+  }
+  return smpi_factor;
+}
+
+/**
+ * Computes the delay that smpi_mpi_start() sleeps for before posting a send
+ * of the given size.
+ *
+ * The table is built once, on the first call, by parsing the "smpi/os"
+ * configuration string with parse_factor(). Entries are then walked in
+ * order: as long as size is strictly greater than the entry's factor, the
+ * candidate result becomes values[0] + values[1] * size; the walk stops at
+ * the first entry whose factor is greater than or equal to size.
+ *
+ * @param size the message size
+ * @return the computed delay, 0.0 when no entry has a factor below size
+ */
+static double smpi_os(double size)
+{
+  if (!smpi_os_values)
+    smpi_os_values =
+        parse_factor(sg_cfg_get_string("smpi/os"));
+
+  unsigned int iter = 0;
+  // Zero-initialized: the final debug trace reads fact even when the table
+  // is empty and the loop below never assigns it
+  s_smpi_factor_t fact = {0};
+  double current=0.0;
+  xbt_dynar_foreach(smpi_os_values, iter, fact) {
+    if (size <= fact.factor) {
+      XBT_DEBUG("os : %lf <= %ld return %f", size, fact.factor, current);
+      return current;
+    }else{
+      current=fact.values[0]+fact.values[1]*size;
+    }
+  }
+  XBT_DEBUG("os : %lf > %ld return %f", size, fact.factor, current);
+
+  return current;
+}
+
+/**
+ * Computes the delay that smpi_mpi_start() sleeps for after posting a
+ * receive of the given size.
+ *
+ * The table is built once, on the first call, by parsing the "smpi/or"
+ * configuration string with parse_factor(). Entries are then walked in
+ * order: as long as size is strictly greater than the entry's factor, the
+ * candidate result becomes values[0] + values[1] * size; the walk stops at
+ * the first entry whose factor is greater than or equal to size.
+ *
+ * @param size the message size
+ * @return the computed delay, 0.0 when no entry has a factor below size
+ */
+static double smpi_or(double size)
+{
+  if (!smpi_or_values)
+    smpi_or_values =
+        parse_factor(sg_cfg_get_string("smpi/or"));
+
+  unsigned int iter = 0;
+  // Zero-initialized: the final debug trace reads fact even when the table
+  // is empty and the loop below never assigns it
+  s_smpi_factor_t fact = {0};
+  double current=0.0;
+  xbt_dynar_foreach(smpi_or_values, iter, fact) {
+    if (size <= fact.factor) {
+      XBT_DEBUG("or : %lf <= %ld return %f", size, fact.factor, current);
+      return current;
+    }else{
+      current=fact.values[0]+fact.values[1]*size;
+    }
+  }
+  XBT_DEBUG("or : %lf > %ld return %f", size, fact.factor, current);
+
+  return current;
+}
+
+/**
+ * Allocates and initializes a request describing one communication.
+ *
+ * When the datatype has a subtype (has_subtype == 1), a temporary buffer of
+ * count * smpi_datatype_size(datatype) bytes is allocated and becomes the
+ * request buffer; for SEND requests the user data is serialized into it
+ * right away. The user buffer is remembered in old_buf for the
+ * unserialization done at reception.
+ *
+ * @param buf      user buffer
+ * @param count    number of elements of type datatype
+ * @param datatype type of the elements
+ * @param src      source stored in the request
+ * @param dst      destination stored in the request
+ * @param tag      tag stored in the request
+ * @param comm     communicator stored in the request
+ * @param flags    request flags (this function tests SEND)
+ * @return the new request, with a reference count of 1 and no action
+ */
+static MPI_Request build_request(void *buf, int count,
+                                 MPI_Datatype datatype, int src, int dst,
+                                 int tag, MPI_Comm comm, unsigned flags)
+{
+  MPI_Request request;
+
+  void *old_buf = NULL;
+
+  request = xbt_new(s_smpi_mpi_request_t, 1);
+
+  s_smpi_subtype_t *subtype = datatype->substruct;
+
+  if(datatype->has_subtype == 1){
+    // This part handles the problem of non-contiguous memory
+    old_buf = buf;
+    buf = xbt_malloc(count*smpi_datatype_size(datatype));
+    if (flags & SEND) {
+      subtype->serialize(old_buf, buf, count, datatype->substruct);
+    }
+  }
+
+  request->buf = buf;
+  // This part handles the problem of non-contiguous memory (for the
+  // unserialisation at the reception)
+  request->old_buf = old_buf;
+  request->old_type = datatype;
+
+  request->size = smpi_datatype_size(datatype) * count;
+  request->src = src;
+  request->dst = dst;
+  request->tag = tag;
+  request->comm = comm;
+  request->action = NULL;
+  request->flags = flags;
+  request->detached = 0;
+  request->detached_sender = NULL;
+
+  request->truncated = 0;
+  request->real_size = 0;
+  request->real_tag = 0;
+  // The matching callbacks only set real_src when MPI_ANY_SOURCE is used, so
+  // give it a defined value here like the other real_* fields
+  request->real_src = 0;
+
+  request->refcount=1;
+#ifdef HAVE_TRACING
+  request->send = 0;
+  request->recv = 0;
+#endif
+  // NOTE(review): the reason for releasing the datatype for SEND requests is
+  // not visible from this function -- confirm against smpi_datatype_use()
+  if (flags & SEND) smpi_datatype_unuse(datatype);
+
+  return request;
+}
+
+
+/**
+ * Resets a status to its "empty" content: wildcard source and tag, and a
+ * count of zero. Nothing is done when status is MPI_STATUS_IGNORE.
+ *
+ * @param status the status to reset, or MPI_STATUS_IGNORE
+ */
+void smpi_empty_status(MPI_Status * status) {
+  if(status == MPI_STATUS_IGNORE)
+    return;
+
+  status->count = 0;
+  status->MPI_TAG = MPI_ANY_TAG;
+  status->MPI_SOURCE = MPI_ANY_SOURCE;
+}
+
+/**
+ * Opens the trace file (when a path is given), reports the actions still
+ * pending in action_queues, then closes the file and replaces action_queues
+ * with a fresh empty dictionary.
+ *
+ * @param path trace file to open for reading, or NULL; failing to open it
+ *             trips an assertion
+ */
+void smpi_action_trace_run(char *path)
+{
+  xbt_dict_cursor_t cursor;
+  xbt_dynar_t todo;
+  char *name;
+
+  if (path == NULL) {
+    action_fp = NULL;
+  } else {
+    action_fp = fopen(path, "r");
+    xbt_assert(action_fp != NULL, "Cannot open %s: %s", path,
+               strerror(errno));
+  }
+
+  // Anything left in the queues was never consumed by a process
+  if (!xbt_dict_is_empty(action_queues)) {
+    XBT_WARN
+        ("Not all actions got consumed. If the simulation ended successfully (without deadlock), you may want to add new processes to your deployment file.");
+
+    xbt_dict_foreach(action_queues, cursor, name, todo) {
+      XBT_WARN("Still %lu actions for %s", xbt_dynar_length(todo), name);
+    }
+  }
+
+  if (path != NULL) {
+    fclose(action_fp);
+  }
+
+  // Start over with an empty set of queues
+  xbt_dict_free(&action_queues);
+  action_queues = xbt_dict_new_homogeneous(NULL);
+}
+
+/**
+ * Adapter giving smpi_mpi_request_free() a void* based signature;
+ * smpi_mpi_start() hands it to simcall_comm_isend() as the clean-up callback.
+ *
+ * @param request the MPI_Request to release, passed as an untyped pointer
+ */
+static void smpi_mpi_request_free_voidp(void* request)
+{
+  MPI_Request typed_request = (MPI_Request) request;
+
+  smpi_mpi_request_free(&typed_request);
+}
+
+/* MPI Low level calls */
+/**
+ * Builds a persistent send request without starting it.
+ *
+ * The source is the rank of the caller in comm. The request is created by
+ * build_request() with the PERSISTENT | SEND flags, and one extra reference
+ * is taken on it before it is returned.
+ *
+ * @param buf      buffer to send
+ * @param count    number of elements of type datatype
+ * @param datatype type of the elements
+ * @param dst      destination
+ * @param tag      message tag
+ * @param comm     communicator
+ * @return the new request
+ */
+MPI_Request smpi_mpi_send_init(void *buf, int count, MPI_Datatype datatype,
+                               int dst, int tag, MPI_Comm comm)
+{
+  int my_rank = smpi_comm_rank(comm);
+  MPI_Request request;
+
+  request = build_request(buf, count, datatype, my_rank, dst, tag, comm,
+                          PERSISTENT | SEND);
+  request->refcount += 1;
+  return request;
+}
+
+/**
+ * Builds a persistent receive request without starting it.
+ *
+ * The destination is the rank of the caller in comm. The request is created
+ * by build_request() with the PERSISTENT | RECV flags, and one extra
+ * reference is taken on it before it is returned.
+ *
+ * @param buf      buffer receiving the data
+ * @param count    number of elements of type datatype
+ * @param datatype type of the elements
+ * @param src      source
+ * @param tag      message tag
+ * @param comm     communicator
+ * @return the new request
+ */
+MPI_Request smpi_mpi_recv_init(void *buf, int count, MPI_Datatype datatype,
+                               int src, int tag, MPI_Comm comm)
+{
+  int my_rank = smpi_comm_rank(comm);
+  MPI_Request request;
+
+  request = build_request(buf, count, datatype, src, my_rank, tag, comm,
+                          PERSISTENT | RECV);
+  request->refcount += 1;
+  return request;
+}
+
+/**
+ * Posts the communication described by a request.
+ *
+ * Receive requests (RECV flag set): the "small" or the regular mailbox of
+ * the calling process is chosen depending on whether the size is below the
+ * "smpi/async_small_thres" configuration value, the receive is posted with
+ * simcall_comm_irecv() using match_recv, and the process then sleeps for
+ * smpi_or(size) when that delay is not null.
+ *
+ * Other requests (sends): the "small" or the regular mailbox of the
+ * destination process is chosen with the same threshold. Messages smaller
+ * than 64 KiB are marked detached: an extra reference is taken and, for
+ * datatypes without subtype, a non-NULL buffer is duplicated into a freshly
+ * allocated one. The process sleeps for smpi_os(size) when that delay is
+ * not null, then the send is posted with simcall_comm_isend() using
+ * match_send.
+ *
+ * @param request the request to start; it must not have an action already
+ *                (asserted)
+ */
+void smpi_mpi_start(MPI_Request request)
+{
+  smx_rdv_t mailbox;
+
+  xbt_assert(!request->action,
+             "Cannot (re)start a non-finished communication");
+  if(request->flags & RECV) {
+    print_request("New recv", request);
+    if (request->size < sg_cfg_get_int("smpi/async_small_thres"))
+      mailbox = smpi_process_mailbox_small();
+    else
+      mailbox = smpi_process_mailbox();
+    // we make a copy here, as the size is modified by simix, and we may reuse the request in another receive later
+    request->real_size=request->size;
+    smpi_datatype_use(request->old_type);
+    request->action = simcall_comm_irecv(mailbox, request->buf, &request->real_size, &match_recv, request);
+
+    double sleeptime = smpi_or(request->size);
+    if(sleeptime!=0.0){
+      simcall_process_sleep(sleeptime);
+      // reuse the delay computed above rather than walking the table again
+      XBT_DEBUG("receiving size of %zu : sleep %lf ", request->size, sleeptime);
+    }
+
+  } else {
+
+    int receiver = smpi_group_index(smpi_comm_group(request->comm), request->dst);
+/*    if(receiver == MPI_UNDEFINED) {*/
+/*      XBT_WARN("Trying to send a message to a wrong rank");*/
+/*      return;*/
+/*    }*/
+    print_request("New send", request);
+    if (request->size < sg_cfg_get_int("smpi/async_small_thres")) { // eager mode
+      mailbox = smpi_process_remote_mailbox_small(receiver);
+    }else{
+      XBT_DEBUG("Send request %p is not in the permanent receive mailbox (buf: %p)",request,request->buf);
+      mailbox = smpi_process_remote_mailbox(receiver);
+    }
+    if (request->size < 64*1024 ) { //(FIXME: this limit should be configurable)
+      void *oldbuf = NULL;
+      request->detached = 1;
+      request->refcount++;
+      if(request->old_type->has_subtype == 0){
+        // with a subtype, build_request() already allocated a private buffer
+        oldbuf = request->buf;
+        if (oldbuf){
+          request->buf = xbt_malloc(request->size);
+          memcpy(request->buf,oldbuf,request->size);
+        }
+      }
+      XBT_DEBUG("Send request %p is detached; buf %p copied into %p",request,oldbuf,request->buf);
+    }
+    // we make a copy here, as the size is modified by simix, and we may reuse the request in another receive later
+    request->real_size=request->size;
+    smpi_datatype_use(request->old_type);
+    double sleeptime = smpi_os(request->size);
+    if(sleeptime!=0.0){
+      simcall_process_sleep(sleeptime);
+      // reuse the delay computed above rather than walking the table again
+      XBT_DEBUG("sending size of %zu : sleep %lf ", request->size, sleeptime);
+    }
+    request->action =
+        simcall_comm_isend(mailbox, request->size, -1.0,
+                           request->buf, request->real_size,
+                           &match_send,
+                           &smpi_mpi_request_free_voidp, // how to free the userdata if a detached send fails
+                           request,
+                           // detach if msg size < eager/rdv switch limit
+                           request->detached);
+
+#ifdef HAVE_TRACING
+    /* FIXME: detached sends are not traceable (request->action == NULL) */
+    if (request->action)
+      simcall_set_category(request->action, TRACE_internal_smpi_get_category());
+#endif
+
+  }
+
+}
+
+/**
+ * Starts every request of an array, in index order, with smpi_mpi_start().
+ *
+ * @param count    number of requests in the array
+ * @param requests the requests to start
+ */
+void smpi_mpi_startall(int count, MPI_Request * requests)
+{
+  int idx = 0;
+
+  while(idx < count) {
+    smpi_mpi_start(requests[idx]);
+    idx++;
+  }
+}
+
+void smpi_mpi_request_free(MPI_Request * request)
+{
+
+ if((*request) != MPI_REQUEST_NULL){
+ (*request)->refcount--;
+ if((*request)->refcount<0) xbt_die("wrong refcount");