Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Correction of some bugs and performance enhancement.
[jaceP2P.git] / src / jaceP2P / Task.java
1 package jaceP2P;
2
3 import java.io.ByteArrayOutputStream;
4 import java.io.ObjectInputStream;
5 import java.io.ObjectOutputStream;
6 import java.rmi.ConnectException;
7 import java.rmi.RemoteException;
8 import java.util.ArrayList;
9
10 //import java.util.Calendar;
11 //import java.util.GregorianCalendar;
12
13 //import com.jamonapi.*;
14
15 public class Task implements Runnable, Cloneable, java.io.Serializable {
16         private static final long serialVersionUID = 1L;
17
18         public double errorLoc = 0;
19         public int saveParameter;
20         public int jaceMyId;
21         public int jaceSize;
22         public TaskId jaceTaskId = null;
23         public String[] jaceArgs;
24         public boolean reloading = false;
25         public String state = "NORMAL";
26         public int nb_not_recv;
27         public boolean electedNode = false;
28         public boolean respSent = false;
29         public ArrayList<Integer> resp;
30         public int verifNum = 0;
31         public LastSave lastSave = new LastSave();
32         public ArrayList<Integer> neighbors;
33         public ArrayList<Boolean> neighborsValues;
34         public ArrayList<Integer> dependancies;
35         public ArrayList<Boolean> values;
36         public int sendId;
37         public String action = "nothing";
38         public boolean verdict = false;
39         public boolean recievedVerdict = false;
40         public int jaceP2P_Iteration = 0;
41         public boolean finalStep = false;
42         // public Monitor mon1=null;
43         public int timeStep = 0; // time discretization counter for non-stationary
44         // problems
45         public boolean localCV_state;
46         public boolean jaceP2P_globalCV_state = false;
47         // attribute to know if an appli has finished yet
48         private boolean finalize = false;
49         public boolean pseudoPerBeg;
50         public boolean pseudoPerEnd;
51         public boolean underTh = false;
52         public boolean saved[];
53         // attributes for BackupNodes
54         private int saveRound = 0;
55         private int[] saveTab = null;
56         public boolean savedResults = false;
57         public Task sauv;
58         public BackupConvg sauvConvg = new BackupConvg();
59         public boolean postReloading = false;
60         public ArrayList<Object> reduceAll;
61
62         int cpt = 0;
63         int count = 0;
64         Thread th;
65
66         public Task() {
67                 reduceAll = new ArrayList<Object>();
68                 dependancies = new ArrayList<Integer>();
69                 values = new ArrayList<Boolean>();
70                 resp = new ArrayList<Integer>();
71                 saved = new boolean[2];
72                 sauv = this;
73                 neighbors = new ArrayList<Integer>();
74                 neighborsValues = new ArrayList<Boolean>();
75         }
76
77         public void getBackupForNewNode(int rank) {
78                 TaskId task = null;
79                 JaceInterface stub = null;
80                 task = Register.Instance().getListeOfTasks().getTaskIdOfRank(rank);
81
82                 stub = task.getHostStub();
83
84                 // if no stub there is a problem
85                 if (stub == null) {
86                         System.err.println("Unable to send backup on task of rank " + rank);
87                 } else {
88                         // if there is a stub, send the stream to that node
89                         try {
90                                 ByteArrayOutputStream stream;
91                                 synchronized (sauv) {
92                                         stream = convertTask2stream(sauv);
93                                         stub.saveTask(jaceMyId, stream.toByteArray(), sauv.lastSave
94                                                         .getLastSave(), sauv.timeStep, Register.Instance()
95                                                         .getAppliName(), 0);
96                                 }
97                                 synchronized (sauvConvg) {
98                                         if (sauvConvg.initialized == true) {
99                                                 stream = convertBackupConv2stream(sauvConvg);
100                                                 stub.saveTask(jaceMyId, stream.toByteArray(),
101                                                                 sauvConvg.lastSave.getLastSave(),
102                                                                 sauvConvg.timeStep, Register.Instance()
103                                                                                 .getAppliName(), 1);
104                                         }
105                                 }
106                         } catch (Exception e) {
107                                 System.err.println("Error in getBackupForNewNode :" + e);
108                                 e.printStackTrace(System.out);
109                         }
110                 }
111         }
112
113         // method to overload by user in the appli to convert the stream sent by the
114         // BackupNode to a Task object of the type of the appli this method is
115         // called
116         // in TaskLauncher.loadBackupAndRestart() and
117         // TaskLauncher.loadOrReloadTask()
118         public Task jaceP2P_ConvertStream(ObjectInputStream stream) {
119                 return null;
120         }
121
122         public void printSav() {
123         }
124
125         // method to overload by user in the appli to specify the reduceAll method
126         public synchronized void reduceAll(ArrayList<Object> recievedValue) {
127         }
128
129         public void setId(TaskId Id) {
130                 jaceTaskId = Id;
131                 jaceMyId = jaceTaskId.getRank();
132                 try {
133                         jaceSize = Register.Instance().getNbOfTasks();
134                 } catch (Exception e) {
135                         try {
136                                 System.err.println("SetId is bad !! " + e + " "
137                                                 + LocalHost.Instance().getName());
138                                 // jaceSize = Register.Instance().getListeOfTasks().getSize();
139                         } catch (Exception e2) {
140                                 System.err.println("Not localised the spawner : " + e2);
141                         }
142                 }
143         }
144
145         public void setParam(String[] arg) {
146                 this.jaceArgs = arg;
147
148         }
149
150         public void setJaceSize(int nbTasks) {
151                 jaceSize = nbTasks;
152         }
153
154         public TaskId getId() {
155                 return jaceTaskId;
156                 // return myIdd;
157         }
158
159         public int getTimeStep() {
160                 return timeStep;
161         }
162
163         // method to overload by user in the appli to identify the neighbors of a
164         // node of rank i
165
166         public int[] getDependencies(int i) {
167                 return null ;
168         }
169
170         // method to overload by user in the appli to init each task at beginning of
171         // computation
172
173         public void jaceP2P_InitTask() {
174         }
175
176         // TaskLauncher.loadOrReloadTask()
177         public void jaceP2P_ReinitTask() {
178         }
179
180         // method to overload by user in the appli to safeguard the results
181         public void saveResults() {
182         }
183
184         // method to overload by user in the appli to save only the requiered
185         // attributes (iter, vecteurs,......) this method is called in
186         // Task.jaceP2P_Save()
187         public Task jaceP2P_SaveFromCrash() {
188                 System.out.println("JaceSaveFromCrash Task");
189                 return null;
190         }
191
192         public void jaceFinalize() {
193
194                 finalize = true;
195                 System.out.println("Ready to Death Task:" + jaceMyId);
196
197                 try {
198                         Register.Instance().getSpawnerStub().killApplication(
199                                         LocalHost.Instance().getStub());
200
201                 } catch (Exception e) {
202                         System.err
203                                         .println("Cannot join the Spawner to kill application: "
204                                                         + e);
205                 }
206
207                 JaceDaemon.Instance().reinitDaemon();
208
209         }
210
211         public void jaceP2P_ReinitConv() {
212                 // System.out.println("reinit conv");
213
214                 timeStep++;
215                 state = "NORMAL";
216                 // last_iter.removeAllElements();
217                 try {
218                         reinitializeVectors();
219
220                 } catch (Exception e) {
221                         System.err.println("Error in jaceP2P_reinitConv():" + e);
222                 }
223                 pseudoPerBeg = false;
224                 pseudoPerEnd = false;
225                 synchronized (lastSave) {
226                         lastSave = new LastSave();
227                 }
228                 // reinit les var de conv de Task generique
229                 localCV_state = false;
230                 jaceP2P_globalCV_state = false;
231
232         }
233
234         public void reinitializeVectors() throws RemoteException {
235                 values.clear() ;
236                 dependancies.clear() ;
237                 neighbors.clear() ;
238                 neighborsValues.clear() ;
239                 resp.clear() ;
240         }
241
242         public long jaceP2P_getChronoValue() {
243                 long result = 0;
244
245                 try {
246                         result = Register.Instance().getSpawnerStub().getChronoValue(
247                                         Register.Instance().getAppliName());
248                 } catch (Exception e) {
249                         System.err
250                                         .println("JaceP2P_Error in Task.jaceP2P_getChronoValue() on SuperNode : "
251                                                         + e);
252                 }
253
254                 return result;
255         }
256
257         public void run() {
258                 //System.out.println("ds run()");
259         }
260
261         /**
262          *asynchronous sending (non blocking), of data object to an other task
263          * 
264          * @param buffer
265          *            the object (serializable) data to be send
266          *@param dest
267          *            the task id for receiver's task
268          *@param tag
269          *            the tag for message
270          */
271         @SuppressWarnings("static-access")
272         public void jaceSend(Object buffer, int dest, int tag, double erreur_locale) {
273                 // System.out.println("dest : " + dest);
274
275                 TaskId recev = null;
276                 try {
277                         recev = Register.Instance().getListeOfTasks().getTaskIdOfRank(dest);
278                         
279                         if (recev == null) {
280                                 System.err.println("In jaceSend recv = null !");
281                                 try {
282                                         JaceSession.Instance().getTaskThread().sleep(10);
283                                         JaceSession.Instance().getTaskThread().yield();
284                                 } catch (Exception e) {
285                                 }
286                         }
287
288                         // TODO : virer ce else, mais chercher pkoi recev est null des fois
289                         else {
290                                 if (recev.getRank() != dest) {
291                                         System.err.println("Problem !! pas le meme dest que ds les param");
292                                 }
293
294                                 // creer le Message
295                                 // TODO : rajouter nom de l'appli ds Messag
296                                 try {
297                                         JaceInterface stub;
298                                         stub = recev.getHostStub();
299                                         if (stub.getTimeStep() == timeStep) {
300                                                 // System.out.println("************ "+verifNum+" *************");
301                                                 Message msg = new Message();
302                                                 msg.setParam(buffer, jaceTaskId, recev, tag, timeStep,
303                                                                 jaceP2P_Iteration, verifNum, erreur_locale);
304
305                                                 // on met le message ds
306                                                 // JaceBuffer.Instance() (la liste des Message a
307                                                 // envoyer)
308
309                                                 /*
310                                                  * if(JaceDaemon.Instance().getProtocol().equals("socket"
311                                                  * )) SenderSocket.Instance().buffer.add(msg); else
312                                                  * if(JaceDaemon.Instance().getProtocol().equals("rmi"))
313                                                  * SenderRmi.Instance().buffer.add(msg);
314                                                  */
315                                                 // System.out.println("putting message to "+dest+" in the buffer");
316                                                 Sender.Instance().getBuffer().add(msg);
317
318                                                 // if(JaceDaemon.Instance().getProtocol().equals("rmi"))
319                                                 // SenderRmi.Instance().getBuffer().add(msg);
320                                                 // else
321                                                 // SenderSocket.Instance().getBuffer().add(msg);
322                                         }
323                                         
324                                 } catch( ConnectException ce ) {
325                                         recev.getHostStub().suicide2( "Not responding!" ) ;
326                                 } catch (Exception e) {
327                                         System.err.println("Unable to send data message to " + dest
328                                                         + ": " + e);
329 //                                      recev.getHostStub().suicide2( "Not responding!" ) ;
330                                 }
331                                 // System.out.println("TASK : g mis un msg qui doit etre envoye");
332                                 // envoie toujours asynchrone !!!!!!!! : le Message partira
333                                 // instantanement car
334                                 // -Sender faisait JaceBuffer.Instance().get() qui contient un
335                                 // wait()
336                                 // -et ici, Task fait JaceBuffer.Instance().add(msg) qui
337                                 // contient un notifyAll()
338                                 try {
339                                         JaceSession.Instance().getTaskThread().sleep(10);
340                                         JaceSession.Instance().getTaskThread().yield();
341                                 } catch (Exception e) {
342                                 }
343                                 // System.out.println("TASK : je sort de jaceSend");
344                         }
345                 } catch (Exception e) {
346                         if (Register.Instance().getListeOfTasks() == null)
347                                 System.err.println("Tasks list is null: " + e);
348                 }
349
350         }
351
352         /**
353          *not-blocking reception of data object, return an object
354          * 
355          * @param dest
356          *            the task id for the task sender
357          *@param tag
358          *            the tag for message
359          */
360         @SuppressWarnings("static-access")
361         public Object jaceReceive(int sender, int tag) {
362                 if (jaceP2P_Iteration == 0 || postReloading) {
363                         try {
364                                 if (notExist(sender)) {
365                                         setDep(sender);
366                                         // last_iter.add(0);
367                                 }
368                         } catch (Exception e) {
369                         }
370                 }
371
372                 Message tmp = MsgQueue.Instance().get(sender, tag);
373                 if (tmp != null) {
374                         // System.out.println("recu MSG de tache " + sender + " (" +
375                         // Register.Instance().getListeOfTasks().getTaskIdOfRank(sender).getHostName()
376                         // + ")  MsgQueue : " +
377                         // MsgQueue.Instance().getSize()+" message src_tag="+tmp.getSrc_tag()+
378                         // " localError="+tmp.getLocalError());
379                         try {
380
381                                 int index;
382                                 if (underTh == true && jaceP2P_Iteration != 0 && !reloading) {
383                                         index = depIndex(sender);
384
385                                         if ((!(state.equals("VERIF")) || verifNum == tmp
386                                                         .getSrc_tag())) {
387
388                                                 // System.out.println("dep["+sender+"]=true, index="+index);
389                                                 setValues(index, true);
390                                         }
391                                 }
392
393                         } catch (Exception e) {
394                                 System.err.println("Error jaceReceive :" + e);
395                                 System.err.println("Sender=" + sender);
396                         }
397                         return (tmp.getData());
398                 } else {
399                         try {
400                                 // System.out.println("RIENNN recu de tache " + sender + " (" +
401                                 // Register.Instance().getListeOfTasks().getTaskIdOfRank(sender).getHostName()
402                                 // + ")   taille MsgQueue : " + MsgQueue.Instance().getSize());
403                         } catch (Exception e) {
404                                 if (Register.Instance().getListeOfTasks() == null)
405                                         System.err.println("Tasks list is null2: " + e);
406                         }
407                         try {
408                                 JaceSession.Instance().getTaskThread().sleep(10);
409                                 JaceSession.Instance().getTaskThread().yield();
410                         } catch (Exception e) {
411                         }
412                         return null;
413                 }
414         }
415
416         public boolean notExist(int sender) {
417                 int i = dependancies.indexOf((Object) (new Integer(sender)));
418                 if (i == -1)
419                         return true;
420                 else
421                         return false;
422         }
423
424         public void setDep(int value) {
425                 dependancies.add(new Integer(value));
426                 values.add(new Boolean(false));
427         }
428
429         public int depIndex(int sender) {
430                 int index = dependancies.indexOf((Object) (new Integer(sender)));
431                 return index;
432         }
433
434         public void setValues(int index, boolean value) {
435                 values.set( index, new Boolean(value) ) ;
436         }
437
438         @SuppressWarnings("static-access")
439         public void jaceP2P_Save() {
440                 try {
441
442                         if (jaceP2P_Iteration == 0) {
443                                 // request 0 when task at barrier
444                                 // request 1 when only at convergence
445                                 broadcastTasks(0);
446                         } else if ((jaceP2P_Iteration % saveParameter) == 0) {
447                                 // clone the Task at that step of computations
448                                 saveRound++;
449                                 synchronized (sauv) {
450                                         sauv = getTask2save();
451
452                                         // send it to the corresponding BackupNode in a round robin
453                                         // fashion
454                                         new SaveTaskThread(sauv).start();
455                                 }
456                         }
457
458                         JaceSession.Instance().getTaskThread().sleep(10);
459                         JaceSession.Instance().getTaskThread().yield();
460                 } catch (Exception e) {
461                 }
462         }
463
464         // request 0 when Saving all task
465         // request 3 when saving only convergence data
466         @SuppressWarnings("static-access")
467         public void broadcastTasks(int request) {
468                 ByteArrayOutputStream stream;
469                 if (request == 0) {
470                         // 1 - clone the Task at that step of computations and serialize it
471                         try {
472                                 synchronized (sauv) {
473                                         sauv = getTask2save();
474                                         stream = convertTask2stream(sauv);
475
476                                         // 2 - create de saveTab if necessary
477                                         if (saveTab == null) {
478                                                 createSaveTab();
479                                         }
480                                         TaskId task = null;
481                                         JaceInterface stub = null;
482
483                                         // 3 - send the stream to all the BackupNodes
484                                         for (int i = 0; i < saveTab.length; i++) {
485                                                 // System.out.println("saveTab[" + i + "] = " +
486                                                 // saveTab[i]);
487
488                                                 // 3.1 - get the stub of destinatory
489                                                 // System.out.println("Saving on neighbor "+i);
490                                                 try {
491                                                         task = Register.Instance().getListeOfTasks()
492                                                                         .getTaskIdOfRank(saveTab[i]); // ///////////////////////////pb
493                                                         stub = task.getHostStub();
494                                                 } catch (Exception e) {
495                                                         System.err
496                                                                         .println("Problem in the broadcast, ligne d'assignation de task ds broadcats: "
497                                                                                         + e);
498                                                 }
499
500                                                 // 3.2 - send the stream to that stub
501
502                                                 // System.out.println("saving on second list");
503                                                 if (stub != null) {
504                                                         new BroadcastTaskThread(stub, jaceMyId, stream
505                                                                         .toByteArray(),
506                                                                         sauv.lastSave.getLastSave(), sauv.timeStep,
507                                                                         Register.Instance().getAppliName(), 0,
508                                                                         saveTab[i]).start();
509
510                                                 }
511
512                                         }
513                                 }
514                         } catch (Exception e) {
515                                 e.printStackTrace(System.out);
516                         }
517                 } else {
518                         synchronized (sauvConvg) {
519                                 sauvConvg = getBackupConvg2Save();
520                                 stream = convertBackupConv2stream(sauvConvg);
521
522                                 // 2 - create de saveTab if necessary
523                                 if (saveTab == null) {
524                                         createSaveTab();
525                                 }
526                                 TaskId task = null;
527                                 JaceInterface stub = null;
528
529                                 // 3 - send the stream to all the BackupNodes
530                                 for (int i = 0; i < saveTab.length; i++) {
531                                         // System.out.println("saveTab[" + i + "] = " + saveTab[i]);
532
533                                         // 3.1 - get the stub of destinatory
534                                         // System.out.println("Saving on neighbor "+i);
535                                         try {
536                                                 task = Register.Instance().getListeOfTasks()
537                                                                 .getTaskIdOfRank(saveTab[i]); // ///////////////////////////pb
538                                                 stub = task.getHostStub();
539                                         } catch (Exception e) {
540                                                 System.err
541                                                                 .println("Problem in the broadcast, ligne d'assignation de task ds broadcats: "
542                                                                                 + e);
543                                         }
544
545                                         // 3.2 - send the stream to that stub
546                                         /*
547                                          * if(request==3){ if (stub != null) { new
548                                          * BroadcastTaskThread(stub, jaceMyId, stream.toByteArray(),
549                                          * sauvConvg.lastSave.getLastSave(),timeStep,
550                                          * Register.Instance().getAppliName(),1,saveTab[i]).start();
551                                          * } //System.out.println("saving on second list"); } else{
552                                          */
553                                         if (stub != null) {
554                                                 new BroadcastTaskThread(stub, jaceMyId, stream
555                                                                 .toByteArray(), sauvConvg.lastSave
556                                                                 .getLastSave(), sauvConvg.timeStep, Register
557                                                                 .Instance().getAppliName(), 1, saveTab[i])
558                                                                 .start();
559                                         }
560
561                                         // System.out.println("saving on first list");
562                                 }
563                         }
564                 }
565                 try {
566                         // JaceSession.Instance().getTaskThread().sleep(10);
567                         JaceSession.Instance().getTaskThread().yield();
568                 }
569
570                 catch (Exception e) {
571                 }
572         }
573
574         private ByteArrayOutputStream convertBackupConv2stream(BackupConvg t) {
575                 // System.out.println("beginning of the checkpointing process......");
576                 ByteArrayOutputStream stream = new ByteArrayOutputStream();
577                 try {
578                         ObjectOutputStream fluxOut = new ObjectOutputStream(stream);
579                         fluxOut.writeObject(t);
580                         fluxOut.close();
581                 } catch (Exception e) {
582                         System.err
583                                         .println("JaceP2P_Error in Task.jaceP2P_ReinitConv() when converting Task in Stream : "
584                                                         + e);
585                 }
586                 // System.out.println("taille du tablo de byte : " +
587                 // stream.toByteArray().length + " bytes");
588                 return stream;
589         }
590
591         // convert the task in stream to send it to the BackupNode
592         private ByteArrayOutputStream convertTask2stream(Task t) {
593                 // System.out.println("beginning of the checkpointing process......");
594                 ByteArrayOutputStream stream = new ByteArrayOutputStream();
595                 try {
596                         ObjectOutputStream fluxOut = new ObjectOutputStream(stream);
597                         fluxOut.writeObject(t);
598                         fluxOut.close();
599                 } catch (Exception e) {
600                         System.err
601                                         .println("JaceP2P_Error in Task.jaceP2P_ReinitConv() when converting Task in Stream : "
602                                                         + e);
603                 }
604                 // System.out.println("taille du tablo de byte : " +
605                 // stream.toByteArray().length + " bytes");
606                 return stream;
607         }
608
609         // get the task of the appli with the data to save (in
610         // jaceP2P_SaveFromCrash() overloaded in the appli)
611         @SuppressWarnings("unchecked")
612         public Task getTask2save() {
613
614                 sauv = jaceP2P_SaveFromCrash();
615
616                 // System.out.println("Saving data at it="+sauv.it);
617                 // assign the default attributes of a task (params of the appli and
618                 // TaskId of the local node)
619                 sauv.setId(this.jaceTaskId);
620                 sauv.setParam(Register.Instance().getParams());
621                 sauv.timeStep = timeStep;
622
623                 // assign the iteration number that had the appli at the moment of the
624                 // checkpointing
625                 sauv.jaceP2P_Iteration = jaceP2P_Iteration;
626                 sauv.saveRound = saveRound;
627
628                 sauv.underTh = underTh;
629                 sauv.jaceP2P_globalCV_state = jaceP2P_globalCV_state;
630                 // System.out.println("I checkpoint the task at ite " +
631                 // sauv.jaceP2P_Iteration);
632                 // attributes needed to detect global convergence
633                 try {
634
635                         sauv.state = state;
636                         sauv.nb_not_recv = nb_not_recv;
637                         sauv.electedNode = electedNode;
638                         sauv.respSent = respSent;
639                         sauv.neighbors = (ArrayList<Integer>) neighbors.clone();
640                         sauv.neighborsValues = (ArrayList<Boolean>) neighborsValues.clone();
641                         sauv.resp = (ArrayList<Integer>) resp.clone();
642                         sauv.verifNum = verifNum;
643                         sauv.sendId = sendId;
644                         sauv.reduceAll = reduceAll;
645                         sauv.finalStep = finalStep;
646                         sauv.action = action;
647                         sauv.verdict = verdict;
648                         sauv.localCV_state = localCV_state;
649                         sauv.recievedVerdict = recievedVerdict;
650                         synchronized (lastSave) {
651                                 lastSave.increment();
652                                 sauv.lastSave = lastSave;
653                         }
654                 } catch (Exception e) {
655                         System.err.println("Problem with RMI !");
656                 }
657
658                 return sauv;
659         }
660
661         @SuppressWarnings("unchecked")
662         public BackupConvg getBackupConvg2Save() {
663
664                 try {
665
666                         // sauvConvg=new BackupConvg();
667                         sauvConvg.state = state;
668                         sauvConvg.underTh = underTh;
669                         sauvConvg.nb_not_recv = nb_not_recv;
670                         sauvConvg.electedNode = electedNode;
671                         sauvConvg.respSent = respSent;
672                         sauvConvg.neighbors = (ArrayList<Integer>) neighbors.clone();
673                         sauvConvg.neighborsValues = (ArrayList<Boolean>) neighborsValues.clone();
674                         sauvConvg.resp = (ArrayList<Integer>) resp.clone();
675                         sauvConvg.verifNum = verifNum;
676                         sauvConvg.sendId = sendId;
677                         sauvConvg.finalStep = finalStep;
678                         sauvConvg.action = action;
679                         sauvConvg.verdict = verdict;
680                         sauvConvg.localCV_state = localCV_state;
681                         sauvConvg.timeStep = timeStep;
682                         sauvConvg.recievedVerdict = recievedVerdict;
683                         sauvConvg.jaceP2P_Iteration = jaceP2P_Iteration;
684                         sauvConvg.reduceAll = reduceAll;
685
686                         synchronized (lastSave) {
687                                 lastSave.increment();
688                                 sauvConvg.lastSave = lastSave;
689                         }
690                         sauvConvg.initialized = true;
691                 } catch (Exception e) {
692                         System.err.println("Problem with RMI:" + e);
693                 }
694
695                 return sauvConvg;
696         }
697
698         private void createSaveTab() {
699
700                 saveTab = new int[BackupsManager.Instance().size()];
701
702                 // 2 - assign the taskId to each cell of the saveTab
703                 // System.out.println("in TASK");
704                 for (int i = 0; i < saveTab.length; i++) {
705                         saveTab[i] = BackupsManager.Instance().getBackupTaskAtIndex(i, 0)
706                                         .getTaskRank();
707                         // System.out.print(saveTab[i] + ", ");
708                 }
709                 // System.out.println("saveTab created..... size : " + saveTab.length);
710         }
711
712         public void setSaved(boolean bool) {
713                 synchronized (saved) {
714                         if (bool == true) {
715                                 if (saved[0] == false)
716                                         saved[0] = true;
717                                 else if (saved[1] == false)
718                                         saved[1] = true;
719                         } else {
720                                 saved[0] = false;
721                                 saved[1] = false;
722                         }
723                 }
724
725         }
726
727         public void waitForAck(int tag) {
728                 if (Register.Instance().getNumBackupNeighbors() != 0) {
729                         setSaved(false);
730                         broadcastTasks(tag);
731                         // Calendar cal = new GregorianCalendar();
732                         // System.out.println("at time="+cal.get(Calendar.MINUTE)+":"+cal.get(Calendar.SECOND));
733                         // System.out.println("sleeping till acknowledge saved");
734                         while (getSaved() == false)
735                                 try {
736                                         Thread.sleep(50);
737                                         count++;
738                                         if (count > 30) {
739                                                 try {
740                                                         Register newReg = Register.Instance()
741                                                                         .getSpawnerStub().getRegister(jaceMyId);
742                                                         if (newReg != null) {
743                                                                 Register.Instance().replaceBy(newReg);
744                                                         } else
745                                                                 System.out
746                                                                                 .println("I got a null register from the spawner");
747                                                         broadcastTasks(tag);
748                                                         System.out
749                                                                         .println("Sleeping till acknowledge saved");
750                                                         count = 0;
751                                                 } catch (Exception e2) {
752                                                         System.err
753                                                                         .println("Unable to get register from spawner :"
754                                                                                         + e2);
755                                                         e2.printStackTrace(System.out);
756                                                 }
757                                         }
758                                 } catch (Exception e) {
759                                 }
760                         // Calendar cal1 = new GregorianCalendar();
761                         // System.out.println("end save at time="+cal1.get(Calendar.MINUTE)+":"+cal1.get(Calendar.SECOND));
762                 }
763         }
764
765         public void jaceP2P_GlobalConvergence(boolean under_th) {
766
767                 try {
768
769                         Thread.yield();
770                         if (under_th != underTh && postReloading == false) {
771                                 // 1 - update sous seuil
772                                 underTh = under_th;
773                         }
774                         if (jaceP2P_Iteration == 0
775                                         && (postReloading == false || resp.size() == 0)) {
776                                 detectNeighbors(jaceMyId, jaceSize);
777                                 initialize_state();
778                                 sendId = -1;
779                                 verifNum = 0;
780                                 reloading = false;
781                                 broadcastTasks(3);
782                         }
783
784                         // affiche les valeurs des variables
785                         try {
786                                 // Calendar cal = new GregorianCalendar();
787                                 // System.out.println("at time="+cal.get(Calendar.MINUTE)+":"+cal.get(Calendar.SECOND));
788                                 // System.out.println("MyId="+jaceMyId+" sous seuil="+underTh+" etat="+state+" verifNum="+verifNum+" localCV="+localCV_state+" leader="+electedNode+" respSent="+respSent);
789                                 // System.out.println("dep="+getValues()+" pBeg="+pseudoPerBeg+" PEnd="+pseudoPerEnd+" sendId="+sendId);
790                                 // System.out.println("neigh="+nb_not_recv+" action="+action+" negative_resp="+testNegativeResp()+" reloading="+reloading+"\n"+" finalStep="+finalStep+" SavedResults="+savedResults+" NeghNotCV="+getNeighbourNotCV()+" postReload="+postReloading);
791                                 printResp();
792                                 printDep();
793                         } catch (Exception e) {
794                                 System.err.println("Error printing status in Task :" + e);
795                         }
796                         Thread.yield();
797                         if (action.equals("sendVerif") && state.equals("VERIF")
798                                         && postReloading == true) {
799                                 // System.out.println("je passe ds send verif");
800
801                                 new SendVerifThread(jaceMyId, sendId, verifNum).start();
802
803                                 // System.out.println("send Verif");
804
805                         } else if (action.equals("sendVerdict") && postReloading == true) {
806                                 // System.out.println("je passe ds send verdict-------");
807
808                                 new SendVerdictThread(jaceMyId, sendId, verifNum, verdict)
809                                                 .start();
810
811                                 // System.out.println("verifNum="+verifNum);
812                         }
813
814                         if (state.equals("NORMAL") && action.equals("nothing")) {
815                                 if (underTh == false)
816                                         reinitializePPEr();
817                                 else
818                                 // System.out.println("sous seuil="+underTh+" PseudoPerBeg="+localStub.getPseudoPerBeg());
819                                 if (pseudoPerBeg == false) {
820                                         // System.out.println("ds la condition");
821                                         pseudoPerBeg = true;
822                                 } else if (pseudoPerEnd == true) {
823
824                                         localCV_state = true;
825                                         if (nb_not_recv == 0) {
826                                                 // localStub.setAction("sendVerif");
827                                                 try {
828                                                         broadcastVerif(jaceMyId, -1, verifNum + 1);
829                                                         // localStub.setAction("nothing");
830
831                                                         electedNode = true;
832                                                         recievedVerdict = true;
833                                                         initializeVerifLeader();
834                                                         state = "VERIF";
835                                                         broadcastTasks(3);
836                                                 } catch (Exception e) {
837                                                         System.err
838                                                                         .println("The verification message is not received: "
839                                                                                         + e);
840                                                         Register.Instance().viewAll();
841                                                 }
842                                         } else if (nb_not_recv == 1)
843                                                 if (sendId == -1) {
844
845                                                         int neighId = getNeighbourNotCV();
846                                                         TaskId recev = null;
847
848                                                         try {
849
850                                                                 recev = Register.Instance().getListeOfTasks()
851                                                                                 .getTaskIdOfRank(neighId);
852                                                                 JaceInterface stub = recev.getHostStub();
853                                                                 if (stub.setNbNeighboursNotConv(verifNum,
854                                                                                 jaceMyId, timeStep)) {
855                                                                         // stub.setLeftNeighbourCV(true);
856                                                                         // System.out.println("sent convergence message to "+neighId);
857                                                                         sendId = neighId;
858                                                                         state = "WAIT4V";
859                                                                         recievedVerdict = false;
860                                                                         broadcastTasks(3);
861                                                                 }
862                                                         } catch (Exception e) {
863                                                                 System.err
864                                                                                 .println("Unable to decrease the number of neighbors not converged on node :"
865                                                                                                 + recev.getHostName()
866                                                                                                 + " count ="
867                                                                                                 + count
868                                                                                                 + " . error:" + e);
869                                                                 count++;
870                                                                 Register.Instance().viewAll();
871                                                                 if (count > 3) {
872                                                                         try {
873                                                                                 int myRank;
874                                                                                 TaskId id = Register.Instance()
875                                                                                                 .getListeOfTasks()
876                                                                                                 .getTaskIdOfHostStub(
877                                                                                                                 LocalHost.Instance()
878                                                                                                                                 .getStub());
879                                                                                 myRank = id.getRank();
880                                                                                 Register.Instance().replaceBy(
881                                                                                                 Register.Instance()
882                                                                                                                 .getSpawnerStub()
883                                                                                                                 .getRegister(myRank));
884
885                                                                                 count = 0;
886                                                                         } catch (Exception e2) {
887                                                                                 System.err
888                                                                                                 .println("Unable to contact the spawner: "
889                                                                                                                 + e2);
890                                                                         }
891                                                                 }
892                                                         }
893                                                 }
894                                 } else if (pseudoPerBeg == true && getValues())
895                                         pseudoPerEnd = true;
896                         } else if (state.equals("WAIT4V") && nb_not_recv == 0) {
897                                 int neighId = getRankLeader();
898                                 if (neighId == -1) {
899                                         try {
900                                                 broadcastVerif(jaceMyId, -1, verifNum + 1);
901                                                 electedNode = true;
902                                                 recievedVerdict = true;
903                                                 initializeVerifLeader();
904                                                 state = "VERIF";
905
906                                                 broadcastTasks(3);
907                                         } catch (Exception e) {
908                                                 System.err
909                                                                 .println("The verification message is not received:"
910                                                                                 + e);
911                                         }
912                                 } else if (jaceMyId > neighId) {
913                                         try {
914                                                 broadcastVerif(jaceMyId, -1, verifNum + 1);
915                                                 electedNode = true;
916                                                 recievedVerdict = true;
917                                                 initializeVerifLeader();
918                                                 state = "VERIF";
919                                                 broadcastTasks(3);
920                                         } catch (Exception e) {
921                                                 System.err
922                                                                 .println("The verification message is not received: "
923                                                                                 + e);
924                                         }
925                                 }
926                         } else if (state.equals("WAIT4V")) {
927                                 if (underTh == false) {
928                                         localCV_state = false;
929                                         broadcastTasks(3);
930                                 }
931                         } else if (state.equals("VERIF")) {
932                                 // if(localStub.getNewerDep(0) && localStub.getNewerDep(1))
933                                 // localStub.setPseudoPerEnd(true);
934                                 // if(!underTh)
935                                 if (electedNode == true) {
936                                         if ((!underTh && !postReloading) || !localCV_state
937                                                         || testNegativeResp()) {
938                                                 try {
939
940                                                         broadcastVerdict(jaceMyId, -2, verifNum + 1, false);
941                                                         verifNum++;
942                                                         initialize_state();
943                                                         reinitializePPEr();
944                                                         broadcastTasks(3);
945                                                 } catch (Exception e) {
946                                                 }
947                                         } else if (postReloading) {
948                                                 if (recievedAllResp())
949                                                         if (!testNegativeResp()) {
950                                                                 try {
951                                                                         broadcastVerdict(jaceMyId, -2, verifNum,
952                                                                                         true);
953                                                                         if (finalStep == true
954                                                                                         && state.equals("VERIF")) {
955                                                                                 initializeSavLeader();
956                                                                                 state = "SAVING";
957                                                                                 waitForAck(0);
958                                                                         } else {
959                                                                                 state = "FINISHED";
960                                                                                 waitForAck(1);
961                                                                         }
962                                                                 } catch (Exception e) {
963                                                                         System.err.println("Error: " + e);
964                                                                 }
965                                                         }
966                                         } else if (pseudoPerEnd == true) {
967                                                 if (recievedAllResp())
968                                                         if (!testNegativeResp()) {
969                                                                 try {
970                                                                         broadcastVerdict(jaceMyId, -2, verifNum,
971                                                                                         true);
972                                                                         if (finalStep == true
973                                                                                         && state.equals("VERIF")) {
974                                                                                 initializeSavLeader();
975                                                                                 state = "SAVING";
976                                                                                 waitForAck(0);
977                                                                         } else {
978                                                                                 state = "FINISHED";
979                                                                                 waitForAck(3);
980                                                                         }
981
982                                                                 } catch (Exception e) {
983                                                                         System.out.println("erreur: " + e);
984                                                                 }
985                                                         } else {
986                                                                 try {
987
988                                                                         broadcastVerdict(jaceMyId, -2,
989                                                                                         verifNum + 1, false);
990                                                                         verifNum++;
991                                                                         initialize_state();
992                                                                         reinitializePPEr();
993                                                                         broadcastTasks(3);
994                                                                 } catch (Exception e) {
995                                                                         System.err
996                                                                                         .println("Unable to broadcast a negative verdict :"
997                                                                                                         + e);
998                                                                 }
999                                                         }
1000                                         }
1001
1002                                         else if (getValues())
1003                                                 pseudoPerEnd = true;
1004                                 } else if (!respSent) {
1005                                         if (!underTh || !localCV_state || testNegativeResp()) {
1006                                                 if (action.equals("nothing")) {
1007                                                         int neighId = sendId;
1008                                                         try {
1009                                                                 TaskId recev = null;
1010                                                                 recev = Register.Instance().getListeOfTasks()
1011                                                                                 .getTaskIdOfRank(neighId);
1012                                                                 JaceInterface stub = recev.getHostStub();
1013                                                                 // System.out.println("tryin to send negative response to "+neighId);
1014                                                                 stub.response(jaceMyId, verifNum, -1, null);
1015                                                                 // System.out.println("send negative response to "+neighId);
1016                                                                 respSent = true;
1017                                                                 broadcastTasks(3);
1018
1019                                                         } catch (Exception e) {
1020                                                                 System.err
1021                                                                                 .println("Response not received:" + e);
1022                                                                 Register.Instance().viewAll();
1023                                                         }
1024                                                 }
1025                                         } else if (pseudoPerEnd) {
1026                                                 // System.out.print("The daemon can send a response ");
1027                                                 int index = recievedAllRespMinusOne();
1028                                                 // System.out.println("to node of index ="+index);
1029                                                 if (index != -1) {
1030                                                         int rank = getNeighborRank(index);
1031                                                         // if(jaceMyId!=jaceSize-1 &&
1032                                                         // localStub.getSendRight()==true){
1033                                                         try {
1034                                                                 TaskId recev = null;
1035                                                                 recev = Register.Instance().getListeOfTasks()
1036                                                                                 .getTaskIdOfRank(rank);
1037                                                                 JaceInterface stub = recev.getHostStub();
1038
1039                                                                 if (getResp(index) == 1) {
1040                                                                         stub.response(jaceMyId, verifNum, 1,
1041                                                                                         reduceAll);
1042                                                                         // System.out.println("send positive response to"+rank);
1043                                                                 } else {
1044                                                                         stub.response(jaceMyId, verifNum, -1, null);
1045                                                                         // System.out.println("send negative response to"+rank);
1046                                                                 }
1047                                                                 respSent = true;
1048                                                                 broadcastTasks(3);
1049
1050                                                         } catch (Exception e) {
1051                                                                 System.err.println("Response not received by "
1052                                                                                 + rank + ": " + e);
1053                                                                 Register.Instance().viewAll();
1054                                                         }
1055                                                 }
1056                                         } else if (getValues())
1057                                                 pseudoPerEnd = true;
1058                                 }
1059
1060                         } else if (state.equals("SAVING") && !action.equals("sendVerdict")) {
1061
1062                                 if (savedResults == false) {
1063
1064                                         saveResults();
1065                                         savedResults = true;
1066                                         broadcastTasks(3);
1067                                 } else if (electedNode) {
1068                                         if (recievedAllResp())
1069                                                 try {
1070                                                         // System.out.println("recieved all responses");
1071                                                         JaceSpawnerInterface spawnerStub = Register
1072                                                                         .Instance().getSpawnerStub();
1073                                                         // System.out.println("##### callin spawnerStub.setFinished(true) #####");
1074                                                         spawnerStub.setOver(true);
1075                                                         // localStub.setState("FINISHED");
1076                                                 } catch (Exception e) {
1077                                                         System.err.println("Error" + e);
1078                                                 }
1079                                 } else if (!respSent) {
1080                                         int index = recievedAllRespMinusOne();
1081                                         if (index != -1) {
1082                                                 int rank = getNeighborRank(index);
1083
1084                                                 // if(jaceMyId!=jaceSize-1 &&
1085                                                 // localStub.getSendRight()==true){
1086                                                 try {
1087                                                         TaskId recev = null;
1088                                                         recev = Register.Instance().getListeOfTasks()
1089                                                                         .getTaskIdOfRank(rank);
1090                                                         JaceInterface stub = recev.getHostStub();
1091                                                         if (stub.getReloading() == false
1092                                                                         && stub.getState().equals("SAVING")) {
1093                                                                 action = "sendResponse";
1094                                                                 stub.response(jaceMyId, verifNum, 1, null);
1095                                                                 respSent = true;
1096                                                                 action = "nothing";
1097                                                                 broadcastTasks(3);
1098                                                                 // System.out.println("send response to"+rank);
1099                                                         }
1100                                                 } catch (Exception e) {
1101                                                         System.err.println("Response not received" + e);
1102                                                 }
1103                                         }
1104                                 }
1105                         } else if (state.equals("FINISHED")
1106                                         && !action.equals("sendVerdict") && recievedVerdict) {
1107
1108                                 jaceP2P_globalCV_state = true;
1109                                 // System.out.println("Finished");
1110                                 // System.out.println("Finished");
1111                                 // System.out.println("Finished");
1112                                 // System.out.println("Finished");
1113                                 System.out.println("Finished");
1114                         }
1115
1116                         if (postReloading == true)
1117                                 postReloading = false;
1118
1119                 } catch (Exception e) {
1120                         System.err.println("Exception in Global Convergence :" + e);
1121                         e.printStackTrace(System.out);
1122                         Register.Instance().viewAll();
1123                 }
1124                 // mon1.stop();
1125         }
1126
1127         public synchronized int getResp(int index) throws RemoteException {
1128                 int res = 1;
1129                 for (int i = 0; i < resp.size(); i++)
1130                         if (i != index)
1131                                 if (((Integer) resp.get(i)).intValue() == -1)
1132                                         res = -1;
1133                 return res;
1134         }
1135
1136         public synchronized void response(int neighId, int tag, int response,
1137                         ArrayList<Object> recievedValue) throws RemoteException {
1138                 // System.out.println("inside response function");
1139                 // System.out.println("sleeping till not reloading");
1140                 while (reloading == true) {
1141                         try {
1142                                 Thread.sleep(10);
1143                                 // System.out.println("sleeping till not reloading");
1144                         } catch (Exception e) {
1145                         }
1146                 }
1147                 if (verifNum == tag) {
1148                         // System.out.println("inside condition");
1149                         int indexNeigh = neighbors.indexOf((Object) neighId);
1150                         // System.out.println("after gettin index="+index);
1151                         // System.out.println("index="+indexNeigh+" size de resp ="+resp.size());
1152                         // printResp();
1153                         // try{
1154                         // int xyz=((Integer)(resp.elementAt(indexNeigh))).intValue();
1155                         // }catch(Exception e){
1156                         // System.out.println("fuckkkkkkkkkkkkkkkkkk error:"+e);
1157                         // }
1158
1159                         if (response == 1
1160                                         && !state.equals("SAVING")
1161                                         && (((Integer) (resp.get(indexNeigh))).intValue()) != 1) {
1162                                 // System.out.println("calling reduceAll()");
1163                                 reduceAll(recievedValue);
1164                         }
1165                         // System.out.println("after calculating reduceAll");
1166                         resp.set( indexNeigh, response ) ;
1167
1168                         // System.out.println("get response ............");
1169                         waitForAck(3);
1170
1171                 } else
1172                         throw new RemoteException();
1173         }
1174
1175         public void initializeSavLeader() {
1176                 for (int i = 0; i < resp.size(); i++)
1177                         resp.set( i, 0 ) ;
1178                 respSent = false ;
1179         }
1180
1181         public boolean recievedAllResp() {
1182                 boolean bool = true;
1183                 for (int i = 0; i < resp.size(); i++)
1184                         if (((Integer) (resp.get(i))).intValue() == 0) {
1185                                 bool = false;
1186                                 break;
1187                         }
1188                 return bool;
1189         }
1190
1191         public int recievedAllRespMinusOne() {
1192                 int nbOfZeros = 0;
1193                 int indexOfZero = -1;
1194                 for (int i = 0; i < resp.size(); i++)
1195                         if (((Integer) (resp.get(i))).intValue() == 0) {
1196                                 nbOfZeros++;
1197                                 indexOfZero = i;
1198                         }
1199                 if (nbOfZeros > 1)
1200                         return -1;
1201                 else
1202                         return indexOfZero;
1203         }
1204
1205         public synchronized boolean setNbNeighboursNotConv(int tag, int idNeigh,
1206                         int neighborTimeStep) throws RemoteException {
1207                 // System.out.println("ds  setNbNeighboursNotConv !!!!!!!!!!!!!!!");
1208                 if (tag == verifNum && !action.equals("sendVerdict")
1209                                 && neighborTimeStep == timeStep) {
1210                         if (idNeigh == -1) {
1211                                 nb_not_recv--;
1212                                 return false;
1213                         } else {
1214                                 // System.out.println("sleeping till not reloading");
1215                                 while (reloading == true) {
1216                                         try {
1217                                                 Thread.sleep(10);
1218                                                 // System.out.println("sleeping till not reloading");
1219                                         } catch (Exception e) {
1220                                         }
1221                                 }
1222                                 int i = neighbors.indexOf((Object) idNeigh);
1223                                 if (((Boolean) neighborsValues.get(i)).booleanValue() == false) {
1224                                         nb_not_recv--;
1225                                         // System.out.println("le noeud "+idNeigh+" a envoyer un message de pseudoconvergence");
1226                                         neighborsValues.set( i, new Boolean(true) ) ;
1227                                         waitForAck(3);
1228
1229                                 }
1230                         }
1231                         return true;
1232                 } else if (tag == verifNum - 1 && !action.equals("sendVerdict")
1233                                 && reloading == false && neighborTimeStep == timeStep
1234                                 && jaceP2P_Iteration != 0)
1235                         return true;
1236                 else
1237                         return false;
1238         }
1239
1240         public void detectNeighbors(int id, int jaceSize) {
1241                 // System.out.println("detect neighbors !!!! ");
1242                 int d = 0;
1243                 while (Math.pow(2, d) < jaceSize) {
1244                         if (id < Math.pow(2, d) && ((id + Math.pow(2, d)) < jaceSize)) {
1245                                 neighbors.add((int) (id + Math.pow(2, d)));
1246                                 neighborsValues.add(new Boolean(false));
1247                                 resp.add(0);
1248                         }
1249                         if (id < Math.pow(2, d + 1) && id >= Math.pow(2, d)) {
1250                                 neighbors.add((int) (id - Math.pow(2, d)));
1251                                 neighborsValues.add(new Boolean(false));
1252                                 resp.add(0);
1253                         }
1254                         d++;
1255                 }
1256         }
1257
1258         public synchronized void initialize_state() {
1259
1260                 // System.out.println("initialiser\n");
1261
1262                 nb_not_recv = neighbors.size();
1263                 for (int i = 0; i < neighbors.size(); i++)
1264                         neighborsValues.set( i, new Boolean(false) ) ;
1265                 for (int i = 0; i < resp.size(); i++)
1266                         resp.set( i, 0 ) ;
1267                 underTh = false;
1268                 electedNode = false;
1269                 localCV_state = false;
1270                 verdict = false;
1271
1272                 state = "NORMAL";
1273         }
1274
1275         public int getNeighborRank(int index) throws RemoteException {
1276                 int rank = ((Integer) (neighbors.get(index))).intValue();
1277                 return rank;
1278         }
1279
1280         public synchronized int getNeighbourNotCV() {
1281                 int neighId = -1;
1282                 for (int i = 0; i < neighbors.size(); i++)
1283                         if (((Boolean) neighborsValues.get(i)).booleanValue() == false)
1284                                 neighId = ((Integer) neighbors.get(i)).intValue();
1285                 return neighId;
1286         }
1287
1288         public boolean getValues() {
1289                 boolean bool = true;
1290                 for (int i = 0; i < values.size(); i++)
1291                         if (((Boolean) values.get(i)).equals(new Boolean(false))) {
1292                                 bool = false;
1293                                 break;
1294                         }
1295                 // System.out.println("getValues() have been called and it returned "+bool);
1296                 return bool;
1297         }
1298
1299         public boolean testNegativeResp() {
1300                 boolean bool = false;
1301                 for (int i = 0; i < resp.size(); i++)
1302                         if (((Integer) (resp.get(i))).intValue() == -1) {
1303                                 bool = true;
1304                                 break;
1305                         }
1306                 return bool;
1307         }
1308
1309         public void printResp() {
1310                 // for(int i=0;i<resp.size();i++)
1311                 // System.out.print(" resp["+((Integer)neighbors.get(i)).intValue()+"]="+((Integer)resp.get(i)).intValue());
1312                 // System.out.print("\n");
1313         }
1314
1315         public void printDep() {
1316                 // for(int i=0;i<values.size();i++)
1317                 // System.out.print(" dep["+((Integer)dependancies.get(i)).intValue()+"]="+((Boolean)values.get(i)).booleanValue());
1318                 // System.out.print("\n");
1319         }
1320
1321         public void broadcastVerif(int id, int neighId, int tag)
1322                         throws RemoteException {
1323
1324                 TaskId recev = null;
1325                 // TaskId local =
1326                 // Register.Instance().getListeOfTasks().getTaskIdOfRank(id);
1327                 JaceInterface stub;
1328                 // System.out.println("Id="+id+" neighId="+ neighId+" tag="+tag);
1329                 for (int i = 0; i < neighbors.size(); i++)
1330                         if (neighId != ((Integer) neighbors.get(i)).intValue()) {
1331                                 recev = Register.Instance().getListeOfTasks().getTaskIdOfRank(
1332                                                 ((Integer) neighbors.get(i)).intValue());
1333                                 stub = recev.getHostStub();
1334                                 stub.initializeVerif(tag);
1335                                 // System.out.println("broadcast verification to :"+((Integer)neighbors.get(i)).intValue());
1336                         }
1337                 if (action.equals("sendVerif"))
1338                         action = "nothing";
1339         }
1340
1341         public void initializeVerifLeader() throws RemoteException {
1342                 reinitializePPEr();
1343                 for (int i = 0; i < resp.size(); i++)
1344                         resp.set( i, 0 ) ;
1345                 verifNum++;
1346                 respSent = false;
1347         }
1348
1349         public void initializeVerif(int tag) throws RemoteException {
1350                 // System.out.println("Inside initializeVerif @@@@@");
1351                 // System.out.println("sleeping till not reloading");
1352                 while (reloading == true) {
1353                         try {
1354                                 Thread.sleep(10);
1355                                 // System.out.println("sleeping till not reloading");
1356                         } catch (Exception e) {
1357                         }
1358                 }
1359                 if (verifNum + 1 == tag)
1360                         if (state.equals("WAIT4V")) {
1361                                 action = "sendVerif";
1362                                 reinitializePPEr();
1363                                 for (int i = 0; i < resp.size(); i++)
1364                                         resp.set( i, 0 ) ;
1365                                 verifNum++;
1366
1367                                 respSent = false;
1368                                 state = "VERIF";
1369                                 waitForAck(3);
1370
1371                                 new SendVerifThread(jaceMyId, sendId, verifNum).start();
1372
1373                         } else if (state.equals("VERIF")) {
1374                         } else {
1375                                 throw new RemoteException();
1376
1377                         }
1378
1379         }
1380
1381         public boolean getSaved() {
1382                 if (saved[1])
1383                         return true;
1384                 else
1385                         return false;
1386         }
1387
1388         public void reinitializePPEr() {
1389                 pseudoPerBeg = false;
1390                 pseudoPerEnd = false;
1391                 for (int i = 0; i < values.size(); i++)
1392                         values.set( i, new Boolean(false) ) ;
1393
1394         }
1395
1396         public void broadcastVerdict(int id, int neighId, int tag, boolean verd)
1397                         throws RemoteException {
1398                 Boolean verdicto = verd;
1399                 TaskId recev = null;
1400                 // TaskId local =
1401                 // Register.Instance().getListeOfTasks().getTaskIdOfRank(id);
1402                 JaceInterface stub;
1403                 for (int i = 0; i < neighbors.size(); i++)
1404                         if (((Integer) neighbors.get(i)).intValue() != neighId) {
1405                                 // System.out.println("broadcasting verdict "+verdicto+" to node of Rank "+((Integer)neighbors.get(i)).intValue());
1406                                 recev = Register.Instance().getListeOfTasks().getTaskIdOfRank(
1407                                                 ((Integer) neighbors.get(i)).intValue());
1408                                 stub = recev.getHostStub();
1409                                 stub.savOrFinOrRest(tag, timeStep, verdicto, reduceAll);
1410                         }
1411                 action = "nothing";
1412                 if (verdict == false)
1413                         sendId = -1;
1414         }
1415
1416         public void savOrFinOrRest(int tag, int step, boolean verd,
1417                         ArrayList<Object> reduceAll) {
1418                 // System.out.println("Recieved verd "+verd+" sleeping till not reloading");
1419                 while (reloading == true) {
1420                         try {
1421                                 Thread.sleep(10);
1422                                 // System.out.println("sleeping till not reloading");
1423                         } catch (Exception e) {
1424                         }
1425                 }
1426                 if (verd == true) {
1427                         if (verifNum == tag && timeStep == step && state.equals("VERIF")) {
1428                                 action = "sendVerdict";
1429                                 if (finalStep == true) {
1430                                         for (int i = 0; i < resp.size(); i++)
1431                                                 resp.set( i, 0 ) ;
1432                                         // System.out.println("sleeping till response is sent");
1433                                         while (action.equals("sendResponse") || respSent == false)
1434                                                 try {
1435                                                         Thread.sleep(10);
1436                                                         // System.out.println("sleeping till response is sent");
1437                                                 } catch (Exception e) {
1438                                                 }
1439                                         respSent = false;
1440                                         state = "SAVING";
1441                                         verdict = true;
1442                                         recievedVerdict = true;
1443
1444                                 } else {
1445                                         state = "FINISHED";
1446                                         verdict = true;
1447                                         recievedVerdict = true;
1448                                 }
1449                                 // System.out.println("//// Stetting reduceAll\\\\");
1450                                 this.reduceAll = reduceAll;
1451                                 waitForAck(3);
1452
1453                                 new SendVerdictThread(jaceMyId, sendId, verifNum, verdict)
1454                                                 .start();
1455                         }
1456                 } else if (verifNum < tag && timeStep == step) {
1457                         verifNum = tag;
1458                         initialize_state();
1459                         reinitializePPEr();
1460                         verdict = false;
1461                         action = "sendVerdict";
1462                         recievedVerdict = true;
1463                         waitForAck(3);
1464
1465                         new SendVerdictThread(jaceMyId, sendId, verifNum, verdict).start();
1466                 }
1467
1468         }
1469
1470         public int getRankLeader() throws RemoteException {
1471                 int neighId = -1;
1472                 for (int i = 0; i < neighbors.size(); i++) {
1473                         TaskId recev = null;
1474                         recev = Register.Instance().getListeOfTasks().getTaskIdOfRank(
1475                                         ((Integer) (neighbors.get(i))).intValue());
1476                         JaceInterface stub = recev.getHostStub();
1477                         if (stub.getNbNeighboursNotConv() == 0)
1478                                 neighId = ((Integer) (neighbors.get(i))).intValue();
1479                 }
1480                 return neighId;
1481         }
1482
1483         class SaveTaskThread extends Thread {
1484                 Task sauv;
1485
1486                 public SaveTaskThread(Task s) {
1487                         this.sauv = s;
1488                 }
1489
1490                 @SuppressWarnings("static-access")
1491                 public void run() {
1492                         // If array of BackupNodes not created, create it
1493                         if (saveTab == null) {
1494                                 createSaveTab();
1495                         }
1496
1497                         if (finalize == false) { // if finalization step, it will crash
1498                                 // because register purged yet
1499
1500                                 ByteArrayOutputStream stream = convertTask2stream(sauv);
1501
1502                                 // find the BackupNode to send
1503                                 int taskRankDest;
1504                                 TaskId task = null;
1505                                 JaceInterface stub = null;
1506                                 boolean sent = false;
1507                                 int j = 0;
1508
1509                                 while (j < saveTab.length && sent == false) {
1510                                         // 1 - find the remote task Id to send the backup to
1511                                         if (jaceSize < (2 * Register.Instance()
1512                                                         .getNumBackupNeighbors() + 1)) {
1513                                                 taskRankDest = saveTab[saveRound % (jaceSize - 1)];
1514                                         } else {
1515                                                 taskRankDest = saveTab[saveRound
1516                                                                 % (2 * Register.Instance()
1517                                                                                 .getNumBackupNeighbors())];
1518                                         }
1519
1520                                         // 2 - knowing the destination task Id, get the stub of the
1521                                         // corresponding node
1522                                         try {
1523                                                 task = Register.Instance().getListeOfTasks()
1524                                                                 .getTaskIdOfRank(taskRankDest); // ///////////////////////////pb
1525                                                 stub = task.getHostStub();
1526                                         } catch (Exception e) {
1527                                                 System.err
1528                                                                 .println("Problem in SaveTaskThread on assignation line in save : "
1529                                                                                 + e);
1530                                         }
1531                                         // System.out.println("ite " + jaceP2P_Iteration +
1532                                         // ".......... SENDING on task " + taskRankDest);
1533
1534                                         // if no stub there is a problem
1535                                         if (stub == null) {
1536                                                 j++;
1537                                                 saveRound++;
1538                                                 // System.out.println("unable to SEND backup on task of rank "
1539                                                 // + taskRankDest);
1540                                         }
1541                                         // 3 - try to send the stream
1542                                         else {
1543                                                 // if there is a stub, send the stream to that node
1544                                                 try {
1545                                                         stub.saveTask(jaceMyId, stream.toByteArray(),
1546                                                                         sauv.lastSave.getLastSave(), sauv.timeStep,
1547                                                                         Register.Instance().getAppliName(), 0);
1548                                                         // System.out.println("saved data on "+taskRankDest+" iteration= "+sauv.lastSave.getLastSave()+
1549                                                         // "timeStep="+sauv.timeStep+" !!!!!!!!");
1550
1551                                                         // Vector v = stub.getIterationOfBackup(jaceMyId,0);
1552                                                         // int ite=((Integer)v.get(0)).intValue();
1553                                                         // System.out.println("******************************************sauvegarde de donnees: ite="+ite+" taskDest="+taskRankDest+" ******************************************************");
1554                                                         sent = true;
1555
1556                                                 } catch (Exception e) {
1557                                                         System.err
1558                                                                         .println("JaceP2P_Error in Task.jaceP2P_Save() when saving stream: "
1559                                                                                         + e);
1560                                                         j++;
1561                                                         saveRound++;
1562
1563                                                 }
1564                                         }
1565
1566                                         try {
1567                                                 JaceSession.Instance().getTaskThread().sleep(10);
1568                                                 JaceSession.Instance().getTaskThread().yield();
1569                                         } catch (Exception e) {
1570                                         }
1571                                 }
1572
1573                                 // 5 - if stream not sent at all, do something (WHAT ???)
1574                                 if (j > saveTab.length) {
1575                                         System.err
1576                                                         .println("No more alive neighbors for storing the Backup");
1577                                         // TODO : what to do if no BackupNode has answered ???
1578                                 }
1579
1580                         }
1581                 }
1582         }
1583
1584 }
1585
1586 class BroadcastTaskThread extends Thread {
1587         JaceInterface stub;
1588         int rank;
1589         byte[] tsk;
1590         int iteration;
1591         int tag;
1592         String appliName;
1593         int timeStep;
1594         int dest;
1595
1596         // constructor
1597         public BroadcastTaskThread(JaceInterface theStub, int theRank,
1598                         byte[] theTsk, int theIteration, int timeStep, String theAppliName,
1599                         int tag, int dest) {
1600                 this.stub = theStub;
1601                 this.rank = theRank;
1602                 this.tsk = theTsk;
1603                 this.iteration = theIteration;
1604                 this.timeStep = timeStep;
1605                 this.appliName = theAppliName;
1606                 this.tag = tag;
1607                 this.dest = dest;
1608                 // System.out.println("tag="+tag);
1609         }
1610
1611         // method launched by start()
1612         public void run() {
1613                 try {
1614                         stub.saveTask(rank, tsk, iteration, timeStep, appliName, tag);
1615                         JaceSession.Instance().getTaskObject().setSaved(true);
1616
1617                         if (tag == 0) {
1618                                 // Vector v = stub.getIterationOfBackup(rank,0);
1619                                 // int ite=((Integer)v.get(0)).intValue();
1620                                 // System.out.println("++++++++++ Broadcast data ite="+iteration+" dest="+dest+" timeStep="+timeStep);
1621
1622                         } else {
1623                                 // Vector v = stub.getIterationOfBackup(rank,1);
1624                                 // int ite=((Integer)v.get(0)).intValue();
1625                                 // System.out.println("++++++++++ Broadcast dataConvg ite="+iteration+" dest="+dest+" timeStep="+timeStep);
1626                                 // System.out.println("+++++++++++++++++++++++++++++++++++++++++ Broadcast convgData ite="+ite+" dest="+dest+" +++++++++++++++++++++++++++++++++++++++++++++++++++++");
1627
1628                         }
1629                 } catch (Exception e) {
1630                         System.err
1631                                         .println("Node not reachable by JaceServer.saveTask() in BroadcastTaskThread :"
1632                                                         + e);
1633                 }
1634         }
1635 }
1636