View Javadoc
1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.core.service;
21  
22  import java.util.AbstractSet;
23  import java.util.Iterator;
24  import java.util.List;
25  import java.util.Map;
26  import java.util.Set;
27  import java.util.concurrent.Executor;
28  import java.util.concurrent.ExecutorService;
29  import java.util.concurrent.Executors;
30  import java.util.concurrent.TimeUnit;
31  import java.util.concurrent.atomic.AtomicInteger;
32  
33  import org.apache.mina.core.IoUtil;
34  import org.apache.mina.core.filterchain.DefaultIoFilterChain;
35  import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder;
36  import org.apache.mina.core.filterchain.IoFilterChainBuilder;
37  import org.apache.mina.core.future.ConnectFuture;
38  import org.apache.mina.core.future.DefaultIoFuture;
39  import org.apache.mina.core.future.IoFuture;
40  import org.apache.mina.core.future.WriteFuture;
41  import org.apache.mina.core.session.AbstractIoSession;
42  import org.apache.mina.core.session.DefaultIoSessionDataStructureFactory;
43  import org.apache.mina.core.session.IdleStatus;
44  import org.apache.mina.core.session.IoSession;
45  import org.apache.mina.core.session.IoSessionConfig;
46  import org.apache.mina.core.session.IoSessionDataStructureFactory;
47  import org.apache.mina.core.session.IoSessionInitializationException;
48  import org.apache.mina.core.session.IoSessionInitializer;
49  import org.apache.mina.util.ExceptionMonitor;
50  import org.apache.mina.util.NamePreservingRunnable;
51  import org.slf4j.Logger;
52  import org.slf4j.LoggerFactory;
53  
54  /**
55   * Base implementation of {@link IoService}s.
56   * 
57   * An instance of IoService contains an Executor which will handle the incoming
58   * events.
59   *
60   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
61   */
62  public abstract class AbstractIoService implements IoService {
63  
64      protected static final Logger LOGGER = LoggerFactory.getLogger(AbstractIoService.class);
65  
66      /**
67       * The unique number identifying the Service. It's incremented
68       * for each new IoService created.
69       */
70      private static final AtomicInteger id = new AtomicInteger();
71  
72      /**
73       * The thread name built from the IoService inherited
74       * instance class name and the IoService Id
75       **/
76      private final String threadName;
77  
78      /**
79       * The associated executor, responsible for handling execution of I/O events.
80       */
81      private final Executor executor;
82  
83      /**
84       * A flag used to indicate that the local executor has been created
85       * inside this instance, and not passed by a caller.
86       * 
87       * If the executor is locally created, then it will be an instance
88       * of the ThreadPoolExecutor class.
89       */
90      private final boolean createdExecutor;
91  
92      /**
93       * The IoHandler in charge of managing all the I/O Events. It is
94       */
95      private IoHandler handler;
96  
97      /**
98       * The default {@link IoSessionConfig} which will be used to configure new sessions.
99       */
100     protected final IoSessionConfig sessionConfig;
101 
102     private final IoServiceListenererviceListener">IoServiceListener serviceActivationListener = new IoServiceListener() {
103         IoServiceStatistics serviceStats;
104 
105         /**
106          * {@inheritDoc}
107          */
108         @Override
109         public void serviceActivated(IoService service) {
110             // Update lastIoTime.
111             serviceStats = service.getStatistics();
112             serviceStats.setLastReadTime(service.getActivationTime());
113             serviceStats.setLastWriteTime(service.getActivationTime());
114             serviceStats.setLastThroughputCalculationTime(service.getActivationTime());
115         }
116 
117         /**
118          * {@inheritDoc}
119          */
120         @Override
121         public void serviceDeactivated(IoService service) throws Exception {
122             // Empty handler
123         }
124 
125         /**
126          * {@inheritDoc}
127          */
128         @Override
129         public void serviceIdle(IoService service, IdleStatus idleStatus) throws Exception {
130             // Empty handler
131         }
132 
133         /**
134          * {@inheritDoc}
135          */
136         @Override
137         public void sessionCreated(IoSession session) throws Exception {
138             // Empty handler
139         }
140 
141         /**
142          * {@inheritDoc}
143          */
144         @Override
145         public void sessionClosed(IoSession session) throws Exception {
146             // Empty handler
147         }
148 
149         /**
150          * {@inheritDoc}
151          */
152         @Override
153         public void sessionDestroyed(IoSession session) throws Exception {
154             // Empty handler
155         }
156     };
157 
158     /**
159      * Current filter chain builder.
160      */
161     private IoFilterChainBuilder filterChainBuilder = new DefaultIoFilterChainBuilder();
162 
163     private IoSessionDataStructureFactory sessionDataStructureFactory = new DefaultIoSessionDataStructureFactory();
164 
165     /**
166      * Maintains the {@link IoServiceListener}s of this service.
167      */
168     private final IoServiceListenerSupport listeners;
169 
170     /**
171      * A lock object which must be acquired when related resources are
172      * destroyed.
173      */
174     protected final Object disposalLock = new Object();
175 
176     private volatile boolean disposing;
177 
178     private volatile boolean disposed;
179 
180     private IoServiceStatisticseStatistics.html#IoServiceStatistics">IoServiceStatistics stats = new IoServiceStatistics(this);
181 
182     /**
183      * Constructor for {@link AbstractIoService}. You need to provide a default
184      * session configuration and an {@link Executor} for handling I/O events. If
185      * a null {@link Executor} is provided, a default one will be created using
186      * {@link Executors#newCachedThreadPool()}.
187      * 
188      * @param sessionConfig
189      *            the default configuration for the managed {@link IoSession}
190      * @param executor
191      *            the {@link Executor} used for handling execution of I/O
192      *            events. Can be <code>null</code>.
193      */
194     protected AbstractIoService(IoSessionConfig sessionConfig, Executor executor) {
195         if (sessionConfig == null) {
196             throw new IllegalArgumentException("sessionConfig");
197         }
198 
199         if (getTransportMetadata() == null) {
200             throw new IllegalArgumentException("TransportMetadata");
201         }
202 
203         if (!getTransportMetadata().getSessionConfigType().isAssignableFrom(sessionConfig.getClass())) {
204             throw new IllegalArgumentException("sessionConfig type: " + sessionConfig.getClass() + " (expected: "
205                     + getTransportMetadata().getSessionConfigType() + ")");
206         }
207 
208         // Create the listeners, and add a first listener : a activation listener
209         // for this service, which will give information on the service state.
210         listeners = new IoServiceListenerSupport(this);
211         listeners.add(serviceActivationListener);
212 
213         // Stores the given session configuration
214         this.sessionConfig = sessionConfig;
215 
216         // Make JVM load the exception monitor before some transports
217         // change the thread context class loader.
218         ExceptionMonitor.getInstance();
219 
220         if (executor == null) {
221             this.executor = Executors.newCachedThreadPool();
222             createdExecutor = true;
223         } else {
224             this.executor = executor;
225             createdExecutor = false;
226         }
227 
228         threadName = getClass().getSimpleName() + '-' + id.incrementAndGet();
229     }
230 
231     /**
232      * {@inheritDoc}
233      */
234     @Override
235     public final IoFilterChainBuilder getFilterChainBuilder() {
236         return filterChainBuilder;
237     }
238 
239     /**
240      * {@inheritDoc}
241      */
242     @Override
243     public final void setFilterChainBuilder(IoFilterChainBuilder builder) {
244         if (builder == null) {
245             filterChainBuilder = new DefaultIoFilterChainBuilder();
246         } else {
247             filterChainBuilder = builder;
248         }
249     }
250 
251     /**
252      * {@inheritDoc}
253      */
254     @Override
255     public final DefaultIoFilterChainBuilder getFilterChain() {
256         if (filterChainBuilder instanceof DefaultIoFilterChainBuilder) {
257             return (DefaultIoFilterChainBuilder) filterChainBuilder;
258         }
259 
260         throw new IllegalStateException("Current filter chain builder is not a DefaultIoFilterChainBuilder.");
261     }
262 
263     /**
264      * {@inheritDoc}
265      */
266     @Override
267     public final void addListener(IoServiceListener listener) {
268         listeners.add(listener);
269     }
270 
271     /**
272      * {@inheritDoc}
273      */
274     @Override
275     public final void removeListener(IoServiceListener listener) {
276         listeners.remove(listener);
277     }
278 
279     /**
280      * {@inheritDoc}
281      */
282     @Override
283     public final boolean isActive() {
284         return listeners.isActive();
285     }
286 
287     /**
288      * {@inheritDoc}
289      */
290     @Override
291     public final boolean isDisposing() {
292         return disposing;
293     }
294 
295     /**
296      * {@inheritDoc}
297      */
298     @Override
299     public final boolean isDisposed() {
300         return disposed;
301     }
302 
303     /**
304      * {@inheritDoc}
305      */
306     @Override
307     public final void dispose() {
308         dispose(false);
309     }
310 
311     /**
312      * {@inheritDoc}
313      */
314     @Override
315     public final void dispose(boolean awaitTermination) {
316         if (disposed) {
317             return;
318         }
319 
320         synchronized (disposalLock) {
321             if (!disposing) {
322                 disposing = true;
323 
324                 try {
325                     dispose0();
326                 } catch (Exception e) {
327                     ExceptionMonitor.getInstance().exceptionCaught(e);
328                 }
329             }
330         }
331 
332         if (createdExecutor) {
333             ExecutorService e = (ExecutorService) executor;
334             e.shutdownNow();
335             if (awaitTermination) {
336 
337                 try {
338                     if (LOGGER.isDebugEnabled()) {
339                         LOGGER.debug("awaitTermination on {} called by thread=[{}]", this, Thread.currentThread().getName());
340                     }
341                     
342                     e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
343                     
344                     if (LOGGER.isDebugEnabled()) {
345                         LOGGER.debug("awaitTermination on {} finished", this);
346                     }
347                 } catch (InterruptedException e1) {
348                     LOGGER.warn("awaitTermination on [{}] was interrupted", this);
349                     // Restore the interrupted status
350                     Thread.currentThread().interrupt();
351                 }
352             }
353         }
354         disposed = true;
355     }
356 
357     /**
358      * Implement this method to release any acquired resources.  This method
359      * is invoked only once by {@link #dispose()}.
360      * 
361      * @throws Exception If the dispose failed
362      */
363     protected abstract void dispose0() throws Exception;
364 
365     /**
366      * {@inheritDoc}
367      */
368     @Override
369     public final Map<Long, IoSession> getManagedSessions() {
370         return listeners.getManagedSessions();
371     }
372 
373     /**
374      * {@inheritDoc}
375      */
376     @Override
377     public final int getManagedSessionCount() {
378         return listeners.getManagedSessionCount();
379     }
380 
381     /**
382      * {@inheritDoc}
383      */
384     @Override
385     public final IoHandler getHandler() {
386         return handler;
387     }
388 
389     /**
390      * {@inheritDoc}
391      */
392     @Override
393     public final void setHandler(IoHandler handler) {
394         if (handler == null) {
395             throw new IllegalArgumentException("handler cannot be null");
396         }
397 
398         if (isActive()) {
399             throw new IllegalStateException("handler cannot be set while the service is active.");
400         }
401 
402         this.handler = handler;
403     }
404 
405     /**
406      * {@inheritDoc}
407      */
408     @Override
409     public final IoSessionDataStructureFactory getSessionDataStructureFactory() {
410         return sessionDataStructureFactory;
411     }
412 
413     /**
414      * {@inheritDoc}
415      */
416     @Override
417     public final void setSessionDataStructureFactory(IoSessionDataStructureFactory sessionDataStructureFactory) {
418         if (sessionDataStructureFactory == null) {
419             throw new IllegalArgumentException("sessionDataStructureFactory");
420         }
421 
422         if (isActive()) {
423             throw new IllegalStateException("sessionDataStructureFactory cannot be set while the service is active.");
424         }
425 
426         this.sessionDataStructureFactory = sessionDataStructureFactory;
427     }
428 
429     /**
430      * {@inheritDoc}
431      */
432     @Override
433     public IoServiceStatistics getStatistics() {
434         return stats;
435     }
436 
437     /**
438      * {@inheritDoc}
439      */
440     @Override
441     public final long getActivationTime() {
442         return listeners.getActivationTime();
443     }
444 
445     /**
446      * {@inheritDoc}
447      */
448     @Override
449     public final Set<WriteFuture> broadcast(Object message) {
450         // Convert to Set.  We do not return a List here because only the
451         // direct caller of MessageBroadcaster knows the order of write
452         // operations.
453         final List<WriteFuture> futures = IoUtil.broadcast(message, getManagedSessions().values());
454         return new AbstractSet<WriteFuture>() {
455             @Override
456             public Iterator<WriteFuture> iterator() {
457                 return futures.iterator();
458             }
459 
460             @Override
461             public int size() {
462                 return futures.size();
463             }
464         };
465     }
466 
467     /**
468      * @return The {@link IoServiceListenerSupport} attached to this service
469      */
470     public final IoServiceListenerSupport getListeners() {
471         return listeners;
472     }
473 
474     protected final void executeWorker(Runnable worker) {
475         executeWorker(worker, null);
476     }
477 
478     protected final void executeWorker(Runnable worker, String suffix) {
479         String actualThreadName = threadName;
480         if (suffix != null) {
481             actualThreadName = actualThreadName + '-' + suffix;
482         }
483         executor.execute(new NamePreservingRunnable(worker, actualThreadName));
484     }
485 
486     protected final void initSession(IoSession session, IoFuture future, IoSessionInitializer sessionInitializer) {
487         // Update lastIoTime if needed.
488         if (stats.getLastReadTime() == 0) {
489             stats.setLastReadTime(getActivationTime());
490         }
491 
492         if (stats.getLastWriteTime() == 0) {
493             stats.setLastWriteTime(getActivationTime());
494         }
495 
496         // Every property but attributeMap should be set now.
497         // Now initialize the attributeMap.  The reason why we initialize
498         // the attributeMap at last is to make sure all session properties
499         // such as remoteAddress are provided to IoSessionDataStructureFactory.
500         try {
501             ((AbstractIoSession) session).setAttributeMap(session.getService().getSessionDataStructureFactory()
502                     .getAttributeMap(session));
503         } catch (IoSessionInitializationException e) {
504             throw e;
505         } catch (Exception e) {
506             throw new IoSessionInitializationException("Failed to initialize an attributeMap.", e);
507         }
508 
509         try {
510             ((AbstractIoSession) session).setWriteRequestQueue(session.getService().getSessionDataStructureFactory()
511                     .getWriteRequestQueue(session));
512         } catch (IoSessionInitializationException e) {
513             throw e;
514         } catch (Exception e) {
515             throw new IoSessionInitializationException("Failed to initialize a writeRequestQueue.", e);
516         }
517 
518         if ((future != null) && (future instanceof ConnectFuture)) {
519             // DefaultIoFilterChain will notify the future. (We support ConnectFuture only for now).
520             session.setAttribute(DefaultIoFilterChain.SESSION_CREATED_FUTURE, future);
521         }
522 
523         if (sessionInitializer != null) {
524             sessionInitializer.initializeSession(session, future);
525         }
526 
527         finishSessionInitialization0(session, future);
528     }
529 
530     /**
531      * Implement this method to perform additional tasks required for session
532      * initialization. Do not call this method directly;
533      * {@link #initSession(IoSession, IoFuture, IoSessionInitializer)} will call
534      * this method instead.
535      * 
536      * @param session The session to initialize
537      * @param future The Future to use
538      * 
539      */
540     protected void finishSessionInitialization0(IoSession session, IoFuture future) {
541         // Do nothing. Extended class might add some specific code
542     }
543 
544     /**
545      * A  {@link IoFuture} dedicated class for 
546      *
547      */
548     protected static class ServiceOperationFuture extends DefaultIoFuture {
549         public ServiceOperationFuture() {
550             super(null);
551         }
552 
553         /**
554          * {@inheritDoc}
555          */
556         @Override
557         public final boolean isDone() {
558             return getValue() == Boolean.TRUE;
559         }
560 
561         public final void setDone() {
562             setValue(Boolean.TRUE);
563         }
564 
565         public final Exception getException() {
566             if (getValue() instanceof Exception) {
567                 return (Exception) getValue();
568             }
569 
570             return null;
571         }
572 
573         public final void setException(Exception exception) {
574             if (exception == null) {
575                 throw new IllegalArgumentException("exception");
576             }
577             
578             setValue(exception);
579         }
580     }
581 
582     /**
583      * {@inheritDoc}
584      */
585     @Override
586     public int getScheduledWriteBytes() {
587         return stats.getScheduledWriteBytes();
588     }
589 
590     /**
591      * {@inheritDoc}
592      */
593     @Override
594     public int getScheduledWriteMessages() {
595         return stats.getScheduledWriteMessages();
596     }
597 }