Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Modification of mapping algorithm identifier mechanisms.
[jaceP2P.git] / src / jaceP2P / Task.java
1 package jaceP2P;
2
3 import java.io.ByteArrayOutputStream;
4 import java.io.ObjectInputStream;
5 import java.io.ObjectOutputStream;
6 import java.rmi.RemoteException;
7 import java.util.Vector;
8
9 //import java.util.Calendar;
10 //import java.util.GregorianCalendar;
11
12 //import com.jamonapi.*;
13
14 public class Task implements Runnable, Cloneable, java.io.Serializable {
15         private static final long serialVersionUID = 1L;
16
17         public double errorLoc = 0;
18         public int saveParameter;
19         public int jaceMyId;
20         public int jaceSize;
21         public TaskId jaceTaskId = null;
22         public String[] jaceArgs;
23         public boolean reloading = false;
24         public String state = "NORMAL";
25         public int nb_not_recv;
26         public boolean electedNode = false;
27         public boolean respSent = false;
28         public Vector<Integer> resp;
29         public int verifNum = 0;
30         public LastSave lastSave = new LastSave();
31         public Vector<Integer> neighbors;
32         public Vector<Boolean> neighborsValues;
33         public Vector<Integer> dependancies;
34         public Vector<Boolean> values;
35         public int sendId;
36         public String action = "nothing";
37         public boolean verdict = false;
38         public boolean recievedVerdict = false;
39         public int jaceP2P_Iteration = 0;
40         public boolean finalStep = false;
41         // public Monitor mon1=null;
42         public int timeStep = 0; // time discretization counter for non-stationary
43         // problems
44         public boolean localCV_state;
45         public boolean jaceP2P_globalCV_state = false;
46         // attribute to know if an appli has finished yet
47         private boolean finalize = false;
48         public boolean pseudoPerBeg;
49         public boolean pseudoPerEnd;
50         public boolean underTh = false;
51         public boolean saved[];
52         // attributes for BackupNodes
53         private int saveRound = 0;
54         private int[] saveTab = null;
55         public boolean savedResults = false;
56         public Task sauv;
57         public BackupConvg sauvConvg = new BackupConvg();
58         public boolean postReloading = false;
59         public Vector<?> reduceAll;
60
61         int cpt = 0;
62         int count = 0;
63         Thread th;
64
65         public Task() {
66                 reduceAll = new Vector<Object>();
67                 dependancies = new Vector<Integer>();
68                 values = new Vector<Boolean>();
69                 resp = new Vector<Integer>();
70                 saved = new boolean[2];
71                 sauv = this;
72                 neighbors = new Vector<Integer>();
73                 neighborsValues = new Vector<Boolean>();
74         }
75
76         public void getBackupForNewNode(int rank) {
77                 TaskId task = null;
78                 JaceInterface stub = null;
79                 task = Register.Instance().getListeOfTasks().getTaskIdOfRank(rank);
80
81                 stub = task.getHostStub();
82
83                 // if no stub there is a problem
84                 if (stub == null) {
85                         System.err.println("Unable to send backup on task of rank " + rank);
86                 } else {
87                         // if there is a stub, send the stream to that node
88                         try {
89                                 ByteArrayOutputStream stream;
90                                 synchronized (sauv) {
91                                         stream = convertTask2stream(sauv);
92                                         stub.saveTask(jaceMyId, stream.toByteArray(), sauv.lastSave
93                                                         .getLastSave(), sauv.timeStep, Register.Instance()
94                                                         .getAppliName(), 0);
95                                 }
96                                 synchronized (sauvConvg) {
97                                         if (sauvConvg.initialized == true) {
98                                                 stream = convertBackupConv2stream(sauvConvg);
99                                                 stub.saveTask(jaceMyId, stream.toByteArray(),
100                                                                 sauvConvg.lastSave.getLastSave(),
101                                                                 sauvConvg.timeStep, Register.Instance()
102                                                                                 .getAppliName(), 1);
103                                         }
104                                 }
105                         } catch (Exception e) {
106                                 System.err.println("Error in getBackupForNewNode :" + e);
107                                 e.printStackTrace(System.out);
108                         }
109                 }
110         }
111
112         // method to overload by user in the appli to convert the stream sent by the
113         // BackupNode to a Task object of the type of the appli this method is
114         // called
115         // in TaskLauncher.loadBackupAndRestart() and
116         // TaskLauncher.loadOrReloadTask()
117         public Task jaceP2P_ConvertStream(ObjectInputStream stream) {
118                 return null;
119         }
120
121         public void printSav() {
122         }
123
124         // method to overload by user in the appli to specify the reduceAll method
125         public synchronized void reduceAll(Vector<?> recievedValue) {
126         }
127
128         public void setId(TaskId Id) {
129                 jaceTaskId = Id;
130                 jaceMyId = jaceTaskId.getRank();
131                 try {
132                         jaceSize = Register.Instance().getNbOfTasks();
133                 } catch (Exception e) {
134                         try {
135                                 System.err.println("SetId is bad !! " + e + " "
136                                                 + LocalHost.Instance().getName());
137                                 // jaceSize = Register.Instance().getListeOfTasks().getSize();
138                         } catch (Exception e2) {
139                                 System.err.println("Not localised the spawner : " + e2);
140                         }
141                 }
142         }
143
144         public void setParam(String[] arg) {
145                 this.jaceArgs = arg;
146
147         }
148
149         public void setJaceSize(int nbTasks) {
150                 jaceSize = nbTasks;
151         }
152
153         public TaskId getId() {
154                 return jaceTaskId;
155                 // return myIdd;
156         }
157
158         public int getTimeStep() {
159                 return timeStep;
160         }
161
162         // method to overload by user in the appli to identify the neighbors of a
163         // node of rank i
164
165         public int[] getDependencies(int i) {
166                 return null ;
167         }
168
169         // method to overload by user in the appli to init each task at beginning of
170         // computation
171
172         public void jaceP2P_InitTask() {
173         }
174
175         // TaskLauncher.loadOrReloadTask()
176         public void jaceP2P_ReinitTask() {
177         }
178
179         // method to overload by user in the appli to safeguard the results
180         public void saveResults() {
181         }
182
183         // method to overload by user in the appli to save only the requiered
184         // attributes (iter, vecteurs,......) this method is called in
185         // Task.jaceP2P_Save()
186         public Task jaceP2P_SaveFromCrash() {
187                 System.out.println("JaceSaveFromCrash Task");
188                 return null;
189         }
190
191         public void jaceFinalize() {
192
193                 finalize = true;
194                 System.out.println("Ready to Death Task:" + jaceMyId);
195
196                 try {
197                         Register.Instance().getSpawnerStub().killApplication(
198                                         LocalHost.Instance().getStub());
199
200                 } catch (Exception e) {
201                         System.err
202                                         .println("Cannot join the Spawner to kill application: "
203                                                         + e);
204                 }
205
206                 JaceDaemon.Instance().reinitDaemon();
207
208         }
209
210         public void jaceP2P_ReinitConv() {
211                 // System.out.println("reinit conv");
212
213                 timeStep++;
214                 state = "NORMAL";
215                 // last_iter.removeAllElements();
216                 try {
217                         reinitializeVectors();
218
219                 } catch (Exception e) {
220                         System.err.println("Error in jaceP2P_reinitConv():" + e);
221                 }
222                 pseudoPerBeg = false;
223                 pseudoPerEnd = false;
224                 synchronized (lastSave) {
225                         lastSave = new LastSave();
226                 }
227                 // reinit les var de conv de Task generique
228                 localCV_state = false;
229                 jaceP2P_globalCV_state = false;
230
231         }
232
233         public void reinitializeVectors() throws RemoteException {
234                 values.removeAllElements();
235                 dependancies.removeAllElements();
236                 neighbors.removeAllElements();
237                 neighborsValues.removeAllElements();
238                 resp.removeAllElements();
239         }
240
241         public long jaceP2P_getChronoValue() {
242                 long result = 0;
243
244                 try {
245                         result = Register.Instance().getSpawnerStub().getChronoValue(
246                                         Register.Instance().getAppliName());
247                 } catch (Exception e) {
248                         System.err
249                                         .println("JaceP2P_Error in Task.jaceP2P_getChronoValue() on SuperNode : "
250                                                         + e);
251                 }
252
253                 return result;
254         }
255
256         public void run() {
257                 //System.out.println("ds run()");
258         }
259
260         /**
261          *asynchronous sending (non blocking), of data object to an other task
262          * 
263          * @param buffer
264          *            the object (serializable) data to be send
265          *@param dest
266          *            the task id for receiver's task
267          *@param tag
268          *            the tag for message
269          */
270         @SuppressWarnings("static-access")
271         public void jaceSend(Object buffer, int dest, int tag, double erreur_locale) {
272                 // System.out.println("dest : " + dest);
273
274                 TaskId recev = null;
275                 try {
276                         recev = Register.Instance().getListeOfTasks().getTaskIdOfRank(dest);
277                         
278                         if (recev == null) {
279                                 System.err.println("In jaceSend recv = null !");
280                                 try {
281                                         JaceSession.Instance().getTaskThread().sleep(10);
282                                         JaceSession.Instance().getTaskThread().yield();
283                                 } catch (Exception e) {
284                                 }
285                         }
286
287                         // TODO : virer ce else, mais chercher pkoi recev est null des fois
288                         else {
289                                 if (recev.getRank() != dest) {
290                                         System.err.println("Problem !! pas le meme dest que ds les param");
291                                 }
292
293                                 // creer le Message
294                                 // TODO : rajouter nom de l'appli ds Messag
295                                 try {
296                                         JaceInterface stub;
297                                         stub = recev.getHostStub();
298                                         if (stub.getTimeStep() == timeStep) {
299                                                 // System.out.println("************ "+verifNum+" *************");
300                                                 Message msg = new Message();
301                                                 msg.setParam(buffer, jaceTaskId, recev, tag, timeStep,
302                                                                 jaceP2P_Iteration, verifNum, erreur_locale);
303
304                                                 // on met le message ds
305                                                 // JaceBuffer.Instance() (la liste des Message a
306                                                 // envoyer)
307
308                                                 /*
309                                                  * if(JaceDaemon.Instance().getProtocol().equals("socket"
310                                                  * )) SenderSocket.Instance().buffer.add(msg); else
311                                                  * if(JaceDaemon.Instance().getProtocol().equals("rmi"))
312                                                  * SenderRmi.Instance().buffer.add(msg);
313                                                  */
314                                                 // System.out.println("putting message to "+dest+" in the buffer");
315                                                 Sender.Instance().getBuffer().add(msg);
316
317                                                 // if(JaceDaemon.Instance().getProtocol().equals("rmi"))
318                                                 // SenderRmi.Instance().getBuffer().add(msg);
319                                                 // else
320                                                 // SenderSocket.Instance().getBuffer().add(msg);
321                                         }
322                                 } catch (Exception e) {
323                                         System.err.println("Unable to send data message to " + dest
324                                                         + ": " + e);
325                                 }
326                                 // System.out.println("TASK : g mis un msg qui doit etre envoye");
327                                 // envoie toujours asynchrone !!!!!!!! : le Message partira
328                                 // instantanement car
329                                 // -Sender faisait JaceBuffer.Instance().get() qui contient un
330                                 // wait()
331                                 // -et ici, Task fait JaceBuffer.Instance().add(msg) qui
332                                 // contient un notifyAll()
333                                 try {
334                                         JaceSession.Instance().getTaskThread().sleep(10);
335                                         JaceSession.Instance().getTaskThread().yield();
336                                 } catch (Exception e) {
337                                 }
338                                 // System.out.println("TASK : je sort de jaceSend");
339                         }
340                 } catch (Exception e) {
341                         if (Register.Instance().getListeOfTasks() == null)
342                                 System.err.println("Tasks list is null: " + e);
343                 }
344
345         }
346
347         /**
348          *not-blocking reception of data object, return an object
349          * 
350          * @param dest
351          *            the task id for the task sender
352          *@param tag
353          *            the tag for message
354          */
355         @SuppressWarnings("static-access")
356         public Object jaceReceive(int sender, int tag) {
357                 if (jaceP2P_Iteration == 0 || postReloading) {
358                         try {
359                                 if (notExist(sender)) {
360                                         setDep(sender);
361                                         // last_iter.add(0);
362                                 }
363                         } catch (Exception e) {
364                         }
365                 }
366
367                 Message tmp = MsgQueue.Instance().get(sender, tag);
368                 if (tmp != null) {
369                         // System.out.println("recu MSG de tache " + sender + " (" +
370                         // Register.Instance().getListeOfTasks().getTaskIdOfRank(sender).getHostName()
371                         // + ")  MsgQueue : " +
372                         // MsgQueue.Instance().getSize()+" message src_tag="+tmp.getSrc_tag()+
373                         // " localError="+tmp.getLocalError());
374                         try {
375
376                                 int index;
377                                 if (underTh == true && jaceP2P_Iteration != 0 && !reloading) {
378                                         index = depIndex(sender);
379
380                                         if ((!(state.equals("VERIF")) || verifNum == tmp
381                                                         .getSrc_tag())) {
382
383                                                 // System.out.println("dep["+sender+"]=true, index="+index);
384                                                 setValues(index, true);
385                                         }
386                                 }
387
388                         } catch (Exception e) {
389                                 System.err.println("Error jaceReceive :" + e);
390                                 System.err.println("Sender=" + sender);
391                         }
392                         return (tmp.getData());
393                 } else {
394                         try {
395                                 // System.out.println("RIENNN recu de tache " + sender + " (" +
396                                 // Register.Instance().getListeOfTasks().getTaskIdOfRank(sender).getHostName()
397                                 // + ")   taille MsgQueue : " + MsgQueue.Instance().getSize());
398                         } catch (Exception e) {
399                                 if (Register.Instance().getListeOfTasks() == null)
400                                         System.err.println("Tasks list is null: " + e);
401                         }
402                         try {
403                                 JaceSession.Instance().getTaskThread().sleep(10);
404                                 JaceSession.Instance().getTaskThread().yield();
405                         } catch (Exception e) {
406                         }
407                         return null;
408                 }
409         }
410
411         public boolean notExist(int sender) {
412                 int i = dependancies.indexOf((Object) (new Integer(sender)));
413                 if (i == -1)
414                         return true;
415                 else
416                         return false;
417         }
418
419         public void setDep(int value) {
420                 dependancies.add(new Integer(value));
421                 values.add(new Boolean(false));
422         }
423
424         public int depIndex(int sender) {
425                 int index = dependancies.indexOf((Object) (new Integer(sender)));
426                 return index;
427         }
428
429         public void setValues(int index, boolean value) {
430                 values.setElementAt(new Boolean(value), index);
431         }
432
433         @SuppressWarnings("static-access")
434         public void jaceP2P_Save() {
435                 try {
436
437                         if (jaceP2P_Iteration == 0) {
438                                 // request 0 when task at barrier
439                                 // request 1 when only at convergence
440                                 broadcastTasks(0);
441                         } else if ((jaceP2P_Iteration % saveParameter) == 0) {
442                                 // clone the Task at that step of computations
443                                 saveRound++;
444                                 synchronized (sauv) {
445                                         sauv = getTask2save();
446
447                                         // send it to the corresponding BackupNode in a round robin
448                                         // fashion
449                                         new SaveTaskThread(sauv).start();
450                                 }
451                         }
452
453                         JaceSession.Instance().getTaskThread().sleep(10);
454                         JaceSession.Instance().getTaskThread().yield();
455                 } catch (Exception e) {
456                 }
457         }
458
459         // request 0 when Saving all task
460         // request 3 when saving only convergence data
461         @SuppressWarnings("static-access")
462         public void broadcastTasks(int request) {
463                 ByteArrayOutputStream stream;
464                 if (request == 0) {
465                         // 1 - clone the Task at that step of computations and serialize it
466                         try {
467                                 synchronized (sauv) {
468                                         sauv = getTask2save();
469                                         stream = convertTask2stream(sauv);
470
471                                         // 2 - create de saveTab if necessary
472                                         if (saveTab == null) {
473                                                 createSaveTab();
474                                         }
475                                         TaskId task = null;
476                                         JaceInterface stub = null;
477
478                                         // 3 - send the stream to all the BackupNodes
479                                         for (int i = 0; i < saveTab.length; i++) {
480                                                 // System.out.println("saveTab[" + i + "] = " +
481                                                 // saveTab[i]);
482
483                                                 // 3.1 - get the stub of destinatory
484                                                 // System.out.println("Saving on neighbor "+i);
485                                                 try {
486                                                         task = Register.Instance().getListeOfTasks()
487                                                                         .getTaskIdOfRank(saveTab[i]); // ///////////////////////////pb
488                                                         stub = task.getHostStub();
489                                                 } catch (Exception e) {
490                                                         System.err
491                                                                         .println("Problem in the broadcast, ligne d'assignation de task ds broadcats: "
492                                                                                         + e);
493                                                 }
494
495                                                 // 3.2 - send the stream to that stub
496
497                                                 // System.out.println("saving on second list");
498                                                 if (stub != null) {
499                                                         new BroadcastTaskThread(stub, jaceMyId, stream
500                                                                         .toByteArray(),
501                                                                         sauv.lastSave.getLastSave(), sauv.timeStep,
502                                                                         Register.Instance().getAppliName(), 0,
503                                                                         saveTab[i]).start();
504
505                                                 }
506
507                                         }
508                                 }
509                         } catch (Exception e) {
510                                 e.printStackTrace(System.out);
511                         }
512                 } else {
513                         synchronized (sauvConvg) {
514                                 sauvConvg = getBackupConvg2Save();
515                                 stream = convertBackupConv2stream(sauvConvg);
516
517                                 // 2 - create de saveTab if necessary
518                                 if (saveTab == null) {
519                                         createSaveTab();
520                                 }
521                                 TaskId task = null;
522                                 JaceInterface stub = null;
523
524                                 // 3 - send the stream to all the BackupNodes
525                                 for (int i = 0; i < saveTab.length; i++) {
526                                         // System.out.println("saveTab[" + i + "] = " + saveTab[i]);
527
528                                         // 3.1 - get the stub of destinatory
529                                         // System.out.println("Saving on neighbor "+i);
530                                         try {
531                                                 task = Register.Instance().getListeOfTasks()
532                                                                 .getTaskIdOfRank(saveTab[i]); // ///////////////////////////pb
533                                                 stub = task.getHostStub();
534                                         } catch (Exception e) {
535                                                 System.err
536                                                                 .println("Problem in the broadcast, ligne d'assignation de task ds broadcats: "
537                                                                                 + e);
538                                         }
539
540                                         // 3.2 - send the stream to that stub
541                                         /*
542                                          * if(request==3){ if (stub != null) { new
543                                          * BroadcastTaskThread(stub, jaceMyId, stream.toByteArray(),
544                                          * sauvConvg.lastSave.getLastSave(),timeStep,
545                                          * Register.Instance().getAppliName(),1,saveTab[i]).start();
546                                          * } //System.out.println("saving on second list"); } else{
547                                          */
548                                         if (stub != null) {
549                                                 new BroadcastTaskThread(stub, jaceMyId, stream
550                                                                 .toByteArray(), sauvConvg.lastSave
551                                                                 .getLastSave(), sauvConvg.timeStep, Register
552                                                                 .Instance().getAppliName(), 1, saveTab[i])
553                                                                 .start();
554                                         }
555
556                                         // System.out.println("saving on first list");
557                                 }
558                         }
559                 }
560                 try {
561                         // JaceSession.Instance().getTaskThread().sleep(10);
562                         JaceSession.Instance().getTaskThread().yield();
563                 }
564
565                 catch (Exception e) {
566                 }
567         }
568
569         private ByteArrayOutputStream convertBackupConv2stream(BackupConvg t) {
570                 // System.out.println("beginning of the checkpointing process......");
571                 ByteArrayOutputStream stream = new ByteArrayOutputStream();
572                 try {
573                         ObjectOutputStream fluxOut = new ObjectOutputStream(stream);
574                         fluxOut.writeObject(t);
575                         fluxOut.close();
576                 } catch (Exception e) {
577                         System.err
578                                         .println("JaceP2P_Error in Task.jaceP2P_ReinitConv() when converting Task in Stream : "
579                                                         + e);
580                 }
581                 // System.out.println("taille du tablo de byte : " +
582                 // stream.toByteArray().length + " bytes");
583                 return stream;
584         }
585
586         // convert the task in stream to send it to the BackupNode
587         private ByteArrayOutputStream convertTask2stream(Task t) {
588                 // System.out.println("beginning of the checkpointing process......");
589                 ByteArrayOutputStream stream = new ByteArrayOutputStream();
590                 try {
591                         ObjectOutputStream fluxOut = new ObjectOutputStream(stream);
592                         fluxOut.writeObject(t);
593                         fluxOut.close();
594                 } catch (Exception e) {
595                         System.err
596                                         .println("JaceP2P_Error in Task.jaceP2P_ReinitConv() when converting Task in Stream : "
597                                                         + e);
598                 }
599                 // System.out.println("taille du tablo de byte : " +
600                 // stream.toByteArray().length + " bytes");
601                 return stream;
602         }
603
604         // get the task of the appli with the data to save (in
605         // jaceP2P_SaveFromCrash() overloaded in the appli)
606         @SuppressWarnings("unchecked")
607         public Task getTask2save() {
608
609                 sauv = jaceP2P_SaveFromCrash();
610
611                 // System.out.println("Saving data at it="+sauv.it);
612                 // assign the default attributes of a task (params of the appli and
613                 // TaskId of the local node)
614                 sauv.setId(this.jaceTaskId);
615                 sauv.setParam(Register.Instance().getParams());
616                 sauv.timeStep = timeStep;
617
618                 // assign the iteration number that had the appli at the moment of the
619                 // checkpointing
620                 sauv.jaceP2P_Iteration = jaceP2P_Iteration;
621                 sauv.saveRound = saveRound;
622
623                 sauv.underTh = underTh;
624                 sauv.jaceP2P_globalCV_state = jaceP2P_globalCV_state;
625                 // System.out.println("I checkpoint the task at ite " +
626                 // sauv.jaceP2P_Iteration);
627                 // attributes needed to detect global convergence
628                 try {
629
630                         sauv.state = state;
631                         sauv.nb_not_recv = nb_not_recv;
632                         sauv.electedNode = electedNode;
633                         sauv.respSent = respSent;
634                         sauv.neighbors = (Vector) neighbors.clone();
635                         sauv.neighborsValues = (Vector) neighborsValues.clone();
636                         sauv.resp = (Vector) resp.clone();
637                         sauv.verifNum = verifNum;
638                         sauv.sendId = sendId;
639                         sauv.reduceAll = reduceAll;
640                         sauv.finalStep = finalStep;
641                         sauv.action = action;
642                         sauv.verdict = verdict;
643                         sauv.localCV_state = localCV_state;
644                         sauv.recievedVerdict = recievedVerdict;
645                         synchronized (lastSave) {
646                                 lastSave.increment();
647                                 sauv.lastSave = lastSave;
648                         }
649                 } catch (Exception e) {
650                         System.err.println("Problem with RMI !");
651                 }
652
653                 return sauv;
654         }
655
656         @SuppressWarnings("unchecked")
657         public BackupConvg getBackupConvg2Save() {
658
659                 try {
660
661                         // sauvConvg=new BackupConvg();
662                         sauvConvg.state = state;
663                         sauvConvg.underTh = underTh;
664                         sauvConvg.nb_not_recv = nb_not_recv;
665                         sauvConvg.electedNode = electedNode;
666                         sauvConvg.respSent = respSent;
667                         sauvConvg.neighbors = (Vector) neighbors.clone();
668                         sauvConvg.neighborsValues = (Vector) neighborsValues.clone();
669                         sauvConvg.resp = (Vector) resp.clone();
670                         sauvConvg.verifNum = verifNum;
671                         sauvConvg.sendId = sendId;
672                         sauvConvg.finalStep = finalStep;
673                         sauvConvg.action = action;
674                         sauvConvg.verdict = verdict;
675                         sauvConvg.localCV_state = localCV_state;
676                         sauvConvg.timeStep = timeStep;
677                         sauvConvg.recievedVerdict = recievedVerdict;
678                         sauvConvg.jaceP2P_Iteration = jaceP2P_Iteration;
679                         sauvConvg.reduceAll = reduceAll;
680
681                         synchronized (lastSave) {
682                                 lastSave.increment();
683                                 sauvConvg.lastSave = lastSave;
684                         }
685                         sauvConvg.initialized = true;
686                 } catch (Exception e) {
687                         System.err.println("Problem with RMI:" + e);
688                 }
689
690                 return sauvConvg;
691         }
692
693         private void createSaveTab() {
694
695                 saveTab = new int[BackupsManager.Instance().size()];
696
697                 // 2 - assign the taskId to each cell of the saveTab
698                 // System.out.println("in TASK");
699                 for (int i = 0; i < saveTab.length; i++) {
700                         saveTab[i] = BackupsManager.Instance().getBackupTaskAtIndex(i, 0)
701                                         .getTaskRank();
702                         // System.out.print(saveTab[i] + ", ");
703                 }
704                 // System.out.println("saveTab created..... size : " + saveTab.length);
705         }
706
707         public void setSaved(boolean bool) {
708                 synchronized (saved) {
709                         if (bool == true) {
710                                 if (saved[0] == false)
711                                         saved[0] = true;
712                                 else if (saved[1] == false)
713                                         saved[1] = true;
714                         } else {
715                                 saved[0] = false;
716                                 saved[1] = false;
717                         }
718                 }
719
720         }
721
722         public void waitForAck(int tag) {
723                 if (Register.Instance().getNumBackupNeighbors() != 0) {
724                         setSaved(false);
725                         broadcastTasks(tag);
726                         // Calendar cal = new GregorianCalendar();
727                         // System.out.println("at time="+cal.get(Calendar.MINUTE)+":"+cal.get(Calendar.SECOND));
728                         // System.out.println("sleeping till acknowledge saved");
729                         while (getSaved() == false)
730                                 try {
731                                         Thread.sleep(50);
732                                         count++;
733                                         if (count > 30) {
734                                                 try {
735                                                         Register newReg = Register.Instance()
736                                                                         .getSpawnerStub().getRegister(jaceMyId);
737                                                         if (newReg != null) {
738                                                                 Register.Instance().replaceBy(newReg);
739                                                         } else
740                                                                 System.out
741                                                                                 .println("I got a null register from the spawner");
742                                                         broadcastTasks(tag);
743                                                         System.out
744                                                                         .println("Sleeping till acknowledge saved");
745                                                         count = 0;
746                                                 } catch (Exception e2) {
747                                                         System.err
748                                                                         .println("Unable to get register from spawner :"
749                                                                                         + e2);
750                                                         e2.printStackTrace(System.out);
751                                                 }
752                                         }
753                                 } catch (Exception e) {
754                                 }
755                         // Calendar cal1 = new GregorianCalendar();
756                         // System.out.println("end save at time="+cal1.get(Calendar.MINUTE)+":"+cal1.get(Calendar.SECOND));
757                 }
758         }
759
760         public void jaceP2P_GlobalConvergence(boolean under_th) {
761
762                 try {
763
764                         Thread.yield();
765                         if (under_th != underTh && postReloading == false) {
766                                 // 1 - update sous seuil
767                                 underTh = under_th;
768                         }
769                         if (jaceP2P_Iteration == 0
770                                         && (postReloading == false || resp.size() == 0)) {
771                                 detectNeighbors(jaceMyId, jaceSize);
772                                 initialize_state();
773                                 sendId = -1;
774                                 verifNum = 0;
775                                 reloading = false;
776                                 broadcastTasks(3);
777                         }
778
779                         // affiche les valeurs des variables
780                         try {
781                                 // Calendar cal = new GregorianCalendar();
782                                 // System.out.println("at time="+cal.get(Calendar.MINUTE)+":"+cal.get(Calendar.SECOND));
783                                 // System.out.println("MyId="+jaceMyId+" sous seuil="+underTh+" etat="+state+" verifNum="+verifNum+" localCV="+localCV_state+" leader="+electedNode+" respSent="+respSent);
784                                 // System.out.println("dep="+getValues()+" pBeg="+pseudoPerBeg+" PEnd="+pseudoPerEnd+" sendId="+sendId);
785                                 // System.out.println("neigh="+nb_not_recv+" action="+action+" negative_resp="+testNegativeResp()+" reloading="+reloading+"\n"+" finalStep="+finalStep+" SavedResults="+savedResults+" NeghNotCV="+getNeighbourNotCV()+" postReload="+postReloading);
786                                 printResp();
787                                 printDep();
788                         } catch (Exception e) {
789                                 System.err.println("Error printing status in Task :" + e);
790                         }
791                         Thread.yield();
792                         if (action.equals("sendVerif") && state.equals("VERIF")
793                                         && postReloading == true) {
794                                 // System.out.println("je passe ds send verif");
795
796                                 new SendVerifThread(jaceMyId, sendId, verifNum).start();
797
798                                 // System.out.println("send Verif");
799
800                         } else if (action.equals("sendVerdict") && postReloading == true) {
801                                 // System.out.println("je passe ds send verdict-------");
802
803                                 new SendVerdictThread(jaceMyId, sendId, verifNum, verdict)
804                                                 .start();
805
806                                 // System.out.println("verifNum="+verifNum);
807                         }
808
809                         if (state.equals("NORMAL") && action.equals("nothing")) {
810                                 if (underTh == false)
811                                         reinitializePPEr();
812                                 else
813                                 // System.out.println("sous seuil="+underTh+" PseudoPerBeg="+localStub.getPseudoPerBeg());
814                                 if (pseudoPerBeg == false) {
815                                         // System.out.println("ds la condition");
816                                         pseudoPerBeg = true;
817                                 } else if (pseudoPerEnd == true) {
818
819                                         localCV_state = true;
820                                         if (nb_not_recv == 0) {
821                                                 // localStub.setAction("sendVerif");
822                                                 try {
823                                                         broadcastVerif(jaceMyId, -1, verifNum + 1);
824                                                         // localStub.setAction("nothing");
825
826                                                         electedNode = true;
827                                                         recievedVerdict = true;
828                                                         initializeVerifLeader();
829                                                         state = "VERIF";
830                                                         broadcastTasks(3);
831                                                 } catch (Exception e) {
832                                                         System.err
833                                                                         .println("The verification message is not received: "
834                                                                                         + e);
835                                                         Register.Instance().viewAll();
836                                                 }
837                                         } else if (nb_not_recv == 1)
838                                                 if (sendId == -1) {
839
840                                                         int neighId = getNeighbourNotCV();
841                                                         TaskId recev = null;
842
843                                                         try {
844
845                                                                 recev = Register.Instance().getListeOfTasks()
846                                                                                 .getTaskIdOfRank(neighId);
847                                                                 JaceInterface stub = recev.getHostStub();
848                                                                 if (stub.setNbNeighboursNotConv(verifNum,
849                                                                                 jaceMyId, timeStep)) {
850                                                                         // stub.setLeftNeighbourCV(true);
851                                                                         // System.out.println("sent convergence message to "+neighId);
852                                                                         sendId = neighId;
853                                                                         state = "WAIT4V";
854                                                                         recievedVerdict = false;
855                                                                         broadcastTasks(3);
856                                                                 }
857                                                         } catch (Exception e) {
858                                                                 System.err
859                                                                                 .println("Unable to decrease the number of neighbors not converged on node :"
860                                                                                                 + recev.getHostName()
861                                                                                                 + " count ="
862                                                                                                 + count
863                                                                                                 + " . error:" + e);
864                                                                 count++;
865                                                                 Register.Instance().viewAll();
866                                                                 if (count > 3) {
867                                                                         try {
868                                                                                 int myRank;
869                                                                                 TaskId id = Register.Instance()
870                                                                                                 .getListeOfTasks()
871                                                                                                 .getTaskIdOfHostStub(
872                                                                                                                 LocalHost.Instance()
873                                                                                                                                 .getStub());
874                                                                                 myRank = id.getRank();
875                                                                                 Register.Instance().replaceBy(
876                                                                                                 Register.Instance()
877                                                                                                                 .getSpawnerStub()
878                                                                                                                 .getRegister(myRank));
879
880                                                                                 count = 0;
881                                                                         } catch (Exception e2) {
882                                                                                 System.err
883                                                                                                 .println("Unable to contact the spawner: "
884                                                                                                                 + e2);
885                                                                         }
886                                                                 }
887                                                         }
888                                                 }
889                                 } else if (pseudoPerBeg == true && getValues())
890                                         pseudoPerEnd = true;
891                         } else if (state.equals("WAIT4V") && nb_not_recv == 0) {
892                                 int neighId = getRankLeader();
893                                 if (neighId == -1) {
894                                         try {
895                                                 broadcastVerif(jaceMyId, -1, verifNum + 1);
896                                                 electedNode = true;
897                                                 recievedVerdict = true;
898                                                 initializeVerifLeader();
899                                                 state = "VERIF";
900
901                                                 broadcastTasks(3);
902                                         } catch (Exception e) {
903                                                 System.err
904                                                                 .println("The verification message is not received:"
905                                                                                 + e);
906                                         }
907                                 } else if (jaceMyId > neighId) {
908                                         try {
909                                                 broadcastVerif(jaceMyId, -1, verifNum + 1);
910                                                 electedNode = true;
911                                                 recievedVerdict = true;
912                                                 initializeVerifLeader();
913                                                 state = "VERIF";
914                                                 broadcastTasks(3);
915                                         } catch (Exception e) {
916                                                 System.err
917                                                                 .println("The verification message is not received: "
918                                                                                 + e);
919                                         }
920                                 }
921                         } else if (state.equals("WAIT4V")) {
922                                 if (underTh == false) {
923                                         localCV_state = false;
924                                         broadcastTasks(3);
925                                 }
926                         } else if (state.equals("VERIF")) {
927                                 // if(localStub.getNewerDep(0) && localStub.getNewerDep(1))
928                                 // localStub.setPseudoPerEnd(true);
929                                 // if(!underTh)
930                                 if (electedNode == true) {
931                                         if ((!underTh && !postReloading) || !localCV_state
932                                                         || testNegativeResp()) {
933                                                 try {
934
935                                                         broadcastVerdict(jaceMyId, -2, verifNum + 1, false);
936                                                         verifNum++;
937                                                         initialize_state();
938                                                         reinitializePPEr();
939                                                         broadcastTasks(3);
940                                                 } catch (Exception e) {
941                                                 }
942                                         } else if (postReloading) {
943                                                 if (recievedAllResp())
944                                                         if (!testNegativeResp()) {
945                                                                 try {
946                                                                         broadcastVerdict(jaceMyId, -2, verifNum,
947                                                                                         true);
948                                                                         if (finalStep == true
949                                                                                         && state.equals("VERIF")) {
950                                                                                 initializeSavLeader();
951                                                                                 state = "SAVING";
952                                                                                 waitForAck(0);
953                                                                         } else {
954                                                                                 state = "FINISHED";
955                                                                                 waitForAck(1);
956                                                                         }
957                                                                 } catch (Exception e) {
958                                                                         System.err.println("Error: " + e);
959                                                                 }
960                                                         }
961                                         } else if (pseudoPerEnd == true) {
962                                                 if (recievedAllResp())
963                                                         if (!testNegativeResp()) {
964                                                                 try {
965                                                                         broadcastVerdict(jaceMyId, -2, verifNum,
966                                                                                         true);
967                                                                         if (finalStep == true
968                                                                                         && state.equals("VERIF")) {
969                                                                                 initializeSavLeader();
970                                                                                 state = "SAVING";
971                                                                                 waitForAck(0);
972                                                                         } else {
973                                                                                 state = "FINISHED";
974                                                                                 waitForAck(3);
975                                                                         }
976
977                                                                 } catch (Exception e) {
978                                                                         System.out.println("erreur: " + e);
979                                                                 }
980                                                         } else {
981                                                                 try {
982
983                                                                         broadcastVerdict(jaceMyId, -2,
984                                                                                         verifNum + 1, false);
985                                                                         verifNum++;
986                                                                         initialize_state();
987                                                                         reinitializePPEr();
988                                                                         broadcastTasks(3);
989                                                                 } catch (Exception e) {
990                                                                         System.err
991                                                                                         .println("Unable to broadcast a negative verdict :"
992                                                                                                         + e);
993                                                                 }
994                                                         }
995                                         }
996
997                                         else if (getValues())
998                                                 pseudoPerEnd = true;
999                                 } else if (!respSent) {
1000                                         if (!underTh || !localCV_state || testNegativeResp()) {
1001                                                 if (action.equals("nothing")) {
1002                                                         int neighId = sendId;
1003                                                         try {
1004                                                                 TaskId recev = null;
1005                                                                 recev = Register.Instance().getListeOfTasks()
1006                                                                                 .getTaskIdOfRank(neighId);
1007                                                                 JaceInterface stub = recev.getHostStub();
1008                                                                 // System.out.println("tryin to send negative response to "+neighId);
1009                                                                 stub.response(jaceMyId, verifNum, -1, null);
1010                                                                 // System.out.println("send negative response to "+neighId);
1011                                                                 respSent = true;
1012                                                                 broadcastTasks(3);
1013
1014                                                         } catch (Exception e) {
1015                                                                 System.err
1016                                                                                 .println("Response not received:" + e);
1017                                                                 Register.Instance().viewAll();
1018                                                         }
1019                                                 }
1020                                         } else if (pseudoPerEnd) {
1021                                                 // System.out.print("The daemon can send a response ");
1022                                                 int index = recievedAllRespMinusOne();
1023                                                 // System.out.println("to node of index ="+index);
1024                                                 if (index != -1) {
1025                                                         int rank = getNeighborRank(index);
1026                                                         // if(jaceMyId!=jaceSize-1 &&
1027                                                         // localStub.getSendRight()==true){
1028                                                         try {
1029                                                                 TaskId recev = null;
1030                                                                 recev = Register.Instance().getListeOfTasks()
1031                                                                                 .getTaskIdOfRank(rank);
1032                                                                 JaceInterface stub = recev.getHostStub();
1033
1034                                                                 if (getResp(index) == 1) {
1035                                                                         stub.response(jaceMyId, verifNum, 1,
1036                                                                                         reduceAll);
1037                                                                         // System.out.println("send positive response to"+rank);
1038                                                                 } else {
1039                                                                         stub.response(jaceMyId, verifNum, -1, null);
1040                                                                         // System.out.println("send negative response to"+rank);
1041                                                                 }
1042                                                                 respSent = true;
1043                                                                 broadcastTasks(3);
1044
1045                                                         } catch (Exception e) {
1046                                                                 System.err.println("Response not received by "
1047                                                                                 + rank + ": " + e);
1048                                                                 Register.Instance().viewAll();
1049                                                         }
1050                                                 }
1051                                         } else if (getValues())
1052                                                 pseudoPerEnd = true;
1053                                 }
1054
1055                         } else if (state.equals("SAVING") && !action.equals("sendVerdict")) {
1056
1057                                 if (savedResults == false) {
1058
1059                                         saveResults();
1060                                         savedResults = true;
1061                                         broadcastTasks(3);
1062                                 } else if (electedNode) {
1063                                         if (recievedAllResp())
1064                                                 try {
1065                                                         // System.out.println("recieved all responses");
1066                                                         JaceSpawnerInterface spawnerStub = Register
1067                                                                         .Instance().getSpawnerStub();
1068                                                         // System.out.println("##### callin spawnerStub.setFinished(true) #####");
1069                                                         spawnerStub.setOver(true);
1070                                                         // localStub.setState("FINISHED");
1071                                                 } catch (Exception e) {
1072                                                         System.err.println("Error" + e);
1073                                                 }
1074                                 } else if (!respSent) {
1075                                         int index = recievedAllRespMinusOne();
1076                                         if (index != -1) {
1077                                                 int rank = getNeighborRank(index);
1078
1079                                                 // if(jaceMyId!=jaceSize-1 &&
1080                                                 // localStub.getSendRight()==true){
1081                                                 try {
1082                                                         TaskId recev = null;
1083                                                         recev = Register.Instance().getListeOfTasks()
1084                                                                         .getTaskIdOfRank(rank);
1085                                                         JaceInterface stub = recev.getHostStub();
1086                                                         if (stub.getReloading() == false
1087                                                                         && stub.getState().equals("SAVING")) {
1088                                                                 action = "sendResponse";
1089                                                                 stub.response(jaceMyId, verifNum, 1, null);
1090                                                                 respSent = true;
1091                                                                 action = "nothing";
1092                                                                 broadcastTasks(3);
1093                                                                 // System.out.println("send response to"+rank);
1094                                                         }
1095                                                 } catch (Exception e) {
1096                                                         System.err.println("Response not received" + e);
1097                                                 }
1098                                         }
1099                                 }
1100                         } else if (state.equals("FINISHED")
1101                                         && !action.equals("sendVerdict") && recievedVerdict) {
1102
1103                                 jaceP2P_globalCV_state = true;
1104                                 // System.out.println("Finished");
1105                                 // System.out.println("Finished");
1106                                 // System.out.println("Finished");
1107                                 // System.out.println("Finished");
1108                                 System.out.println("Finished");
1109                         }
1110
1111                         if (postReloading == true)
1112                                 postReloading = false;
1113
1114                 } catch (Exception e) {
1115                         System.err.println("Exception in Global Convergence :" + e);
1116                         e.printStackTrace(System.out);
1117                         Register.Instance().viewAll();
1118                 }
1119                 // mon1.stop();
1120         }
1121
1122         public synchronized int getResp(int index) throws RemoteException {
1123                 int res = 1;
1124                 for (int i = 0; i < resp.size(); i++)
1125                         if (i != index)
1126                                 if (((Integer) resp.get(i)).intValue() == -1)
1127                                         res = -1;
1128                 return res;
1129         }
1130
1131         public synchronized void response(int neighId, int tag, int response,
1132                         Vector<?> recievedValue) throws RemoteException {
1133                 // System.out.println("inside response function");
1134                 // System.out.println("sleeping till not reloading");
1135                 while (reloading == true) {
1136                         try {
1137                                 Thread.sleep(10);
1138                                 // System.out.println("sleeping till not reloading");
1139                         } catch (Exception e) {
1140                         }
1141                 }
1142                 if (verifNum == tag) {
1143                         // System.out.println("inside condition");
1144                         int indexNeigh = neighbors.indexOf((Object) neighId);
1145                         // System.out.println("after gettin index="+index);
1146                         // System.out.println("index="+indexNeigh+" size de resp ="+resp.size());
1147                         // printResp();
1148                         // try{
1149                         // int xyz=((Integer)(resp.elementAt(indexNeigh))).intValue();
1150                         // }catch(Exception e){
1151                         // System.out.println("fuckkkkkkkkkkkkkkkkkk error:"+e);
1152                         // }
1153
1154                         if (response == 1
1155                                         && !state.equals("SAVING")
1156                                         && (((Integer) (resp.elementAt(indexNeigh))).intValue()) != 1) {
1157                                 // System.out.println("calling reduceAll()");
1158                                 reduceAll(recievedValue);
1159                         }
1160                         // System.out.println("after calculating reduceAll");
1161                         resp.setElementAt(response, indexNeigh);
1162
1163                         // System.out.println("get response ............");
1164                         waitForAck(3);
1165
1166                 } else
1167                         throw new RemoteException();
1168         }
1169
1170         public void initializeSavLeader() {
1171                 for (int i = 0; i < resp.size(); i++)
1172                         resp.setElementAt(0, i);
1173                 respSent = false;
1174         }
1175
1176         public boolean recievedAllResp() {
1177                 boolean bool = true;
1178                 for (int i = 0; i < resp.size(); i++)
1179                         if (((Integer) (resp.get(i))).intValue() == 0) {
1180                                 bool = false;
1181                                 break;
1182                         }
1183                 return bool;
1184         }
1185
1186         public int recievedAllRespMinusOne() {
1187                 int nbOfZeros = 0;
1188                 int indexOfZero = -1;
1189                 for (int i = 0; i < resp.size(); i++)
1190                         if (((Integer) (resp.get(i))).intValue() == 0) {
1191                                 nbOfZeros++;
1192                                 indexOfZero = i;
1193                         }
1194                 if (nbOfZeros > 1)
1195                         return -1;
1196                 else
1197                         return indexOfZero;
1198         }
1199
1200         public synchronized boolean setNbNeighboursNotConv(int tag, int idNeigh,
1201                         int neighborTimeStep) throws RemoteException {
1202                 // System.out.println("ds  setNbNeighboursNotConv !!!!!!!!!!!!!!!");
1203                 if (tag == verifNum && !action.equals("sendVerdict")
1204                                 && neighborTimeStep == timeStep) {
1205                         if (idNeigh == -1) {
1206                                 nb_not_recv--;
1207                                 return false;
1208                         } else {
1209                                 // System.out.println("sleeping till not reloading");
1210                                 while (reloading == true) {
1211                                         try {
1212                                                 Thread.sleep(10);
1213                                                 // System.out.println("sleeping till not reloading");
1214                                         } catch (Exception e) {
1215                                         }
1216                                 }
1217                                 int i = neighbors.indexOf((Object) idNeigh);
1218                                 if (((Boolean) neighborsValues.get(i)).booleanValue() == false) {
1219                                         nb_not_recv--;
1220                                         // System.out.println("le noeud "+idNeigh+" a envoyer un message de pseudoconvergence");
1221                                         neighborsValues.setElementAt(new Boolean(true), i);
1222                                         waitForAck(3);
1223
1224                                 }
1225                         }
1226                         return true;
1227                 } else if (tag == verifNum - 1 && !action.equals("sendVerdict")
1228                                 && reloading == false && neighborTimeStep == timeStep
1229                                 && jaceP2P_Iteration != 0)
1230                         return true;
1231                 else
1232                         return false;
1233         }
1234
1235         public void detectNeighbors(int id, int jaceSize) {
1236                 // System.out.println("detect neighbors !!!! ");
1237                 int d = 0;
1238                 while (Math.pow(2, d) < jaceSize) {
1239                         if (id < Math.pow(2, d) && ((id + Math.pow(2, d)) < jaceSize)) {
1240                                 neighbors.add((int) (id + Math.pow(2, d)));
1241                                 neighborsValues.add(new Boolean(false));
1242                                 resp.add(0);
1243                         }
1244                         if (id < Math.pow(2, d + 1) && id >= Math.pow(2, d)) {
1245                                 neighbors.add((int) (id - Math.pow(2, d)));
1246                                 neighborsValues.add(new Boolean(false));
1247                                 resp.add(0);
1248                         }
1249                         d++;
1250                 }
1251         }
1252
1253         public synchronized void initialize_state() {
1254
1255                 // System.out.println("initialiser\n");
1256
1257                 nb_not_recv = neighbors.size();
1258                 for (int i = 0; i < neighbors.size(); i++)
1259                         neighborsValues.setElementAt(new Boolean(false), i);
1260                 for (int i = 0; i < resp.size(); i++)
1261                         resp.setElementAt(0, i);
1262                 underTh = false;
1263                 electedNode = false;
1264                 localCV_state = false;
1265                 verdict = false;
1266
1267                 state = "NORMAL";
1268         }
1269
1270         public int getNeighborRank(int index) throws RemoteException {
1271                 int rank = ((Integer) (neighbors.get(index))).intValue();
1272                 return rank;
1273         }
1274
1275         public synchronized int getNeighbourNotCV() {
1276                 int neighId = -1;
1277                 for (int i = 0; i < neighbors.size(); i++)
1278                         if (((Boolean) neighborsValues.get(i)).booleanValue() == false)
1279                                 neighId = ((Integer) neighbors.elementAt(i)).intValue();
1280                 return neighId;
1281         }
1282
1283         public boolean getValues() {
1284                 boolean bool = true;
1285                 for (int i = 0; i < values.size(); i++)
1286                         if (((Boolean) values.elementAt(i)).equals(new Boolean(false))) {
1287                                 bool = false;
1288                                 break;
1289                         }
1290                 // System.out.println("getValues() have been called and it returned "+bool);
1291                 return bool;
1292         }
1293
1294         public boolean testNegativeResp() {
1295                 boolean bool = false;
1296                 for (int i = 0; i < resp.size(); i++)
1297                         if (((Integer) (resp.get(i))).intValue() == -1) {
1298                                 bool = true;
1299                                 break;
1300                         }
1301                 return bool;
1302         }
1303
1304         public void printResp() {
1305                 // for(int i=0;i<resp.size();i++)
1306                 // System.out.print(" resp["+((Integer)neighbors.get(i)).intValue()+"]="+((Integer)resp.get(i)).intValue());
1307                 // System.out.print("\n");
1308         }
1309
1310         public void printDep() {
1311                 // for(int i=0;i<values.size();i++)
1312                 // System.out.print(" dep["+((Integer)dependancies.get(i)).intValue()+"]="+((Boolean)values.get(i)).booleanValue());
1313                 // System.out.print("\n");
1314         }
1315
1316         public void broadcastVerif(int id, int neighId, int tag)
1317                         throws RemoteException {
1318
1319                 TaskId recev = null;
1320                 // TaskId local =
1321                 // Register.Instance().getListeOfTasks().getTaskIdOfRank(id);
1322                 JaceInterface stub;
1323                 // System.out.println("Id="+id+" neighId="+ neighId+" tag="+tag);
1324                 for (int i = 0; i < neighbors.size(); i++)
1325                         if (neighId != ((Integer) neighbors.get(i)).intValue()) {
1326                                 recev = Register.Instance().getListeOfTasks().getTaskIdOfRank(
1327                                                 ((Integer) neighbors.get(i)).intValue());
1328                                 stub = recev.getHostStub();
1329                                 stub.initializeVerif(tag);
1330                                 // System.out.println("broadcast verification to :"+((Integer)neighbors.get(i)).intValue());
1331                         }
1332                 if (action.equals("sendVerif"))
1333                         action = "nothing";
1334         }
1335
1336         public void initializeVerifLeader() throws RemoteException {
1337                 reinitializePPEr();
1338                 for (int i = 0; i < resp.size(); i++)
1339                         resp.setElementAt(0, i);
1340                 verifNum++;
1341                 respSent = false;
1342         }
1343
1344         public void initializeVerif(int tag) throws RemoteException {
1345                 // System.out.println("Inside initializeVerif @@@@@");
1346                 // System.out.println("sleeping till not reloading");
1347                 while (reloading == true) {
1348                         try {
1349                                 Thread.sleep(10);
1350                                 // System.out.println("sleeping till not reloading");
1351                         } catch (Exception e) {
1352                         }
1353                 }
1354                 if (verifNum + 1 == tag)
1355                         if (state.equals("WAIT4V")) {
1356                                 action = "sendVerif";
1357                                 reinitializePPEr();
1358                                 for (int i = 0; i < resp.size(); i++)
1359                                         resp.setElementAt(0, i);
1360                                 verifNum++;
1361
1362                                 respSent = false;
1363                                 state = "VERIF";
1364                                 waitForAck(3);
1365
1366                                 new SendVerifThread(jaceMyId, sendId, verifNum).start();
1367
1368                         } else if (state.equals("VERIF")) {
1369                         } else {
1370                                 throw new RemoteException();
1371
1372                         }
1373
1374         }
1375
1376         public boolean getSaved() {
1377                 if (saved[1])
1378                         return true;
1379                 else
1380                         return false;
1381         }
1382
1383         public void reinitializePPEr() {
1384                 pseudoPerBeg = false;
1385                 pseudoPerEnd = false;
1386                 for (int i = 0; i < values.size(); i++)
1387                         values.setElementAt(new Boolean(false), i);
1388
1389         }
1390
1391         public void broadcastVerdict(int id, int neighId, int tag, boolean verd)
1392                         throws RemoteException {
1393                 Boolean verdicto = verd;
1394                 TaskId recev = null;
1395                 // TaskId local =
1396                 // Register.Instance().getListeOfTasks().getTaskIdOfRank(id);
1397                 JaceInterface stub;
1398                 for (int i = 0; i < neighbors.size(); i++)
1399                         if (((Integer) neighbors.get(i)).intValue() != neighId) {
1400                                 // System.out.println("broadcasting verdict "+verdicto+" to node of Rank "+((Integer)neighbors.get(i)).intValue());
1401                                 recev = Register.Instance().getListeOfTasks().getTaskIdOfRank(
1402                                                 ((Integer) neighbors.get(i)).intValue());
1403                                 stub = recev.getHostStub();
1404                                 stub.savOrFinOrRest(tag, timeStep, verdicto, reduceAll);
1405                         }
1406                 action = "nothing";
1407                 if (verdict == false)
1408                         sendId = -1;
1409         }
1410
1411         public void savOrFinOrRest(int tag, int step, boolean verd,
1412                         Vector<?> reduceAll) {
1413                 // System.out.println("Recieved verd "+verd+" sleeping till not reloading");
1414                 while (reloading == true) {
1415                         try {
1416                                 Thread.sleep(10);
1417                                 // System.out.println("sleeping till not reloading");
1418                         } catch (Exception e) {
1419                         }
1420                 }
1421                 if (verd == true) {
1422                         if (verifNum == tag && timeStep == step && state.equals("VERIF")) {
1423                                 action = "sendVerdict";
1424                                 if (finalStep == true) {
1425                                         for (int i = 0; i < resp.size(); i++)
1426                                                 resp.setElementAt(0, i);
1427                                         // System.out.println("sleeping till response is sent");
1428                                         while (action.equals("sendResponse") || respSent == false)
1429                                                 try {
1430                                                         Thread.sleep(10);
1431                                                         // System.out.println("sleeping till response is sent");
1432                                                 } catch (Exception e) {
1433                                                 }
1434                                         respSent = false;
1435                                         state = "SAVING";
1436                                         verdict = true;
1437                                         recievedVerdict = true;
1438
1439                                 } else {
1440                                         state = "FINISHED";
1441                                         verdict = true;
1442                                         recievedVerdict = true;
1443                                 }
1444                                 // System.out.println("//// Stetting reduceAll\\\\");
1445                                 this.reduceAll = reduceAll;
1446                                 waitForAck(3);
1447
1448                                 new SendVerdictThread(jaceMyId, sendId, verifNum, verdict)
1449                                                 .start();
1450                         }
1451                 } else if (verifNum < tag && timeStep == step) {
1452                         verifNum = tag;
1453                         initialize_state();
1454                         reinitializePPEr();
1455                         verdict = false;
1456                         action = "sendVerdict";
1457                         recievedVerdict = true;
1458                         waitForAck(3);
1459
1460                         new SendVerdictThread(jaceMyId, sendId, verifNum, verdict).start();
1461                 }
1462
1463         }
1464
1465         public int getRankLeader() throws RemoteException {
1466                 int neighId = -1;
1467                 for (int i = 0; i < neighbors.size(); i++) {
1468                         TaskId recev = null;
1469                         recev = Register.Instance().getListeOfTasks().getTaskIdOfRank(
1470                                         ((Integer) (neighbors.get(i))).intValue());
1471                         JaceInterface stub = recev.getHostStub();
1472                         if (stub.getNbNeighboursNotConv() == 0)
1473                                 neighId = ((Integer) (neighbors.get(i))).intValue();
1474                 }
1475                 return neighId;
1476         }
1477
1478         class SaveTaskThread extends Thread {
1479                 Task sauv;
1480
1481                 public SaveTaskThread(Task s) {
1482                         this.sauv = s;
1483                 }
1484
1485                 @SuppressWarnings("static-access")
1486                 public void run() {
1487                         // If array of BackupNodes not created, create it
1488                         if (saveTab == null) {
1489                                 createSaveTab();
1490                         }
1491
1492                         if (finalize == false) { // if finalization step, it will crash
1493                                 // because register purged yet
1494
1495                                 ByteArrayOutputStream stream = convertTask2stream(sauv);
1496
1497                                 // find the BackupNode to send
1498                                 int taskRankDest;
1499                                 TaskId task = null;
1500                                 JaceInterface stub = null;
1501                                 boolean sent = false;
1502                                 int j = 0;
1503
1504                                 while (j < saveTab.length && sent == false) {
1505                                         // 1 - find the remote task Id to send the backup to
1506                                         if (jaceSize < (2 * Register.Instance()
1507                                                         .getNumBackupNeighbors() + 1)) {
1508                                                 taskRankDest = saveTab[saveRound % (jaceSize - 1)];
1509                                         } else {
1510                                                 taskRankDest = saveTab[saveRound
1511                                                                 % (2 * Register.Instance()
1512                                                                                 .getNumBackupNeighbors())];
1513                                         }
1514
1515                                         // 2 - knowing the destination task Id, get the stub of the
1516                                         // corresponding node
1517                                         try {
1518                                                 task = Register.Instance().getListeOfTasks()
1519                                                                 .getTaskIdOfRank(taskRankDest); // ///////////////////////////pb
1520                                                 stub = task.getHostStub();
1521                                         } catch (Exception e) {
1522                                                 System.err
1523                                                                 .println("Problem in SaveTaskThread on assignation line in save : "
1524                                                                                 + e);
1525                                         }
1526                                         // System.out.println("ite " + jaceP2P_Iteration +
1527                                         // ".......... SENDING on task " + taskRankDest);
1528
1529                                         // if no stub there is a problem
1530                                         if (stub == null) {
1531                                                 j++;
1532                                                 saveRound++;
1533                                                 // System.out.println("unable to SEND backup on task of rank "
1534                                                 // + taskRankDest);
1535                                         }
1536                                         // 3 - try to send the stream
1537                                         else {
1538                                                 // if there is a stub, send the stream to that node
1539                                                 try {
1540                                                         stub.saveTask(jaceMyId, stream.toByteArray(),
1541                                                                         sauv.lastSave.getLastSave(), sauv.timeStep,
1542                                                                         Register.Instance().getAppliName(), 0);
1543                                                         // System.out.println("saved data on "+taskRankDest+" iteration= "+sauv.lastSave.getLastSave()+
1544                                                         // "timeStep="+sauv.timeStep+" !!!!!!!!");
1545
1546                                                         // Vector v = stub.getIterationOfBackup(jaceMyId,0);
1547                                                         // int ite=((Integer)v.get(0)).intValue();
1548                                                         // System.out.println("******************************************sauvegarde de donnees: ite="+ite+" taskDest="+taskRankDest+" ******************************************************");
1549                                                         sent = true;
1550
1551                                                 } catch (Exception e) {
1552                                                         System.err
1553                                                                         .println("JaceP2P_Error in Task.jaceP2P_Save() when saving stream: "
1554                                                                                         + e);
1555                                                         j++;
1556                                                         saveRound++;
1557
1558                                                 }
1559                                         }
1560
1561                                         try {
1562                                                 JaceSession.Instance().getTaskThread().sleep(10);
1563                                                 JaceSession.Instance().getTaskThread().yield();
1564                                         } catch (Exception e) {
1565                                         }
1566                                 }
1567
1568                                 // 5 - if stream not sent at all, do something (WHAT ???)
1569                                 if (j > saveTab.length) {
1570                                         System.err
1571                                                         .println("No more alive neighbors for storing the Backup");
1572                                         // TODO : what to do if no BackupNode has answered ???
1573                                 }
1574
1575                         }
1576                 }
1577         }
1578
1579 }
1580
1581 class BroadcastTaskThread extends Thread {
1582         JaceInterface stub;
1583         int rank;
1584         byte[] tsk;
1585         int iteration;
1586         int tag;
1587         String appliName;
1588         int timeStep;
1589         int dest;
1590
1591         // constructor
1592         public BroadcastTaskThread(JaceInterface theStub, int theRank,
1593                         byte[] theTsk, int theIteration, int timeStep, String theAppliName,
1594                         int tag, int dest) {
1595                 this.stub = theStub;
1596                 this.rank = theRank;
1597                 this.tsk = theTsk;
1598                 this.iteration = theIteration;
1599                 this.timeStep = timeStep;
1600                 this.appliName = theAppliName;
1601                 this.tag = tag;
1602                 this.dest = dest;
1603                 // System.out.println("tag="+tag);
1604         }
1605
1606         // method launched by start()
1607         public void run() {
1608                 try {
1609                         stub.saveTask(rank, tsk, iteration, timeStep, appliName, tag);
1610                         JaceSession.Instance().getTaskObject().setSaved(true);
1611
1612                         if (tag == 0) {
1613                                 // Vector v = stub.getIterationOfBackup(rank,0);
1614                                 // int ite=((Integer)v.get(0)).intValue();
1615                                 // System.out.println("++++++++++ Broadcast data ite="+iteration+" dest="+dest+" timeStep="+timeStep);
1616
1617                         } else {
1618                                 // Vector v = stub.getIterationOfBackup(rank,1);
1619                                 // int ite=((Integer)v.get(0)).intValue();
1620                                 // System.out.println("++++++++++ Broadcast dataConvg ite="+iteration+" dest="+dest+" timeStep="+timeStep);
1621                                 // System.out.println("+++++++++++++++++++++++++++++++++++++++++ Broadcast convgData ite="+ite+" dest="+dest+" +++++++++++++++++++++++++++++++++++++++++++++++++++++");
1622
1623                         }
1624                 } catch (Exception e) {
1625                         System.err
1626                                         .println("Node not reachable by JaceServer.saveTask() in BroadcastTaskThread :"
1627                                                         + e);
1628                 }
1629         }
1630 }
1631