Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Correction of some bugs and performance enhancement.
[jaceP2P.git] / src / jaceP2P / JaceBuffer.java
1 package jaceP2P;
2
3 import java.util.ArrayList;
4
5 public class JaceBuffer {
6
7         // attributes
8         int id;
9         static int nb = 0;
10         private ArrayList<Message> liste;
11         boolean msgConsumed; // attribut utiliser ds JaceSender pr savoir si msg
12                                                         // enlever de la liste de JaceBuffer
13         long time;
14         boolean stopGet = false;
15
16         // constructors
17
18         public JaceBuffer() {
19                 /* liste=new Vector(); */
20                 liste = new ArrayList<Message>();
21                 msgConsumed = false;
22                 time = 0;
23                 id = nb;
24                 nb++;
25                 System.out.println("new JaceBuffer .... id=" + id + " .....");
26         }
27
28         // methods
29         public void purge() {
30
31                 liste.clear();
32         }
33
34         // retourne l'index d'un Message de meme tag ET meme destinataire ET meme
35         // envoyeur que "msg"
36         private synchronized int exist(Message msg) {
37                 int existe = -1;
38
39                 int index = 0;
40                 while ((existe == -1) && (index < liste.size())) {
41
42                         if (msg.getReceiver().getRank() == (((Message) liste.get(index))
43                                         .getReceiver().getRank())) {
44                                 for (int i = 0; i < liste.size(); i++)
45                                         // System.out.println("exist id="+id+" element "+i+" to "+((Message)liste.get(i)).getReceiver().getRank());
46                                         existe = index;
47                         } else
48                                 index++;
49                 }
50
51                 return existe;
52         }
53
54         public synchronized void add(Message msg) {
55                 int is = -1;
56
57                 synchronized (liste) {
58                         is = exist(msg);
59
60                         // si existe deja 1 Message de meme tag ET meme envoyeur ET meme
61                         // destinataire, on l'ecrase
62                         if (is != -1) {
63                                 liste.set(is, msg);
64
65                                 // System.out.println("id="+id+" remplacer un message a la place "+is+" ds le buffer pour "+msg.getReceiver().getRank()+" liste size "+liste.size());
66
67                         } else {
68                                 // si existe pas de Message de meme tag ET meme envoyeur ET meme
69                                 // destinataire, on l'ajoute
70                                 liste.add(msg);
71                                 // System.out.println("id="+id+" ajouter un message au buffer pour "+msg.getReceiver().getRank()+" liste size "+liste.size());
72
73                         }
74                 }
75                 try {
76                         // notifyAll();
77                         synchronized (this) {
78                                 notify();
79                         }
80
81                 } catch (Exception e) {
82                         System.out.println("error notifying Sender :" + e);
83                 }
84
85         }
86
87         public synchronized Message getMessageAt(int index) {
88                 // System.out.println("size = " + liste.size());
89                 return (Message) liste.get(index);
90         }
91
92         public synchronized Message get() {
93                 msgConsumed = false;
94
95                 // tant que aucun message, j'attend
96                 while (liste.isEmpty() && stopGet == false) {
97                         try {
98                                 // System.out.println("BUFFER : rien, j'attend, liste vide "+liste.isEmpty());
99                                 wait();
100                         } catch (Exception e) {
101                         }
102                         ;
103                 }
104                 // System.out.println("Took message from Buffer");
105
106                 Message tmp = null;
107                 synchronized (liste) {
108
109                         try {
110                                 tmp = (Message) ((Message) liste.get(0)).clone();
111                                 liste.remove(0);
112                                 // System.out.println("id="+id+" get message du buffer pour "+tmp.getReceiver().getRank()+" liste size "+liste.size());
113                         } catch (Exception e) {
114                                 System.out.println("unable to get message :" + e);
115                         }
116
117                 }
118                 msgConsumed = true;
119                 time = System.currentTimeMillis();
120                 return tmp;
121         }
122
123         public synchronized void viewAll() {
124                 if (liste.isEmpty()) {
125                         // System.out.println("pas de msg a envoyer");
126                 } else {
127                         Message msg;
128                         TaskId sender, dest;
129                         System.out.print("id=" + id + " nb msg ds JaceBuffer : "
130                                         + liste.size());
131                         for (int i = 0; i < liste.size(); i++) {
132                                 msg = (Message) liste.get(i);
133                                 sender = msg.getSender();
134                                 dest = msg.getReceiver();
135                                 System.out.print("\nmsg " + i + " : tag = " + msg.getTag()
136                                                 + ", src : " + sender.getRank() + " "
137                                                 + sender.getHostIP() + ", dest : " + dest.getRank()
138                                                 + " " + dest.getHostIP() + ", data = " + msg.getData()
139                                                 + "\n");
140
141                         }
142                 }
143         }
144
145         public synchronized int getSize() {
146                 return liste.size();
147         }
148
149 }