6
6
import java .net .ConnectException ;
7
7
import java .net .Socket ;
8
8
import java .net .SocketException ;
9
- import java .util .ArrayList ;
9
+ import java .util .Collections ;
10
+ import java .util .HashMap ;
10
11
import java .util .HashSet ;
12
+ import java .util .Set ;
13
+ import java .util .concurrent .ConcurrentHashMap ;
11
14
import java .util .logging .Logger ;
12
15
13
16
import unimelb .bitbox .actions .Action ;
37
40
38
41
public class Client extends Thread {
39
42
private static Logger log = Logger .getLogger (Client .class .getName ());
40
- public static HashSet <Client > establishedClients = new HashSet <Client >( );
43
+ public static Set <Client > establishedClients = Collections . newSetFromMap ( new ConcurrentHashMap <Client , Boolean >() );
41
44
private Socket socket ;
42
45
private String host ;
43
46
private long port ;
44
47
private FileSystemManager fileSystemManager ;
45
48
private boolean isIncomingConnection = false ;
49
+
50
+ private Set <Action > waitingActions ;
46
51
47
- public Client (String host , int port , FileSystemManager fileSystemManager ) {
52
+ public static HashMap <String , String > responseToRequest ;
53
+ public static HashSet <String > validCommandsBeforeConnectionEstablished ;
54
+
55
+ static {
56
+ responseToRequest = new HashMap <>();
57
+
58
+ responseToRequest .put ("FILE_CREATE_RESPONSE" , "FILE_CREATE_REQUEST" );
59
+ responseToRequest .put ("FILE_DELETE_RESPONSE" , "FILE_DELETE_REQUEST" );
60
+ responseToRequest .put ("FILE_MODIFY_RESPONSE" , "FILE_MODIFY_REQUEST" );
61
+ responseToRequest .put ("DIRECTORY_CREATE_RESPONSE" , "DIRECTORY_CREATE_REQUEST" );
62
+ responseToRequest .put ("DIRECTORY_DELETE_RESPONSE" , "DIRECTORY_DELETE_REQUEST" );
63
+ responseToRequest .put ("FILE_BYTES_RESPONSE" , "FILE_BYTES_REQUEST" );
64
+
65
+ validCommandsBeforeConnectionEstablished = new HashSet <>();
66
+
67
+ validCommandsBeforeConnectionEstablished .add ("HANDSHAKE_REQUEST" );
68
+ validCommandsBeforeConnectionEstablished .add ("HANDSHAKE_RESPONSE" );
69
+ validCommandsBeforeConnectionEstablished .add ("CONNECTION_REFUSED" );
70
+ }
71
+
72
+ public Client (String host , int port , FileSystemManager fileSystemManager ) {
73
+ waitingActions = Collections .newSetFromMap (new ConcurrentHashMap <Action , Boolean >());
48
74
this .host = host ;
49
75
this .port = port ;
50
76
this .fileSystemManager = fileSystemManager ;
51
77
try {
52
78
this .socket = new Socket (host , port );
53
79
HandshakeRequest requestAction = new HandshakeRequest (this .socket ,
54
80
Configuration .getConfigurationValue ("advertisedName" ),
55
- Long .parseLong (Configuration .getConfigurationValue ("port" )));
81
+ Long .parseLong (Configuration .getConfigurationValue ("port" )), this );
56
82
requestAction .send ();
57
83
this .start ();
58
84
} catch (ConnectException e ) {
@@ -63,11 +89,12 @@ public Client(String host, int port, FileSystemManager fileSystemManager) {
63
89
}
64
90
65
91
public Client (Socket socket , FileSystemManager fileSystemManager ) {
92
+ waitingActions = Collections .newSetFromMap (new ConcurrentHashMap <Action , Boolean >());
66
93
this .socket = socket ;
67
94
this .fileSystemManager = fileSystemManager ;
68
95
69
96
if (Client .getNumberIncomingEstablishedConnections () == Peer .maximumIncommingConnections ) {
70
- new ConnectionRefused (socket , "connection limit reached" ).send ();
97
+ new ConnectionRefused (socket , "connection limit reached" , this ).send ();
71
98
return ;
72
99
}
73
100
@@ -130,7 +157,44 @@ public void setPort(long port) {
130
157
* @return Boolean indication whether the message is valid
131
158
*/
132
159
private boolean validateRequest (Document message ) {
133
- return SchemaValidator .validateSchema (message );
160
+ if (!SchemaValidator .validateSchema (message )) {
161
+ return false ;
162
+ }
163
+
164
+ String command = message .getString ("command" );
165
+
166
+ if (!Client .establishedClients .contains (this )) {
167
+ if (!validCommandsBeforeConnectionEstablished .contains (command )) {
168
+ return false ;
169
+ }
170
+
171
+ if (command == "HANDSHAKE_RESPONSE" || command == "CONNECTION_REFUSED" ) {
172
+ return checkIfExpectingResponse (message );
173
+ }
174
+ } else {
175
+ if (responseToRequest .containsKey (command )) {
176
+ return checkIfExpectingResponse (message );
177
+ } else if (!responseToRequest .containsValue (command )) {
178
+ return false ;
179
+ }
180
+ }
181
+
182
+ return true ;
183
+ }
184
+
185
+ private boolean checkIfExpectingResponse (Document message ) {
186
+ for (Action action : waitingActions ) {
187
+ if (action .compare (message )) {
188
+ waitingActions .remove (action );
189
+ return true ;
190
+ }
191
+ }
192
+
193
+ return false ;
194
+ }
195
+
196
+ public void addToWaitingActions (Action action ) {
197
+ waitingActions .add (action );
134
198
}
135
199
136
200
/**
@@ -143,17 +207,17 @@ public void processEvent(FileSystemEvent fileSystemEvent) {
143
207
144
208
if (fileSystemEvent .event == EVENT .FILE_CREATE ) {
145
209
action = new FileCreateRequest (socket , new FileDescriptor (fileSystemEvent .fileDescriptor ),
146
- fileSystemEvent .pathName );
210
+ fileSystemEvent .pathName , this );
147
211
} else if (fileSystemEvent .event == EVENT .FILE_DELETE ) {
148
212
action = new FileDeleteRequest (socket , new FileDescriptor (fileSystemEvent .fileDescriptor ),
149
- fileSystemEvent .pathName );
213
+ fileSystemEvent .pathName , this );
150
214
} else if (fileSystemEvent .event == EVENT .FILE_MODIFY ) {
151
215
action = new FileModifyRequest (socket , new FileDescriptor (fileSystemEvent .fileDescriptor ),
152
- fileSystemEvent .pathName );
216
+ fileSystemEvent .pathName , this );
153
217
} else if (fileSystemEvent .event == EVENT .DIRECTORY_CREATE ) {
154
- action = new DirectoryCreateRequest (socket , fileSystemEvent .pathName );
218
+ action = new DirectoryCreateRequest (socket , fileSystemEvent .pathName , this );
155
219
} else if (fileSystemEvent .event == EVENT .DIRECTORY_DELETE ) {
156
- action = new DirectoryDeleteRequest (socket , fileSystemEvent .pathName );
220
+ action = new DirectoryDeleteRequest (socket , fileSystemEvent .pathName , this );
157
221
}
158
222
159
223
action .send ();
@@ -163,44 +227,44 @@ public void processEvent(FileSystemEvent fileSystemEvent) {
163
227
* Return an appropriate action for the received message
164
228
*
165
229
* @param message The received message
166
- * @return An action corresponding to the revceived message
230
+ * @return An action corresponding to the received message
167
231
*/
168
232
private Action getAction (Document message ) {
169
233
Action action = null ;
170
234
String command = message .getString ("command" );
171
235
172
236
if (command .equals ("INVALID_PROTOCOL" )) {
173
- action = new InvalidProtocol (socket , message );
237
+ action = new InvalidProtocol (socket , message , this );
174
238
} else if (command .equals ("CONNECTION_REFUSED" )) {
175
- action = new ConnectionRefused (socket , message );
239
+ action = new ConnectionRefused (socket , message , this );
176
240
} else if (command .equals ("HANDSHAKE_REQUEST" )) {
177
241
action = new HandshakeRequest (socket , message , this );
178
242
} else if (command .equals ("HANDSHAKE_RESPONSE" )) {
179
243
action = new HandshakeResponse (socket , message , this );
180
244
} else if (command .equals ("FILE_CREATE_REQUEST" )) {
181
- action = new FileCreateRequest (socket , message );
245
+ action = new FileCreateRequest (socket , message , this );
182
246
} else if (command .equals ("FILE_CREATE_RESPONSE" )) {
183
- action = new FileCreateResponse (socket , message );
247
+ action = new FileCreateResponse (socket , message , this );
184
248
} else if (command .equals ("FILE_DELETE_REQUEST" )) {
185
- action = new FileDeleteRequest (socket , message );
249
+ action = new FileDeleteRequest (socket , message , this );
186
250
} else if (command .equals ("FILE_DELETE_RESPONSE" )) {
187
- action = new FileDeleteResponse (socket , message );
251
+ action = new FileDeleteResponse (socket , message , this );
188
252
} else if (command .equals ("FILE_MODIFY_REQUEST" )) {
189
- action = new FileModifyRequest (socket , message );
253
+ action = new FileModifyRequest (socket , message , this );
190
254
} else if (command .equals ("FILE_MODIFY_RESPONSE" )) {
191
- action = new FileModifyResponse (socket , message );
255
+ action = new FileModifyResponse (socket , message , this );
192
256
} else if (command .equals ("DIRECTORY_CREATE_REQUEST" )) {
193
- action = new DirectoryCreateRequest (socket , message );
257
+ action = new DirectoryCreateRequest (socket , message , this );
194
258
} else if (command .equals ("DIRECTORY_CREATE_RESPONSE" )) {
195
- action = new DirectoryCreateResponse (socket , message );
259
+ action = new DirectoryCreateResponse (socket , message , this );
196
260
} else if (command .equals ("DIRECTORY_DELETE_REQUEST" )) {
197
- action = new DirectoryDeleteRequest (socket , message );
261
+ action = new DirectoryDeleteRequest (socket , message , this );
198
262
} else if (command .equals ("DIRECTORY_DELETE_RESPONSE" )) {
199
- action = new DirectoryDeleteResponse (socket , message );
263
+ action = new DirectoryDeleteResponse (socket , message , this );
200
264
} else if (command .equals ("FILE_BYTES_REQUEST" )) {
201
- action = new FileBytesRequest (socket , message );
265
+ action = new FileBytesRequest (socket , message , this );
202
266
} else if (command .equals ("FILE_BYTES_RESPONSE" )) {
203
- action = new FileBytesResponse (socket , message );
267
+ action = new FileBytesResponse (socket , message , this );
204
268
}
205
269
206
270
return action ;
@@ -212,13 +276,17 @@ public void run() {
212
276
213
277
String inputLine ;
214
278
while ((inputLine = in .readLine ()) != null ) {
215
- System .out .println (inputLine );
216
-
279
+ log .info ("Received from " + this .host + ":" + this .port + ": " + inputLine );
280
+
281
+
217
282
Document message = Document .parse (inputLine );
218
283
219
284
if (validateRequest (message )) {
220
285
Action action = getAction (message );
221
286
action .execute (fileSystemManager );
287
+ } else {
288
+ Action invalid = new InvalidProtocol (socket , "Could not validate message:" + message .toJson (), this );
289
+ invalid .send ();
222
290
}
223
291
}
224
292
0 commit comments