1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.core.service;
21
22 import java.util.AbstractSet;
23 import java.util.Iterator;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.Set;
27 import java.util.concurrent.Executor;
28 import java.util.concurrent.ExecutorService;
29 import java.util.concurrent.Executors;
30 import java.util.concurrent.TimeUnit;
31 import java.util.concurrent.atomic.AtomicInteger;
32
33 import org.apache.mina.core.IoUtil;
34 import org.apache.mina.core.filterchain.DefaultIoFilterChain;
35 import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder;
36 import org.apache.mina.core.filterchain.IoFilterChainBuilder;
37 import org.apache.mina.core.future.ConnectFuture;
38 import org.apache.mina.core.future.DefaultIoFuture;
39 import org.apache.mina.core.future.IoFuture;
40 import org.apache.mina.core.future.WriteFuture;
41 import org.apache.mina.core.session.AbstractIoSession;
42 import org.apache.mina.core.session.DefaultIoSessionDataStructureFactory;
43 import org.apache.mina.core.session.IdleStatus;
44 import org.apache.mina.core.session.IoSession;
45 import org.apache.mina.core.session.IoSessionConfig;
46 import org.apache.mina.core.session.IoSessionDataStructureFactory;
47 import org.apache.mina.core.session.IoSessionInitializationException;
48 import org.apache.mina.core.session.IoSessionInitializer;
49 import org.apache.mina.util.ExceptionMonitor;
50 import org.apache.mina.util.NamePreservingRunnable;
51 import org.slf4j.Logger;
52 import org.slf4j.LoggerFactory;
53
54
55
56
57
58
59
60
61
62 public abstract class AbstractIoService implements IoService {
63
64 protected static final Logger LOGGER = LoggerFactory.getLogger(AbstractIoService.class);
65
66
67
68
69
70 private static final AtomicInteger id = new AtomicInteger();
71
72
73
74
75
76 private final String threadName;
77
78
79
80
81 private final Executor executor;
82
83
84
85
86
87
88
89
90 private final boolean createdExecutor;
91
92
93
94
95 private IoHandler handler;
96
97
98
99
100 protected final IoSessionConfig sessionConfig;
101
102 private final IoServiceListenererviceListener">IoServiceListener serviceActivationListener = new IoServiceListener() {
103 IoServiceStatistics serviceStats;
104
105
106
107
108 @Override
109 public void serviceActivated(IoService service) {
110
111 serviceStats = service.getStatistics();
112 serviceStats.setLastReadTime(service.getActivationTime());
113 serviceStats.setLastWriteTime(service.getActivationTime());
114 serviceStats.setLastThroughputCalculationTime(service.getActivationTime());
115 }
116
117
118
119
120 @Override
121 public void serviceDeactivated(IoService service) throws Exception {
122
123 }
124
125
126
127
128 @Override
129 public void serviceIdle(IoService service, IdleStatus idleStatus) throws Exception {
130
131 }
132
133
134
135
136 @Override
137 public void sessionCreated(IoSession session) throws Exception {
138
139 }
140
141
142
143
144 @Override
145 public void sessionClosed(IoSession session) throws Exception {
146
147 }
148
149
150
151
152 @Override
153 public void sessionDestroyed(IoSession session) throws Exception {
154
155 }
156 };
157
158
159
160
161 private IoFilterChainBuilder filterChainBuilder = new DefaultIoFilterChainBuilder();
162
163 private IoSessionDataStructureFactory sessionDataStructureFactory = new DefaultIoSessionDataStructureFactory();
164
165
166
167
168 private final IoServiceListenerSupport listeners;
169
170
171
172
173
174 protected final Object disposalLock = new Object();
175
176 private volatile boolean disposing;
177
178 private volatile boolean disposed;
179
180 private IoServiceStatisticseStatistics.html#IoServiceStatistics">IoServiceStatistics stats = new IoServiceStatistics(this);
181
182
183
184
185
186
187
188
189
190
191
192
193
194 protected AbstractIoService(IoSessionConfig sessionConfig, Executor executor) {
195 if (sessionConfig == null) {
196 throw new IllegalArgumentException("sessionConfig");
197 }
198
199 if (getTransportMetadata() == null) {
200 throw new IllegalArgumentException("TransportMetadata");
201 }
202
203 if (!getTransportMetadata().getSessionConfigType().isAssignableFrom(sessionConfig.getClass())) {
204 throw new IllegalArgumentException("sessionConfig type: " + sessionConfig.getClass() + " (expected: "
205 + getTransportMetadata().getSessionConfigType() + ")");
206 }
207
208
209
210 listeners = new IoServiceListenerSupport(this);
211 listeners.add(serviceActivationListener);
212
213
214 this.sessionConfig = sessionConfig;
215
216
217
218 ExceptionMonitor.getInstance();
219
220 if (executor == null) {
221 this.executor = Executors.newCachedThreadPool();
222 createdExecutor = true;
223 } else {
224 this.executor = executor;
225 createdExecutor = false;
226 }
227
228 threadName = getClass().getSimpleName() + '-' + id.incrementAndGet();
229 }
230
231
232
233
234 @Override
235 public final IoFilterChainBuilder getFilterChainBuilder() {
236 return filterChainBuilder;
237 }
238
239
240
241
242 @Override
243 public final void setFilterChainBuilder(IoFilterChainBuilder builder) {
244 if (builder == null) {
245 filterChainBuilder = new DefaultIoFilterChainBuilder();
246 } else {
247 filterChainBuilder = builder;
248 }
249 }
250
251
252
253
254 @Override
255 public final DefaultIoFilterChainBuilder getFilterChain() {
256 if (filterChainBuilder instanceof DefaultIoFilterChainBuilder) {
257 return (DefaultIoFilterChainBuilder) filterChainBuilder;
258 }
259
260 throw new IllegalStateException("Current filter chain builder is not a DefaultIoFilterChainBuilder.");
261 }
262
263
264
265
266 @Override
267 public final void addListener(IoServiceListener listener) {
268 listeners.add(listener);
269 }
270
271
272
273
274 @Override
275 public final void removeListener(IoServiceListener listener) {
276 listeners.remove(listener);
277 }
278
279
280
281
282 @Override
283 public final boolean isActive() {
284 return listeners.isActive();
285 }
286
287
288
289
290 @Override
291 public final boolean isDisposing() {
292 return disposing;
293 }
294
295
296
297
298 @Override
299 public final boolean isDisposed() {
300 return disposed;
301 }
302
303
304
305
306 @Override
307 public final void dispose() {
308 dispose(false);
309 }
310
311
312
313
314 @Override
315 public final void dispose(boolean awaitTermination) {
316 if (disposed) {
317 return;
318 }
319
320 synchronized (disposalLock) {
321 if (!disposing) {
322 disposing = true;
323
324 try {
325 dispose0();
326 } catch (Exception e) {
327 ExceptionMonitor.getInstance().exceptionCaught(e);
328 }
329 }
330 }
331
332 if (createdExecutor) {
333 ExecutorService e = (ExecutorService) executor;
334 e.shutdownNow();
335 if (awaitTermination) {
336
337 try {
338 if (LOGGER.isDebugEnabled()) {
339 LOGGER.debug("awaitTermination on {} called by thread=[{}]", this, Thread.currentThread().getName());
340 }
341
342 e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
343
344 if (LOGGER.isDebugEnabled()) {
345 LOGGER.debug("awaitTermination on {} finished", this);
346 }
347 } catch (InterruptedException e1) {
348 LOGGER.warn("awaitTermination on [{}] was interrupted", this);
349
350 Thread.currentThread().interrupt();
351 }
352 }
353 }
354 disposed = true;
355 }
356
357
358
359
360
361
362
/**
 * Subclass hook that performs the actual disposal work. {@code dispose(boolean)} invokes it at
 * most once, while holding {@code disposalLock}, and forwards any exception it throws to the
 * {@code ExceptionMonitor}.
 *
 * @throws Exception if the disposal fails
 */
