Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Correction of some bugs and performance enhancement.
[jaceP2P.git] / src / jaceP2P / TaskLauncher.java
1 package jaceP2P;
2
3 import java.io.ByteArrayInputStream;
4 import java.io.ObjectInputStream;
5 import java.util.ArrayList;
6
7
8
9 public class TaskLauncher {
10
11         // attributes
12         Class<?> c;
13         Loader load;
14
15         // constructors
16         TaskLauncher() {
17         }
18
19         @SuppressWarnings({ "unchecked", "static-access" })
20         public void loadOrReloadTask(Backup b, Backup bConvg) {
21                 String appliName = Register.Instance().getAppliName();
22                 System.out.println("appliName=" + appliName);
23                 Task tsk = null;
24                 BackupConvg tskConvg = new BackupConvg();
25                 // parse the file:/path and transform the .class of the user in java
26                 // "Class" object
27                 load = new Loader();
28                 c = load.load(appliName);
29                 try {
30                         // instanciate the appli (the .class of the user) and load it
31                         tsk = ((Task) c.newInstance());
32                 } catch (Exception e) {
33                         System.err.println("Unable to instanciate the class :" + e);
34                 }
35                 try {
36                         if (b != null) {
37                                 byte[] tab = b.getData();
38                                 // System.out.println("size of the byte array : " + tab.length +
39                                 // " bytes");
40
41                                 // convert the stream received in an object of the same type of
42                                 // the user class, that inherits from Tasks (but is not such
43                                 // generic as a simple Task object)
44                                 try {
45                                         ObjectInputStream stream = new ObjectInputStream(
46                                                         new ByteArrayInputStream(tab));
47                                         tsk = tsk.jaceP2P_ConvertStream(stream);
48
49                                         stream.close();
50                                 } catch (Exception e) {
51                                         System.err
52                                                         .println("JaceP2P_Error ds TaskLauncher.loadOrReloadTask() "
53                                                                         + "when converting the stream in Task : "
54                                                                         + e);
55                                 }
56                         } else {
57                                 TaskId id = Register.Instance().getListeOfTasks()
58                                                 .getTaskIdOfHostStub(LocalHost.Instance().getStub());
59                                 // assign the attributes of the Task object :
60                                 tsk.setId(id); // assign JaceMyId, jaceTaskId, jaceSize
61                                 tsk.setParam(Register.Instance().getParams()); // assign
62                                                                                                                                 // jaceArgs
63
64                                 System.out.println("No backup");
65                         }
66
67                         // insert Task in taskObject, the attribute of the JaceSession class
68                         JaceSession.Instance().addTaskObject(tsk);
69 //                      System.out.println("After add task");
70                         if (b != null) {
71
72                                 int it = b.getIteration();
73                                 tsk.reloading = true;
74                                 System.out.println("timeStep=" + tsk.timeStep);
75
76                                 // reinit the Task object as it was overloaded by the user in
77                                 // the
78                                 if (bConvg != null) {
79                                         int itConv = bConvg.getIteration();
80                                         if (itConv > it) {
81                                                 byte[] tab1 = bConvg.getData();
82                                                 ObjectInputStream stream = new ObjectInputStream(
83                                                                 new ByteArrayInputStream(tab1));
84                                                 tskConvg = (BackupConvg) stream.readObject();
85                                                 if (tskConvg.timeStep >= tsk.timeStep) {
86                                                         System.out.println("state=" + tskConvg.state);
87                                                         tsk.state = tskConvg.state;
88                                                         tsk.nb_not_recv = tskConvg.nb_not_recv;
89                                                         tsk.electedNode = tskConvg.electedNode;
90                                                         tsk.respSent = tskConvg.respSent;
91 //                                                      System.out.println("avant la copie des vecteurs");
92                                                         tsk.neighbors = tskConvg.neighbors ;
93                                                         tsk.neighborsValues = tskConvg.neighborsValues ;
94
95                                                         if (bConvg.getIteration() >= b.getIteration())
96                                                                 tsk.reduceAll = tskConvg.reduceAll ;
97                                                         tsk.resp = (ArrayList<Integer>) (tskConvg.resp).clone() ;
98 //                                                      System.out.println("apres la copie des vecteurs");
99                                                         tsk.underTh = tskConvg.underTh;
100                                                         tsk.verifNum = tskConvg.verifNum;
101                                                         tsk.sendId = tskConvg.sendId;
102                                                         tsk.finalStep = tskConvg.finalStep;
103                                                         tsk.action = tskConvg.action;
104                                                         tsk.verdict = tskConvg.verdict;
105                                                         tsk.localCV_state = tskConvg.localCV_state;
106                                                         tsk.timeStep = tskConvg.timeStep;
107                                                         tsk.lastSave = tskConvg.lastSave;
108
109                                                         // tsk.jaceP2P_Iteration=tskConvg.jaceP2P_Iteration;
110                                                         tsk.recievedVerdict = tskConvg.recievedVerdict;
111                                                 }
112                                         }
113                                 }
114
115                                 System.out.println("Reinit Task");
116                                 tsk.jaceP2P_ReinitTask();
117                                 tsk.postReloading = true;
118                                 tsk.reloading = false;
119                         } else {
120                                 // initialize the task with the appli specific data (Method
121                                 // jaceP2P_InitTask() overloaded in the appli code by the user)
122 //                              System.out.println("Before Init task");
123
124                                 tsk.jaceP2P_InitTask();
125 //                              System.out.println("After Init task");
126                         }
127
128                         if (LocalHost.Instance().getStartedThreads() == false) {
129                                 // create the thread Sender (only 1 Sender thread per node, even
130                                 // if several tasks like in Jace) which is the thread
131                                 // responsible
132                                 // for sending messages stored in JaceBuffer (the queue of
133                                 // messages to send)
134                                 if (JaceDaemon.Instance().getProtocol().equals("rmi")) {
135                                         Sender.setInstance(SenderRmi.Instance());
136                                         SenderRmi.Instance().start();
137
138                                 } else {
139                                         Sender.setInstance(SenderSocket.Instance());
140                                         SenderSocket.Instance().start();
141
142                                 }
143                                 ScanThread.Instance().start();
144                                 LocalHost.Instance().setStartedThreads(true);
145                         }
146
147                         // Create the computing thread (the thread of the Task object) and
148                         // start it
149                         Thread th = new Thread(tsk, new String().valueOf(tsk.jaceMyId));
150                         th.start();
151
152                         // keep this thread in an attribute of JaceSession (taskThread)
153                         JaceSession.Instance().addTaskThread(th);
154
155                         Thread.currentThread().yield();
156
157                 } catch (Exception e) {
158                         System.err.println("Problem in TaskLauncher.loadOrReloadTask() : " + e);
159                         e.printStackTrace(System.out);
160                 }
161         }
162
163 }