View Javadoc
1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.core.future;
21  
22  import java.util.ArrayList;
23  import java.util.List;
24  import java.util.concurrent.TimeUnit;
25  
26  import org.apache.mina.core.polling.AbstractPollingIoProcessor;
27  import org.apache.mina.core.service.IoProcessor;
28  import org.apache.mina.core.session.IoSession;
29  import org.apache.mina.util.ExceptionMonitor;
30  
31  /**
32   * A default implementation of {@link IoFuture} associated with
33   * an {@link IoSession}.
34   * 
35   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
36   */
37  public class DefaultIoFuture implements IoFuture {
38  
39      /** A number of milliseconds to wait between two deadlock controls ( 5 seconds ) */
40      private static final long DEAD_LOCK_CHECK_INTERVAL = 5000L;
41  
42      /** The associated session */
43      private final IoSession session;
44  
45      /** A lock used by the wait() method */
46      private final Object lock;
47  
48      /** The first listener. This is easier to have this variable
49       * when we most of the time have one single listener */
50      private IoFutureListener<?> firstListener;
51  
52      /** All the other listeners, in case we have more than one */
53      private List<IoFutureListener<?>> otherListeners;
54  
55      private Object result;
56  
57      /** The flag used to determinate if the Future is completed or not */
58      private boolean ready;
59  
60      /** A counter for the number of threads waiting on this future */
61      private int waiters;
62  
63      /**
64       * Creates a new instance associated with an {@link IoSession}.
65       *
66       * @param session an {@link IoSession} which is associated with this future
67       */
68      public DefaultIoFuture(IoSession session) {
69          this.session = session;
70          this.lock = this;
71      }
72  
73      /**
74       * {@inheritDoc}
75       */
76      @Override
77      public IoSession getSession() {
78          return session;
79      }
80  
81      /**
82       * @deprecated Replaced with {@link #awaitUninterruptibly()}.
83       */
84      @Override
85      @Deprecated
86      public void join() {
87          awaitUninterruptibly();
88      }
89  
90      /**
91       * @deprecated Replaced with {@link #awaitUninterruptibly(long)}.
92       */
93      @Override
94      @Deprecated
95      public boolean join(long timeoutMillis) {
96          return awaitUninterruptibly(timeoutMillis);
97      }
98  
99      /**
100      * {@inheritDoc}
101      */
102     @Override
103     public IoFuture await() throws InterruptedException {
104         synchronized (lock) {
105             while (!ready) {
106                 waiters++;
107                 
108                 try {
109                     // Wait for a notify, or if no notify is called,
110                     // assume that we have a deadlock and exit the
111                     // loop to check for a potential deadlock.
112                     lock.wait(DEAD_LOCK_CHECK_INTERVAL);
113                 } finally {
114                     waiters--;
115                     
116                     if (!ready) {
117                         checkDeadLock();
118                     }
119                 }
120             }
121         }
122         
123         return this;
124     }
125 
126     /**
127      * {@inheritDoc}
128      */
129     @Override
130     public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
131         return await0(unit.toMillis(timeout), true);
132     }
133 
134     /**
135      * {@inheritDoc}
136      */
137     @Override
138     public boolean await(long timeoutMillis) throws InterruptedException {
139         return await0(timeoutMillis, true);
140     }
141 
142     /**
143      * {@inheritDoc}
144      */
145     @Override
146     public IoFuture awaitUninterruptibly() {
147         try {
148             await0(Long.MAX_VALUE, false);
149         } catch (InterruptedException ie) {
150             // Do nothing : this catch is just mandatory by contract
151         }
152 
153         return this;
154     }
155 
156     /**
157      * {@inheritDoc}
158      */
159     @Override
160     public boolean awaitUninterruptibly(long timeout, TimeUnit unit) {
161         try {
162             return await0(unit.toMillis(timeout), false);
163         } catch (InterruptedException e) {
164             throw new InternalError();
165         }
166     }
167 
168     /**
169      * {@inheritDoc}
170      */
171     @Override
172     public boolean awaitUninterruptibly(long timeoutMillis) {
173         try {
174             return await0(timeoutMillis, false);
175         } catch (InterruptedException e) {
176             throw new InternalError();
177         }
178     }
179 
180     /**
181      * Wait for the Future to be ready. If the requested delay is 0 or
182      * negative, this method immediately returns the value of the
183      * 'ready' flag.
184      * Every 5 second, the wait will be suspended to be able to check if
185      * there is a deadlock or not.
186      * 
187      * @param timeoutMillis The delay we will wait for the Future to be ready
188      * @param interruptable Tells if the wait can be interrupted or not
189      * @return <tt>true</tt> if the Future is ready
190      * @throws InterruptedException If the thread has been interrupted
191      * when it's not allowed.
192      */
193     private boolean await0(long timeoutMillis, boolean interruptable) throws InterruptedException {
194         long endTime = System.currentTimeMillis() + timeoutMillis;
195 
196         if (endTime < 0) {
197             endTime = Long.MAX_VALUE;
198         }
199 
200         synchronized (lock) {
201             // We can quit if the ready flag is set to true, or if
202             // the timeout is set to 0 or below : we don't wait in this case.
203             if (ready||(timeoutMillis <= 0)) {
204                 return ready;
205             }
206 
207             // The operation is not completed : we have to wait
208             waiters++;
209 
210             try {
211                 for (;;) {
212                     try {
213                         long timeOut = Math.min(timeoutMillis, DEAD_LOCK_CHECK_INTERVAL);
214                         
215                         // Wait for the requested period of time,
216                         // but every DEAD_LOCK_CHECK_INTERVAL seconds, we will
217                         // check that we aren't blocked.
218                         lock.wait(timeOut);
219                     } catch (InterruptedException e) {
220                         if (interruptable) {
221                             throw e;
222                         }
223                     }
224 
225                     if (ready || (endTime < System.currentTimeMillis())) {
226                         return ready;
227                     } else {
228                         // Take a chance, detect a potential deadlock
229                         checkDeadLock();
230                     }
231                 }
232             } finally {
233                 // We get here for 3 possible reasons :
234                 // 1) We have been notified (the operation has completed a way or another)
235                 // 2) We have reached the timeout
236                 // 3) The thread has been interrupted
237                 // In any case, we decrement the number of waiters, and we get out.
238                 waiters--;
239                 
240                 if (!ready) {
241                     checkDeadLock();
242                 }
243             }
244         }
245     }
246 
247     /**
248      * Check for a deadlock, ie look into the stack trace that we don't have already an 
249      * instance of the caller.
250      */
251     private void checkDeadLock() {
252         // Only read / write / connect / write future can cause dead lock.
253         if (!(this instanceof CloseFuture || this instanceof WriteFuture || this instanceof ReadFuture || this instanceof ConnectFuture)) {
254             return;
255         }
256 
257         // Get the current thread stackTrace.
258         // Using Thread.currentThread().getStackTrace() is the best solution,
259         // even if slightly less efficient than doing a new Exception().getStackTrace(),
260         // as internally, it does exactly the same thing. The advantage of using
261         // this solution is that we may benefit some improvement with some
262         // future versions of Java.
263         StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
264 
265         // Simple and quick check.
266         for (StackTraceElement stackElement : stackTrace) {
267             if (AbstractPollingIoProcessor.class.getName().equals(stackElement.getClassName())) {
268                 IllegalStateException e = new IllegalStateException("t");
269                 e.getStackTrace();
270                 throw new IllegalStateException("DEAD LOCK: " + IoFuture.class.getSimpleName()
271                         + ".await() was invoked from an I/O processor thread.  " + "Please use "
272                         + IoFutureListener.class.getSimpleName() + " or configure a proper thread model alternatively.");
273             }
274         }
275 
276         // And then more precisely.
277         for (StackTraceElement s : stackTrace) {
278             try {
279                 Class<?> cls = DefaultIoFuture.class.getClassLoader().loadClass(s.getClassName());
280                 
281                 if (IoProcessor.class.isAssignableFrom(cls)) {
282                     throw new IllegalStateException("DEAD LOCK: " + IoFuture.class.getSimpleName()
283                             + ".await() was invoked from an I/O processor thread.  " + "Please use "
284                             + IoFutureListener.class.getSimpleName()
285                             + " or configure a proper thread model alternatively.");
286                 }
287             } catch (ClassNotFoundException cnfe) {
288                 // Ignore
289             }
290         }
291     }
292 
293     /**
294      * {@inheritDoc}
295      */
296     @Override
297     public boolean isDone() {
298         synchronized (lock) {
299             return ready;
300         }
301     }
302 
303     /**
304      * Sets the result of the asynchronous operation, and mark it as finished.
305      * 
306      * @param newValue The result to store into the Future
307      * @return {@code true} if the value has been set, {@code false} if
308      * the future already has a value (thus is in ready state)
309      */
310     public boolean setValue(Object newValue) {
311         synchronized (lock) {
312             // Allowed only once.
313             if (ready) {
314                 return false;
315             }
316 
317             result = newValue;
318             ready = true;
319             
320             // Now, if we have waiters, notify them that the operation has completed
321             if (waiters > 0) {
322                 lock.notifyAll();
323             }
324         }
325 
326         // Last, not least, inform the listeners
327         notifyListeners();
328         
329         return true;
330     }
331 
332     /**
333      * @return the result of the asynchronous operation.
334      */
335     protected Object getValue() {
336         synchronized (lock) {
337             return result;
338         }
339     }
340 
341     /**
342      * {@inheritDoc}
343      */
344     @Override
345     public IoFuture addListener(IoFutureListener<?> listener) {
346         if (listener == null) {
347             throw new IllegalArgumentException("listener");
348         }
349 
350         synchronized (lock) {
351             if (ready) {
352                 // Shortcut : if the operation has completed, no need to 
353                 // add a new listener, we just have to notify it. The existing
354                 // listeners have already been notified anyway, when the 
355                 // 'ready' flag has been set.
356                 notifyListener(listener);
357             } else {
358                 if (firstListener == null) {
359                     firstListener = listener;
360                 } else {
361                     if (otherListeners == null) {
362                         otherListeners = new ArrayList<>(1);
363                     }
364                     
365                     otherListeners.add(listener);
366                 }
367             }
368         }
369         
370         return this;
371     }
372 
373     /**
374      * {@inheritDoc}
375      */
376     @Override
377     public IoFuture removeListener(IoFutureListener<?> listener) {
378         if (listener == null) {
379             throw new IllegalArgumentException("listener");
380         }
381 
382         synchronized (lock) {
383             if (!ready) {
384                 if (listener == firstListener) {
385                     if ((otherListeners != null) && !otherListeners.isEmpty()) {
386                         firstListener = otherListeners.remove(0);
387                     } else {
388                         firstListener = null;
389                     }
390                 } else if (otherListeners != null) {
391                     otherListeners.remove(listener);
392                 }
393             }
394         }
395 
396         return this;
397     }
398 
399     /**
400      * Notify the listeners, if we have some.
401      */
402     private void notifyListeners() {
403         // There won't be any visibility problem or concurrent modification
404         // because 'ready' flag will be checked against both addListener and
405         // removeListener calls.
406         if (firstListener != null) {
407             notifyListener(firstListener);
408             firstListener = null;
409 
410             if (otherListeners != null) {
411                 for (IoFutureListener<?> listener : otherListeners) {
412                     notifyListener(listener);
413                 }
414                 
415                 otherListeners = null;
416             }
417         }
418     }
419 
420     @SuppressWarnings("unchecked")
421     private void notifyListener(IoFutureListener listener) {
422         try {
423             listener.operationComplete(this);
424         } catch (Exception e) {
425             ExceptionMonitor.getInstance().exceptionCaught(e);
426         }
427     }
428 }