3 import java.io.ByteArrayOutputStream;
4 import java.io.ObjectInputStream;
5 import java.io.ObjectOutputStream;
6 import java.rmi.ConnectException;
7 import java.rmi.RemoteException;
8 import java.util.ArrayList;
10 //import java.util.Calendar;
11 //import java.util.GregorianCalendar;
13 //import com.jamonapi.*;
15 public class Task implements Runnable, Cloneable, java.io.Serializable {
// Serialization version identifier for this Serializable class.
16 private static final long serialVersionUID = 1L;
18 public double errorLoc = 0;
// Checkpoint period: jaceP2P_Save() saves when jaceP2P_Iteration % saveParameter == 0.
19 public int saveParameter;
// Identity of this task; setId() derives jaceMyId from its rank.
22 public TaskId jaceTaskId = null;
23 public String[] jaceArgs;
// While true, response(), initializeVerif() and setNbNeighboursNotConv() loop before proceeding.
24 public boolean reloading = false;
// Convergence-protocol state; this file compares it against "NORMAL", "WAIT4V",
// "VERIF", "SAVING" and "FINISHED".
25 public String state = "NORMAL";
// Set to neighbors.size() by initialize_state(); presumably the number of
// neighbors not yet heard from - TODO confirm.
26 public int nb_not_recv;
27 public boolean electedNode = false;
28 public boolean respSent = false;
// One response code per neighbor (same index as neighbors); the code tests entries for 0, 1 and -1.
29 public ArrayList<Integer> resp;
// Compared with the tag argument of incoming protocol calls (response, initializeVerif, setNbNeighboursNotConv).
30 public int verifNum = 0;
31 public LastSave lastSave = new LastSave();
// neighbors holds neighbor ranks; neighborsValues holds one flag per neighbor at the same index.
32 public ArrayList<Integer> neighbors;
33 public ArrayList<Boolean> neighborsValues;
// dependancies / values are parallel lists filled together by setDep().
34 public ArrayList<Integer> dependancies;
35 public ArrayList<Boolean> values;
// Pending protocol action; values used in this file: "nothing", "sendVerif", "sendVerdict", "sendResponse".
37 public String action = "nothing";
38 public boolean verdict = false;
39 public boolean recievedVerdict = false;
40 public int jaceP2P_Iteration = 0;
41 public boolean finalStep = false;
42 // public Monitor mon1=null;
43 public int timeStep = 0; // time discretization counter for non-stationary
45 public boolean localCV_state;
// Set to true by jaceP2P_GlobalConvergence() once the FINISHED state is processed.
46 public boolean jaceP2P_globalCV_state = false;
47 // attribute to know if an appli has finished yet
48 private boolean finalize = false;
49 public boolean pseudoPerBeg;
50 public boolean pseudoPerEnd;
// Last "under threshold" value recorded by jaceP2P_GlobalConvergence().
51 public boolean underTh = false;
// Two-slot flag array (allocated with length 2 elsewhere in this file); setSaved() inspects both slots.
52 public boolean saved[];
53 // attributes for BackupNodes
54 private int saveRound = 0;
// Ranks of the tasks that receive backups; filled by createSaveTab() from BackupsManager.
55 private int[] saveTab = null;
56 public boolean savedResults = false;
// Convergence-only backup, refreshed in place by getBackupConvg2Save().
58 public BackupConvg sauvConvg = new BackupConvg();
59 public boolean postReloading = false;
60 public ArrayList<Object> reduceAll;
// NOTE(review): the enclosing declaration is not visible in this view (presumably
// the Task constructor - confirm). These statements allocate the empty bookkeeping
// lists and the two-slot 'saved' flag array.
67 reduceAll = new ArrayList<Object>();
68 dependancies = new ArrayList<Integer>();
69 values = new ArrayList<Boolean>();
70 resp = new ArrayList<Integer>();
71 saved = new boolean[2];
73 neighbors = new ArrayList<Integer>();
74 neighborsValues = new ArrayList<Boolean>();
// Pushes the backups held by this task to the node hosting the task of the given
// rank: the saved task (sauv) is serialized and passed to that node's stub via
// saveTask(); when sauvConvg.initialized is set, the convergence backup is sent
// the same way. Errors are reported on stderr, with the stack trace on stdout.
77 public void getBackupForNewNode(int rank) {
79 JaceInterface stub = null;
// Look up the destination task from its rank, then its remote stub.
80 task = Register.Instance().getListeOfTasks().getTaskIdOfRank(rank);
82 stub = task.getHostStub();
84 // if no stub there is a problem
86 System.err.println("Unable to send backup on task of rank " + rank);
88 // if there is a stub, send the stream to that node
90 ByteArrayOutputStream stream;
92 stream = convertTask2stream(sauv);
93 stub.saveTask(jaceMyId, stream.toByteArray(), sauv.lastSave
94 .getLastSave(), sauv.timeStep, Register.Instance()
// The convergence backup is only sent once getBackupConvg2Save() has marked it initialized.
97 synchronized (sauvConvg) {
98 if (sauvConvg.initialized == true) {
99 stream = convertBackupConv2stream(sauvConvg);
100 stub.saveTask(jaceMyId, stream.toByteArray(),
101 sauvConvg.lastSave.getLastSave(),
102 sauvConvg.timeStep, Register.Instance()
106 } catch (Exception e) {
107 System.err.println("Error in getBackupForNewNode :" + e);
108 e.printStackTrace(System.out);
113 // method to overload by user in the appli to convert the stream sent by the
114 // BackupNode to a Task object of the type of the appli this method is
116 // in TaskLauncher.loadBackupAndRestart() and
117 // TaskLauncher.loadOrReloadTask()
118 public Task jaceP2P_ConvertStream(ObjectInputStream stream) {
122 public void printSav() {
125 // method to overload by user in the appli to specify the reduceAll method
126 public synchronized void reduceAll(ArrayList<Object> recievedValue) {
// Records the identity of this task: jaceMyId is taken from the rank of jaceTaskId
// and jaceSize from the number of tasks known to the Register. Failures are only
// printed to stderr.
// NOTE(review): the statement that stores the Id argument is not visible in this
// view (presumably jaceTaskId = Id) - confirm.
129 public void setId(TaskId Id) {
131 jaceMyId = jaceTaskId.getRank();
133 jaceSize = Register.Instance().getNbOfTasks();
134 } catch (Exception e) {
136 System.err.println("SetId is bad !! " + e + " "
137 + LocalHost.Instance().getName());
138 // jaceSize = Register.Instance().getListeOfTasks().getSize();
139 } catch (Exception e2) {
140 System.err.println("Not localised the spawner : " + e2);
145 public void setParam(String[] arg) {
150 public void setJaceSize(int nbTasks) {
154 public TaskId getId() {
159 public int getTimeStep() {
163 // method to overload by user in the appli to identify the neighbors of a
166 public int[] getDependencies(int i) {
170 // method to overload by user in the appli to init each task at beginning of
173 public void jaceP2P_InitTask() {
176 // TaskLauncher.loadOrReloadTask()
177 public void jaceP2P_ReinitTask() {
180 // method to overload by user in the appli to safeguard the results
181 public void saveResults() {
184 // Hook to be overridden by the application so that only the required
185 // attributes (iteration counter, vectors, ...) are saved. It is invoked from
186 // getTask2save(); the original note names Task.jaceP2P_Save() as the caller.
187 public Task jaceP2P_SaveFromCrash() {
// The base version prints a marker line to stdout.
188 System.out.println("JaceSaveFromCrash Task");
// Ends the application on this node: prints a trace line, asks the spawner stub
// to kill the application (passing the local host stub), reports a failure to
// reach the spawner, and calls JaceDaemon.reinitDaemon(). The exact ordering and
// guards around these calls are not visible in this view.
192 public void jaceFinalize() {
195 System.out.println("Ready to Death Task:" + jaceMyId);
198 Register.Instance().getSpawnerStub().killApplication(
199 LocalHost.Instance().getStub());
201 } catch (Exception e) {
203 .println("Cannot join the Spawner to kill application: "
207 JaceDaemon.Instance().reinitDaemon();
// Resets the convergence-detection bookkeeping: calls reinitializeVectors(),
// clears both pseudo-period flags, replaces lastSave with a fresh LastSave and
// clears the local and global convergence flags.
211 public void jaceP2P_ReinitConv() {
212 // System.out.println("reinit conv");
216 // last_iter.removeAllElements();
218 reinitializeVectors();
220 } catch (Exception e) {
221 System.err.println("Error in jaceP2P_reinitConv():" + e);
223 pseudoPerBeg = false;
224 pseudoPerEnd = false;
// NOTE(review): the monitor taken here is the old LastSave instance; once the
// field is reassigned, other threads synchronizing on lastSave lock a different object.
225 synchronized (lastSave) {
226 lastSave = new LastSave();
228 // reset the convergence variables of the generic Task
229 localCV_state = false;
230 jaceP2P_globalCV_state = false;
// Empties the bookkeeping lists. Only the clears of dependancies and
// neighborsValues are visible in this view; other lists may be cleared on
// lines not shown here.
234 public void reinitializeVectors() throws RemoteException {
236 dependancies.clear() ;
238 neighborsValues.clear() ;
// Asks the spawner stub for the chronometer value of the current application
// (identified by Register.getAppliName()). A failure is reported through a
// println; the value returned in that case is not visible in this view.
242 public long jaceP2P_getChronoValue() {
246 result = Register.Instance().getSpawnerStub().getChronoValue(
247 Register.Instance().getAppliName());
248 } catch (Exception e) {
250 .println("JaceP2P_Error in Task.jaceP2P_getChronoValue() on SuperNode : "
258 //System.out.println("ds run()");
262 *asynchronous sending (non blocking), of data object to an other task
265 * the object (serializable) data to be send
267 * the task id for receiver's task
269 * the tag for message
// Queues a data message for the task of rank 'dest'. The destination TaskId is
// looked up in the Register; when the destination stub reports the same timeStep
// as this task, a Message carrying buffer, tag, timeStep, jaceP2P_Iteration,
// verifNum and erreur_locale is added to Sender's buffer. Delivery itself is
// done elsewhere, so this call does not wait for the receiver.
// NOTE(review): Thread.sleep()/yield() are static (hence the static-access
// suppression); assuming getTaskThread() returns a java.lang.Thread, these calls
// act on the calling thread, whichever Thread object they are invoked through.
271 @SuppressWarnings("static-access")
272 public void jaceSend(Object buffer, int dest, int tag, double erreur_locale) {
273 // System.out.println("dest : " + dest);
277 recev = Register.Instance().getListeOfTasks().getTaskIdOfRank(dest);
280 System.err.println("In jaceSend recv = null !");
282 JaceSession.Instance().getTaskThread().sleep(10);
283 JaceSession.Instance().getTaskThread().yield();
284 } catch (Exception e) {
288 // TODO: remove this else, but find out why recev is sometimes null
// Sanity check: the TaskId found must carry the requested rank.
290 if (recev.getRank() != dest) {
291 System.err.println("Problem !! pas le meme dest que ds les param");
295 // TODO: add the application name to Message
298 stub = recev.getHostStub();
// Only send when the receiver is at the same time step as this task.
299 if (stub.getTimeStep() == timeStep) {
300 // System.out.println("************ "+verifNum+" *************");
301 Message msg = new Message();
302 msg.setParam(buffer, jaceTaskId, recev, tag, timeStep,
303 jaceP2P_Iteration, verifNum, erreur_locale);
305 // the message is put into
306 // JaceBuffer.Instance() (the list of Message to
310 * if(JaceDaemon.Instance().getProtocol().equals("socket"
311 * )) SenderSocket.Instance().buffer.add(msg); else
312 * if(JaceDaemon.Instance().getProtocol().equals("rmi"))
313 * SenderRmi.Instance().buffer.add(msg);
315 // System.out.println("putting message to "+dest+" in the buffer");
316 Sender.Instance().getBuffer().add(msg);
318 // if(JaceDaemon.Instance().getProtocol().equals("rmi"))
319 // SenderRmi.Instance().getBuffer().add(msg);
321 // SenderSocket.Instance().getBuffer().add(msg);
// NOTE(review): on ConnectException the handler calls the same host stub again
// (suicide2); if the host is unreachable that call may fail too - confirm intent.
324 } catch( ConnectException ce ) {
325 recev.getHostStub().suicide2( "Not responding!" ) ;
326 } catch (Exception e) {
327 System.err.println("Unable to send data message to " + dest
329 // recev.getHostStub().suicide2( "Not responding!" ) ;
331 // System.out.println("TASK : g mis un msg qui doit etre envoye");
332 // sending is always asynchronous: the Message will leave
333 // immediately because
334 // - Sender was calling JaceBuffer.Instance().get(), which contains a
336 // - and here Task calls JaceBuffer.Instance().add(msg), which
337 // contains a notifyAll()
339 JaceSession.Instance().getTaskThread().sleep(10);
340 JaceSession.Instance().getTaskThread().yield();
341 } catch (Exception e) {
343 // System.out.println("TASK : je sort de jaceSend");
345 } catch (Exception e) {
346 if (Register.Instance().getListeOfTasks() == null)
347 System.err.println("Tasks list is null: " + e);
353 *not-blocking reception of data object, return an object
356 * the task id for the task sender
358 * the tag for message
// Non-blocking receive: asks MsgQueue for a message from 'sender' with the given
// tag and, when one is available, returns its payload (tmp.getData()). During the
// first iteration or right after a reload, the sender is first checked against
// the dependency list with notExist(). While this task is under threshold, past
// iteration 0 and not reloading, the sender's dependency flag is set to true
// (subject to a VERIF/verifNum condition only partly visible in this view).
360 @SuppressWarnings("static-access")
361 public Object jaceReceive(int sender, int tag) {
362 if (jaceP2P_Iteration == 0 || postReloading) {
364 if (notExist(sender)) {
368 } catch (Exception e) {
372 Message tmp = MsgQueue.Instance().get(sender, tag);
374 // System.out.println("recu MSG de tache " + sender + " (" +
375 // Register.Instance().getListeOfTasks().getTaskIdOfRank(sender).getHostName()
376 // + ") MsgQueue : " +
377 // MsgQueue.Instance().getSize()+" message src_tag="+tmp.getSrc_tag()+
378 // " localError="+tmp.getLocalError());
382 if (underTh == true && jaceP2P_Iteration != 0 && !reloading) {
// Position of the sender in dependancies / values.
383 index = depIndex(sender);
385 if ((!(state.equals("VERIF")) || verifNum == tmp
388 // System.out.println("dep["+sender+"]=true, index="+index);
389 setValues(index, true);
393 } catch (Exception e) {
394 System.err.println("Error jaceReceive :" + e);
395 System.err.println("Sender=" + sender);
397 return (tmp.getData());
400 // System.out.println("RIENNN recu de tache " + sender + " (" +
401 // Register.Instance().getListeOfTasks().getTaskIdOfRank(sender).getHostName()
402 // + ") taille MsgQueue : " + MsgQueue.Instance().getSize());
403 } catch (Exception e) {
404 if (Register.Instance().getListeOfTasks() == null)
405 System.err.println("Tasks list is null2: " + e);
// Static Thread.sleep()/yield() invoked through an instance (see the suppression above).
408 JaceSession.Instance().getTaskThread().sleep(10);
409 JaceSession.Instance().getTaskThread().yield();
410 } catch (Exception e) {
// Looks 'sender' up in the dependancies list; the result is derived from the
// index found (the comparison and return are not visible in this view).
// NOTE(review): new Integer(int) is deprecated since Java 9; Integer.valueOf(sender)
// is the usual replacement.
416 public boolean notExist(int sender) {
417 int i = dependancies.indexOf((Object) (new Integer(sender)));
// Registers a new dependency: appends 'value' to dependancies and a matching
// false flag to values, keeping the two lists index-aligned.
424 public void setDep(int value) {
425 dependancies.add(new Integer(value));
426 values.add(new Boolean(false));
// Finds the position of 'sender' in the dependancies list. ArrayList.indexOf
// yields -1 when the value is absent; the return statement is not visible here.
429 public int depIndex(int sender) {
430 int index = dependancies.indexOf((Object) (new Integer(sender)));
// Overwrites the dependency flag stored at 'index' in the values list.
434 public void setValues(int index, boolean value) {
435 values.set( index, new Boolean(value) ) ;
// Periodic checkpoint entry point. Iteration 0 is handled by a branch whose body
// is not visible in this view; afterwards, every saveParameter-th iteration the
// task is snapshotted with getTask2save() and handed to a SaveTaskThread.
// NOTE(review): saveParameter == 0 makes the modulo below throw
// ArithmeticException once jaceP2P_Iteration != 0 - confirm it is always set.
438 @SuppressWarnings("static-access")
439 public void jaceP2P_Save() {
442 if (jaceP2P_Iteration == 0) {
443 // request 0 when task at barrier
444 // request 1 when only at convergence
446 } else if ((jaceP2P_Iteration % saveParameter) == 0) {
447 // clone the Task at that step of computations
// NOTE(review): the lock object is the previous sauv, which is replaced inside the block.
449 synchronized (sauv) {
450 sauv = getTask2save();
452 // send it to the corresponding BackupNode in a round robin
454 new SaveTaskThread(sauv).start();
458 JaceSession.Instance().getTaskThread().sleep(10);
459 JaceSession.Instance().getTaskThread().yield();
460 } catch (Exception e) {
// Sends a backup to every rank listed in saveTab, one BroadcastTaskThread per
// destination. Two near-identical sections are visible: one serializes the full
// task (sauv, thread flag 0), the other the convergence data only (sauvConvg,
// thread flag 1). Which section runs is presumably selected by 'request' as
// described below; the selecting condition is not visible in this view.
464 // request 0 when Saving all task
465 // request 3 when saving only convergence data
466 @SuppressWarnings("static-access")
467 public void broadcastTasks(int request) {
468 ByteArrayOutputStream stream;
470 // 1 - clone the Task at that step of computations and serialize it
472 synchronized (sauv) {
473 sauv = getTask2save();
474 stream = convertTask2stream(sauv);
476 // 2 - create the saveTab if necessary
477 if (saveTab == null) {
481 JaceInterface stub = null;
483 // 3 - send the stream to all the BackupNodes
484 for (int i = 0; i < saveTab.length; i++) {
485 // System.out.println("saveTab[" + i + "] = " +
488 // 3.1 - get the stub of the destination
489 // System.out.println("Saving on neighbor "+i);
491 task = Register.Instance().getListeOfTasks()
492 .getTaskIdOfRank(saveTab[i]); // ///////////////////////////pb
493 stub = task.getHostStub();
494 } catch (Exception e) {
496 .println("Problem in the broadcast, ligne d'assignation de task ds broadcats: "
500 // 3.2 - send the stream to that stub
502 // System.out.println("saving on second list");
504 new BroadcastTaskThread(stub, jaceMyId, stream
506 sauv.lastSave.getLastSave(), sauv.timeStep,
507 Register.Instance().getAppliName(), 0,
514 } catch (Exception e) {
515 e.printStackTrace(System.out);
// Convergence-only variant: same steps applied to sauvConvg.
518 synchronized (sauvConvg) {
519 sauvConvg = getBackupConvg2Save();
520 stream = convertBackupConv2stream(sauvConvg);
522 // 2 - create the saveTab if necessary
523 if (saveTab == null) {
527 JaceInterface stub = null;
529 // 3 - send the stream to all the BackupNodes
530 for (int i = 0; i < saveTab.length; i++) {
531 // System.out.println("saveTab[" + i + "] = " + saveTab[i]);
533 // 3.1 - get the stub of the destination
534 // System.out.println("Saving on neighbor "+i);
536 task = Register.Instance().getListeOfTasks()
537 .getTaskIdOfRank(saveTab[i]); // ///////////////////////////pb
538 stub = task.getHostStub();
539 } catch (Exception e) {
541 .println("Problem in the broadcast, ligne d'assignation de task ds broadcats: "
545 // 3.2 - send the stream to that stub
547 * if(request==3){ if (stub != null) { new
548 * BroadcastTaskThread(stub, jaceMyId, stream.toByteArray(),
549 * sauvConvg.lastSave.getLastSave(),timeStep,
550 * Register.Instance().getAppliName(),1,saveTab[i]).start();
551 * } //System.out.println("saving on second list"); } else{
554 new BroadcastTaskThread(stub, jaceMyId, stream
555 .toByteArray(), sauvConvg.lastSave
556 .getLastSave(), sauvConvg.timeStep, Register
557 .Instance().getAppliName(), 1, saveTab[i])
561 // System.out.println("saving on first list");
566 // JaceSession.Instance().getTaskThread().sleep(10);
567 JaceSession.Instance().getTaskThread().yield();
570 catch (Exception e) {
// Serializes a BackupConvg with Java object serialization into an in-memory
// byte stream.
// NOTE(review): the error text below names jaceP2P_ReinitConv() and "Task"
// although it is emitted from this method - misleading when reading logs.
574 private ByteArrayOutputStream convertBackupConv2stream(BackupConvg t) {
575 // System.out.println("beginning of the checkpointing process......");
576 ByteArrayOutputStream stream = new ByteArrayOutputStream();
578 ObjectOutputStream fluxOut = new ObjectOutputStream(stream);
579 fluxOut.writeObject(t);
581 } catch (Exception e) {
583 .println("JaceP2P_Error in Task.jaceP2P_ReinitConv() when converting Task in Stream : "
586 // System.out.println("taille du tablo de byte : " +
587 // stream.toByteArray().length + " bytes");
591 // Serializes the task into a byte stream so it can be sent to the BackupNode
// NOTE(review): the error text below names jaceP2P_ReinitConv() although it is
// emitted from this method.
592 private ByteArrayOutputStream convertTask2stream(Task t) {
593 // System.out.println("beginning of the checkpointing process......");
594 ByteArrayOutputStream stream = new ByteArrayOutputStream();
596 ObjectOutputStream fluxOut = new ObjectOutputStream(stream);
597 fluxOut.writeObject(t);
599 } catch (Exception e) {
601 .println("JaceP2P_Error in Task.jaceP2P_ReinitConv() when converting Task in Stream : "
604 // System.out.println("taille du tablo de byte : " +
605 // stream.toByteArray().length + " bytes");
// Builds the snapshot to checkpoint: starts from the application-provided
// jaceP2P_SaveFromCrash() object and copies onto it the identity, parameters,
// iteration counters and convergence-detection fields of this task.
609 // get the task of the appli with the data to save (in
610 // jaceP2P_SaveFromCrash() overloaded in the appli)
611 @SuppressWarnings("unchecked")
612 public Task getTask2save() {
614 sauv = jaceP2P_SaveFromCrash();
616 // System.out.println("Saving data at it="+sauv.it);
617 // assign the default attributes of a task (params of the appli and
618 // TaskId of the local node)
619 sauv.setId(this.jaceTaskId);
620 sauv.setParam(Register.Instance().getParams());
621 sauv.timeStep = timeStep;
623 // assign the iteration number that had the appli at the moment of the
625 sauv.jaceP2P_Iteration = jaceP2P_Iteration;
626 sauv.saveRound = saveRound;
628 sauv.underTh = underTh;
629 sauv.jaceP2P_globalCV_state = jaceP2P_globalCV_state;
630 // System.out.println("I checkpoint the task at ite " +
631 // sauv.jaceP2P_Iteration);
632 // attributes needed to detect global convergence
636 sauv.nb_not_recv = nb_not_recv;
637 sauv.electedNode = electedNode;
638 sauv.respSent = respSent;
// The three lists below are shallow-cloned so the snapshot has its own list objects.
639 sauv.neighbors = (ArrayList<Integer>) neighbors.clone();
640 sauv.neighborsValues = (ArrayList<Boolean>) neighborsValues.clone();
641 sauv.resp = (ArrayList<Integer>) resp.clone();
642 sauv.verifNum = verifNum;
643 sauv.sendId = sendId;
// NOTE(review): reduceAll is shared by reference, unlike the cloned lists above.
644 sauv.reduceAll = reduceAll;
645 sauv.finalStep = finalStep;
646 sauv.action = action;
647 sauv.verdict = verdict;
648 sauv.localCV_state = localCV_state;
649 sauv.recievedVerdict = recievedVerdict;
// Bump the save counter and attach the same LastSave object to the snapshot.
650 synchronized (lastSave) {
651 lastSave.increment();
652 sauv.lastSave = lastSave;
654 } catch (Exception e) {
655 System.err.println("Problem with RMI !");
// Refreshes the existing sauvConvg object in place with the current
// convergence-detection fields of this task, increments lastSave, attaches it,
// and marks the backup as initialized.
661 @SuppressWarnings("unchecked")
662 public BackupConvg getBackupConvg2Save() {
666 // sauvConvg=new BackupConvg();
667 sauvConvg.state = state;
668 sauvConvg.underTh = underTh;
669 sauvConvg.nb_not_recv = nb_not_recv;
670 sauvConvg.electedNode = electedNode;
671 sauvConvg.respSent = respSent;
// Shallow clones: the backup gets its own list objects.
672 sauvConvg.neighbors = (ArrayList<Integer>) neighbors.clone();
673 sauvConvg.neighborsValues = (ArrayList<Boolean>) neighborsValues.clone();
674 sauvConvg.resp = (ArrayList<Integer>) resp.clone();
675 sauvConvg.verifNum = verifNum;
676 sauvConvg.sendId = sendId;
677 sauvConvg.finalStep = finalStep;
678 sauvConvg.action = action;
679 sauvConvg.verdict = verdict;
680 sauvConvg.localCV_state = localCV_state;
681 sauvConvg.timeStep = timeStep;
682 sauvConvg.recievedVerdict = recievedVerdict;
683 sauvConvg.jaceP2P_Iteration = jaceP2P_Iteration;
// NOTE(review): reduceAll is shared by reference, not cloned.
684 sauvConvg.reduceAll = reduceAll;
686 synchronized (lastSave) {
687 lastSave.increment();
688 sauvConvg.lastSave = lastSave;
// Lets getBackupForNewNode() know there is convergence data worth sending.
690 sauvConvg.initialized = true;
691 } catch (Exception e) {
692 System.err.println("Problem with RMI:" + e);
// Allocates saveTab with one cell per entry of BackupsManager and fills each
// cell from getBackupTaskAtIndex(i, 0); the rest of that expression is not
// visible in this view.
698 private void createSaveTab() {
700 saveTab = new int[BackupsManager.Instance().size()];
702 // 2 - assign the taskId to each cell of the saveTab
703 // System.out.println("in TASK");
704 for (int i = 0; i < saveTab.length; i++) {
705 saveTab[i] = BackupsManager.Instance().getBackupTaskAtIndex(i, 0)
707 // System.out.print(saveTab[i] + ", ");
709 // System.out.println("saveTab created..... size : " + saveTab.length);
// Updates the two-slot 'saved' array under its own monitor. The visible code
// tests slot 0 first and slot 1 next for a false value; the assignments made in
// each case are not visible in this view.
712 public void setSaved(boolean bool) {
713 synchronized (saved) {
715 if (saved[0] == false)
717 else if (saved[1] == false)
// When the Register reports at least one backup neighbor, loops until
// getSaved() becomes true. Inside the loop a fresh Register is requested from
// the spawner and, when non-null, replaces the local one; problems are reported
// through println calls. The 'tag' parameter is not used on any visible line.
727 public void waitForAck(int tag) {
728 if (Register.Instance().getNumBackupNeighbors() != 0) {
731 // Calendar cal = new GregorianCalendar();
732 // System.out.println("at time="+cal.get(Calendar.MINUTE)+":"+cal.get(Calendar.SECOND));
733 // System.out.println("sleeping till acknowledge saved");
734 while (getSaved() == false)
// Refresh the local view of the system from the spawner.
740 Register newReg = Register.Instance()
741 .getSpawnerStub().getRegister(jaceMyId);
742 if (newReg != null) {
743 Register.Instance().replaceBy(newReg);
746 .println("I got a null register from the spawner");
749 .println("Sleeping till acknowledge saved");
751 } catch (Exception e2) {
753 .println("Unable to get register from spawner :"
755 e2.printStackTrace(System.out);
758 } catch (Exception e) {
760 // Calendar cal1 = new GregorianCalendar();
761 // System.out.println("end save at time="+cal1.get(Calendar.MINUTE)+":"+cal1.get(Calendar.SECOND));
// Runs one step of the global-convergence detection protocol. 'under_th' is the
// caller's current under-threshold indication. The method dispatches on 'state'
// ("NORMAL", "WAIT4V", "VERIF", "SAVING", "FINISHED") and on 'action', exchanging
// verification, response and verdict messages with the neighbors' stubs. When
// the FINISHED branch runs, jaceP2P_globalCV_state is set to true. Any exception
// escaping the body is logged together with a dump of the Register.
// NOTE(review): many lines of this method are not visible in this view; the
// comments below describe only what the visible lines show.
765 public void jaceP2P_GlobalConvergence(boolean under_th) {
770 if (under_th != underTh && postReloading == false) {
771 // 1 - update the under-threshold flag
// First iteration (and not resuming with existing responses): build the neighbor list.
774 if (jaceP2P_Iteration == 0
775 && (postReloading == false || resp.size() == 0)) {
776 detectNeighbors(jaceMyId, jaceSize);
784 // print the values of the variables
786 // Calendar cal = new GregorianCalendar();
787 // System.out.println("at time="+cal.get(Calendar.MINUTE)+":"+cal.get(Calendar.SECOND));
788 // System.out.println("MyId="+jaceMyId+" sous seuil="+underTh+" etat="+state+" verifNum="+verifNum+" localCV="+localCV_state+" leader="+electedNode+" respSent="+respSent);
789 // System.out.println("dep="+getValues()+" pBeg="+pseudoPerBeg+" PEnd="+pseudoPerEnd+" sendId="+sendId);
790 // System.out.println("neigh="+nb_not_recv+" action="+action+" negative_resp="+testNegativeResp()+" reloading="+reloading+"\n"+" finalStep="+finalStep+" SavedResults="+savedResults+" NeghNotCV="+getNeighbourNotCV()+" postReload="+postReloading);
793 } catch (Exception e) {
794 System.err.println("Error printing status in Task :" + e);
// After a reload: re-issue the verification or verdict that was pending.
797 if (action.equals("sendVerif") && state.equals("VERIF")
798 && postReloading == true) {
799 // System.out.println("je passe ds send verif");
801 new SendVerifThread(jaceMyId, sendId, verifNum).start();
803 // System.out.println("send Verif");
805 } else if (action.equals("sendVerdict") && postReloading == true) {
806 // System.out.println("je passe ds send verdict-------");
808 new SendVerdictThread(jaceMyId, sendId, verifNum, verdict)
811 // System.out.println("verifNum="+verifNum);
// --- state NORMAL, no pending action ---
814 if (state.equals("NORMAL") && action.equals("nothing")) {
815 if (underTh == false)
818 // System.out.println("sous seuil="+underTh+" PseudoPerBeg="+localStub.getPseudoPerBeg());
819 if (pseudoPerBeg == false) {
820 // System.out.println("ds la condition");
822 } else if (pseudoPerEnd == true) {
824 localCV_state = true;
// No neighbor outstanding: start a verification round with tag verifNum + 1.
825 if (nb_not_recv == 0) {
826 // localStub.setAction("sendVerif");
828 broadcastVerif(jaceMyId, -1, verifNum + 1);
829 // localStub.setAction("nothing");
832 recievedVerdict = true;
833 initializeVerifLeader();
836 } catch (Exception e) {
838 .println("The verification message is not received: "
840 Register.Instance().viewAll();
// Exactly one neighbor outstanding: notify that neighbor through its stub.
842 } else if (nb_not_recv == 1)
845 int neighId = getNeighbourNotCV();
850 recev = Register.Instance().getListeOfTasks()
851 .getTaskIdOfRank(neighId);
852 JaceInterface stub = recev.getHostStub();
853 if (stub.setNbNeighboursNotConv(verifNum,
854 jaceMyId, timeStep)) {
855 // stub.setLeftNeighbourCV(true);
856 // System.out.println("sent convergence message to "+neighId);
859 recievedVerdict = false;
862 } catch (Exception e) {
864 .println("Unable to decrease the number of neighbors not converged on node :"
865 + recev.getHostName()
870 Register.Instance().viewAll();
// Recovery path: fetch a fresh Register for this rank and install it.
874 TaskId id = Register.Instance()
876 .getTaskIdOfHostStub(
879 myRank = id.getRank();
880 Register.Instance().replaceBy(
883 .getRegister(myRank));
886 } catch (Exception e2) {
888 .println("Unable to contact the spawner: "
894 } else if (pseudoPerBeg == true && getValues())
// --- state WAIT4V with no neighbor outstanding ---
896 } else if (state.equals("WAIT4V") && nb_not_recv == 0) {
897 int neighId = getRankLeader();
900 broadcastVerif(jaceMyId, -1, verifNum + 1);
902 recievedVerdict = true;
903 initializeVerifLeader();
907 } catch (Exception e) {
909 .println("The verification message is not received:"
912 } else if (jaceMyId > neighId) {
914 broadcastVerif(jaceMyId, -1, verifNum + 1);
916 recievedVerdict = true;
917 initializeVerifLeader();
920 } catch (Exception e) {
922 .println("The verification message is not received: "
// --- state WAIT4V, neighbors still outstanding ---
926 } else if (state.equals("WAIT4V")) {
927 if (underTh == false) {
928 localCV_state = false;
// --- state VERIF ---
931 } else if (state.equals("VERIF")) {
932 // if(localStub.getNewerDep(0) && localStub.getNewerDep(1))
933 // localStub.setPseudoPerEnd(true);
// Elected node: decides on and broadcasts the verdict.
935 if (electedNode == true) {
936 if ((!underTh && !postReloading) || !localCV_state
937 || testNegativeResp()) {
// Negative verdict, sent with tag verifNum + 1.
940 broadcastVerdict(jaceMyId, -2, verifNum + 1, false);
945 } catch (Exception e) {
947 } else if (postReloading) {
948 if (recievedAllResp())
949 if (!testNegativeResp()) {
951 broadcastVerdict(jaceMyId, -2, verifNum,
953 if (finalStep == true
954 && state.equals("VERIF")) {
955 initializeSavLeader();
962 } catch (Exception e) {
963 System.err.println("Error: " + e);
966 } else if (pseudoPerEnd == true) {
967 if (recievedAllResp())
968 if (!testNegativeResp()) {
970 broadcastVerdict(jaceMyId, -2, verifNum,
972 if (finalStep == true
973 && state.equals("VERIF")) {
974 initializeSavLeader();
982 } catch (Exception e) {
983 System.out.println("erreur: " + e);
988 broadcastVerdict(jaceMyId, -2,
989 verifNum + 1, false);
994 } catch (Exception e) {
996 .println("Unable to broadcast a negative verdict :"
1002 else if (getValues())
1003 pseudoPerEnd = true;
// Non-elected node that has not answered yet: send a response.
1004 } else if (!respSent) {
1005 if (!underTh || !localCV_state || testNegativeResp()) {
1006 if (action.equals("nothing")) {
1007 int neighId = sendId;
1009 TaskId recev = null;
1010 recev = Register.Instance().getListeOfTasks()
1011 .getTaskIdOfRank(neighId);
1012 JaceInterface stub = recev.getHostStub();
1013 // System.out.println("tryin to send negative response to "+neighId);
1014 stub.response(jaceMyId, verifNum, -1, null);
1015 // System.out.println("send negative response to "+neighId);
1019 } catch (Exception e) {
1021 .println("Response not received:" + e);
1022 Register.Instance().viewAll();
1025 } else if (pseudoPerEnd) {
1026 // System.out.print("The daemon can send a response ");
1027 int index = recievedAllRespMinusOne();
1028 // System.out.println("to node of index ="+index);
1030 int rank = getNeighborRank(index);
1031 // if(jaceMyId!=jaceSize-1 &&
1032 // localStub.getSendRight()==true){
1034 TaskId recev = null;
1035 recev = Register.Instance().getListeOfTasks()
1036 .getTaskIdOfRank(rank);
1037 JaceInterface stub = recev.getHostStub();
1039 if (getResp(index) == 1) {
1040 stub.response(jaceMyId, verifNum, 1,
1042 // System.out.println("send positive response to"+rank);
1044 stub.response(jaceMyId, verifNum, -1, null);
1045 // System.out.println("send negative response to"+rank);
1050 } catch (Exception e) {
1051 System.err.println("Response not received by "
1053 Register.Instance().viewAll();
1056 } else if (getValues())
1057 pseudoPerEnd = true;
// --- state SAVING (no verdict being sent) ---
1060 } else if (state.equals("SAVING") && !action.equals("sendVerdict")) {
1062 if (savedResults == false) {
1065 savedResults = true;
// Elected node: once every response is in, tell the spawner the run is over.
1067 } else if (electedNode) {
1068 if (recievedAllResp())
1070 // System.out.println("recieved all responses");
1071 JaceSpawnerInterface spawnerStub = Register
1072 .Instance().getSpawnerStub();
1073 // System.out.println("##### callin spawnerStub.setFinished(true) #####");
1074 spawnerStub.setOver(true);
1075 // localStub.setState("FINISHED");
1076 } catch (Exception e) {
1077 System.err.println("Error" + e);
1079 } else if (!respSent) {
1080 int index = recievedAllRespMinusOne();
1082 int rank = getNeighborRank(index);
1084 // if(jaceMyId!=jaceSize-1 &&
1085 // localStub.getSendRight()==true){
1087 TaskId recev = null;
1088 recev = Register.Instance().getListeOfTasks()
1089 .getTaskIdOfRank(rank);
1090 JaceInterface stub = recev.getHostStub();
// Only answer a peer that is itself in SAVING and not reloading.
1091 if (stub.getReloading() == false
1092 && stub.getState().equals("SAVING")) {
1093 action = "sendResponse";
1094 stub.response(jaceMyId, verifNum, 1, null);
1098 // System.out.println("send response to"+rank);
1100 } catch (Exception e) {
1101 System.err.println("Response not received" + e);
// --- state FINISHED: global convergence reached ---
1105 } else if (state.equals("FINISHED")
1106 && !action.equals("sendVerdict") && recievedVerdict) {
1108 jaceP2P_globalCV_state = true;
1109 // System.out.println("Finished");
1110 // System.out.println("Finished");
1111 // System.out.println("Finished");
1112 // System.out.println("Finished");
1113 System.out.println("Finished");
// The post-reload mode is cleared here.
1116 if (postReloading == true)
1117 postReloading = false;
1119 } catch (Exception e) {
1120 System.err.println("Exception in Global Convergence :" + e);
1121 e.printStackTrace(System.out);
1122 Register.Instance().viewAll();
// Scans the resp list for a -1 entry. How 'index' is used and which value is
// returned in each case is not visible in this view.
1127 public synchronized int getResp(int index) throws RemoteException {
1129 for (int i = 0; i < resp.size(); i++)
1131 if (((Integer) resp.get(i)).intValue() == -1)
// Handles a response sent by neighbor 'neighId' for verification round 'tag'.
// The call first loops while this task is reloading. When the tag matches
// verifNum, the sender's slot in resp is located through neighbors.indexOf;
// under a condition only partly visible here (state is not "SAVING" and the
// stored response is not already 1), reduceAll(recievedValue) is called and the
// slot is overwritten with 'response'. A RemoteException is thrown on a path
// whose condition is not visible in this view.
// NOTE(review): the method is synchronized, so the monitor is held while waiting
// for 'reloading' to clear - confirm this cannot block the reloading thread.
1136 public synchronized void response(int neighId, int tag, int response,
1137 ArrayList<Object> recievedValue) throws RemoteException {
1138 // System.out.println("inside response function");
1139 // System.out.println("sleeping till not reloading");
1140 while (reloading == true) {
1143 // System.out.println("sleeping till not reloading");
1144 } catch (Exception e) {
1147 if (verifNum == tag) {
1148 // System.out.println("inside condition");
// indexOf yields -1 when neighId is not a known neighbor.
1149 int indexNeigh = neighbors.indexOf((Object) neighId);
1150 // System.out.println("after gettin index="+index);
1151 // System.out.println("index="+indexNeigh+" size de resp ="+resp.size());
1154 // int xyz=((Integer)(resp.elementAt(indexNeigh))).intValue();
1155 // }catch(Exception e){
1156 // System.out.println("error:"+e);
1160 && !state.equals("SAVING")
1161 && (((Integer) (resp.get(indexNeigh))).intValue()) != 1) {
1162 // System.out.println("calling reduceAll()");
1163 reduceAll(recievedValue);
1165 // System.out.println("after calculating reduceAll");
1166 resp.set( indexNeigh, response ) ;
1168 // System.out.println("get response ............");
1172 throw new RemoteException();
// Iterates over every entry of resp; the per-entry action and any further
// state changes are not visible in this view.
1175 public void initializeSavLeader() {
1176 for (int i = 0; i < resp.size(); i++)
// Checks the resp list for entries equal to 0. The flag starts at true;
// presumably it is cleared when a 0 entry is found - TODO confirm, the loop
// body is not visible in this view.
1181 public boolean recievedAllResp() {
1182 boolean bool = true;
1183 for (int i = 0; i < resp.size(); i++)
1184 if (((Integer) (resp.get(i))).intValue() == 0) {
// Searches resp for entries equal to 0, tracking an index that starts at -1.
// Judging by the name, it presumably yields the single remaining 0 slot when
// all other responses have arrived - TODO confirm against the hidden loop body.
// Callers pass the result to getNeighborRank(), so a -1 result would matter there.
1191 public int recievedAllRespMinusOne() {
1193 int indexOfZero = -1;
1194 for (int i = 0; i < resp.size(); i++)
1195 if (((Integer) (resp.get(i))).intValue() == 0) {
// Called by neighbor 'idNeigh' for round 'tag' and time step 'neighborTimeStep'.
// The main branch requires tag == verifNum, no "sendVerdict" action in progress
// and matching time steps; it loops while this task is reloading, then sets the
// neighbor's flag in neighborsValues to true if it was false. A second branch
// handles tag == verifNum - 1. Return values are not visible in this view.
// NOTE(review): neighbors.indexOf returns -1 for an unknown idNeigh, which would
// make neighborsValues.get(i) throw IndexOutOfBoundsException.
1205 public synchronized boolean setNbNeighboursNotConv(int tag, int idNeigh,
1206 int neighborTimeStep) throws RemoteException {
1207 // System.out.println("ds setNbNeighboursNotConv !!!!!!!!!!!!!!!");
1208 if (tag == verifNum && !action.equals("sendVerdict")
1209 && neighborTimeStep == timeStep) {
1210 if (idNeigh == -1) {
1214 // System.out.println("sleeping till not reloading");
1215 while (reloading == true) {
1218 // System.out.println("sleeping till not reloading");
1219 } catch (Exception e) {
1222 int i = neighbors.indexOf((Object) idNeigh);
1223 if (((Boolean) neighborsValues.get(i)).booleanValue() == false) {
1225 // System.out.println("le noeud "+idNeigh+" a envoyer un message de pseudoconvergence");
1226 neighborsValues.set( i, new Boolean(true) ) ;
1232 } else if (tag == verifNum - 1 && !action.equals("sendVerdict")
1233 && reloading == false && neighborTimeStep == timeStep
1234 && jaceP2P_Iteration != 0)
// Fills neighbors / neighborsValues for task 'id' among 'jaceSize' tasks. For
// each power of two 2^d below jaceSize (d is declared and advanced on lines not
// visible in this view):
//   - if id < 2^d and id + 2^d < jaceSize, rank id + 2^d becomes a neighbor;
//   - if 2^d <= id < 2^(d+1), rank id - 2^d becomes a neighbor.
// Every neighbor is added with its flag set to false.
// NOTE(review): the resulting links look like a binomial-tree topology - confirm.
1240 public void detectNeighbors(int id, int jaceSize) {
1241 // System.out.println("detect neighbors !!!! ");
1243 while (Math.pow(2, d) < jaceSize) {
1244 if (id < Math.pow(2, d) && ((id + Math.pow(2, d)) < jaceSize)) {
1245 neighbors.add((int) (id + Math.pow(2, d)));
1246 neighborsValues.add(new Boolean(false));
1249 if (id < Math.pow(2, d + 1) && id >= Math.pow(2, d)) {
1250 neighbors.add((int) (id - Math.pow(2, d)));
1251 neighborsValues.add(new Boolean(false));
// Resets the convergence bookkeeping for a new round: nb_not_recv is set to the
// number of neighbors, every neighbor flag is set to false, resp is walked
// (the per-entry action is not visible in this view), and the elected-node and
// local-convergence flags are cleared.
1258 public synchronized void initialize_state() {
1260 // System.out.println("initialiser\n");
1262 nb_not_recv = neighbors.size();
1263 for (int i = 0; i < neighbors.size(); i++)
1264 neighborsValues.set( i, new Boolean(false) ) ;
1265 for (int i = 0; i < resp.size(); i++)
1268 electedNode = false;
1269 localCV_state = false;
// Reads the rank stored at position 'index' of the neighbors list.
// ArrayList.get throws IndexOutOfBoundsException for an index outside the list
// (for example -1).
1275 public int getNeighborRank(int index) throws RemoteException {
1276 int rank = ((Integer) (neighbors.get(index))).intValue();
// Walks the neighbor list and records the rank of a neighbor whose flag in
// neighborsValues is false. No early exit is visible, so with several such
// neighbors the last one would be kept - TODO confirm against the hidden lines.
1280 public synchronized int getNeighbourNotCV() {
1282 for (int i = 0; i < neighbors.size(); i++)
1283 if (((Boolean) neighborsValues.get(i)).booleanValue() == false)
1284 neighId = ((Integer) neighbors.get(i)).intValue();
// Checks the dependency flags in 'values' for false entries. The flag starts at
// true; presumably it is cleared when a false entry is found - TODO confirm,
// the loop body is not visible in this view.
1288 public boolean getValues() {
1289 boolean bool = true;
1290 for (int i = 0; i < values.size(); i++)
1291 if (((Boolean) values.get(i)).equals(new Boolean(false))) {
1295 // System.out.println("getValues() have been called and it returned "+bool);
// Checks resp for entries equal to -1. The flag starts at false; presumably it
// is set when a -1 entry is found - TODO confirm, the loop body is not visible
// in this view.
1299 public boolean testNegativeResp() {
1300 boolean bool = false;
1301 for (int i = 0; i < resp.size(); i++)
1302 if (((Integer) (resp.get(i))).intValue() == -1) {
// Debug helper for the resp list; the printing code below is commented out, so
// no visible statement is executed.
1309 public void printResp() {
1310 // for(int i=0;i<resp.size();i++)
1311 // System.out.print(" resp["+((Integer)neighbors.get(i)).intValue()+"]="+((Integer)resp.get(i)).intValue());
1312 // System.out.print("\n");
// Debug helper for the dependency flags; the printing code below is commented
// out, so no visible statement is executed.
1315 public void printDep() {
1316 // for(int i=0;i<values.size();i++)
1317 // System.out.print(" dep["+((Integer)dependancies.get(i)).intValue()+"]="+((Boolean)values.get(i)).booleanValue());
1318 // System.out.print("\n");
// Starts verification round 'tag' on every neighbor except 'neighId': each
// neighbor's TaskId is resolved from its rank and initializeVerif(tag) is
// invoked on its host stub. Callers in this file pass -1 as neighId, which
// excludes no neighbor as long as ranks are non-negative. 'id' is not used on
// any visible line. A final check on action == "sendVerif" follows; its effect
// is not visible in this view.
1321 public void broadcastVerif(int id, int neighId, int tag)
1322 throws RemoteException {
1324 TaskId recev = null;
1326 // Register.Instance().getListeOfTasks().getTaskIdOfRank(id);
1328 // System.out.println("Id="+id+" neighId="+ neighId+" tag="+tag);
1329 for (int i = 0; i < neighbors.size(); i++)
1330 if (neighId != ((Integer) neighbors.get(i)).intValue()) {
1331 recev = Register.Instance().getListeOfTasks().getTaskIdOfRank(
1332 ((Integer) neighbors.get(i)).intValue());
1333 stub = recev.getHostStub();
1334 stub.initializeVerif(tag);
1335 // System.out.println("broadcast verification to :"+((Integer)neighbors.get(i)).intValue());
1337 if (action.equals("sendVerif"))
/**
 * Verification set-up variant whose visible code iterates over every slot
 * of {@code resp}.
 *
 * NOTE(review): the loop body and anything following it are not visible in
 * this excerpt, so what is written to each slot cannot be stated from here.
 *
 * @throws RemoteException declared in the signature
 */
1341 public void initializeVerifLeader() throws RemoteException {
1343 for (int i = 0; i < resp.size(); i++)
/**
 * Handles an incoming verification request carrying {@code tag}.
 *
 * The method first loops for as long as {@code reloading} is true. It then
 * reacts only when {@code tag} equals {@code verifNum + 1}: in state
 * "WAIT4V" it sets {@code action} to "sendVerif", walks {@code resp} and
 * starts a {@code SendVerifThread}; a separate branch exists for state
 * "VERIF".
 *
 * @param tag verification number carried by the request
 * @throws RemoteException thrown explicitly near the end of the method
 *
 * NOTE(review): several lines are not visible in this excerpt - the body of
 * the wait loop (presumably a sleep inside try/catch), the body of the
 * {@code resp} loop, the computation of {@code sendId}, and the lines that
 * decide which branch the {@code throw} belongs to.
 */
1349 public void initializeVerif(int tag) throws RemoteException {
1350 // System.out.println("Inside initializeVerif @@@@@");
1351 // System.out.println("sleeping till not reloading");
// Wait until the task is no longer reloading.
1352 while (reloading == true) {
1355 // System.out.println("sleeping till not reloading");
1356 } catch (Exception e) {
// Only the next expected verification number is accepted.
1359 if (verifNum + 1 == tag)
1360 if (state.equals("WAIT4V")) {
1361 action = "sendVerif";
1363 for (int i = 0; i < resp.size(); i++)
// Hand the propagation over to a dedicated thread.
1371 new SendVerifThread(jaceMyId, sendId, verifNum).start();
1373 } else if (state.equals("VERIF")) {
1375 throw new RemoteException();
/**
 * Boolean accessor related to the task's "saved" status.
 *
 * NOTE(review): the body is not visible in this excerpt, so which field or
 * computation backs the returned value cannot be stated from here.
 */
1381 public boolean getSaved() {
/**
 * Clears the {@code pseudoPerBeg} and {@code pseudoPerEnd} flags and sets
 * every entry of {@code values} to false. The list size is left unchanged;
 * only the stored elements are replaced.
 */
1388 public void reinitializePPEr() {
1389 pseudoPerBeg = false;
1390 pseudoPerEnd = false;
// Overwrite each slot in place with a false value.
1391 for (int i = 0; i < values.size(); i++)
1392 values.set( i, new Boolean(false) ) ;
/**
 * Forwards a verdict to every neighbour except one.
 *
 * For each rank in {@code neighbors} that differs from {@code neighId}, the
 * task id is resolved through the Register's task list, its host stub is
 * fetched and {@code savOrFinOrRest} is invoked on it with {@code tag}, this
 * task's {@code timeStep}, the verdict and this task's {@code reduceAll}.
 *
 * @param id      not used by the visible code (only by a commented-out line)
 * @param neighId rank of the neighbour that must be skipped
 * @param tag     verification number sent along with the verdict
 * @param verd    verdict value sent to the neighbours
 * @throws RemoteException declared in the signature; presumably raised by
 *         the remote stub calls
 *
 * NOTE(review): the final test reads the field {@code verdict}, not the
 * parameter {@code verd}; the statement it guards and the declaration of
 * {@code stub} are not visible in this excerpt.
 */
1396 public void broadcastVerdict(int id, int neighId, int tag, boolean verd)
1397 throws RemoteException {
// Boxed copy of the verdict passed to the remote call.
1398 Boolean verdicto = verd;
1399 TaskId recev = null;
1401 // Register.Instance().getListeOfTasks().getTaskIdOfRank(id);
1403 for (int i = 0; i < neighbors.size(); i++)
// Skip the neighbour identified by neighId.
1404 if (((Integer) neighbors.get(i)).intValue() != neighId) {
1405 // System.out.println("broadcasting verdict "+verdicto+" to node of Rank "+((Integer)neighbors.get(i)).intValue());
1406 recev = Register.Instance().getListeOfTasks().getTaskIdOfRank(
1407 ((Integer) neighbors.get(i)).intValue());
1408 stub = recev.getHostStub();
1409 stub.savOrFinOrRest(tag, timeStep, verdicto, reduceAll);
1412 if (verdict == false)
/**
 * Handles an incoming verdict.
 *
 * The method first loops for as long as {@code reloading} is true, then
 * distinguishes two cases:
 * - {@code tag} equals {@code verifNum}, {@code step} equals
 *   {@code timeStep} and the state is "VERIF": {@code action} becomes
 *   "sendVerdict"; when {@code finalStep} is set the code walks {@code resp}
 *   and waits while {@code action} is "sendResponse" or {@code respSent} is
 *   false; {@code recievedVerdict} is set, the received {@code reduceAll}
 *   replaces the field, and a {@code SendVerdictThread} is created.
 * - {@code tag} is greater than {@code verifNum} and {@code step} equals
 *   {@code timeStep}: {@code action} becomes "sendVerdict",
 *   {@code recievedVerdict} is set and a {@code SendVerdictThread} is
 *   started.
 *
 * @param tag       verification number attached to the verdict
 * @param step      time step attached to the verdict
 * @param verd      verdict value (its use is not visible in this excerpt)
 * @param reduceAll list stored into the field of the same name in the first
 *                  case
 *
 * NOTE(review): many lines are not visible here - the wait-loop bodies, the
 * {@code resp} loop body, the computation of {@code sendId}, the branch
 * structure around the two {@code recievedVerdict = true} assignments and
 * the call completing the first {@code new SendVerdictThread(...)}
 * expression.
 */
1416 public void savOrFinOrRest(int tag, int step, boolean verd,
1417 ArrayList<Object> reduceAll) {
1418 // System.out.println("Recieved verd "+verd+" sleeping till not reloading");
// Wait until the task is no longer reloading.
1419 while (reloading == true) {
1422 // System.out.println("sleeping till not reloading");
1423 } catch (Exception e) {
// Case 1: verdict for the verification currently in progress.
1427 if (verifNum == tag && timeStep == step && state.equals("VERIF")) {
1428 action = "sendVerdict";
1429 if (finalStep == true) {
1430 for (int i = 0; i < resp.size(); i++)
1432 // System.out.println("sleeping till response is sent");
// Block until the pending response has been sent.
1433 while (action.equals("sendResponse") || respSent == false)
1436 // System.out.println("sleeping till response is sent");
1437 } catch (Exception e) {
1442 recievedVerdict = true;
1447 recievedVerdict = true;
1449 // System.out.println("//// Stetting reduceAll\\\\");
// Adopt the reduceAll list received with the verdict.
1450 this.reduceAll = reduceAll;
1453 new SendVerdictThread(jaceMyId, sendId, verifNum, verdict)
// Case 2: the verdict carries a higher verification number than ours.
1456 } else if (verifNum < tag && timeStep == step) {
1461 action = "sendVerdict";
1462 recievedVerdict = true;
1465 new SendVerdictThread(jaceMyId, sendId, verifNum, verdict).start();
/**
 * Queries each neighbour remotely and records in {@code neighId} the rank
 * of a neighbour whose {@code getNbNeighboursNotConv()} returns 0.
 *
 * Each loop iteration resolves the neighbour's task id through the
 * Register's task list, fetches its host stub and performs one call on it.
 * No {@code break} is visible, so when several neighbours qualify the last
 * one in list order is the one left in {@code neighId}.
 *
 * @throws RemoteException declared in the signature; presumably raised by
 *         the remote stub calls
 *
 * NOTE(review): the declaration/initial value of {@code neighId} and the
 * return statement are not visible in this excerpt.
 */
1470 public int getRankLeader() throws RemoteException {
1472 for (int i = 0; i < neighbors.size(); i++) {
1473 TaskId recev = null;
1474 recev = Register.Instance().getListeOfTasks().getTaskIdOfRank(
1475 ((Integer) (neighbors.get(i))).intValue());
1476 JaceInterface stub = recev.getHostStub();
// A neighbour reporting zero non-converged neighbours is selected.
1477 if (stub.getNbNeighboursNotConv() == 0)
1478 neighId = ((Integer) (neighbors.get(i))).intValue();
/**
 * Thread that ships a serialized copy of a Task to one of the nodes listed
 * in {@code saveTab}.
 *
 * Visible flow: make sure {@code saveTab} exists; unless {@code finalize}
 * is set, convert the task ({@code sauv}) to a byte stream, then loop over
 * candidate destinations until one accepts the stream through
 * {@code stub.saveTask(...)} or the candidates are exhausted.
 *
 * NOTE(review): many short lines (field declarations, the run() header,
 * try/else lines, counter updates, closing braces) are not visible in this
 * excerpt; the notes below only describe what the visible lines show.
 */
1483 class SaveTaskThread extends Thread {
1486 public SaveTaskThread(Task s) {
// Needed because static Thread methods are invoked through an instance below.
1490 @SuppressWarnings("static-access")
1492 // If array of BackupNodes not created, create it
1493 if (saveTab == null) {
1497 if (finalize == false) { // if finalization step, it will crash
1498 // because register purged yet
// Serialize the task once; the same stream is offered to each candidate.
1500 ByteArrayOutputStream stream = convertTask2stream(sauv);
1502 // find the BackupNode to send
1505 JaceInterface stub = null;
1506 boolean sent = false;
// Keep trying while candidates remain and nothing has been sent yet.
1509 while (j < saveTab.length && sent == false) {
1510 // 1 - find the remote task Id to send the backup to
// The modulus depends on whether jaceSize is below 2 * numBackupNeighbors + 1.
1511 if (jaceSize < (2 * Register.Instance()
1512 .getNumBackupNeighbors() + 1)) {
1513 taskRankDest = saveTab[saveRound % (jaceSize - 1)];
1515 taskRankDest = saveTab[saveRound
1516 % (2 * Register.Instance()
1517 .getNumBackupNeighbors())];
1520 // 2 - knowing the destination task Id, get the stub of the
1521 // corresponding node
1523 task = Register.Instance().getListeOfTasks()
1524 .getTaskIdOfRank(taskRankDest); // ///////////////////////////pb
1525 stub = task.getHostStub();
1526 } catch (Exception e) {
1528 .println("Problem in SaveTaskThread on assignation line in save : "
1531 // System.out.println("ite " + jaceP2P_Iteration +
1532 // ".......... SENDING on task " + taskRankDest);
1534 // if no stub there is a problem
1538 // System.out.println("unable to SEND backup on task of rank "
1541 // 3 - try to send the stream
1543 // if there is a stub, send the stream to that node
// Remote call: rank, bytes, last-save iteration, time step, application name, tag 0.
1545 stub.saveTask(jaceMyId, stream.toByteArray(),
1546 sauv.lastSave.getLastSave(), sauv.timeStep,
1547 Register.Instance().getAppliName(), 0);
1548 // System.out.println("saved data on "+taskRankDest+" iteration= "+sauv.lastSave.getLastSave()+
1549 // "timeStep="+sauv.timeStep+" !!!!!!!!");
1551 // Vector v = stub.getIterationOfBackup(jaceMyId,0);
1552 // int ite=((Integer)v.get(0)).intValue();
1553 // System.out.println("******************************************sauvegarde de donnees: ite="+ite+" taskDest="+taskRankDest+" ******************************************************");
1556 } catch (Exception e) {
1558 .println("JaceP2P_Error in Task.jaceP2P_Save() when saving stream: "
// NOTE(review): assuming getTaskThread() returns a java.lang.Thread, sleep()
// and yield() are static and therefore act on the thread executing this
// code, not on the task thread - confirm that this is the intent.
1567 JaceSession.Instance().getTaskThread().sleep(10);
1568 JaceSession.Instance().getTaskThread().yield();
1569 } catch (Exception e) {
1573 // 5 - if stream not sent at all, do something (WHAT ???)
// NOTE(review): the loop above runs only while j < saveTab.length; if j
// grows by one per attempt (increment not visible here) it can end at most
// equal to saveTab.length, so this strict '>' test would never be true -
// confirm whether '>=' or a test on 'sent' was intended.
1574 if (j > saveTab.length) {
1576 .println("No more alive neighbors for storing the Backup");
1577 // TODO : what to do if no BackupNode has answered ???
1586 class BroadcastTaskThread extends Thread {
1597 public BroadcastTaskThread(JaceInterface theStub, int theRank,
1598 byte[] theTsk, int theIteration, int timeStep, String theAppliName,
1599 int tag, int dest) {
1600 this.stub = theStub;
1601 this.rank = theRank;
1603 this.iteration = theIteration;
1604 this.timeStep = timeStep;
1605 this.appliName = theAppliName;
1608 // System.out.println("tag="+tag);
1611 // method launched by start()
1614 stub.saveTask(rank, tsk, iteration, timeStep, appliName, tag);
1615 JaceSession.Instance().getTaskObject().setSaved(true);
1618 // Vector v = stub.getIterationOfBackup(rank,0);
1619 // int ite=((Integer)v.get(0)).intValue();
1620 // System.out.println("++++++++++ Broadcast data ite="+iteration+" dest="+dest+" timeStep="+timeStep);
1623 // Vector v = stub.getIterationOfBackup(rank,1);
1624 // int ite=((Integer)v.get(0)).intValue();
1625 // System.out.println("++++++++++ Broadcast dataConvg ite="+iteration+" dest="+dest+" timeStep="+timeStep);
1626 // System.out.println("+++++++++++++++++++++++++++++++++++++++++ Broadcast convgData ite="+ite+" dest="+dest+" +++++++++++++++++++++++++++++++++++++++++++++++++++++");
1629 } catch (Exception e) {
1631 .println("Node not reachable by JaceServer.saveTask() in BroadcastTaskThread :"