-/* Copyright (c) 2006-2014. The SimGrid Team.
+/* Copyright (c) 2006-2014, 2016. The SimGrid Team.
* All rights reserved. */
/* This program is free software; you can redistribute it and/or modify it
import org.simgrid.msg.NativeException;
public class AsyncTest {
-
- /* This only contains the launcher. If you do nothing more than than you can run
- * java simgrid.msg.Msg
- * which also contains such a launcher
- */
-
- public static void main(String[] args) throws NativeException {
-
- /* initialize the MSG simulation. Must be done before anything else (even logging). */
- Msg.init(args);
-
- if (args.length < 2) {
- Msg.info("Usage : Async platform_file deployment_file");
- Msg.info("example : Async basic_platform.xml basic_deployment.xml");
- System.exit(1);
- }
-
- /* construct the platform and deploy the application */
- Msg.createEnvironment(args[0]);
- Msg.deployApplication(args[1]);
-
- /* execute the simulation. */
- Msg.run();
- }
+ public static void main(String[] args) throws NativeException {
+ Msg.init(args);
+
+ if (args.length < 2) {
+ Msg.info("Usage : AsyncTest platform_file deployment_file");
+ Msg.info("example : AsyncTest ../platforms/platform.xml asyncDeployment.xml");
+ System.exit(1);
+ }
+
+ /* construct the platform and deploy the application */
+ Msg.createEnvironment(args[0]);
+ Msg.deployApplication(args[1]);
+
+ /* execute the simulation. */
+ Msg.run();
+ }
}
${examples_src}
${sources}
PARENT_SCOPE)
-set(bin_files
- ${bin_files}
- PARENT_SCOPE)
set(txt_files
${txt_files}
${CMAKE_CURRENT_SOURCE_DIR}/README
-/* Copyright (c) 2006-2014. The SimGrid Team.
+/* Copyright (c) 2006-2014, 2016. The SimGrid Team.
* All rights reserved. */
/* This program is free software; you can redistribute it and/or modify it
package async;
import org.simgrid.msg.Task;
-public class FinalizeTask extends Task {
- public FinalizeTask() {
- super("finalize",0,0);
- }
+public class FinalizeTask extends Task {
+ public FinalizeTask() {
+ super("finalize",0,0);
+ }
}
-
\ No newline at end of file
-/* Copyright (c) 2006-2014. The SimGrid Team.
- * All rights reserved. */
+/* Copyright (c) 2006-2014, 2016. The SimGrid Team.
+ * All rights reserved. */
/* This program is free software; you can redistribute it and/or modify it
* under the terms of the license (GNU LGPL) which comes with this package. */
package async;
-import org.simgrid.msg.Host;
import org.simgrid.msg.Msg;
-import org.simgrid.msg.MsgException;
+import org.simgrid.msg.Host;
import org.simgrid.msg.Task;
import org.simgrid.msg.Process;
-
+import org.simgrid.msg.MsgException;
public class Forwarder extends Process {
- public Forwarder(Host host, String name, String[]args) {
- super(host,name,args);
- }
- public void main(String[] args) throws MsgException {
- if (args.length < 3) {
- Msg.info("Forwarder needs 3 arguments (input mailbox, first output mailbox, last one)");
- Msg.info("Got "+args.length+" instead");
- System.exit(1);
- }
- int input = Integer.valueOf(args[0]).intValue();
- int firstOutput = Integer.valueOf(args[1]).intValue();
- int lastOutput = Integer.valueOf(args[2]).intValue();
-
- int taskCount = 0;
- int slavesCount = lastOutput - firstOutput + 1;
- Msg.info("Receiving on 'slave_"+input+"'");
- while(true) {
- Task task = Task.receive("slave_"+input);
-
- if (task instanceof FinalizeTask) {
- Msg.info("Got a finalize task. Let's forward (asynchronously) that we're done, and then sleep 20 seconds so that nobody gets a message from a terminated process.");
-
- for (int cpt = firstOutput; cpt<=lastOutput; cpt++) {
- Task tf = new FinalizeTask();
- tf.dsend("slave_"+cpt);
- }
- waitFor(20);
- break;
- }
- int dest = firstOutput + (taskCount % slavesCount);
-
- Msg.info("Sending \"" + task.getName() + "\" to \"slave_" + dest + "\"");
- task.send("slave_"+dest);
-
- taskCount++;
- }
-
-
- Msg.info("I'm done. See you!");
- }
+ public Forwarder(Host host, String name, String[]args) {
+ super(host,name,args);
+ }
+
+ public void main(String[] args) throws MsgException {
+ if (args.length < 3) {
+ Msg.info("Forwarder needs 3 arguments (input mailbox, first output mailbox, last one)");
+ Msg.info("Got "+args.length+" instead");
+ System.exit(1);
+ }
+ int input = Integer.valueOf(args[0]).intValue();
+ int firstOutput = Integer.valueOf(args[1]).intValue();
+ int lastOutput = Integer.valueOf(args[2]).intValue();
+
+ int taskCount = 0;
+ int slavesCount = lastOutput - firstOutput + 1;
+ Msg.info("Receiving on 'slave_"+input+"'");
+ while(true) {
+ Task task = Task.receive("slave_"+input);
+
+ if (task instanceof FinalizeTask) {
+ Msg.info("Got a finalize task. Let's forward (asynchronously) that we're done, and then sleep 20 seconds"+
+ " so that nobody gets a message from a terminated process.");
+
+ for (int cpt = firstOutput; cpt<=lastOutput; cpt++) {
+ Task tf = new FinalizeTask();
+ tf.dsend("slave_"+cpt);
+ }
+ waitFor(20);
+ break;
+ }
+ int dest = firstOutput + (taskCount % slavesCount);
+
+ Msg.info("Sending \"" + task.getName() + "\" to \"slave_" + dest + "\"");
+ task.send("slave_"+dest);
+
+ taskCount++;
+ }
+
+ Msg.info("I'm done. See you!");
+ }
}
-/* Master of a basic master/slave example in Java */
-
-/* Copyright (c) 2006-2014. The SimGrid Team.
+/* Copyright (c) 2006-2014, 2016. The SimGrid Team.
* All rights reserved. */
/* This program is free software; you can redistribute it and/or modify it
package async;
import java.util.ArrayList;
+import org.simgrid.msg.Msg;
import org.simgrid.msg.Comm;
import org.simgrid.msg.Host;
-import org.simgrid.msg.Msg;
-import org.simgrid.msg.MsgException;
-import org.simgrid.msg.Process;
import org.simgrid.msg.Task;
+import org.simgrid.msg.Process;
+import org.simgrid.msg.MsgException;
public class Master extends Process {
- public Master(Host host, String name, String[]args) {
- super(host,name,args);
- }
- public void main(String[] args) throws MsgException {
- if (args.length < 4) {
- Msg.info("Master needs 4 arguments");
- System.exit(1);
- }
+ public Master(Host host, String name, String[]args) {
+ super(host,name,args);
+ }
+
+ public void main(String[] args) throws MsgException {
+ if (args.length < 4) {
+ Msg.info("Master needs 4 arguments");
+ System.exit(1);
+ }
+
+ int tasksCount = Integer.valueOf(args[0]).intValue();
+ double taskComputeSize = Double.valueOf(args[1]).doubleValue();
+ double taskCommunicateSize = Double.valueOf(args[2]).doubleValue();
+
+ int slavesCount = Integer.valueOf(args[3]).intValue();
- int tasksCount = Integer.valueOf(args[0]).intValue();
- double taskComputeSize = Double.valueOf(args[1]).doubleValue();
- double taskCommunicateSize = Double.valueOf(args[2]).doubleValue();
+ Msg.info("Hello! Got "+ slavesCount + " slaves and "+tasksCount+" tasks to process");
+ ArrayList<Comm> comms = new ArrayList<Comm>();
- int slavesCount = Integer.valueOf(args[3]).intValue();
+ for (int i = 0; i < tasksCount; i++) {
+ Task task = new Task("Task_" + i, taskComputeSize, taskCommunicateSize);
+ Msg.info("Sending \"" + task.getName()+ "\" to \"slave_" + i % slavesCount + "\"");
+ Comm comm = task.isend("slave_"+(i%slavesCount));
+ comms.add(comm);
+ }
- Msg.info("Hello! Got "+ slavesCount + " slaves and "+tasksCount+" tasks to process");
- ArrayList<Comm> comms = new ArrayList<Comm>();
-
- for (int i = 0; i < tasksCount; i++) {
- Task task = new Task("Task_" + i, taskComputeSize, taskCommunicateSize);
- Msg.info("Sending \"" + task.getName()+ "\" to \"slave_" + i % slavesCount + "\"");
- //task.send("slave_"+(i%slavesCount));
- Comm comm = task.isend("slave_"+(i%slavesCount));
- comms.add(comm);
- }
-
- while (comms.size() > 0) {
- for (int i = 0; i < comms.size(); i++) {
- try {
- if (comms.get(i).test()) {
- comms.remove(i);
- i--;
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- }
- }
- waitFor(1);
- }
-
- Msg.info("All tasks have been dispatched. Let's tell (asynchronously) everybody the computation is over, and sleep 20s so that nobody gets a message from a terminated process.");
+ while (comms.size() > 0) {
+ for (int i = 0; i < comms.size(); i++) {
+ try {
+ if (comms.get(i).test()) {
+ comms.remove(i);
+ i--;
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ waitFor(1);
+ }
+ Msg.info("All tasks have been dispatched. Let's tell (asynchronously) everybody the computation is over,"+
+ " and sleep 20s so that nobody gets a message from a terminated process.");
- for (int i = 0; i < slavesCount; i++) {
- FinalizeTask task = new FinalizeTask();
- task.dsend("slave_"+(i%slavesCount));
- }
- waitFor(20);
+ for (int i = 0; i < slavesCount; i++) {
+ FinalizeTask task = new FinalizeTask();
+ task.dsend("slave_"+(i%slavesCount));
+ }
+ waitFor(20);
- Msg.info("Goodbye now!");
- }
+ Msg.info("Goodbye now!");
+ }
}
-/* Copyright (c) 2006-2007, 2010, 2013-2014. The SimGrid Team.
+/* Copyright (c) 2006-2007, 2010, 2013-2014, 2016. The SimGrid Team.
* All rights reserved. */
/* This program is free software; you can redistribute it and/or modify it
* under the terms of the license (GNU LGPL) which comes with this package. */
package async;
+import org.simgrid.msg.Msg;
import org.simgrid.msg.Comm;
import org.simgrid.msg.Host;
-import org.simgrid.msg.HostFailureException;
-import org.simgrid.msg.Msg;
-import org.simgrid.msg.Process;
import org.simgrid.msg.Task;
+import org.simgrid.msg.Process;
+import org.simgrid.msg.HostFailureException;
import org.simgrid.msg.TaskCancelledException;
import org.simgrid.msg.TimeoutException;
import org.simgrid.msg.TransferFailureException;
public class Slave extends Process {
- public Slave(Host host, String name, String[]args) {
- super(host,name,args);
- }
- public void main(String[] args) throws TransferFailureException, HostFailureException, TimeoutException {
- if (args.length < 1) {
- Msg.info("Slave needs 1 argument (its number)");
- System.exit(1);
- }
+ public Slave(Host host, String name, String[]args) {
+ super(host,name,args);
+ }
+
+ public void main(String[] args) throws TransferFailureException, HostFailureException, TimeoutException {
+ if (args.length < 1) {
+ Msg.info("Slave needs 1 argument (its number)");
+ System.exit(1);
+ }
+ int num = Integer.valueOf(args[0]).intValue();
+ Comm comm = null;
+ boolean slaveFinished = false;
+ while(!slaveFinished) {
+ try {
+ if (comm == null) {
+ Msg.info("Receiving on 'slave_" + num + "'");
+ comm = Task.irecv("slave_" + num);
+ } else {
+ if (comm.test()) {
+ Task task = comm.getTask();
- int num = Integer.valueOf(args[0]).intValue();
- Comm comm = null;
- boolean slaveFinished = false;
- while(!slaveFinished) {
- try
- {
- if (comm == null) {
- Msg.info("Receiving on 'slave_" + num + "'");
- comm = Task.irecv("slave_" + num);
- }
- else {
- if (comm.test()) {
- Task task = comm.getTask();
-
- if (task instanceof FinalizeTask) {
- comm = null;
- break;
- }
- Msg.info("Received a task");
- Msg.info("Received \"" + task.getName() + "\". Processing it.");
- try {
- task.execute();
- } catch (TaskCancelledException e) {
-
- }
- comm = null;
- }
- else {
- waitFor(1);
- }
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- }
- }
- Msg.info("Received Finalize. I'm done. See you!");
- waitFor(20);
- }
+ if (task instanceof FinalizeTask) {
+ comm = null;
+ break;
+ }
+ Msg.info("Received a task");
+ Msg.info("Received \"" + task.getName() + "\". Processing it.");
+ try {
+ task.execute();
+ } catch (TaskCancelledException e) {
+
+ }
+ comm = null;
+ } else {
+ waitFor(1);
+ }
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ Msg.info("Received Finalize. I'm done. See you!");
+ waitFor(20);
+ }
}
\ No newline at end of file
<argument value="10"/> <!-- Communication size of each one -->
<argument value="7"/> <!-- Amount of slaves waiting for orders -->
</process>
-
+
<process host="Jackson" function="async.Forwarder">
<argument value="0"/> <!-- Input mailbox -->
<argument value="7"/> <!-- First output mailbox -->
<argument value="9"/> <!-- First output mailbox -->
<argument value="10"/> <!-- Last output mailbox -->
</process>
-
+
<process host="iRMX" function="async.Slave">
<argument value="2"/> <!-- Input mailbox -->
</process>
-/* Copyright (c) 2012-2014. The SimGrid Team.
+/* Copyright (c) 2012-2014, 2016. The SimGrid Team.
* All rights reserved. */
/* This program is free software; you can redistribute it and/or modify it
import org.simgrid.msg.MsgException;
public class Bittorrent {
- public static void main(String[] args) throws MsgException {
- /* initialize the MSG simulation. Must be done before anything else (even logging). */
- Msg.init(args);
- if(args.length < 2) {
- Msg.info("Usage : Bittorrent platform_file deployment_file");
- Msg.info("example : Bittorrent platform.xml deployment.xml");
- System.exit(1);
- }
-
- /* construct the platform and deploy the application */
- Msg.createEnvironment(args[0]);
- Msg.deployApplication(args[1]);
-
- /* execute the simulation. */
- Msg.run();
- }
+ public static void main(String[] args) throws MsgException {
+ Msg.init(args);
+ if(args.length < 2) {
+ Msg.info("Usage : Bittorrent platform_file deployment_file");
+ Msg.info("example : Bittorrent ../platforms/platform.xml bittorrent.xml");
+ System.exit(1);
+ }
+ /* construct the platform and deploy the application */
+ Msg.createEnvironment(args[0]);
+ Msg.deployApplication(args[1]);
+
+ /* execute the simulation. */
+ Msg.run();
+ }
}
${examples_src}
${sources}
PARENT_SCOPE)
-set(bin_files
- ${bin_files}
- PARENT_SCOPE)
set(txt_files
${txt_files}
${CMAKE_CURRENT_SOURCE_DIR}/generate.py
-/* Copyright (c) 2006-2014. The SimGrid Team.
+/* Copyright (c) 2006-2014, 2016. The SimGrid Team.
* All rights reserved. */
/* This program is free software; you can redistribute it and/or modify it
* under the terms of the license (GNU LGPL) which comes with this package. */
package bittorrent;
-/**
- * Common constants for use in the simulation
- */
-public class Common {
-
- public static String TRACKER_MAILBOX = "tracker_mailbox";
-
- public static int FILE_SIZE = 5120;
- public static int FILE_PIECE_SIZE = 512;
- public static int FILE_PIECES = 10;
- public static int PIECES_BLOCKS = 5;
-
- public static int BLOCKS_REQUESTED = 2;
-
- public static int PIECE_COMM_SIZE = 1;
- /**
- * Information message size
- */
- public static int MESSAGE_SIZE = 1;
- /**
- * Max number of pairs sent by the tracker to clients
- */
- public static int MAXIMUM_PEERS = 50;
- /**
- * Interval of time where the peer should send a request to the tracker
- */
- public static int TRACKER_QUERY_INTERVAL = 1000;
- /**
- * Communication size for a task to the tracker
- */
- public static double TRACKER_COMM_SIZE = 0.01;
- /**
- * Timeout for the get peers data
- */
- public static int GET_PEERS_TIMEOUT = 10000;
- /**
- * Timeout for "standard" messages.
- */
- public static int TIMEOUT_MESSAGE = 10;
- /**
- * Timeout for tracker receive.
- */
- public static int TRACKER_RECEIVE_TIMEOUT = 10;
- /**
- * Number of peers that can be unchocked at a given time
- */
- public static int MAX_UNCHOKED_PEERS = 4;
- /**
- * Interval between each update of the choked peers
- */
- public static int UPDATE_CHOKED_INTERVAL = 30;
- /**
- * Number of pieces the peer asks for simultaneously
- */
- public static int MAX_PIECES = 1;
+
+/* Common constants for use in the simulation */
+public class Common {
+ public static String TRACKER_MAILBOX = "tracker_mailbox";
+ public static int FILE_SIZE = 5120;
+ public static int FILE_PIECE_SIZE = 512;
+ public static int FILE_PIECES = 10;
+ public static int PIECES_BLOCKS = 5;
+ public static int BLOCKS_REQUESTED = 2;
+ public static int PIECE_COMM_SIZE = 1;
+ /* Information message size */
+ public static int MESSAGE_SIZE = 1;
+ /* Max number of peers sent by the tracker to clients */
+ public static int MAXIMUM_PEERS = 50;
+ /* Time interval at which the peer should send a request to the tracker */
+ public static int TRACKER_QUERY_INTERVAL = 1000;
+ /* Communication size for a task to the tracker */
+ public static double TRACKER_COMM_SIZE = 0.01;
+ /* Timeout for retrieving the peers data from the tracker */
+ public static int GET_PEERS_TIMEOUT = 10000;
+ public static int TIMEOUT_MESSAGE = 10;
+ public static int TRACKER_RECEIVE_TIMEOUT = 10;
+ /* Number of peers that can be unchoked at a given time */
+ public static int MAX_UNCHOKED_PEERS = 4;
+ /* Interval between each update of the choked peers */
+ public static int UPDATE_CHOKED_INTERVAL = 30;
+ /* Number of pieces the peer asks for simultaneously */
+ public static int MAX_PIECES = 1;
}
-/* Copyright (c) 2006-2014. The SimGrid Team.
+/* Copyright (c) 2006-2014, 2016. The SimGrid Team.
* All rights reserved. */
/* This program is free software; you can redistribute it and/or modify it
* under the terms of the license (GNU LGPL) which comes with this package. */
package bittorrent;
-
import java.util.Arrays;
+
public class Connection {
- /**
- * Remote peer id
- */
- public int id;
- /**
- * Remote peer bitfield.
- */
- public char bitfield[];
- /**
- * Remote peer mailbox
- */
- public String mailbox;
- /**
- * Indicates if we are interested in something this peer has
- */
- public boolean amInterested = false;
- /**
- * Indicates if the peer is interested in one of our pieces
- */
- public boolean interested = false;
- /**
- * Indicates if the peer is choked for the current peer
- */
- public boolean chokedUpload = true;
- /**
- * Indicates if the peer has choked the current peer
- */
- public boolean chokedDownload = true;
- /**
- * Number of messages we have received from the peer
- */
- public int messagesCount = 0;
- /**
- * Peer speed.
- */
- public double peerSpeed = 0;
- /**
- * Last time the peer was unchoked
- */
- public double lastUnchoke = 0;
- /**
- * Constructor
- */
- public Connection(int id) {
- this.id = id;
- this.mailbox = Integer.toString(id);
- }
- /**
- * Add a new value to the peer speed average
- */
- public void addSpeedValue(double speed) {
- peerSpeed = peerSpeed * 0.55 + speed * 0.45;
- // peerSpeed = (peerSpeed * messagesCount + speed) / (++messagesCount);
- }
-
- @Override
- public String toString() {
- return "Connection [id=" + id + ", bitfield="
- + Arrays.toString(bitfield) + ", mailbox=" + mailbox
- + ", amInterested=" + amInterested + ", interested="
- + interested + ", chokedUpload=" + chokedUpload
- + ", chokedDownload=" + chokedDownload + "]";
- }
-
-
+ public int id;
+ public char bitfield[];
+ public String mailbox;
+ // Indicates if we are interested in something this peer has
+ public boolean amInterested = false;
+ // Indicates if the peer is interested in one of our pieces
+ public boolean interested = false;
+ // Indicates if the peer is choked for the current peer
+ public boolean chokedUpload = true;
+ // Indicates if the peer has choked the current peer
+ public boolean chokedDownload = true;
+ // Number of messages we have received from the peer
+ public int messagesCount = 0;
+ public double peerSpeed = 0;
+ public double lastUnchoke = 0;
+
+ public Connection(int id) {
+ this.id = id;
+ this.mailbox = Integer.toString(id);
+ }
+
+ // Add a new value to the peer speed average
+ public void addSpeedValue(double speed) {
+ peerSpeed = peerSpeed * 0.55 + speed * 0.45;
+ // peerSpeed = (peerSpeed * messagesCount + speed) / (++messagesCount);
+ }
+
+ @Override
+ public String toString() {
+ return "Connection [id=" + id + ", bitfield=" + Arrays.toString(bitfield) + ", mailbox=" + mailbox
+ + ", amInterested=" + amInterested + ", interested=" + interested + ", chokedUpload=" + chokedUpload
+ + ", chokedDownload=" + chokedDownload + "]";
+ }
}
-
\ No newline at end of file
-/* Copyright (c) 2006-2014. The SimGrid Team.
+/* Copyright (c) 2006-2014, 2016. The SimGrid Team.
* All rights reserved. */
/* This program is free software; you can redistribute it and/or modify it
* under the terms of the license (GNU LGPL) which comes with this package. */
package bittorrent;
-
import org.simgrid.msg.Task;
-/**
- * Tasks sent between peers
- */
+
public class MessageTask extends Task {
- public enum Type {
- HANDSHAKE,
- CHOKE,
- UNCHOKE,
- INTERESTED,
- NOTINTERESTED,
- HAVE,
- BITFIELD,
- REQUEST,
- PIECE
- };
- public Type type;
- public String issuerHostname;
- public String mailbox;
- public int peerId;
- public char bitfield[];
- public int index;
- public int blockIndex;
- public int blockLength;
- public boolean stalled;
- /**
- * Constructor, builds a value-less message
- * @param type
- * @param issuerHostname
- * @param mailbox
- * @param peerId
- */
- public MessageTask(Type type, String issuerHostname, String mailbox, int peerId) {
- this(type,issuerHostname,mailbox,peerId,-1,false,-1,-1);
- }
- /**
- * Constructor, builds a new "have/request/piece" message
- * @param type
- * @param issuerHostname
- * @param mailbox
- * @param peerId
- * @param index
- */
- public MessageTask(Type type, String issuerHostname, String mailbox, int peerId, int index) {
- this(type,issuerHostname,mailbox,peerId,index,false,-1,-1);
- }
- /**
- * Constructor, builds a new bitfield message
- */
- public MessageTask(Type type, String issuerHostname, String mailbox, int peerId, char bitfield[]) {
- this(type,issuerHostname,mailbox,peerId,-1,false,-1,-1);
- this.bitfield = bitfield;
- }
- /**
- * Constructor, build a new "request" message
- */
- public MessageTask(Type type, String issuerHostname, String mailbox, int peerId, int index, int blockIndex, int blockLength) {
- this(type,issuerHostname,mailbox,peerId,index,false,blockIndex,blockLength);
- }
- /**
- * Constructor, build a new "piece" message
- */
- public MessageTask(Type type, String issuerHostname, String mailbox, int peerId, int index, boolean stalled, int blockIndex, int blockLength) {
- this.type = type;
- this.issuerHostname = issuerHostname;
- this.mailbox = mailbox;
- this.peerId = peerId;
- this.index = index;
- this.stalled = stalled;
- this.blockIndex = blockIndex;
- this.blockLength = blockLength;
- }
+ public enum Type {
+ HANDSHAKE,
+ CHOKE,
+ UNCHOKE,
+ INTERESTED,
+ NOTINTERESTED,
+ HAVE,
+ BITFIELD,
+ REQUEST,
+ PIECE
+ };
+
+ public Type type;
+ public String issuerHostname;
+ public String mailbox;
+ public int peerId;
+ public char bitfield[];
+ public int index;
+ public int blockIndex;
+ public int blockLength;
+ public boolean stalled;
+
+ public MessageTask(Type type, String issuerHostname, String mailbox, int peerId) {
+ this(type,issuerHostname,mailbox,peerId,-1,false,-1,-1);
+ }
+
+ public MessageTask(Type type, String issuerHostname, String mailbox, int peerId, int index) {
+ this(type,issuerHostname,mailbox,peerId,index,false,-1,-1);
+ }
+
+ // builds a new bitfield message
+ public MessageTask(Type type, String issuerHostname, String mailbox, int peerId, char bitfield[]) {
+ this(type,issuerHostname,mailbox,peerId,-1,false,-1,-1);
+ this.bitfield = bitfield;
+ }
+
+ // build a new "request" message
+ public MessageTask(Type type, String issuerHostname, String mailbox, int peerId, int index, int blockIndex,
+ int blockLength) {
+ this(type,issuerHostname,mailbox,peerId,index,false,blockIndex,blockLength);
+ }
+
+ // build a new "piece" message
+ public MessageTask(Type type, String issuerHostname, String mailbox, int peerId, int index, boolean stalled,
+ int blockIndex, int blockLength) {
+ this.type = type;
+ this.issuerHostname = issuerHostname;
+ this.mailbox = mailbox;
+ this.peerId = peerId;
+ this.index = index;
+ this.stalled = stalled;
+ this.blockIndex = blockIndex;
+ this.blockLength = blockLength;
+ }
}
-/* Copyright (c) 2006-2014. The SimGrid Team.
+/* Copyright (c) 2006-2014, 2016. The SimGrid Team.
* All rights reserved. */
/* This program is free software; you can redistribute it and/or modify it
import java.util.Iterator;
import java.util.Map.Entry;
+import org.simgrid.msg.Msg;
import org.simgrid.msg.Comm;
import org.simgrid.msg.Host;
-import org.simgrid.msg.Msg;
-import org.simgrid.msg.MsgException;
+import org.simgrid.msg.Task;
import org.simgrid.msg.Process;
import org.simgrid.msg.RngStream;
-import org.simgrid.msg.Task;
+import org.simgrid.msg.MsgException;
-/**
- * Main class for peers execution
- */
public class Peer extends Process {
- protected int round = 0;
-
- protected double beginReceiveTime;
- protected double deadline;
-
- protected static RngStream stream = new RngStream();
-
- protected int id;
- protected String mailbox;
- protected String mailboxTracker;
- protected String hostname;
- protected int pieces = 0;
- protected char[] bitfield = new char[Common.FILE_PIECES];
- protected char[][] bitfieldBlocks = new char[Common.FILE_PIECES][Common.PIECES_BLOCKS];
-
- protected short[] piecesCount = new short[Common.FILE_PIECES];
-
- protected int piecesRequested = 0;
-
- protected ArrayList<Integer> currentPieces = new ArrayList<Integer>();
- protected int currentPiece = -1;
-
- protected HashMap<Integer, Connection> activePeers = new HashMap<Integer, Connection>();
- protected HashMap<Integer, Connection> peers = new HashMap<Integer, Connection>();
-
- protected Comm commReceived = null;
-
- public Peer(Host host, String name, String[]args) {
- super(host,name,args);
- }
-
- @Override
- public void main(String[] args) throws MsgException {
- //Check arguments
- if (args.length != 3 && args.length != 2) {
- Msg.info("Wrong number of arguments");
- }
- if (args.length == 3) {
- init(Integer.valueOf(args[0]),true);
- }
- else {
- init(Integer.valueOf(args[0]),false);
- }
- //Retrieve the deadline
- deadline = Double.valueOf(args[1]);
- if (deadline < 0) {
- Msg.info("Wrong deadline supplied");
- return;
- }
- Msg.info("Hi, I'm joining the network with id " + id);
- //Getting peer data from the tracker
- if (getPeersData()) {
- Msg.debug("Got " + peers.size() + " peers from the tracker");
- Msg.debug("Here is my current status: " + getStatus());
- beginReceiveTime = Msg.getClock();
- if (hasFinished()) {
- pieces = Common.FILE_PIECES;
- sendHandshakeAll();
- seedLoop();
- }
- else {
- leechLoop();
- seedLoop();
- }
- }
- else {
- Msg.info("Couldn't contact the tracker.");
- }
- Msg.info("Here is my current status: " + getStatus());
- }
- /**
- * Peer main loop when it is leeching.
- */
- private void leechLoop() {
- double nextChokedUpdate = Msg.getClock() + Common.UPDATE_CHOKED_INTERVAL;
- Msg.debug("Start downloading.");
- /**
- * Send a "handshake" message to all the peers it got
- * (it couldn't have gotten more than 50 peers anyway)
- */
- sendHandshakeAll();
- //Wait for at least one "bitfield" message.
- waitForPieces();
- Msg.debug("Starting main leech loop");
- while (Msg.getClock() < deadline && pieces < Common.FILE_PIECES) {
- if (commReceived == null) {
- commReceived = Task.irecv(mailbox);
- }
- try {
- if (commReceived.test()) {
- handleMessage(commReceived.getTask());
- commReceived = null;
- }
- else {
- //If the user has a pending interesting
- if (currentPiece != -1) {
- sendInterestedToPeers();
- }
- else {
- if (currentPieces.size() < Common.MAX_PIECES) {
- updateCurrentPiece();
- }
- }
- //We don't execute the choke algorithm if we don't already have a piece
- if (Msg.getClock() >= nextChokedUpdate && pieces > 0) {
- updateChokedPeers();
- nextChokedUpdate += Common.UPDATE_CHOKED_INTERVAL;
- }
- else {
- waitFor(1);
- }
- }
- }
- catch (MsgException e) {
- commReceived = null;
- }
- }
- }
-
- /**
- * Peer main loop when it is seeding
- */
- private void seedLoop() {
- double nextChokedUpdate = Msg.getClock() + Common.UPDATE_CHOKED_INTERVAL;
- Msg.debug("Start seeding.");
- //start the main seed loop
- while (Msg.getClock() < deadline) {
- if (commReceived == null) {
- commReceived = Task.irecv(mailbox);
- }
- try {
- if (commReceived.test()) {
- handleMessage(commReceived.getTask());
- commReceived = null;
- }
- else {
- if (Msg.getClock() >= nextChokedUpdate) {
- updateChokedPeers();
- //TODO: Change the choked peer algorithm when seeding
- nextChokedUpdate += Common.UPDATE_CHOKED_INTERVAL;
- }
- else {
- waitFor(1);
- }
- }
- }
- catch (MsgException e) {
- commReceived = null;
- }
-
- }
- }
-
- /**
- * Initialize the various peer data
- * @param id id of the peer to take in the network
- * @param seed indicates if the peer is a seed
- */
- private void init(int id, boolean seed) {
- this.id = id;
- this.mailbox = Integer.toString(id);
- this.mailboxTracker = "tracker_" + Integer.toString(id);
- if (seed) {
- for (int i = 0; i < bitfield.length; i++) {
- bitfield[i] = '1';
- for (int j = 0; j < bitfieldBlocks[i].length; j++) {
- bitfieldBlocks[i][j] = '1';
- }
- }
- }
- else {
- for (int i = 0; i < bitfield.length; i++) {
- bitfield[i] = '0';
- for (int j = 0; j < bitfieldBlocks[i].length; j++) {
- bitfieldBlocks[i][j] = '0' ;
- }
- }
- }
- this.hostname = getHost().getName();
- }
- /**
- * Retrieves the peer list from the tracker
- */
- private boolean getPeersData() {
-
- boolean success = false, sendSuccess = false;
- double timeout = Msg.getClock() + Common.GET_PEERS_TIMEOUT;
- //Build the task to send to the tracker
- TrackerTask taskSend = new TrackerTask(hostname, mailboxTracker, id);
-
- while (!sendSuccess && Msg.getClock() < timeout) {
- try {
- Msg.debug("Sending a peer request to the tracker.");
- taskSend.send(Common.TRACKER_MAILBOX,Common.GET_PEERS_TIMEOUT);
- sendSuccess = true;
- }
- catch (MsgException e) {
-
- }
- }
- while (!success && Msg.getClock() < timeout) {
- commReceived = Task.irecv(this.mailboxTracker);
- try {
- commReceived.waitCompletion(Common.GET_PEERS_TIMEOUT);
- if (commReceived.getTask() instanceof TrackerTask) {
- TrackerTask task = (TrackerTask)commReceived.getTask();
- for (Integer peerId: task.peers) {
- if (peerId != this.id) {
- peers.put(peerId, new Connection(peerId));
- }
- }
- success = true;
- }
- }
- catch (MsgException e) {
-
- }
- commReceived = null;
- }
- commReceived = null;
- return success;
- }
- /**
- * Handle a received message sent by another peer
- * @param task task received.
- */
- void handleMessage(Task task) {
- MessageTask message = (MessageTask)task;
- Connection remotePeer = peers.get(message.peerId);
- switch (message.type) {
- case HANDSHAKE:
- Msg.debug("Received a HANDSHAKE message from " + message.mailbox);
- //Check if the peer is in our connection list
- if (remotePeer == null) {
- peers.put(message.peerId, new Connection(message.peerId));
- sendHandshake(message.mailbox);
- }
- //Send our bitfield to the pair
- sendBitfield(message.mailbox);
- break;
- case BITFIELD:
- Msg.debug("Received a BITFIELD message from " + message.peerId + " (" + message.issuerHostname + ")");
- //update the pieces list
- updatePiecesCountFromBitfield(message.bitfield);
- //Update the current piece
- if (currentPiece == -1 && pieces < Common.FILE_PIECES && currentPieces.size() < Common.MAX_PIECES) {
- updateCurrentPiece();
- }
- remotePeer.bitfield = message.bitfield.clone();
- break;
- case INTERESTED:
- Msg.debug("Received an INTERESTED message from " + message.peerId + " (" + message.issuerHostname + ")");
- assert remotePeer != null;
- remotePeer.interested = true;
- break;
- case NOTINTERESTED:
- Msg.debug("Received a NOTINTERESTED message from " + message.peerId + " (" + message.issuerHostname + ")");
- assert remotePeer != null;
- remotePeer.interested = false;
- break;
- case UNCHOKE:
- Msg.debug("Received an UNCHOKE message from " + message.peerId + "(" + message.issuerHostname + ")");
- assert remotePeer != null;
- remotePeer.chokedDownload = false;
- activePeers.put(remotePeer.id,remotePeer);
- sendRequestsToPeer(remotePeer);
- break;
- case CHOKE:
- Msg.debug("Received a CHOKE message from " + message.peerId + " (" + message.issuerHostname + ")");
- assert remotePeer != null;
- remotePeer.chokedDownload = true;
- activePeers.remove(remotePeer.id);
- break;
- case HAVE:
- if (remotePeer.bitfield == null) {
- return;
- }
- Msg.debug("Received a HAVE message from " + message.peerId + " (" + message.issuerHostname + ")");
- assert message.index >= 0 && message.index < Common.FILE_PIECES;
- assert remotePeer.bitfield != null;
- remotePeer.bitfield[message.index] = '1';
- piecesCount[message.index]++;
- //Send interested message to the peer if he has what we want
- if (!remotePeer.amInterested && currentPieces.contains(message.index) ) {
- remotePeer.amInterested = true;
- sendInterested(remotePeer.mailbox);
- }
-
- if (currentPieces.contains(message.index)) {
- int blockIndex = getFirstBlock(message.index);
- int blockLength = Common.PIECES_BLOCKS - blockIndex ;
- blockLength = blockLength > Common.BLOCKS_REQUESTED ? Common.BLOCKS_REQUESTED : blockLength;
- sendRequest(message.mailbox,message.index,blockIndex,blockLength);
- }
- break;
- case REQUEST:
- assert message.index >= 0 && message.index < Common.FILE_PIECES;
- if (!remotePeer.chokedUpload) {
- Msg.debug("Received a REQUEST from " + message.peerId + "(" + message.issuerHostname + ") for " + message.peerId);
- if (bitfield[message.index] == '1') {
- sendPiece(message.mailbox,message.index,false,message.blockIndex,message.blockLength);
- }
- else {
- Msg.debug("Received a REQUEST from " + message.peerId + " (" + message.issuerHostname + ") but he is choked" );
- }
- }
- break;
- case PIECE:
- if (message.stalled) {
- Msg.debug("The received piece " + message.index + " from " + message.peerId + " (" + message.issuerHostname + ") is stalled");
- }
- else {
- Msg.debug("Received piece " + message.index + " from " + message.peerId + " (" + message.issuerHostname + ")");
- if (bitfield[message.index] == '0') {
- updateBitfieldBlocks(message.index,message.blockIndex,message.blockLength);
- if (pieceComplete(message.index)) {
- piecesRequested--;
- //Removing the piece from our piece list.
- if (!currentPieces.remove((Object)Integer.valueOf(message.index))) {
- }
- //Setting the fact that we have the piece
- bitfield[message.index] = '1';
- pieces++;
- Msg.debug("My status is now " + getStatus());
- //Sending the information to all the peers we are connected to
- sendHave(message.index);
- //sending UNINTERESTED to peers that doesn't have what we want.
- updateInterestedAfterReceive();
- }
- }
- else {
- Msg.debug("However, we already have it.");
- }
- }
- break;
- }
- if (remotePeer != null) {
- remotePeer.addSpeedValue(1 / (Msg.getClock() - beginReceiveTime));
- }
- beginReceiveTime = Msg.getClock();
- }
- /**
- * Wait for the node to receive interesting bitfield messages (ie: non empty)
- * to be received
- */
- void waitForPieces() {
- boolean finished = false;
- while (Msg.getClock() < deadline && !finished) {
- if (commReceived == null) {
- commReceived = Task.irecv(mailbox);
- }
- try {
- commReceived.waitCompletion(Common.TIMEOUT_MESSAGE);
- handleMessage(commReceived.getTask());
- if (currentPiece != -1) {
- finished = true;
- }
- commReceived = null;
- }
- catch (MsgException e) {
- commReceived = null;
- }
- }
- }
-
- private boolean hasFinished() {
- for (int i = 0; i < bitfield.length; i++) {
- if (bitfield[i] == '1') {
- return true;
- }
- }
- return false;
- }
- /**
- * Updates the list of who has a piece from a bitfield
- * @param bitfield bitfield
- */
- private void updatePiecesCountFromBitfield(char bitfield[]) {
- for (int i = 0; i < Common.FILE_PIECES; i++) {
- if (bitfield[i] == '1') {
- piecesCount[i]++;
- }
- }
- }
- /**
- * Update the piece the peer is currently interested in.
- * There is two cases (as described in "Bittorrent Architecture Protocol", Ryan Toole :
- * If the peer has less than 3 pieces, he chooses a piece at random.
- * If the peer has more than pieces, he downloads the pieces that are the less
- * replicated
- */
- void updateCurrentPiece() {
- if (currentPieces.size() >= (Common.FILE_PIECES - pieces)) {
- return;
- }
- if (true || pieces < 3) {
- int peerPiece;
- do {
- currentPiece = stream.randInt(0,Common.FILE_PIECES - 1);
- } while (!(bitfield[currentPiece] == '0' && !currentPieces.contains(currentPiece)));
- }
- else {
- //trivial min algorithm.
- //TODO
- }
- currentPieces.add(currentPiece);
- Msg.debug("New interested piece: " + currentPiece);
- assert currentPiece >= 0 && currentPiece < Common.FILE_PIECES;
- }
- /**
- * Update the list of current choked and unchoked peers, using the
- * choke algorithm
- */
- private void updateChokedPeers() {
- round = (round + 1) % 3;
- if (peers.size() == 0) {
- return;
- }
- //remove a peer from the list
- Iterator<Entry<Integer, Connection>> it = activePeers.entrySet().iterator();
- if (it.hasNext()) {
- Entry<Integer,Connection> e = it.next();
- Connection peerChoked = e.getValue();
- peerChoked.chokedUpload = true;
- sendChoked(peerChoked.mailbox);
- activePeers.remove(e.getKey());
- }
- Connection peerChoosed = null;
- //Separate the case from when the peer is seeding.
- if (pieces == Common.FILE_PIECES) {
- //Find the last unchoked peer.
- double unchokeTime = deadline + 1;
- for (Connection connection : peers.values()) {
- if (connection.lastUnchoke < unchokeTime && connection.interested) {
- peerChoosed = connection;
- unchokeTime = connection.lastUnchoke;
- }
- }
- }
- else {
- //Random optimistic unchoking
- if (round == 0) {
- int j = 0, i;
- do {
- i = 0;
- int idChosen = stream.randInt(0,peers.size() - 1);
- for (Connection connection : peers.values()) {
- if (i == idChosen) {
- peerChoosed = connection;
- break;
- }
- i++;
- } //TODO: Not really the best way ever
- if (!peerChoosed.interested) {
- peerChoosed = null;
- }
- j++;
- } while (peerChoosed == null && j < Common.MAXIMUM_PEERS);
- }
- else {
- Connection fastest = null;
- double fastestSpeed = 0;
- for (Connection c : peers.values()) {
- if (c.peerSpeed > fastestSpeed && c.interested && c.chokedUpload) {
- fastest = c;
- fastestSpeed = c.peerSpeed;
- }
- }
- peerChoosed = fastest;
- }
- }
- if (peerChoosed != null) {
- activePeers.put(peerChoosed.id,peerChoosed);
- peerChoosed.chokedUpload = false;
- peerChoosed.lastUnchoke = Msg.getClock();
- sendUnchoked(peerChoosed.mailbox);
- }
- }
- /**
- * Updates our "interested" state about peers: send "not interested" to peers
- * that don't have any more pieces we want.
- */
- private void updateInterestedAfterReceive() {
- boolean interested;
- for (Connection connection : peers.values()) {
- interested = false;
- if (connection.amInterested) {
- for (Integer piece : currentPieces) {
- if (connection.bitfield[piece] == '1') {
- interested = true;
- break;
- }
- }
- if (!interested) {
- connection.amInterested = false;
- sendNotInterested(connection.mailbox);
- }
- }
- }
- }
- private void updateBitfieldBlocks(int index, int blockIndex, int blockLength) {
- for (int i = blockIndex; i < (blockIndex + blockLength); i++) {
- bitfieldBlocks[index][i] = '1';
- }
- }
- /**
- * Returns if a piece is complete in the peer's bitfield.
- * @param index the index of the piece.
- */
- private boolean pieceComplete(int index) {
- for (int i = 0; i < bitfieldBlocks[index].length; i++) {
- if (bitfieldBlocks[index][i] == '0') {
- return false;
- }
- }
- return true;
- }
- /**
- * Returns the first block of a piece that we don't have.
- */
- private int getFirstBlock(int piece) {
- int blockIndex = -1;
- for (int i = 0; i < Common.PIECES_BLOCKS; i++) {
- if (bitfieldBlocks[piece][i] == '0') {
- blockIndex = i;
- break;
- }
- }
- return blockIndex;
- }
- /**
- * Send request messages to a peer that have unchoked us
- * @param remotePeer peer data to the peer we want to send the request
- */
- private void sendRequestsToPeer(Connection remotePeer) {
- if (remotePeer.bitfield == null) {
- return;
- }
- for (Integer piece : currentPieces) {
- //Getting the block to send.
- int blockIndex = -1, blockLength = 0;
- blockIndex = getFirstBlock(piece);
- blockLength = Common.PIECES_BLOCKS - blockIndex ;
- blockLength = blockLength > Common.BLOCKS_REQUESTED ? Common.BLOCKS_REQUESTED : blockLength;
- if (remotePeer.bitfield[piece] == '1') {
- sendRequest(remotePeer.mailbox, piece, blockIndex, blockLength);
- }
- }
- }
- /**
- * Find the peers that have the current interested piece and send them
- * the "interested" message
- */
- private void sendInterestedToPeers() {
- if (currentPiece == -1) {
- return;
- }
- for (Connection connection : peers.values()) {
- if (connection.bitfield != null && connection.bitfield[currentPiece] == '1' && !connection.amInterested) {
- connection.amInterested = true;
- MessageTask task = new MessageTask(MessageTask.Type.INTERESTED, hostname, this.mailbox, this.id);
- task.dsend(connection.mailbox);
- }
- }
- currentPiece = -1;
- piecesRequested++;
- }
- /**
- * Send a "interested" message to a peer.
- */
- private void sendInterested(String mailbox) {
- MessageTask task = new MessageTask(MessageTask.Type.INTERESTED, hostname, this.mailbox, this.id);
- task.dsend(mailbox);
- }
- /**
- * Send a "not interested" message to a peer
- * @param mailbox mailbox destination mailbox
- */
- private void sendNotInterested(String mailbox) {
- MessageTask task = new MessageTask(MessageTask.Type.NOTINTERESTED, hostname, this.mailbox, this.id);
- task.dsend(mailbox);
- }
- /**
- * Send a handshake message to all the peers the peer has.
- * @param peer peer data
- */
- private void sendHandshakeAll() {
- for (Connection remotePeer : peers.values()) {
- MessageTask task = new MessageTask(MessageTask.Type.HANDSHAKE, hostname, mailbox,
- id);
- task.dsend(remotePeer.mailbox);
- }
- }
- /**
- * Send a "handshake" message to an user
- * @param mailbox mailbox where to we send the message
- */
- private void sendHandshake(String mailbox) {
- Msg.debug("Sending a HANDSHAKE to " + mailbox);
- MessageTask task = new MessageTask(MessageTask.Type.HANDSHAKE, hostname, this.mailbox, this.id);
- task.dsend(mailbox);
- }
- /**
- * Send a "choked" message to a peer
- */
- private void sendChoked(String mailbox) {
- Msg.debug("Sending a CHOKE to " + mailbox);
- MessageTask task = new MessageTask(MessageTask.Type.CHOKE, hostname, this.mailbox, this.id);
- task.dsend(mailbox);
- }
- /**
- * Send a "unchoked" message to a peer
- */
- private void sendUnchoked(String mailbox) {
- Msg.debug("Sending a UNCHOKE to " + mailbox);
- MessageTask task = new MessageTask(MessageTask.Type.UNCHOKE, hostname, this.mailbox, this.id);
- task.dsend(mailbox);
- }
- /**
- * Send a "HAVE" message to all peers we are connected to
- */
- private void sendHave(int piece) {
- Msg.debug("Sending HAVE message to all my peers");
- for (Connection remotePeer : peers.values()) {
- MessageTask task = new MessageTask(MessageTask.Type.HAVE, hostname, this.mailbox, this.id, piece);
- task.dsend(remotePeer.mailbox);
- }
- }
- /**
- * Send a bitfield message to all the peers the peer has.
- * @param peer peer data
- */
- private void sendBitfield(String mailbox) {
- Msg.debug("Sending a BITFIELD to " + mailbox);
- MessageTask task = new MessageTask(MessageTask.Type.BITFIELD, hostname, this.mailbox, this.id, this.bitfield);
- task.dsend(mailbox);
- }
- /**
- * Send a "request" message to a pair, containing a request for a piece
- */
- private void sendRequest(String mailbox, int piece, int blockIndex, int blockLength) {
- Msg.debug("Sending a REQUEST to " + mailbox + " for piece " + piece + " and blocks " + blockIndex + "," + (blockIndex + blockLength));
- MessageTask task = new MessageTask(MessageTask.Type.REQUEST, hostname, this.mailbox, this.id, piece, blockIndex, blockLength);
- task.dsend(mailbox);
- }
- /**
- * Send a "piece" message to a pair, containing a piece of the file
- */
- private void sendPiece(String mailbox, int piece, boolean stalled, int blockIndex, int blockLength) {
- Msg.debug("Sending the PIECE " + piece + " to " + mailbox);
- MessageTask task = new MessageTask(MessageTask.Type.PIECE, hostname, this.mailbox, this.id, piece, stalled, blockIndex, blockLength);
- task.dsend(mailbox);
- }
-
- private String getStatus() {
- String s = "";
- for (int i = 0; i < Common.FILE_PIECES; i++) {
- s = s + bitfield[i];
- }
- return s;
- }
+ // Round counter of the choke algorithm; cycles through 0, 1, 2 on each updateChokedPeers() call.
+ protected int round = 0;
+ // Clock value recorded after the previous message was handled; used to derive the value given to addSpeedValue().
+ protected double beginReceiveTime;
+ // Simulated time after which the peer stops looping (parsed from the second process argument).
+ protected double deadline;
+ // Random stream used to pick pieces and peers; static, hence shared by every Peer instance.
+ protected static RngStream stream = new RngStream();
+ // Identifier of this peer (parsed from the first process argument).
+ protected int id;
+ // Mailbox on which this peer receives messages from the other peers (the id as a string).
+ protected String mailbox;
+ // Mailbox on which this peer waits for the tracker's answer ("tracker_" + id).
+ protected String mailboxTracker;
+ // Name of the host running this process.
+ protected String hostname;
+ // Number of complete pieces this peer owns.
+ protected int pieces = 0;
+ // One char per piece: '1' when the piece is owned, '0' otherwise.
+ protected char[] bitfield = new char[Common.FILE_PIECES];
+ // One char per block of every piece: '1' when the block has been received, '0' otherwise.
+ protected char[][] bitfieldBlocks = new char[Common.FILE_PIECES][Common.PIECES_BLOCKS];
+ // For each piece, how many times a peer announced owning it (through BITFIELD or HAVE messages).
+ protected short[] piecesCount = new short[Common.FILE_PIECES];
+ // Incremented when interest for a newly chosen piece is announced, decremented when a piece completes.
+ protected int piecesRequested = 0;
+ // Pieces currently being downloaded.
+ protected ArrayList<Integer> currentPieces = new ArrayList<Integer>();
+ // Piece most recently chosen and not yet announced to the peers; -1 when there is none.
+ protected int currentPiece = -1;
+ // Peers that unchoked us or that we unchoked, keyed by peer id.
+ protected HashMap<Integer, Connection> activePeers = new HashMap<Integer, Connection>();
+ // Every known peer connection, keyed by peer id.
+ protected HashMap<Integer, Connection> peers = new HashMap<Integer, Connection>();
+ // Pending asynchronous reception, or null when a new one has to be posted.
+ protected Comm commReceived = null;
+
+ // Creates the peer process; every argument is forwarded unchanged to the superclass constructor.
+ public Peer(Host host, String name, String[]args) {
+ super(host,name,args);
+ }
+
+ /**
+  * Entry point of the peer process: joins the network, downloads the file if needed, then seeds
+  * until the deadline.
+  *
+  * @param args {@code args[0]} is the peer id and {@code args[1]} the deadline; the mere presence
+  *             of a third argument marks the peer as a seed
+  * @throws MsgException declared by the overridden method
+  */
+ @Override
+ public void main(String[] args) throws MsgException {
+   // Stop here: going on would index past the end of a too-short argument array.
+   if (args.length != 3 && args.length != 2) {
+     Msg.info("Wrong number of arguments");
+     return;
+   }
+   init(Integer.parseInt(args[0]), args.length == 3);
+   //Retrieve the deadline
+   deadline = Double.parseDouble(args[1]);
+   if (deadline < 0) {
+     Msg.info("Wrong deadline supplied");
+     return;
+   }
+   Msg.info("Hi, I'm joining the network with id " + id);
+   //Getting peer data from the tracker
+   if (getPeersData()) {
+     Msg.debug("Got " + peers.size() + " peers from the tracker");
+     Msg.debug("Here is my current status: " + getStatus());
+     beginReceiveTime = Msg.getClock();
+     if (hasFinished()) {
+       // Already a seed: only announce ourselves to the peers.
+       pieces = Common.FILE_PIECES;
+       sendHandshakeAll();
+     } else {
+       leechLoop();
+     }
+     // Whether we started complete or just finished downloading, seed until the deadline.
+     seedLoop();
+   } else {
+     Msg.info("Couldn't contact the tracker.");
+   }
+   Msg.info("Here is my current status: " + getStatus());
+ }
+
+ // Download loop: handshakes with every known peer, waits until a first piece has been chosen, then keeps
+ // handling messages, announcing interest and running the choke algorithm until the file is complete or
+ // the deadline is reached.
+ private void leechLoop() {
+ double nextChokedUpdate = Msg.getClock() + Common.UPDATE_CHOKED_INTERVAL;
+ Msg.debug("Start downloading.");
+ // Send a "handshake" message to all the peers we got (we cannot have received more than 50 peers anyway)
+ sendHandshakeAll();
+ //Wait for at least one "bitfield" message.
+ waitForPieces();
+ Msg.debug("Starting main leech loop");
+ while (Msg.getClock() < deadline && pieces < Common.FILE_PIECES) {
+ // Post a new asynchronous reception only when the previous one has been consumed or has failed.
+ if (commReceived == null) {
+ commReceived = Task.irecv(mailbox);
+ }
+ try {
+ if (commReceived.test()) {
+ handleMessage(commReceived.getTask());
+ commReceived = null;
+ } else {
+ //If a piece has been chosen but not announced yet, tell the peers we are interested in it
+ if (currentPiece != -1) {
+ sendInterestedToPeers();
+ } else {
+ if (currentPieces.size() < Common.MAX_PIECES) {
+ updateCurrentPiece();
+ }
+ }
+ //We don't execute the choke algorithm if we don't already have a piece
+ if (Msg.getClock() >= nextChokedUpdate && pieces > 0) {
+ updateChokedPeers();
+ nextChokedUpdate += Common.UPDATE_CHOKED_INTERVAL;
+ } else {
+ // Nothing to do right now: let one unit of simulated time pass before polling again.
+ waitFor(1);
+ }
+ }
+ }
+ catch (MsgException e) {
+ // Drop the failed reception; a new one is posted on the next iteration.
+ commReceived = null;
+ }
+ }
+ }
+
+ // Seeding loop: until the deadline, handles incoming messages and periodically runs the choke algorithm.
+ private void seedLoop() {
+ double nextChokedUpdate = Msg.getClock() + Common.UPDATE_CHOKED_INTERVAL;
+ Msg.debug("Start seeding.");
+ //start the main seed loop
+ while (Msg.getClock() < deadline) {
+ // Post a new asynchronous reception only when the previous one has been consumed or has failed.
+ if (commReceived == null) {
+ commReceived = Task.irecv(mailbox);
+ }
+ try {
+ if (commReceived.test()) {
+ handleMessage(commReceived.getTask());
+ commReceived = null;
+ } else {
+ if (Msg.getClock() >= nextChokedUpdate) {
+ updateChokedPeers();
+ //TODO: Change the choked peer algorithm when seeding
+ nextChokedUpdate += Common.UPDATE_CHOKED_INTERVAL;
+ } else {
+ // Nothing to do right now: let one unit of simulated time pass before polling again.
+ waitFor(1);
+ }
+ }
+ }
+ catch (MsgException e) {
+ // Drop the failed reception; a new one is posted on the next iteration.
+ commReceived = null;
+ }
+ }
+ }
+
+ /**
+  * Set up the identity, the mailboxes and the piece bookkeeping of this peer.
+  * @param id id of the peer to take in the network
+  * @param seed whether the peer starts with every piece of the file
+  */
+ private void init(int id, boolean seed) {
+   this.id = id;
+   this.mailbox = Integer.toString(id);
+   this.mailboxTracker = "tracker_" + Integer.toString(id);
+   // A seed owns every block of every piece, any other peer owns none.
+   final char state = seed ? '1' : '0';
+   for (int piece = 0; piece < bitfield.length; piece++) {
+     bitfield[piece] = state;
+     for (int block = 0; block < bitfieldBlocks[piece].length; block++) {
+       bitfieldBlocks[piece][block] = state;
+     }
+   }
+   this.hostname = getHost().getName();
+ }
+
+ // Asks the tracker for a list of peers and records them in the peers map.
+ // Both the request and the wait for the answer are bounded by the same timeout
+ // (current clock + Common.GET_PEERS_TIMEOUT). Returns true when an answer has been received.
+ private boolean getPeersData() {
+ boolean success = false, sendSuccess = false;
+ double timeout = Msg.getClock() + Common.GET_PEERS_TIMEOUT;
+ //Build the task to send to the tracker
+ TrackerTask taskSend = new TrackerTask(hostname, mailboxTracker, id);
+
+ while (!sendSuccess && Msg.getClock() < timeout) {
+ try {
+ Msg.debug("Sending a peer request to the tracker.");
+ taskSend.send(Common.TRACKER_MAILBOX,Common.GET_PEERS_TIMEOUT);
+ sendSuccess = true;
+ }
+ catch (MsgException e) {
+ // Deliberately ignored: the send is retried until the timeout expires.
+ }
+ }
+ while (!success && Msg.getClock() < timeout) {
+ commReceived = Task.irecv(this.mailboxTracker);
+ try {
+ commReceived.waitCompletion(Common.GET_PEERS_TIMEOUT);
+ if (commReceived.getTask() instanceof TrackerTask) {
+ TrackerTask task = (TrackerTask)commReceived.getTask();
+ // Register every peer listed in the answer, except ourselves.
+ for (Integer peerId: task.peers) {
+ if (peerId != this.id) {
+ peers.put(peerId, new Connection(peerId));
+ }
+ }
+ success = true;
+ }
+ }
+ // Deliberately ignored: the reception is retried until the timeout expires.
+ catch (MsgException e) {}
+ commReceived = null;
+ }
+ // Leave no pending reception behind: the caller's loops post their own on the peer mailbox.
+ commReceived = null;
+ return success;
+ }
+
+ /**
+  * Handle one message received from another peer and update the matching connection state.
+  * Once the message is processed, the speed estimate of the sender is refreshed (when the sender
+  * was already a known peer) and the receive timer is restarted.
+  * @param task the received task; it is cast to MessageTask without any check
+  */
+ void handleMessage(Task task) {
+   MessageTask message = (MessageTask)task;
+   Connection remotePeer = peers.get(message.peerId);
+   switch (message.type) {
+     case HANDSHAKE:
+       Msg.debug("Received a HANDSHAKE message from " + message.mailbox);
+       //Check if the peer is in our connection list
+       if (remotePeer == null) {
+         peers.put(message.peerId, new Connection(message.peerId));
+         sendHandshake(message.mailbox);
+       }
+       //Send our bitfield to the peer
+       sendBitfield(message.mailbox);
+       break;
+     case BITFIELD:
+       Msg.debug("Received a BITFIELD message from " + message.peerId + " (" + message.issuerHostname + ")");
+       assert remotePeer != null;
+       //update the pieces list
+       updatePiecesCountFromBitfield(message.bitfield);
+       //Update the current piece
+       if (currentPiece == -1 && pieces < Common.FILE_PIECES && currentPieces.size() < Common.MAX_PIECES) {
+         updateCurrentPiece();
+       }
+       remotePeer.bitfield = message.bitfield.clone();
+       break;
+     case INTERESTED:
+       Msg.debug("Received an INTERESTED message from " + message.peerId + " (" + message.issuerHostname + ")");
+       assert remotePeer != null;
+       remotePeer.interested = true;
+       break;
+     case NOTINTERESTED:
+       Msg.debug("Received a NOTINTERESTED message from " + message.peerId + " (" + message.issuerHostname + ")");
+       assert remotePeer != null;
+       remotePeer.interested = false;
+       break;
+     case UNCHOKE:
+       Msg.debug("Received an UNCHOKE message from " + message.peerId + "(" + message.issuerHostname + ")");
+       assert remotePeer != null;
+       remotePeer.chokedDownload = false;
+       activePeers.put(remotePeer.id,remotePeer);
+       sendRequestsToPeer(remotePeer);
+       break;
+     case CHOKE:
+       Msg.debug("Received a CHOKE message from " + message.peerId + " (" + message.issuerHostname + ")");
+       assert remotePeer != null;
+       remotePeer.chokedDownload = true;
+       activePeers.remove(remotePeer.id);
+       break;
+     case HAVE:
+       assert remotePeer != null;
+       // Without the peer's bitfield there is nothing to update; note that this early return also
+       // skips the speed and timer update done after the switch.
+       if (remotePeer.bitfield == null) {
+         return;
+       }
+       Msg.debug("Received a HAVE message from " + message.peerId + " (" + message.issuerHostname + ")");
+       assert message.index >= 0 && message.index < Common.FILE_PIECES;
+       assert remotePeer.bitfield != null;
+       remotePeer.bitfield[message.index] = '1';
+       piecesCount[message.index]++;
+       //Send interested message to the peer if he has what we want
+       if (!remotePeer.amInterested && currentPieces.contains(message.index) ) {
+         remotePeer.amInterested = true;
+         sendInterested(remotePeer.mailbox);
+       }
+
+       if (currentPieces.contains(message.index)) {
+         int blockIndex = getFirstBlock(message.index);
+         int blockLength = Common.PIECES_BLOCKS - blockIndex ;
+         blockLength = blockLength > Common.BLOCKS_REQUESTED ? Common.BLOCKS_REQUESTED : blockLength;
+         sendRequest(message.mailbox,message.index,blockIndex,blockLength);
+       }
+       break;
+     case REQUEST:
+       assert remotePeer != null;
+       assert message.index >= 0 && message.index < Common.FILE_PIECES;
+       if (!remotePeer.chokedUpload) {
+         Msg.debug("Received a REQUEST from " + message.peerId + "(" + message.issuerHostname + ") for "
+             + message.index);
+         // Only serve pieces we actually own; requests for other pieces are dropped.
+         if (bitfield[message.index] == '1') {
+           sendPiece(message.mailbox,message.index,false,message.blockIndex,message.blockLength);
+         }
+       } else {
+         // The requester is choked on our side: log it and do not answer.
+         Msg.debug("Received a REQUEST from " + message.peerId + " (" + message.issuerHostname
+             + ") but he is choked" );
+       }
+       break;
+     case PIECE:
+       if (message.stalled) {
+         Msg.debug("The received piece " + message.index + " from " + message.peerId + " (" + message.issuerHostname
+             + ") is stalled");
+       } else {
+         Msg.debug("Received piece " + message.index + " from " + message.peerId + " ("
+             + message.issuerHostname + ")");
+         if (bitfield[message.index] == '0') {
+           updateBitfieldBlocks(message.index,message.blockIndex,message.blockLength);
+           if (pieceComplete(message.index)) {
+             piecesRequested--;
+             //Removing the piece from our piece list (by value, hence the boxing).
+             currentPieces.remove((Object)Integer.valueOf(message.index));
+             //Setting the fact that we have the piece
+             bitfield[message.index] = '1';
+             pieces++;
+             Msg.debug("My status is now " + getStatus());
+             //Sending the information to all the peers we are connected to
+             sendHave(message.index);
+             //Sending NOTINTERESTED to the peers that don't have what we want.
+             updateInterestedAfterReceive();
+           }
+         } else {
+           Msg.debug("However, we already have it.");
+         }
+       }
+       break;
+   }
+   if (remotePeer != null) {
+     remotePeer.addSpeedValue(1 / (Msg.getClock() - beginReceiveTime));
+   }
+   beginReceiveTime = Msg.getClock();
+ }
+
+ // Blocks, handling incoming messages, until a piece to download has been chosen (currentPiece != -1)
+ // or the deadline is reached.
+ void waitForPieces() {
+ boolean finished = false;
+ while (Msg.getClock() < deadline && !finished) {
+ if (commReceived == null) {
+ commReceived = Task.irecv(mailbox);
+ }
+ try {
+ // Unlike the main loops, this waits for the reception (bounded by Common.TIMEOUT_MESSAGE) instead of polling.
+ commReceived.waitCompletion(Common.TIMEOUT_MESSAGE);
+ handleMessage(commReceived.getTask());
+ if (currentPiece != -1) {
+ finished = true;
+ }
+ commReceived = null;
+ }
+ catch (MsgException e) {
+ // Drop the failed reception; a new one is posted on the next iteration.
+ commReceived = null;
+ }
+ }
+ }
+
+ /**
+  * Tell whether this peer owns the whole file.
+  * @return true only when every entry of the bitfield is '1'
+  */
+ private boolean hasFinished() {
+   // A single missing piece is enough to know the download is not over.
+   for (char piece : bitfield) {
+     if (piece != '1') {
+       return false;
+     }
+   }
+   return true;
+ }
+
+ /**
+  * Count one more owner for every piece flagged in the given bitfield.
+  * @param bitfield bitfield announced by a peer ('1' marks an owned piece)
+  */
+ private void updatePiecesCountFromBitfield(char bitfield[]) {
+   for (int piece = 0; piece < Common.FILE_PIECES; piece++) {
+     if (bitfield[piece] != '1') {
+       continue;
+     }
+     piecesCount[piece]++;
+   }
+ }
+
+ /**
+  * Choose the next piece to download and add it to the pieces being downloaded.
+  * The reference policy ("Bittorrent Architecture Protocol", Ryan Toole) has two cases:
+  * a peer owning fewer than 3 pieces picks one at random, while a peer owning more picks
+  * the least replicated piece. Only the random policy is implemented for now.
+  * Does nothing when every missing piece is already being downloaded.
+  */
+ void updateCurrentPiece() {
+   if (currentPieces.size() >= (Common.FILE_PIECES - pieces)) {
+     return;
+   }
+   //TODO: use the rarest-first policy (trivial min over piecesCount) once the peer owns 3 pieces or more
+   // Draw until we hit a piece that is neither owned nor already being downloaded; the guard
+   // above ensures that such a piece exists.
+   do {
+     currentPiece = stream.randInt(0,Common.FILE_PIECES - 1);
+   } while (!(bitfield[currentPiece] == '0' && !currentPieces.contains(currentPiece)));
+   currentPieces.add(currentPiece);
+   Msg.debug("New interested piece: " + currentPiece);
+   assert currentPiece >= 0 && currentPiece < Common.FILE_PIECES;
+ }
+
+ // Update the list of current choked and unchoked peers, using the choke algorithm:
+ // one active peer (if any) is choked, then at most one peer is selected and unchoked.
+ private void updateChokedPeers() {
+ // The round advances even when there is no peer to handle.
+ round = (round + 1) % 3;
+ if (peers.size() == 0) {
+ return;
+ }
+ //remove a peer from the list: the first one returned by the map iterator
+ Iterator<Entry<Integer, Connection>> it = activePeers.entrySet().iterator();
+ if (it.hasNext()) {
+ Entry<Integer,Connection> e = it.next();
+ Connection peerChoked = e.getValue();
+ peerChoked.chokedUpload = true;
+ sendChoked(peerChoked.mailbox);
+ // Removing through the map is safe here because the iterator is not used afterwards.
+ activePeers.remove(e.getKey());
+ }
+ Connection peerChoosed = null;
+ //Separate the case from when the peer is seeding.
+ if (pieces == Common.FILE_PIECES) {
+ //Seeding: pick the interested peer with the smallest lastUnchoke value.
+ double unchokeTime = deadline + 1;
+ for (Connection connection : peers.values()) {
+ if (connection.lastUnchoke < unchokeTime && connection.interested) {
+ peerChoosed = connection;
+ unchokeTime = connection.lastUnchoke;
+ }
+ }
+ } else {
+ //Random optimistic unchoking
+ if (round == 0) {
+ // Draw random peers until an interested one is found, giving up after Common.MAXIMUM_PEERS attempts.
+ int j = 0, i;
+ do {
+ i = 0;
+ int idChosen = stream.randInt(0,peers.size() - 1);
+ // Walk the map values up to the drawn position (the map offers no access by index).
+ for (Connection connection : peers.values()) {
+ if (i == idChosen) {
+ peerChoosed = connection;
+ break;
+ }
+ i++;
+ } //TODO: Not really the best way ever
+ if (!peerChoosed.interested) {
+ peerChoosed = null;
+ }
+ j++;
+ } while (peerChoosed == null && j < Common.MAXIMUM_PEERS);
+ } else {
+ // Other rounds: pick the interested peer we currently choke that has the highest peerSpeed.
+ Connection fastest = null;
+ double fastestSpeed = 0;
+ for (Connection c : peers.values()) {
+ if (c.peerSpeed > fastestSpeed && c.interested && c.chokedUpload) {
+ fastest = c;
+ fastestSpeed = c.peerSpeed;
+ }
+ }
+ peerChoosed = fastest;
+ }
+ }
+ // Unchoke the selected peer, if any, and remember when it happened.
+ if (peerChoosed != null) {
+ activePeers.put(peerChoosed.id,peerChoosed);
+ peerChoosed.chokedUpload = false;
+ peerChoosed.lastUnchoke = Msg.getClock();
+ sendUnchoked(peerChoosed.mailbox);
+ }
+ }
+
+ // Refreshes our "interested" state: every peer we were interested in that owns none of the pieces
+ // we are currently downloading gets a "not interested" message.
+ private void updateInterestedAfterReceive() {
+   for (Connection connection : peers.values()) {
+     if (!connection.amInterested) {
+       continue;
+     }
+     boolean stillUseful = false;
+     for (Integer wanted : currentPieces) {
+       if (connection.bitfield[wanted] == '1') {
+         stillUseful = true;
+         break;
+       }
+     }
+     if (stillUseful) {
+       continue;
+     }
+     connection.amInterested = false;
+     sendNotInterested(connection.mailbox);
+   }
+ }
+
+ // Marks blockLength blocks of the given piece as received, starting at blockIndex.
+ private void updateBitfieldBlocks(int index, int blockIndex, int blockLength) {
+   final int end = blockIndex + blockLength;
+   int block = blockIndex;
+   while (block < end) {
+     bitfieldBlocks[index][block] = '1';
+     block++;
+   }
+ }
+
+ // Tells whether every block of the given piece has been received.
+ private boolean pieceComplete(int index) {
+   for (char block : bitfieldBlocks[index]) {
+     if (block == '0') {
+       return false;
+     }
+   }
+   return true;
+ }
+
+ // Returns the index of the first block of the piece that we don't have, or -1 when none is missing.
+ private int getFirstBlock(int piece) {
+   for (int block = 0; block < Common.PIECES_BLOCKS; block++) {
+     if (bitfieldBlocks[piece][block] == '0') {
+       return block;
+     }
+   }
+   return -1;
+ }
+
+ /**
+  * Request, from a peer that unchoked us, the next missing blocks of every piece being
+  * downloaded that this peer owns. Does nothing when the peer's bitfield is still unknown.
+  * @param remotePeer connection data of the peer the requests are sent to
+  */
+ private void sendRequestsToPeer(Connection remotePeer) {
+   if (remotePeer.bitfield == null) {
+     return;
+   }
+   for (Integer piece : currentPieces) {
+     // Ask for the remaining blocks of the piece, capped at Common.BLOCKS_REQUESTED.
+     int firstMissing = getFirstBlock(piece);
+     int count = Math.min(Common.PIECES_BLOCKS - firstMissing, Common.BLOCKS_REQUESTED);
+     if (remotePeer.bitfield[piece] == '1') {
+       sendRequest(remotePeer.mailbox, piece, firstMissing, count);
+     }
+   }
+ }
+
+ // Sends an "interested" message to every peer that owns the freshly chosen piece and that we were not
+ // interested in yet, then clears the chosen piece and counts it as requested.
+ private void sendInterestedToPeers() {
+   if (currentPiece == -1) {
+     return;
+   }
+   for (Connection connection : peers.values()) {
+     boolean ownsPiece = connection.bitfield != null && connection.bitfield[currentPiece] == '1';
+     if (ownsPiece && !connection.amInterested) {
+       connection.amInterested = true;
+       new MessageTask(MessageTask.Type.INTERESTED, hostname, this.mailbox, this.id).dsend(connection.mailbox);
+     }
+   }
+   currentPiece = -1;
+   piecesRequested++;
+ }
+
+ // Sends an "interested" message to the given mailbox (detached send).
+ private void sendInterested(String mailbox) {
+   new MessageTask(MessageTask.Type.INTERESTED, hostname, this.mailbox, this.id).dsend(mailbox);
+ }
+
+ /**
+  * Send a "not interested" message (detached send).
+  * @param mailbox destination mailbox
+  */
+ private void sendNotInterested(String mailbox) {
+   new MessageTask(MessageTask.Type.NOTINTERESTED, hostname, this.mailbox, this.id).dsend(mailbox);
+ }
+
+ // Sends a "handshake" message to every known peer; a fresh task is built for each of them.
+ private void sendHandshakeAll() {
+   for (Connection remotePeer : peers.values()) {
+     new MessageTask(MessageTask.Type.HANDSHAKE, hostname, mailbox, id).dsend(remotePeer.mailbox);
+   }
+ }
+
+ /**
+  * Send a "handshake" message to a single peer (detached send).
+  * @param mailbox mailbox the message is sent to
+  */
+ private void sendHandshake(String mailbox) {
+   Msg.debug("Sending a HANDSHAKE to " + mailbox);
+   new MessageTask(MessageTask.Type.HANDSHAKE, hostname, this.mailbox, this.id).dsend(mailbox);
+ }
+
+ // Sends a "choke" message to the given mailbox (detached send).
+ private void sendChoked(String mailbox) {
+   Msg.debug("Sending a CHOKE to " + mailbox);
+   new MessageTask(MessageTask.Type.CHOKE, hostname, this.mailbox, this.id).dsend(mailbox);
+ }
+
+ // Sends an "unchoke" message to the given mailbox (detached send).
+ private void sendUnchoked(String mailbox) {
+   Msg.debug("Sending a UNCHOKE to " + mailbox);
+   new MessageTask(MessageTask.Type.UNCHOKE, hostname, this.mailbox, this.id).dsend(mailbox);
+ }
+
+ // Announces to every known peer that we now own the given piece; a fresh task is built for each of them.
+ private void sendHave(int piece) {
+   Msg.debug("Sending HAVE message to all my peers");
+   for (Connection remotePeer : peers.values()) {
+     new MessageTask(MessageTask.Type.HAVE, hostname, this.mailbox, this.id, piece).dsend(remotePeer.mailbox);
+   }
+ }
+ // Sends our bitfield to the given mailbox (detached send).
+ private void sendBitfield(String mailbox) {
+   Msg.debug("Sending a BITFIELD to " + mailbox);
+   new MessageTask(MessageTask.Type.BITFIELD, hostname, this.mailbox, this.id, this.bitfield).dsend(mailbox);
+ }
+ // Sends a "request" message asking a peer for blockLength blocks of a piece, starting at blockIndex.
+ private void sendRequest(String mailbox, int piece, int blockIndex, int blockLength) {
+   int lastBlock = blockIndex + blockLength;
+   Msg.debug("Sending a REQUEST to " + mailbox + " for piece " + piece + " and blocks " + blockIndex + ","
+       + lastBlock);
+   new MessageTask(MessageTask.Type.REQUEST, hostname, this.mailbox, this.id, piece, blockIndex, blockLength)
+       .dsend(mailbox);
+ }
+
+ // Sends a "piece" message carrying the given blocks of a piece of the file to a peer.
+ private void sendPiece(String mailbox, int piece, boolean stalled, int blockIndex, int blockLength) {
+   Msg.debug("Sending the PIECE " + piece + " to " + mailbox);
+   new MessageTask(MessageTask.Type.PIECE, hostname, this.mailbox, this.id, piece, stalled, blockIndex, blockLength)
+       .dsend(mailbox);
+ }
+
+ // Returns the bitfield as a string of '0' and '1', one character per piece.
+ private String getStatus() {
+   StringBuilder status = new StringBuilder(Common.FILE_PIECES);
+   for (int piece = 0; piece < Common.FILE_PIECES; piece++) {
+     status.append(bitfield[piece]);
+   }
+   return status.toString();
+ }
}
-
-/* Copyright (c) 2006-2014. The SimGrid Team.
+/* Copyright (c) 2006-2014, 2016. The SimGrid Team.
* All rights reserved. */
/* This program is free software; you can redistribute it and/or modify it
package bittorrent;
import java.util.ArrayList;
+import org.simgrid.msg.Msg;
import org.simgrid.msg.Comm;
import org.simgrid.msg.Host;
-import org.simgrid.msg.Msg;
-import org.simgrid.msg.MsgException;
+import org.simgrid.msg.Task;
import org.simgrid.msg.Process;
import org.simgrid.msg.RngStream;
-import org.simgrid.msg.Task;
-/**
- * Tracker, handle requests from peers.
- */
+import org.simgrid.msg.MsgException;
+
public class Tracker extends Process {
- protected RngStream stream;
- /**
- * Peers list
- */
- protected ArrayList<Integer> peersList;
- /**
- * End time for the simulation
- */
- protected double deadline;
- /**
- * Current comm received
- */
- protected Comm commReceived = null;
-
- public Tracker(Host host, String name, String[]args) {
- super(host,name,args);
- }
-
- @Override
- public void main(String[] args) throws MsgException {
- if (args.length != 1) {
- Msg.info("Wrong number of arguments for the tracker.");
- return;
- }
- //Build the RngStream object for randomness
- stream = new RngStream("tracker");
- //Retrieve the end time
- deadline = Double.valueOf(args[0]);
- //Building peers array
- peersList = new ArrayList<Integer>();
-
- Msg.info("Tracker launched.");
- while (Msg.getClock() < deadline) {
- if (commReceived == null) {
- commReceived = Task.irecv(Common.TRACKER_MAILBOX);
- }
- try {
- if (commReceived.test()) {
- Task task = commReceived.getTask();
- if (task instanceof TrackerTask) {
- TrackerTask tTask = (TrackerTask)task;
- //Sending peers to the peer
- int nbPeers = 0;
- while (nbPeers < Common.MAXIMUM_PEERS && nbPeers < peersList.size()) {
- int nextPeer;
- do {
- nextPeer = stream.randInt(0, peersList.size() - 1);
- } while (tTask.peers.contains(nextPeer));
- tTask.peers.add(peersList.get(nextPeer));
- nbPeers++;
- }
- //Adding the peer to our list
- peersList.add(tTask.peerId);
- tTask.type = TrackerTask.Type.ANSWER;
- //Setting the interval
- tTask.interval = Common.TRACKER_QUERY_INTERVAL;
- //Sending the task back to the peer
- tTask.dsend(tTask.mailbox);
- }
- commReceived = null;
- }
- else {
- waitFor(1);
- }
- }
- catch (MsgException e) {
- commReceived = null;
- }
- }
- Msg.info("Tracker is leaving");
- }
+ protected RngStream stream; // random stream, created in main(), used to draw peer indices
+ protected ArrayList<Integer> peersList; // ids of the peers that have queried the tracker so far
+ protected double deadline; // simulated date, parsed from args[0], at which main() stops serving
+ protected Comm commReceived = null; // pending irecv on Common.TRACKER_MAILBOX, null when none is posted
+
+ public Tracker(Host host, String name, String[]args) { // all arguments are forwarded to Process
+ super(host,name,args);
+ }
+
+  /**
+   * Tracker service loop: until the deadline given in args[0], polls the tracker mailbox and answers
+   * every TrackerTask with a random selection of distinct, already-known peers.
+   *
+   * @param args exactly one element: the simulated date at which the tracker stops
+   * @throws MsgException declared by the overridden method; failures raised inside the polling loop
+   *         are absorbed there (the pending receive is dropped and a new one is posted)
+   */
+  @Override
+  public void main(String[] args) throws MsgException {
+    if (args.length != 1) {
+      Msg.info("Wrong number of arguments for the tracker.");
+      return;
+    }
+    //Build the RngStream object for randomness
+    stream = new RngStream("tracker");
+    //Retrieve the end time
+    deadline = Double.valueOf(args[0]);
+    //Building peers array
+    peersList = new ArrayList<Integer>();
+    Msg.info("Tracker launched.");
+    while (Msg.getClock() < deadline) {
+      if (commReceived == null) {
+        commReceived = Task.irecv(Common.TRACKER_MAILBOX);
+      }
+      try {
+        if (commReceived.test()) {
+          Task task = commReceived.getTask();
+          if (task instanceof TrackerTask) {
+            TrackerTask tTask = (TrackerTask)task;
+            //Sending peers to the peer: draw without replacement among the known peers that the task
+            //does not list yet. Comparing peer ids (not list indices) prevents duplicates, and the
+            //shrinking candidate list guarantees that the selection terminates.
+            ArrayList<Integer> candidates = new ArrayList<Integer>(peersList);
+            candidates.removeAll(tTask.peers);
+            int nbPeers = 0;
+            while (nbPeers < Common.MAXIMUM_PEERS && !candidates.isEmpty()) {
+              int pick = stream.randInt(0, candidates.size() - 1);
+              tTask.peers.add(candidates.remove(pick));
+              nbPeers++;
+            }
+            //Adding the peer to our list, once: a peer that queries again must not be registered twice
+            if (!peersList.contains(tTask.peerId)) {
+              peersList.add(tTask.peerId);
+            }
+            tTask.type = TrackerTask.Type.ANSWER;
+            //Setting the interval
+            tTask.interval = Common.TRACKER_QUERY_INTERVAL;
+            //Sending the task back to the peer
+            tTask.dsend(tTask.mailbox);
+          }
+          commReceived = null;
+        } else {
+          waitFor(1);
+        }
+      }
+      catch (MsgException e) {
+        //Best effort: forget the failed communication, a fresh receive is posted on the next iteration
+        commReceived = null;
+      }
+    }
+    Msg.info("Tracker is leaving");
+  }
}
-/* Copyright (c) 2006-2014. The SimGrid Team.
+/* Copyright (c) 2006-2014, 2016. The SimGrid Team.
* All rights reserved. */
/* This program is free software; you can redistribute it and/or modify it
import org.simgrid.msg.Task;
-/**
- * Task exchanged between the tracker
- * and the peers.
- */
+/* Task exchanged between the tracker and the peers. */
public class TrackerTask extends Task {
- /**
- * Type of the tasks
- */
- public enum Type {
- REQUEST,
- ANSWER
- };
- public Type type;
- public String hostname;
- public String mailbox;
- public int peerId;
- public int uploaded;
- public int downloaded;
- public int left;
- public double interval;
- public ArrayList<Integer> peers;
-
- public TrackerTask(String hostname, String mailbox, int peerId) {
- this(hostname, mailbox, peerId, 0, 0, Common.FILE_SIZE);
- }
- public TrackerTask(String hostname, String mailbox, int peerId, int uploaded, int downloaded, int left) {
- super("", 0, Common.TRACKER_COMM_SIZE);
- this.type = Type.REQUEST;
- this.hostname = hostname;
- this.mailbox = mailbox;
- this.peerId = peerId;
- this.uploaded = uploaded;
- this.downloaded = downloaded;
- this.left = left;
- this.peers = new ArrayList<Integer>();
- }
-
+ public enum Type { // REQUEST is set by the constructor; Tracker.main switches it to ANSWER before replying
+ REQUEST,
+ ANSWER
+ };
+
+ public Type type;
+ public String hostname;
+ public String mailbox; // mailbox the tracker sends the answer back to
+ public int peerId; // id that the tracker records in its peer list
+ public int uploaded;
+ public int downloaded;
+ public int left;
+ public double interval; // set by the tracker to Common.TRACKER_QUERY_INTERVAL
+ public ArrayList<Integer> peers; // starts empty; the tracker appends the peer ids it selected
+
+ public TrackerTask(String hostname, String mailbox, int peerId) { // shortcut: 0 uploaded, 0 downloaded, Common.FILE_SIZE left
+ this(hostname, mailbox, peerId, 0, 0, Common.FILE_SIZE);
+ }
+
+ public TrackerTask(String hostname, String mailbox, int peerId, int uploaded, int downloaded, int left) {
+ super("", 0, Common.TRACKER_COMM_SIZE); // unnamed task; presumably 0 computation and TRACKER_COMM_SIZE bytes - confirm against Task
+ this.type = Type.REQUEST;
+ this.hostname = hostname;
+ this.mailbox = mailbox;
+ this.peerId = peerId;
+ this.uploaded = uploaded;
+ this.downloaded = downloaded;
+ this.left = left;
+ this.peers = new ArrayList<Integer>();
+ }
}
<?xml version='1.0'?>
<!DOCTYPE platform SYSTEM "http://simgrid.gforge.inria.fr/simgrid/simgrid.dtd">
<platform version="4">
-
<process host="Jacquelin" function="bittorrent.Tracker">
- <argument value="3000" />
+ <argument value="3000" />
</process>
<process host="Boivin" function="bittorrent.Peer">
- <argument value="00000002"/> <!-- my id -->
- <argument value="5000" /> <!-- end time -->
- <argument value="1" /> <!-- indicates if the bittorrent.Peer is a seed at the begining of the simulation -->
+ <argument value="00000002"/> <!-- my id -->
+ <argument value="5000" /> <!-- end time -->
+ <argument value="1" /> <!-- indicates if the bittorrent.Peer is a seed at the beginning of the simulation -->
</process>
<process host="Jean_Yves" function="bittorrent.Peer">
- <argument value="00000003"/> <!-- my id -->
- <argument value="5000" /> <!-- end time -->
+ <argument value="00000003"/> <!-- my id -->
+ <argument value="5000" /> <!-- end time -->
</process>
<process host="TeX" function="bittorrent.Peer">
- <argument value="00000004"/> <!-- my id -->
- <argument value="5000" /> <!-- end time -->
+ <argument value="00000004"/> <!-- my id -->
+ <argument value="5000" /> <!-- end time -->
</process>
<process host="Geoff" function="bittorrent.Peer">
- <argument value="00000005"/> <!-- my id -->
- <argument value="5000" /> <!-- end time -->
+ <argument value="00000005"/> <!-- my id -->
+ <argument value="5000" /> <!-- end time -->
</process>
<process host="Disney" function="bittorrent.Peer">
- <argument value="00000006"/> <!-- my id -->
- <argument value="5000" /> <!-- end time -->
+ <argument value="00000006"/> <!-- my id -->
+ <argument value="5000" /> <!-- end time -->
</process>
<process host="iRMX" function="bittorrent.Peer">
- <argument value="00000007"/> <!-- my id -->
- <argument value="5000" /> <!-- end time -->
+ <argument value="00000007"/> <!-- my id -->
+ <argument value="5000" /> <!-- end time -->
</process>
<process host="McGee" function="bittorrent.Peer">
- <argument value="00000008"/> <!-- my id -->
- <argument value="5000" /> <!-- end time -->
+ <argument value="00000008"/> <!-- my id -->
+ <argument value="5000" /> <!-- end time -->
</process>
-
</platform>
import org.simgrid.msg.Msg;
public class Chord {
- public static void main(String[] args) {
- /* initialize the MSG simulation. Must be done before anything else (even logging). */
- Msg.init(args);
- if(args.length < 2) {
- Msg.info("Usage : Chord platform_file deployment_file");
- Msg.info("example : Chord platform.xml deployment.xml");
- System.exit(1);
- }
- /* construct the platform and deploy the application */
- Msg.createEnvironment(args[0]);
- Msg.deployApplication(args[1]);
-
- /* execute the simulation. */
- Msg.run();
- }
+  /* Launcher: initializes MSG (must come first, even before logging), validates the command line,
+   * then loads the platform and the deployment before running the simulation. */
+  public static void main(String[] args) {
+    Msg.init(args);
+
+    final boolean enoughArguments = args.length >= 2;
+    if (!enoughArguments) {
+      Msg.info("Usage : Chord platform_file deployment_file");
+      Msg.info("example : Chord ../platforms/platform.xml chord.xml");
+      System.exit(1);
+    }
+
+    final String platformFile = args[0];
+    final String deploymentFile = args[1];
+
+    /* build the platform, then deploy the processes on it */
+    Msg.createEnvironment(platformFile);
+    Msg.deployApplication(deploymentFile);
+
+    /* start the simulation */
+    Msg.run();
+  }
}
package chord;
+import chord.Common;
import org.simgrid.msg.Task;
-import chord.Common;
-/**
- * Base class for all Tasks in Chord.
- */
public class ChordTask extends Task {
- public String issuerHostName;
- public String answerTo;
- public ChordTask() {
- this(null,null);
- }
- public ChordTask(String issuerHostName, String answerTo) {
- super(null, Common.COMP_SIZE, Common.COMM_SIZE);
- this.issuerHostName = issuerHostName;
- this.answerTo = answerTo;
- }
+ public String issuerHostName; // host name of the sender (Node passes getHost().getName())
+ public String answerTo; // mailbox on which the sender expects the answer
+ public ChordTask() { // task with neither issuer nor answer mailbox
+ this(null,null);
+ }
+ public ChordTask(String issuerHostName, String answerTo) {
+ super(null, Common.COMP_SIZE, Common.COMM_SIZE); // unnamed task sized with the shared Chord constants
+ this.issuerHostName = issuerHostName;
+ this.answerTo = answerTo;
+ }
}
* under the terms of the license (GNU LGPL) which comes with this package. */
package chord;
-/**
- * Common constants used over the simulation
- */
+
public class Common {
- public final static int COMM_SIZE = 10;
- public final static int COMP_SIZE = 0;
-
- public final static int NB_BITS = 24;
- public final static int NB_KEYS = 16777216;
- public final static int TIMEOUT = 50;
- public final static int MAX_SIMULATION_TIME = 1000;
- public final static int PERIODIC_STABILIZE_DELAY = 20;
- public final static int PERIODIC_FIX_FINGERS_DELAY = 120;
- public final static int PERIODIC_CHECK_PREDECESSOR_DELAY = 120;
- public final static int PERIODIC_LOOKUP_DELAY = 10;
+ public final static int COMM_SIZE = 10; // third argument of every ChordTask's super(...) call
+ public final static int COMP_SIZE = 0; // second argument of every ChordTask's super(...) call
+
+ public final static int NB_BITS = 24; // number of entries in a node's finger table
+ public final static int NB_KEYS = 16777216; // equals 2^NB_BITS (1 << 24); keep both in sync
+ public final static int TIMEOUT = 50; // timeout handed to Task.send() and Comm.waitCompletion() by Node
+ public final static int MAX_SIMULATION_TIME = 1000; // Node.main stops looping once the clock reaches it
+ public final static int PERIODIC_STABILIZE_DELAY = 20; // delay between two stabilize() calls
+ public final static int PERIODIC_FIX_FINGERS_DELAY = 120; // delay between two fixFingers() calls
+ public final static int PERIODIC_CHECK_PREDECESSOR_DELAY = 120; // delay between two checkPredecessor() calls
+ public final static int PERIODIC_LOOKUP_DELAY = 10; // delay between two randomLookup() calls
}
package chord;
public class FindSuccessorAnswerTask extends ChordTask {
- public int answerId;
+ public int answerId;
- public FindSuccessorAnswerTask(String issuerHostname, String answerTo, int answerId) {
- super(issuerHostname,answerTo);
- this.answerId = answerId;
- }
+ public FindSuccessorAnswerTask(String issuerHostname, String answerTo, int answerId) { // answerId: the successor id being reported
+ super(issuerHostname,answerTo);
+ this.answerId = answerId;
+ }
}
package chord;
public class FindSuccessorTask extends ChordTask {
- public int requestId;
-
- public FindSuccessorTask(String issuerHostname, String answerTo, int requestId) {
- super(issuerHostname, answerTo);
- this.requestId = requestId;
- }
+ public int requestId; // id whose successor is being looked up
+
+ public FindSuccessorTask(String issuerHostname, String answerTo, int requestId) {
+ super(issuerHostname, answerTo);
+ this.requestId = requestId;
+ }
}
package chord;
public class GetPredecessorAnswerTask extends ChordTask {
- public int answerId;
- public GetPredecessorAnswerTask(String issuerHostname, String answerTo, int answerId) {
- super(issuerHostname,answerTo);
- this.answerId = answerId;
- }
+ public int answerId; // predecessor id of the answering node (Node.handleTask sends its predId, possibly -1)
+ public GetPredecessorAnswerTask(String issuerHostname, String answerTo, int answerId) {
+ super(issuerHostname,answerTo);
+ this.answerId = answerId;
+ }
}
package chord;
public class GetPredecessorTask extends ChordTask {
- public GetPredecessorTask(String issuerHostName, String answerTo) {
- super(issuerHostName, answerTo);
- }
+ public GetPredecessorTask(String issuerHostName, String answerTo) { // request without payload: only issuer and answer mailbox
+ super(issuerHostName, answerTo);
+ }
}
package chord;
+import org.simgrid.msg.Msg;
import org.simgrid.msg.Comm;
import org.simgrid.msg.Host;
-import org.simgrid.msg.Msg;
-import org.simgrid.msg.MsgException;
-import org.simgrid.msg.Process;
import org.simgrid.msg.Task;
+import org.simgrid.msg.Process;
+import org.simgrid.msg.MsgException;
import org.simgrid.msg.TimeoutException;
-/**
- * Node data
- */
public class Node extends Process {
- /**
- * Node id
- */
- protected int id;
- /**
- * Node mailbox
- */
- protected String mailbox;
- /**
- * Predecessor id
- */
- protected int predId;
- /**
- * Predecessor mailbox
- */
- protected String predMailbox;
- /**
- * Index of the next finger to fix
- */
- protected int nextFingerToFix;
- /**
- * Current communication
- */
- protected Comm commReceive;
- /**
- * Last time I changed a finger or my predecessor
- */
- protected double lastChangeDate;
- /**
- * Node fingers
- */
- int fingers[];
- /**
- * Constructor
- */
- public Node(Host host, String name, String[] args) {
- super(host,name,args);
- }
- @Override
- public void main(String[] args) throws MsgException {
- if (args.length != 2 && args.length != 4) {
- Msg.info("You need to provide 2 or 4 arguments.");
- return;
- }
- double initTime = Msg.getClock();
- int i;
- boolean joinSuccess = false;
- double deadline;
-
- double nextStabilizeDate = initTime + Common.PERIODIC_STABILIZE_DELAY;
- double nextFixFingersDate = initTime + Common.PERIODIC_FIX_FINGERS_DELAY;
- double nextCheckPredecessorDate = initTime + Common.PERIODIC_CHECK_PREDECESSOR_DELAY;
- double nextLookupDate = initTime + Common.PERIODIC_LOOKUP_DELAY;
-
- id = Integer.valueOf(args[0]);
- mailbox = Integer.toString(id);
-
- fingers = new int[Common.NB_BITS];
- for (i = 0; i < Common.NB_BITS; i++) {
- fingers[i] = -1;
- setFinger(i,this.id);
- }
-
- //First node
- if (args.length == 2) {
- deadline = Integer.valueOf(args[1]);
- create();
- joinSuccess = true;
- }
- else {
- int knownId = Integer.valueOf(args[1]);
- deadline = Integer.valueOf(args[3]);
- //Msg.info("Hey! Let's join the system with the id " + id + ".");
-
- joinSuccess = join(knownId);
- }
- if (joinSuccess) {
- double currentClock = Msg.getClock();
- while (currentClock < (initTime + deadline) && currentClock < Common.MAX_SIMULATION_TIME) {
- if (commReceive == null) {
- commReceive = Task.irecv(this.mailbox);
- }
- try {
- if (!commReceive.test()) {
- if (currentClock >= nextStabilizeDate) {
- stabilize();
- nextStabilizeDate = Msg.getClock() + Common.PERIODIC_STABILIZE_DELAY;
- }
- else if (currentClock >= nextFixFingersDate) {
- fixFingers();
- nextFixFingersDate = Msg.getClock() + Common.PERIODIC_FIX_FINGERS_DELAY;
- }
- else if (currentClock >= nextCheckPredecessorDate) {
- this.checkPredecessor();
- nextCheckPredecessorDate = Msg.getClock() + Common.PERIODIC_CHECK_PREDECESSOR_DELAY;
- }
- else if (currentClock >= nextLookupDate) {
- this.randomLookup();
- nextLookupDate = Msg.getClock() + Common.PERIODIC_LOOKUP_DELAY;
- }
- else {
- waitFor(5);
- }
- currentClock = Msg.getClock();
- }
- else {
- handleTask(commReceive.getTask());
- currentClock = Msg.getClock();
- commReceive = null;
-
- }
- }
- catch (Exception e) {
- currentClock = Msg.getClock();
- commReceive = null;
- }
-
- }
- leave();
- if (commReceive != null) {
- commReceive = null;
- }
- }
- else {
- Msg.info("I couldn't join the ring");
- }
- }
- void handleTask(Task task) {
- if (task instanceof FindSuccessorTask) {
- FindSuccessorTask fTask = (FindSuccessorTask)task;
- Msg.debug("Receiving a 'Find Successor' request from " + fTask.issuerHostName + " for id " + fTask.requestId);
- // is my successor the successor?
- if (isInInterval(fTask.requestId, this.id + 1, fingers[0])) {
- //Msg.info("Send the request to " + fTask.answerTo + " with answer " + fingers[0]);
- FindSuccessorAnswerTask answer = new FindSuccessorAnswerTask(getHost().getName(), mailbox, fingers[0]);
- answer.dsend(fTask.answerTo);
- }
- else {
- // otherwise, forward the request to the closest preceding finger in my table
- int closest = closestPrecedingNode(fTask.requestId);
- //Msg.info("Forward the request to " + closest);
- fTask.dsend(Integer.toString(closest));
- }
- }
- else if (task instanceof GetPredecessorTask) {
- GetPredecessorTask gTask = (GetPredecessorTask)(task);
- Msg.debug("Receiving a 'Get Predecessor' request from " + gTask.issuerHostName);
- GetPredecessorAnswerTask answer = new GetPredecessorAnswerTask(getHost().getName(), mailbox, predId);
- answer.dsend(gTask.answerTo);
- }
- else if (task instanceof NotifyTask) {
- NotifyTask nTask = (NotifyTask)task;
- notify(nTask.requestId);
- }
- else {
- Msg.debug("Ignoring unexpected task of type:" + task);
- }
- }
- /**
- * @brief Makes the current node quit the system
- */
- void leave() {
- Msg.debug("Well Guys! I Think it's time for me to quit ;)");
- quitNotify(1); //Notify my successor
- quitNotify(-1); //Notify my predecessor.
- // TODO ...
- }
- /**
- * @brief Notifies the successor or the predecessor of the current node
- * of the departure
- * @param to 1 to notify the successor, -1 to notify the predecessor
- */
- static void quitNotify( int to) {
- //TODO
- }
- /**
- * @brief Initializes the current node as the first one of the system.
- */
- void create() {
- Msg.debug("Create a new Chord ring...");
- setPredecessor(-1);
-
- }
- /**
- * Makes the current node join the ring, knowing the id of a node
- * already in the ring
- */
- boolean join(int knownId) {
- Msg.info("Joining the ring with id " + this.id + " knowing node " + knownId);
- setPredecessor(-1);
- int successorId = remoteFindSuccessor(knownId, this.id);
- if (successorId == -1) {
- Msg.info("Cannot join the ring.");
- }
- else {
- setFinger(0, successorId);
- }
- return successorId != -1;
- }
-
- /**
- * Sets the node predecessor
- */
- void setPredecessor(int predecessorId) {
- if (predecessorId != predId) {
- predId = predecessorId;
- if (predecessorId != -1) {
- predMailbox = Integer.toString(predId);
- }
- lastChangeDate = Msg.getClock();
- }
- }
- /**
- * @brief Asks another node its predecessor.
- * @param askTo the node to ask to
- * @return the id of its predecessor node, or -1 if the request failed
- * (or if the node does not know its predecessor)
- */
- int remoteGetPredecessor(int askTo) {
- int predecessorId = -1;
- boolean stop = false;
- Msg.debug("Sending a 'Get Predecessor' request to " + askTo);
- String mailboxTo = Integer.toString(askTo);
- GetPredecessorTask sendTask = new GetPredecessorTask(getHost().getName(), this.mailbox);
- try {
- sendTask.send(mailboxTo, Common.TIMEOUT);
- try {
- do {
- if (commReceive == null) {
- commReceive = Task.irecv(this.mailbox);
- }
- commReceive.waitCompletion(Common.TIMEOUT);
- Task taskReceived = commReceive.getTask();
- if (taskReceived instanceof GetPredecessorAnswerTask) {
- predecessorId = ((GetPredecessorAnswerTask) taskReceived).answerId;
- stop = true;
- }
- else {
- handleTask(taskReceived);
- }
- commReceive = null;
- } while (!stop);
-
- }
- catch (MsgException e) {
- commReceive = null;
- stop = true;
- }
- }
- catch (MsgException e) {
- Msg.debug("Failed to send the Get Predecessor request");
- }
-
-
- return predecessorId;
- }
- /**
- * @brief Makes the current node find the successor node of an id.
- * @param node the current node
- * @param id the id to find
- * @return the id of the successor node, or -1 if the request failed
- */
- int findSuccessor(int id) {
- if (isInInterval(id, this.id + 1, fingers[0])) {
- return fingers[0];
- }
-
- int closest = this.closestPrecedingNode(id);
- return remoteFindSuccessor(closest, id);
- }
- /**
- * @brief Asks another node the successor node of an id.
- */
- int remoteFindSuccessor(int askTo, int id) {
- int successor = -1;
- boolean stop = false;
- String mailbox = Integer.toString(askTo);
- Task sendTask = new FindSuccessorTask(getHost().getName(), this.mailbox, id);
- Msg.debug("Sending a 'Find Successor' request to " + mailbox + " for id " + id);
- try {
- sendTask.send(mailbox, Common.TIMEOUT);
- do {
- if (commReceive == null) {
- commReceive = Task.irecv(this.mailbox);
- }
- try {
- commReceive.waitCompletion(Common.TIMEOUT);
- Task task = commReceive.getTask();
- if (task instanceof FindSuccessorAnswerTask) {
- //TODO: Check if this this our answer.
- FindSuccessorAnswerTask fTask = (FindSuccessorAnswerTask) task;
- stop = true;
- successor = fTask.answerId;
- }
- else {
- handleTask(task);
- }
- commReceive = null;
- }
- catch (TimeoutException e) {
- stop = true;
- commReceive = null;
- }
- } while (!stop);
- }
- catch (TimeoutException e) {
- Msg.debug("Failed to send the 'Find Successor' request");
- }
- catch (MsgException e) {
- Msg.debug("Failed to receive Find Successor");
- }
-
- return successor;
-
- }
- /**
- * @brief This function is called periodically. It checks the immediate
- * successor of the current node.
- */
- void stabilize() {
- Msg.debug("Stabilizing node");
- int candidateId;
- int successorId = fingers[0];
- if (successorId != this.id){
- candidateId = remoteGetPredecessor(successorId);
- }
- else {
- candidateId = predId;
- }
- //This node is a candidate to become my new successor
- if (candidateId != -1 && isInInterval(candidateId, this.id + 1, successorId - 1)) {
- setFinger(0, candidateId);
- }
- if (successorId != this.id) {
- remoteNotify(successorId, this.id);
- }
-
- }
- /**
- * \brief Notifies the current node that its predecessor may have changed.
- * \param candidate_id the possible new predecessor
- */
- void notify(int predecessorCandidateId) {
- if (predId == -1 || isInInterval(predecessorCandidateId, predId + 1, this.id - 1 )) {
- setPredecessor(predecessorCandidateId);
- }
- else {
- //Don't have to change the predecessor.
- }
- }
- /**
- * \brief Notifies a remote node that its predecessor may have changed.
- * \param notify_id id of the node to notify
- * \param candidate_id the possible new predecessor
- */
- void remoteNotify(int notifyId, int predecessorCandidateId) {
- Msg.debug("Sending a 'Notify' request to " + notifyId);
- Task sentTask = new NotifyTask(getHost().getName(), this.mailbox, predecessorCandidateId);
- sentTask.dsend(Integer.toString(notifyId));
- }
- /**
- * \brief This function is called periodically.
- * It refreshes the finger table of the current node.
- */
- void fixFingers() {
- Msg.debug("Fixing fingers");
- int i = this.nextFingerToFix;
- int id = this.findSuccessor(this.id + (int)Math.pow(2,i)); //FIXME: SLOW
- if (id != -1) {
- if (id != fingers[i]) {
- setFinger(i, id);
- }
- nextFingerToFix = (i + 1) % Common.NB_BITS;
- }
- }
- /**
- * \brief This function is called periodically.
- * It checks whether the predecessor has failed
- */
- void checkPredecessor() {
- //TODO
- }
- /**
- * \brief Performs a find successor request to a random id.
- */
- void randomLookup() {
- int id = 1337;
- //Msg.info("Making a lookup request for id " + id);
- findSuccessor(id);
- }
-
-
-
- /**
- * @brief Returns the closest preceding finger of an id
- * with respect to the finger table of the current node.
- * @param id the id to find
- * \return the closest preceding finger of that id
- */
- int closestPrecedingNode(int id) {
- int i;
- for (i = Common.NB_BITS - 1; i >= 0; i--) {
- if (isInInterval(fingers[i], this.id + 1, id - 1)) {
- return fingers[i];
- }
- }
- return this.id;
- }
- /**
- * @brief Returns whether an id belongs to the interval [start, end].
- *
- * The parameters are noramlized to make sure they are between 0 and nb_keys - 1).
- * 1 belongs to [62, 3]
- * 1 does not belong to [3, 62]
- * 63 belongs to [62, 3]
- * 63 does not belong to [3, 62]
- * 24 belongs to [21, 29]
- * 24 does not belong to [29, 21]
- *
- * \param id id to check
- * \param start lower bound
- * \param end upper bound
- * \return a non-zero value if id in in [start, end]
- */
- static boolean isInInterval(int id, int start, int end) {
- id = normalize(id);
- start = normalize(start);
- end = normalize(end);
-
- // make sure end >= start and id >= start
- if (end < start) {
- end += Common.NB_KEYS;
- }
- if (id < start) {
- id += Common.NB_KEYS;
- }
- return (id <= end);
-
- }
- /**
- * @brief Turns an id into an equivalent id in [0, nb_keys).
- * @param id an id
- * @return the corresponding normalized id
- */
- static int normalize(int id) {
- return id & (Common.NB_KEYS - 1);
- }
- /**
- * \brief Sets a finger of the current node.
- * \param finger_index index of the finger to set (0 to nb_bits - 1)
- * \param id the id to set for this finger
- */
- void setFinger(int fingerIndex, int id) {
- if (id != fingers[fingerIndex]) {
- fingers[fingerIndex] = id;
- lastChangeDate = Msg.getClock();
- }
- }
+ protected int id; // this node's identifier, parsed from args[0] in main()
+ protected String mailbox; // mailbox this node receives on: the decimal string of id
+ protected int predId; // predecessor's id, -1 when unknown
+ protected String predMailbox; // decimal string of predId; only refreshed when predId is not -1
+ protected int nextFingerToFix; // index of the next finger to fix
+ protected Comm commReceive; // pending irecv on this.mailbox, null when none is posted
+ ///Last time I changed a finger or my predecessor
+ protected double lastChangeDate;
+ int fingers[]; // finger table of Common.NB_BITS ids; fingers[0] is used as the successor
+
+ public Node(Host host, String name, String[] args) { // all arguments are forwarded to Process
+ super(host,name,args);
+ }
+
+ @Override
+ public void main(String[] args) throws MsgException { // args: {id, deadline} to create a ring, or 4 values with [0]=id, [1]=known node id, [3]=deadline
+ if (args.length != 2 && args.length != 4) {
+ Msg.info("You need to provide 2 or 4 arguments.");
+ return;
+ }
+ double initTime = Msg.getClock();
+ int i;
+ boolean joinSuccess = false;
+ double deadline; // duration relative to initTime: the loop below compares the clock with initTime + deadline
+
+ double nextStabilizeDate = initTime + Common.PERIODIC_STABILIZE_DELAY;
+ double nextFixFingersDate = initTime + Common.PERIODIC_FIX_FINGERS_DELAY;
+ double nextCheckPredecessorDate = initTime + Common.PERIODIC_CHECK_PREDECESSOR_DELAY;
+ double nextLookupDate = initTime + Common.PERIODIC_LOOKUP_DELAY;
+
+ id = Integer.valueOf(args[0]);
+ mailbox = Integer.toString(id);
+
+ fingers = new int[Common.NB_BITS];
+ for (i = 0; i < Common.NB_BITS; i++) { // every finger initially points to this node itself
+ fingers[i] = -1;
+ setFinger(i,this.id);
+ }
+
+ //First node
+ if (args.length == 2) {
+ deadline = Integer.valueOf(args[1]);
+ create();
+ joinSuccess = true;
+ } else {
+ int knownId = Integer.valueOf(args[1]); // args[2] is not read by this method
+ deadline = Integer.valueOf(args[3]);
+ //Msg.info("Hey! Let's join the system with the id " + id + ".");
+
+ joinSuccess = join(knownId);
+ }
+ if (joinSuccess) {
+ double currentClock = Msg.getClock();
+ while (currentClock < (initTime + deadline) && currentClock < Common.MAX_SIMULATION_TIME) {
+ if (commReceive == null) {
+ commReceive = Task.irecv(this.mailbox);
+ }
+ try {
+ if (!commReceive.test()) { // nothing received yet: run at most one periodic action, or sleep
+ if (currentClock >= nextStabilizeDate) {
+ stabilize();
+ nextStabilizeDate = Msg.getClock() + Common.PERIODIC_STABILIZE_DELAY;
+ } else if (currentClock >= nextFixFingersDate) {
+ fixFingers();
+ nextFixFingersDate = Msg.getClock() + Common.PERIODIC_FIX_FINGERS_DELAY;
+ } else if (currentClock >= nextCheckPredecessorDate) {
+ this.checkPredecessor();
+ nextCheckPredecessorDate = Msg.getClock() + Common.PERIODIC_CHECK_PREDECESSOR_DELAY;
+ } else if (currentClock >= nextLookupDate) {
+ this.randomLookup();
+ nextLookupDate = Msg.getClock() + Common.PERIODIC_LOOKUP_DELAY;
+ } else {
+ waitFor(5);
+ }
+ currentClock = Msg.getClock();
+ } else { // a task arrived: serve it and let the next iteration post a new receive
+ handleTask(commReceive.getTask());
+ currentClock = Msg.getClock();
+ commReceive = null;
+ }
+ }
+ catch (Exception e) { // any failure drops the pending receive and the loop carries on
+ currentClock = Msg.getClock();
+ commReceive = null;
+ }
+ }
+ leave();
+ if (commReceive != null) { // forget a receive that is still pending when leaving
+ commReceive = null;
+ }
+ } else {
+ Msg.info("I couldn't join the ring");
+ }
+ }
+
+ void handleTask(Task task) {
+ if (task instanceof FindSuccessorTask) {
+ FindSuccessorTask fTask = (FindSuccessorTask)task;
+ Msg.debug("Receiving a 'Find Successor' request from " + fTask.issuerHostName + " for id " + fTask.requestId);
+ // is my successor the successor?
+ if (isInInterval(fTask.requestId, this.id + 1, fingers[0])) {
+ //Msg.info("Send the request to " + fTask.answerTo + " with answer " + fingers[0]);
+ FindSuccessorAnswerTask answer = new FindSuccessorAnswerTask(getHost().getName(), mailbox, fingers[0]);
+ answer.dsend(fTask.answerTo);
+ } else {
+ // otherwise, forward the request to the closest preceding finger in my table
+ int closest = closestPrecedingNode(fTask.requestId);
+ //Msg.info("Forward the request to " + closest);
+ fTask.dsend(Integer.toString(closest));
+ }
+ } else if (task instanceof GetPredecessorTask) {
+ GetPredecessorTask gTask = (GetPredecessorTask)(task);
+ Msg.debug("Receiving a 'Get Predecessor' request from " + gTask.issuerHostName);
+ GetPredecessorAnswerTask answer = new GetPredecessorAnswerTask(getHost().getName(), mailbox, predId);
+ answer.dsend(gTask.answerTo);
+ } else if (task instanceof NotifyTask) {
+ NotifyTask nTask = (NotifyTask)task;
+ notify(nTask.requestId);
+ } else {
+ Msg.debug("Ignoring unexpected task of type:" + task);
+ }
+ }
+
+ void leave() { // called by main() once its service loop is over
+ Msg.debug("Well Guys! I Think it's time for me to quit ;)");
+ quitNotify(1); //Notify my successor
+ quitNotify(-1); //Notify my predecessor.
+ }
+
+ /**
+ * @brief Placeholder for notifying the successor or the predecessor of the current node of the departure.
+ * Not implemented yet: the body is empty, so calling it has no effect.
+ * @param to 1 to notify the successor, -1 to notify the predecessor
+ */
+ static void quitNotify( int to) {
+ //TODO
+ }
+
+ /**
+ * @brief Initializes the current node as the first one of the system: it starts without a predecessor (-1).
+ */
+ void create() {
+ Msg.debug("Create a new Chord ring...");
+ setPredecessor(-1);
+ }
+
+ // Makes the current node join the ring, knowing the id of a node already in the ring
+ boolean join(int knownId) {
+ Msg.info("Joining the ring with id " + this.id + " knowing node " + knownId);
+ setPredecessor(-1);
+ int successorId = remoteFindSuccessor(knownId, this.id);
+ if (successorId == -1) {
+ Msg.info("Cannot join the ring.");
+ } else {
+ setFinger(0, successorId);
+ }
+ return successorId != -1;
+ }
+
+  /* Records a new predecessor id (-1 meaning none). Nothing happens when the id is unchanged;
+   * otherwise the change date is refreshed and, for a real id, the predecessor mailbox is updated. */
+  void setPredecessor(int predecessorId) {
+    if (predecessorId == predId) {
+      return;
+    }
+    predId = predecessorId;
+    if (predId != -1) {
+      predMailbox = Integer.toString(predId);
+    }
+    lastChangeDate = Msg.getClock();
+  }
+
+ /**
+ * @brief Asks another node its predecessor and waits for the answer; any other task received meanwhile
+ * is served through handleTask().
+ * @param askTo the id of the node to ask to
+ * @return the id of its predecessor node, or -1 if the send or the receive failed (or if the node does not
+ * know its predecessor)
+ */
+ int remoteGetPredecessor(int askTo) {
+ int predecessorId = -1;
+ boolean stop = false;
+ Msg.debug("Sending a 'Get Predecessor' request to " + askTo);
+ String mailboxTo = Integer.toString(askTo);
+ GetPredecessorTask sendTask = new GetPredecessorTask(getHost().getName(), this.mailbox);
+ try {
+ sendTask.send(mailboxTo, Common.TIMEOUT);
+ try {
+ do {
+ if (commReceive == null) {
+ commReceive = Task.irecv(this.mailbox);
+ }
+ commReceive.waitCompletion(Common.TIMEOUT);
+ Task taskReceived = commReceive.getTask();
+ if (taskReceived instanceof GetPredecessorAnswerTask) {
+ predecessorId = ((GetPredecessorAnswerTask) taskReceived).answerId;
+ stop = true;
+ } else {
+ handleTask(taskReceived);
+ }
+ commReceive = null; // consumed: the next iteration (or the main loop) posts a new receive
+ } while (!stop);
+ }
+ catch (MsgException e) { // any receive failure, timeout included, ends the wait with -1
+ commReceive = null;
+ stop = true; // not read after this point
+ }
+ }
+ catch (MsgException e) { // only the send can reach this handler: receive failures are caught above
+ Msg.debug("Failed to send the Get Predecessor request");
+ }
+ return predecessorId;
+ }
+
+ /**
+ * @brief Makes the current node find the successor node of an id.
+ * @param node the current node
+ * @param id the id to find
+ * @return the id of the successor node, or -1 if the request failed
+ */
+ int findSuccessor(int id) {
+ if (isInInterval(id, this.id + 1, fingers[0])) {
+ return fingers[0];
+ }
+
+ int closest = this.closestPrecedingNode(id);
+ return remoteFindSuccessor(closest, id);
+ }
+
+  /**
+   * @brief Asks another node the successor node of an id and waits for the answer; any other task
+   * received meanwhile is served through handleTask().
+   * @param askTo the id of the node to ask to
+   * @param id the id whose successor is looked up
+   * @return the id of the successor node, or -1 if the send or the receive failed
+   */
+  int remoteFindSuccessor(int askTo, int id) {
+    int successor = -1;
+    boolean stop = false;
+    // named mailboxTo (as in remoteGetPredecessor) so that it does not shadow the mailbox field
+    String mailboxTo = Integer.toString(askTo);
+    Task sendTask = new FindSuccessorTask(getHost().getName(), this.mailbox, id);
+    Msg.debug("Sending a 'Find Successor' request to " + mailboxTo + " for id " + id);
+    try {
+      sendTask.send(mailboxTo, Common.TIMEOUT);
+      do {
+        if (commReceive == null) {
+          commReceive = Task.irecv(this.mailbox);
+        }
+        try {
+          commReceive.waitCompletion(Common.TIMEOUT);
+          Task task = commReceive.getTask();
+          if (task instanceof FindSuccessorAnswerTask) {
+            //TODO: Check if this is our answer.
+            FindSuccessorAnswerTask fTask = (FindSuccessorAnswerTask) task;
+            stop = true;
+            successor = fTask.answerId;
+          } else {
+            handleTask(task);
+          }
+          commReceive = null;
+        }
+        catch (TimeoutException e) {
+          stop = true;
+          commReceive = null;
+        }
+        catch (MsgException e) {
+          // A non-timeout receive failure must also drop the communication, as remoteGetPredecessor
+          // does; otherwise the failed Comm stays in commReceive and is reused by the caller.
+          Msg.debug("Failed to receive Find Successor");
+          stop = true;
+          commReceive = null;
+        }
+      } while (!stop);
+    }
+    catch (MsgException e) {
+      // Receive failures are handled inside the loop, so this handler is reached for send failures.
+      Msg.debug("Failed to send the 'Find Successor' request");
+    }
+
+    return successor;
+  }
+
+ // This function is called periodically. It checks the immediate successor of the current node.
+ void stabilize() {
+ Msg.debug("Stabilizing node");
+ int candidateId;
+ int successorId = fingers[0];
+ if (successorId != this.id){
+ candidateId = remoteGetPredecessor(successorId);
+ } else {
+ candidateId = predId;
+ }
+ //This node is a candidate to become my new successor
+ if (candidateId != -1 && isInInterval(candidateId, this.id + 1, successorId - 1)) {
+ setFinger(0, candidateId);
+ }
+ if (successorId != this.id) {
+ remoteNotify(successorId, this.id);
+ }
+ }
+
+ /**
+ * @brief Notifies the current node that its predecessor may have changed.
+ * @param predecessorCandidateId the possible new predecessor
+ */
+ void notify(int predecessorCandidateId) {
+ if (predId == -1 || isInInterval(predecessorCandidateId, predId + 1, this.id - 1 )) {
+ setPredecessor(predecessorCandidateId);
+ }
+ }
+
+ /**
+ * @brief Notifies a remote node that its predecessor may have changed.
+ * @param notifyId id of the node to notify
+ * @param predecessorCandidateId the possible new predecessor
+ */
+ void remoteNotify(int notifyId, int predecessorCandidateId) {
+ Msg.debug("Sending a 'Notify' request to " + notifyId);
+ Task sentTask = new NotifyTask(getHost().getName(), this.mailbox, predecessorCandidateId);
+ sentTask.dsend(Integer.toString(notifyId));
+ }
+
+ // This function is called periodically.
+ // It refreshes the finger table of the current node.
+ void fixFingers() {
+   // Refreshes one finger per call: looks up the successor of (this.id + 2^i) for the finger
+   // index i stored in nextFingerToFix, and moves on to the next index only on success.
+   Msg.debug("Fixing fingers");
+   int i = this.nextFingerToFix;
+   // 1 << i is 2^i in exact integer arithmetic, replacing the slow (int)Math.pow(2,i).
+   // NOTE(review): identical to the former expression for 0 <= i <= 30; assumes Common.NB_BITS < 32.
+   int id = this.findSuccessor(this.id + (1 << i));
+   if (id != -1) {
+     // setFinger() already ignores an unchanged value, so no extra comparison is needed here.
+     setFinger(i, id);
+     nextFingerToFix = (i + 1) % Common.NB_BITS;
+   }
+ }
+
+ // This function is called periodically.
+ // It checks whether the predecessor has failed
+ void checkPredecessor() {
+ //TODO: not implemented yet; this method is currently a no-op
+ }
+
+ // Performs a find successor request to a random id.
+ void randomLookup() {
+ int id = 1337;
+ //Msg.info("Making a lookup request for id " + id);
+ findSuccessor(id);
+ }
+
+ /**
+ * @brief Returns the closest preceding finger of an id with respect to the finger table of the current node.
+ * @param id the id to find
+ * @return the closest preceding finger of that id
+ */
+ int closestPrecedingNode(int id) {
+ int i;
+ for (i = Common.NB_BITS - 1; i >= 0; i--) {
+ if (isInInterval(fingers[i], this.id + 1, id - 1)) {
+ return fingers[i];
+ }
+ }
+ return this.id;
+ }
+
+ /**
+ * @brief Returns whether an id belongs to the interval [start, end].
+ *
+ * The parameters are normalized to make sure they are between 0 and nb_keys - 1.
+ * 1 belongs to [62, 3]
+ * 1 does not belong to [3, 62]
+ * 63 belongs to [62, 3]
+ * 63 does not belong to [3, 62]
+ * 24 belongs to [21, 29]
+ * 24 does not belong to [29, 21]
+ *
+ * @param id id to check
+ * @param start lower bound
+ * @param end upper bound
+ * @return true if id is in [start, end]
+ */
+ static boolean isInInterval(int id, int start, int end) {
+ id = normalize(id);
+ start = normalize(start);
+ end = normalize(end);
+
+ // make sure end >= start and id >= start
+ if (end < start) {
+ end += Common.NB_KEYS;
+ }
+ if (id < start) {
+ id += Common.NB_KEYS;
+ }
+ return (id <= end);
+ }
+
+ /**
+ * @brief Turns an id into an equivalent id in [0, nb_keys).
+ * @param id an id
+ * @return the corresponding normalized id
+ */
+ static int normalize(int id) {
+ return id & (Common.NB_KEYS - 1);
+ }
+
+ /**
+ * @brief Sets a finger of the current node.
+ * @param fingerIndex index of the finger to set (0 to nb_bits - 1)
+ * @param id the id to set for this finger
+ */
+ void setFinger(int fingerIndex, int id) {
+   // Nothing to do (and no timestamp update) when the finger already holds this id.
+   if (fingers[fingerIndex] == id) {
+     return;
+   }
+   fingers[fingerIndex] = id;
+   lastChangeDate = Msg.getClock();
+ }
}
package chord;
public class NotifyTask extends ChordTask { // ChordTask carrying one extra integer id
- public int requestId;
- public NotifyTask(String issuerHostname, String answerTo, int requestId) {
- super(issuerHostname, answerTo);
- this.requestId = requestId;
- }
+ public int requestId; // id given at construction; Node.remoteNotify passes the predecessor candidate id here
+ public NotifyTask(String issuerHostname, String answerTo, int requestId) { // first two arguments are forwarded to ChordTask
+ super(issuerHostname, answerTo);
+ this.requestId = requestId;
+ }
}
<?xml version='1.0'?>
<!DOCTYPE platform SYSTEM "http://simgrid.gforge.inria.fr/simgrid/simgrid.dtd">
<platform version="4">
-
<process host="Gatien" function="chord.Node">
<argument value="48"/> <!-- my id -->
<argument value="1"/> <!-- known id -->
<argument value="400"/> <!-- time to sleep before it starts-->
- <argument value ="600"/> <!-- deadline -->
+ <argument value ="600"/> <!-- deadline -->
</process>
-
<process host="McGee" function="chord.Node">
<argument value="42"/> <!-- my id -->
<argument value="1"/> <!-- known id -->
<argument value="300"/> <!-- time to sleep before it starts-->
- <argument value ="600"/> <!-- deadline -->
+ <argument value ="600"/> <!-- deadline -->
</process>
-
<process host="iRMX" function="chord.Node">
<argument value="38"/> <!-- my id -->
<argument value="1"/> <!-- known id -->
<argument value="200"/> <!-- time to sleep before it starts-->
- <argument value ="600"/> <!-- deadline -->
+ <argument value ="600"/> <!-- deadline -->
</process>
-
<process host="Geoff" function="chord.Node">
<argument value="32"/> <!-- my id -->
<argument value="1"/> <!-- known id -->
<argument value="100"/> <!-- time to sleep before it starts-->
- <argument value ="600"/> <!-- deadline -->
+ <argument value ="600"/> <!-- deadline -->
</process>
-
<process host="TeX" function="chord.Node">
<argument value="21"/> <!-- my id -->
<argument value="1"/> <!-- known id -->
<argument value="40"/> <!-- time to sleep before it starts-->
- <argument value ="600"/> <!-- deadline -->
+ <argument value ="600"/> <!-- deadline -->
</process>
-
<process host="Jean_Yves" function="chord.Node">
<argument value="14"/> <!-- my id -->
<argument value="1"/> <!-- known id -->
<argument value="16"/> <!-- time to sleep before it starts-->
- <argument value ="600"/> <!-- deadline -->
+ <argument value ="600"/> <!-- deadline -->
</process>
-
<process host="Boivin" function="chord.Node">
<argument value="8"/> <!-- my id -->
<argument value="1"/> <!-- known id -->
<argument value="1"/> <!-- time to sleep before it starts-->
- <argument value ="600"/> <!-- deadline -->
+ <argument value ="600"/> <!-- deadline -->
</process>
-
<process host="Jacquelin" function="chord.Node">
<argument value="1"/> <!-- my id -->
- <argument value ="600"/> <!-- deadline -->
+ <argument value ="600"/> <!-- deadline -->
</process>
-
</platform>
${examples_src}
${sources}
PARENT_SCOPE)
-set(bin_files
- ${bin_files}
- PARENT_SCOPE)
-set(txt_files
- ${txt_files}
- PARENT_SCOPE)
-/* Copyright (c) 2012-2014. The SimGrid Team.
+/* Copyright (c) 2012-2014, 2016. The SimGrid Team.
* All rights reserved. */
/* This program is free software; you can redistribute it and/or modify it
package cloud;
-import org.simgrid.msg.Host;
import org.simgrid.msg.Msg;
+import org.simgrid.msg.Host;
import org.simgrid.msg.MsgException;
-/**
- * Example showing the use of the new experimental Cloud API.
- */
+
public class Cloud {
- public static final double task_comp_size = 10;
- public static final double task_comm_size = 10;
- public static final int hostNB = 2 ;
- public static void main(String[] args) throws MsgException {
- Msg.init(args);
-
- if (args.length < 1) {
- Msg.info("Usage : Cloud platform_file");
- Msg.info("Usage : Cloud platform.xml");
- System.exit(1);
- }
- /* Construct the platform */
- Msg.createEnvironment(args[0]);
- Host[] hosts = Host.all();
- if (hosts.length < hostNB+1) {
- Msg.info("I need at least "+ (hostNB+1) +" hosts in the platform file, but " + args[0] + " contains only " + hosts.length + " hosts");
- System.exit(42);
- }
- Msg.info("Start"+ hostNB +" hosts");
- new Master(hosts[0],"Master",hosts).start();
- /* Execute the simulation */
- Msg.run();
-
+ public static final double task_comp_size = 10;
+ public static final double task_comm_size = 10;
+ public static final int hostNB = 2 ;
+ public static void main(String[] args) throws MsgException {
+   // Entry point: loads the platform given as first argument, checks that it has enough hosts,
+   // starts the Master process on the first host and runs the simulation.
+   Msg.init(args);
+
+   if (args.length < 1) {
+     Msg.info("Usage : Cloud platform_file");
+     Msg.info("example : Cloud ../platforms/platform.xml");
+     System.exit(1);
+   }
+
+   /* Construct the platform */
+   Msg.createEnvironment(args[0]);
+   Host[] hosts = Host.all();
+   // NOTE(review): Master.main also uses hosts[3]; confirm that hostNB+1 hosts is really enough.
+   if (hosts.length < hostNB+1) {
+     Msg.info("I need at least "+ (hostNB+1) +" hosts in the platform file, but " + args[0] + " contains only "
+         + hosts.length + " hosts");
+     System.exit(42);
}
+   Msg.info("Start"+ hostNB +" hosts");
+   new Master(hosts[0],"Master",hosts).start();
+   /* Execute the simulation */
+   Msg.run();
+ }
}
-/* Copyright (c) 2012-2014. The SimGrid Team.
+/* Copyright (c) 2012-2014,2016. The SimGrid Team.
* All rights reserved. */
/* This program is free software; you can redistribute it and/or modify it
import org.simgrid.msg.Task;
public class FinalizeTask extends Task { // Task subtype with the fixed name "Finalize"
- public FinalizeTask(double compSize, double commSize) {
- super("Finalize",compSize,commSize);
- }
+ public FinalizeTask(double compSize, double commSize) { // both sizes are forwarded unchanged to Task
+ super("Finalize",compSize,commSize);
+ }
}
\ No newline at end of file
import java.util.ArrayList;
-import org.simgrid.msg.Host;
import org.simgrid.msg.Msg;
-import org.simgrid.msg.MsgException;
-import org.simgrid.msg.Process;
-import org.simgrid.msg.Task;
import org.simgrid.msg.VM;
+import org.simgrid.msg.Host;
+import org.simgrid.msg.Task;
+import org.simgrid.msg.Process;
+import org.simgrid.msg.MsgException;
public class Master extends Process {
- private Host[] hosts;
-
- public Master(Host host, String name, Host[] hosts) {
- super(host,name,null);
- this.hosts = hosts;
- }
- public void main(String[] args) throws MsgException {
- int slavesCount = Cloud.hostNB;
-
- ArrayList<VM> vms = new ArrayList<VM>();
-
- // Create one VM per host and bind a process inside each one.
- for (int i = 0; i < slavesCount; i++) {
- Msg.info("create VM0"+i);
- VM vm = new VM(hosts[i+1],"VM0"+i);
- vm.start();
- vms.add(vm);
- Slave slave = new Slave(vm,i);
- Msg.info("Put Worker "+slave.getName()+ " on "+vm.getName());
- slave.start();
-
- }
- Msg.info("Launched " + vms.size() + " VMs");
-
- Msg.info("Send a first batch of work to everyone");
- workBatch(slavesCount);
-
- Msg.info("Suspend all VMs");
- for (int i = 0; i < vms.size(); i++) {
- Msg.info("Suspend "+vms.get(i).getName());
- vms.get(i).suspend();
- }
-
- Msg.info("Wait a while");
- waitFor(2);
-
- Msg.info("Resume all VMs.");
- for (int i = 0; i < vms.size(); i++) {
- vms.get(i).resume();
- }
-
- Msg.info("Sleep long enough for everyone to be done with previous batch of work");
- waitFor(1000 - Msg.getClock());
-
-/* Msg.info("Add one more process per VM.");
- for (int i = 0; i < vms.size(); i++) {
- VM vm = vms.get(i);
- Slave slave = new Slave(vm,i + vms.size());
- slave.start();
- }
-
- workBatch(slavesCount * 2);
+ private Host[] hosts;
+
+ public Master(Host host, String name, Host[] hosts) { // hosts: the array later indexed by main() to place and migrate VMs
+ super(host,name,null);
+ this.hosts = hosts;
+ }
+
+ public void main(String[] args) throws MsgException { // scenario: start VMs and workers, send work, suspend/resume, migrate, shut down
+ int slavesCount = Cloud.hostNB;
+ ArrayList<VM> vms = new ArrayList<VM>();
+
+ // Create one VM per host and bind a process inside each one.
+ for (int i = 0; i < slavesCount; i++) {
+ Msg.info("create VM0"+i);
+ VM vm = new VM(hosts[i+1],"VM0"+i); // VM number i is placed on hosts[i+1]; hosts[0] is not used for VMs
+ vm.start();
+ vms.add(vm);
+ Slave slave = new Slave(vm,i);
+ Msg.info("Put Worker "+slave.getName()+ " on "+vm.getName());
+ slave.start();
+ }
+
+ Msg.info("Launched " + vms.size() + " VMs");
+
+ Msg.info("Send a first batch of work to everyone");
+ workBatch(slavesCount);
+
+ Msg.info("Suspend all VMs");
+ for (int i = 0; i < vms.size(); i++) {
+ Msg.info("Suspend "+vms.get(i).getName());
+ vms.get(i).suspend();
+ }
+
+ Msg.info("Wait a while");
+ waitFor(2);
+
+ Msg.info("Resume all VMs.");
+ for (int i = 0; i < vms.size(); i++) {
+ vms.get(i).resume();
+ }
+
+ Msg.info("Sleep long enough for everyone to be done with previous batch of work");
+ waitFor(1000 - Msg.getClock()); // presumably waits until the simulated clock reaches 1000 - confirm
+
+/* Msg.info("Add one more process per VM.");
+ for (int i = 0; i < vms.size(); i++) {
+ VM vm = vms.get(i);
+ Slave slave = new Slave(vm,i + vms.size());
+ slave.start();
+ }
+
+ workBatch(slavesCount * 2);
*/
- Msg.info("Migrate everyone to "+hosts[3].getName());
- for (int i = 0; i < vms.size(); i++) {
- Msg.info("Migrate "+vms.get(i).getName()+"from"+hosts[i+1].getName()+"to "+hosts[3].getName());
- vms.get(i).migrate(hosts[3]);
- }
-
-
- Msg.info("Let's shut down the simulation and kill everyone.");
-
- for (int i = 0; i < vms.size(); i++) {
- vms.get(i).shutdown();
- }
- Msg.info("Master done.");
- }
-
- public void workBatch(int slavesCount) throws MsgException {
- for (int i = 0; i < slavesCount; i++) {
- Task task = new Task("Task0" + i, Cloud.task_comp_size, Cloud.task_comm_size);
- Msg.info("Sending to WRK0" + i);
- task.send("MBOX:WRK0" + i);
- }
- }
+ Msg.info("Migrate everyone to "+hosts[3].getName()); // NOTE(review): hosts[3] needs at least 4 hosts, but Cloud.main only checks for hostNB+1
+ for (int i = 0; i < vms.size(); i++) {
+ Msg.info("Migrate "+vms.get(i).getName()+"from"+hosts[i+1].getName()+"to "+hosts[3].getName());
+ vms.get(i).migrate(hosts[3]);
+ }
+
+ Msg.info("Let's shut down the simulation and kill everyone.");
+
+ for (int i = 0; i < vms.size(); i++) {
+ vms.get(i).shutdown();
+ }
+ Msg.info("Master done.");
+ }
+
+ public void workBatch(int slavesCount) throws MsgException {
+ for (int i = 0; i < slavesCount; i++) {
+ Task task = new Task("Task0" + i, Cloud.task_comp_size, Cloud.task_comm_size);
+ Msg.info("Sending to WRK0" + i);
+ task.send("MBOX:WRK0" + i);
+ }
+ }
}
-/* Copyright (c) 2012-2014. The SimGrid Team.
+/* Copyright (c) 2012-2016. The SimGrid Team.
* All rights reserved. */
/* This program is free software; you can redistribute it and/or modify it
package cloud;
-import org.simgrid.msg.Host;
import org.simgrid.msg.Msg;
-import org.simgrid.msg.MsgException;
-import org.simgrid.msg.Process;
+import org.simgrid.msg.Host;
import org.simgrid.msg.Task;
+import org.simgrid.msg.Process;
+import org.simgrid.msg.MsgException;
public class Slave extends Process {
- private int number;
- public Slave(Host host, int number) {
- super(host,"WRK0" + number,null);
- this.number = number;
- }
- public void main(String[] args) throws MsgException {
- Msg.info(this.getName() +" is listenning on MBOX:WRK0"+ number);
- while(true) {
- Task task;
- try {
- task = Task.receive("MBOX:WRK0"+number);
- } catch (MsgException e) {
- Msg.debug("Received failed. I'm done. See you!");
- break;
- }
- if (task instanceof FinalizeTask) {
- Msg.info("Received Finalize. I'm done. See you!");
- break;
- }
- Msg.info("Received \"" + task.getName() + "\". Processing it.");
- try {
- task.execute();
- } catch (MsgException e) {
-
- }
- Msg.info(this.getName() +" executed task (" + task.getName()+")");
- }
+ private int number;
+ public Slave(Host host, int number) { // the process is named "WRK0" + number; number also selects the mailbox read in main()
+ super(host,"WRK0" + number,null);
+ this.number = number;
+ }
-
- }
+ public void main(String[] args) throws MsgException {
+   // Worker loop: receives tasks on "MBOX:WRK0<number>" and executes them until either a
+   // FinalizeTask arrives or the reception fails.
+   Msg.info(this.getName() +" is listenning on MBOX:WRK0"+ number);
+   while(true) {
+     Task task;
+     try {
+       task = Task.receive("MBOX:WRK0"+number);
+     } catch (MsgException e) {
+       Msg.debug("Received failed. I'm done. See you!");
+       break;
+     }
+     if (task instanceof FinalizeTask) {
+       Msg.info("Received Finalize. I'm done. See you!");
+       break;
+     }
+     Msg.info("Received \"" + task.getName() + "\". Processing it.");
+     try {
+       task.execute();
+     } catch (MsgException e) {
+       // Still best effort (the loop goes on), but the failure is no longer silently dropped.
+       Msg.debug("Execution of \"" + task.getName() + "\" failed: " + e.getMessage());
+     }
+     Msg.info(this.getName() +" executed task (" + task.getName()+")");
+   }
+ }
}
${examples_src}
${sources}
PARENT_SCOPE)
-set(bin_files
- ${bin_files}
- PARENT_SCOPE)
-set(txt_files
- ${txt_files}
- PARENT_SCOPE)
import org.simgrid.msg.*;
import org.simgrid.msg.Process;
-/* This class is a process in charge of running the test. It creates and starts the VMs, and fork processes within the VMs */
+/* This class is a process in charge of running the test. It creates and starts the VMs, and fork processes within VMs */
public class EnergyVMRunner extends Process {
- public class DummyProcess extends Process {
- public DummyProcess (Host host, String name) {
- super(host, name);
- }
-
- public void main(String[] args) {
- Task task = new Task(this.getHost().getName()+"-task", 300E6 , 0);
- try {
- task.execute();
- } catch (Exception e) {
- e.printStackTrace();
- }
- Msg.info("This worker is done.");
- }
- }
-
- EnergyVMRunner(Host host, String name, String[] args) throws HostNotFoundException, NativeException {
- super(host, name, args);
- }
-
- public void main(String[] strings) throws MsgException, HostNotFoundException {
- double startTime = 0;
- double endTime = 0;
-
- /* get hosts */
- Host host1 = Host.getByName("MyHost1");
- Host host2 = Host.getByName("MyHost2");
- Host host3 = Host.getByName("MyHost3");
-
- Msg.info("Creating and starting two VMs");
- VM vmHost1 = new VM(host1, "vmHost1", 4, 2048, 100, null, 1024 * 20, 10,50);
- vmHost1.start();
-
- VM vmHost3 = new VM(host3, "vmHost3", 4, 2048, 100, null, 1024 * 20, 10,50);
- vmHost3.start();
-
- Msg.info("Create two tasks on Host1: one inside a VM, the other directly on the host");
- new DummyProcess (vmHost1, "p11");
- new DummyProcess (host1, "p12");
-
- Msg.info("Create two tasks on Host2: both directly on the host");
- new DummyProcess (host2, "p21");
- new DummyProcess (host2, "p22");
-
- Msg.info("Create two tasks on Host3: both inside a VM");
- new DummyProcess (vmHost3, "p31");
- new DummyProcess (vmHost3, "p312");
-
- Msg.info("Wait 5 seconds. The tasks are still running (they run for 3 seconds, but 2 tasks are co-located, so they run for 6 seconds)");
- waitFor(5);
- Msg.info("Wait another 5 seconds. The tasks stop at some point in between");
- waitFor(5);
-
- vmHost1.shutdown();
- vmHost3.shutdown();
- }
+ public class DummyProcess extends Process { // process whose main() executes a single task and then logs that it is done
+ public DummyProcess (Host host, String name) {
+ super(host, name);
+ }
+
+ public void main(String[] args) {
+ Task task = new Task(this.getHost().getName()+"-task", 300E6 , 0); // task named "<host name>-task"
+ try {
+ task.execute();
+ } catch (Exception e) {
+ e.printStackTrace(); // a failure is only printed; the message below is logged in any case
+ }
+ Msg.info("This worker is done.");
+ }
+ }
+
+ EnergyVMRunner(Host host, String name, String[] args) throws HostNotFoundException, NativeException { // all arguments are forwarded to Process
+ super(host, name, args);
+ }
+
+ public void main(String[] strings) throws MsgException, HostNotFoundException {
+   // Test scenario: starts one VM on MyHost1 and one on MyHost3, creates two DummyProcess
+   // instances per host (on the VM and/or directly on the host), waits 10 seconds in two
+   // steps and finally shuts both VMs down.
+
+   /* get hosts */
+   Host host1 = Host.getByName("MyHost1");
+   Host host2 = Host.getByName("MyHost2");
+   Host host3 = Host.getByName("MyHost3");
+
+   Msg.info("Creating and starting two VMs");
+   VM vmHost1 = new VM(host1, "vmHost1", 4, 2048, 100, null, 1024 * 20, 10,50);
+   vmHost1.start();
+
+   VM vmHost3 = new VM(host3, "vmHost3", 4, 2048, 100, null, 1024 * 20, 10,50);
+   vmHost3.start();
+
+   // NOTE(review): start() is never called on the DummyProcess instances created below -
+   // confirm that constructing them is enough to make them run.
+   Msg.info("Create two tasks on Host1: one inside a VM, the other directly on the host");
+   new DummyProcess (vmHost1, "p11");
+   new DummyProcess (host1, "p12");
+
+   Msg.info("Create two tasks on Host2: both directly on the host");
+   new DummyProcess (host2, "p21");
+   new DummyProcess (host2, "p22");
+
+   Msg.info("Create two tasks on Host3: both inside a VM");
+   new DummyProcess (vmHost3, "p31");
+   new DummyProcess (vmHost3, "p312");
+
+   Msg.info("Wait 5 seconds. The tasks are still running (they run for 3 seconds, but 2 tasks are co-located, "
+       + "so they run for 6 seconds)");
+   waitFor(5);
+   Msg.info("Wait another 5 seconds. The tasks stop at some point in between");
+   waitFor(5);
+
+   vmHost1.shutdown();
+   vmHost3.shutdown();
+ }
}
package cloud.energy;
+import org.simgrid.msg.Msg;
import org.simgrid.msg.Host;
import org.simgrid.msg.HostNotFoundException;
-import org.simgrid.msg.Msg;
import org.simgrid.msg.NativeException;
public class Main {
- public static void main(String[] args) throws NativeException, HostNotFoundException {
- /* Init. internal values */
- Msg.energyInit();
- Msg.init(args);
+ public static void main(String[] args) throws NativeException, HostNotFoundException { // entry point; expects the platform file as args[0]
+ Msg.energyInit(); // NOTE(review): called before Msg.init() - presumably required by the energy support; confirm
+ Msg.init(args);
- if (args.length < 1) {
- Msg.info("Usage: Main platform_file.xml");
- System.exit(1);
- }
+ if (args.length < 1) {
+ Msg.info("Usage: Main ../platforms/energy_platform_file.xml");
+ System.exit(1);
+ }
- /* construct the platform */
- Msg.createEnvironment(args[0]);
-
- /* Create and start a runner for the experiment */
- new EnergyVMRunner(Host.all()[0],"energy VM runner",null).start();
+ /* construct the platform */
+ Msg.createEnvironment(args[0]);
+
+ /* Create and start a runner for the experiment */
+ new EnergyVMRunner(Host.all()[0],"energy VM runner",null).start(); // hosted on the first host returned by Host.all()
- Msg.run();
- }
+ Msg.run();
+ }
}
${examples_src}
${sources}
PARENT_SCOPE)
-set(bin_files
- ${bin_files}
- PARENT_SCOPE)
set(txt_files
${CMAKE_CURRENT_SOURCE_DIR}/README
${txt_files}
package cloud.migration;
+import org.simgrid.msg.Msg;
import org.simgrid.msg.Host;
import org.simgrid.msg.HostNotFoundException;
-import org.simgrid.msg.Msg;
import org.simgrid.msg.NativeException;
public class Main {
- private static boolean endOfTest = false;
-
- public static void setEndOfTest(){
- endOfTest=true;
- }
+ private static boolean endOfTest = false;
- public static boolean isEndOfTest(){
- return endOfTest;
- }
-
- public static void main(String[] args) throws NativeException {
- /* Init. internal values */
- Msg.init(args);
+ public static void setEndOfTest(){ // raises the static endOfTest flag; nothing in this class resets it
+ endOfTest=true;
+ }
- if (args.length < 2) {
- Msg.info("Usage : Main platform_file.xml dployment_file.xml");
- System.exit(1);
- }
+ public static boolean isEndOfTest(){ // true once setEndOfTest() has been called
+ return endOfTest;
+ }
- /* construct the platform and deploy the application */
- Msg.createEnvironment(args[0]);
- Msg.deployApplication(args[1]);
+ public static void main(String[] args) throws NativeException {
+   // Entry point: expects the platform file as args[0] and the deployment file as args[1].
+   Msg.init(args);
- Msg.run();
+   if (args.length < 2) {
+     Msg.info("Usage : Main platform_file.xml deployment_file.xml");
+     System.exit(1);
+   }
+   /* construct the platform and deploy the application */
+   Msg.createEnvironment(args[0]);
+   Msg.deployApplication(args[1]);
- }
+   Msg.run();
+ }
}
-/* Copyright (c) 2014. The SimGrid Team.
+/* Copyright (c) 2014, 2016. The SimGrid Team.
* All rights reserved. */
/* This program is free software; you can redistribute it and/or modify it
* under the terms of the license (GNU LGPL) which comes with this package. */
package cloud.migration;
+import java.util.ArrayList;
+import java.util.List;
+
import org.simgrid.msg.*;
import org.simgrid.msg.Process;
-import java.util.ArrayList;
-import java.util.List;
public class Test extends Process{
- Test(Host host, String name, String[] args) throws HostNotFoundException, NativeException {
- super(host, name, args);
+ Test(Host host, String name, String[] args) throws HostNotFoundException, NativeException { // all arguments are forwarded to Process
+ super(host, name, args);
+ }
+
+ public void main(String[] strings) throws MsgException { // times two round-trip migrations of vm1 between host0 and host1, with loaded VMs on both hosts
+ double startTime = 0;
+ double endTime = 0;
+
+ /* get hosts 0 and 1 */
+ Host host0 = null;
+ Host host1 = null;
+
+ try {
+ host0 = Host.getByName("host0");
+ host1 = Host.getByName("host1");
+ }catch (HostNotFoundException e) {
+ e.printStackTrace(); // NOTE(review): execution continues with host0/host1 possibly still null
}
- public void main(String[] strings) throws MsgException {
-
- double startTime = 0;
- double endTime = 0;
-
- /* get hosts 1 and 2*/
- Host host0 = null;
- Host host1 = null;
-
- try {
- host0 = Host.getByName("host0");
- host1 = Host.getByName("host1");
- }catch (HostNotFoundException e) {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
-
- List<VM> vms = new ArrayList<VM>();
-
- /* Create VM1 */
- int dpRate = 70;
- int load1 = 90;
- int load2 = 80;
-
-
- Msg.info("This example evaluates the migration time of a VM in presence of collocated VMs on the source and the dest nodes");
- Msg.info("The migrated VM has a memory intensity rate of 70% of the network BW and a cpu load of 90% \"(see cloudcom 2013 paper \"Adding a Live Migration Model Into SimGrid\" for further information) ");
-
- Msg.info("Load of collocated VMs fluctuate between 0 and 90% in order to create a starvation issue and see whether it impacts or not the migration time");
- XVM vm1 = null;
- vm1 = new XVM(
- host0,
- "vm0",
- 1, // Nb of vcpu
- 2048, // Ramsize,
- 125, // Net Bandwidth
- null, //VM disk image
- -1, //size of disk image,
- 125, // Net bandwidth,
- dpRate // Memory intensity
+ List<VM> vms = new ArrayList<VM>(); // every VM created below is recorded here and destroyed at the end
+
+ /* Create VM1 */
+ int dpRate = 70;
+ int load1 = 90; // load applied to vm1 during the first round trip
+ int load2 = 80; // load applied to vm1 during the second round trip
+
+ Msg.info("This example evaluates the migration time of a VM in presence of collocated VMs on the source and "
+ + "the dest nodes");
+ Msg.info("The migrated VM has a memory intensity rate of 70% of the network BW and a cpu load of 90% \" "
+ +"(see cloudcom 2013 paper \"Adding a Live Migration Model Into SimGrid\" for further information) ");
+
+ Msg.info("Load of collocated VMs fluctuate between 0 and 90% in order to create a starvation issue and see "
+ + "whether it impacts or not the migration time");
+ XVM vm1 = null;
+ vm1 = new XVM(host0, "vm0",
+ 1, // Nb of vcpu
+ 2048, // Ramsize,
+ 125, // Net Bandwidth
+ null, //VM disk image
+ -1, //size of disk image,
+ 125, // Net bandwidth,
+ dpRate // Memory intensity
);
- vms.add(vm1);
- vm1.start();
-
- /* Collocated VMs */
- int collocatedSrc = 6;
- int vmSrcLoad[] = {
- 80,
- 0,
- 90,
- 40,
- 30,
- 90,
- };
-
- XVM tmp = null;
- for (int i=1 ; i<= collocatedSrc ; i++){
- tmp = new XVM(
- host0,
- "vm"+i,
- 1, // Nb of vcpu
- 2048, // Ramsize,
- 125, // Net Bandwidth
- null, //VM disk image
- -1, //size of disk image,
- 125, // Net bandwidth,
- dpRate // Memory intensity
- );
- vms.add(tmp);
- tmp.start();
- tmp.setLoad(vmSrcLoad[i-1]);
- }
-
- int collocatedDst = 6;
- int vmDstLoad[] = {
- 0,
- 40,
- 90,
- 100,
- 0,
- 80,
- };
-
- for (int i=1 ; i <= collocatedDst ; i++){
- tmp = new XVM(
- host1,
- "vm"+(i+collocatedSrc),
- 1, // Nb of vcpu
- 2048, // Ramsize,
- 125, // Net Bandwidth
- null, //VM disk image
- -1, //size of disk image,
- 125, // Net bandwidth,
- dpRate // Memory intensity
- );
- vms.add(tmp);
- tmp.start();
- tmp.setLoad(vmDstLoad[i-1]);
- }
-
- Msg.info("Round trip of VM1 (load "+load1+"%)");
- vm1.setLoad(load1);
- Msg.info(" - Launch migration from host 0 to host 1");
- startTime = Msg.getClock();
- vm1.migrate(host1);
- endTime = Msg.getClock();
- Msg.info(" - End of Migration from host 0 to host 1 (duration:"+(endTime-startTime)+")");
- Msg.info(" - Launch migration from host 1 to host 0");
- startTime = Msg.getClock();
- vm1.migrate(host0);
- endTime = Msg.getClock();
- Msg.info(" - End of Migration from host 1 to host 0 (duration:"+(endTime-startTime)+")");
-
-
- Msg.info("\n \n \nRound trip of VM1 (load "+load2+"%)");
- vm1.setLoad(load2);
- Msg.info(" - Launch migration from host 0 to host 1");
- startTime = Msg.getClock();
- vm1.migrate(host1);
- endTime = Msg.getClock();
- Msg.info(" - End of Migration from host 0 to host 1 (duration:"+(endTime-startTime)+")");
- Msg.info(" - Launch migration from host 1 to host 0");
- startTime = Msg.getClock();
- vm1.migrate(host0);
- endTime = Msg.getClock();
- Msg.info(" - End of Migration from host 1 to host 0 (duration:"+(endTime-startTime)+")");
-
- Main.setEndOfTest();
- Msg.info("Forcefully destroy VMs");
- for (VM vm: vms)
- vm.finalize();
-
+ vms.add(vm1);
+ vm1.start();
+
+ /* Collocated VMs */
+ int collocatedSrc = 6; // number of extra VMs created on host0
+ int vmSrcLoad[] = {
+ 80,
+ 0,
+ 90,
+ 40,
+ 30,
+ 90,
+ };
+
+ XVM tmp = null;
+ for (int i=1 ; i<= collocatedSrc ; i++){
+ tmp = new XVM(host0, "vm"+i,
+ 1, // Nb of vcpu
+ 2048, // Ramsize,
+ 125, // Net Bandwidth
+ null, //VM disk image
+ -1, //size of disk image,
+ 125, // Net bandwidth,
+ dpRate // Memory intensity
+ );
+ vms.add(tmp);
+ tmp.start();
+ tmp.setLoad(vmSrcLoad[i-1]); // i starts at 1, hence the i-1 index
}
+
+ int collocatedDst = 6; // number of extra VMs created on host1
+ int vmDstLoad[] = {
+ 0,
+ 40,
+ 90,
+ 100,
+ 0,
+ 80,
+ };
+
+ for (int i=1 ; i <= collocatedDst ; i++){
+ tmp = new XVM(host1, "vm"+(i+collocatedSrc),
+ 1, // Nb of vcpu
+ 2048, // Ramsize,
+ 125, // Net Bandwidth
+ null, //VM disk image
+ -1, //size of disk image,
+ 125, // Net bandwidth,
+ dpRate // Memory intensity
+ );
+ vms.add(tmp);
+ tmp.start();
+ tmp.setLoad(vmDstLoad[i-1]);
+ }
+
+ Msg.info("Round trip of VM1 (load "+load1+"%)");
+ vm1.setLoad(load1);
+ Msg.info(" - Launch migration from host 0 to host 1");
+ startTime = Msg.getClock();
+ vm1.migrate(host1);
+ endTime = Msg.getClock();
+ Msg.info(" - End of Migration from host 0 to host 1 (duration:"+(endTime-startTime)+")");
+ Msg.info(" - Launch migration from host 1 to host 0");
+ startTime = Msg.getClock();
+ vm1.migrate(host0);
+ endTime = Msg.getClock();
+ Msg.info(" - End of Migration from host 1 to host 0 (duration:"+(endTime-startTime)+")");
+
+ Msg.info("");
+ Msg.info("");
+ Msg.info("Round trip of VM1 (load "+load2+"%)");
+ vm1.setLoad(load2);
+ Msg.info(" - Launch migration from host 0 to host 1");
+ startTime = Msg.getClock();
+ vm1.migrate(host1);
+ endTime = Msg.getClock();
+ Msg.info(" - End of Migration from host 0 to host 1 (duration:"+(endTime-startTime)+")");
+ Msg.info(" - Launch migration from host 1 to host 0");
+ startTime = Msg.getClock();
+ vm1.migrate(host0);
+ endTime = Msg.getClock();
+ Msg.info(" - End of Migration from host 1 to host 0 (duration:"+(endTime-startTime)+")");
+
+ Main.setEndOfTest();
+ Msg.info("Forcefully destroy VMs");
+ for (VM vm: vms)
+ vm.finalize(); // NOTE(review): explicit finalize() call - presumably how VMs are destroyed here; confirm
+ }
}
package cloud.migration;
-import org.simgrid.msg.*;
-import org.simgrid.msg.Process;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
-// This test aims at validating that the migration process is robust in face of host turning off either on the SRC node or on the DST node.
+import org.simgrid.msg.*;
+import org.simgrid.msg.Process;
+// This test aims at validating that the migration process is robust in face of host turning off either on the SRC
+// node or on the DST node.
public class TestHostOnOff extends Process{
- public static Host host0 = null;
- public static Host host1 = null;
- public static Host host2 = null;
-
+ public static Host host0 = null;
+ public static Host host1 = null;
+ public static Host host2 = null;
- TestHostOnOff(Host host, String name, String[] args) throws HostNotFoundException, NativeException {
- super(host, name, args);
- }
- public void main(String[] strings) throws MsgException {
+ TestHostOnOff(Host host, String name, String[] args) throws HostNotFoundException, NativeException {
+ super(host, name, args);
+ }
- double startTime = 0;
- double endTime = 0;
+ public void main(String[] strings) throws MsgException {
+ double startTime = 0;
+ double endTime = 0;
- /* get hosts 1 and 2*/
- try {
- host0 = Host.getByName("host0");
- host1 = Host.getByName("host1");
- host1 = Host.getByName("host2");
- }catch (HostNotFoundException e) {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
-
- // Robustness on the SRC node
- //for (int i =0 ; i < 55000 ; i++)
- // test_vm_migrate(host1, i);
-
- // Robustness on the DST node
- //for (int i =0 ; i < 55000 ; i++)
- // test_vm_migrate(host2, i);
-
- /* End of Tests */
- Msg.info("Nor more tests, Bye Bye !");
- Main.setEndOfTest();
+    /* Resolve the three hosts the tests rely on: host0, host1 and host2. */
+    try {
+      host0 = Host.getByName("host0");
+      host1 = Host.getByName("host1");
+      // host2 needs its own lookup: storing the "host2" lookup into host1 left the static host2
+      // field null, while test_vm_migrate() calls host2.off() / host2.on().
+      host2 = Host.getByName("host2");
+    }catch (HostNotFoundException e) {
+      // No host with that name: the fields not yet assigned stay null.
+      e.printStackTrace();
     }
- public static void test_vm_migrate (Host hostToKill, long killAt) throws MsgException {
- Msg.info("**** **** **** ***** ***** Test Migrate with host shutdown ***** ***** **** **** ****");
- Msg.info("Turn on one host, assign a VM on this host, launch a process inside the VM, migrate the VM and turn off either the SRC or DST");
-
- host1.off();
- host2.off();
- host1.on();
- host2.on();
-
- // Create VM0
- int dpRate = 70;
- XVM vm0 = null;
- vm0 = new XVM(
- host1,
- "vm0",
- 1, // Nb of vcpu
- 2048, // Ramsize,
- 125, // Net Bandwidth
- null, //VM disk image
- -1, //size of disk image,
- 125, // Net bandwidth,
- dpRate // Memory intensity
+ // Robustness on the SRC node
+ //for (int i =0 ; i < 55000 ; i++)
+ // test_vm_migrate(host1, i);
+
+ // Robustness on the DST node
+ //for (int i =0 ; i < 55000 ; i++)
+ // test_vm_migrate(host2, i);
+
+ /* End of Tests */
+ Msg.info("Nor more tests, Bye Bye !");
+ Main.setEndOfTest();
+ }
+
+ public static void test_vm_migrate (Host hostToKill, long killAt) throws MsgException {
+ Msg.info("**** **** **** ***** ***** Test Migrate with host shutdown ***** ***** **** **** ****");
+ Msg.info("Turn on one host, assign a VM on this host, launch a process inside the VM, migrate the VM and "
+ + "turn off either the SRC or DST");
+
+ host1.off();
+ host2.off();
+ host1.on();
+ host2.on();
+
+ // Create VM0
+ int dpRate = 70;
+ XVM vm0 = null;
+ vm0 = new XVM(host1, "vm0",
+ 1, // Nb of vcpu
+ 2048, // Ramsize,
+ 125, // Net Bandwidth
+ null, //VM disk image
+ -1, //size of disk image,
+ 125, // Net bandwidth,
+ dpRate // Memory intensity
);
- vm0.start();
- vm0.setLoad(90);
-
- String[] args = new String[3];
-
- args[0] = "vm0";
- args[1] = "host1";
- args[2] = "host2";
- new Process(host1, "Migrate-" + new Random().nextDouble(), args) {
- public void main(String[] args) {
- Host destHost = null;
- Host sourceHost = null;
-
- try {
- sourceHost = Host.getByName(args[1]);
- destHost = Host.getByName(args[2]);
- } catch (Exception e) {
- e.printStackTrace();
- System.err.println("You are trying to migrate from/to a non existing node");
- }
- if (destHost != null) {
- if (sourceHost.isOn() && destHost.isOn()) {
-
- try {
- Msg.info("Migrate vm "+args[0]+" to node "+destHost.getName());
- VM.getVMByName(args[0]).migrate(destHost);
- } catch (HostFailureException e) {
- e.printStackTrace();
- Msg.info("Something occurs during the migration that cannot validate the operation");
- }
- }
- }
-
+ vm0.start();
+ vm0.setLoad(90);
+
+ String[] args = new String[3];
+
+ args[0] = "vm0";
+ args[1] = "host1";
+ args[2] = "host2";
+ new Process(host1, "Migrate-" + new Random().nextDouble(), args) {
+ public void main(String[] args) {
+ Host destHost = null;
+ Host sourceHost = null;
+
+ try {
+ sourceHost = Host.getByName(args[1]);
+ destHost = Host.getByName(args[2]);
+ } catch (Exception e) {
+ e.printStackTrace();
+ System.err.println("You are trying to migrate from/to a non existing node");
+ }
+ if (destHost != null) {
+ if (sourceHost.isOn() && destHost.isOn()) {
+ try {
+ Msg.info("Migrate vm "+args[0]+" to node "+destHost.getName());
+ VM.getVMByName(args[0]).migrate(destHost);
+ } catch (HostFailureException e) {
+ e.printStackTrace();
+ Msg.info("Something occurs during the migration that cannot validate the operation");
}
- }.start();
-
- // Wait killAt ms before killing thehost
- Process.sleep(killAt);
- hostToKill.off();
- Process.sleep(5);
- Msg.info("The migration process should be stopped and we should catch an exception\n");
- Process.sleep(5);
-
- Process.sleep(50000);
- Msg.info("Destroy VMs");
- vm0.shutdown();
- Process.sleep(20000);
- }
-
-
- public static void test_vm_shutdown_destroy () throws HostFailureException {
-
- Msg.info("**** **** **** ***** ***** Test shutdown a VM ***** ***** **** **** ****");
- Msg.info("Turn on host1, assign a VM on host1, launch a process inside the VM, and turn off the vm, " +
- "and check whether you can reallocate the same VM");
-
-
- // Create VM0
- int dpRate = 70;
- XVM vm0 = null;
- vm0 = new XVM(
- host1,
- "vm0",
- 1, // Nb of vcpu
- 2048, // Ramsize,
- 125, // Net Bandwidth
- null, //VM disk image
- -1, //size of disk image,
- 125, // Net bandwidth,
- dpRate // Memory intensity
+ }
+ }
+ }
+ }.start();
+
+    // Wait killAt ms before killing the host
+ Process.sleep(killAt);
+ hostToKill.off();
+ Process.sleep(5);
+ Msg.info("The migration process should be stopped and we should catch an exception\n");
+ Process.sleep(5);
+
+ Process.sleep(50000);
+ Msg.info("Destroy VMs");
+ vm0.shutdown();
+ Process.sleep(20000);
+ }
+
+ public static void test_vm_shutdown_destroy () throws HostFailureException {
+ Msg.info("**** **** **** ***** ***** Test shutdown a VM ***** ***** **** **** ****");
+ Msg.info("Turn on host1, assign a VM on host1, launch a process inside the VM, and turn off the vm, " +
+ "and check whether you can reallocate the same VM");
+
+ // Create VM0
+ int dpRate = 70;
+ XVM vm0 = null;
+ vm0 = new XVM(host1, "vm0",
+ 1, // Nb of vcpu
+ 2048, // Ramsize,
+ 125, // Net Bandwidth
+ null, //VM disk image
+ -1, //size of disk image,
+ 125, // Net bandwidth,
+ dpRate // Memory intensity
);
- Msg.info("Start VM0");
- vm0.start();
- vm0.setLoad(90);
-
- Process.sleep(5000);
-
- Msg.info("Shutdown VM0");
- vm0.shutdown();
- Process.sleep(5000);
-
- Msg.info("Restart VM0");
- vm0 = new XVM(
- host1,
- "vm0",
- 1, // Nb of vcpu
- 2048, // Ramsize,
- 125, // Net Bandwidth
- null, //VM disk image
- -1, //size of disk image,
- 125, // Net bandwidth,
- dpRate // Memory intensity
+ Msg.info("Start VM0");
+ vm0.start();
+ vm0.setLoad(90);
+
+ Process.sleep(5000);
+
+ Msg.info("Shutdown VM0");
+ vm0.shutdown();
+ Process.sleep(5000);
+
+ Msg.info("Restart VM0");
+ vm0 = new XVM(host1, "vm0",
+ 1, // Nb of vcpu
+ 2048, // Ramsize,
+ 125, // Net Bandwidth
+ null, //VM disk image
+ -1, //size of disk image,
+ 125, // Net bandwidth,
+ dpRate // Memory intensity
);
- vm0.start();
- vm0.setLoad(90);
-
- Msg.info("You suceed to recreate and restart a VM without generating any exception ! Great the Test is ok");
-
- Process.sleep(5000);
- vm0.shutdown();
- }
-
-}
-
-
+ vm0.start();
+ vm0.setLoad(90);
+ Msg.info("You suceed to recreate and restart a VM without generating any exception ! Great the Test is ok");
+ Process.sleep(5000);
+ vm0.shutdown();
+ }
+}
package cloud.migration;
+import org.simgrid.msg.Msg;
+import org.simgrid.msg.VM;
import org.simgrid.msg.Host;
import org.simgrid.msg.HostNotFoundException;
import org.simgrid.msg.HostFailureException;
-import org.simgrid.msg.Msg;
-import org.simgrid.msg.VM;
-/**
- * A stupid VM extension to associate a daemon to the VM
- */
public class XVM extends VM {
+ private int dpIntensity;
+ private int netBW;
+ private int ramsize;
+ private int currentLoad;
+ private Daemon daemon;
- private int dpIntensity;
- private int netBW;
- private int ramsize;
- private int currentLoad;
-
- private Daemon daemon;
-
- public XVM(Host host, String name,
- int nbCores, int ramsize, int netBW, String diskPath, int diskSize, int migNetBW, int dpIntensity){
- super(host, name, nbCores, ramsize, netBW, diskPath, diskSize, (int)(migNetBW*0.9), dpIntensity);
- this.currentLoad = 0;
- this.netBW = netBW ;
- this. dpIntensity = dpIntensity ;
- this.ramsize= ramsize;
- this.daemon = new Daemon(this, 100);
+  /**
+   * Creates a VM on {@code host} and associates a {@link Daemon} with it.
+   * The parent VM receives 90% of {@code migNetBW} (truncated to an int) as migration bandwidth.
+   * The load starts at 0; the daemon is only created here, it is started by {@link #start()}.
+   */
+  public XVM(Host host, String name, int nbCores, int ramsize, int netBW, String diskPath, int diskSize,
+             int migNetBW, int dpIntensity) {
+    super(host, name, nbCores, ramsize, netBW, diskPath, diskSize, (int) (migNetBW * 0.9), dpIntensity);
+    this.ramsize = ramsize;
+    this.netBW = netBW;
+    this.dpIntensity = dpIntensity;
+    this.currentLoad = 0;
+    // Created last, once every other field of this VM has been set.
+    this.daemon = new Daemon(this, 100);
+  }
+  /**
+   * Changes the load of this VM.
+   * A positive value is applied through {@code setBound} and resumes the daemon; zero or a
+   * negative value suspends the daemon. The requested value is recorded in both cases.
+   */
+  public void setLoad(int load){
+    if (load <= 0) {
+      daemon.suspend();
+    } else {
+      this.setBound(load);
+      daemon.resume();
     }
+    this.currentLoad = load;
+  }
- public void setLoad(int load){
- if (load >0) {
- this.setBound(load);
- // this.getDaemon().setLoad(load);
- daemon.resume();
- }
- else{
- daemon.suspend();
- }
- currentLoad = load ;
+  /**
+   * Starts the VM, then its daemon, and finally resets the load to 0
+   * (which suspends the daemon, see {@link #setLoad(int)}).
+   */
+  public void start(){
+    super.start();
+    try {
+      this.daemon.start();
+    } catch (HostNotFoundException hostMissing) {
+      // The daemon could not be started; report it and still reset the load below.
+      hostMissing.printStackTrace();
     }
+    setLoad(0);
+  }
- public void start(){
- super.start();
- try {
- daemon.start();
- } catch (HostNotFoundException e) {
- e.printStackTrace();
- }
- this.setLoad(0);
+  /** Returns the daemon created for this VM by the constructor. */
+  public Daemon getDaemon(){
+    return daemon;
+  }
- }
- public Daemon getDaemon(){
- return this.daemon;
- }
- public int getLoad(){
- System.out.println("Remaining comp:" + this.daemon.getRemaining());
- return this.currentLoad;
- }
+  /**
+   * Returns the load last recorded by {@link #setLoad(int)} (0 right after construction).
+   * As a side effect, prints the daemon's remaining computation on standard output.
+   */
+  public int getLoad(){
+    System.out.println("Remaining comp:" + daemon.getRemaining());
+    return currentLoad;
+  }
- public void migrate(Host host) throws HostFailureException {
- Msg.info("Start migration of VM " + this.getName() + " to " + host.getName());
- Msg.info(" currentLoad:" + this.currentLoad + "/ramSize:" + this.ramsize + "/dpIntensity:" + this.dpIntensity
- + "/remaining:" + String.format(java.util.Locale.US, "%.2E",this.daemon.getRemaining()));
- try{
- super.migrate(host);
- } catch (Exception e){
- Msg.info("Something wrong during the live migration of VM "+this.getName());
- throw new HostFailureException();
- }
- this.setLoad(this.currentLoad); //Fixed the fact that setBound is not propagated to the new node.
- Msg.info("End of migration of VM " + this.getName() + " to node " + host.getName());
+ public void migrate(Host host) throws HostFailureException {
+ Msg.info("Start migration of VM " + this.getName() + " to " + host.getName());
+ Msg.info(" currentLoad:" + this.currentLoad + "/ramSize:" + this.ramsize + "/dpIntensity:" + this.dpIntensity
+ + "/remaining:" + String.format(java.util.Locale.US, "%.2E",this.daemon.getRemaining()));
+ try{
+ super.migrate(host);
+ } catch (Exception e){
+ Msg.info("Something wrong during the live migration of VM "+this.getName());
+ throw new HostFailureException();
}
+ this.setLoad(this.currentLoad); //Fixed the fact that setBound is not propagated to the new node.
+ Msg.info("End of migration of VM " + this.getName() + " to node " + host.getName());
+ }
}
<?xml version='1.0'?>
<!DOCTYPE platform SYSTEM "http://simgrid.gforge.inria.fr/simgrid/simgrid.dtd">
<platform version="4">
- <process host="host0" function="cloud/migration/Test">
- </process>
+ <process host="host0" function="cloud/migration/Test"/>
</platform>
$ java -classpath ${classpath:=.} cloud/migration/Main ${srcdir:=.}/../platforms/two_hosts_platform.xml ${srcdir:=.}/cloud/migration/deploy_simple.xml
> [0.000000] [jmsg/INFO] Using regular java threads.
> [host0:cloud/migration/Test:(1) 0.000000] [jmsg/INFO] This example evaluates the migration time of a VM in presence of collocated VMs on the source and the dest nodes
-> [host0:cloud/migration/Test:(1) 0.000000] [jmsg/INFO] The migrated VM has a memory intensity rate of 70% of the network BW and a cpu load of 90% "(see cloudcom 2013 paper "Adding a Live Migration Model Into SimGrid" for further information)
+> [host0:cloud/migration/Test:(1) 0.000000] [jmsg/INFO] The migrated VM has a memory intensity rate of 70% of the network BW and a cpu load of 90% " (see cloudcom 2013 paper "Adding a Live Migration Model Into SimGrid" for further information)
> [host0:cloud/migration/Test:(1) 0.000000] [jmsg/INFO] Load of collocated VMs fluctuate between 0 and 90% in order to create a starvation issue and see whether it impacts or not the migration time
> [0.000000] [surf_vm/INFO] Create VM(vm0)@PM(host0) with 0 mounted disks
> [0.000000] [surf_vm/INFO] Create VM(vm1)@PM(host0) with 0 mounted disks
> [host0:cloud/migration/Test:(1) 67.551019] [jmsg/INFO] End of migration of VM vm0 to node host0
> [host0:cloud/migration/Test:(1) 67.551019] [jmsg/INFO] - End of Migration from host 1 to host 0 (duration:32.46684874546391)
> [host0:cloud/migration/Test:(1) 67.551019] [jmsg/INFO]
->
->
-> Round trip of VM1 (load 80%)
+> [host0:cloud/migration/Test:(1) 67.551019] [jmsg/INFO]
+> [host0:cloud/migration/Test:(1) 67.551019] [jmsg/INFO] Round trip of VM1 (load 80%)
> [host0:cloud/migration/Test:(1) 67.551019] [jmsg/INFO] - Launch migration from host 0 to host 1
> [host0:cloud/migration/Test:(1) 67.551019] [jmsg/INFO] Start migration of VM vm0 to host1
> [host0:cloud/migration/Test:(1) 67.551019] [jmsg/INFO] currentLoad:80/ramSize:2048/dpIntensity:70/remaining:4.64E+11
${examples_src}
${sources}
PARENT_SCOPE)
-set(bin_files
- ${bin_files}
- PARENT_SCOPE)
-set(txt_files
- ${txt_files}
- ${CMAKE_CURRENT_SOURCE_DIR}/README
- PARENT_SCOPE)
import org.simgrid.msg.NativeException;
public class CommTimeTest {
-
- /* This only contains the launcher. If you do nothing more than than you can run
- * java simgrid.msg.Msg
- * which also contains such a launcher
- */
-
- public static void main(String[] args) throws NativeException {
-
- /* initialize the MSG simulation. Must be done before anything else (even logging). */
- Msg.init(args);
+ public static void main(String[] args) throws NativeException {
- if(args.length < 2) {
- Msg.info("Usage : CommTime platform_file deployment_file");
- Msg.info("example : CommTime comm_time_platform.xml comm_time_deployment.xml");
- System.exit(1);
- }
-
- /* construct the platform and deploy the application */
- Msg.createEnvironment(args[0]);
- Msg.deployApplication(args[1]);
-
- /* execute the simulation. */
- Msg.run();
+ Msg.init(args);
+
+    if(args.length < 2) {
+      // Both arguments are mandatory. The help text names the launcher class, CommTimeTest.
+      Msg.info("Usage : CommTimeTest platform_file deployment_file");
+      Msg.info("example : CommTimeTest ../platforms/platform.xml commTimeDeployment.xml");
+      System.exit(1);
     }
+
+ /* construct the platform and deploy the application */
+ Msg.createEnvironment(args[0]);
+ Msg.deployApplication(args[1]);
+
+ /* execute the simulation. */
+ Msg.run();
+ }
}
package commTime;
import org.simgrid.msg.*;
-public class FinalizeTask extends Task {
- public FinalizeTask() {
- super("",0,0);
- }
+public class FinalizeTask extends Task {
+ public FinalizeTask() {
+ super("",0,0);
+ }
}
-
\ No newline at end of file
-/* Master of a basic master/slave example in Java */
-
/* Copyright (c) 2006-2014. The SimGrid Team.
* All rights reserved. */
import org.simgrid.msg.Task;
public class Master extends Process {
- public Master(Host host, String name, String[]args) {
- super(host,name,args);
- }
- public void main(String[] args) throws MsgException {
- if (args.length < 4) {
- Msg.info("Master needs 4 arguments");
- System.exit(1);
- }
-
- int tasksCount = Integer.valueOf(args[0]).intValue();
- double taskComputeSize = Double.valueOf(args[1]).doubleValue();
- double taskCommunicateSize = Double.valueOf(args[2]).doubleValue();
-
- int slavesCount = Integer.valueOf(args[3]).intValue();
-
- Msg.info("Hello! Got "+ slavesCount + " slaves and "+tasksCount+" tasks to process");
-
- for (int i = 0; i < tasksCount; i++) {
- Task task = new Task("Task_" + i, taskComputeSize, taskCommunicateSize);
- if (i%1000==0)
- Msg.info("Sending \"" + task.getName()+ "\" to \"slave_" + i % slavesCount + "\"");
- task.send("slave_"+(i%slavesCount));
- }
-
- Msg.info("All tasks have been dispatched. Let's tell everybody the computation is over.");
-
- for (int i = 0; i < slavesCount; i++) {
- FinalizeTask task = new FinalizeTask();
- task.send("slave_"+(i%slavesCount));
- }
-
- Msg.info("Goodbye now!");
+ public Master(Host host, String name, String[]args) {
+ super(host,name,args);
+ }
+
+  /**
+   * Dispatches the tasks to the slaves, then tells every slave to stop.
+   *
+   * @param args args[0] number of tasks, args[1] compute size of each task, args[2] communication
+   *             size of each task, args[3] number of slaves; exits with status 1 if fewer are given
+   */
+  public void main(String[] args) throws MsgException {
+    if (args.length < 4) {
+      Msg.info("Master needs 4 arguments");
+      System.exit(1);
+    }
+
+    final int tasksCount = Integer.parseInt(args[0]);
+    final double taskComputeSize = Double.parseDouble(args[1]);
+    final double taskCommunicateSize = Double.parseDouble(args[2]);
+    final int slavesCount = Integer.parseInt(args[3]);
+
+    Msg.info("Hello! Got "+ slavesCount + " slaves and "+tasksCount+" tasks to process");
+
+    // Tasks are handed out round-robin over the mailboxes slave_0 .. slave_(slavesCount-1).
+    for (int taskId = 0; taskId < tasksCount; taskId++) {
+      Task task = new Task("Task_" + taskId, taskComputeSize, taskCommunicateSize);
+      final String mailbox = "slave_" + (taskId % slavesCount);
+      // Only every 1000th task is logged.
+      if (taskId % 1000 == 0) {
+        Msg.info("Sending \"" + task.getName() + "\" to \"" + mailbox + "\"");
+      }
+      task.send(mailbox);
+    }
+
+    Msg.info("All tasks have been dispatched. Let's tell everybody the computation is over.");
+
+    // One FinalizeTask per slave; slaveId is always below slavesCount, so no modulo is needed.
+    for (int slaveId = 0; slaveId < slavesCount; slaveId++) {
+      new FinalizeTask().send("slave_" + slaveId);
     }
+    Msg.info("Goodbye now!");
+  }
}
+++ /dev/null
-This directory is almost exactly the same example than the
-master/slave, the only differences are:
- * there is no forwarder here
- * the outputs are a bit less verbose
- * the example give a lot more work to do (this is used for benchmarking)
\ No newline at end of file
* under the terms of the license (GNU LGPL) which comes with this package. */
package commTime;
-
import org.simgrid.msg.*;
public class Slave extends org.simgrid.msg.Process {
- public Slave(Host host, String name, String[]args) {
- super(host,name,args);
- }
- public void main(String[] args) throws MsgException {
- if (args.length < 1) {
- Msg.info("Slave needs 1 argument (its number)");
- System.exit(1);
- }
+ public Slave(Host host, String name, String[]args) {
+ super(host,name,args);
+ }
+ public void main(String[] args) throws MsgException {
+ if (args.length < 1) {
+ Msg.info("Slave needs 1 argument (its number)");
+ System.exit(1);
+ }
- int num = Integer.valueOf(args[0]).intValue();
- Msg.info("Receiving on 'slave_"+num+"'");
-
- while(true) {
- Task task = Task.receive("slave_"+num);
-
- if (task instanceof FinalizeTask) {
- break;
- }
- task.execute();
- }
-
- Msg.info("Received Finalize. I'm done. See you!");
+ int num = Integer.valueOf(args[0]).intValue();
+ Msg.info("Receiving on 'slave_"+num+"'");
+
+ while(true) {
+ Task task = Task.receive("slave_"+num);
+ if (task instanceof FinalizeTask) {
+ break;
+ }
+ task.execute();
}
+ Msg.info("Received Finalize. I'm done. See you!");
+ }
}
\ No newline at end of file
<argument value="10"/> <!-- Communication size of each one -->
<argument value="21"/> <!-- Amount of commTime.Slaves waiting for orders -->
</process>
-
- <process host="iRMX" function="commTime.Slave">
- <argument value="0"/> <!-- Input mailbox -->
- </process>
-
- <process host="Bousquet" function="commTime.Slave">
- <argument value="1"/></process>
- <process host="Soucy" function="commTime.Slave">
- <argument value="2"/></process>
- <process host="Casavant" function="commTime.Slave">
- <argument value="3"/></process>
- <process host="Jackson" function="commTime.Slave">
- <argument value="4"/></process>
- <process host="Geoff" function="commTime.Slave">
- <argument value="5"/></process>
- <process host="Disney" function="commTime.Slave">
- <argument value="6"/></process>
- <process host="McGee" function="commTime.Slave">
- <argument value="7"/></process>
- <process host="Gatien" function="commTime.Slave">
- <argument value="8"/></process>
- <process host="Laroche" function="commTime.Slave">
- <argument value="9"/></process>
- <process host="Tanguay" function="commTime.Slave">
- <argument value="10"/></process>
- <process host="Morin" function="commTime.Slave">
- <argument value="11"/></process>
- <process host="Ethernet" function="commTime.Slave">
- <argument value="12"/></process>
- <process host="Bellemarre" function="commTime.Slave">
- <argument value="13"/></process>
- <process host="Harry" function="commTime.Slave">
- <argument value="14"/></process>
- <process host="Olivier" function="commTime.Slave">
- <argument value="15"/></process>
- <process host="Boucherville" function="commTime.Slave">
- <argument value="16"/></process>
- <process host="Pointe_Claire" function="commTime.Slave">
- <argument value="17"/></process>
- <process host="Kansas" function="commTime.Slave">
- <argument value="18"/></process>
- <process host="King" function="commTime.Slave">
- <argument value="19"/></process>
- <process host="Lapointe" function="commTime.Slave">
- <argument value="20"/></process>
+
+ <process host="iRMX" function="commTime.Slave"> <argument value="0"/> <!-- Input mailbox --></process>
+ <process host="Bousquet" function="commTime.Slave"> <argument value="1"/></process>
+ <process host="Soucy" function="commTime.Slave"> <argument value="2"/></process>
+ <process host="Casavant" function="commTime.Slave"> <argument value="3"/></process>
+ <process host="Jackson" function="commTime.Slave"> <argument value="4"/></process>
+ <process host="Geoff" function="commTime.Slave"> <argument value="5"/></process>
+ <process host="Disney" function="commTime.Slave"> <argument value="6"/></process>
+ <process host="McGee" function="commTime.Slave"> <argument value="7"/></process>
+ <process host="Gatien" function="commTime.Slave"> <argument value="8"/></process>
+ <process host="Laroche" function="commTime.Slave"> <argument value="9"/></process>
+ <process host="Tanguay" function="commTime.Slave"> <argument value="10"/></process>
+ <process host="Morin" function="commTime.Slave"> <argument value="11"/></process>
+ <process host="Ethernet" function="commTime.Slave"> <argument value="12"/></process>
+ <process host="Bellemarre" function="commTime.Slave"> <argument value="13"/></process>
+ <process host="Harry" function="commTime.Slave"> <argument value="14"/></process>
+ <process host="Olivier" function="commTime.Slave"> <argument value="15"/></process>
+ <process host="Boucherville" function="commTime.Slave"> <argument value="16"/></process>
+ <process host="Pointe_Claire" function="commTime.Slave"> <argument value="17"/></process>
+ <process host="Kansas" function="commTime.Slave"> <argument value="18"/></process>
+ <process host="King" function="commTime.Slave"> <argument value="19"/></process>
+ <process host="Lapointe" function="commTime.Slave"> <argument value="20"/></process>
</platform>
${examples_src}
${sources}
PARENT_SCOPE)
-set(bin_files
- ${bin_files}
- PARENT_SCOPE)
-set(txt_files
- ${txt_files}
- PARENT_SCOPE)
package energy;
-import org.simgrid.msg.Host;
import org.simgrid.msg.Msg;
+import org.simgrid.msg.Host;
import org.simgrid.msg.MsgException;
-/**
- * Example showing the use of the new experimental Cloud API.
- */
+
public class Energy {
- public static final double task_comp_size = 10;
- public static final double task_comm_size = 10;
- public static final int hostNB = 2 ;
- public static void main(String[] args) throws MsgException {
- Msg.energyInit();
- Msg.init(args);
-
- if (args.length < 1) {
- Msg.info("Usage : Cloud platform_file");
- Msg.info("Usage : Cloud platform.xml");
- System.exit(1);
- }
- /* Construct the platform */
- Msg.createEnvironment(args[0]);
- Host[] hosts = Host.all();
- if (hosts.length < 1) {
- Msg.info("I need at least one host in the platform file, but " + args[0] + " contains only " + hosts.length + " hosts");
- System.exit(42);
- }
- /* Instanciate a process */
- new EnergyConsumer(hosts[0],"energyConsumer",null).start();
- /* Execute the simulation */
- Msg.run();
-
+ public static final double task_comp_size = 10;
+ public static final double task_comm_size = 10;
+ public static final int hostNB = 2 ;
+
+  /**
+   * Launcher of the energy example: initialises the energy support and MSG, loads the platform
+   * and starts one EnergyConsumer on the first host before running the simulation.
+   *
+   * @param args args[0] is the platform file; exits with status 1 when it is missing and with
+   *             status 42 when the platform contains no host
+   */
+  public static void main(String[] args) throws MsgException {
+    Msg.energyInit();
+    Msg.init(args);
+
+    if (args.length < 1) {
+      Msg.info("Usage : Energy platform_file");
+      // The second line is an example invocation, so it is labelled as one.
+      Msg.info("example : Energy ../platforms/energy_platform.xml");
+      System.exit(1);
+    }
+    /* Construct the platform */
+    Msg.createEnvironment(args[0]);
+    Host[] hosts = Host.all();
+    if (hosts.length < 1) {
+      Msg.info("I need at least one host in the platform file, but " + args[0] + " has no host at all");
+      System.exit(42);
     }
+    /* Instantiate the only process of this example, on the first host */
+    new EnergyConsumer(hosts[0],"energyConsumer",null).start();
+    /* Execute the simulation */
+    Msg.run();
+  }
}
package energy;
+import org.simgrid.msg.Msg;
import org.simgrid.msg.Comm;
import org.simgrid.msg.Host;
-import org.simgrid.msg.Msg;
-import org.simgrid.msg.MsgException;
-import org.simgrid.msg.Process;
import org.simgrid.msg.Task;
+import org.simgrid.msg.Process;
+import org.simgrid.msg.MsgException;
import org.simgrid.msg.TimeoutException;
public class EnergyConsumer extends Process {
- public EnergyConsumer(Host host, String name, String[] args) {
- super(host,name,args);
- }
- @Override
- public void main(String[] args) throws MsgException {
- Msg.info("Currently consumed energy: "+getHost().getConsumedEnergy());
- this.waitFor(10);
- Msg.info("Currently consumed energy after sleeping 10 sec: "+getHost().getConsumedEnergy());
- new Task(null, 1E9, 0).execute();
- Msg.info("Currently consumed energy after executing 1E9 flops: "+getHost().getConsumedEnergy());
- }
+ public EnergyConsumer(Host host, String name, String[] args) {
+ super(host,name,args);
+ }
+
+ @Override
+ public void main(String[] args) throws MsgException {
+ Msg.info("Currently consumed energy: "+getHost().getConsumedEnergy());
+ this.waitFor(10);
+ Msg.info("Currently consumed energy after sleeping 10 sec: "+getHost().getConsumedEnergy());
+ new Task(null, 1E9, 0).execute();
+ Msg.info("Currently consumed energy after executing 1E9 flops: "+getHost().getConsumedEnergy());
+ }
}
set(tesh_files
${tesh_files}
+ ${CMAKE_CURRENT_SOURCE_DIR}/storage.tesh
PARENT_SCOPE)
set(xml_files
${xml_files}
${examples_src}
${sources}
PARENT_SCOPE)
-set(bin_files
- ${bin_files}
- PARENT_SCOPE)
-set(txt_files
- ${txt_files}
- PARENT_SCOPE)
package io;
-import org.simgrid.msg.Host;
import org.simgrid.msg.Msg;
-import org.simgrid.msg.MsgException;
+import org.simgrid.msg.Host;
import org.simgrid.msg.Process;
-import org.simgrid.msg.HostNotFoundException;
import org.simgrid.msg.Storage;
+import org.simgrid.msg.HostNotFoundException;
+import org.simgrid.msg.MsgException;
public class Client extends Process {
-
public Client(Host host, int number) throws HostNotFoundException {
super(host, Integer.toString(number), null);
}
-
+
public void main(String[] args) throws MsgException {
-
- // Retrieve all mount points of current host
+ // Retrieve all mount points of current host
Storage[] storages = getHost().getMountedStorage();
-
- for (int i = 0; i < storages.length; i++) {
- // For each disk mounted on host
- Msg.info("------------------------------------");
- Msg.info("Disk name: "+storages[i].getName());
- Msg.info("Size: "+storages[i].getSize()+" bytes.");
- Msg.info("Free Size: "+storages[i].getFreeSize()+" bytes.");
- Msg.info("Used Size: "+storages[i].getUsedSize()+" bytes.");
-
- }
-
- Storage st = Storage.getByName("Disk2");
- Msg.info("Disk name: "+st.getName());
- Msg.info("Attached to host:"+st.getHost());
-
-
- st.setProperty("key","Pierre");
- Msg.info("Property key: "+st.getProperty("key"));
-
- Host h = Host.currentHost();
- h.setProperty("key2","Pierre");
- Msg.info("Property key2: "+h.getProperty("key"));
-
-
- String[] attach = h.getAttachedStorage();
- for (int j = 0; j < attach.length; j++) {
- Msg.info("Disk attached: "+attach[j]);
- }
-
- Msg.info("**************** ALL *************************");
-
- Storage[] stos = Storage.all();
- for (int i = 0; i < stos.length; i++) {
- Msg.info("Disk: "+ stos[i].getName());
- }
-
-
+
+ for (int i = 0; i < storages.length; i++) {
+ // For each disk mounted on host
+ Msg.info("------------------------------------");
+ Msg.info("Disk name: "+storages[i].getName());
+ Msg.info("Size: "+storages[i].getSize()+" bytes.");
+ Msg.info("Free Size: "+storages[i].getFreeSize()+" bytes.");
+ Msg.info("Used Size: "+storages[i].getUsedSize()+" bytes.");
+ }
+
+ Storage st = Storage.getByName("Disk2");
+ Msg.info("Disk name: "+st.getName());
+ Msg.info("Attached to host:"+st.getHost());
+
+ st.setProperty("key","Pierre");
+ Msg.info("Property key: "+st.getProperty("key"));
+
+ Host h = Host.currentHost();
+ h.setProperty("key2","Pierre");
+ Msg.info("Property key2: "+h.getProperty("key2"));
+
+ String[] attach = h.getAttachedStorage();
+ for (int j = 0; j < attach.length; j++) {
+ Msg.info("Disk attached: "+attach[j]);
+ }
+
+ Msg.info("**************** ALL *************************");
+ Storage[] stos = Storage.all();
+ for (int i = 0; i < stos.length; i++) {
+ Msg.info("Disk: "+ stos[i].getName());
+ }
}
}
\ No newline at end of file
package io;
-import org.simgrid.msg.Host;
import org.simgrid.msg.Msg;
+import org.simgrid.msg.Host;
import org.simgrid.msg.MsgException;
-/**
- * This example demonstrates of how to use the other
- * kind of resources, such as disk or GPU. These resources are quite
- * experimental for now, but here we go anyway.
- */
+
public class IO {
- public static void main(String[] args) throws MsgException {
- Msg.init(args);
- if(args.length < 1) {
- Msg.info("Usage : IO platform_file ");
- Msg.info("example : IO platform.xml ");
- System.exit(1);
- }
- Msg.createEnvironment(args[0]);
-
- Host[] hosts = Host.all();
-
- Msg.info("Number of hosts:" + hosts.length);
- for (int i = 0; i < hosts.length && i < 4; i++) {
- new io.Node(hosts[i],i).start();
- }
-
- Msg.run();
+  /**
+   * Launcher of the I/O example: loads the platform given as args[0], starts one io.Node on each
+   * of the first four hosts (fewer if the platform has fewer) and runs the simulation.
+   * Exits with status 1 when no platform file is given.
+   */
+  public static void main(String[] args) throws MsgException {
+    Msg.init(args);
+    if (args.length < 1) {
+      Msg.info("Usage : IO platform_file ");
+      Msg.info("example : IO ../platforms/storage/storage.xml ");
+      System.exit(1);
+    }
+
+    Msg.createEnvironment(args[0]);
+
+    Host[] hosts = Host.all();
+    Msg.info("Number of hosts:" + hosts.length);
+
+    // At most four nodes are started, each numbered with the index of its host.
+    final int nodeCount = Math.min(hosts.length, 4);
+    for (int rank = 0; rank < nodeCount; rank++) {
+      new io.Node(hosts[rank], rank).start();
     }
+
+    Msg.run();
+  }
}
\ No newline at end of file
package io;
+import org.simgrid.msg.Msg;
import org.simgrid.msg.File;
import org.simgrid.msg.Host;
+import org.simgrid.msg.Process;
import org.simgrid.msg.HostNotFoundException;
-import org.simgrid.msg.Msg;
import org.simgrid.msg.MsgException;
-import org.simgrid.msg.Process;
+/**
+ * Simulated process that opens one file, selected by its number, and logs the
+ * values returned by a read, a write and a second read on that file.
+ */
public class Node extends Process {
- private static String FILENAME1 = "/doc/simgrid/examples/platforms/g5k.xml";
- private static String FILENAME2 = "\\Windows\\setupact.log";
- private static String FILENAME3 = "/doc/simgrid/examples/platforms/g5k_cabinets.xml";
- private static String FILENAME4 = "/doc/simgrid/examples/platforms/nancy.xml";
-
- protected int number;
-
- public Node(Host host, int number) throws HostNotFoundException {
- super(host, Integer.toString(number), null);
- this.number = number;
- }
- public void main(String[] args) throws MsgException {
- String mount = "";
- String filename;
- switch (number) {
- case 0:
- mount = "/home";
- filename = mount + FILENAME1;
- break;
- case 1:
- mount = "c:";
- filename = mount + FILENAME2;
- break;
- case 2:
- mount = "/home";
- filename = mount + FILENAME3;
- break;
- case 3:
- mount = "/home";
- filename = mount + FILENAME4;
- break;
- default:
- mount = "/home";
- filename = mount + FILENAME1;
- }
- Msg.info("Open file " + filename);
- File file = new File(filename);
+ /* File paths, relative to the mount point chosen in main(). They are never reassigned. */
+ private static final String FILENAME1 = "/doc/simgrid/examples/platforms/g5k.xml";
+ private static final String FILENAME2 = "\\Windows\\setupact.log";
+ private static final String FILENAME3 = "/doc/simgrid/examples/platforms/g5k_cabinets.xml";
+ private static final String FILENAME4 = "/doc/simgrid/examples/platforms/nancy.xml";
+
+ /** Index of this node; it selects which file main() opens. */
+ protected int number;
+
+ /**
+ * Creates the process on the given host, named after the decimal form of number.
+ */
+ public Node(Host host, int number) throws HostNotFoundException {
+ super(host, Integer.toString(number), null);
+ this.number = number;
+ }
+
+ /**
+ * Builds the file name from the node number (any number other than 1, 2 or 3
+ * uses the same file as node 0), opens it, then logs the value returned by
+ * a read, a write and a second read.
+ */
+ public void main(String[] args) throws MsgException {
+ String mount = "";
+ String filename;
+ switch (number) {
+ case 0:
+ mount = "/home";
+ filename = mount + FILENAME1;
+ break;
+ case 1:
+ mount = "c:";
+ filename = mount + FILENAME2;
+ break;
+ case 2:
+ mount = "/home";
+ filename = mount + FILENAME3;
+ break;
+ case 3:
+ mount = "/home";
+ filename = mount + FILENAME4;
+ break;
+ default:
+ mount = "/home";
+ filename = mount + FILENAME1;
+ }
+
+ Msg.info("Open file " + filename);
+ File file = new File(filename);
+
+ long read = file.read(10000000,1);
+ Msg.info("Having read " + read + " on " + filename);
- long read = file.read(10000000,1);
- Msg.info("Having read " + read + " on " + filename);
-
- long write = file.read(100000,1);
- Msg.info("Having write " + write + " on " + filename);
+ /* This step must write: the previous code called read() here although the log reports a write. */
+ long write = file.write(100000,1);
+ Msg.info("Having write " + write + " on " + filename);
- read = file.read(10000000,1);
- Msg.info("Having read " + read + " on " + filename);
- }
+ read = file.read(10000000,1);
+ Msg.info("Having read " + read + " on " + filename);
+ }
}
/* This program is free software; you can redistribute it and/or modify it
* under the terms of the license (GNU LGPL) which comes with this package. */
-
-
package io;
-
-import org.simgrid.msg.Host;
import org.simgrid.msg.Msg;
+import org.simgrid.msg.Host;
import org.simgrid.msg.MsgException;
+public class Storage {
+ /**
+ * Launcher: loads the platform given as first argument, starts one io.Client
+ * process on the first host of the platform, then runs the simulation.
+ */
+ public static void main(String[] args) throws MsgException {
+ Msg.init(args);
+ if(args.length < 1) {
+ Msg.info("Usage : Storage platform_file ");
+ Msg.info("example : Storage ../platforms/storage/storage.xml ");
+ System.exit(1);
+ }
+ Msg.createEnvironment(args[0]);
-public class Storage {
- public static void main(String[] args) throws MsgException {
- Msg.init(args);
- if(args.length < 1) {
- Msg.info("Usage : storage platform_file ");
- Msg.info("example : storage platform.xml ");
- System.exit(1);
- }
- Msg.createEnvironment(args[0]);
-
- Host[] hosts = Host.all();
- new io.Client(hosts[0],0).start();
+ Host[] hosts = Host.all();
+ /* The client runs on hosts[0]: stop with a message instead of an index error when there is no host. */
+ if (hosts.length == 0) {
+ Msg.info("No host found in platform file " + args[0]);
+ System.exit(1);
+ }
+ new io.Client(hosts[0],0).start();
- Msg.run();
+ Msg.run();
}
}
<?xml version='1.0'?>
<!DOCTYPE platform SYSTEM "http://simgrid.gforge.inria.fr/simgrid/simgrid.dtd">
<platform version="4">
-
<process host="Gatien" function="node">
<argument value="48"/> <!-- my id -->
<argument value="1"/> <!-- known id -->
<argument value="400"/> <!-- time to sleep before it starts-->
- <argument value ="600"/> <!-- deadline -->
+ <argument value ="600"/> <!-- deadline -->
</process>
-
<process host="McGee" function="node">
<argument value="42"/> <!-- my id -->
<argument value="1"/> <!-- known id -->
<argument value="300"/> <!-- time to sleep before it starts-->
- <argument value ="600"/> <!-- deadline -->
+ <argument value ="600"/> <!-- deadline -->
</process>
-
<process host="iRMX" function="node">
<argument value="38"/> <!-- my id -->
<argument value="1"/> <!-- known id -->
<argument value="200"/> <!-- time to sleep before it starts-->
- <argument value ="600"/> <!-- deadline -->
+ <argument value ="600"/> <!-- deadline -->
</process>
-
<process host="Geoff" function="node">
<argument value="32"/> <!-- my id -->
<argument value="1"/> <!-- known id -->
<argument value="100"/> <!-- time to sleep before it starts-->
- <argument value ="600"/> <!-- deadline -->
+ <argument value ="600"/> <!-- deadline -->
</process>
-
<process host="TeX" function="node">
<argument value="21"/> <!-- my id -->
<argument value="1"/> <!-- known id -->
<argument value="40"/> <!-- time to sleep before it starts-->
- <argument value ="600"/> <!-- deadline -->
+ <argument value ="600"/> <!-- deadline -->
</process>
-
<process host="Jean_Yves" function="node">
<argument value="14"/> <!-- my id -->
<argument value="1"/> <!-- known id -->
<argument value="16"/> <!-- time to sleep before it starts-->
- <argument value ="600"/> <!-- deadline -->
+ <argument value ="600"/> <!-- deadline -->
</process>
-
<process host="Boivin" function="node">
<argument value="8"/> <!-- my id -->
<argument value="1"/> <!-- known id -->
<argument value="1"/> <!-- time to sleep before it starts-->
- <argument value ="600"/> <!-- deadline -->
+ <argument value ="600"/> <!-- deadline -->
</process>
-
<process host="Jacquelin" function="node">
<argument value="1"/> <!-- my id -->
- <argument value ="600"/> <!-- deadline -->
+ <argument value ="600"/> <!-- deadline -->
</process>
-
</platform>