1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.core.service;
21
22 import java.util.Collections;
23 import java.util.List;
24 import java.util.Map;
25 import java.util.concurrent.ConcurrentHashMap;
26 import java.util.concurrent.ConcurrentMap;
27 import java.util.concurrent.CopyOnWriteArrayList;
28 import java.util.concurrent.atomic.AtomicBoolean;
29 import java.util.concurrent.atomic.AtomicLong;
30
31 import org.apache.mina.core.filterchain.IoFilterChain;
32 import org.apache.mina.core.future.IoFuture;
33 import org.apache.mina.core.future.IoFutureListener;
34 import org.apache.mina.core.session.IoSession;
35 import org.apache.mina.util.ExceptionMonitor;
36
37
38
39
40
41
42
43 public class IoServiceListenerSupport {
44
45 private final IoService service;
46
47
48 private final List<IoServiceListener> listeners = new CopyOnWriteArrayList<>();
49
50
51 private final ConcurrentMap<Long, IoSession> managedSessions = new ConcurrentHashMap<>();
52
53
54 private final Map<Long, IoSession> readOnlyManagedSessions = Collections.unmodifiableMap(managedSessions);
55
56 private final AtomicBoolean activated = new AtomicBoolean();
57
58
59 private volatile long activationTime;
60
61
62 private volatile int largestManagedSessionCount = 0;
63
64
65 private AtomicLong cumulativeManagedSessionCount = new AtomicLong(0);
66
67
68
69
70
71
72 public IoServiceListenerSupport(IoService service) {
73 if (service == null) {
74 throw new IllegalArgumentException("service");
75 }
76
77 this.service = service;
78 }
79
80
81
82
83
84
85 public void add(IoServiceListener listener) {
86 if (listener != null) {
87 listeners.add(listener);
88 }
89 }
90
91
92
93
94
95
96 public void remove(IoServiceListener listener) {
97 if (listener != null) {
98 listeners.remove(listener);
99 }
100 }
101
102
103
104
105 public long getActivationTime() {
106 return activationTime;
107 }
108
109
110
111
112 public Map<Long, IoSession> getManagedSessions() {
113 return readOnlyManagedSessions;
114 }
115
116
117
118
119 public int getManagedSessionCount() {
120 return managedSessions.size();
121 }
122
123
124
125
126
127 public int getLargestManagedSessionCount() {
128 return largestManagedSessionCount;
129 }
130
131
132
133
134
135 public long getCumulativeManagedSessionCount() {
136 return cumulativeManagedSessionCount.get();
137 }
138
139
140
141
142 public boolean isActive() {
143 return activated.get();
144 }
145
146
147
148
149
150 public void fireServiceActivated() {
151 if (!activated.compareAndSet(false, true)) {
152
153 return;
154 }
155
156 activationTime = System.currentTimeMillis();
157
158
159 for (IoServiceListener listener : listeners) {
160 try {
161 listener.serviceActivated(service);
162 } catch (Exception e) {
163 ExceptionMonitor.getInstance().exceptionCaught(e);
164 }
165 }
166 }
167
168
169
170
171
172 public void fireServiceDeactivated() {
173 if (!activated.compareAndSet(true, false)) {
174
175 return;
176 }
177
178
179 try {
180 for (IoServiceListener listener : listeners) {
181 try {
182 listener.serviceDeactivated(service);
183 } catch (Exception e) {
184 ExceptionMonitor.getInstance().exceptionCaught(e);
185 }
186 }
187 } finally {
188 disconnectSessions();
189 }
190 }
191
192
193
194
195
196
197 public void fireSessionCreated(IoSession session) {
198 boolean firstSession = false;
199
200 if (session.getService() instanceof IoConnector) {
201 synchronized (managedSessions) {
202 firstSession = managedSessions.isEmpty();
203 }
204 }
205
206
207 if (managedSessions.putIfAbsent(session.getId(), session) != null) {
208 return;
209 }
210
211
212 if (firstSession) {
213 fireServiceActivated();
214 }
215
216
217 IoFilterChain filterChain = session.getFilterChain();
218 filterChain.fireSessionCreated();
219 filterChain.fireSessionOpened();
220
221 int managedSessionCount = managedSessions.size();
222
223 if (managedSessionCount > largestManagedSessionCount) {
224 largestManagedSessionCount = managedSessionCount;
225 }
226
227 cumulativeManagedSessionCount.incrementAndGet();
228
229
230 for (IoServiceListener l : listeners) {
231 try {
232 l.sessionCreated(session);
233 } catch (Exception e) {
234 ExceptionMonitor.getInstance().exceptionCaught(e);
235 }
236 }
237 }
238
239
240
241
242
243
244 public void fireSessionDestroyed(IoSession session) {
245
246 if (managedSessions.remove(session.getId()) == null) {
247 return;
248 }
249
250
251 session.getFilterChain().fireSessionClosed();
252
253
254 try {
255 for (IoServiceListener l : listeners) {
256 try {
257 l.sessionDestroyed(session);
258 } catch (Exception e) {
259 ExceptionMonitor.getInstance().exceptionCaught(e);
260 }
261 }
262 } finally {
263
264 if (session.getService() instanceof IoConnector) {
265 boolean lastSession = false;
266
267 synchronized (managedSessions) {
268 lastSession = managedSessions.isEmpty();
269 }
270
271 if (lastSession) {
272 fireServiceDeactivated();
273 }
274 }
275 }
276 }
277
278
279
280
281
282 private void disconnectSessions() {
283 if (!(service instanceof IoAcceptor)) {
284
285 return;
286 }
287
288 if (!((IoAcceptor) service).isCloseOnDeactivation()) {
289 return;
290 }
291
292 Object lock = new Object();
293 IoFutureListener<IoFuture> listener = new LockNotifyingListener(lock);
294
295 for (IoSession s : managedSessions.values()) {
296 s.closeNow().addListener(listener);
297 }
298
299 try {
300 synchronized (lock) {
301 while (!managedSessions.isEmpty()) {
302 lock.wait(500);
303 }
304 }
305 } catch (InterruptedException ie) {
306
307 }
308 }
309
310
311
312
313 private static class LockNotifyingListener implements IoFutureListener<IoFuture> {
314 private final Object lock;
315
316 public LockNotifyingListener(Object lock) {
317 this.lock = lock;
318 }
319
320 @Override
321 public void operationComplete(IoFuture future) {
322 synchronized (lock) {
323 lock.notifyAll();
324 }
325 }
326 }
327 }