363 protected abstract void dispose0() throws Exception;
364
365
366
367
368 @Override
369 public final Map<Long, IoSession> getManagedSessions() {
370 return listeners.getManagedSessions();
371 }
372
373
374
375
376 @Override
377 public final int getManagedSessionCount() {
378 return listeners.getManagedSessionCount();
379 }
380
381
382
383
384 @Override
385 public final IoHandler getHandler() {
386 return handler;
387 }
388
389
390
391
392 @Override
393 public final void setHandler(IoHandler handler) {
394 if (handler == null) {
395 throw new IllegalArgumentException("handler cannot be null");
396 }
397
398 if (isActive()) {
399 throw new IllegalStateException("handler cannot be set while the service is active.");
400 }
401
402 this.handler = handler;
403 }
404
405
406
407
408 @Override
409 public final IoSessionDataStructureFactory getSessionDataStructureFactory() {
410 return sessionDataStructureFactory;
411 }
412
413
414
415
416 @Override
417 public final void setSessionDataStructureFactory(IoSessionDataStructureFactory sessionDataStructureFactory) {
418 if (sessionDataStructureFactory == null) {
419 throw new IllegalArgumentException("sessionDataStructureFactory");
420 }
421
422 if (isActive()) {
423 throw new IllegalStateException("sessionDataStructureFactory cannot be set while the service is active.");
424 }
425
426 this.sessionDataStructureFactory = sessionDataStructureFactory;
427 }
428
429
430
431
432 @Override
433 public IoServiceStatistics getStatistics() {
434 return stats;
435 }
436
437
438
439
440 @Override
441 public final long getActivationTime() {
442 return listeners.getActivationTime();
443 }
444
445
446
447
448 @Override
449 public final Set<WriteFuture> broadcast(Object message) {
450
451
452
453 final List<WriteFuture> futures = IoUtil.broadcast(message, getManagedSessions().values());
454 return new AbstractSet<WriteFuture>() {
455 @Override
456 public Iterator<WriteFuture> iterator() {
457 return futures.iterator();
458 }
459
460 @Override
461 public int size() {
462 return futures.size();
463 }
464 };
465 }
466
467
468
469
470 public final IoServiceListenerSupport getListeners() {
471 return listeners;
472 }
473
474 protected final void executeWorker(Runnable worker) {
475 executeWorker(worker, null);
476 }
477
478 protected final void executeWorker(Runnable worker, String suffix) {
479 String actualThreadName = threadName;
480 if (suffix != null) {
481 actualThreadName = actualThreadName + '-' + suffix;
482 }
483 executor.execute(new NamePreservingRunnable(worker, actualThreadName));
484 }
485
486 protected final void initSession(IoSession session, IoFuture future, IoSessionInitializer sessionInitializer) {
487
488 if (stats.getLastReadTime() == 0) {
489 stats.setLastReadTime(getActivationTime());
490 }
491
492 if (stats.getLastWriteTime() == 0) {
493 stats.setLastWriteTime(getActivationTime());
494 }
495
496
497
498
499
500 try {
501 ((AbstractIoSession) session).setAttributeMap(session.getService().getSessionDataStructureFactory()
502 .getAttributeMap(session));
503 } catch (IoSessionInitializationException e) {
504 throw e;
505 } catch (Exception e) {
506 throw new IoSessionInitializationException("Failed to initialize an attributeMap.", e);
507 }
508
509 try {
510 ((AbstractIoSession) session).setWriteRequestQueue(session.getService().getSessionDataStructureFactory()
511 .getWriteRequestQueue(session));
512 } catch (IoSessionInitializationException e) {
513 throw e;
514 } catch (Exception e) {
515 throw new IoSessionInitializationException("Failed to initialize a writeRequestQueue.", e);
516 }
517
518 if ((future != null) && (future instanceof ConnectFuture)) {
519
520 session.setAttribute(DefaultIoFilterChain.SESSION_CREATED_FUTURE, future);
521 }
522
523 if (sessionInitializer != null) {
524 sessionInitializer.initializeSession(session, future);
525 }
526
527 finishSessionInitialization0(session, future);
528 }
529
530
531
532
533
534
535
536
537
538
539
540 protected void finishSessionInitialization0(IoSession session, IoFuture future) {
541
542 }
543
544
545
546
547
548 protected static class ServiceOperationFuture extends DefaultIoFuture {
549 public ServiceOperationFuture() {
550 super(null);
551 }
552
553
554
555
556 @Override
557 public final boolean isDone() {
558 return getValue() == Boolean.TRUE;
559 }
560
561 public final void setDone() {
562 setValue(Boolean.TRUE);
563 }
564
565 public final Exception getException() {
566 if (getValue() instanceof Exception) {
567 return (Exception) getValue();
568 }
569
570 return null;
571 }
572
573 public final void setException(Exception exception) {
574 if (exception == null) {
575 throw new IllegalArgumentException("exception");
576 }
577
578 setValue(exception);
579 }
580 }
581
582
583
584
585 @Override
586 public int getScheduledWriteBytes() {
587 return stats.getScheduledWriteBytes();
588 }
589
590
591
592
593 @Override
594 public int getScheduledWriteMessages() {
595 return stats.getScheduledWriteMessages();
596 }
597 }