3 import java.io.ByteArrayOutputStream;
4 import java.io.ObjectInputStream;
5 import java.io.ObjectOutputStream;
6 import java.rmi.RemoteException;
7 import java.util.Vector;
9 //import java.util.Calendar;
10 //import java.util.GregorianCalendar;
12 //import com.jamonapi.*;
// Generic task of the runtime. The class is Runnable, Cloneable and Serializable;
// the fields below hold the per-task iteration counters, the convergence-detection
// state and the backup (checkpoint) state used by the methods of this class.
// NOTE(review): almost every field is public and mutable, and some (e.g. resp) are
// touched by both synchronized and unsynchronized methods - confirm the intended
// threading model before relying on visibility guarantees.
14 public class Task implements Runnable, Cloneable, java.io.Serializable {
15 private static final long serialVersionUID = 1L;
17 public double errorLoc = 0;
// Checkpoint period: jaceP2P_Save() tests jaceP2P_Iteration % saveParameter.
18 public int saveParameter;
// Identity of this task; setId() reads its rank into jaceMyId.
21 public TaskId jaceTaskId = null;
22 public String[] jaceArgs;
23 public boolean reloading = false;
// Protocol state; the values compared in this file are "NORMAL", "WAIT4V",
// "VERIF", "SAVING" and "FINISHED".
24 public String state = "NORMAL";
// Initialised to neighbors.size() by initialize_state(); tested against 0 and 1
// in jaceP2P_GlobalConvergence().
25 public int nb_not_recv;
26 public boolean electedNode = false;
27 public boolean respSent = false;
// One entry per neighbour; the values tested in this file are 0, 1 and -1.
28 public Vector<Integer> resp;
29 public int verifNum = 0;
30 public LastSave lastSave = new LastSave();
// neighbors / neighborsValues are parallel vectors filled by detectNeighbors().
31 public Vector<Integer> neighbors;
32 public Vector<Boolean> neighborsValues;
// dependancies / values are parallel vectors filled by setDep().
33 public Vector<Integer> dependancies;
34 public Vector<Boolean> values;
// Pending protocol action; the values compared in this file are "nothing",
// "sendVerif", "sendVerdict" and "sendResponse".
36 public String action = "nothing";
37 public boolean verdict = false;
38 public boolean recievedVerdict = false;
39 public int jaceP2P_Iteration = 0;
40 public boolean finalStep = false;
41 // public Monitor mon1=null;
42 public int timeStep = 0; // time discretization counter for non-stationary
44 public boolean localCV_state;
45 public boolean jaceP2P_globalCV_state = false;
46 // flag telling whether the application has already finished
47 private boolean finalize = false;
48 public boolean pseudoPerBeg;
49 public boolean pseudoPerEnd;
50 public boolean underTh = false;
// Two-slot array (allocated below with length 2); see setSaved().
51 public boolean saved[];
52 // attributes for BackupNodes
53 private int saveRound = 0;
54 private int[] saveTab = null;
55 public boolean savedResults = false;
57 public BackupConvg sauvConvg = new BackupConvg();
58 public boolean postReloading = false;
59 public Vector<?> reduceAll;
// The initialisers below belong to a constructor whose header is not visible in
// this excerpt; they allocate the empty vectors and the two-slot saved array.
66 reduceAll = new Vector<Object>();
67 dependancies = new Vector<Integer>();
68 values = new Vector<Boolean>();
69 resp = new Vector<Integer>();
70 saved = new boolean[2];
72 neighbors = new Vector<Integer>();
73 neighborsValues = new Vector<Boolean>();
// Sends this task's stored backups to the node hosting the task of the given rank:
// first the serialized task backup (sauv), then, when sauvConvg.initialized is set,
// the serialized convergence backup. Both are sent through the remote stub's
// saveTask(). Exceptions are caught and reported by the visible catch block.
// @param rank rank of the task whose host should receive the backups
// NOTE(review): the catch block prints its message to System.err but the stack
// trace to System.out, so the two may end up in different logs.
76 public void getBackupForNewNode(int rank) {
78 JaceInterface stub = null;
79 task = Register.Instance().getListeOfTasks().getTaskIdOfRank(rank);
81 stub = task.getHostStub();
83 // if no stub there is a problem
85 System.err.println("Unable to send backup on task of rank " + rank);
87 // if there is a stub, send the stream to that node
89 ByteArrayOutputStream stream;
91 stream = convertTask2stream(sauv);
92 stub.saveTask(jaceMyId, stream.toByteArray(), sauv.lastSave
93 .getLastSave(), sauv.timeStep, Register.Instance()
// The convergence backup is only sent once it has been filled in
// (getBackupConvg2Save() is what sets initialized to true).
96 synchronized (sauvConvg) {
97 if (sauvConvg.initialized == true) {
98 stream = convertBackupConv2stream(sauvConvg);
99 stub.saveTask(jaceMyId, stream.toByteArray(),
100 sauvConvg.lastSave.getLastSave(),
101 sauvConvg.timeStep, Register.Instance()
105 } catch (Exception e) {
106 System.err.println("Error in getBackupForNewNode :" + e);
107 e.printStackTrace(System.out);
112 // method to overload by user in the appli to convert the stream sent by the
113 // BackupNode to a Task object of the type of the appli this method is
115 // in TaskLauncher.loadBackupAndRestart() and
116 // TaskLauncher.loadOrReloadTask()
117 public Task jaceP2P_ConvertStream(ObjectInputStream stream) {
121 public void printSav() {
124 // method to overload by user in the appli to specify the reduceAll method
125 public synchronized void reduceAll(Vector<?> recievedValue) {
128 public void setId(TaskId Id) {
130 jaceMyId = jaceTaskId.getRank();
132 jaceSize = Register.Instance().getNbOfTasks();
133 } catch (Exception e) {
135 System.err.println("SetId is bad !! " + e + " "
136 + LocalHost.Instance().getName());
137 // jaceSize = Register.Instance().getListeOfTasks().getSize();
138 } catch (Exception e2) {
139 System.err.println("Not localised the spawner : " + e2);
144 public void setParam(String[] arg) {
149 public void setJaceSize(int nbTasks) {
153 public TaskId getId() {
158 public int getTimeStep() {
162 // method to overload by user in the appli to identify the neighbors of a
165 public int[] getDependencies(int i) {
169 // method to overload by user in the appli to init each task at beginning of
172 public void jaceP2P_InitTask() {
175 // TaskLauncher.loadOrReloadTask()
176 public void jaceP2P_ReinitTask() {
179 // method to overload by user in the appli to safeguard the results
180 public void saveResults() {
183 // method to overload by user in the appli to save only the required
184 // attributes (iter, vectors, ...); this method is called in
185 // Task.jaceP2P_Save()
// Default save hook. The visible part only prints a trace line; the statement that
// produces the returned Task is not visible in this excerpt. getTask2save() uses the
// returned object as the checkpoint it fills in.
186 public Task jaceP2P_SaveFromCrash() {
187 System.out.println("JaceSaveFromCrash Task");
// Announces the end of this task on stdout and asks the spawner, through its remote
// stub, to kill the application, passing the local host's stub. A failure to reach
// the spawner is caught and reported. JaceDaemon.Instance().reinitDaemon() is also
// invoked; whether unconditionally or only on failure is not visible in this excerpt.
191 public void jaceFinalize() {
194 System.out.println("Ready to Death Task:" + jaceMyId);
197 Register.Instance().getSpawnerStub().killApplication(
198 LocalHost.Instance().getStub());
200 } catch (Exception e) {
202 .println("Cannot join the Spawner to kill application: "
206 JaceDaemon.Instance().reinitDaemon();
// Resets the convergence-detection state: clears the bookkeeping vectors through
// reinitializeVectors(), clears both pseudo-period flags, replaces lastSave with a
// fresh LastSave and clears the local and global convergence flags.
// NOTE(review): lastSave is reassigned inside synchronized (lastSave), so a thread
// that locks the field afterwards uses a different monitor than the one held here.
210 public void jaceP2P_ReinitConv() {
211 // System.out.println("reinit conv");
215 // last_iter.removeAllElements();
217 reinitializeVectors();
219 } catch (Exception e) {
220 System.err.println("Error in jaceP2P_reinitConv():" + e);
222 pseudoPerBeg = false;
223 pseudoPerEnd = false;
224 synchronized (lastSave) {
225 lastSave = new LastSave();
227 // reset the convergence variables of the generic Task
228 localCV_state = false;
229 jaceP2P_globalCV_state = false;
// Empties the five bookkeeping vectors: values, dependancies, neighbors,
// neighborsValues and resp. The vectors themselves are kept, only their contents go.
// NOTE(review): none of the visible statements can throw RemoteException; the
// throws clause is presumably required by a remote interface - TODO confirm.
233 public void reinitializeVectors() throws RemoteException {
234 values.removeAllElements();
235 dependancies.removeAllElements();
236 neighbors.removeAllElements();
237 neighborsValues.removeAllElements();
238 resp.removeAllElements();
// Asks the spawner, through its remote stub, for the chrono value associated with
// the current application name. A failure of that call is caught and reported; the
// value returned in that case is not visible in this excerpt.
241 public long jaceP2P_getChronoValue() {
245 result = Register.Instance().getSpawnerStub().getChronoValue(
246 Register.Instance().getAppliName());
247 } catch (Exception e) {
249 .println("JaceP2P_Error in Task.jaceP2P_getChronoValue() on SuperNode : "
257 //System.out.println("ds run()");
261 *asynchronous sending (non blocking), of data object to an other task
264 * the object (serializable) data to be send
266 * the task id for receiver's task
268 * the tag for message
// Queues a data message for the task of rank dest. The visible code looks up the
// destination TaskId in the register, reports when it is null or when its rank does
// not match dest, and, when the destination stub reports the same timeStep as this
// task, wraps the payload in a Message and adds it to Sender.Instance().getBuffer().
// @param buffer        payload handed to Message.setParam()
// @param dest          rank of the receiving task
// @param tag           message tag handed to Message.setParam()
// @param erreur_locale local error value handed to Message.setParam()
// NOTE(review): sleep() and yield() are called through the object returned by
// getTaskThread(), hence the "static-access" suppression. If that object is a
// java.lang.Thread, both are static and act on the calling thread - TODO confirm.
270 @SuppressWarnings("static-access")
271 public void jaceSend(Object buffer, int dest, int tag, double erreur_locale) {
272 // System.out.println("dest : " + dest);
276 recev = Register.Instance().getListeOfTasks().getTaskIdOfRank(dest);
279 System.err.println("In jaceSend recv = null !");
281 JaceSession.Instance().getTaskThread().sleep(10);
282 JaceSession.Instance().getTaskThread().yield();
283 } catch (Exception e) {
287 // TODO: remove this else, but find out why recev is sometimes null
289 if (recev.getRank() != dest) {
290 System.err.println("Problem !! pas le meme dest que ds les param");
294 // TODO: add the application name to Message
297 stub = recev.getHostStub();
// Only send when the receiver is at the same time step as this task.
298 if (stub.getTimeStep() == timeStep) {
299 // System.out.println("************ "+verifNum+" *************");
300 Message msg = new Message();
301 msg.setParam(buffer, jaceTaskId, recev, tag, timeStep,
302 jaceP2P_Iteration, verifNum, erreur_locale);
304 // put the message into
305 // JaceBuffer.Instance() (the list of Messages to
309 * if(JaceDaemon.Instance().getProtocol().equals("socket"
310 * )) SenderSocket.Instance().buffer.add(msg); else
311 * if(JaceDaemon.Instance().getProtocol().equals("rmi"))
312 * SenderRmi.Instance().buffer.add(msg);
314 // System.out.println("putting message to "+dest+" in the buffer");
315 Sender.Instance().getBuffer().add(msg);
317 // if(JaceDaemon.Instance().getProtocol().equals("rmi"))
318 // SenderRmi.Instance().getBuffer().add(msg);
320 // SenderSocket.Instance().getBuffer().add(msg);
322 } catch (Exception e) {
323 System.err.println("Unable to send data message to " + dest
326 // System.out.println("TASK : g mis un msg qui doit etre envoye");
327 // sending is always asynchronous: the Message leaves
328 // immediately because
329 // - Sender was doing JaceBuffer.Instance().get(), which contains a
331 // - and here Task does JaceBuffer.Instance().add(msg), which
332 // contains a notifyAll()
334 JaceSession.Instance().getTaskThread().sleep(10);
335 JaceSession.Instance().getTaskThread().yield();
336 } catch (Exception e) {
338 // System.out.println("TASK : je sort de jaceSend");
340 } catch (Exception e) {
341 if (Register.Instance().getListeOfTasks() == null)
342 System.err.println("Tasks list is null: " + e);
348 *not-blocking reception of data object, return an object
351 * the task id for the task sender
353 * the tag for message
// Fetches a message with the given sender and tag from MsgQueue and returns its
// data. On the first iteration or right after a reload, the sender is first checked
// against the dependency list (notExist). When underTh is set, the iteration is not
// 0 and the task is not reloading, the sender's slot in values is set to true,
// subject to a state/verifNum test that is only partly visible here.
// @param sender rank of the sending task
// @param tag    message tag to match
// @return the data of the matching message; what is returned when no message
//         matches is not visible in this excerpt
355 @SuppressWarnings("static-access")
356 public Object jaceReceive(int sender, int tag) {
357 if (jaceP2P_Iteration == 0 || postReloading) {
359 if (notExist(sender)) {
363 } catch (Exception e) {
367 Message tmp = MsgQueue.Instance().get(sender, tag);
369 // System.out.println("recu MSG de tache " + sender + " (" +
370 // Register.Instance().getListeOfTasks().getTaskIdOfRank(sender).getHostName()
371 // + ") MsgQueue : " +
372 // MsgQueue.Instance().getSize()+" message src_tag="+tmp.getSrc_tag()+
373 // " localError="+tmp.getLocalError());
377 if (underTh == true && jaceP2P_Iteration != 0 && !reloading) {
378 index = depIndex(sender);
380 if ((!(state.equals("VERIF")) || verifNum == tmp
383 // System.out.println("dep["+sender+"]=true, index="+index);
// Record that fresh data has been received from this dependency.
384 setValues(index, true);
388 } catch (Exception e) {
389 System.err.println("Error jaceReceive :" + e);
390 System.err.println("Sender=" + sender);
392 return (tmp.getData());
395 // System.out.println("RIENNN recu de tache " + sender + " (" +
396 // Register.Instance().getListeOfTasks().getTaskIdOfRank(sender).getHostName()
397 // + ") taille MsgQueue : " + MsgQueue.Instance().getSize());
398 } catch (Exception e) {
399 if (Register.Instance().getListeOfTasks() == null)
400 System.err.println("Tasks list is null: " + e);
403 JaceSession.Instance().getTaskThread().sleep(10);
404 JaceSession.Instance().getTaskThread().yield();
405 } catch (Exception e) {
// Looks up sender in the dependancies vector; the test on the resulting index and
// the returned value are not visible in this excerpt.
// NOTE(review): new Integer(int) is deprecated since Java 9 (Integer.valueOf is the
// replacement) and the (Object) cast is redundant.
411 public boolean notExist(int sender) {
412 int i = dependancies.indexOf((Object) (new Integer(sender)));
// Registers a new dependency: appends value to dependancies and a matching false
// entry to values, keeping the two vectors the same length.
// NOTE(review): new Integer(int) / new Boolean(boolean) are deprecated since Java 9;
// Integer.valueOf / Boolean.FALSE are the replacements.
419 public void setDep(int value) {
420 dependancies.add(new Integer(value));
421 values.add(new Boolean(false));
// Computes the position of sender in the dependancies vector (Vector.indexOf yields
// -1 when absent); the return statement is not visible in this excerpt.
424 public int depIndex(int sender) {
425 int index = dependancies.indexOf((Object) (new Integer(sender)));
// Overwrites the entry of the values vector at the given index.
// NOTE(review): no bounds check is visible; an index of -1 (as depIndex can compute
// for an unknown sender) would make Vector.setElementAt throw.
429 public void setValues(int index, boolean value) {
430 values.setElementAt(new Boolean(value), index);
// Periodic checkpoint entry point. At iteration 0 a separate branch runs (its body
// is not visible here); otherwise, every saveParameter iterations, the task is
// snapshotted with getTask2save() and handed to a new SaveTaskThread.
// NOTE(review): saveParameter has no initializer in this class, so unless it is set
// elsewhere the modulo below divides by zero - TODO confirm it is always set.
// NOTE(review): sauv is reassigned inside synchronized (sauv), so the monitor held
// is the previous object's, not the new snapshot's.
433 @SuppressWarnings("static-access")
434 public void jaceP2P_Save() {
437 if (jaceP2P_Iteration == 0) {
438 // request 0 when task at barrier
439 // request 1 when only at convergence
441 } else if ((jaceP2P_Iteration % saveParameter) == 0) {
442 // clone the Task at that step of computations
444 synchronized (sauv) {
445 sauv = getTask2save();
447 // send it to the corresponding BackupNode in a round robin
449 new SaveTaskThread(sauv).start();
453 JaceSession.Instance().getTaskThread().sleep(10);
454 JaceSession.Instance().getTaskThread().yield();
455 } catch (Exception e) {
459 // request 0 when Saving all task
460 // request 3 when saving only convergence data
// Sends a backup to every node listed in saveTab. Two parallel sections are
// visible: the first snapshots and serializes the whole task (getTask2save /
// convertTask2stream), the second does the same for the convergence data
// (getBackupConvg2Save / convertBackupConv2stream). Each section walks saveTab,
// resolves the destination stub and starts a BroadcastTaskThread for it.
// @param request selects which section runs; the test on it is not visible in this
//        excerpt
461 @SuppressWarnings("static-access")
462 public void broadcastTasks(int request) {
463 ByteArrayOutputStream stream;
465 // 1 - clone the Task at that step of computations and serialize it
467 synchronized (sauv) {
468 sauv = getTask2save();
469 stream = convertTask2stream(sauv);
471 // 2 - create the saveTab if necessary
472 if (saveTab == null) {
476 JaceInterface stub = null;
478 // 3 - send the stream to all the BackupNodes
479 for (int i = 0; i < saveTab.length; i++) {
480 // System.out.println("saveTab[" + i + "] = " +
483 // 3.1 - get the stub of the destination
484 // System.out.println("Saving on neighbor "+i);
486 task = Register.Instance().getListeOfTasks()
487 .getTaskIdOfRank(saveTab[i]); // ///////////////////////////pb
488 stub = task.getHostStub();
489 } catch (Exception e) {
491 .println("Problem in the broadcast, ligne d'assignation de task ds broadcats: "
495 // 3.2 - send the stream to that stub
497 // System.out.println("saving on second list");
499 new BroadcastTaskThread(stub, jaceMyId, stream
501 sauv.lastSave.getLastSave(), sauv.timeStep,
502 Register.Instance().getAppliName(), 0,
509 } catch (Exception e) {
510 e.printStackTrace(System.out);
// Convergence-only section: same steps as above, applied to sauvConvg.
513 synchronized (sauvConvg) {
514 sauvConvg = getBackupConvg2Save();
515 stream = convertBackupConv2stream(sauvConvg);
517 // 2 - create the saveTab if necessary
518 if (saveTab == null) {
522 JaceInterface stub = null;
524 // 3 - send the stream to all the BackupNodes
525 for (int i = 0; i < saveTab.length; i++) {
526 // System.out.println("saveTab[" + i + "] = " + saveTab[i]);
528 // 3.1 - get the stub of the destination
529 // System.out.println("Saving on neighbor "+i);
531 task = Register.Instance().getListeOfTasks()
532 .getTaskIdOfRank(saveTab[i]); // ///////////////////////////pb
533 stub = task.getHostStub();
534 } catch (Exception e) {
536 .println("Problem in the broadcast, ligne d'assignation de task ds broadcats: "
540 // 3.2 - send the stream to that stub
542 * if(request==3){ if (stub != null) { new
543 * BroadcastTaskThread(stub, jaceMyId, stream.toByteArray(),
544 * sauvConvg.lastSave.getLastSave(),timeStep,
545 * Register.Instance().getAppliName(),1,saveTab[i]).start();
546 * } //System.out.println("saving on second list"); } else{
549 new BroadcastTaskThread(stub, jaceMyId, stream
550 .toByteArray(), sauvConvg.lastSave
551 .getLastSave(), sauvConvg.timeStep, Register
552 .Instance().getAppliName(), 1, saveTab[i])
556 // System.out.println("saving on first list");
561 // JaceSession.Instance().getTaskThread().sleep(10);
562 JaceSession.Instance().getTaskThread().yield();
565 catch (Exception e) {
// Serializes a BackupConvg with Java object serialization into an in-memory byte
// stream. Failures are caught and reported on the visible path.
// @param t convergence backup to serialize
// NOTE(review): the error text names Task.jaceP2P_ReinitConv(), which is not this
// method - looks like a copy/paste leftover.
// NOTE(review): no close()/flush() of the ObjectOutputStream is visible in this
// excerpt - TODO confirm it happens on the missing line.
569 private ByteArrayOutputStream convertBackupConv2stream(BackupConvg t) {
570 // System.out.println("beginning of the checkpointing process......");
571 ByteArrayOutputStream stream = new ByteArrayOutputStream();
573 ObjectOutputStream fluxOut = new ObjectOutputStream(stream);
574 fluxOut.writeObject(t);
576 } catch (Exception e) {
578 .println("JaceP2P_Error in Task.jaceP2P_ReinitConv() when converting Task in Stream : "
581 // System.out.println("taille du tablo de byte : " +
582 // stream.toByteArray().length + " bytes");
586 // convert the task in stream to send it to the BackupNode
// Serializes a Task with Java object serialization into an in-memory byte stream.
// Failures are caught and reported on the visible path.
// @param t task snapshot to serialize
// NOTE(review): the error text names Task.jaceP2P_ReinitConv(), which is not this
// method - looks like a copy/paste leftover.
587 private ByteArrayOutputStream convertTask2stream(Task t) {
588 // System.out.println("beginning of the checkpointing process......");
589 ByteArrayOutputStream stream = new ByteArrayOutputStream();
591 ObjectOutputStream fluxOut = new ObjectOutputStream(stream);
592 fluxOut.writeObject(t);
594 } catch (Exception e) {
596 .println("JaceP2P_Error in Task.jaceP2P_ReinitConv() when converting Task in Stream : "
599 // System.out.println("taille du tablo de byte : " +
600 // stream.toByteArray().length + " bytes");
604 // get the task of the appli with the data to save (in
605 // jaceP2P_SaveFromCrash() overloaded in the appli)
// Builds the checkpoint object: obtains a Task from jaceP2P_SaveFromCrash(), then
// copies onto it this task's identity, parameters, iteration counters and the
// convergence-detection state. lastSave is incremented before being attached.
// The "unchecked" suppression covers the raw (Vector) casts on the clones.
// NOTE(review): neighbors, neighborsValues and resp are cloned, whereas reduceAll
// and lastSave are assigned by reference, so the snapshot shares them with the
// live task - TODO confirm that is intended.
606 @SuppressWarnings("unchecked")
607 public Task getTask2save() {
609 sauv = jaceP2P_SaveFromCrash();
611 // System.out.println("Saving data at it="+sauv.it);
612 // assign the default attributes of a task (params of the appli and
613 // TaskId of the local node)
614 sauv.setId(this.jaceTaskId);
615 sauv.setParam(Register.Instance().getParams());
616 sauv.timeStep = timeStep;
618 // assign the iteration number that had the appli at the moment of the
620 sauv.jaceP2P_Iteration = jaceP2P_Iteration;
621 sauv.saveRound = saveRound;
623 sauv.underTh = underTh;
624 sauv.jaceP2P_globalCV_state = jaceP2P_globalCV_state;
625 // System.out.println("I checkpoint the task at ite " +
626 // sauv.jaceP2P_Iteration);
627 // attributes needed to detect global convergence
631 sauv.nb_not_recv = nb_not_recv;
632 sauv.electedNode = electedNode;
633 sauv.respSent = respSent;
634 sauv.neighbors = (Vector) neighbors.clone();
635 sauv.neighborsValues = (Vector) neighborsValues.clone();
636 sauv.resp = (Vector) resp.clone();
637 sauv.verifNum = verifNum;
638 sauv.sendId = sendId;
639 sauv.reduceAll = reduceAll;
640 sauv.finalStep = finalStep;
641 sauv.action = action;
642 sauv.verdict = verdict;
643 sauv.localCV_state = localCV_state;
644 sauv.recievedVerdict = recievedVerdict;
645 synchronized (lastSave) {
646 lastSave.increment();
647 sauv.lastSave = lastSave;
649 } catch (Exception e) {
650 System.err.println("Problem with RMI !");
// Refreshes the existing sauvConvg object in place with the current
// convergence-detection state (state, flags, neighbour vectors, responses,
// counters), increments lastSave, attaches it and marks the backup as initialized.
// The "unchecked" suppression covers the raw (Vector) casts on the clones.
// NOTE(review): reduceAll and lastSave are assigned by reference, not copied.
656 @SuppressWarnings("unchecked")
657 public BackupConvg getBackupConvg2Save() {
661 // sauvConvg=new BackupConvg();
662 sauvConvg.state = state;
663 sauvConvg.underTh = underTh;
664 sauvConvg.nb_not_recv = nb_not_recv;
665 sauvConvg.electedNode = electedNode;
666 sauvConvg.respSent = respSent;
667 sauvConvg.neighbors = (Vector) neighbors.clone();
668 sauvConvg.neighborsValues = (Vector) neighborsValues.clone();
669 sauvConvg.resp = (Vector) resp.clone();
670 sauvConvg.verifNum = verifNum;
671 sauvConvg.sendId = sendId;
672 sauvConvg.finalStep = finalStep;
673 sauvConvg.action = action;
674 sauvConvg.verdict = verdict;
675 sauvConvg.localCV_state = localCV_state;
676 sauvConvg.timeStep = timeStep;
677 sauvConvg.recievedVerdict = recievedVerdict;
678 sauvConvg.jaceP2P_Iteration = jaceP2P_Iteration;
679 sauvConvg.reduceAll = reduceAll;
681 synchronized (lastSave) {
682 lastSave.increment();
683 sauvConvg.lastSave = lastSave;
// Read by getBackupForNewNode() before it sends the convergence backup.
685 sauvConvg.initialized = true;
686 } catch (Exception e) {
687 System.err.println("Problem with RMI:" + e);
// Allocates saveTab with one cell per entry of BackupsManager and fills each cell
// from BackupsManager.Instance().getBackupTaskAtIndex(i, 0); the rest of that
// expression is not visible in this excerpt.
693 private void createSaveTab() {
695 saveTab = new int[BackupsManager.Instance().size()];
697 // 2 - assign the taskId to each cell of the saveTab
698 // System.out.println("in TASK");
699 for (int i = 0; i < saveTab.length; i++) {
700 saveTab[i] = BackupsManager.Instance().getBackupTaskAtIndex(i, 0)
702 // System.out.print(saveTab[i] + ", ");
704 // System.out.println("saveTab created..... size : " + saveTab.length);
// Updates the two-slot saved array while holding the array's own monitor. The
// visible code tests saved[0] first and saved[1] second; the assignments made in
// each branch are not visible in this excerpt.
707 public void setSaved(boolean bool) {
708 synchronized (saved) {
710 if (saved[0] == false)
712 else if (saved[1] == false)
// When the register reports at least one backup neighbour, loops while getSaved()
// is false. Inside the loop the visible code fetches a fresh Register from the
// spawner and, when it is non-null, installs it with replaceBy(); failures to reach
// the spawner are caught and reported.
// @param tag not referenced by any line visible in this excerpt
722 public void waitForAck(int tag) {
723 if (Register.Instance().getNumBackupNeighbors() != 0) {
726 // Calendar cal = new GregorianCalendar();
727 // System.out.println("at time="+cal.get(Calendar.MINUTE)+":"+cal.get(Calendar.SECOND));
728 // System.out.println("sleeping till acknowledge saved");
729 while (getSaved() == false)
735 Register newReg = Register.Instance()
736 .getSpawnerStub().getRegister(jaceMyId);
737 if (newReg != null) {
738 Register.Instance().replaceBy(newReg);
741 .println("I got a null register from the spawner");
744 .println("Sleeping till acknowledge saved");
746 } catch (Exception e2) {
748 .println("Unable to get register from spawner :"
750 e2.printStackTrace(System.out);
753 } catch (Exception e) {
755 // Calendar cal1 = new GregorianCalendar();
756 // System.out.println("end save at time="+cal1.get(Calendar.MINUTE)+":"+cal1.get(Calendar.SECOND));
// Runs one step of the global-convergence detection protocol for this task.
// Visible structure: the method dispatches on the string fields state ("NORMAL",
// "WAIT4V", "VERIF", "SAVING", "FINISHED") and action ("nothing", "sendVerif",
// "sendVerdict"), and uses underTh, the pseudo-period flags, nb_not_recv and the
// resp vector to decide when to notify a neighbour, broadcast a verification or a
// verdict, answer a neighbour, save results, or set jaceP2P_globalCV_state.
// Remote calls are wrapped in try/catch blocks that report the failure; the outer
// catch also dumps the register with Register.Instance().viewAll().
// @param under_th compared against underTh at the top of the method; presumably the
//        caller's current "local error under threshold" flag - TODO confirm
// NOTE(review): many lines of this method are missing from this excerpt, so the
// exact nesting of the branches below cannot be confirmed from here.
760 public void jaceP2P_GlobalConvergence(boolean under_th) {
765 if (under_th != underTh && postReloading == false) {
766 // 1 - update the under-threshold flag
// Neighbours are computed once, at iteration 0.
769 if (jaceP2P_Iteration == 0
770 && (postReloading == false || resp.size() == 0)) {
771 detectNeighbors(jaceMyId, jaceSize);
779 // print the values of the variables
781 // Calendar cal = new GregorianCalendar();
782 // System.out.println("at time="+cal.get(Calendar.MINUTE)+":"+cal.get(Calendar.SECOND));
783 // System.out.println("MyId="+jaceMyId+" sous seuil="+underTh+" etat="+state+" verifNum="+verifNum+" localCV="+localCV_state+" leader="+electedNode+" respSent="+respSent);
784 // System.out.println("dep="+getValues()+" pBeg="+pseudoPerBeg+" PEnd="+pseudoPerEnd+" sendId="+sendId);
785 // System.out.println("neigh="+nb_not_recv+" action="+action+" negative_resp="+testNegativeResp()+" reloading="+reloading+"\n"+" finalStep="+finalStep+" SavedResults="+savedResults+" NeghNotCV="+getNeighbourNotCV()+" postReload="+postReloading);
788 } catch (Exception e) {
789 System.err.println("Error printing status in Task :" + e);
// After a reload, replay the action that was pending when the backup was taken.
792 if (action.equals("sendVerif") && state.equals("VERIF")
793 && postReloading == true) {
794 // System.out.println("je passe ds send verif");
796 new SendVerifThread(jaceMyId, sendId, verifNum).start();
798 // System.out.println("send Verif");
800 } else if (action.equals("sendVerdict") && postReloading == true) {
801 // System.out.println("je passe ds send verdict-------");
803 new SendVerdictThread(jaceMyId, sendId, verifNum, verdict)
806 // System.out.println("verifNum="+verifNum);
// --- state "NORMAL" with no pending action ---
809 if (state.equals("NORMAL") && action.equals("nothing")) {
810 if (underTh == false)
813 // System.out.println("sous seuil="+underTh+" PseudoPerBeg="+localStub.getPseudoPerBeg());
814 if (pseudoPerBeg == false) {
815 // System.out.println("ds la condition");
817 } else if (pseudoPerEnd == true) {
819 localCV_state = true;
820 if (nb_not_recv == 0) {
821 // localStub.setAction("sendVerif");
823 broadcastVerif(jaceMyId, -1, verifNum + 1);
824 // localStub.setAction("nothing");
827 recievedVerdict = true;
828 initializeVerifLeader();
831 } catch (Exception e) {
833 .println("The verification message is not received: "
835 Register.Instance().viewAll();
837 } else if (nb_not_recv == 1)
840 int neighId = getNeighbourNotCV();
845 recev = Register.Instance().getListeOfTasks()
846 .getTaskIdOfRank(neighId);
847 JaceInterface stub = recev.getHostStub();
848 if (stub.setNbNeighboursNotConv(verifNum,
849 jaceMyId, timeStep)) {
850 // stub.setLeftNeighbourCV(true);
851 // System.out.println("sent convergence message to "+neighId);
854 recievedVerdict = false;
857 } catch (Exception e) {
859 .println("Unable to decrease the number of neighbors not converged on node :"
860 + recev.getHostName()
865 Register.Instance().viewAll();
869 TaskId id = Register.Instance()
871 .getTaskIdOfHostStub(
874 myRank = id.getRank();
875 Register.Instance().replaceBy(
878 .getRegister(myRank));
881 } catch (Exception e2) {
883 .println("Unable to contact the spawner: "
889 } else if (pseudoPerBeg == true && getValues())
// --- state "WAIT4V" ---
891 } else if (state.equals("WAIT4V") && nb_not_recv == 0) {
892 int neighId = getRankLeader();
895 broadcastVerif(jaceMyId, -1, verifNum + 1);
897 recievedVerdict = true;
898 initializeVerifLeader();
902 } catch (Exception e) {
904 .println("The verification message is not received:"
907 } else if (jaceMyId > neighId) {
909 broadcastVerif(jaceMyId, -1, verifNum + 1);
911 recievedVerdict = true;
912 initializeVerifLeader();
915 } catch (Exception e) {
917 .println("The verification message is not received: "
921 } else if (state.equals("WAIT4V")) {
922 if (underTh == false) {
923 localCV_state = false;
// --- state "VERIF" ---
926 } else if (state.equals("VERIF")) {
927 // if(localStub.getNewerDep(0) && localStub.getNewerDep(1))
928 // localStub.setPseudoPerEnd(true);
930 if (electedNode == true) {
931 if ((!underTh && !postReloading) || !localCV_state
932 || testNegativeResp()) {
935 broadcastVerdict(jaceMyId, -2, verifNum + 1, false);
940 } catch (Exception e) {
942 } else if (postReloading) {
943 if (recievedAllResp())
944 if (!testNegativeResp()) {
946 broadcastVerdict(jaceMyId, -2, verifNum,
948 if (finalStep == true
949 && state.equals("VERIF")) {
950 initializeSavLeader();
957 } catch (Exception e) {
958 System.err.println("Error: " + e);
961 } else if (pseudoPerEnd == true) {
962 if (recievedAllResp())
963 if (!testNegativeResp()) {
965 broadcastVerdict(jaceMyId, -2, verifNum,
967 if (finalStep == true
968 && state.equals("VERIF")) {
969 initializeSavLeader();
977 } catch (Exception e) {
978 System.out.println("erreur: " + e);
983 broadcastVerdict(jaceMyId, -2,
984 verifNum + 1, false);
989 } catch (Exception e) {
991 .println("Unable to broadcast a negative verdict :"
997 else if (getValues())
999 } else if (!respSent) {
1000 if (!underTh || !localCV_state || testNegativeResp()) {
1001 if (action.equals("nothing")) {
1002 int neighId = sendId;
1004 TaskId recev = null;
1005 recev = Register.Instance().getListeOfTasks()
1006 .getTaskIdOfRank(neighId);
1007 JaceInterface stub = recev.getHostStub();
1008 // System.out.println("tryin to send negative response to "+neighId);
1009 stub.response(jaceMyId, verifNum, -1, null);
1010 // System.out.println("send negative response to "+neighId);
1014 } catch (Exception e) {
1016 .println("Response not received:" + e);
1017 Register.Instance().viewAll();
1020 } else if (pseudoPerEnd) {
1021 // System.out.print("The daemon can send a response ");
1022 int index = recievedAllRespMinusOne();
1023 // System.out.println("to node of index ="+index);
1025 int rank = getNeighborRank(index);
1026 // if(jaceMyId!=jaceSize-1 &&
1027 // localStub.getSendRight()==true){
1029 TaskId recev = null;
1030 recev = Register.Instance().getListeOfTasks()
1031 .getTaskIdOfRank(rank);
1032 JaceInterface stub = recev.getHostStub();
1034 if (getResp(index) == 1) {
1035 stub.response(jaceMyId, verifNum, 1,
1037 // System.out.println("send positive response to"+rank);
1039 stub.response(jaceMyId, verifNum, -1, null);
1040 // System.out.println("send negative response to"+rank);
1045 } catch (Exception e) {
1046 System.err.println("Response not received by "
1048 Register.Instance().viewAll();
1051 } else if (getValues())
1052 pseudoPerEnd = true;
// --- state "SAVING" ---
1055 } else if (state.equals("SAVING") && !action.equals("sendVerdict")) {
1057 if (savedResults == false) {
1060 savedResults = true;
1062 } else if (electedNode) {
1063 if (recievedAllResp())
1065 // System.out.println("recieved all responses");
1066 JaceSpawnerInterface spawnerStub = Register
1067 .Instance().getSpawnerStub();
1068 // System.out.println("##### callin spawnerStub.setFinished(true) #####");
1069 spawnerStub.setOver(true);
1070 // localStub.setState("FINISHED");
1071 } catch (Exception e) {
1072 System.err.println("Error" + e);
1074 } else if (!respSent) {
1075 int index = recievedAllRespMinusOne();
1077 int rank = getNeighborRank(index);
1079 // if(jaceMyId!=jaceSize-1 &&
1080 // localStub.getSendRight()==true){
1082 TaskId recev = null;
1083 recev = Register.Instance().getListeOfTasks()
1084 .getTaskIdOfRank(rank);
1085 JaceInterface stub = recev.getHostStub();
1086 if (stub.getReloading() == false
1087 && stub.getState().equals("SAVING")) {
1088 action = "sendResponse";
1089 stub.response(jaceMyId, verifNum, 1, null);
1093 // System.out.println("send response to"+rank);
1095 } catch (Exception e) {
1096 System.err.println("Response not received" + e);
// --- state "FINISHED": global convergence is declared here ---
1100 } else if (state.equals("FINISHED")
1101 && !action.equals("sendVerdict") && recievedVerdict) {
1103 jaceP2P_globalCV_state = true;
1104 // System.out.println("Finished");
1105 // System.out.println("Finished");
1106 // System.out.println("Finished");
1107 // System.out.println("Finished");
1108 System.out.println("Finished");
1111 if (postReloading == true)
1112 postReloading = false;
1114 } catch (Exception e) {
1115 System.err.println("Exception in Global Convergence :" + e);
1116 e.printStackTrace(System.out);
1117 Register.Instance().viewAll();
// Scans the resp vector for an entry equal to -1; how the result is derived from
// that scan is not visible in this excerpt. Callers in this file compare the result
// with 1.
// NOTE(review): the index parameter is not referenced by any visible line.
1122 public synchronized int getResp(int index) throws RemoteException {
1124 for (int i = 0; i < resp.size(); i++)
1126 if (((Integer) resp.get(i)).intValue() == -1)
// Records the response of neighbour neighId for verification round tag. The method
// waits while reloading is true. When tag equals verifNum it locates the neighbour
// in neighbors, conditionally merges the received value through reduceAll(), and
// stores the response in resp. A RemoteException is thrown on a path whose
// condition is not visible in this excerpt.
// @param neighId       rank of the responding neighbour
// @param tag           verification number the response belongs to
// @param response      response code stored in resp (1 and -1 are used by callers)
// @param recievedValue value handed to reduceAll()
// NOTE(review): the wait loop runs inside a synchronized method and therefore keeps
// this object's monitor while reloading is true.
// NOTE(review): indexOf yields -1 for an unknown neighId; no guard is visible before
// resp.elementAt(indexNeigh) - TODO confirm one exists on the missing lines.
1131 public synchronized void response(int neighId, int tag, int response,
1132 Vector<?> recievedValue) throws RemoteException {
1133 // System.out.println("inside response function");
1134 // System.out.println("sleeping till not reloading");
1135 while (reloading == true) {
1138 // System.out.println("sleeping till not reloading");
1139 } catch (Exception e) {
1142 if (verifNum == tag) {
1143 // System.out.println("inside condition");
1144 int indexNeigh = neighbors.indexOf((Object) neighId);
1145 // System.out.println("after gettin index="+index);
1146 // System.out.println("index="+indexNeigh+" size de resp ="+resp.size());
1149 // int xyz=((Integer)(resp.elementAt(indexNeigh))).intValue();
1150 // }catch(Exception e){
1151 // System.out.println("error:"+e);
1155 && !state.equals("SAVING")
1156 && (((Integer) (resp.elementAt(indexNeigh))).intValue()) != 1) {
1157 // System.out.println("calling reduceAll()");
1158 reduceAll(recievedValue);
1160 // System.out.println("after calculating reduceAll");
1161 resp.setElementAt(response, indexNeigh);
1163 // System.out.println("get response ............");
1167 throw new RemoteException();
// Resets every entry of resp to 0. Any further statement of this method is not
// visible in this excerpt.
1170 public void initializeSavLeader() {
1171 for (int i = 0; i < resp.size(); i++)
1172 resp.setElementAt(0, i);
// Starts from true and scans resp for entries equal to 0 (the value the
// initialize* methods reset entries to); what happens when one is found and the
// return statement are not visible in this excerpt.
1176 public boolean recievedAllResp() {
1177 boolean bool = true;
1178 for (int i = 0; i < resp.size(); i++)
1179 if (((Integer) (resp.get(i))).intValue() == 0) {
// Scans resp for entries equal to 0, tracking a position in indexOfZero (initially
// -1). Callers in this file pass the result to getNeighborRank() as an index; the
// exact value returned for zero or several 0 entries is not visible in this excerpt.
1186 public int recievedAllRespMinusOne() {
1188 int indexOfZero = -1;
1189 for (int i = 0; i < resp.size(); i++)
1190 if (((Integer) (resp.get(i))).intValue() == 0) {
// Called (remotely, via the stub, in jaceP2P_GlobalConvergence) by a neighbour to
// report its convergence. The report is only processed when tag equals verifNum, no
// "sendVerdict" action is pending and the neighbour is at the same timeStep. The
// neighbour's slot in neighborsValues is then set to true if it was false. A second
// branch handles tag == verifNum - 1; its body is not visible in this excerpt.
// @param tag              verification number the report belongs to
// @param idNeigh          rank of the reporting neighbour (-1 is special-cased)
// @param neighborTimeStep time step of the reporting neighbour
// NOTE(review): indexOf yields -1 for an unknown idNeigh; no guard is visible before
// neighborsValues.get(i).
1200 public synchronized boolean setNbNeighboursNotConv(int tag, int idNeigh,
1201 int neighborTimeStep) throws RemoteException {
1202 // System.out.println("ds setNbNeighboursNotConv !!!!!!!!!!!!!!!");
1203 if (tag == verifNum && !action.equals("sendVerdict")
1204 && neighborTimeStep == timeStep) {
1205 if (idNeigh == -1) {
1209 // System.out.println("sleeping till not reloading");
1210 while (reloading == true) {
1213 // System.out.println("sleeping till not reloading");
1214 } catch (Exception e) {
1217 int i = neighbors.indexOf((Object) idNeigh);
1218 if (((Boolean) neighborsValues.get(i)).booleanValue() == false) {
1220 // System.out.println("le noeud "+idNeigh+" a envoyer un message de pseudoconvergence");
1221 neighborsValues.setElementAt(new Boolean(true), i);
1227 } else if (tag == verifNum - 1 && !action.equals("sendVerdict")
1228 && reloading == false && neighborTimeStep == timeStep
1229 && jaceP2P_Iteration != 0)
// Fills neighbors / neighborsValues for task id among jaceSize tasks. For each
// power of two 2^d below jaceSize (d's initial value and increment are not visible
// in this excerpt):
//   - if id < 2^d and id + 2^d < jaceSize, id + 2^d is added as a neighbour;
//   - if 2^d <= id < 2^(d+1), id - 2^d is added as a neighbour.
// Every added neighbour gets a matching false entry in neighborsValues.
// @param id       rank of this task
// @param jaceSize total number of tasks
// NOTE(review): Math.pow works on doubles; an integer shift (1 << d) would express
// the same powers of two without floating point.
1235 public void detectNeighbors(int id, int jaceSize) {
1236 // System.out.println("detect neighbors !!!! ");
1238 while (Math.pow(2, d) < jaceSize) {
1239 if (id < Math.pow(2, d) && ((id + Math.pow(2, d)) < jaceSize)) {
1240 neighbors.add((int) (id + Math.pow(2, d)));
1241 neighborsValues.add(new Boolean(false));
1244 if (id < Math.pow(2, d + 1) && id >= Math.pow(2, d)) {
1245 neighbors.add((int) (id - Math.pow(2, d)));
1246 neighborsValues.add(new Boolean(false));
// Resets the per-round detection state: nb_not_recv is set to the number of
// neighbours, every neighborsValues entry to false and every resp entry to 0, and
// the electedNode / localCV_state flags are cleared. Other resets may sit on lines
// not visible in this excerpt.
1253 public synchronized void initialize_state() {
1255 // System.out.println("initialiser\n");
1257 nb_not_recv = neighbors.size();
1258 for (int i = 0; i < neighbors.size(); i++)
1259 neighborsValues.setElementAt(new Boolean(false), i);
1260 for (int i = 0; i < resp.size(); i++)
1261 resp.setElementAt(0, i);
1263 electedNode = false;
1264 localCV_state = false;
// Reads the rank stored at the given position of the neighbors vector; the return
// statement is not visible in this excerpt.
// NOTE(review): no bounds check is visible; callers pass the result of
// recievedAllRespMinusOne(), whose indexOfZero starts at -1.
1270 public int getNeighborRank(int index) throws RemoteException {
1271 int rank = ((Integer) (neighbors.get(index))).intValue();
// Walks the neighbours and records in neighId the rank of each one whose
// neighborsValues entry is false. The loop has no break, so when several neighbours
// are still false the last one wins. neighId's initial value and the return
// statement are not visible in this excerpt.
1275 public synchronized int getNeighbourNotCV() {
1277 for (int i = 0; i < neighbors.size(); i++)
1278 if (((Boolean) neighborsValues.get(i)).booleanValue() == false)
1279 neighId = ((Integer) neighbors.elementAt(i)).intValue();
// Starts from true and scans the values vector for a false entry; what happens
// when one is found and the return statement are not visible in this excerpt.
// NOTE(review): new Boolean(false) is deprecated since Java 9; comparing against
// Boolean.FALSE gives the same equals() result without an allocation.
1283 public boolean getValues() {
1284 boolean bool = true;
1285 for (int i = 0; i < values.size(); i++)
1286 if (((Boolean) values.elementAt(i)).equals(new Boolean(false))) {
1290 // System.out.println("getValues() have been called and it returned "+bool);
// Starts from false and scans resp for an entry equal to -1 (the code sent as a
// negative response elsewhere in this file); what happens when one is found and the
// return statement are not visible in this excerpt.
1294 public boolean testNegativeResp() {
1295 boolean bool = false;
1296 for (int i = 0; i < resp.size(); i++)
1297 if (((Integer) (resp.get(i))).intValue() == -1) {
// Debug helper; every visible statement is commented out, so the visible part of
// this method does nothing. The disabled code printed each resp entry with its
// neighbour rank.
1304 public void printResp() {
1305 // for(int i=0;i<resp.size();i++)
1306 // System.out.print(" resp["+((Integer)neighbors.get(i)).intValue()+"]="+((Integer)resp.get(i)).intValue());
1307 // System.out.print("\n");
// Debug helper; every visible statement is commented out, so the visible part of
// this method does nothing. The disabled code printed each values entry with its
// dependency rank.
1310 public void printDep() {
1311 // for(int i=0;i<values.size();i++)
1312 // System.out.print(" dep["+((Integer)dependancies.get(i)).intValue()+"]="+((Boolean)values.get(i)).booleanValue());
1313 // System.out.print("\n");
// Calls initializeVerif(tag) on the remote stub of every neighbour except neighId.
// Callers in this file pass -1 as neighId, which presumably matches no real rank,
// so all neighbours are contacted - TODO confirm. A final test on action ==
// "sendVerif" follows; its body is not visible in this excerpt.
// @param id      rank of the broadcasting task (not referenced by any visible line)
// @param neighId rank of the neighbour to skip
// @param tag     verification number forwarded to initializeVerif()
1316 public void broadcastVerif(int id, int neighId, int tag)
1317 throws RemoteException {
1319 TaskId recev = null;
1321 // Register.Instance().getListeOfTasks().getTaskIdOfRank(id);
1323 // System.out.println("Id="+id+" neighId="+ neighId+" tag="+tag);
1324 for (int i = 0; i < neighbors.size(); i++)
1325 if (neighId != ((Integer) neighbors.get(i)).intValue()) {
1326 recev = Register.Instance().getListeOfTasks().getTaskIdOfRank(
1327 ((Integer) neighbors.get(i)).intValue());
1328 stub = recev.getHostStub();
1329 stub.initializeVerif(tag);
1330 // System.out.println("broadcast verification to :"+((Integer)neighbors.get(i)).intValue());
1332 if (action.equals("sendVerif"))
// Resets every entry of the `resp` vector to 0.
// Declared to throw RemoteException, although the visible statements are local.
// NOTE(review): the embedded line numbers show gaps around the loop, so the
// method may perform additional steps that are not visible in this listing.
1336 public void initializeVerifLeader() throws RemoteException {
1338 for (int i = 0; i < resp.size(); i++)
1339 resp.setElementAt(0, i);
// Handles an incoming verification request carrying `tag`.
// First waits in a loop for as long as `reloading` is true. The request is
// then only considered when tag == verifNum + 1:
//   - in state "WAIT4V": action becomes "sendVerif", every resp entry is
//     reset to 0 and a SendVerifThread is started;
//   - in state "VERIF": handling is not visible in this listing.
// A RemoteException is thrown on one of the remaining paths.
// NOTE(review): the body of the wait loop, the computation of `sendId`, any
// update of verifNum/state, and the exact branch that throws are not visible
// here - confirm against the full file.
1344 public void initializeVerif(int tag) throws RemoteException {
1345 // System.out.println("Inside initializeVerif @@@@@");
1346 // System.out.println("sleeping till not reloading");
1347 while (reloading == true) {
1350 // System.out.println("sleeping till not reloading");
1351 } catch (Exception e) {
1354 if (verifNum + 1 == tag)
1355 if (state.equals("WAIT4V")) {
1356 action = "sendVerif";
1358 for (int i = 0; i < resp.size(); i++)
1359 resp.setElementAt(0, i);
1366 new SendVerifThread(jaceMyId, sendId, verifNum).start();
1368 } else if (state.equals("VERIF")) {
1370 throw new RemoteException();
// Boolean accessor related to the task's "saved" status.
// NOTE(review): the body is not visible in this listing, so which of the
// saved-related fields (e.g. the `saved` array) it reads is unconfirmed.
1376 public boolean getSaved() {
// Clears the pseudo-period markers: pseudoPerBeg and pseudoPerEnd are set to
// false and every entry of the `values` vector is reset to false.
1383 public void reinitializePPEr() {
1384 pseudoPerBeg = false;
1385 pseudoPerEnd = false;
1386 for (int i = 0; i < values.size(); i++)
1387 values.setElementAt(new Boolean(false), i);
// Forwards a verdict to every neighbor except `neighId`.
// For each remaining neighbor the TaskId is resolved through the Register by
// rank, its host stub is fetched, and savOrFinOrRest is invoked on it with
// the tag, this task's timeStep, the verdict and this task's reduceAll vector.
//
// id      - only referenced by commented-out code in the visible lines
// neighId - rank of the neighbor to skip
// tag     - value passed through to each neighbor
// verd    - verdict to forward (boxed into `verdicto`)
// Throws RemoteException, as declared; the calls made on the stub are the
// visible candidates for raising it.
// NOTE(review): the final test reads the field `verdict`, not the parameter
// `verd`; the guarded statement and the declaration of `stub` are not visible
// in this listing - confirm that using the field is intended.
1391 public void broadcastVerdict(int id, int neighId, int tag, boolean verd)
1392 throws RemoteException {
1393 Boolean verdicto = verd;
1394 TaskId recev = null;
1396 // Register.Instance().getListeOfTasks().getTaskIdOfRank(id);
1398 for (int i = 0; i < neighbors.size(); i++)
1399 if (((Integer) neighbors.get(i)).intValue() != neighId) {
1400 // System.out.println("broadcasting verdict "+verdicto+" to node of Rank "+((Integer)neighbors.get(i)).intValue());
1401 recev = Register.Instance().getListeOfTasks().getTaskIdOfRank(
1402 ((Integer) neighbors.get(i)).intValue());
1403 stub = recev.getHostStub();
1404 stub.savOrFinOrRest(tag, timeStep, verdicto, reduceAll);
1407 if (verdict == false)
// Handles a verdict received from a neighbor.
//
// tag       - verification number the verdict belongs to
// step      - sender's timeStep; both handled cases require it to equal this
//             task's timeStep
// verd      - the verdict; its use is not visible in this listing
// reduceAll - vector stored into this.reduceAll in the first case below
//
// After waiting for `reloading` to become false, two cases are handled:
//   1. tag == verifNum, same time step and state "VERIF": action becomes
//      "sendVerdict"; when finalStep is set, resp is zeroed and the method
//      waits until action is no longer "sendResponse" and respSent is true;
//      recievedVerdict is set, reduceAll is stored and a SendVerdictThread
//      is created.
//   2. tag > verifNum and same time step: action becomes "sendVerdict",
//      recievedVerdict is set and a SendVerdictThread is started.
// NOTE(review): the wait-loop bodies, the computation of `sendId`, any
// assignment of `verdict`/`verifNum`/`state`, and the else-branch structure
// around the two `recievedVerdict = true` lines are not visible here.
// NOTE(review): `reloading` and `respSent` are declared as plain
// (non-volatile) public fields, and the visible signature is not
// synchronized, so these polling loops rely on another thread's writes
// becoming visible - confirm how visibility is guaranteed.
1411 public void savOrFinOrRest(int tag, int step, boolean verd,
1412 Vector<?> reduceAll) {
1413 // System.out.println("Recieved verd "+verd+" sleeping till not reloading");
1414 while (reloading == true) {
1417 // System.out.println("sleeping till not reloading");
1418 } catch (Exception e) {
1422 if (verifNum == tag && timeStep == step && state.equals("VERIF")) {
1423 action = "sendVerdict";
1424 if (finalStep == true) {
1425 for (int i = 0; i < resp.size(); i++)
1426 resp.setElementAt(0, i);
1427 // System.out.println("sleeping till response is sent");
1428 while (action.equals("sendResponse") || respSent == false)
1431 // System.out.println("sleeping till response is sent");
1432 } catch (Exception e) {
1437 recievedVerdict = true;
1442 recievedVerdict = true;
1444 // System.out.println("//// Setting reduceAll\\\\");
1445 this.reduceAll = reduceAll;
1448 new SendVerdictThread(jaceMyId, sendId, verifNum, verdict)
1451 } else if (verifNum < tag && timeStep == step) {
1456 action = "sendVerdict";
1457 recievedVerdict = true;
1460 new SendVerdictThread(jaceMyId, sendId, verifNum, verdict).start();
// Queries every neighbor and records in neighId the rank of each neighbor
// whose stub reports getNbNeighboursNotConv() == 0. No break is visible, so
// when several neighbors qualify the last one scanned is the one kept.
// One Register lookup and one call on the neighbor's stub are made per
// neighbor. Throws RemoteException, as declared.
// NOTE(review): the declaration/initial value of neighId and the return
// statement are not visible in this listing; presumably neighId is returned.
1465 public int getRankLeader() throws RemoteException {
1467 for (int i = 0; i < neighbors.size(); i++) {
1468 TaskId recev = null;
1469 recev = Register.Instance().getListeOfTasks().getTaskIdOfRank(
1470 ((Integer) (neighbors.get(i))).intValue());
1471 JaceInterface stub = recev.getHostStub();
1472 if (stub.getNbNeighboursNotConv() == 0)
1473 neighId = ((Integer) (neighbors.get(i))).intValue();
// Thread that serializes a Task snapshot (`sauv`) and tries to store it on a
// backup node by calling saveTask on that node's stub.
// It reads the enclosing Task's saveTab, saveRound and finalize fields.
// Visible flow: make sure saveTab exists, skip the work when `finalize` is
// set, convert the snapshot to a byte stream, then loop over candidate
// backup ranks until one send succeeds or the candidates are exhausted.
// NOTE(review): the embedded line numbers show many gaps (field declarations,
// the run() header, loop-counter updates, the numbered step 4, the else
// branches), so the description above covers the visible statements only.
1478 class SaveTaskThread extends Thread {
// NOTE(review): the constructor body is not visible; presumably it stores
// `s` in the `sauv` field used below - confirm.
1481 public SaveTaskThread(Task s) {
1485 @SuppressWarnings("static-access")
1487 // If array of BackupNodes not created, create it
1488 if (saveTab == null) {
1492 if (finalize == false) { // if finalization step, it will crash
1493 // because register purged yet
1495 ByteArrayOutputStream stream = convertTask2stream(sauv);
1497 // find the BackupNode to send
1500 JaceInterface stub = null;
1501 boolean sent = false;
1504 while (j < saveTab.length && sent == false) {
1505 // 1 - find the remote task Id to send the backup to
// NOTE(review): when jaceSize is 1 the modulus below is 0 and the expression
// would throw ArithmeticException - confirm jaceSize > 1 is guaranteed here.
1506 if (jaceSize < (2 * Register.Instance()
1507 .getNumBackupNeighbors() + 1)) {
1508 taskRankDest = saveTab[saveRound % (jaceSize - 1)];
1510 taskRankDest = saveTab[saveRound
1511 % (2 * Register.Instance()
1512 .getNumBackupNeighbors())];
1515 // 2 - knowing the destination task Id, get the stub of the
1516 // corresponding node
1518 task = Register.Instance().getListeOfTasks()
1519 .getTaskIdOfRank(taskRankDest); // ///////////////////////////pb
1520 stub = task.getHostStub();
1521 } catch (Exception e) {
1523 .println("Problem in SaveTaskThread on assignation line in save : "
1526 // System.out.println("ite " + jaceP2P_Iteration +
1527 // ".......... SENDING on task " + taskRankDest);
1529 // if no stub there is a problem
1533 // System.out.println("unable to SEND backup on task of rank "
1536 // 3 - try to send the stream
1538 // if there is a stub, send the stream to that node
1540 stub.saveTask(jaceMyId, stream.toByteArray(),
1541 sauv.lastSave.getLastSave(), sauv.timeStep,
1542 Register.Instance().getAppliName(), 0);
1543 // System.out.println("saved data on "+taskRankDest+" iteration= "+sauv.lastSave.getLastSave()+
1544 // "timeStep="+sauv.timeStep+" !!!!!!!!");
1546 // Vector v = stub.getIterationOfBackup(jaceMyId,0);
1547 // int ite=((Integer)v.get(0)).intValue();
1548 // System.out.println("******************************************data backup: ite="+ite+" taskDest="+taskRankDest+" ******************************************************");
1551 } catch (Exception e) {
1553 .println("JaceP2P_Error in Task.jaceP2P_Save() when saving stream: "
// sleep() and yield() are static methods of java.lang.Thread: called through
// an instance they still act on the thread executing this code, not on the
// object returned by getTaskThread() (assuming that is a Thread - TODO
// confirm). The "static-access" warning suppression above covers these calls.
1562 JaceSession.Instance().getTaskThread().sleep(10);
1563 JaceSession.Instance().getTaskThread().yield();
1564 } catch (Exception e) {
1568 // 5 - if stream not sent at all, do something (WHAT ???)
// NOTE(review): the loop above stops as soon as j reaches saveTab.length, so
// `j > saveTab.length` looks unreachable unless j is advanced by more than
// one per pass (the increment is not visible here) - confirm whether `>=`
// or a check on `sent` was intended.
1569 if (j > saveTab.length) {
1571 .println("No more alive neighbors for storing the Backup");
1572 // TODO : what to do if no BackupNode has answered ???
1581 class BroadcastTaskThread extends Thread {
1592 public BroadcastTaskThread(JaceInterface theStub, int theRank,
1593 byte[] theTsk, int theIteration, int timeStep, String theAppliName,
1594 int tag, int dest) {
1595 this.stub = theStub;
1596 this.rank = theRank;
1598 this.iteration = theIteration;
1599 this.timeStep = timeStep;
1600 this.appliName = theAppliName;
1603 // System.out.println("tag="+tag);
1606 // method launched by start()
1609 stub.saveTask(rank, tsk, iteration, timeStep, appliName, tag);
1610 JaceSession.Instance().getTaskObject().setSaved(true);
1613 // Vector v = stub.getIterationOfBackup(rank,0);
1614 // int ite=((Integer)v.get(0)).intValue();
1615 // System.out.println("++++++++++ Broadcast data ite="+iteration+" dest="+dest+" timeStep="+timeStep);
1618 // Vector v = stub.getIterationOfBackup(rank,1);
1619 // int ite=((Integer)v.get(0)).intValue();
1620 // System.out.println("++++++++++ Broadcast dataConvg ite="+iteration+" dest="+dest+" timeStep="+timeStep);
1621 // System.out.println("+++++++++++++++++++++++++++++++++++++++++ Broadcast convgData ite="+ite+" dest="+dest+" +++++++++++++++++++++++++++++++++++++++++++++++++++++");
1624 } catch (Exception e) {
1626 .println("Node not reachable by JaceServer.saveTask() in BroadcastTaskThread :"