Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
New development version.
[jaceP2P.git] / src / jaceP2P / Task.java
1 package jaceP2P;
2
3 import java.io.ByteArrayOutputStream;
4 import java.io.ObjectInputStream;
5 import java.io.ObjectOutputStream;
6 import java.rmi.ConnectException;
7 import java.rmi.RemoteException;
8 import java.util.Vector;
9
10 //import java.util.Calendar;
11 //import java.util.GregorianCalendar;
12
13 //import com.jamonapi.*;
14
15 public class Task implements Runnable, Cloneable, java.io.Serializable {
        // Serialization version identifier (the class implements java.io.Serializable).
        private static final long serialVersionUID = 1L;

        // Not referenced in the visible part of this file; presumably the local
        // error of the application -- TODO confirm.
        public double errorLoc = 0;
        // Checkpoint period: jaceP2P_Save() checkpoints when
        // jaceP2P_Iteration % saveParameter == 0.
        public int saveParameter;
        // Rank of this task; set in setId() from jaceTaskId.getRank().
        public int jaceMyId;
        // Number of tasks; set in setId() from the Register or through setJaceSize().
        public int jaceSize;
        // Identifier of this task; set in setId().
        public TaskId jaceTaskId = null;
        // Application arguments; set in setParam().
        public String[] jaceArgs;
        // While true, jaceReceive() does not update the dependency flags.
        public boolean reloading = false;
        // Reset to "NORMAL" by jaceP2P_ReinitConv(); jaceReceive() tests for "VERIF".
        public String state = "NORMAL";
        // Convergence-detection attributes: getTask2save() copies them into the
        // checkpoint as "attributes needed to detect global convergence".
        public int nb_not_recv;
        public boolean electedNode = false;
        public boolean respSent = false;
        public Vector<Integer> resp;
        // Sent with every message by jaceSend(); compared with the source tag of
        // received messages in jaceReceive() when state is "VERIF".
        public int verifNum = 0;
        // Checkpoint counter: incremented in getTask2save(), replaced by a fresh
        // instance in jaceP2P_ReinitConv().
        public LastSave lastSave = new LastSave();
        // Cleared by reinitializeVectors() and cloned into each checkpoint.
        public Vector<Integer> neighbors;
        public Vector<Boolean> neighborsValues;
        // Parallel vectors filled by setDep(): dependancies holds sender ranks,
        // values holds one flag per rank at the same index.
        public Vector<Integer> dependancies;
        public Vector<Boolean> values;
        // Copied into the checkpoint by getTask2save(); not otherwise used in the
        // visible part of this file.
        public int sendId;
        public String action = "nothing";
        public boolean verdict = false;
        public boolean recievedVerdict = false;
        // Iteration counter of the application; 0 triggers the initial broadcast in
        // jaceP2P_Save() and dependency registration in jaceReceive().
        public int jaceP2P_Iteration = 0;
        public boolean finalStep = false;
        // public Monitor mon1=null;
        public int timeStep = 0; // time discretization counter for non-stationary
        // problems
        // Both convergence flags are reset to false by jaceP2P_ReinitConv().
        public boolean localCV_state;
        public boolean jaceP2P_globalCV_state = false;
        // attribute to know if an appli has finished yet
        // (set to true by jaceFinalize())
        private boolean finalize = false;
        // Both reset to false by jaceP2P_ReinitConv().
        public boolean pseudoPerBeg;
        public boolean pseudoPerEnd;
        // When true (and past iteration 0, not reloading), jaceReceive() flags the
        // sender's dependency as received.
        public boolean underTh = false;
        // Allocated with two slots in the constructor.
        public boolean saved[];
        // attributes for BackupNodes
        // Incremented by jaceP2P_Save() at each periodic checkpoint.
        private int saveRound = 0;
        // Ranks of the tasks that receive the backups; created lazily through
        // createSaveTab() in broadcastTasks().
        private int[] saveTab = null;
        public boolean savedResults = false;
        // Last checkpointed copy of the task; initially this, replaced by
        // getTask2save().
        public Task sauv;
        // Backup of the convergence data, sent by broadcastTasks() for a non-zero
        // request.
        public BackupConvg sauvConvg = new BackupConvg();
        // When true, jaceReceive() registers unknown senders as dependencies even
        // after iteration 0.
        public boolean postReloading = false;
        // Raw vector allocated in the constructor; getTask2save() shares it by
        // reference with the checkpoint (it is not cloned).
        @SuppressWarnings("unchecked")
        public Vector reduceAll;

        // Not referenced in the visible part of this file.
        int cpt = 0;
        int count = 0;
        Thread th;
66
67         public Task() {
68                 reduceAll = new Vector<Object>();
69                 dependancies = new Vector<Integer>();
70                 values = new Vector<Boolean>();
71                 resp = new Vector<Integer>();
72                 saved = new boolean[2];
73                 sauv = this;
74                 neighbors = new Vector<Integer>();
75                 neighborsValues = new Vector<Boolean>();
76         }
77
78         public void getBackupForNewNode(int rank) {
79                 TaskId task = null;
80                 JaceInterface stub = null;
81                 task = Register.Instance().getListeOfTasks().getTaskIdOfRank(rank);
82
83                 stub = task.getHostStub();
84
85                 // if no stub there is a problem
86                 if (stub == null) {
87                         System.err.println("Unable to send backup on task of rank " + rank);
88                 } else {
89                         // if there is a stub, send the stream to that node
90                         try {
91                                 ByteArrayOutputStream stream;
92                                 synchronized (sauv) {
93                                         stream = convertTask2stream(sauv);
94                                         stub.saveTask(jaceMyId, stream.toByteArray(), sauv.lastSave
95                                                         .getLastSave(), sauv.timeStep, Register.Instance()
96                                                         .getAppliName(), 0);
97                                 }
98                                 synchronized (sauvConvg) {
99                                         if (sauvConvg.initialized == true) {
100                                                 stream = convertBackupConv2stream(sauvConvg);
101                                                 stub.saveTask(jaceMyId, stream.toByteArray(),
102                                                                 sauvConvg.lastSave.getLastSave(),
103                                                                 sauvConvg.timeStep, Register.Instance()
104                                                                                 .getAppliName(), 1);
105                                         }
106                                 }
107                         } catch (Exception e) {
108                                 System.err.println("Error in getBackupForNewNode :" + e);
109                                 e.printStackTrace(System.out);
110                         }
111                 }
112         }
113
114         // method to overload by user in the appli to convert the stream sent by the
115         // BackupNode to a Task object of the type of the appli this method is
116         // called
117         // in TaskLauncher.loadBackupAndRestart() and
118         // TaskLauncher.loadOrReloadTask()
119         public Task jaceP2P_ConvertStream(ObjectInputStream stream) {
120                 return null;
121         }
122
123         public void printSav() {
124         }
125
126         // method to overload by user in the appli to specify the reduceAll method
127         public synchronized void reduceAll(Vector<?> recievedValue) {
128         }
129
130         public void setId(TaskId Id) {
131                 jaceTaskId = Id;
132                 jaceMyId = jaceTaskId.getRank();
133                 try {
134                         jaceSize = Register.Instance().getNbOfTasks();
135                 } catch (Exception e) {
136                         try {
137                                 System.err.println("SetId is bad !! " + e + " "
138                                                 + LocalHost.Instance().getName());
139                                 // jaceSize = Register.Instance().getListeOfTasks().getSize();
140                         } catch (Exception e2) {
141                                 System.err.println("Not localised the spawner : " + e2);
142                         }
143                 }
144         }
145
146         public void setParam(String[] arg) {
147                 this.jaceArgs = arg;
148
149         }
150
151         public void setJaceSize(int nbTasks) {
152                 jaceSize = nbTasks;
153         }
154
155         public TaskId getId() {
156                 return jaceTaskId;
157                 // return myIdd;
158         }
159
160         public int getTimeStep() {
161                 return timeStep;
162         }
163
164         // method to overload by user in the appli to identify the neighbors of a
165         // node of rank i
166
167         public int[] getDependencies(int i) {
168                 return null ;
169         }
170
171         // method to overload by user in the appli to init each task at beginning of
172         // computation
173
174         public void jaceP2P_InitTask() {
175         }
176
177         // TaskLauncher.loadOrReloadTask()
178         public void jaceP2P_ReinitTask() {
179         }
180
181         // method to overload by user in the appli to safeguard the results
182         public void saveResults() {
183         }
184
185         // method to overload by user in the appli to save only the requiered
186         // attributes (iter, vecteurs,......) this method is called in
187         // Task.jaceP2P_Save()
188         public Task jaceP2P_SaveFromCrash() {
189                 System.out.println("JaceSaveFromCrash Task");
190                 return null;
191         }
192
193         public void jaceFinalize() {
194
195                 finalize = true;
196                 System.out.println("Ready to Death Task:" + jaceMyId);
197
198                 try {
199                         Register.Instance().getSpawnerStub().killApplication(
200                                         LocalHost.Instance().getStub());
201
202                 } catch (Exception e) {
203                         System.err
204                                         .println("Cannot join the Spawner to kill application: "
205                                                         + e);
206                 }
207
208                 JaceDaemon.Instance().reinitDaemon();
209
210         }
211
212         public void jaceP2P_ReinitConv() {
213                 // System.out.println("reinit conv");
214
215                 timeStep++;
216                 state = "NORMAL";
217                 // last_iter.removeAllElements();
218                 try {
219                         reinitializeVectors();
220
221                 } catch (Exception e) {
222                         System.err.println("Error in jaceP2P_reinitConv():" + e);
223                 }
224                 pseudoPerBeg = false;
225                 pseudoPerEnd = false;
226                 synchronized (lastSave) {
227                         lastSave = new LastSave();
228                 }
229                 // reinit les var de conv de Task generique
230                 localCV_state = false;
231                 jaceP2P_globalCV_state = false;
232
233         }
234
235         public void reinitializeVectors() throws RemoteException {
236                 values.removeAllElements();
237                 dependancies.removeAllElements();
238                 neighbors.removeAllElements();
239                 neighborsValues.removeAllElements();
240                 resp.removeAllElements();
241         }
242
243         public long jaceP2P_getChronoValue() {
244                 long result = 0;
245
246                 try {
247                         result = Register.Instance().getSpawnerStub().getChronoValue(
248                                         Register.Instance().getAppliName());
249                 } catch (Exception e) {
250                         System.err
251                                         .println("JaceP2P_Error in Task.jaceP2P_getChronoValue() on SuperNode : "
252                                                         + e);
253                 }
254
255                 return result;
256         }
257
258         public void run() {
259                 //System.out.println("ds run()");
260         }
261
262         /**
263          *asynchronous sending (non blocking), of data object to an other task
264          * 
265          * @param buffer
266          *            the object (serializable) data to be send
267          *@param dest
268          *            the task id for receiver's task
269          *@param tag
270          *            the tag for message
271          */
272         @SuppressWarnings("static-access")
273         public void jaceSend(Object buffer, int dest, int tag, double erreur_locale) {
274                 // System.out.println("dest : " + dest);
275
276                 TaskId recev = null;
277                 try {
278                         recev = Register.Instance().getListeOfTasks().getTaskIdOfRank(dest);
279                         
280                         if (recev == null) {
281                                 System.err.println("In jaceSend recv = null !");
282                                 try {
283                                         JaceSession.Instance().getTaskThread().sleep(10);
284                                         JaceSession.Instance().getTaskThread().yield();
285                                 } catch (Exception e) {
286                                 }
287                         }
288
289                         // TODO : virer ce else, mais chercher pkoi recev est null des fois
290                         else {
291                                 if (recev.getRank() != dest) {
292                                         System.err.println("Problem !! pas le meme dest que ds les param");
293                                 }
294
295                                 // creer le Message
296                                 // TODO : rajouter nom de l'appli ds Messag
297                                 try {
298                                         JaceInterface stub;
299                                         stub = recev.getHostStub();
300                                         if (stub.getTimeStep() == timeStep) {
301                                                 // System.out.println("************ "+verifNum+" *************");
302                                                 Message msg = new Message();
303                                                 msg.setParam(buffer, jaceTaskId, recev, tag, timeStep,
304                                                                 jaceP2P_Iteration, verifNum, erreur_locale);
305
306                                                 // on met le message ds
307                                                 // JaceBuffer.Instance() (la liste des Message a
308                                                 // envoyer)
309
310                                                 /*
311                                                  * if(JaceDaemon.Instance().getProtocol().equals("socket"
312                                                  * )) SenderSocket.Instance().buffer.add(msg); else
313                                                  * if(JaceDaemon.Instance().getProtocol().equals("rmi"))
314                                                  * SenderRmi.Instance().buffer.add(msg);
315                                                  */
316                                                 // System.out.println("putting message to "+dest+" in the buffer");
317                                                 Sender.Instance().getBuffer().add(msg);
318
319                                                 // if(JaceDaemon.Instance().getProtocol().equals("rmi"))
320                                                 // SenderRmi.Instance().getBuffer().add(msg);
321                                                 // else
322                                                 // SenderSocket.Instance().getBuffer().add(msg);
323                                         }
324                                         
325                                 } catch( ConnectException ce ) {
326                                         recev.getHostStub().suicide2( "Not responding!" ) ;
327                                 } catch (Exception e) {
328                                         System.err.println("Unable to send data message to " + dest
329                                                         + ": " + e);
330 //                                      recev.getHostStub().suicide2( "Not responding!" ) ;
331                                 }
332                                 // System.out.println("TASK : g mis un msg qui doit etre envoye");
333                                 // envoie toujours asynchrone !!!!!!!! : le Message partira
334                                 // instantanement car
335                                 // -Sender faisait JaceBuffer.Instance().get() qui contient un
336                                 // wait()
337                                 // -et ici, Task fait JaceBuffer.Instance().add(msg) qui
338                                 // contient un notifyAll()
339                                 try {
340                                         JaceSession.Instance().getTaskThread().sleep(10);
341                                         JaceSession.Instance().getTaskThread().yield();
342                                 } catch (Exception e) {
343                                 }
344                                 // System.out.println("TASK : je sort de jaceSend");
345                         }
346                 } catch (Exception e) {
347                         if (Register.Instance().getListeOfTasks() == null)
348                                 System.err.println("Tasks list is null: " + e);
349                 }
350
351         }
352
353         /**
354          *not-blocking reception of data object, return an object
355          * 
356          * @param dest
357          *            the task id for the task sender
358          *@param tag
359          *            the tag for message
360          */
361         @SuppressWarnings("static-access")
362         public Object jaceReceive(int sender, int tag) {
363                 if (jaceP2P_Iteration == 0 || postReloading) {
364                         try {
365                                 if (notExist(sender)) {
366                                         setDep(sender);
367                                         // last_iter.add(0);
368                                 }
369                         } catch (Exception e) {
370                         }
371                 }
372
373                 Message tmp = MsgQueue.Instance().get(sender, tag);
374                 if (tmp != null) {
375                         // System.out.println("recu MSG de tache " + sender + " (" +
376                         // Register.Instance().getListeOfTasks().getTaskIdOfRank(sender).getHostName()
377                         // + ")  MsgQueue : " +
378                         // MsgQueue.Instance().getSize()+" message src_tag="+tmp.getSrc_tag()+
379                         // " localError="+tmp.getLocalError());
380                         try {
381
382                                 int index;
383                                 if (underTh == true && jaceP2P_Iteration != 0 && !reloading) {
384                                         index = depIndex(sender);
385
386                                         if ((!(state.equals("VERIF")) || verifNum == tmp
387                                                         .getSrc_tag())) {
388
389                                                 // System.out.println("dep["+sender+"]=true, index="+index);
390                                                 setValues(index, true);
391                                         }
392                                 }
393
394                         } catch (Exception e) {
395                                 System.err.println("Error jaceReceive :" + e);
396                                 System.err.println("Sender=" + sender);
397                         }
398                         return (tmp.getData());
399                 } else {
400                         try {
401                                 // System.out.println("RIENNN recu de tache " + sender + " (" +
402                                 // Register.Instance().getListeOfTasks().getTaskIdOfRank(sender).getHostName()
403                                 // + ")   taille MsgQueue : " + MsgQueue.Instance().getSize());
404                         } catch (Exception e) {
405                                 if (Register.Instance().getListeOfTasks() == null)
406                                         System.err.println("Tasks list is null2: " + e);
407                         }
408                         try {
409                                 JaceSession.Instance().getTaskThread().sleep(10);
410                                 JaceSession.Instance().getTaskThread().yield();
411                         } catch (Exception e) {
412                         }
413                         return null;
414                 }
415         }
416
417         public boolean notExist(int sender) {
418                 int i = dependancies.indexOf((Object) (new Integer(sender)));
419                 if (i == -1)
420                         return true;
421                 else
422                         return false;
423         }
424
425         public void setDep(int value) {
426                 dependancies.add(new Integer(value));
427                 values.add(new Boolean(false));
428         }
429
430         public int depIndex(int sender) {
431                 int index = dependancies.indexOf((Object) (new Integer(sender)));
432                 return index;
433         }
434
435         public void setValues(int index, boolean value) {
436                 values.setElementAt(new Boolean(value), index);
437         }
438
439         @SuppressWarnings("static-access")
440         public void jaceP2P_Save() {
441                 try {
442
443                         if (jaceP2P_Iteration == 0) {
444                                 // request 0 when task at barrier
445                                 // request 1 when only at convergence
446                                 broadcastTasks(0);
447                         } else if ((jaceP2P_Iteration % saveParameter) == 0) {
448                                 // clone the Task at that step of computations
449                                 saveRound++;
450                                 synchronized (sauv) {
451                                         sauv = getTask2save();
452
453                                         // send it to the corresponding BackupNode in a round robin
454                                         // fashion
455                                         new SaveTaskThread(sauv).start();
456                                 }
457                         }
458
459                         JaceSession.Instance().getTaskThread().sleep(10);
460                         JaceSession.Instance().getTaskThread().yield();
461                 } catch (Exception e) {
462                 }
463         }
464
465         // request 0 when Saving all task
466         // request 3 when saving only convergence data
467         @SuppressWarnings("static-access")
468         public void broadcastTasks(int request) {
469                 ByteArrayOutputStream stream;
470                 if (request == 0) {
471                         // 1 - clone the Task at that step of computations and serialize it
472                         try {
473                                 synchronized (sauv) {
474                                         sauv = getTask2save();
475                                         stream = convertTask2stream(sauv);
476
477                                         // 2 - create de saveTab if necessary
478                                         if (saveTab == null) {
479                                                 createSaveTab();
480                                         }
481                                         TaskId task = null;
482                                         JaceInterface stub = null;
483
484                                         // 3 - send the stream to all the BackupNodes
485                                         for (int i = 0; i < saveTab.length; i++) {
486                                                 // System.out.println("saveTab[" + i + "] = " +
487                                                 // saveTab[i]);
488
489                                                 // 3.1 - get the stub of destinatory
490                                                 // System.out.println("Saving on neighbor "+i);
491                                                 try {
492                                                         task = Register.Instance().getListeOfTasks()
493                                                                         .getTaskIdOfRank(saveTab[i]); // ///////////////////////////pb
494                                                         stub = task.getHostStub();
495                                                 } catch (Exception e) {
496                                                         System.err
497                                                                         .println("Problem in the broadcast, ligne d'assignation de task ds broadcats: "
498                                                                                         + e);
499                                                 }
500
501                                                 // 3.2 - send the stream to that stub
502
503                                                 // System.out.println("saving on second list");
504                                                 if (stub != null) {
505                                                         new BroadcastTaskThread(stub, jaceMyId, stream
506                                                                         .toByteArray(),
507                                                                         sauv.lastSave.getLastSave(), sauv.timeStep,
508                                                                         Register.Instance().getAppliName(), 0,
509                                                                         saveTab[i]).start();
510
511                                                 }
512
513                                         }
514                                 }
515                         } catch (Exception e) {
516                                 e.printStackTrace(System.out);
517                         }
518                 } else {
519                         synchronized (sauvConvg) {
520                                 sauvConvg = getBackupConvg2Save();
521                                 stream = convertBackupConv2stream(sauvConvg);
522
523                                 // 2 - create de saveTab if necessary
524                                 if (saveTab == null) {
525                                         createSaveTab();
526                                 }
527                                 TaskId task = null;
528                                 JaceInterface stub = null;
529
530                                 // 3 - send the stream to all the BackupNodes
531                                 for (int i = 0; i < saveTab.length; i++) {
532                                         // System.out.println("saveTab[" + i + "] = " + saveTab[i]);
533
534                                         // 3.1 - get the stub of destinatory
535                                         // System.out.println("Saving on neighbor "+i);
536                                         try {
537                                                 task = Register.Instance().getListeOfTasks()
538                                                                 .getTaskIdOfRank(saveTab[i]); // ///////////////////////////pb
539                                                 stub = task.getHostStub();
540                                         } catch (Exception e) {
541                                                 System.err
542                                                                 .println("Problem in the broadcast, ligne d'assignation de task ds broadcats: "
543                                                                                 + e);
544                                         }
545
546                                         // 3.2 - send the stream to that stub
547                                         /*
548                                          * if(request==3){ if (stub != null) { new
549                                          * BroadcastTaskThread(stub, jaceMyId, stream.toByteArray(),
550                                          * sauvConvg.lastSave.getLastSave(),timeStep,
551                                          * Register.Instance().getAppliName(),1,saveTab[i]).start();
552                                          * } //System.out.println("saving on second list"); } else{
553                                          */
554                                         if (stub != null) {
555                                                 new BroadcastTaskThread(stub, jaceMyId, stream
556                                                                 .toByteArray(), sauvConvg.lastSave
557                                                                 .getLastSave(), sauvConvg.timeStep, Register
558                                                                 .Instance().getAppliName(), 1, saveTab[i])
559                                                                 .start();
560                                         }
561
562                                         // System.out.println("saving on first list");
563                                 }
564                         }
565                 }
566                 try {
567                         // JaceSession.Instance().getTaskThread().sleep(10);
568                         JaceSession.Instance().getTaskThread().yield();
569                 }
570
571                 catch (Exception e) {
572                 }
573         }
574
575         private ByteArrayOutputStream convertBackupConv2stream(BackupConvg t) {
576                 // System.out.println("beginning of the checkpointing process......");
577                 ByteArrayOutputStream stream = new ByteArrayOutputStream();
578                 try {
579                         ObjectOutputStream fluxOut = new ObjectOutputStream(stream);
580                         fluxOut.writeObject(t);
581                         fluxOut.close();
582                 } catch (Exception e) {
583                         System.err
584                                         .println("JaceP2P_Error in Task.jaceP2P_ReinitConv() when converting Task in Stream : "
585                                                         + e);
586                 }
587                 // System.out.println("taille du tablo de byte : " +
588                 // stream.toByteArray().length + " bytes");
589                 return stream;
590         }
591
592         // convert the task in stream to send it to the BackupNode
593         private ByteArrayOutputStream convertTask2stream(Task t) {
594                 // System.out.println("beginning of the checkpointing process......");
595                 ByteArrayOutputStream stream = new ByteArrayOutputStream();
596                 try {
597                         ObjectOutputStream fluxOut = new ObjectOutputStream(stream);
598                         fluxOut.writeObject(t);
599                         fluxOut.close();
600                 } catch (Exception e) {
601                         System.err
602                                         .println("JaceP2P_Error in Task.jaceP2P_ReinitConv() when converting Task in Stream : "
603                                                         + e);
604                 }
605                 // System.out.println("taille du tablo de byte : " +
606                 // stream.toByteArray().length + " bytes");
607                 return stream;
608         }
609
610         // get the task of the appli with the data to save (in
611         // jaceP2P_SaveFromCrash() overloaded in the appli)
612         @SuppressWarnings("unchecked")
613         public Task getTask2save() {
614
615                 sauv = jaceP2P_SaveFromCrash();
616
617                 // System.out.println("Saving data at it="+sauv.it);
618                 // assign the default attributes of a task (params of the appli and
619                 // TaskId of the local node)
620                 sauv.setId(this.jaceTaskId);
621                 sauv.setParam(Register.Instance().getParams());
622                 sauv.timeStep = timeStep;
623
624                 // assign the iteration number that had the appli at the moment of the
625                 // checkpointing
626                 sauv.jaceP2P_Iteration = jaceP2P_Iteration;
627                 sauv.saveRound = saveRound;
628
629                 sauv.underTh = underTh;
630                 sauv.jaceP2P_globalCV_state = jaceP2P_globalCV_state;
631                 // System.out.println("I checkpoint the task at ite " +
632                 // sauv.jaceP2P_Iteration);
633                 // attributes needed to detect global convergence
634                 try {
635
636                         sauv.state = state;
637                         sauv.nb_not_recv = nb_not_recv;
638                         sauv.electedNode = electedNode;
639                         sauv.respSent = respSent;
640                         sauv.neighbors = (Vector) neighbors.clone();
641                         sauv.neighborsValues = (Vector) neighborsValues.clone();
642                         sauv.resp = (Vector) resp.clone();
643                         sauv.verifNum = verifNum;
644                         sauv.sendId = sendId;
645                         sauv.reduceAll = reduceAll;
646                         sauv.finalStep = finalStep;
647                         sauv.action = action;
648                         sauv.verdict = verdict;
649                         sauv.localCV_state = localCV_state;
650                         sauv.recievedVerdict = recievedVerdict;
651                         synchronized (lastSave) {
652                                 lastSave.increment();
653                                 sauv.lastSave = lastSave;
654                         }
655                 } catch (Exception e) {
656                         System.err.println("Problem with RMI !");
657                 }
658
659                 return sauv;
660         }
661
662         @SuppressWarnings("unchecked")
663         public BackupConvg getBackupConvg2Save() {
664
665                 try {
666
667                         // sauvConvg=new BackupConvg();
668                         sauvConvg.state = state;
669                         sauvConvg.underTh = underTh;
670                         sauvConvg.nb_not_recv = nb_not_recv;
671                         sauvConvg.electedNode = electedNode;
672                         sauvConvg.respSent = respSent;
673                         sauvConvg.neighbors = (Vector) neighbors.clone();
674                         sauvConvg.neighborsValues = (Vector) neighborsValues.clone();
675                         sauvConvg.resp = (Vector) resp.clone();
676                         sauvConvg.verifNum = verifNum;
677                         sauvConvg.sendId = sendId;
678                         sauvConvg.finalStep = finalStep;
679                         sauvConvg.action = action;
680                         sauvConvg.verdict = verdict;
681                         sauvConvg.localCV_state = localCV_state;
682                         sauvConvg.timeStep = timeStep;
683                         sauvConvg.recievedVerdict = recievedVerdict;
684                         sauvConvg.jaceP2P_Iteration = jaceP2P_Iteration;
685                         sauvConvg.reduceAll = reduceAll;
686
687                         synchronized (lastSave) {
688                                 lastSave.increment();
689                                 sauvConvg.lastSave = lastSave;
690                         }
691                         sauvConvg.initialized = true;
692                 } catch (Exception e) {
693                         System.err.println("Problem with RMI:" + e);
694                 }
695
696                 return sauvConvg;
697         }
698
699         private void createSaveTab() {
700
701                 saveTab = new int[BackupsManager.Instance().size()];
702
703                 // 2 - assign the taskId to each cell of the saveTab
704                 // System.out.println("in TASK");
705                 for (int i = 0; i < saveTab.length; i++) {
706                         saveTab[i] = BackupsManager.Instance().getBackupTaskAtIndex(i, 0)
707                                         .getTaskRank();
708                         // System.out.print(saveTab[i] + ", ");
709                 }
710                 // System.out.println("saveTab created..... size : " + saveTab.length);
711         }
712
713         public void setSaved(boolean bool) {
714                 synchronized (saved) {
715                         if (bool == true) {
716                                 if (saved[0] == false)
717                                         saved[0] = true;
718                                 else if (saved[1] == false)
719                                         saved[1] = true;
720                         } else {
721                                 saved[0] = false;
722                                 saved[1] = false;
723                         }
724                 }
725
726         }
727
728         public void waitForAck(int tag) {
729                 if (Register.Instance().getNumBackupNeighbors() != 0) {
730                         setSaved(false);
731                         broadcastTasks(tag);
732                         // Calendar cal = new GregorianCalendar();
733                         // System.out.println("at time="+cal.get(Calendar.MINUTE)+":"+cal.get(Calendar.SECOND));
734                         // System.out.println("sleeping till acknowledge saved");
735                         while (getSaved() == false)
736                                 try {
737                                         Thread.sleep(50);
738                                         count++;
739                                         if (count > 30) {
740                                                 try {
741                                                         Register newReg = Register.Instance()
742                                                                         .getSpawnerStub().getRegister(jaceMyId);
743                                                         if (newReg != null) {
744                                                                 Register.Instance().replaceBy(newReg);
745                                                         } else
746                                                                 System.out
747                                                                                 .println("I got a null register from the spawner");
748                                                         broadcastTasks(tag);
749                                                         System.out
750                                                                         .println("Sleeping till acknowledge saved");
751                                                         count = 0;
752                                                 } catch (Exception e2) {
753                                                         System.err
754                                                                         .println("Unable to get register from spawner :"
755                                                                                         + e2);
756                                                         e2.printStackTrace(System.out);
757                                                 }
758                                         }
759                                 } catch (Exception e) {
760                                 }
761                         // Calendar cal1 = new GregorianCalendar();
762                         // System.out.println("end save at time="+cal1.get(Calendar.MINUTE)+":"+cal1.get(Calendar.SECOND));
763                 }
764         }
765
766         public void jaceP2P_GlobalConvergence(boolean under_th) {
767
768                 try {
769
770                         Thread.yield();
771                         if (under_th != underTh && postReloading == false) {
772                                 // 1 - update sous seuil
773                                 underTh = under_th;
774                         }
775                         if (jaceP2P_Iteration == 0
776                                         && (postReloading == false || resp.size() == 0)) {
777                                 detectNeighbors(jaceMyId, jaceSize);
778                                 initialize_state();
779                                 sendId = -1;
780                                 verifNum = 0;
781                                 reloading = false;
782                                 broadcastTasks(3);
783                         }
784
785                         // affiche les valeurs des variables
786                         try {
787                                 // Calendar cal = new GregorianCalendar();
788                                 // System.out.println("at time="+cal.get(Calendar.MINUTE)+":"+cal.get(Calendar.SECOND));
789                                 // System.out.println("MyId="+jaceMyId+" sous seuil="+underTh+" etat="+state+" verifNum="+verifNum+" localCV="+localCV_state+" leader="+electedNode+" respSent="+respSent);
790                                 // System.out.println("dep="+getValues()+" pBeg="+pseudoPerBeg+" PEnd="+pseudoPerEnd+" sendId="+sendId);
791                                 // System.out.println("neigh="+nb_not_recv+" action="+action+" negative_resp="+testNegativeResp()+" reloading="+reloading+"\n"+" finalStep="+finalStep+" SavedResults="+savedResults+" NeghNotCV="+getNeighbourNotCV()+" postReload="+postReloading);
792                                 printResp();
793                                 printDep();
794                         } catch (Exception e) {
795                                 System.err.println("Error printing status in Task :" + e);
796                         }
797                         Thread.yield();
798                         if (action.equals("sendVerif") && state.equals("VERIF")
799                                         && postReloading == true) {
800                                 // System.out.println("je passe ds send verif");
801
802                                 new SendVerifThread(jaceMyId, sendId, verifNum).start();
803
804                                 // System.out.println("send Verif");
805
806                         } else if (action.equals("sendVerdict") && postReloading == true) {
807                                 // System.out.println("je passe ds send verdict-------");
808
809                                 new SendVerdictThread(jaceMyId, sendId, verifNum, verdict)
810                                                 .start();
811
812                                 // System.out.println("verifNum="+verifNum);
813                         }
814
815                         if (state.equals("NORMAL") && action.equals("nothing")) {
816                                 if (underTh == false)
817                                         reinitializePPEr();
818                                 else
819                                 // System.out.println("sous seuil="+underTh+" PseudoPerBeg="+localStub.getPseudoPerBeg());
820                                 if (pseudoPerBeg == false) {
821                                         // System.out.println("ds la condition");
822                                         pseudoPerBeg = true;
823                                 } else if (pseudoPerEnd == true) {
824
825                                         localCV_state = true;
826                                         if (nb_not_recv == 0) {
827                                                 // localStub.setAction("sendVerif");
828                                                 try {
829                                                         broadcastVerif(jaceMyId, -1, verifNum + 1);
830                                                         // localStub.setAction("nothing");
831
832                                                         electedNode = true;
833                                                         recievedVerdict = true;
834                                                         initializeVerifLeader();
835                                                         state = "VERIF";
836                                                         broadcastTasks(3);
837                                                 } catch (Exception e) {
838                                                         System.err
839                                                                         .println("The verification message is not received: "
840                                                                                         + e);
841                                                         Register.Instance().viewAll();
842                                                 }
843                                         } else if (nb_not_recv == 1)
844                                                 if (sendId == -1) {
845
846                                                         int neighId = getNeighbourNotCV();
847                                                         TaskId recev = null;
848
849                                                         try {
850
851                                                                 recev = Register.Instance().getListeOfTasks()
852                                                                                 .getTaskIdOfRank(neighId);
853                                                                 JaceInterface stub = recev.getHostStub();
854                                                                 if (stub.setNbNeighboursNotConv(verifNum,
855                                                                                 jaceMyId, timeStep)) {
856                                                                         // stub.setLeftNeighbourCV(true);
857                                                                         // System.out.println("sent convergence message to "+neighId);
858                                                                         sendId = neighId;
859                                                                         state = "WAIT4V";
860                                                                         recievedVerdict = false;
861                                                                         broadcastTasks(3);
862                                                                 }
863                                                         } catch (Exception e) {
864                                                                 System.err
865                                                                                 .println("Unable to decrease the number of neighbors not converged on node :"
866                                                                                                 + recev.getHostName()
867                                                                                                 + " count ="
868                                                                                                 + count
869                                                                                                 + " . error:" + e);
870                                                                 count++;
871                                                                 Register.Instance().viewAll();
872                                                                 if (count > 3) {
873                                                                         try {
874                                                                                 int myRank;
875                                                                                 TaskId id = Register.Instance()
876                                                                                                 .getListeOfTasks()
877                                                                                                 .getTaskIdOfHostStub(
878                                                                                                                 LocalHost.Instance()
879                                                                                                                                 .getStub());
880                                                                                 myRank = id.getRank();
881                                                                                 Register.Instance().replaceBy(
882                                                                                                 Register.Instance()
883                                                                                                                 .getSpawnerStub()
884                                                                                                                 .getRegister(myRank));
885
886                                                                                 count = 0;
887                                                                         } catch (Exception e2) {
888                                                                                 System.err
889                                                                                                 .println("Unable to contact the spawner: "
890                                                                                                                 + e2);
891                                                                         }
892                                                                 }
893                                                         }
894                                                 }
895                                 } else if (pseudoPerBeg == true && getValues())
896                                         pseudoPerEnd = true;
897                         } else if (state.equals("WAIT4V") && nb_not_recv == 0) {
898                                 int neighId = getRankLeader();
899                                 if (neighId == -1) {
900                                         try {
901                                                 broadcastVerif(jaceMyId, -1, verifNum + 1);
902                                                 electedNode = true;
903                                                 recievedVerdict = true;
904                                                 initializeVerifLeader();
905                                                 state = "VERIF";
906
907                                                 broadcastTasks(3);
908                                         } catch (Exception e) {
909                                                 System.err
910                                                                 .println("The verification message is not received:"
911                                                                                 + e);
912                                         }
913                                 } else if (jaceMyId > neighId) {
914                                         try {
915                                                 broadcastVerif(jaceMyId, -1, verifNum + 1);
916                                                 electedNode = true;
917                                                 recievedVerdict = true;
918                                                 initializeVerifLeader();
919                                                 state = "VERIF";
920                                                 broadcastTasks(3);
921                                         } catch (Exception e) {
922                                                 System.err
923                                                                 .println("The verification message is not received: "
924                                                                                 + e);
925                                         }
926                                 }
927                         } else if (state.equals("WAIT4V")) {
928                                 if (underTh == false) {
929                                         localCV_state = false;
930                                         broadcastTasks(3);
931                                 }
932                         } else if (state.equals("VERIF")) {
933                                 // if(localStub.getNewerDep(0) && localStub.getNewerDep(1))
934                                 // localStub.setPseudoPerEnd(true);
935                                 // if(!underTh)
936                                 if (electedNode == true) {
937                                         if ((!underTh && !postReloading) || !localCV_state
938                                                         || testNegativeResp()) {
939                                                 try {
940
941                                                         broadcastVerdict(jaceMyId, -2, verifNum + 1, false);
942                                                         verifNum++;
943                                                         initialize_state();
944                                                         reinitializePPEr();
945                                                         broadcastTasks(3);
946                                                 } catch (Exception e) {
947                                                 }
948                                         } else if (postReloading) {
949                                                 if (recievedAllResp())
950                                                         if (!testNegativeResp()) {
951                                                                 try {
952                                                                         broadcastVerdict(jaceMyId, -2, verifNum,
953                                                                                         true);
954                                                                         if (finalStep == true
955                                                                                         && state.equals("VERIF")) {
956                                                                                 initializeSavLeader();
957                                                                                 state = "SAVING";
958                                                                                 waitForAck(0);
959                                                                         } else {
960                                                                                 state = "FINISHED";
961                                                                                 waitForAck(1);
962                                                                         }
963                                                                 } catch (Exception e) {
964                                                                         System.err.println("Error: " + e);
965                                                                 }
966                                                         }
967                                         } else if (pseudoPerEnd == true) {
968                                                 if (recievedAllResp())
969                                                         if (!testNegativeResp()) {
970                                                                 try {
971                                                                         broadcastVerdict(jaceMyId, -2, verifNum,
972                                                                                         true);
973                                                                         if (finalStep == true
974                                                                                         && state.equals("VERIF")) {
975                                                                                 initializeSavLeader();
976                                                                                 state = "SAVING";
977                                                                                 waitForAck(0);
978                                                                         } else {
979                                                                                 state = "FINISHED";
980                                                                                 waitForAck(3);
981                                                                         }
982
983                                                                 } catch (Exception e) {
984                                                                         System.out.println("erreur: " + e);
985                                                                 }
986                                                         } else {
987                                                                 try {
988
989                                                                         broadcastVerdict(jaceMyId, -2,
990                                                                                         verifNum + 1, false);
991                                                                         verifNum++;
992                                                                         initialize_state();
993                                                                         reinitializePPEr();
994                                                                         broadcastTasks(3);
995                                                                 } catch (Exception e) {
996                                                                         System.err
997                                                                                         .println("Unable to broadcast a negative verdict :"
998                                                                                                         + e);
999                                                                 }
1000                                                         }
1001                                         }
1002
1003                                         else if (getValues())
1004                                                 pseudoPerEnd = true;
1005                                 } else if (!respSent) {
1006                                         if (!underTh || !localCV_state || testNegativeResp()) {
1007                                                 if (action.equals("nothing")) {
1008                                                         int neighId = sendId;
1009                                                         try {
1010                                                                 TaskId recev = null;
1011                                                                 recev = Register.Instance().getListeOfTasks()
1012                                                                                 .getTaskIdOfRank(neighId);
1013                                                                 JaceInterface stub = recev.getHostStub();
1014                                                                 // System.out.println("tryin to send negative response to "+neighId);
1015                                                                 stub.response(jaceMyId, verifNum, -1, null);
1016                                                                 // System.out.println("send negative response to "+neighId);
1017                                                                 respSent = true;
1018                                                                 broadcastTasks(3);
1019
1020                                                         } catch (Exception e) {
1021                                                                 System.err
1022                                                                                 .println("Response not received:" + e);
1023                                                                 Register.Instance().viewAll();
1024                                                         }
1025                                                 }
1026                                         } else if (pseudoPerEnd) {
1027                                                 // System.out.print("The daemon can send a response ");
1028                                                 int index = recievedAllRespMinusOne();
1029                                                 // System.out.println("to node of index ="+index);
1030                                                 if (index != -1) {
1031                                                         int rank = getNeighborRank(index);
1032                                                         // if(jaceMyId!=jaceSize-1 &&
1033                                                         // localStub.getSendRight()==true){
1034                                                         try {
1035                                                                 TaskId recev = null;
1036                                                                 recev = Register.Instance().getListeOfTasks()
1037                                                                                 .getTaskIdOfRank(rank);
1038                                                                 JaceInterface stub = recev.getHostStub();
1039
1040                                                                 if (getResp(index) == 1) {
1041                                                                         stub.response(jaceMyId, verifNum, 1,
1042                                                                                         reduceAll);
1043                                                                         // System.out.println("send positive response to"+rank);
1044                                                                 } else {
1045                                                                         stub.response(jaceMyId, verifNum, -1, null);
1046                                                                         // System.out.println("send negative response to"+rank);
1047                                                                 }
1048                                                                 respSent = true;
1049                                                                 broadcastTasks(3);
1050
1051                                                         } catch (Exception e) {
1052                                                                 System.err.println("Response not received by "
1053                                                                                 + rank + ": " + e);
1054                                                                 Register.Instance().viewAll();
1055                                                         }
1056                                                 }
1057                                         } else if (getValues())
1058                                                 pseudoPerEnd = true;
1059                                 }
1060
1061                         } else if (state.equals("SAVING") && !action.equals("sendVerdict")) {
1062
1063                                 if (savedResults == false) {
1064
1065                                         saveResults();
1066                                         savedResults = true;
1067                                         broadcastTasks(3);
1068                                 } else if (electedNode) {
1069                                         if (recievedAllResp())
1070                                                 try {
1071                                                         // System.out.println("recieved all responses");
1072                                                         JaceSpawnerInterface spawnerStub = Register
1073                                                                         .Instance().getSpawnerStub();
1074                                                         // System.out.println("##### callin spawnerStub.setFinished(true) #####");
1075                                                         spawnerStub.setOver(true);
1076                                                         // localStub.setState("FINISHED");
1077                                                 } catch (Exception e) {
1078                                                         System.err.println("Error" + e);
1079                                                 }
1080                                 } else if (!respSent) {
1081                                         int index = recievedAllRespMinusOne();
1082                                         if (index != -1) {
1083                                                 int rank = getNeighborRank(index);
1084
1085                                                 // if(jaceMyId!=jaceSize-1 &&
1086                                                 // localStub.getSendRight()==true){
1087                                                 try {
1088                                                         TaskId recev = null;
1089                                                         recev = Register.Instance().getListeOfTasks()
1090                                                                         .getTaskIdOfRank(rank);
1091                                                         JaceInterface stub = recev.getHostStub();
1092                                                         if (stub.getReloading() == false
1093                                                                         && stub.getState().equals("SAVING")) {
1094                                                                 action = "sendResponse";
1095                                                                 stub.response(jaceMyId, verifNum, 1, null);
1096                                                                 respSent = true;
1097                                                                 action = "nothing";
1098                                                                 broadcastTasks(3);
1099                                                                 // System.out.println("send response to"+rank);
1100                                                         }
1101                                                 } catch (Exception e) {
1102                                                         System.err.println("Response not received" + e);
1103                                                 }
1104                                         }
1105                                 }
1106                         } else if (state.equals("FINISHED")
1107                                         && !action.equals("sendVerdict") && recievedVerdict) {
1108
1109                                 jaceP2P_globalCV_state = true;
1110                                 // System.out.println("Finished");
1111                                 // System.out.println("Finished");
1112                                 // System.out.println("Finished");
1113                                 // System.out.println("Finished");
1114                                 System.out.println("Finished");
1115                         }
1116
1117                         if (postReloading == true)
1118                                 postReloading = false;
1119
1120                 } catch (Exception e) {
1121                         System.err.println("Exception in Global Convergence :" + e);
1122                         e.printStackTrace(System.out);
1123                         Register.Instance().viewAll();
1124                 }
1125                 // mon1.stop();
1126         }
1127
1128         public synchronized int getResp(int index) throws RemoteException {
1129                 int res = 1;
1130                 for (int i = 0; i < resp.size(); i++)
1131                         if (i != index)
1132                                 if (((Integer) resp.get(i)).intValue() == -1)
1133                                         res = -1;
1134                 return res;
1135         }
1136
1137         public synchronized void response(int neighId, int tag, int response,
1138                         Vector<?> recievedValue) throws RemoteException {
1139                 // System.out.println("inside response function");
1140                 // System.out.println("sleeping till not reloading");
1141                 while (reloading == true) {
1142                         try {
1143                                 Thread.sleep(10);
1144                                 // System.out.println("sleeping till not reloading");
1145                         } catch (Exception e) {
1146                         }
1147                 }
1148                 if (verifNum == tag) {
1149                         // System.out.println("inside condition");
1150                         int indexNeigh = neighbors.indexOf((Object) neighId);
1151                         // System.out.println("after gettin index="+index);
1152                         // System.out.println("index="+indexNeigh+" size de resp ="+resp.size());
1153                         // printResp();
1154                         // try{
1155                         // int xyz=((Integer)(resp.elementAt(indexNeigh))).intValue();
1156                         // }catch(Exception e){
1157                         // System.out.println("fuckkkkkkkkkkkkkkkkkk error:"+e);
1158                         // }
1159
1160                         if (response == 1
1161                                         && !state.equals("SAVING")
1162                                         && (((Integer) (resp.elementAt(indexNeigh))).intValue()) != 1) {
1163                                 // System.out.println("calling reduceAll()");
1164                                 reduceAll(recievedValue);
1165                         }
1166                         // System.out.println("after calculating reduceAll");
1167                         resp.setElementAt(response, indexNeigh);
1168
1169                         // System.out.println("get response ............");
1170                         waitForAck(3);
1171
1172                 } else
1173                         throw new RemoteException();
1174         }
1175
1176         public void initializeSavLeader() {
1177                 for (int i = 0; i < resp.size(); i++)
1178                         resp.setElementAt(0, i);
1179                 respSent = false;
1180         }
1181
1182         public boolean recievedAllResp() {
1183                 boolean bool = true;
1184                 for (int i = 0; i < resp.size(); i++)
1185                         if (((Integer) (resp.get(i))).intValue() == 0) {
1186                                 bool = false;
1187                                 break;
1188                         }
1189                 return bool;
1190         }
1191
1192         public int recievedAllRespMinusOne() {
1193                 int nbOfZeros = 0;
1194                 int indexOfZero = -1;
1195                 for (int i = 0; i < resp.size(); i++)
1196                         if (((Integer) (resp.get(i))).intValue() == 0) {
1197                                 nbOfZeros++;
1198                                 indexOfZero = i;
1199                         }
1200                 if (nbOfZeros > 1)
1201                         return -1;
1202                 else
1203                         return indexOfZero;
1204         }
1205
1206         public synchronized boolean setNbNeighboursNotConv(int tag, int idNeigh,
1207                         int neighborTimeStep) throws RemoteException {
1208                 // System.out.println("ds  setNbNeighboursNotConv !!!!!!!!!!!!!!!");
1209                 if (tag == verifNum && !action.equals("sendVerdict")
1210                                 && neighborTimeStep == timeStep) {
1211                         if (idNeigh == -1) {
1212                                 nb_not_recv--;
1213                                 return false;
1214                         } else {
1215                                 // System.out.println("sleeping till not reloading");
1216                                 while (reloading == true) {
1217                                         try {
1218                                                 Thread.sleep(10);
1219                                                 // System.out.println("sleeping till not reloading");
1220                                         } catch (Exception e) {
1221                                         }
1222                                 }
1223                                 int i = neighbors.indexOf((Object) idNeigh);
1224                                 if (((Boolean) neighborsValues.get(i)).booleanValue() == false) {
1225                                         nb_not_recv--;
1226                                         // System.out.println("le noeud "+idNeigh+" a envoyer un message de pseudoconvergence");
1227                                         neighborsValues.setElementAt(new Boolean(true), i);
1228                                         waitForAck(3);
1229
1230                                 }
1231                         }
1232                         return true;
1233                 } else if (tag == verifNum - 1 && !action.equals("sendVerdict")
1234                                 && reloading == false && neighborTimeStep == timeStep
1235                                 && jaceP2P_Iteration != 0)
1236                         return true;
1237                 else
1238                         return false;
1239         }
1240
1241         public void detectNeighbors(int id, int jaceSize) {
1242                 // System.out.println("detect neighbors !!!! ");
1243                 int d = 0;
1244                 while (Math.pow(2, d) < jaceSize) {
1245                         if (id < Math.pow(2, d) && ((id + Math.pow(2, d)) < jaceSize)) {
1246                                 neighbors.add((int) (id + Math.pow(2, d)));
1247                                 neighborsValues.add(new Boolean(false));
1248                                 resp.add(0);
1249                         }
1250                         if (id < Math.pow(2, d + 1) && id >= Math.pow(2, d)) {
1251                                 neighbors.add((int) (id - Math.pow(2, d)));
1252                                 neighborsValues.add(new Boolean(false));
1253                                 resp.add(0);
1254                         }
1255                         d++;
1256                 }
1257         }
1258
1259         public synchronized void initialize_state() {
1260
1261                 // System.out.println("initialiser\n");
1262
1263                 nb_not_recv = neighbors.size();
1264                 for (int i = 0; i < neighbors.size(); i++)
1265                         neighborsValues.setElementAt(new Boolean(false), i);
1266                 for (int i = 0; i < resp.size(); i++)
1267                         resp.setElementAt(0, i);
1268                 underTh = false;
1269                 electedNode = false;
1270                 localCV_state = false;
1271                 verdict = false;
1272
1273                 state = "NORMAL";
1274         }
1275
1276         public int getNeighborRank(int index) throws RemoteException {
1277                 int rank = ((Integer) (neighbors.get(index))).intValue();
1278                 return rank;
1279         }
1280
1281         public synchronized int getNeighbourNotCV() {
1282                 int neighId = -1;
1283                 for (int i = 0; i < neighbors.size(); i++)
1284                         if (((Boolean) neighborsValues.get(i)).booleanValue() == false)
1285                                 neighId = ((Integer) neighbors.elementAt(i)).intValue();
1286                 return neighId;
1287         }
1288
1289         public boolean getValues() {
1290                 boolean bool = true;
1291                 for (int i = 0; i < values.size(); i++)
1292                         if (((Boolean) values.elementAt(i)).equals(new Boolean(false))) {
1293                                 bool = false;
1294                                 break;
1295                         }
1296                 // System.out.println("getValues() have been called and it returned "+bool);
1297                 return bool;
1298         }
1299
1300         public boolean testNegativeResp() {
1301                 boolean bool = false;
1302                 for (int i = 0; i < resp.size(); i++)
1303                         if (((Integer) (resp.get(i))).intValue() == -1) {
1304                                 bool = true;
1305                                 break;
1306                         }
1307                 return bool;
1308         }
1309
1310         public void printResp() {
1311                 // for(int i=0;i<resp.size();i++)
1312                 // System.out.print(" resp["+((Integer)neighbors.get(i)).intValue()+"]="+((Integer)resp.get(i)).intValue());
1313                 // System.out.print("\n");
1314         }
1315
1316         public void printDep() {
1317                 // for(int i=0;i<values.size();i++)
1318                 // System.out.print(" dep["+((Integer)dependancies.get(i)).intValue()+"]="+((Boolean)values.get(i)).booleanValue());
1319                 // System.out.print("\n");
1320         }
1321
1322         public void broadcastVerif(int id, int neighId, int tag)
1323                         throws RemoteException {
1324
1325                 TaskId recev = null;
1326                 // TaskId local =
1327                 // Register.Instance().getListeOfTasks().getTaskIdOfRank(id);
1328                 JaceInterface stub;
1329                 // System.out.println("Id="+id+" neighId="+ neighId+" tag="+tag);
1330                 for (int i = 0; i < neighbors.size(); i++)
1331                         if (neighId != ((Integer) neighbors.get(i)).intValue()) {
1332                                 recev = Register.Instance().getListeOfTasks().getTaskIdOfRank(
1333                                                 ((Integer) neighbors.get(i)).intValue());
1334                                 stub = recev.getHostStub();
1335                                 stub.initializeVerif(tag);
1336                                 // System.out.println("broadcast verification to :"+((Integer)neighbors.get(i)).intValue());
1337                         }
1338                 if (action.equals("sendVerif"))
1339                         action = "nothing";
1340         }
1341
1342         public void initializeVerifLeader() throws RemoteException {
1343                 reinitializePPEr();
1344                 for (int i = 0; i < resp.size(); i++)
1345                         resp.setElementAt(0, i);
1346                 verifNum++;
1347                 respSent = false;
1348         }
1349
1350         public void initializeVerif(int tag) throws RemoteException {
1351                 // System.out.println("Inside initializeVerif @@@@@");
1352                 // System.out.println("sleeping till not reloading");
1353                 while (reloading == true) {
1354                         try {
1355                                 Thread.sleep(10);
1356                                 // System.out.println("sleeping till not reloading");
1357                         } catch (Exception e) {
1358                         }
1359                 }
1360                 if (verifNum + 1 == tag)
1361                         if (state.equals("WAIT4V")) {
1362                                 action = "sendVerif";
1363                                 reinitializePPEr();
1364                                 for (int i = 0; i < resp.size(); i++)
1365                                         resp.setElementAt(0, i);
1366                                 verifNum++;
1367
1368                                 respSent = false;
1369                                 state = "VERIF";
1370                                 waitForAck(3);
1371
1372                                 new SendVerifThread(jaceMyId, sendId, verifNum).start();
1373
1374                         } else if (state.equals("VERIF")) {
1375                         } else {
1376                                 throw new RemoteException();
1377
1378                         }
1379
1380         }
1381
1382         public boolean getSaved() {
1383                 if (saved[1])
1384                         return true;
1385                 else
1386                         return false;
1387         }
1388
1389         public void reinitializePPEr() {
1390                 pseudoPerBeg = false;
1391                 pseudoPerEnd = false;
1392                 for (int i = 0; i < values.size(); i++)
1393                         values.setElementAt(new Boolean(false), i);
1394
1395         }
1396
1397         public void broadcastVerdict(int id, int neighId, int tag, boolean verd)
1398                         throws RemoteException {
1399                 Boolean verdicto = verd;
1400                 TaskId recev = null;
1401                 // TaskId local =
1402                 // Register.Instance().getListeOfTasks().getTaskIdOfRank(id);
1403                 JaceInterface stub;
1404                 for (int i = 0; i < neighbors.size(); i++)
1405                         if (((Integer) neighbors.get(i)).intValue() != neighId) {
1406                                 // System.out.println("broadcasting verdict "+verdicto+" to node of Rank "+((Integer)neighbors.get(i)).intValue());
1407                                 recev = Register.Instance().getListeOfTasks().getTaskIdOfRank(
1408                                                 ((Integer) neighbors.get(i)).intValue());
1409                                 stub = recev.getHostStub();
1410                                 stub.savOrFinOrRest(tag, timeStep, verdicto, reduceAll);
1411                         }
1412                 action = "nothing";
1413                 if (verdict == false)
1414                         sendId = -1;
1415         }
1416
1417         public void savOrFinOrRest(int tag, int step, boolean verd,
1418                         Vector<?> reduceAll) {
1419                 // System.out.println("Recieved verd "+verd+" sleeping till not reloading");
1420                 while (reloading == true) {
1421                         try {
1422                                 Thread.sleep(10);
1423                                 // System.out.println("sleeping till not reloading");
1424                         } catch (Exception e) {
1425                         }
1426                 }
1427                 if (verd == true) {
1428                         if (verifNum == tag && timeStep == step && state.equals("VERIF")) {
1429                                 action = "sendVerdict";
1430                                 if (finalStep == true) {
1431                                         for (int i = 0; i < resp.size(); i++)
1432                                                 resp.setElementAt(0, i);
1433                                         // System.out.println("sleeping till response is sent");
1434                                         while (action.equals("sendResponse") || respSent == false)
1435                                                 try {
1436                                                         Thread.sleep(10);
1437                                                         // System.out.println("sleeping till response is sent");
1438                                                 } catch (Exception e) {
1439                                                 }
1440                                         respSent = false;
1441                                         state = "SAVING";
1442                                         verdict = true;
1443                                         recievedVerdict = true;
1444
1445                                 } else {
1446                                         state = "FINISHED";
1447                                         verdict = true;
1448                                         recievedVerdict = true;
1449                                 }
1450                                 // System.out.println("//// Stetting reduceAll\\\\");
1451                                 this.reduceAll = reduceAll;
1452                                 waitForAck(3);
1453
1454                                 new SendVerdictThread(jaceMyId, sendId, verifNum, verdict)
1455                                                 .start();
1456                         }
1457                 } else if (verifNum < tag && timeStep == step) {
1458                         verifNum = tag;
1459                         initialize_state();
1460                         reinitializePPEr();
1461                         verdict = false;
1462                         action = "sendVerdict";
1463                         recievedVerdict = true;
1464                         waitForAck(3);
1465
1466                         new SendVerdictThread(jaceMyId, sendId, verifNum, verdict).start();
1467                 }
1468
1469         }
1470
1471         public int getRankLeader() throws RemoteException {
1472                 int neighId = -1;
1473                 for (int i = 0; i < neighbors.size(); i++) {
1474                         TaskId recev = null;
1475                         recev = Register.Instance().getListeOfTasks().getTaskIdOfRank(
1476                                         ((Integer) (neighbors.get(i))).intValue());
1477                         JaceInterface stub = recev.getHostStub();
1478                         if (stub.getNbNeighboursNotConv() == 0)
1479                                 neighId = ((Integer) (neighbors.get(i))).intValue();
1480                 }
1481                 return neighId;
1482         }
1483
1484         class SaveTaskThread extends Thread {
1485                 Task sauv;
1486
1487                 public SaveTaskThread(Task s) {
1488                         this.sauv = s;
1489                 }
1490
1491                 @SuppressWarnings("static-access")
1492                 public void run() {
1493                         // If array of BackupNodes not created, create it
1494                         if (saveTab == null) {
1495                                 createSaveTab();
1496                         }
1497
1498                         if (finalize == false) { // if finalization step, it will crash
1499                                 // because register purged yet
1500
1501                                 ByteArrayOutputStream stream = convertTask2stream(sauv);
1502
1503                                 // find the BackupNode to send
1504                                 int taskRankDest;
1505                                 TaskId task = null;
1506                                 JaceInterface stub = null;
1507                                 boolean sent = false;
1508                                 int j = 0;
1509
1510                                 while (j < saveTab.length && sent == false) {
1511                                         // 1 - find the remote task Id to send the backup to
1512                                         if (jaceSize < (2 * Register.Instance()
1513                                                         .getNumBackupNeighbors() + 1)) {
1514                                                 taskRankDest = saveTab[saveRound % (jaceSize - 1)];
1515                                         } else {
1516                                                 taskRankDest = saveTab[saveRound
1517                                                                 % (2 * Register.Instance()
1518                                                                                 .getNumBackupNeighbors())];
1519                                         }
1520
1521                                         // 2 - knowing the destination task Id, get the stub of the
1522                                         // corresponding node
1523                                         try {
1524                                                 task = Register.Instance().getListeOfTasks()
1525                                                                 .getTaskIdOfRank(taskRankDest); // ///////////////////////////pb
1526                                                 stub = task.getHostStub();
1527                                         } catch (Exception e) {
1528                                                 System.err
1529                                                                 .println("Problem in SaveTaskThread on assignation line in save : "
1530                                                                                 + e);
1531                                         }
1532                                         // System.out.println("ite " + jaceP2P_Iteration +
1533                                         // ".......... SENDING on task " + taskRankDest);
1534
1535                                         // if no stub there is a problem
1536                                         if (stub == null) {
1537                                                 j++;
1538                                                 saveRound++;
1539                                                 // System.out.println("unable to SEND backup on task of rank "
1540                                                 // + taskRankDest);
1541                                         }
1542                                         // 3 - try to send the stream
1543                                         else {
1544                                                 // if there is a stub, send the stream to that node
1545                                                 try {
1546                                                         stub.saveTask(jaceMyId, stream.toByteArray(),
1547                                                                         sauv.lastSave.getLastSave(), sauv.timeStep,
1548                                                                         Register.Instance().getAppliName(), 0);
1549                                                         // System.out.println("saved data on "+taskRankDest+" iteration= "+sauv.lastSave.getLastSave()+
1550                                                         // "timeStep="+sauv.timeStep+" !!!!!!!!");
1551
1552                                                         // Vector v = stub.getIterationOfBackup(jaceMyId,0);
1553                                                         // int ite=((Integer)v.get(0)).intValue();
1554                                                         // System.out.println("******************************************sauvegarde de donnees: ite="+ite+" taskDest="+taskRankDest+" ******************************************************");
1555                                                         sent = true;
1556
1557                                                 } catch (Exception e) {
1558                                                         System.err
1559                                                                         .println("JaceP2P_Error in Task.jaceP2P_Save() when saving stream: "
1560                                                                                         + e);
1561                                                         j++;
1562                                                         saveRound++;
1563
1564                                                 }
1565                                         }
1566
1567                                         try {
1568                                                 JaceSession.Instance().getTaskThread().sleep(10);
1569                                                 JaceSession.Instance().getTaskThread().yield();
1570                                         } catch (Exception e) {
1571                                         }
1572                                 }
1573
1574                                 // 5 - if stream not sent at all, do something (WHAT ???)
1575                                 if (j > saveTab.length) {
1576                                         System.err
1577                                                         .println("No more alive neighbors for storing the Backup");
1578                                         // TODO : what to do if no BackupNode has answered ???
1579                                 }
1580
1581                         }
1582                 }
1583         }
1584
1585 }
1586
1587 class BroadcastTaskThread extends Thread {
1588         JaceInterface stub;
1589         int rank;
1590         byte[] tsk;
1591         int iteration;
1592         int tag;
1593         String appliName;
1594         int timeStep;
1595         int dest;
1596
1597         // constructor
1598         public BroadcastTaskThread(JaceInterface theStub, int theRank,
1599                         byte[] theTsk, int theIteration, int timeStep, String theAppliName,
1600                         int tag, int dest) {
1601                 this.stub = theStub;
1602                 this.rank = theRank;
1603                 this.tsk = theTsk;
1604                 this.iteration = theIteration;
1605                 this.timeStep = timeStep;
1606                 this.appliName = theAppliName;
1607                 this.tag = tag;
1608                 this.dest = dest;
1609                 // System.out.println("tag="+tag);
1610         }
1611
1612         // method launched by start()
1613         public void run() {
1614                 try {
1615                         stub.saveTask(rank, tsk, iteration, timeStep, appliName, tag);
1616                         JaceSession.Instance().getTaskObject().setSaved(true);
1617
1618                         if (tag == 0) {
1619                                 // Vector v = stub.getIterationOfBackup(rank,0);
1620                                 // int ite=((Integer)v.get(0)).intValue();
1621                                 // System.out.println("++++++++++ Broadcast data ite="+iteration+" dest="+dest+" timeStep="+timeStep);
1622
1623                         } else {
1624                                 // Vector v = stub.getIterationOfBackup(rank,1);
1625                                 // int ite=((Integer)v.get(0)).intValue();
1626                                 // System.out.println("++++++++++ Broadcast dataConvg ite="+iteration+" dest="+dest+" timeStep="+timeStep);
1627                                 // System.out.println("+++++++++++++++++++++++++++++++++++++++++ Broadcast convgData ite="+ite+" dest="+dest+" +++++++++++++++++++++++++++++++++++++++++++++++++++++");
1628
1629                         }
1630                 } catch (Exception e) {
1631                         System.err
1632                                         .println("Node not reachable by JaceServer.saveTask() in BroadcastTaskThread :"
1633                                                         + e);
1634                 }
1635         }
1636 }
1637