1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.core.future;
21
22 import java.util.ArrayList;
23 import java.util.List;
24 import java.util.concurrent.TimeUnit;
25
26 import org.apache.mina.core.polling.AbstractPollingIoProcessor;
27 import org.apache.mina.core.service.IoProcessor;
28 import org.apache.mina.core.session.IoSession;
29 import org.apache.mina.util.ExceptionMonitor;
30
31
32
33
34
35
36
37 public class DefaultIoFuture implements IoFuture {
38
39
40 private static final long DEAD_LOCK_CHECK_INTERVAL = 5000L;
41
42
43 private final IoSession session;
44
45
46 private final Object lock;
47
48
49
50 private IoFutureListener<?> firstListener;
51
52
53 private List<IoFutureListener<?>> otherListeners;
54
55 private Object result;
56
57
58 private boolean ready;
59
60
61 private int waiters;
62
63
64
65
66
67
68 public DefaultIoFuture(IoSession session) {
69 this.session = session;
70 this.lock = this;
71 }
72
73
74
75
76 @Override
77 public IoSession getSession() {
78 return session;
79 }
80
81
82
83
84 @Override
85 @Deprecated
86 public void join() {
87 awaitUninterruptibly();
88 }
89
90
91
92
93 @Override
94 @Deprecated
95 public boolean join(long timeoutMillis) {
96 return awaitUninterruptibly(timeoutMillis);
97 }
98
99
100
101
102 @Override
103 public IoFuture await() throws InterruptedException {
104 synchronized (lock) {
105 while (!ready) {
106 waiters++;
107
108 try {
109
110
111
112 lock.wait(DEAD_LOCK_CHECK_INTERVAL);
113 } finally {
114 waiters--;
115
116 if (!ready) {
117 checkDeadLock();
118 }
119 }
120 }
121 }
122
123 return this;
124 }
125
126
127
128
129 @Override
130 public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
131 return await0(unit.toMillis(timeout), true);
132 }
133
134
135
136
137 @Override
138 public boolean await(long timeoutMillis) throws InterruptedException {
139 return await0(timeoutMillis, true);
140 }
141
142
143
144
145 @Override
146 public IoFuture awaitUninterruptibly() {
147 try {
148 await0(Long.MAX_VALUE, false);
149 } catch (InterruptedException ie) {
150
151 }
152
153 return this;
154 }
155
156
157
158
159 @Override
160 public boolean awaitUninterruptibly(long timeout, TimeUnit unit) {
161 try {
162 return await0(unit.toMillis(timeout), false);
163 } catch (InterruptedException e) {
164 throw new InternalError();
165 }
166 }
167
168
169
170
171 @Override
172 public boolean awaitUninterruptibly(long timeoutMillis) {
173 try {
174 return await0(timeoutMillis, false);
175 } catch (InterruptedException e) {
176 throw new InternalError();
177 }
178 }
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193 private boolean await0(long timeoutMillis, boolean interruptable) throws InterruptedException {
194 long endTime = System.currentTimeMillis() + timeoutMillis;
195
196 if (endTime < 0) {
197 endTime = Long.MAX_VALUE;
198 }
199
200 synchronized (lock) {
201
202
203 if (ready||(timeoutMillis <= 0)) {
204 return ready;
205 }
206
207
208 waiters++;
209
210 try {
211 for (;;) {
212 try {
213 long timeOut = Math.min(timeoutMillis, DEAD_LOCK_CHECK_INTERVAL);
214
215
216
217
218 lock.wait(timeOut);
219 } catch (InterruptedException e) {
220 if (interruptable) {
221 throw e;
222 }
223 }
224
225 if (ready || (endTime < System.currentTimeMillis())) {
226 return ready;
227 } else {
228
229 checkDeadLock();
230 }
231 }
232 } finally {
233
234
235
236
237
238 waiters--;
239
240 if (!ready) {
241 checkDeadLock();
242 }
243 }
244 }
245 }
246
247
248
249
250
251 private void checkDeadLock() {
252
253 if (!(this instanceof CloseFuture || this instanceof WriteFuture || this instanceof ReadFuture || this instanceof ConnectFuture)) {
254 return;
255 }
256
257
258
259
260
261
262
263 StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
264
265
266 for (StackTraceElement stackElement : stackTrace) {
267 if (AbstractPollingIoProcessor.class.getName().equals(stackElement.getClassName())) {
268 IllegalStateException e = new IllegalStateException("t");
269 e.getStackTrace();
270 throw new IllegalStateException("DEAD LOCK: " + IoFuture.class.getSimpleName()
271 + ".await() was invoked from an I/O processor thread. " + "Please use "
272 + IoFutureListener.class.getSimpleName() + " or configure a proper thread model alternatively.");
273 }
274 }
275
276
277 for (StackTraceElement s : stackTrace) {
278 try {
279 Class<?> cls = DefaultIoFuture.class.getClassLoader().loadClass(s.getClassName());
280
281 if (IoProcessor.class.isAssignableFrom(cls)) {
282 throw new IllegalStateException("DEAD LOCK: " + IoFuture.class.getSimpleName()
283 + ".await() was invoked from an I/O processor thread. " + "Please use "
284 + IoFutureListener.class.getSimpleName()
285 + " or configure a proper thread model alternatively.");
286 }
287 } catch (ClassNotFoundException cnfe) {
288
289 }
290 }
291 }
292
293
294
295
296 @Override
297 public boolean isDone() {
298 synchronized (lock) {
299 return ready;
300 }
301 }
302
303
304
305
306
307
308
309
310 public boolean setValue(Object newValue) {
311 synchronized (lock) {
312
313 if (ready) {
314 return false;
315 }
316
317 result = newValue;
318 ready = true;
319
320
321 if (waiters > 0) {
322 lock.notifyAll();
323 }
324 }
325
326
327 notifyListeners();
328
329 return true;
330 }
331
332
333
334
335 protected Object getValue() {
336 synchronized (lock) {
337 return result;
338 }
339 }
340
341
342
343
344 @Override
345 public IoFuture addListener(IoFutureListener<?> listener) {
346 if (listener == null) {
347 throw new IllegalArgumentException("listener");
348 }
349
350 synchronized (lock) {
351 if (ready) {
352
353
354
355
356 notifyListener(listener);
357 } else {
358 if (firstListener == null) {
359 firstListener = listener;
360 } else {
361 if (otherListeners == null) {
362 otherListeners = new ArrayList<>(1);
363 }
364
365 otherListeners.add(listener);
366 }
367 }
368 }
369
370 return this;
371 }
372
373
374
375
376 @Override
377 public IoFuture removeListener(IoFutureListener<?> listener) {
378 if (listener == null) {
379 throw new IllegalArgumentException("listener");
380 }
381
382 synchronized (lock) {
383 if (!ready) {
384 if (listener == firstListener) {
385 if ((otherListeners != null) && !otherListeners.isEmpty()) {
386 firstListener = otherListeners.remove(0);
387 } else {
388 firstListener = null;
389 }
390 } else if (otherListeners != null) {
391 otherListeners.remove(listener);
392 }
393 }
394 }
395
396 return this;
397 }
398
399
400
401
402 private void notifyListeners() {
403
404
405
406 if (firstListener != null) {
407 notifyListener(firstListener);
408 firstListener = null;
409
410 if (otherListeners != null) {
411 for (IoFutureListener<?> listener : otherListeners) {
412 notifyListener(listener);
413 }
414
415 otherListeners = null;
416 }
417 }
418 }
419
420 @SuppressWarnings("unchecked")
421 private void notifyListener(IoFutureListener listener) {
422 try {
423 listener.operationComplete(this);
424 } catch (Exception e) {
425 ExceptionMonitor.getInstance().exceptionCaught(e);
426 }
427 }
428 }