3 import java.io.ByteArrayOutputStream;
4 import java.io.ObjectInputStream;
5 import java.io.ObjectOutputStream;
6 import java.rmi.ConnectException;
7 import java.rmi.RemoteException;
8 import java.util.Vector;
10 //import java.util.Calendar;
11 //import java.util.GregorianCalendar;
13 //import com.jamonapi.*;
15 public class Task implements Runnable, Cloneable, java.io.Serializable {
// Instance state of a task. Most fields are public and are read and written directly
// by the methods of this class.
16 private static final long serialVersionUID = 1L;
18 public double errorLoc = 0;
// Checkpoint period: jaceP2P_Save() tests (jaceP2P_Iteration % saveParameter) == 0,
// so a value of 0 would make that modulo throw ArithmeticException.
19 public int saveParameter;
22 public TaskId jaceTaskId = null;
23 public String[] jaceArgs;
24 public boolean reloading = false;
// Convergence-detection state. The values compared in this class are
// NORMAL, WAIT4V, VERIF, SAVING and FINISHED.
25 public String state = "NORMAL";
// Reset to neighbors.size() by initialize_state().
26 public int nb_not_recv;
27 public boolean electedNode = false;
28 public boolean respSent = false;
// Responses received from neighbors, indexed like the neighbors vector (see response()).
// Entries are compared against 0, 1 and -1 in this class.
29 public Vector<Integer> resp;
30 public int verifNum = 0;
// Also used as a monitor object (synchronized (lastSave)) when it is read or replaced.
31 public LastSave lastSave = new LastSave();
// neighbors and neighborsValues are parallel vectors: detectNeighbors() adds to both together.
32 public Vector<Integer> neighbors;
33 public Vector<Boolean> neighborsValues;
// dependancies and values are parallel vectors: setDep() adds to both together.
34 public Vector<Integer> dependancies;
35 public Vector<Boolean> values;
// Pending action. The values used in this class are
// nothing, sendVerif, sendVerdict and sendResponse.
37 public String action = "nothing";
38 public boolean verdict = false;
39 public boolean recievedVerdict = false;
40 public int jaceP2P_Iteration = 0;
41 public boolean finalStep = false;
42 // public Monitor mon1=null;
43 public int timeStep = 0; // time discretization counter for non-stationary
45 public boolean localCV_state;
46 public boolean jaceP2P_globalCV_state = false;
47 // attribute to know if an appli has finished yet
48 private boolean finalize = false;
49 public boolean pseudoPerBeg;
50 public boolean pseudoPerEnd;
51 public boolean underTh = false;
// Allocated elsewhere in this class as a two-element array; also used as a monitor in setSaved().
52 public boolean saved[];
53 // attributes for BackupNodes
54 private int saveRound = 0;
// Ranks of the backup tasks; filled by createSaveTab() and iterated by broadcastTasks().
55 private int[] saveTab = null;
56 public boolean savedResults = false;
58 public BackupConvg sauvConvg = new BackupConvg();
59 public boolean postReloading = false;
// Raw Vector (hence the unchecked suppression).
60 @SuppressWarnings("unchecked")
61 public Vector reduceAll;
// NOTE(review): the header of this initialisation block is not visible in this excerpt;
// presumably it is the body of the Task constructor - confirm.
// Allocates every collection field so that later code can call add/clone/size without null checks.
68 reduceAll = new Vector<Object>();
69 dependancies = new Vector<Integer>();
70 values = new Vector<Boolean>();
71 resp = new Vector<Integer>();
// Two acknowledgement slots, examined one after the other by setSaved().
72 saved = new boolean[2];
74 neighbors = new Vector<Integer>();
75 neighborsValues = new Vector<Boolean>();
// Sends the current backups of this task to the node hosting the task of the given rank.
// The TaskId for the rank is looked up in the Register, its host stub is fetched, and
// stub.saveTask(...) is called with the serialized full backup (sauv). If
// sauvConvg.initialized is set, the serialized convergence backup (sauvConvg) is sent too,
// while holding the sauvConvg monitor.
// Any exception is caught, reported on System.err, and its stack trace printed on System.out.
// NOTE(review): several lines (null checks, try, trailing call arguments) are not visible
// in this excerpt; the exact guards around each step should be confirmed against the full file.
78 public void getBackupForNewNode(int rank) {
80 JaceInterface stub = null;
81 task = Register.Instance().getListeOfTasks().getTaskIdOfRank(rank);
83 stub = task.getHostStub();
85 // if no stub there is a problem
87 System.err.println("Unable to send backup on task of rank " + rank);
89 // if there is a stub, send the stream to that node
91 ByteArrayOutputStream stream;
// full task backup
93 stream = convertTask2stream(sauv);
94 stub.saveTask(jaceMyId, stream.toByteArray(), sauv.lastSave
95 .getLastSave(), sauv.timeStep, Register.Instance()
// convergence-only backup, sent only once it has been initialized
98 synchronized (sauvConvg) {
99 if (sauvConvg.initialized == true) {
100 stream = convertBackupConv2stream(sauvConvg);
101 stub.saveTask(jaceMyId, stream.toByteArray(),
102 sauvConvg.lastSave.getLastSave(),
103 sauvConvg.timeStep, Register.Instance()
107 } catch (Exception e) {
108 System.err.println("Error in getBackupForNewNode :" + e);
109 e.printStackTrace(System.out);
114 // Hook meant to be overridden by the user application: converts the stream sent by the
115 // BackupNode into a Task object of the application's own type. This method is
117 // in TaskLauncher.loadBackupAndRestart() and
118 // TaskLauncher.loadOrReloadTask()
// NOTE(review): the default body is not visible in this excerpt.
119 public Task jaceP2P_ConvertStream(ObjectInputStream stream) {
// NOTE(review): the body is not visible in this excerpt; the name suggests it prints the
// saved backup - confirm against the full file.
123 public void printSav() {
126 // Hook meant to be overridden by the user application to implement its reduceAll operation.
// Called from response() with the value vector received from a neighbor.
// Synchronized on this Task instance.
// NOTE(review): the default body is not visible in this excerpt.
127 public synchronized void reduceAll(Vector<?> recievedValue) {
// Derives the local rank and the task count for this task:
// jaceMyId is read from jaceTaskId.getRank() and jaceSize from Register.getNbOfTasks().
// Failures are reported on System.err together with the local host name.
// NOTE(review): the line that stores the Id parameter (presumably jaceTaskId = Id) and the
// try lines are not visible in this excerpt - confirm.
130 public void setId(TaskId Id) {
132 jaceMyId = jaceTaskId.getRank();
134 jaceSize = Register.Instance().getNbOfTasks();
135 } catch (Exception e) {
137 System.err.println("SetId is bad !! " + e + " "
138 + LocalHost.Instance().getName());
139 // jaceSize = Register.Instance().getListeOfTasks().getSize();
140 } catch (Exception e2) {
141 System.err.println("Not localised the spawner : " + e2);
// Receives the application arguments; getTask2save() calls it with Register.getParams().
// NOTE(review): body not visible in this excerpt; presumably it stores arg in jaceArgs - confirm.
146 public void setParam(String[] arg) {
// NOTE(review): body not visible in this excerpt; presumably it stores nbTasks in jaceSize - confirm.
151 public void setJaceSize(int nbTasks) {
// NOTE(review): body not visible in this excerpt; presumably it returns jaceTaskId - confirm.
155 public TaskId getId() {
// Queried remotely by jaceSend(), which compares stub.getTimeStep() with the sender timeStep.
// NOTE(review): body not visible in this excerpt; presumably it returns timeStep - confirm.
160 public int getTimeStep() {
164 // Hook meant to be overridden by the user application to identify the neighbors of a
// NOTE(review): the rest of the original comment and the default body are not visible in
// this excerpt; the meaning of the parameter i should be confirmed against the full file.
167 public int[] getDependencies(int i) {
171 // Hook meant to be overridden by the user application to initialise each task at the beginning of
// NOTE(review): the rest of the original comment and the default body are not visible in this excerpt.
174 public void jaceP2P_InitTask() {
// NOTE(review): the beginning of the original comment is not visible in this excerpt; the
// remaining fragment refers to TaskLauncher.loadOrReloadTask(). Body not visible either.
177 // TaskLauncher.loadOrReloadTask()
178 public void jaceP2P_ReinitTask() {
181 // Hook meant to be overridden by the user application to safeguard its results.
// NOTE(review): the default body is not visible in this excerpt.
182 public void saveResults() {
185 // Hook meant to be overridden by the user application to save only the required
186 // attributes (iteration counter, vectors, ...). This method is called in
187 // Task.jaceP2P_Save()
// In this class it is also called directly by getTask2save(), which then fills in the
// generic attributes on the returned Task.
// The default implementation prints a trace line.
// NOTE(review): the return statement of the default body is not visible in this excerpt.
188 public Task jaceP2P_SaveFromCrash() {
189 System.out.println("JaceSaveFromCrash Task");
// Terminates the task: prints a trace line with the local rank, asks the spawner (through
// its stub) to kill the application, passing the stub of the local host, and calls
// JaceDaemon.reinitDaemon().
// A failure to reach the spawner is caught and reported.
// NOTE(review): the conditions guarding these steps are on lines not visible in this excerpt.
193 public void jaceFinalize() {
196 System.out.println("Ready to Death Task:" + jaceMyId);
199 Register.Instance().getSpawnerStub().killApplication(
200 LocalHost.Instance().getStub());
202 } catch (Exception e) {
204 .println("Cannot join the Spawner to kill application: "
208 JaceDaemon.Instance().reinitDaemon();
// Resets the convergence-detection data of this task: empties the bookkeeping vectors
// through reinitializeVectors(), clears the pseudo-period flags, replaces lastSave with a
// fresh LastSave and clears the local and global convergence flags.
// An exception from reinitializeVectors() is caught and reported on System.err.
212 public void jaceP2P_ReinitConv() {
213 // System.out.println("reinit conv");
217 // last_iter.removeAllElements();
219 reinitializeVectors();
221 } catch (Exception e) {
222 System.err.println("Error in jaceP2P_reinitConv():" + e);
224 pseudoPerBeg = false;
225 pseudoPerEnd = false;
// NOTE(review): the monitor taken here is the OLD LastSave instance; once the field is
// reassigned, other threads that synchronize on lastSave lock a different object.
226 synchronized (lastSave) {
227 lastSave = new LastSave();
229 // reset the convergence variables of the generic Task
230 localCV_state = false;
231 jaceP2P_globalCV_state = false;
// Empties the five bookkeeping vectors (values, dependancies, neighbors, neighborsValues, resp).
// The Vector objects themselves are kept; only their elements are removed.
// Declared to throw RemoteException although none of the visible calls can raise it.
235 public void reinitializeVectors() throws RemoteException {
236 values.removeAllElements();
237 dependancies.removeAllElements();
238 neighbors.removeAllElements();
239 neighborsValues.removeAllElements();
240 resp.removeAllElements();
// Asks the spawner, through its stub, for the chrono value of the current application
// (identified by Register.getAppliName()) and stores it in result.
// A failure is caught and reported.
// NOTE(review): the declaration of result, its default value and the return statement are
// not visible in this excerpt - confirm what is returned when the remote call fails.
243 public long jaceP2P_getChronoValue() {
247 result = Register.Instance().getSpawnerStub().getChronoValue(
248 Register.Instance().getAppliName());
249 } catch (Exception e) {
251 .println("JaceP2P_Error in Task.jaceP2P_getChronoValue() on SuperNode : "
259 //System.out.println("ds run()");
263 * asynchronous (non-blocking) sending of a data object to another task
266 * the (serializable) data object to be sent
268 * the task id for receiver's task
270 * the tag for message
// Queues a data message for the task of rank dest.
// The receiver TaskId is looked up in the Register. If the host stub of the receiver
// reports the same timeStep as this task, a Message is built from buffer, the sender and
// receiver ids, tag, timeStep, jaceP2P_Iteration, verifNum and erreur_locale, and is added
// to the buffer of Sender. The actual transmission is done elsewhere.
// A ConnectException triggers suicide2 on the receiver stub; other exceptions are reported
// on System.err.
// NOTE(review): sleep and yield are static methods of java.lang.Thread, so (assuming
// getTaskThread() returns a Thread, as the static-access suppression suggests) these calls
// act on the calling thread, not on the thread returned by getTaskThread().
// NOTE(review): several lines (declarations, null test, try and else) are not visible in
// this excerpt.
272 @SuppressWarnings("static-access")
273 public void jaceSend(Object buffer, int dest, int tag, double erreur_locale) {
274 // System.out.println("dest : " + dest);
278 recev = Register.Instance().getListeOfTasks().getTaskIdOfRank(dest);
281 System.err.println("In jaceSend recv = null !");
283 JaceSession.Instance().getTaskThread().sleep(10);
284 JaceSession.Instance().getTaskThread().yield();
285 } catch (Exception e) {
289 // TODO: remove this else, but find out why recev is sometimes null
291 if (recev.getRank() != dest) {
292 System.err.println("Problem !! pas le meme dest que ds les param");
296 // TODO: add the application name to Message
299 stub = recev.getHostStub();
// only send when the receiver is at the same time step as this task
300 if (stub.getTimeStep() == timeStep) {
301 // System.out.println("************ "+verifNum+" *************");
302 Message msg = new Message();
303 msg.setParam(buffer, jaceTaskId, recev, tag, timeStep,
304 jaceP2P_Iteration, verifNum, erreur_locale);
306 // put the message into
307 // JaceBuffer.Instance() (the list of Messages to
311 * if(JaceDaemon.Instance().getProtocol().equals("socket"
312 * )) SenderSocket.Instance().buffer.add(msg); else
313 * if(JaceDaemon.Instance().getProtocol().equals("rmi"))
314 * SenderRmi.Instance().buffer.add(msg);
316 // System.out.println("putting message to "+dest+" in the buffer");
317 Sender.Instance().getBuffer().add(msg);
319 // if(JaceDaemon.Instance().getProtocol().equals("rmi"))
320 // SenderRmi.Instance().getBuffer().add(msg);
322 // SenderSocket.Instance().getBuffer().add(msg);
// NOTE(review): this handler calls the same remote stub that just failed to connect, so
// the call is likely to fail as well - confirm the intent.
325 } catch( ConnectException ce ) {
326 recev.getHostStub().suicide2( "Not responding!" ) ;
327 } catch (Exception e) {
328 System.err.println("Unable to send data message to " + dest
330 // recev.getHostStub().suicide2( "Not responding!" ) ;
332 // System.out.println("TASK : g mis un msg qui doit etre envoye");
333 // sending is always asynchronous: the Message will leave
334 // immediately because
335 // - Sender was doing JaceBuffer.Instance().get(), which contains a
337 // - and here Task does JaceBuffer.Instance().add(msg), which
338 // contains a notifyAll()
340 JaceSession.Instance().getTaskThread().sleep(10);
341 JaceSession.Instance().getTaskThread().yield();
342 } catch (Exception e) {
344 // System.out.println("TASK : je sort de jaceSend");
346 } catch (Exception e) {
347 if (Register.Instance().getListeOfTasks() == null)
348 System.err.println("Tasks list is null: " + e);
354 * non-blocking reception of a data object; returns an object
357 * the task id for the task sender
359 * the tag for message
// Fetches from MsgQueue the message sent by task sender with the given tag and returns
// its data payload (tmp.getData()).
// On iteration 0, or right after a reload, the sender is first checked against the
// dependency list with notExist(sender).
// While this task is under threshold, past iteration 0 and not reloading, the dependency
// slot of the sender may be marked true through setValues(index, true), depending on
// state, verifNum and a property of the message.
// NOTE(review): the null test on tmp, the value returned when nothing was received and
// the rest of the condition split over the elided lines are not visible in this excerpt.
// NOTE(review): sleep and yield are static on java.lang.Thread, so (assuming
// getTaskThread() returns a Thread) they act on the calling thread.
361 @SuppressWarnings("static-access")
362 public Object jaceReceive(int sender, int tag) {
363 if (jaceP2P_Iteration == 0 || postReloading) {
365 if (notExist(sender)) {
369 } catch (Exception e) {
373 Message tmp = MsgQueue.Instance().get(sender, tag);
375 // System.out.println("recu MSG de tache " + sender + " (" +
376 // Register.Instance().getListeOfTasks().getTaskIdOfRank(sender).getHostName()
377 // + ") MsgQueue : " +
378 // MsgQueue.Instance().getSize()+" message src_tag="+tmp.getSrc_tag()+
379 // " localError="+tmp.getLocalError());
383 if (underTh == true && jaceP2P_Iteration != 0 && !reloading) {
384 index = depIndex(sender);
386 if ((!(state.equals("VERIF")) || verifNum == tmp
389 // System.out.println("dep["+sender+"]=true, index="+index);
390 setValues(index, true);
394 } catch (Exception e) {
395 System.err.println("Error jaceReceive :" + e);
396 System.err.println("Sender=" + sender);
398 return (tmp.getData());
401 // System.out.println("RIENNN recu de tache " + sender + " (" +
402 // Register.Instance().getListeOfTasks().getTaskIdOfRank(sender).getHostName()
403 // + ") taille MsgQueue : " + MsgQueue.Instance().getSize());
404 } catch (Exception e) {
405 if (Register.Instance().getListeOfTasks() == null)
406 System.err.println("Tasks list is null2: " + e);
409 JaceSession.Instance().getTaskThread().sleep(10);
410 JaceSession.Instance().getTaskThread().yield();
411 } catch (Exception e) {
// Looks up sender in the dependancies vector (indexOf returns -1 when it is absent).
// NOTE(review): the test on i and the return statement are not visible in this excerpt;
// presumably it returns true when sender is not yet a known dependency - confirm.
// NOTE(review): new Integer(int) is deprecated; Integer.valueOf(sender) is the usual replacement.
417 public boolean notExist(int sender) {
418 int i = dependancies.indexOf((Object) (new Integer(sender)));
// Registers a new dependency: appends value to dependancies and a matching false flag to
// values, so both vectors keep the same length and the same indexing.
// No check for duplicates is made here.
425 public void setDep(int value) {
426 dependancies.add(new Integer(value));
427 values.add(new Boolean(false));
// Finds the position of sender in the dependancies vector (indexOf gives -1 when absent).
// NOTE(review): the return statement is not visible in this excerpt; presumably index is
// returned as is - confirm.
430 public int depIndex(int sender) {
431 int index = dependancies.indexOf((Object) (new Integer(sender)));
// Overwrites the flag stored at position index of the values vector.
// Vector.setElementAt throws ArrayIndexOutOfBoundsException for an index outside
// 0..size-1, which includes the -1 that indexOf yields for an unknown dependency.
435 public void setValues(int index, boolean value) {
436 values.setElementAt(new Boolean(value), index);
// Periodic checkpoint entry point.
// Iteration 0 is handled by a separate branch whose body is not visible in this excerpt.
// Otherwise, every saveParameter iterations, the task state to save is rebuilt with
// getTask2save() and handed to a new SaveTaskThread, which is started.
// NOTE(review): sauv is reassigned inside synchronized (sauv), so the monitor held is the
// previous object; a null sauv at that point would throw NullPointerException.
// NOTE(review): saveParameter == 0 makes the modulo throw ArithmeticException.
// NOTE(review): sleep and yield are static on java.lang.Thread, so (assuming
// getTaskThread() returns a Thread) they act on the calling thread.
439 @SuppressWarnings("static-access")
440 public void jaceP2P_Save() {
443 if (jaceP2P_Iteration == 0) {
444 // request 0 when task at barrier
445 // request 1 when only at convergence
447 } else if ((jaceP2P_Iteration % saveParameter) == 0) {
448 // clone the Task at that step of computations
450 synchronized (sauv) {
451 sauv = getTask2save();
453 // send it to the corresponding BackupNode in a round robin
455 new SaveTaskThread(sauv).start();
459 JaceSession.Instance().getTaskThread().sleep(10);
460 JaceSession.Instance().getTaskThread().yield();
461 } catch (Exception e) {
// Sends a backup of this task to every backup node listed in saveTab.
465 // request 0 when Saving all task
466 // request 3 when saving only convergence data
// Two parallel sections are visible. The first serializes the full task (sauv) and starts
// one BroadcastTaskThread per backup node. The second does the same with the convergence
// backup (sauvConvg).
// In both, saveTab is built on demand when it is still null (the call is on a line not
// visible in this excerpt; presumably createSaveTab()).
// NOTE(review): the test on request that selects between the two sections is not visible
// in this excerpt.
// NOTE(review): sauv and sauvConvg are reassigned inside their own synchronized blocks, so
// the monitor held is the previous object.
467 @SuppressWarnings("static-access")
468 public void broadcastTasks(int request) {
469 ByteArrayOutputStream stream;
471 // 1 - clone the Task at that step of computations and serialize it
473 synchronized (sauv) {
474 sauv = getTask2save();
475 stream = convertTask2stream(sauv);
477 // 2 - create the saveTab if necessary
478 if (saveTab == null) {
482 JaceInterface stub = null;
484 // 3 - send the stream to all the BackupNodes
485 for (int i = 0; i < saveTab.length; i++) {
486 // System.out.println("saveTab[" + i + "] = " +
489 // 3.1 - get the stub of the destination
490 // System.out.println("Saving on neighbor "+i);
492 task = Register.Instance().getListeOfTasks()
493 .getTaskIdOfRank(saveTab[i]); // ///////////////////////////pb
494 stub = task.getHostStub();
495 } catch (Exception e) {
497 .println("Problem in the broadcast, ligne d'assignation de task ds broadcats: "
501 // 3.2 - send the stream to that stub
503 // System.out.println("saving on second list");
505 new BroadcastTaskThread(stub, jaceMyId, stream
507 sauv.lastSave.getLastSave(), sauv.timeStep,
508 Register.Instance().getAppliName(), 0,
515 } catch (Exception e) {
516 e.printStackTrace(System.out);
// convergence-only backup
519 synchronized (sauvConvg) {
520 sauvConvg = getBackupConvg2Save();
521 stream = convertBackupConv2stream(sauvConvg);
523 // 2 - create the saveTab if necessary
524 if (saveTab == null) {
528 JaceInterface stub = null;
530 // 3 - send the stream to all the BackupNodes
531 for (int i = 0; i < saveTab.length; i++) {
532 // System.out.println("saveTab[" + i + "] = " + saveTab[i]);
534 // 3.1 - get the stub of the destination
535 // System.out.println("Saving on neighbor "+i);
537 task = Register.Instance().getListeOfTasks()
538 .getTaskIdOfRank(saveTab[i]); // ///////////////////////////pb
539 stub = task.getHostStub();
540 } catch (Exception e) {
542 .println("Problem in the broadcast, ligne d'assignation de task ds broadcats: "
546 // 3.2 - send the stream to that stub
548 * if(request==3){ if (stub != null) { new
549 * BroadcastTaskThread(stub, jaceMyId, stream.toByteArray(),
550 * sauvConvg.lastSave.getLastSave(),timeStep,
551 * Register.Instance().getAppliName(),1,saveTab[i]).start();
552 * } //System.out.println("saving on second list"); } else{
555 new BroadcastTaskThread(stub, jaceMyId, stream
556 .toByteArray(), sauvConvg.lastSave
557 .getLastSave(), sauvConvg.timeStep, Register
558 .Instance().getAppliName(), 1, saveTab[i])
562 // System.out.println("saving on first list");
567 // JaceSession.Instance().getTaskThread().sleep(10);
568 JaceSession.Instance().getTaskThread().yield();
571 catch (Exception e) {
// Serializes the given BackupConvg with Java object serialization into an in-memory
// ByteArrayOutputStream.
// NOTE(review): the error message below names Task.jaceP2P_ReinitConv() and a Task,
// although this method is convertBackupConv2stream and serializes a BackupConvg.
// NOTE(review): the closing of fluxOut and the return statement are on lines not visible
// in this excerpt; presumably stream is returned - confirm.
575 private ByteArrayOutputStream convertBackupConv2stream(BackupConvg t) {
576 // System.out.println("beginning of the checkpointing process......");
577 ByteArrayOutputStream stream = new ByteArrayOutputStream();
579 ObjectOutputStream fluxOut = new ObjectOutputStream(stream);
580 fluxOut.writeObject(t);
582 } catch (Exception e) {
584 .println("JaceP2P_Error in Task.jaceP2P_ReinitConv() when converting Task in Stream : "
587 // System.out.println("size of the byte array : " +
588 // stream.toByteArray().length + " bytes");
592 // Serializes the task into an in-memory stream so it can be sent to the BackupNode.
// Uses Java object serialization (ObjectOutputStream over a ByteArrayOutputStream).
// NOTE(review): the error message below names Task.jaceP2P_ReinitConv(), although this
// method is convertTask2stream.
// NOTE(review): the closing of fluxOut and the return statement are on lines not visible
// in this excerpt; presumably stream is returned - confirm.
593 private ByteArrayOutputStream convertTask2stream(Task t) {
594 // System.out.println("beginning of the checkpointing process......");
595 ByteArrayOutputStream stream = new ByteArrayOutputStream();
597 ObjectOutputStream fluxOut = new ObjectOutputStream(stream);
598 fluxOut.writeObject(t);
600 } catch (Exception e) {
602 .println("JaceP2P_Error in Task.jaceP2P_ReinitConv() when converting Task in Stream : "
605 // System.out.println("size of the byte array : " +
606 // stream.toByteArray().length + " bytes");
610 // Builds the Task to checkpoint: obtains the application data to save (from
611 // jaceP2P_SaveFromCrash(), overridden in the application)
// and then copies onto it the generic attributes of this task (id, parameters, time step,
// iteration number and the convergence-detection state).
// neighbors, neighborsValues and resp are shallow-cloned; reduceAll and lastSave are
// shared by reference with this task.
// lastSave is incremented under its own monitor before being attached to the backup.
// Also assigns the sauv field as a side effect.
// NOTE(review): the return statement is not visible in this excerpt; presumably sauv is
// returned - confirm.
612 @SuppressWarnings("unchecked")
613 public Task getTask2save() {
615 sauv = jaceP2P_SaveFromCrash();
617 // System.out.println("Saving data at it="+sauv.it);
618 // assign the default attributes of a task (params of the appli and
619 // TaskId of the local node)
620 sauv.setId(this.jaceTaskId);
621 sauv.setParam(Register.Instance().getParams());
622 sauv.timeStep = timeStep;
624 // assign the iteration number that had the appli at the moment of the
626 sauv.jaceP2P_Iteration = jaceP2P_Iteration;
627 sauv.saveRound = saveRound;
629 sauv.underTh = underTh;
630 sauv.jaceP2P_globalCV_state = jaceP2P_globalCV_state;
631 // System.out.println("I checkpoint the task at ite " +
632 // sauv.jaceP2P_Iteration);
633 // attributes needed to detect global convergence
637 sauv.nb_not_recv = nb_not_recv;
638 sauv.electedNode = electedNode;
639 sauv.respSent = respSent;
640 sauv.neighbors = (Vector) neighbors.clone();
641 sauv.neighborsValues = (Vector) neighborsValues.clone();
642 sauv.resp = (Vector) resp.clone();
643 sauv.verifNum = verifNum;
644 sauv.sendId = sendId;
645 sauv.reduceAll = reduceAll;
646 sauv.finalStep = finalStep;
647 sauv.action = action;
648 sauv.verdict = verdict;
649 sauv.localCV_state = localCV_state;
650 sauv.recievedVerdict = recievedVerdict;
651 synchronized (lastSave) {
652 lastSave.increment();
653 sauv.lastSave = lastSave;
655 } catch (Exception e) {
656 System.err.println("Problem with RMI !");
// Refreshes the existing sauvConvg object in place with the current convergence-detection
// state of this task (no new BackupConvg is created; see the commented-out line).
// neighbors, neighborsValues and resp are shallow-cloned; reduceAll and lastSave are
// shared by reference.
// lastSave is incremented under its own monitor, and sauvConvg.initialized is set to true.
// Exceptions are caught and reported on System.err.
// NOTE(review): the return statement is not visible in this excerpt; presumably sauvConvg
// is returned - confirm.
662 @SuppressWarnings("unchecked")
663 public BackupConvg getBackupConvg2Save() {
667 // sauvConvg=new BackupConvg();
668 sauvConvg.state = state;
669 sauvConvg.underTh = underTh;
670 sauvConvg.nb_not_recv = nb_not_recv;
671 sauvConvg.electedNode = electedNode;
672 sauvConvg.respSent = respSent;
673 sauvConvg.neighbors = (Vector) neighbors.clone();
674 sauvConvg.neighborsValues = (Vector) neighborsValues.clone();
675 sauvConvg.resp = (Vector) resp.clone();
676 sauvConvg.verifNum = verifNum;
677 sauvConvg.sendId = sendId;
678 sauvConvg.finalStep = finalStep;
679 sauvConvg.action = action;
680 sauvConvg.verdict = verdict;
681 sauvConvg.localCV_state = localCV_state;
682 sauvConvg.timeStep = timeStep;
683 sauvConvg.recievedVerdict = recievedVerdict;
684 sauvConvg.jaceP2P_Iteration = jaceP2P_Iteration;
685 sauvConvg.reduceAll = reduceAll;
687 synchronized (lastSave) {
688 lastSave.increment();
689 sauvConvg.lastSave = lastSave;
691 sauvConvg.initialized = true;
692 } catch (Exception e) {
693 System.err.println("Problem with RMI:" + e);
// Builds saveTab with one cell per entry of BackupsManager (its size() gives the length).
// Each cell is filled from BackupsManager.getBackupTaskAtIndex(i, 0).
// NOTE(review): the end of the assignment expression is on a line not visible in this
// excerpt; since saveTab is an int[] consumed by getTaskIdOfRank, presumably a rank is
// extracted - confirm.
699 private void createSaveTab() {
701 saveTab = new int[BackupsManager.Instance().size()];
703 // 2 - assign the taskId to each cell of the saveTab
704 // System.out.println("in TASK");
705 for (int i = 0; i < saveTab.length; i++) {
706 saveTab[i] = BackupsManager.Instance().getBackupTaskAtIndex(i, 0)
708 // System.out.print(saveTab[i] + ", ");
710 // System.out.println("saveTab created..... size : " + saveTab.length);
// Updates the saved acknowledgement slots while holding the monitor of the saved array.
// The visible tests look at saved[0] first, then at saved[1].
// NOTE(review): the statements executed in each branch and the use of the bool parameter
// are on lines not visible in this excerpt.
713 public void setSaved(boolean bool) {
714 synchronized (saved) {
716 if (saved[0] == false)
718 else if (saved[1] == false)
// Blocks until the backup has been acknowledged, that is until getSaved() returns true.
// Nothing is done when the Register reports no backup neighbors.
// Inside the wait loop a fresh Register is requested from the spawner for this rank and,
// when it is not null, it replaces the local Register content.
// Failures to reach the spawner are reported and their stack trace printed on System.out.
// NOTE(review): the tag parameter is not used on any visible line.
// NOTE(review): the sleep and the conditions inside the loop are on lines not visible in
// this excerpt.
728 public void waitForAck(int tag) {
729 if (Register.Instance().getNumBackupNeighbors() != 0) {
732 // Calendar cal = new GregorianCalendar();
733 // System.out.println("at time="+cal.get(Calendar.MINUTE)+":"+cal.get(Calendar.SECOND));
734 // System.out.println("sleeping till acknowledge saved");
735 while (getSaved() == false)
741 Register newReg = Register.Instance()
742 .getSpawnerStub().getRegister(jaceMyId);
743 if (newReg != null) {
744 Register.Instance().replaceBy(newReg);
747 .println("I got a null register from the spawner");
750 .println("Sleeping till acknowledge saved");
752 } catch (Exception e2) {
754 .println("Unable to get register from spawner :"
756 e2.printStackTrace(System.out);
759 } catch (Exception e) {
761 // Calendar cal1 = new GregorianCalendar();
762 // System.out.println("end save at time="+cal1.get(Calendar.MINUTE)+":"+cal1.get(Calendar.SECOND));
// One step of the decentralized global-convergence detection, driven by the local
// under-threshold flag under_th.
// Visible structure:
//   - a change of under_th relative to underTh is handled first (unless postReloading);
//   - on iteration 0 the neighbor list is built with detectNeighbors(jaceMyId, jaceSize);
//   - after a reload, a pending sendVerif or sendVerdict action is replayed in a new thread;
//   - otherwise the method branches on state: NORMAL, WAIT4V, VERIF, SAVING, FINISHED.
//     In FINISHED (with a received verdict and no pending sendVerdict) it sets
//     jaceP2P_globalCV_state to true and prints Finished.
//   - at the end postReloading is cleared.
// Remote failures are caught per call site and reported; a catch-all at the end prints
// the exception, its stack trace (on System.out) and the Register content.
// NOTE(review): many lines (state assignments, try lines, else branches, closing braces)
// are not visible in this excerpt, so the exact transitions must be confirmed against the
// full file before changing anything here.
766 public void jaceP2P_GlobalConvergence(boolean under_th) {
771 if (under_th != underTh && postReloading == false) {
772 // 1 - update the under-threshold flag
775 if (jaceP2P_Iteration == 0
776 && (postReloading == false || resp.size() == 0)) {
777 detectNeighbors(jaceMyId, jaceSize);
785 // print the values of the variables
787 // Calendar cal = new GregorianCalendar();
788 // System.out.println("at time="+cal.get(Calendar.MINUTE)+":"+cal.get(Calendar.SECOND));
789 // System.out.println("MyId="+jaceMyId+" sous seuil="+underTh+" etat="+state+" verifNum="+verifNum+" localCV="+localCV_state+" leader="+electedNode+" respSent="+respSent);
790 // System.out.println("dep="+getValues()+" pBeg="+pseudoPerBeg+" PEnd="+pseudoPerEnd+" sendId="+sendId);
791 // System.out.println("neigh="+nb_not_recv+" action="+action+" negative_resp="+testNegativeResp()+" reloading="+reloading+"\n"+" finalStep="+finalStep+" SavedResults="+savedResults+" NeghNotCV="+getNeighbourNotCV()+" postReload="+postReloading);
794 } catch (Exception e) {
795 System.err.println("Error printing status in Task :" + e);
// --- after a reload: replay the action that was pending when the backup was taken ---
798 if (action.equals("sendVerif") && state.equals("VERIF")
799 && postReloading == true) {
800 // System.out.println("je passe ds send verif");
802 new SendVerifThread(jaceMyId, sendId, verifNum).start();
804 // System.out.println("send Verif");
806 } else if (action.equals("sendVerdict") && postReloading == true) {
807 // System.out.println("je passe ds send verdict-------");
809 new SendVerdictThread(jaceMyId, sendId, verifNum, verdict)
812 // System.out.println("verifNum="+verifNum);
// --- state NORMAL with no pending action ---
815 if (state.equals("NORMAL") && action.equals("nothing")) {
816 if (underTh == false)
819 // System.out.println("sous seuil="+underTh+" PseudoPerBeg="+localStub.getPseudoPerBeg());
820 if (pseudoPerBeg == false) {
821 // System.out.println("ds la condition");
823 } else if (pseudoPerEnd == true) {
825 localCV_state = true;
// every neighbor has reported: start a verification round from this node
826 if (nb_not_recv == 0) {
827 // localStub.setAction("sendVerif");
829 broadcastVerif(jaceMyId, -1, verifNum + 1);
830 // localStub.setAction("nothing");
833 recievedVerdict = true;
834 initializeVerifLeader();
837 } catch (Exception e) {
839 .println("The verification message is not received: "
841 Register.Instance().viewAll();
// exactly one neighbor has not reported: notify that neighbor
843 } else if (nb_not_recv == 1)
846 int neighId = getNeighbourNotCV();
851 recev = Register.Instance().getListeOfTasks()
852 .getTaskIdOfRank(neighId);
853 JaceInterface stub = recev.getHostStub();
854 if (stub.setNbNeighboursNotConv(verifNum,
855 jaceMyId, timeStep)) {
856 // stub.setLeftNeighbourCV(true);
857 // System.out.println("sent convergence message to "+neighId);
860 recievedVerdict = false;
863 } catch (Exception e) {
865 .println("Unable to decrease the number of neighbors not converged on node :"
866 + recev.getHostName()
871 Register.Instance().viewAll();
875 TaskId id = Register.Instance()
877 .getTaskIdOfHostStub(
880 myRank = id.getRank();
881 Register.Instance().replaceBy(
884 .getRegister(myRank));
887 } catch (Exception e2) {
889 .println("Unable to contact the spawner: "
895 } else if (pseudoPerBeg == true && getValues())
// --- state WAIT4V with every neighbor reported ---
897 } else if (state.equals("WAIT4V") && nb_not_recv == 0) {
898 int neighId = getRankLeader();
901 broadcastVerif(jaceMyId, -1, verifNum + 1);
903 recievedVerdict = true;
904 initializeVerifLeader();
908 } catch (Exception e) {
910 .println("The verification message is not received:"
913 } else if (jaceMyId > neighId) {
915 broadcastVerif(jaceMyId, -1, verifNum + 1);
917 recievedVerdict = true;
918 initializeVerifLeader();
921 } catch (Exception e) {
923 .println("The verification message is not received: "
927 } else if (state.equals("WAIT4V")) {
928 if (underTh == false) {
929 localCV_state = false;
// --- state VERIF: the elected node collects responses and issues the verdict,
// --- the other nodes send their response towards sendId or the remaining neighbor ---
932 } else if (state.equals("VERIF")) {
933 // if(localStub.getNewerDep(0) && localStub.getNewerDep(1))
934 // localStub.setPseudoPerEnd(true);
936 if (electedNode == true) {
937 if ((!underTh && !postReloading) || !localCV_state
938 || testNegativeResp()) {
941 broadcastVerdict(jaceMyId, -2, verifNum + 1, false);
946 } catch (Exception e) {
948 } else if (postReloading) {
949 if (recievedAllResp())
950 if (!testNegativeResp()) {
952 broadcastVerdict(jaceMyId, -2, verifNum,
954 if (finalStep == true
955 && state.equals("VERIF")) {
956 initializeSavLeader();
963 } catch (Exception e) {
964 System.err.println("Error: " + e);
967 } else if (pseudoPerEnd == true) {
968 if (recievedAllResp())
969 if (!testNegativeResp()) {
971 broadcastVerdict(jaceMyId, -2, verifNum,
973 if (finalStep == true
974 && state.equals("VERIF")) {
975 initializeSavLeader();
983 } catch (Exception e) {
984 System.out.println("erreur: " + e);
989 broadcastVerdict(jaceMyId, -2,
990 verifNum + 1, false);
995 } catch (Exception e) {
997 .println("Unable to broadcast a negative verdict :"
1003 else if (getValues())
1004 pseudoPerEnd = true;
1005 } else if (!respSent) {
1006 if (!underTh || !localCV_state || testNegativeResp()) {
1007 if (action.equals("nothing")) {
1008 int neighId = sendId;
1010 TaskId recev = null;
1011 recev = Register.Instance().getListeOfTasks()
1012 .getTaskIdOfRank(neighId);
1013 JaceInterface stub = recev.getHostStub();
1014 // System.out.println("tryin to send negative response to "+neighId);
1015 stub.response(jaceMyId, verifNum, -1, null);
1016 // System.out.println("send negative response to "+neighId);
1020 } catch (Exception e) {
1022 .println("Response not received:" + e);
1023 Register.Instance().viewAll();
1026 } else if (pseudoPerEnd) {
1027 // System.out.print("The daemon can send a response ");
1028 int index = recievedAllRespMinusOne();
1029 // System.out.println("to node of index ="+index);
1031 int rank = getNeighborRank(index);
1032 // if(jaceMyId!=jaceSize-1 &&
1033 // localStub.getSendRight()==true){
1035 TaskId recev = null;
1036 recev = Register.Instance().getListeOfTasks()
1037 .getTaskIdOfRank(rank);
1038 JaceInterface stub = recev.getHostStub();
1040 if (getResp(index) == 1) {
1041 stub.response(jaceMyId, verifNum, 1,
1043 // System.out.println("send positive response to"+rank);
1045 stub.response(jaceMyId, verifNum, -1, null);
1046 // System.out.println("send negative response to"+rank);
1051 } catch (Exception e) {
1052 System.err.println("Response not received by "
1054 Register.Instance().viewAll();
1057 } else if (getValues())
1058 pseudoPerEnd = true;
// --- state SAVING: results are saved once, then responses are gathered towards the
// --- elected node, which tells the spawner that the application is over ---
1061 } else if (state.equals("SAVING") && !action.equals("sendVerdict")) {
1063 if (savedResults == false) {
1066 savedResults = true;
1068 } else if (electedNode) {
1069 if (recievedAllResp())
1071 // System.out.println("recieved all responses");
1072 JaceSpawnerInterface spawnerStub = Register
1073 .Instance().getSpawnerStub();
1074 // System.out.println("##### callin spawnerStub.setFinished(true) #####");
1075 spawnerStub.setOver(true);
1076 // localStub.setState("FINISHED");
1077 } catch (Exception e) {
1078 System.err.println("Error" + e);
1080 } else if (!respSent) {
1081 int index = recievedAllRespMinusOne();
1083 int rank = getNeighborRank(index);
1085 // if(jaceMyId!=jaceSize-1 &&
1086 // localStub.getSendRight()==true){
1088 TaskId recev = null;
1089 recev = Register.Instance().getListeOfTasks()
1090 .getTaskIdOfRank(rank);
1091 JaceInterface stub = recev.getHostStub();
1092 if (stub.getReloading() == false
1093 && stub.getState().equals("SAVING")) {
1094 action = "sendResponse";
1095 stub.response(jaceMyId, verifNum, 1, null);
1099 // System.out.println("send response to"+rank);
1101 } catch (Exception e) {
1102 System.err.println("Response not received" + e);
// --- state FINISHED: global convergence is declared locally ---
1106 } else if (state.equals("FINISHED")
1107 && !action.equals("sendVerdict") && recievedVerdict) {
1109 jaceP2P_globalCV_state = true;
1110 // System.out.println("Finished");
1111 // System.out.println("Finished");
1112 // System.out.println("Finished");
1113 // System.out.println("Finished");
1114 System.out.println("Finished");
// the post-reload phase lasts for a single call of this method
1117 if (postReloading == true)
1118 postReloading = false;
1120 } catch (Exception e) {
1121 System.err.println("Exception in Global Convergence :" + e);
1122 e.printStackTrace(System.out);
1123 Register.Instance().viewAll();
// Synchronized on this Task instance. The visible loop scans resp for an entry equal to -1.
// NOTE(review): the local declarations, the statement run on a match and the return value
// are on lines not visible in this excerpt. The caller compares the result with 1.
// The index parameter is not used on any visible line.
1128 public synchronized int getResp(int index) throws RemoteException {
1130 for (int i = 0; i < resp.size(); i++)
1132 if (((Integer) resp.get(i)).intValue() == -1)
// Records the response of neighbor neighId for verification round tag.
// Synchronized on this Task instance.
// The call first loops while reloading is true. Then, when tag equals verifNum, the slot
// of the neighbor is located through neighbors.indexOf, reduceAll(recievedValue) is
// invoked under a condition that involves state not being SAVING and the slot not already
// holding 1, and the response is stored in resp at that slot.
// A RemoteException is thrown on a path whose condition is not visible in this excerpt
// (presumably when tag does not match verifNum - confirm).
// NOTE(review): the wait loop runs inside a synchronized method; confirm that whatever
// clears reloading does not need this monitor.
1137 public synchronized void response(int neighId, int tag, int response,
1138 Vector<?> recievedValue) throws RemoteException {
1139 // System.out.println("inside response function");
1140 // System.out.println("sleeping till not reloading");
1141 while (reloading == true) {
1144 // System.out.println("sleeping till not reloading");
1145 } catch (Exception e) {
1148 if (verifNum == tag) {
1149 // System.out.println("inside condition");
// the int is boxed to an Integer by the cast, so indexOf compares by value
1150 int indexNeigh = neighbors.indexOf((Object) neighId);
1151 // System.out.println("after gettin index="+index);
1152 // System.out.println("index="+indexNeigh+" size de resp ="+resp.size());
1155 // int xyz=((Integer)(resp.elementAt(indexNeigh))).intValue();
1156 // }catch(Exception e){
1157 // System.out.println("error:"+e);
1161 && !state.equals("SAVING")
1162 && (((Integer) (resp.elementAt(indexNeigh))).intValue()) != 1) {
1163 // System.out.println("calling reduceAll()");
1164 reduceAll(recievedValue);
1166 // System.out.println("after calculating reduceAll");
1167 resp.setElementAt(response, indexNeigh);
1169 // System.out.println("get response ............");
1173 throw new RemoteException();
// Resets every entry of resp to 0.
// NOTE(review): further statements of this method (lines after the loop) are not visible
// in this excerpt.
1176 public void initializeSavLeader() {
1177 for (int i = 0; i < resp.size(); i++)
1178 resp.setElementAt(0, i);
// Scans resp for entries still equal to 0; bool starts as true.
// NOTE(review): the statement run on a match and the return are on lines not visible in
// this excerpt; presumably bool becomes false when a 0 entry is found and is then
// returned - confirm.
1182 public boolean recievedAllResp() {
1183 boolean bool = true;
1184 for (int i = 0; i < resp.size(); i++)
1185 if (((Integer) (resp.get(i))).intValue() == 0) {
// Scans resp for entries equal to 0; indexOfZero starts at -1.
// Callers pass the result to getNeighborRank as an index into neighbors.
// NOTE(review): the counting logic and the return are on lines not visible in this
// excerpt; presumably the index of the single remaining 0 entry is returned - confirm,
// including what is returned when zero or several entries are 0.
1192 public int recievedAllRespMinusOne() {
1194 int indexOfZero = -1;
1195 for (int i = 0; i < resp.size(); i++)
1196 if (((Integer) (resp.get(i))).intValue() == 0) {
// Called by a neighbor (idNeigh) to report its convergence for round tag.
// Synchronized on this Task instance.
// The report is considered when tag equals verifNum, no sendVerdict action is pending and
// the neighbor is at the same timeStep. After looping while reloading is true, the
// neighbor slot is located in neighbors and, if its flag in neighborsValues is still
// false, the flag is set to true.
// A second branch handles tag == verifNum - 1 under further conditions.
// NOTE(review): the handling of idNeigh == -1, the update of nb_not_recv and every return
// value are on lines not visible in this excerpt.
// NOTE(review): indexOf yields -1 for an unknown idNeigh, which would make
// neighborsValues.get(i) throw ArrayIndexOutOfBoundsException.
1206 public synchronized boolean setNbNeighboursNotConv(int tag, int idNeigh,
1207 int neighborTimeStep) throws RemoteException {
1208 // System.out.println("ds setNbNeighboursNotConv !!!!!!!!!!!!!!!");
1209 if (tag == verifNum && !action.equals("sendVerdict")
1210 && neighborTimeStep == timeStep) {
1211 if (idNeigh == -1) {
1215 // System.out.println("sleeping till not reloading");
1216 while (reloading == true) {
1219 // System.out.println("sleeping till not reloading");
1220 } catch (Exception e) {
1223 int i = neighbors.indexOf((Object) idNeigh);
1224 if (((Boolean) neighborsValues.get(i)).booleanValue() == false) {
1226 // System.out.println("node "+idNeigh+" sent a pseudo-convergence message");
1227 neighborsValues.setElementAt(new Boolean(true), i);
1233 } else if (tag == verifNum - 1 && !action.equals("sendVerdict")
1234 && reloading == false && neighborTimeStep == timeStep
1235 && jaceP2P_Iteration != 0)
// Fills neighbors (and a matching false flag in neighborsValues) for task id among
// jaceSize tasks, using powers of two:
//   - for each d with 2^d < jaceSize, if id < 2^d and id + 2^d < jaceSize, then id + 2^d
//     is added;
//   - if 2^d <= id < 2^(d+1), then id - 2^d is added.
// NOTE(review): the declaration and the increment of d are on lines not visible in this
// excerpt; presumably d starts at 0 and grows by 1 per iteration - confirm.
// NOTE(review): the vectors are appended to, not cleared, so calling this twice would
// duplicate the neighbors unless they are emptied in between.
1241 public void detectNeighbors(int id, int jaceSize) {
1242 // System.out.println("detect neighbors !!!! ");
1244 while (Math.pow(2, d) < jaceSize) {
1245 if (id < Math.pow(2, d) && ((id + Math.pow(2, d)) < jaceSize)) {
1246 neighbors.add((int) (id + Math.pow(2, d)));
1247 neighborsValues.add(new Boolean(false));
1250 if (id < Math.pow(2, d + 1) && id >= Math.pow(2, d)) {
1251 neighbors.add((int) (id - Math.pow(2, d)));
1252 neighborsValues.add(new Boolean(false));
// Resets the convergence-detection bookkeeping for a new round.
// Synchronized on this Task instance.
// nb_not_recv is set back to the number of neighbors, every neighbor flag to false, every
// response to 0, and the electedNode and localCV_state flags are cleared.
// NOTE(review): other assignments of this method are on lines not visible in this excerpt.
1259 public synchronized void initialize_state() {
1261 // System.out.println("initialiser\n");
1263 nb_not_recv = neighbors.size();
1264 for (int i = 0; i < neighbors.size(); i++)
1265 neighborsValues.setElementAt(new Boolean(false), i);
1266 for (int i = 0; i < resp.size(); i++)
1267 resp.setElementAt(0, i);
1269 electedNode = false;
1270 localCV_state = false;
// Reads the rank stored at position index of the neighbors vector.
// Vector.get throws ArrayIndexOutOfBoundsException for an index outside 0..size-1.
// NOTE(review): the return statement is not visible in this excerpt; presumably rank is
// returned - confirm.
1276 public int getNeighborRank(int index) throws RemoteException {
1277 int rank = ((Integer) (neighbors.get(index))).intValue();
// Finds a neighbor whose flag in neighborsValues is still false.
// Synchronized on this Task instance.
// The loop does not stop at the first match on the visible lines, so when several
// neighbors are flagged false the last one wins.
// NOTE(review): the declaration and initial value of neighId and the return statement are
// on lines not visible in this excerpt.
1281 public synchronized int getNeighbourNotCV() {
1283 for (int i = 0; i < neighbors.size(); i++)
1284 if (((Boolean) neighborsValues.get(i)).booleanValue() == false)
1285 neighId = ((Integer) neighbors.elementAt(i)).intValue();
// Scans the values vector for entries equal to false; bool starts as true.
// NOTE(review): the statement run on a match and the return are on lines not visible in
// this excerpt; presumably bool becomes false when a false entry is found and is then
// returned - confirm.
1289 public boolean getValues() {
1290 boolean bool = true;
1291 for (int i = 0; i < values.size(); i++)
1292 if (((Boolean) values.elementAt(i)).equals(new Boolean(false))) {
1296 // System.out.println("getValues() have been called and it returned "+bool);
// Scans resp for entries equal to -1; bool starts as false.
// NOTE(review): the statement run on a match and the return are on lines not visible in
// this excerpt; presumably bool becomes true when a -1 entry is found and is then
// returned - confirm.
1300 public boolean testNegativeResp() {
1301 boolean bool = false;
1302 for (int i = 0; i < resp.size(); i++)
1303 if (((Integer) (resp.get(i))).intValue() == -1) {
// Debug helper for the resp vector. Every visible statement is commented out, so on the
// visible lines this method does nothing.
1310 public void printResp() {
1311 // for(int i=0;i<resp.size();i++)
1312 // System.out.print(" resp["+((Integer)neighbors.get(i)).intValue()+"]="+((Integer)resp.get(i)).intValue());
1313 // System.out.print("\n");
// Debug helper for the dependancies and values vectors. Every visible statement is
// commented out, so on the visible lines this method does nothing.
1316 public void printDep() {
1317 // for(int i=0;i<values.size();i++)
1318 // System.out.print(" dep["+((Integer)dependancies.get(i)).intValue()+"]="+((Boolean)values.get(i)).booleanValue());
1319 // System.out.print("\n");
// Propagates a verification request for round tag to every neighbor except neighId:
// for each such neighbor the host stub is fetched from the Register and
// stub.initializeVerif(tag) is called remotely.
// Callers in this class pass -1 as neighId, which excludes no real rank.
// A remote failure is propagated to the caller as RemoteException.
// NOTE(review): the id parameter is not used on any visible line.
// NOTE(review): the statement guarded by the final test on action is on a line not
// visible in this excerpt.
1322 public void broadcastVerif(int id, int neighId, int tag)
1323 throws RemoteException {
1325 TaskId recev = null;
1327 // Register.Instance().getListeOfTasks().getTaskIdOfRank(id);
1329 // System.out.println("Id="+id+" neighId="+ neighId+" tag="+tag);
1330 for (int i = 0; i < neighbors.size(); i++)
1331 if (neighId != ((Integer) neighbors.get(i)).intValue()) {
1332 recev = Register.Instance().getListeOfTasks().getTaskIdOfRank(
1333 ((Integer) neighbors.get(i)).intValue());
1334 stub = recev.getHostStub();
1335 stub.initializeVerif(tag);
1336 // System.out.println("broadcast verification to :"+((Integer)neighbors.get(i)).intValue());
1338 if (action.equals("sendVerif"))
/**
 * Verification set-up variant that takes no tag.
 *
 * The loop below overwrites every entry of the resp vector with 0.
 *
 * @throws RemoteException declared by the signature; the loop below is purely
 *                         local
 */
1342 public void initializeVerifLeader() throws RemoteException {
1344 for (int i = 0; i < resp.size(); i++)
1345 resp.setElementAt(0, i);
/**
 * Handles a verification request carrying the given tag.
 *
 * The method first loops for as long as the reloading flag is true. It then
 * acts only when tag equals verifNum + 1:
 *   - if state is "WAIT4V", action becomes "sendVerif", every entry of resp
 *     is reset to 0 and a SendVerifThread is started;
 *   - otherwise, if state is "VERIF", the branch containing
 *     "throw new RemoteException()" is taken.
 *
 * @param tag verification number carried by the request
 * @throws RemoteException thrown explicitly in the "VERIF" branch
 */
1350 public void initializeVerif(int tag) throws RemoteException {
1351 // System.out.println("Inside initializeVerif @@@@@");
1352 // System.out.println("sleeping till not reloading");
// NOTE(review): this loop polls a plain (non-volatile) field -- confirm that
// the thread clearing reloading is guaranteed to be observed here
1353 while (reloading == true) {
1356 // System.out.println("sleeping till not reloading");
1357 } catch (Exception e) {
// only the request numbered right after the current verifNum is handled;
// the "else" further down pairs with the inner state test, not with this one
1360 if (verifNum + 1 == tag)
1361 if (state.equals("WAIT4V")) {
1362 action = "sendVerif";
1364 for (int i = 0; i < resp.size(); i++)
1365 resp.setElementAt(0, i);
1372 new SendVerifThread(jaceMyId, sendId, verifNum).start();
1374 } else if (state.equals("VERIF")) {
1376 throw new RemoteException();
/**
 * Accessor returning a boolean.
 * NOTE(review): presumably reports the save status that setSaved(...) updates
 * -- confirm against the body.
 */
1382 public boolean getSaved() {
/**
 * Clears the pseudoPerBeg and pseudoPerEnd flags and overwrites every entry
 * of the values vector with false.
 */
1389 public void reinitializePPEr() {
1390 pseudoPerBeg = false;
1391 pseudoPerEnd = false;
// the size of values is unchanged; only its contents are reset
1392 for (int i = 0; i < values.size(); i++)
1393 values.setElementAt(new Boolean(false), i);
/**
 * Calls savOrFinOrRest on the host stub of every neighbor except one.
 *
 * Each remote call receives tag, this task's timeStep, the verdict passed in
 * and this task's reduceAll vector.
 *
 * @param id      not used by the active statements below (it appears only in
 *                commented-out code)
 * @param neighId rank of the neighbor to leave out of the broadcast
 * @param tag     value forwarded unchanged to each remote call
 * @param verd    verdict to forward
 * @throws RemoteException declared by the signature; presumably raised by the
 *                         remote stub calls -- TODO confirm
 */
1397 public void broadcastVerdict(int id, int neighId, int tag, boolean verd)
1398 throws RemoteException {
// boxed copy of verd; this is the value handed to the remote call
1399 Boolean verdicto = verd;
1400 TaskId recev = null;
1402 // Register.Instance().getListeOfTasks().getTaskIdOfRank(id);
1404 for (int i = 0; i < neighbors.size(); i++)
// skip the neighbor whose rank equals neighId
1405 if (((Integer) neighbors.get(i)).intValue() != neighId) {
1406 // System.out.println("broadcasting verdict "+verdicto+" to node of Rank "+((Integer)neighbors.get(i)).intValue());
1407 recev = Register.Instance().getListeOfTasks().getTaskIdOfRank(
1408 ((Integer) neighbors.get(i)).intValue());
1409 stub = recev.getHostStub();
1410 stub.savOrFinOrRest(tag, timeStep, verdicto, reduceAll);
// note: this tests the verdict field of the task, not the verd parameter
1413 if (verdict == false)
/**
 * Handles a verdict message carrying a verification tag and a time step.
 *
 * The method first loops for as long as the reloading flag is true. Then:
 *   - when tag equals verifNum, step equals timeStep and state is "VERIF",
 *     action becomes "sendVerdict"; if finalStep is set, resp is zeroed and
 *     the method loops while action is "sendResponse" or respSent is false;
 *     recievedVerdict is set, the reduceAll field is replaced by the argument
 *     and a SendVerdictThread is created;
 *   - otherwise, when verifNum is lower than tag and step equals timeStep,
 *     action becomes "sendVerdict", recievedVerdict is set and a
 *     SendVerdictThread is started.
 * Any other combination of tag and step matches neither branch.
 *
 * @param tag       verification number carried by the message
 * @param step      time step carried by the message, compared with timeStep
 * @param verd      verdict carried by the message
 * @param reduceAll vector stored into the reduceAll field in the first branch
 */
1417 public void savOrFinOrRest(int tag, int step, boolean verd,
1418 Vector<?> reduceAll) {
1419 // System.out.println("Recieved verd "+verd+" sleeping till not reloading");
1420 while (reloading == true) {
1423 // System.out.println("sleeping till not reloading");
1424 } catch (Exception e) {
1428 if (verifNum == tag && timeStep == step && state.equals("VERIF")) {
1429 action = "sendVerdict";
1430 if (finalStep == true) {
1431 for (int i = 0; i < resp.size(); i++)
1432 resp.setElementAt(0, i);
1433 // System.out.println("sleeping till response is sent");
// action was just set to "sendVerdict", so on entry this loop is governed
// by respSent; both values must be changed elsewhere for it to end
1434 while (action.equals("sendResponse") || respSent == false)
1437 // System.out.println("sleeping till response is sent");
1438 } catch (Exception e) {
1443 recievedVerdict = true;
1448 recievedVerdict = true;
1450 // System.out.println("//// Stetting reduceAll\\\\");
// the parameter shadows the field of the same name, hence the explicit "this."
1451 this.reduceAll = reduceAll;
1454 new SendVerdictThread(jaceMyId, sendId, verifNum, verdict)
1457 } else if (verifNum < tag && timeStep == step) {
1462 action = "sendVerdict";
1463 recievedVerdict = true;
1466 new SendVerdictThread(jaceMyId, sendId, verifNum, verdict).start();
/**
 * Asks every neighbor, through its host stub, for getNbNeighboursNotConv()
 * and remembers the rank of a neighbor that answers 0.
 *
 * The loop has no early exit, so every neighbor is queried and, when several
 * answer 0, the one with the highest index is the value left in neighId.
 *
 * @return presumably neighId; the value used when no neighbor answers 0
 *         depends on how neighId is initialized -- TODO confirm
 * @throws RemoteException declared by the signature; presumably raised by the
 *                         remote stub calls -- TODO confirm
 */
1471 public int getRankLeader() throws RemoteException {
1473 for (int i = 0; i < neighbors.size(); i++) {
1474 TaskId recev = null;
// resolve the neighbor's rank to its TaskId, then to the stub of its host
1475 recev = Register.Instance().getListeOfTasks().getTaskIdOfRank(
1476 ((Integer) (neighbors.get(i))).intValue());
1477 JaceInterface stub = recev.getHostStub();
1478 if (stub.getNbNeighboursNotConv() == 0)
1479 neighId = ((Integer) (neighbors.get(i))).intValue();
/**
 * Thread that serializes a Task and sends the resulting bytes to a backup
 * node.
 *
 * Outline of the work below: unless the finalize flag is set, the task held
 * in sauv is converted to a byte stream with convertTask2stream. Then, while
 * j is below saveTab.length and nothing has been sent, a destination rank is
 * picked from saveTab using saveRound, the rank is resolved to a host stub
 * through the Register, and stub.saveTask(...) is called with this task's
 * id, the bytes, the last-save iteration, the time step, the application
 * name and a tag of 0. Failures in the lookup or the send are caught and
 * reported with println.
 */
1484 class SaveTaskThread extends Thread {
1487 public SaveTaskThread(Task s) {
1491 @SuppressWarnings("static-access")
1493 // If array of BackupNodes not created, create it
1494 if (saveTab == null) {
1498 if (finalize == false) { // if finalization step, it will crash
1499 // because register purged yet
1501 ByteArrayOutputStream stream = convertTask2stream(sauv);
1503 // find the BackupNode to send
1506 JaceInterface stub = null;
1507 boolean sent = false;
1510 while (j < saveTab.length && sent == false) {
1511 // 1 - find the remote task Id to send the backup to
// the index into saveTab is saveRound modulo (jaceSize - 1) when jaceSize is
// below 2 * getNumBackupNeighbors() + 1, and saveRound modulo
// 2 * getNumBackupNeighbors() otherwise
// NOTE(review): the first modulus is 0 when jaceSize == 1 -- confirm that
// case cannot reach this point
1512 if (jaceSize < (2 * Register.Instance()
1513 .getNumBackupNeighbors() + 1)) {
1514 taskRankDest = saveTab[saveRound % (jaceSize - 1)];
1516 taskRankDest = saveTab[saveRound
1517 % (2 * Register.Instance()
1518 .getNumBackupNeighbors())];
1521 // 2 - knowing the destination task Id, get the stub of the
1522 // corresponding node
1524 task = Register.Instance().getListeOfTasks()
1525 .getTaskIdOfRank(taskRankDest); // ///////////////////////////pb
1526 stub = task.getHostStub();
1527 } catch (Exception e) {
1529 .println("Problem in SaveTaskThread on assignation line in save : "
1532 // System.out.println("ite " + jaceP2P_Iteration +
1533 // ".......... SENDING on task " + taskRankDest);
1535 // if no stub there is a problem
1539 // System.out.println("unable to SEND backup on task of rank "
1542 // 3 - try to send the stream
1544 // if there is a stub, send the stream to that node
1546 stub.saveTask(jaceMyId, stream.toByteArray(),
1547 sauv.lastSave.getLastSave(), sauv.timeStep,
1548 Register.Instance().getAppliName(), 0);
1549 // System.out.println("saved data on "+taskRankDest+" iteration= "+sauv.lastSave.getLastSave()+
1550 // "timeStep="+sauv.timeStep+" !!!!!!!!");
1552 // Vector v = stub.getIterationOfBackup(jaceMyId,0);
1553 // int ite=((Integer)v.get(0)).intValue();
1554 // System.out.println("******************************************sauvegarde de donnees: ite="+ite+" taskDest="+taskRankDest+" ******************************************************");
1557 } catch (Exception e) {
1559 .println("JaceP2P_Error in Task.jaceP2P_Save() when saving stream: "
// NOTE(review): assuming getTaskThread() returns a java.lang.Thread, sleep and
// yield are static methods (hence the "static-access" suppression above) and
// act on the thread executing this code, not on the task thread
1568 JaceSession.Instance().getTaskThread().sleep(10);
1569 JaceSession.Instance().getTaskThread().yield();
1570 } catch (Exception e) {
1574 // 5 - if stream not sent at all, do something (WHAT ???)
// NOTE(review): the loop above only runs while j < saveTab.length, so unless j
// advances by more than one per pass it ends at j == saveTab.length and this
// strict ">" test never fires -- confirm whether ">=" or "!sent" was intended
1575 if (j > saveTab.length) {
1577 .println("No more alive neighbors for storing the Backup");
1578 // TODO : what to do if no BackupNode has answered ???
1587 class BroadcastTaskThread extends Thread {
1598 public BroadcastTaskThread(JaceInterface theStub, int theRank,
1599 byte[] theTsk, int theIteration, int timeStep, String theAppliName,
1600 int tag, int dest) {
1601 this.stub = theStub;
1602 this.rank = theRank;
1604 this.iteration = theIteration;
1605 this.timeStep = timeStep;
1606 this.appliName = theAppliName;
1609 // System.out.println("tag="+tag);
1612 // method launched by start()
1615 stub.saveTask(rank, tsk, iteration, timeStep, appliName, tag);
1616 JaceSession.Instance().getTaskObject().setSaved(true);
1619 // Vector v = stub.getIterationOfBackup(rank,0);
1620 // int ite=((Integer)v.get(0)).intValue();
1621 // System.out.println("++++++++++ Broadcast data ite="+iteration+" dest="+dest+" timeStep="+timeStep);
1624 // Vector v = stub.getIterationOfBackup(rank,1);
1625 // int ite=((Integer)v.get(0)).intValue();
1626 // System.out.println("++++++++++ Broadcast dataConvg ite="+iteration+" dest="+dest+" timeStep="+timeStep);
1627 // System.out.println("+++++++++++++++++++++++++++++++++++++++++ Broadcast convgData ite="+ite+" dest="+dest+" +++++++++++++++++++++++++++++++++++++++++++++++++++++");
1630 } catch (Exception e) {
1632 .println("Node not reachable by JaceServer.saveTask() in BroadcastTaskThread :"