From c0318ba7319bb30cd54a3e2fb0cfe55a0a155629 Mon Sep 17 00:00:00 2001 From: =?utf8?q?S=C3=A9bastien=20Miqu=C3=A9e?= Date: Mon, 8 Feb 2010 11:47:55 +0100 Subject: [PATCH 1/1] Creating the JaceP2P repository. --- Makefile | 32 + Manifest | 3 + src/jaceP2P/.#Task.java | 1 + src/jaceP2P/Backup.java | 73 + src/jaceP2P/BackupConvg.java | 50 + src/jaceP2P/BackupsManager.java | 440 +++++ src/jaceP2P/ForwardCount.java | 8 + src/jaceP2P/HandleClient.java | 70 + src/jaceP2P/HeartBeatSNode.java | 65 + src/jaceP2P/HeartBeatSpawner.java | 72 + src/jaceP2P/HeartBeatThread.java | 127 ++ src/jaceP2P/JaceBuffer.java | 149 ++ src/jaceP2P/JaceDaemon.java | 353 ++++ src/jaceP2P/JaceInterface.java | 73 + src/jaceP2P/JaceP2P.java | 118 ++ src/jaceP2P/JaceServer.java | 298 +++ src/jaceP2P/JaceSession.java | 87 + src/jaceP2P/JaceSpawner.java | 1509 +++++++++++++++ src/jaceP2P/JaceSpawnerInterface.java | 41 + src/jaceP2P/JaceSpawnerServer.java | 147 ++ src/jaceP2P/JaceSuperNode.java | 164 ++ src/jaceP2P/JaceSuperNodeInterface.java | 57 + src/jaceP2P/JaceSuperNodeServer.java | 635 +++++++ src/jaceP2P/LastSave.java | 19 + src/jaceP2P/ListeTask.java | 114 ++ src/jaceP2P/Loader.java | 55 + src/jaceP2P/LocalHost.java | 105 ++ src/jaceP2P/Message.java | 83 + src/jaceP2P/MsgChrono.java | 23 + src/jaceP2P/MsgQueue.java | 205 +++ src/jaceP2P/Node.java | 108 ++ src/jaceP2P/Register.java | 296 +++ src/jaceP2P/RunningApplication.java | 105 ++ src/jaceP2P/ScanThread.java | 155 ++ src/jaceP2P/ScanThreadSpawner.java | 110 ++ src/jaceP2P/ScanThreadSuperNode.java | 139 ++ src/jaceP2P/SendVerdictThread.java | 34 + src/jaceP2P/SendVerifThread.java | 32 + src/jaceP2P/Sender.java | 111 ++ src/jaceP2P/SenderRmi.java | 77 + src/jaceP2P/SenderSocket.java | 110 ++ src/jaceP2P/SuperNodeData.java | 50 + src/jaceP2P/SuperNodeListe.java | 418 +++++ src/jaceP2P/Task.java | 1630 +++++++++++++++++ src/jaceP2P/TaskId.java | 68 + src/jaceP2P/TaskLauncher.java | 166 ++ src/jaceP2P/TokenThread.java | 168 ++ .../UpdateRegisterConcernedThread.java | 201 ++ src/jaceP2P/UpdateRegisterThread.java | 192 ++ 49 files changed, 9346 insertions(+) create mode 100644 Makefile create mode 100644 Manifest create mode 120000 src/jaceP2P/.#Task.java create mode 100644 src/jaceP2P/Backup.java create mode 100644 src/jaceP2P/BackupConvg.java create mode 100644 src/jaceP2P/BackupsManager.java create mode 100644 src/jaceP2P/ForwardCount.java create mode 100644 src/jaceP2P/HandleClient.java create mode 100644 src/jaceP2P/HeartBeatSNode.java create mode 100644 src/jaceP2P/HeartBeatSpawner.java create mode 100644 src/jaceP2P/HeartBeatThread.java create mode 100644 src/jaceP2P/JaceBuffer.java create mode 100644 src/jaceP2P/JaceDaemon.java create mode 100644 src/jaceP2P/JaceInterface.java create mode 100644 src/jaceP2P/JaceP2P.java create mode 100644 src/jaceP2P/JaceServer.java create mode 100644 src/jaceP2P/JaceSession.java create mode 100644 src/jaceP2P/JaceSpawner.java create mode 100644 src/jaceP2P/JaceSpawnerInterface.java create mode 100644 src/jaceP2P/JaceSpawnerServer.java create mode 100644 src/jaceP2P/JaceSuperNode.java create mode 100644 src/jaceP2P/JaceSuperNodeInterface.java create mode 100644 src/jaceP2P/JaceSuperNodeServer.java create mode 100644 src/jaceP2P/LastSave.java create mode 100644 src/jaceP2P/ListeTask.java create mode 100644 src/jaceP2P/Loader.java create mode 100644 src/jaceP2P/LocalHost.java create mode 100644 src/jaceP2P/Message.java create mode 100644 src/jaceP2P/MsgChrono.java create mode 100644 src/jaceP2P/MsgQueue.java create mode 100644 src/jaceP2P/Node.java create mode 100644 src/jaceP2P/Register.java create mode 100644 src/jaceP2P/RunningApplication.java create mode 100644 src/jaceP2P/ScanThread.java create mode 100644 src/jaceP2P/ScanThreadSpawner.java create mode 100644 src/jaceP2P/ScanThreadSuperNode.java create mode 100644 src/jaceP2P/SendVerdictThread.java create mode 100644 src/jaceP2P/SendVerifThread.java create mode 100644 src/jaceP2P/Sender.java create mode 100644 src/jaceP2P/SenderRmi.java create mode 100644 src/jaceP2P/SenderSocket.java create mode 100644 src/jaceP2P/SuperNodeData.java create mode 100644 src/jaceP2P/SuperNodeListe.java create mode 100644 src/jaceP2P/Task.java create mode 100644 src/jaceP2P/TaskId.java create mode 100644 src/jaceP2P/TaskLauncher.java create mode 100644 src/jaceP2P/TokenThread.java create mode 100644 src/jaceP2P/UpdateRegisterConcernedThread.java create mode 100644 src/jaceP2P/UpdateRegisterThread.java diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..b747d81 --- /dev/null +++ b/Makefile @@ -0,0 +1,32 @@ +# +# Makefile for JaceP2P plateform +# Author: Sébastien Miquée +# + +SRC=src +PACKAGE=jaceP2P +BIN=bin +JAR=JaceP2P.jar + +all: compile jar + +compile:clean + javac -d ./$(BIN) ./$(SRC)/$(PACKAGE)/*.java + rmic -d ./$(BIN) jaceP2P.JaceServer + rmic -d ./$(BIN) jaceP2P.JaceSuperNodeServer + +rmi:compile + rmic -d ./$(BIN) jaceP2P.JaceServer + rmic -d ./$(BIN) jaceP2P.JaceSuperNodeServer + +jar: + jar cvfm ./$(JAR) Manifest -C ./$(BIN) $(PACKAGE) + +clean: + rm -rf ./$(BIN)/* $(JAR) + + +# +## +# + diff --git a/Manifest b/Manifest new file mode 100644 index 0000000..6747fb0 --- /dev/null +++ b/Manifest @@ -0,0 +1,3 @@ +Manifest-Version: 1.0 +Main-Class: jaceP2P.JaceP2P +Class-path: ./JaceP2P.jar:. \ No newline at end of file diff --git a/src/jaceP2P/.#Task.java b/src/jaceP2P/.#Task.java new file mode 120000 index 0000000..f0abedf --- /dev/null +++ b/src/jaceP2P/.#Task.java @@ -0,0 +1 @@ +miquee@plop.16872:1253690040 \ No newline at end of file diff --git a/src/jaceP2P/Backup.java b/src/jaceP2P/Backup.java new file mode 100644 index 0000000..8e94e38 --- /dev/null +++ b/src/jaceP2P/Backup.java @@ -0,0 +1,73 @@ +package jaceP2P; + +import java.util.Vector; + +public class Backup implements java.io.Serializable { + + private static final long serialVersionUID = 1L; + + // attributes + private int taskRank = -1; + private int iteration = -1; + private int timeStep = -1; + private byte[] data = null; + + // constructeurs + public Backup() { + } + + public Backup(int rank) { + taskRank = rank; + } + + public Backup(int rank, byte[] flux) { + taskRank = rank; + data = flux; + } + + public Backup(int rank, int ite, byte[] flux) { + taskRank = rank; + iteration = ite; + data = flux; + } + + // methods + public synchronized void setTaskRank(int rank) { + taskRank = rank; + } + + public synchronized void setIteration(int ite) { + iteration = ite; + } + + public synchronized void setData(byte[] d) { + data = d; + } + + public synchronized int getTaskRank() { + return taskRank; + } + + public synchronized Vector getIterationStep() { + Vector v = new Vector(); + v.add(iteration); + v.add(timeStep); + return v; + } + + public synchronized int getIteration() { + return iteration; + } + + public synchronized byte[] getData() { + return data; + } + + public synchronized void setStep(int timeStep) { + this.timeStep = timeStep; + } + + public synchronized int getStep() { + return timeStep; + } +} diff --git a/src/jaceP2P/BackupConvg.java b/src/jaceP2P/BackupConvg.java new file mode 100644 index 0000000..e32e584 --- /dev/null +++ b/src/jaceP2P/BackupConvg.java @@ -0,0 +1,50 @@ +package jaceP2P; + +import java.util.*; + +public class BackupConvg implements java.io.Serializable { + private static final long serialVersionUID = 1L; + + public String state; + public boolean underTh; + public int verifNum; + public Vector resp; + public boolean localCV_state; + public int nb_not_recv; + public int sendId; + public boolean electedNode; + public boolean respSent; + public String action; + public boolean verdict; + public boolean finalStep; + public Vector neighbors; + public Vector neighborsValues; + public int timeStep; + public LastSave lastSave; + public int jaceP2P_Iteration; + public boolean recievedVerdict; + public Vector reduceAll; + public boolean initialized = false; + + public BackupConvg() { + } + + @SuppressWarnings("unchecked") + public void affecter(Task sauv) { + sauv.underTh = underTh; + sauv.state = state; + sauv.nb_not_recv = nb_not_recv; + sauv.electedNode = electedNode; + sauv.respSent = respSent; + sauv.neighbors = (Vector) neighbors.clone(); + sauv.neighborsValues = (Vector) neighborsValues.clone(); + sauv.resp = (Vector) resp.clone(); + sauv.verifNum = verifNum; + sauv.sendId = sendId; + sauv.finalStep = finalStep; + sauv.action = action; + sauv.verdict = verdict; + sauv.localCV_state = localCV_state; + sauv.reduceAll = reduceAll; + } +} diff --git a/src/jaceP2P/BackupsManager.java b/src/jaceP2P/BackupsManager.java new file mode 100644 index 0000000..8b96e2f --- /dev/null +++ b/src/jaceP2P/BackupsManager.java @@ -0,0 +1,440 @@ +package jaceP2P; + +import java.util.Vector; + +public class BackupsManager { + final int MAX_COUNT_NOT_ALIVE = 3; + + // attributes + public static BackupsManager Instance; + public int myRank = -1; + public Vector liste = new Vector(); + public Vector liste_Convg = new Vector(); + + // constructors + private BackupsManager() { + } + + public static BackupsManager Instance() { + if (Instance == null) { + Instance = new BackupsManager(); + } + return Instance; + } + + // retourne le nb de Backups ds la liste) + public synchronized int size() { + return liste.size(); + } + + public synchronized void clean() { + for (int i = 0; i < liste.size(); i++) { + ((Backup) liste.get(i)).setIteration(-1); + ((Backup) liste.get(i)).setData(null); + ((Backup) liste_Convg.get(i)).setIteration(-1); + ((Backup) liste_Convg.get(i)).setData(null); + } + } + + public synchronized void purge() { + liste.clear(); + liste_Convg.clear(); + myRank = -1; + Instance = null; + } + + public synchronized int getMyRank() { + return myRank; + } + + public synchronized void initialize(int req) { + ListeTask tskList = Register.Instance().getListeOfTasks(); + + TaskId t = tskList.getTaskIdOfHostStub(LocalHost.Instance().getStub()); + if (t == null) { + System.out.println("no corresponding task !!!!!!!!!!!!!!!!"); + // TODO + // what can we do ????????????????? + } + myRank = t.getRank(); + int rankOfBackTask; + + // get the number of backup nodes there are for each task i + int numBackNodes = Register.Instance().getNumBackupNeighbors(); + System.out.println("numBackNodes : " + numBackNodes); + int numOfTasks = Register.Instance().getNbOfTasks(); + System.out.println("nombre de taches=" + numOfTasks); + int tmp; + int lastBackupRank; + int lastBackupRankConv; + + // ****************************** STEP 1 : Create the empty + // BackupsManager + // ** + + for (int i = 1; i <= numBackNodes; i++) { + // ------------ 1 - for backups "i + n" (to the right of i) + rankOfBackTask = (myRank + i) % numOfTasks; + // System.out.println("i : " + i + ", rankOfBackTask : " + + // rankOfBackTask); + Backup b_right = new Backup(rankOfBackTask); + addBackupTask(b_right, 0); // erase if exists so no redondancy + Backup b_rightConv = new Backup(rankOfBackTask); + addBackupTask(b_rightConv, 1); // erase if exists so no redondancy + + // ------------ 2 - for backups "i - n" (to the left of i) + tmp = myRank - i; + if (tmp >= 0) { + rankOfBackTask = tmp % numOfTasks; + } else { + rankOfBackTask = numOfTasks - (Math.abs(tmp) % numOfTasks); + } + // System.out.println("i : " + i + ", rankOfBackTask : " + + // rankOfBackTask); + Backup b_left = new Backup(rankOfBackTask); + addBackupTask(b_left, 0); // erase if exists so no redondancy + Backup b_leftConv = new Backup(rankOfBackTask); + addBackupTask(b_leftConv, 1); // erase if exists so no redondancy + } + + // ****************************** STEP 2 : get the Backup for my task + // ** get an eventual Backup (if there is one) to restart on me (try 3 + // times) + if (req == 0) { + int res = -1; + int j = 0; + + while (j < 3 && res == -1) { + // scan all backup Nodes to know the rank of task + // which Backup is the most recent for my tasks + lastBackupRank = getLastRemoteBackupRank(); // return -1 if no + // Backups + lastBackupRankConv = getLastRemoteBackupRankConvg(); + // Knowing the Node which has the last (most recent) Backup for + // me, + // I get this Backup in order to restart it + System.out.println("I am going to get the Backup on " + + lastBackupRank); + System.out.println("I am going to get the Backup Conv on " + + lastBackupRankConv); + res = restartMyTask(lastBackupRank, lastBackupRankConv); + if (res != -1) { + System.out.println("Backup successfully got and restarted"); + } else { + System.out + .println("FAILED to get or restart the Backup at try " + + j + + "... I retry to get a Backup on another Node (" + + (2 - j) + " times again)"); + } + j++; + } + + // get backups of all neighboring nodes + int destRank; + TaskId destTaskId; + TaskId id = Register.Instance().getListeOfTasks() + .getTaskIdOfHostStub(LocalHost.Instance().getStub()); + int myRank = id.getRank(); + JaceInterface destStub; + for (int i = 0; i < BackupsManager.Instance().size(); i++) { + destRank = BackupsManager.Instance().getBackupTaskAtIndex(i, 0) + .getTaskRank(); + destTaskId = Register.Instance().getListeOfTasks() + .getTaskIdOfRank(destRank); + destStub = destTaskId.getHostStub(); + new GetBackupForNewNode(destStub, myRank).start(); + } + + } else + restartMyTask(-1, -1); + } + + public class GetBackupForNewNode extends Thread { + JaceInterface stub; + int myRank; + + public GetBackupForNewNode(JaceInterface destStub, int myRank) { + stub = destStub; + this.myRank = myRank; + } + + public void run() { + try { + + stub.getBackupForNewNode(myRank); + } catch (Exception e) { + } + } + } + + public synchronized void addBackupTask(Backup t, int tag) { + + // regarder si existe cette tache ds le vecteur + if (tag == 0) { + int isIn = existBackupTaskOfRank(t.getTaskRank(), tag); + + if (isIn == -1) { // si elle y est pas on l'isere + liste.addElement(t); + + } + + else { // si elle y est on remplace l'ancienne + liste.set(isIn, t); + + } + } else { + int isIn = existBackupTaskOfRank(t.getTaskRank(), tag); + + if (isIn == -1) { + liste_Convg.addElement(t); + } else + liste_Convg.set(isIn, t); + } + } + + public int existBackupTaskOfRank(int rank, int tag) { + int existe = -1; + int index = 0; + if (tag == 0) + while ((existe == -1) && (index < liste.size())) { + if (rank == ((Backup) liste.get(index)).getTaskRank()) { + existe = index; + } else + index++; + } + else + while ((existe == -1) && (index < liste_Convg.size())) { + if (rank == ((Backup) liste_Convg.get(index)).getTaskRank()) { + existe = index; + } else + index++; + } + return existe; + } + + // returns rank of the task that has my most recent Backup + private int getLastRemoteBackupRank() { + int rank; + TaskId task; + JaceInterface stub = null; + int ite; + int lastIte = -1; + int lastBackupRank = -1; + Vector v; + int timeStep; + int lastStep = -1; + boolean ack = false; + int count; + + // ask for ite number of Backups of all my Backup Nodes + for (int i = 0; i < liste.size(); i++) { + ite = -1; + timeStep = -1; + rank = ((Backup) liste.get(i)).getTaskRank(); + task = Register.Instance().getListeOfTasks().getTaskIdOfRank(rank); + if (task == null) + System.out.println("la tache " + rank + + "n'est pas trouvee dans le registre de " + myRank); + + stub = task.getHostStub(); + if (stub == null) { + System.out.println("stub is null, this node is dead !!!!"); + } else { + count = 0; + // if node not dead, try 3 times to know + // the ite number of its Backup for me + do { + try { + // TODO ??? + // threads or not, for the invokation ? + count++; + v = stub.getIterationOfBackup(myRank, 0); + ite = ((Integer) v.get(0)).intValue(); + + timeStep = ((Integer) v.get(1)).intValue(); + System.out.println(" data ite = " + ite + " timeStep=" + + timeStep + " available from task " + rank); + ack = true; + } catch (Exception e) { + System.out.println("The Node " + task.getHostIP() + + " does not answer at try " + count); + } + } while ((ack == false) && (count < MAX_COUNT_NOT_ALIVE)); + if (lastStep < timeStep) { + lastStep = timeStep; + lastIte = ite; + lastBackupRank = rank; + } else if (lastStep == timeStep && ite > lastIte) { + lastIte = ite; + lastBackupRank = rank; + } + } + } + System.out.println("last backup rank =" + lastBackupRank); + return lastBackupRank; + } + + private int getLastRemoteBackupRankConvg() { + int rank; + TaskId task; + JaceInterface stub = null; + int ite; + int lastIte = -1; + int lastBackupRank = -1; + boolean ack = false; + int count; + Vector v; + int timeStep; + int lastStep = -1; + // ask for ite number of Backups of all my Backup Nodes + for (int i = 0; i < liste_Convg.size(); i++) { + ite = -1; + timeStep = -1; + rank = ((Backup) liste_Convg.get(i)).getTaskRank(); + task = Register.Instance().getListeOfTasks().getTaskIdOfRank(rank); + stub = task.getHostStub(); + if (stub == null) { + System.out.println("stub is null, this node is dead !!!!"); + } else { + count = 0; + // if node not dead, try 3 times to know + // the ite number of its Backup for me + do { + try { + // TODO ??? + // threads or not, for the invokation ? + count++; + v = stub.getIterationOfBackup(myRank, 1); + ite = ((Integer) v.get(0)).intValue(); + timeStep = ((Integer) v.get(1)).intValue(); + System.out.println(" conv ite = " + ite + " timeStep=" + + timeStep + " available from task " + rank); + // System.out.println("ite = " + ite + + // " available for task " + rank); + ack = true; + } catch (Exception e) { + System.out.println("The Node " + task.getHostIP() + + " does not answer at try " + count); + } + } while ((ack == false) && (count < MAX_COUNT_NOT_ALIVE)); + if (lastStep < timeStep) { + lastStep = timeStep; + lastIte = ite; + lastBackupRank = rank; + } else if (lastStep == timeStep && ite > lastIte) { + lastIte = ite; + lastBackupRank = rank; + } + + } + } + return lastBackupRank; + } + + // return 0 if good (if I successfully got a Backup), + // return -1 elsewhere + private synchronized int restartMyTask(int lastBackupRank, + int lastBackupRankConv) { + Backup newBackup = null; + Backup newBackupConvg = null; + TaskId task; + JaceInterface stub = null; + + // If no Backups, lastBackupRank = -1, we start the thread at beginning + if (lastBackupRank == -1) { + newBackup = null; + } else { + // get the lastBackupRank on the corresponding BackupNode + task = Register.Instance().getListeOfTasks().getTaskIdOfRank( + lastBackupRank); + stub = task.getHostStub(); + try { + newBackup = stub.getRemoteBackup(myRank, 0); + // System.out.println("got new back normale for "+myRank); + } catch (Exception e) { + System.out.println("the node " + task.getHostIP() + + " does not answer"); + // exit in order to get back and get a previous Backup + // since this one which failed was the most recent + return -1; + } + } + + if (lastBackupRankConv == -1) { + newBackupConvg = null; + } else { + // get the lastBackupRank on the corresponding BackupNode + task = Register.Instance().getListeOfTasks().getTaskIdOfRank( + lastBackupRankConv); + stub = task.getHostStub(); + try { + + newBackupConvg = stub.getRemoteBackup(myRank, 1); + // System.out.println("got new back conv for "+myRank); + } catch (Exception e) { + System.out.println("the node " + task.getHostIP() + + " does not answer"); + // exit in order to get back and get a previous Backup + // since this one which failed was the most recent + return -1; + } + } + + // in JaceSession, clean the taskObject (Task) + // and the taskThread (Thread) + JaceSession.Instance().init(); + // create 1 TaskLauncher that will (re)start the local task within a + // computing thread + TaskLauncher launcher = new TaskLauncher(); + launcher.loadOrReloadTask(newBackup, newBackupConvg); + return 0; + } + + public Backup getBackupTaskOfRank(int rank, int tag) { + int is = -1; + if (tag == 0) { + if (liste.isEmpty()) { + return null; + } else { + is = existBackupTaskOfRank(rank, tag); + if (is != -1) { + // System.out.println("chercher un backup normale"); + return (Backup) liste.get(is); + } else { + System.out.println("cette tache ou ce backup existe po"); + return null; + } + } + } else { + if (liste_Convg.isEmpty()) { + return null; + } else { + is = existBackupTaskOfRank(rank, tag); + if (is != -1) { + // System.out.println("chercher un backup convg"); + return (Backup) liste_Convg.get(is); + } else { + System.out.println("cette tache ou ce backup existe po"); + return null; + } + } + } + } + + public synchronized Backup getBackupTaskAtIndex(int index, int tag) { + if (tag == 0) + if (index < liste.size()) { + return (Backup) liste.get(index); + } else { + System.out.println("cette task n'existe pas"); + return null; + } + else if (index < liste_Convg.size()) { + return (Backup) liste_Convg.get(index); + } else { + System.out.println("cette task n'existe pas"); + return null; + } + } +} diff --git a/src/jaceP2P/ForwardCount.java b/src/jaceP2P/ForwardCount.java new file mode 100644 index 0000000..1aeafc6 --- /dev/null +++ b/src/jaceP2P/ForwardCount.java @@ -0,0 +1,8 @@ +package jaceP2P; + +public class ForwardCount extends Thread { + + public void run() { + SuperNodeListe.Instance().forwardCountNode(); + } +} diff --git a/src/jaceP2P/HandleClient.java b/src/jaceP2P/HandleClient.java new file mode 100644 index 0000000..de8c9a7 --- /dev/null +++ b/src/jaceP2P/HandleClient.java @@ -0,0 +1,70 @@ +package jaceP2P; + +import java.net.*; +import java.io.*; + +public class HandleClient extends Thread { + Socket s; + ObjectInputStream in; + String name; + + @SuppressWarnings("static-access") + public HandleClient(Socket s) { + try { + this.s = s; + in = new ObjectInputStream(s.getInputStream()); + ObjectOutputStream out = new ObjectOutputStream(s.getOutputStream()); + name = s.getInetAddress().getLocalHost().getHostName(); + Register.Instance().getNodeOfName(name).setOutputStream(out); + } catch (Exception e) { + System.err.println("error in HandleClient Constructor: " + e); + e.printStackTrace(System.out); + } + } + + public HandleClient(Socket s, int type) { + try { + this.s = s; + in = new ObjectInputStream(s.getInputStream()); + } catch (Exception e) { + System.out.println("error in HandleClient Constructor: " + e); + e.printStackTrace(System.out); + } + } + + public void run() { + // int j=0; + Message msg = null; + try { + while (s.isConnected() + && LocalHost.Instance().getStartedThreads() == true) { + Object x = in.readObject(); + if (x instanceof Message) { + msg = (Message) x; + // try{ + // msg=(Message)in.readObject(); + if (msg.getTimeStep() == JaceSession.Instance() + .getTaskObject().timeStep) + // System.out.println("recieved message from "+msg.getSender().getHostName()+" tag="+msg.getSrc_tag()+" iter= "+msg.getSrc_iteration()+" "+j); + MsgQueue.Instance().add(msg); + // j++; + } + } + + in.close(); + s.close(); + } catch (Exception e) { + // System.out.println("error in HandleClient run method :"+e); + try { + + in.close(); + Register.Instance().getNodeOfName(name).getOutputStream() + .close(); + Register.Instance().getNodeOfName(name).setOutputStream(null); + s.close(); + } catch (Exception e1) { + // System.out.println("unable to close socket in HandleClient run method :"+e); + } + } + } +} diff --git a/src/jaceP2P/HeartBeatSNode.java b/src/jaceP2P/HeartBeatSNode.java new file mode 100644 index 0000000..2fd9dff --- /dev/null +++ b/src/jaceP2P/HeartBeatSNode.java @@ -0,0 +1,65 @@ +package jaceP2P; + +public class HeartBeatSNode extends Thread { + + // atributes + public static HeartBeatSNode Instance; + + private JaceSuperNodeInterface server = null; + private int beat; + // private long time; + + int count = 0; + + // constructors + private HeartBeatSNode() { + } + + public static HeartBeatSNode Instance() { + if (Instance == null) { + Instance = new HeartBeatSNode(); + } + return Instance; + } + + public void setHeartTime(int timeBeat) { + beat = timeBeat; + } + + public int getHeartTime() { + return beat; + } + + public void setServer(JaceSuperNodeInterface serverEntity) { + server = serverEntity; + count = 0; + // JaceBuffer.Instance().purge(); + // MsgQueue.Instance().purge(); + + } + + public JaceSuperNodeInterface getServer() { + return server; + } + + public void run() { + // long timeGiven; + // long begin; + // long end; + while (true) { + try { + // each "time" milisecondes, get the register if it has changed + Thread.sleep(beat); + // System.out.println("sleeping for "+beat); + // time = System.currentTimeMillis(); + + server.beating(TokenThread.Instance().getToken()); + + yield(); + + } catch (Exception e) { + } + + } + } +} diff --git a/src/jaceP2P/HeartBeatSpawner.java b/src/jaceP2P/HeartBeatSpawner.java new file mode 100644 index 0000000..abd9099 --- /dev/null +++ b/src/jaceP2P/HeartBeatSpawner.java @@ -0,0 +1,72 @@ +package jaceP2P; + +public class HeartBeatSpawner extends Thread { + + // atributes + public static HeartBeatSpawner Instance; + + private boolean running = true; + private int beat; + // private long time; + private JaceSpawnerInterface spawnerStub = null; + + int count = 0; + + // constructors + private HeartBeatSpawner() { + } + + public static HeartBeatSpawner Instance() { + if (Instance == null) { + System.out.println("creating new HeartBeatSpawner "); + Instance = new HeartBeatSpawner(); + } + return Instance; + } + + public void setHeartTime(int timeBeat) { + beat = timeBeat; + } + + public int getHeartTime() { + return beat; + } + + public void setServer(JaceSpawnerInterface serverEntity) { + spawnerStub = serverEntity; + count = 0; + // JaceBuffer.Instance().purge(); + // MsgQueue.Instance().purge(); + + } + + public JaceSpawnerInterface getServer() { + return spawnerStub; + } + + public void kill() { + running = false; + Instance = null; + } + + public void run() { + // long timeGiven; + // long begin; + // long end; + while (running) { + try { + // each "time" milisecondes, get the register if it has changed + Thread.sleep(beat); + // System.out.println("sleeping for "+beat); + // time = System.currentTimeMillis(); + + spawnerStub.beating(); + + yield(); + + } catch (Exception e) { + } + + } + } +} diff --git a/src/jaceP2P/HeartBeatThread.java b/src/jaceP2P/HeartBeatThread.java new file mode 100644 index 0000000..9e88be2 --- /dev/null +++ b/src/jaceP2P/HeartBeatThread.java @@ -0,0 +1,127 @@ +package jaceP2P; + +public class HeartBeatThread extends Thread { + + // atributes + public static HeartBeatThread Instance; + + private Object server = null; + private int beat; + // private long time; + private JaceSuperNodeInterface superNodeStub = null; + // private JaceInterface stub = null; + private boolean running = true; + int count = 0; + + // constructors + private HeartBeatThread() { + } + + public static HeartBeatThread Instance() { + if (Instance == null) { + Instance = new HeartBeatThread(); + } + return Instance; + } + + public void setHeartTime(int timeBeat) { + beat = timeBeat; + } + + public int getHeartTime() { + return beat; + } + + public void setServer(Object serverEntity) { + server = serverEntity; + count = 0; + // JaceBuffer.Instance().purge(); + // MsgQueue.Instance().purge(); + + } + + public void kill() { + System.out.println("Killing HeartBeatThread ..."); + running = false; + Instance = null; + } + + public void run() { + // long timeGiven; + // long begin; + // long end; + int count = 0; + while (running) { + try { + // each "time" milisecondes, get the register if it has changed + Thread.sleep(beat); + // time = System.currentTimeMillis(); + if (server instanceof JaceSuperNodeInterface) { + + superNodeStub = (JaceSuperNodeInterface) server; + // begin = System.currentTimeMillis(); + superNodeStub.beating(LocalHost.Instance().getStub()); + + } // else { + + // stub = (JaceInterface) server; + + // timeGiven = stub.beating(LocalHost.Instance().getStub()); + // } + + yield(); + + } catch (Exception e) { + try { + if (server instanceof JaceSuperNodeInterface) { + System.out.println("The SuperNode is Dead : " + e); + LocalHost.Instance().getStub().reconnectSuperNode(); + } else { + // System.out.println("The spawner is Dead : " + e); + // LocalHost.Instance().getStub().reconnectSuperNode(); + System.out + .println("The next node maybe dead!!!! count=" + + count); + count++; + if (count > 3) + try { + int myRank; + TaskId id = Register.Instance() + .getListeOfTasks().getTaskIdOfHostStub( + LocalHost.Instance().getStub()); + myRank = id.getRank(); + Register newReg = Register.Instance() + .getSpawnerStub().getRegister(myRank); + if (newReg != null) { + Register.Instance().replaceBy(newReg); + TaskId neighbor = Register + .Instance() + .getListeOfTasks() + .getTaskIdOfRank( + (myRank + 1) + % Register + .Instance() + .getListeOfTasks() + .getSize()); + server = (Object) neighbor.getHostStub(); + count = 0; + } else + System.out + .println("The server returned a null register oh nooooooooooooooooooo"); + } catch (Exception e2) { + System.err + .println("Unable to contact the Spawner :" + + e2); + } + } + + yield(); + } catch (Exception ex) { + System.err.println("Cannot reconnect to the SuperNode " + + ex); + } + } + count++; + } + } +} diff --git a/src/jaceP2P/JaceBuffer.java b/src/jaceP2P/JaceBuffer.java new file mode 100644 index 0000000..36fc1e4 --- /dev/null +++ b/src/jaceP2P/JaceBuffer.java @@ -0,0 +1,149 @@ +package jaceP2P; + +import java.util.ArrayList; + +public class JaceBuffer { + + // attributes + int id; + static int nb = 0; + private ArrayList liste; + boolean msgConsumed; // attribut utiliser ds JaceSender pr savoir si msg + // enlever de la liste de JaceBuffer + long time; + boolean stopGet = false; + + // constructors + + public JaceBuffer() { + /* liste=new Vector(); */ + liste = new ArrayList(); + msgConsumed = false; + time = 0; + id = nb; + nb++; + System.out.println("new JaceBuffer .... id=" + id + " ....."); + } + + // methods + public void purge() { + + liste.clear(); + } + + // retourne l'index d'un Message de meme tag ET meme destinataire ET meme + // envoyeur que "msg" + private synchronized int exist(Message msg) { + int existe = -1; + + int index = 0; + while ((existe == -1) && (index < liste.size())) { + + if (msg.getReceiver().getRank() == (((Message) liste.get(index)) + .getReceiver().getRank())) { + for (int i = 0; i < liste.size(); i++) + // System.out.println("exist id="+id+" element "+i+" to "+((Message)liste.get(i)).getReceiver().getRank()); + existe = index; + } else + index++; + } + + return existe; + } + + public synchronized void add(Message msg) { + int is = -1; + + synchronized (liste) { + is = exist(msg); + + // si existe deja 1 Message de meme tag ET meme envoyeur ET meme + // destinataire, on l'ecrase + if (is != -1) { + liste.set(is, msg); + + // System.out.println("id="+id+" remplacer un message a la place "+is+" ds le buffer pour "+msg.getReceiver().getRank()+" liste size "+liste.size()); + + } else { + // si existe pas de Message de meme tag ET meme envoyeur ET meme + // destinataire, on l'ajoute + liste.add(msg); + // System.out.println("id="+id+" ajouter un message au buffer pour "+msg.getReceiver().getRank()+" liste size "+liste.size()); + + } + } + try { + // notifyAll(); + synchronized (this) { + notify(); + } + + } catch (Exception e) { + System.out.println("error notifying Sender :" + e); + } + + } + + public synchronized Message getMessageAt(int index) { + // System.out.println("size = " + liste.size()); + return (Message) liste.get(index); + } + + public synchronized Message get() { + msgConsumed = false; + + // tant que aucun message, j'attend + while (liste.isEmpty() && stopGet == false) { + try { + // System.out.println("BUFFER : rien, j'attend, liste vide "+liste.isEmpty()); + wait(); + } catch (Exception e) { + } + ; + } + // System.out.println("Took message from Buffer"); + + Message tmp = null; + synchronized (liste) { + + try { + tmp = (Message) ((Message) liste.get(0)).clone(); + liste.remove(0); + // System.out.println("id="+id+" get message du buffer pour "+tmp.getReceiver().getRank()+" liste size "+liste.size()); + } catch (Exception e) { + System.out.println("unable to get message :" + e); + } + + } + msgConsumed = true; + time = System.currentTimeMillis(); + return tmp; + } + + public synchronized void viewAll() { + if (liste.isEmpty()) { + // System.out.println("pas de msg a envoyer"); + } else { + Message msg; + TaskId sender, dest; + System.out.print("id=" + id + " nb msg ds JaceBuffer : " + + liste.size()); + for (int i = 0; i < liste.size(); i++) { + msg = (Message) liste.get(i); + sender = msg.getSender(); + dest = msg.getReceiver(); + System.out.print("\nmsg " + i + " : tag = " + msg.getTag() + + ", src : " + sender.getRank() + " " + + sender.getHostIP() + ", dest : " + dest.getRank() + + " " + dest.getHostIP() + ", data = " + msg.getData() + + "\n"); + + } + } + } + + public synchronized int getSize() { + return liste.size(); + } + +} diff --git a/src/jaceP2P/JaceDaemon.java b/src/jaceP2P/JaceDaemon.java new file mode 100644 index 0000000..21a8c17 --- /dev/null +++ b/src/jaceP2P/JaceDaemon.java @@ -0,0 +1,353 @@ +package jaceP2P; + +import java.net.ServerSocket; +import java.net.Socket; +import java.rmi.Naming; + +import and.Mapping.Utils; + +public class JaceDaemon { + // attributes + public static JaceDaemon Instance = null; + private String snode_IP = null; + private int snode_port = 1098; + private int daemonPort = 1098; + private int heartTime; // HeartBeat frequency + private String protocol; + private boolean running = false; + + public JaceDaemon(String superNode, int port, String comProtocol) { + if (!superNode.equals("-d")) { + // snode_IP = LocalHost.Instance().resolve(superNode); // get IP of + // the super node + snode_IP = superNode; + } + + daemonPort = port; + protocol = comProtocol; + running = true; + Instance = this; + + // initialize(); + } + + /** + * + **/ + public JaceDaemon() { + } + + /** + * + **/ + public String getProtocol() { + return protocol; + } + + /** + * + **/ + public synchronized static JaceDaemon Instance() { + if (Instance == null) { + Instance = new JaceDaemon(); + } + + return Instance; + } + + /** + * + **/ + public boolean isRunning() { + return running; + } + + /** + * + **/ + public void initialize() { + LocalHost.Instance().setPort(daemonPort); + exportObject(); // Iinstanciate the JaceServer localy + reconnectSuperNode(); // Connect to one of the SuperNodes + if (protocol.equals("socket")) { + listenForRequests(); + } + + /* + * System.out.println("sleep de 5 secondes avt tuer Daemon"); try { + * Thread.sleep(5000); } catch(Exception e1) {} + * + * + * sortir(); + */ + } + + /** + * + **/ + public void listenForRequests() { + ServerSocket ss = null; + + try { + + ss = new ServerSocket(LocalHost.Instance().getSocketPort()); + + } catch (Exception e) { + System.err.println("Error initializing ServerSocket: " + e); + } + + while (true) { + Socket s = null; + try { + // System.out.println( "Nouveau message" ) ; + s = ss.accept(); + new HandleClient(s).start(); + } catch (Exception e) { + System.err.println("Error handling Client: " + e); + } + } + } + + /** + * + **/ + private void exportObject() { + // if( protocol.equals( "rmi" ) ) { + JaceInterface ref = null; + JaceServer myServer = null; + + //System.out.println("Name of local machine is : " + // + LocalHost.Instance().getName()); + //System.out.println("IP of local machine is : " + // + LocalHost.Instance().getIP()); + + try { + // launch the JaceServer + myServer = new JaceServer( /* this */); + java.rmi.registry.LocateRegistry.createRegistry(daemonPort); + java.rmi.registry.LocateRegistry.getRegistry(daemonPort).rebind( + "JaceServer", myServer); + ref = (JaceInterface) Naming.lookup("rmi://" + + LocalHost.Instance().getIP() + ":" + daemonPort + + "/JaceServer"); + } catch (Exception e) { + System.err + .println("JaceP2P_Error in JaceRuntime.exportObject() when creating the local JaceServer " + + e); + System.err.println("Exit from JaceRuntime.exportObject"); + System.exit(1); + } + + LocalHost.Instance().setStub(ref); + // } + } + + /** + * + **/ + @SuppressWarnings("unused") + private void sortir() { + if (protocol.equals("rmi")) { + try { + java.rmi.registry.LocateRegistry.getRegistry(daemonPort) + .unbind("JaceServer"); + //System.out.println("Unbind done !!!!!!!!!!!1"); + } catch (Exception e) { + System.err + .println("JaceP2P_Error in JaceRuntime.sortir() when unbind " + + e); + } + } + + } + + /** + * + **/ + public void reconnectSuperNode() { + + //System.out.println("I'm looking for a JaceP2P Super Node"); + //System.out.println(protocol); + // if( protocol.equals( "rmi" ) ) { + JaceSuperNodeInterface snodeStub = null; + boolean connected = false; + + // while( connected == false ) { + if (snode_IP != null) { + try { + //System.out.println("Trying to invoke Super Node " + snode_IP); + snodeStub = (JaceSuperNodeInterface) Naming.lookup("rmi://" + + snode_IP + ":" + snode_port + "/JaceSuperNode"); + //System.out.println("Succesfully located " + snode_IP); + + // Add stub and IP in LocalHost to store it until super node + // death + LocalHost.Instance().setSuperNodeStub(snodeStub); + LocalHost.Instance().setSuperNodeIP(snode_IP); + connected = true; + + } catch (Exception e) { + //System.out + // .println("Snode not launched, try another one (1/2s)"); + try { + Thread.sleep(500); + } catch (Exception e1) { + // nothing + } + } + } + + if (connected == false) { + int i = 0; + + SuperNodeListe.Instance().staticInitialization(); + + while (connected == false + && i < SuperNodeListe.Instance().getListe().size()) { + SuperNodeData d = null; + d = SuperNodeListe.Instance().getSuperNodeData(i); + snode_IP = LocalHost.Instance().resolve(d.getIP()); + snode_port = d.getPort(); + + try { + //System.out.println("Trying to invoke super node " + // + snode_IP); + snodeStub = (JaceSuperNodeInterface) Naming.lookup("rmi://" + + snode_IP + ":" + snode_port + "/JaceSuperNode"); + // System.out.println( "succesfully located " + snode_IP ) ; + + // Add stub and IP in LocalHost to store it until super node + // death + LocalHost.Instance().setSuperNodeStub(snodeStub); + LocalHost.Instance().setSuperNodeIP(snode_IP); + connected = true; + + } catch (Exception e) { + //System.out.println("Snode " + snode_IP + // + " not launched, try another one (1/2s)"); + i++; + + try { + Thread.sleep(500); + } catch (Exception e1) { + // nothing + } + + // If error, exit the loop and reenter in order to find + // another Snode + // continue ; + } + } + } + + if (connected == false) { + System.err + .println("All the SuperNodes in the list are dead, unable to connect to the platform"); + System.exit(1); + } else { + + // Registering to the Super Node + try { + heartTime = snodeStub.getSuperNodeBeat(); + + // snodeStub.workerRegistering( LocalHost.Instance().getStub(), + // LocalHost.Instance().getIP(), + // LocalHost.Instance().getName(), + // daemonPort ) ; + + snodeStub.workerRegistering(LocalHost.Instance().getStub(), + LocalHost.Instance().getIP(), LocalHost.Instance() + .getName(), daemonPort, Utils.createGNode()); + + // Launching the heart beat thread + HeartBeatThread.Instance().setHeartTime(heartTime); + + //System.out.println("Starting to ping the Super Node"); + HeartBeatThread.Instance().setServer((Object) snodeStub); + + if (HeartBeatThread.Instance().isAlive() == false) { + // HeartBeatThread.Instance().setPriority(Thread.MAX_PRIORITY); + HeartBeatThread.Instance().start(); // Starting the beating + } + + //System.out.println("Succesfully registered to Super Node " + // + snode_IP); + + } catch (Exception e) { + System.err + .println("JaceP2P_Error in JaceDamon reconnectSuperNode when invoking function workerRegistering() : " + + e); + reconnectSuperNode(); + } + } + // } + // } + } + + /** + * + **/ + public void reinitDaemon() { + + if (JaceDaemon.Instance().getProtocol().equals("rmi")) { + SenderRmi.Instance().getBuffer().stopGet = true; + + try { + SenderRmi.Instance().notify(); + } catch (Exception e) { + // nothing + } + + SenderRmi.Instance().kill(); + + } else { + SenderSocket.Instance().getBuffer().stopGet = true; + + try { + SenderSocket.Instance().notify(); + } catch (Exception e) { + // nothing + } + + SenderSocket.Instance().kill(); + } + + //System.out.println("$$$$$$$$$$ Killed Sender $$$$$$$$$$"); + ScanThread.Instance().kill(); + LocalHost.Instance().setStartedThreads(false); + System.out.println("Reinitialization of the Daemon"); + + System.out.println("I kill the application if any exists"); + // ScanThread.Instance().setScanning( false ) ; + // System.out.println( "Set ScanThread off" ) ; + // Sender.Instance().kill() ; + + try { + Thread.sleep(2500); + } catch (Exception e) { + // nothing + } + + // Sender.Instance().running = false ; + try { + // Cleaning JaceBuffer, MsgQueue, Register + // and deleting taskObject and taskThread + JaceSession.Instance().kill(); // Also to do when a node has been + // down, in the case he is back + System.out.println("Application killed properly"); + BackupsManager.Instance().purge(); + Register.Instance().purge(); + + } catch (Exception e) { + System.err.println("Crashed killing the application : " + e); + } + + System.out.println("Daemon reinitialized"); + // Runtime.getRuntime().gc() ; + reconnectSuperNode(); + } + +} + +/** ! **/ + diff --git a/src/jaceP2P/JaceInterface.java b/src/jaceP2P/JaceInterface.java new file mode 100644 index 0000000..0ddebda --- /dev/null +++ b/src/jaceP2P/JaceInterface.java @@ -0,0 +1,73 @@ +package jaceP2P; + +import java.rmi.Remote; +import java.rmi.RemoteException; +import java.util.Vector; + + + +public interface JaceInterface extends Remote { + public void reconnectSuperNode() throws RemoteException; + + public int updateRegister(Register newReg, JaceInterface stub, int req) + throws RemoteException; + + public Vector getIterationOfBackup(int remoteRank, int tag) + throws RemoteException; + + public Backup getRemoteBackup(int remoteRank, int tag) + throws RemoteException; + + public void suicide(String debugMsg) throws RemoteException; + + public void iSendYou(Message msg) throws RemoteException; + + public int getTimeStep() throws RemoteException; + + public void saveTask(int rank, byte[] tsk, int iteration, int timeStep, + String appliName, int tag) throws RemoteException; + + public void setSaved(boolean bool) throws RemoteException; + + public boolean getReloading() throws RemoteException; + + public int getVerifNum() throws RemoteException; + + public String getState() throws RemoteException; + + public void initializeVerif(int tag) throws RemoteException; + + public void savOrFinOrRest(int tag, int step, boolean verd, + Vector reduceAll) throws RemoteException; + + public boolean setNbNeighboursNotConv(int tag, int idNeigh, + int neighborTimeStep) throws RemoteException; + + public int getNbNeighboursNotConv() throws RemoteException; + + public void response(int neighId, int tag, int response, + Vector recievedValue) throws RemoteException; + + public boolean ping() throws RemoteException; + + public void updateHeart(JaceInterface stub) throws RemoteException; + + public void updateHeart(JaceSuperNodeInterface stub) throws RemoteException; + + public long beating(JaceInterface stub) throws RemoteException; + + public void setScanning(boolean bool) throws RemoteException; + + public JaceSpawnerInterface transformIntoSpawner(String[] params, + String appliName, Register reg, int nbTasks, + JaceSuperNodeInterface snodeStub, int rank, int heartTime, int tag, + int nbdc, int nbsdc, int nbDaemonPerSpawner, int nbDaemonPerThread) + throws RemoteException; + + public void setSpawner(JaceSpawnerInterface spawnerStub) + throws RemoteException; + + public void updateRegister(Node oldNode, Node node) throws RemoteException; + + public void getBackupForNewNode(int rank) throws RemoteException; +} diff --git a/src/jaceP2P/JaceP2P.java b/src/jaceP2P/JaceP2P.java new file mode 100644 index 0000000..b3554d5 --- /dev/null +++ b/src/jaceP2P/JaceP2P.java @@ -0,0 +1,118 @@ +// Primary class to launch JaceP2P + +// jaceP2P.JaceP2P -[Daemon|Spawner|SNode]