Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Correction of some messages and comments.
[jaceP2P.git] / src / jaceP2P / TaskLauncher.java
1 package jaceP2P;
2
3 import java.io.ByteArrayInputStream;
4 import java.io.ObjectInputStream;
5 import java.util.Vector;
6
7
8
9 public class TaskLauncher {
10
11         // attributes
12         Class<?> c;
13         Loader load;
14
15         // constructors
16         TaskLauncher() {
17         }
18
19         @SuppressWarnings({ "unchecked", "static-access" })
20         public void loadOrReloadTask(Backup b, Backup bConvg) {
21                 String appliName = Register.Instance().getAppliName();
22                 System.out.println("appliName=" + appliName);
23                 Task tsk = null;
24                 BackupConvg tskConvg = new BackupConvg();
25                 // parse the file:/path and transform the .class of the user in java
26                 // "Class" object
27                 load = new Loader();
28                 c = load.load(appliName);
29                 try {
30                         // instanciate the appli (the .class of the user) and load it
31                         tsk = ((Task) c.newInstance());
32                 } catch (Exception e) {
33                         System.err.println("Unable to instanciate the class :" + e);
34                 }
35                 try {
36                         if (b != null) {
37                                 byte[] tab = b.getData();
38                                 // System.out.println("size of the byte array : " + tab.length +
39                                 // " bytes");
40
41                                 // convert the stream received in an object of the same type of
42                                 // the user class, that inherits from Tasks (but is not such
43                                 // generic as a simple Task object)
44                                 try {
45                                         ObjectInputStream stream = new ObjectInputStream(
46                                                         new ByteArrayInputStream(tab));
47                                         tsk = tsk.jaceP2P_ConvertStream(stream);
48
49                                         stream.close();
50                                 } catch (Exception e) {
51                                         System.err
52                                                         .println("JaceP2P_Error ds TaskLauncher.loadOrReloadTask() "
53                                                                         + "when converting the stream in Task : "
54                                                                         + e);
55                                 }
56                         } else {
57                                 TaskId id = Register.Instance().getListeOfTasks()
58                                                 .getTaskIdOfHostStub(LocalHost.Instance().getStub());
59                                 // assign the attributes of the Task object :
60                                 tsk.setId(id); // assign JaceMyId, jaceTaskId, jaceSize
61                                 tsk.setParam(Register.Instance().getParams()); // assign
62                                                                                                                                 // jaceArgs
63
64                                 System.out.println("No backup");
65                         }
66
67                         // insert Task in taskObject, the attribute of the JaceSession class
68                         JaceSession.Instance().addTaskObject(tsk);
69 //                      System.out.println("After add task");
70                         if (b != null) {
71
72                                 int it = b.getIteration();
73                                 tsk.reloading = true;
74                                 System.out.println("timeStep=" + tsk.timeStep);
75
76                                 // reinit the Task object as it was overloaded by the user in
77                                 // the
78                                 if (bConvg != null) {
79                                         int itConv = bConvg.getIteration();
80                                         if (itConv > it) {
81                                                 byte[] tab1 = bConvg.getData();
82                                                 ObjectInputStream stream = new ObjectInputStream(
83                                                                 new ByteArrayInputStream(tab1));
84                                                 tskConvg = (BackupConvg) stream.readObject();
85                                                 if (tskConvg.timeStep >= tsk.timeStep) {
86                                                         System.out.println("state=" + tskConvg.state);
87                                                         tsk.state = tskConvg.state;
88                                                         tsk.nb_not_recv = tskConvg.nb_not_recv;
89                                                         tsk.electedNode = tskConvg.electedNode;
90                                                         tsk.respSent = tskConvg.respSent;
91 //                                                      System.out.println("avant la copie des vecteurs");
92                                                         tsk.neighbors = (Vector) (tskConvg.neighbors)
93                                                                         .clone();
94
95                                                         tsk.neighborsValues = (Vector) (tskConvg.neighborsValues)
96                                                                         .clone();
97                                                         if (bConvg.getIteration() >= b.getIteration())
98                                                                 tsk.reduceAll = (Vector) (tskConvg.reduceAll)
99                                                                                 .clone();
100                                                         tsk.resp = (Vector) (tskConvg.resp).clone();
101 //                                                      System.out.println("apres la copie des vecteurs");
102                                                         tsk.underTh = tskConvg.underTh;
103                                                         tsk.verifNum = tskConvg.verifNum;
104                                                         tsk.sendId = tskConvg.sendId;
105                                                         tsk.finalStep = tskConvg.finalStep;
106                                                         tsk.action = tskConvg.action;
107                                                         tsk.verdict = tskConvg.verdict;
108                                                         tsk.localCV_state = tskConvg.localCV_state;
109                                                         tsk.timeStep = tskConvg.timeStep;
110                                                         tsk.lastSave = tskConvg.lastSave;
111
112                                                         // tsk.jaceP2P_Iteration=tskConvg.jaceP2P_Iteration;
113                                                         tsk.recievedVerdict = tskConvg.recievedVerdict;
114                                                 }
115                                         }
116                                 }
117
118                                 System.out.println("Reinit Task");
119                                 tsk.jaceP2P_ReinitTask();
120                                 tsk.postReloading = true;
121                                 tsk.reloading = false;
122                         } else {
123                                 // initialize the task with the appli specific data (Method
124                                 // jaceP2P_InitTask() overloaded in the appli code by the user)
125 //                              System.out.println("Before Init task");
126
127                                 tsk.jaceP2P_InitTask();
128 //                              System.out.println("After Init task");
129                         }
130
131                         if (LocalHost.Instance().getStartedThreads() == false) {
132                                 // create the thread Sender (only 1 Sender thread per node, even
133                                 // if several tasks like in Jace) which is the thread
134                                 // responsible
135                                 // for sending messages stored in JaceBuffer (the queue of
136                                 // messages to send)
137                                 if (JaceDaemon.Instance().getProtocol().equals("rmi")) {
138                                         Sender.setInstance(SenderRmi.Instance());
139                                         SenderRmi.Instance().start();
140
141                                 } else {
142                                         Sender.setInstance(SenderSocket.Instance());
143                                         SenderSocket.Instance().start();
144
145                                 }
146                                 ScanThread.Instance().start();
147                                 LocalHost.Instance().setStartedThreads(true);
148                         }
149
150                         // Create the computing thread (the thread of the Task object) and
151                         // start it
152                         Thread th = new Thread(tsk, new String().valueOf(tsk.jaceMyId));
153                         th.start();
154
155                         // keep this thread in an attribute of JaceSession (taskThread)
156                         JaceSession.Instance().addTaskThread(th);
157
158                         Thread.currentThread().yield();
159
160                 } catch (Exception e) {
161                         System.err.println("Problem in TaskLauncher.loadOrReloadTask() : " + e);
162                         e.printStackTrace(System.out);
163                 }
164         }
165
166 }