1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.core.service;
21
22 import org.apache.mina.core.future.CloseFuture;
23 import org.apache.mina.core.future.ConnectFuture;
24 import org.apache.mina.core.future.IoFuture;
25 import org.apache.mina.core.future.IoFutureListener;
26 import org.apache.mina.core.session.IdleStatus;
27 import org.apache.mina.core.session.IoSession;
28 import org.apache.mina.filter.codec.ProtocolCodecFilter;
29 import org.apache.mina.filter.codec.textline.TextLineCodecFactory;
30 import org.apache.mina.filter.logging.LoggingFilter;
31 import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
32 import org.apache.mina.transport.socket.nio.NioSocketConnector;
33 import org.junit.Test;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36
37 import java.io.IOException;
38 import java.net.InetSocketAddress;
39 import java.nio.charset.StandardCharsets;
40 import java.util.ArrayList;
41 import java.util.List;
42 import java.util.concurrent.CountDownLatch;
43
44
45
46
47
48
49 public class AbstractIoServiceTest {
50
51 private static final int PORT = 9123;
52
53 @Test
54 public void testDispose() throws IOException, InterruptedException {
55
56 List<String> threadsBefore = getThreadNames();
57
58 final IoAcceptor acceptor = new NioSocketAcceptor();
59
60 acceptor.getFilterChain().addLast("logger", new LoggingFilter());
61 acceptor.getFilterChain().addLast("codec",
62 new ProtocolCodecFilter(new TextLineCodecFactory(StandardCharsets.UTF_8)));
63
64 acceptor.setHandler(new ServerHandler());
65
66 acceptor.getSessionConfig().setReadBufferSize(2048);
67 acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);
68 acceptor.bind(new InetSocketAddress(PORT));
69 System.out.println("Server running ...");
70
71 final NioSocketConnector connector = new NioSocketConnector();
72
73
74 connector.setConnectTimeoutMillis(30 * 1000L);
75
76 connector.setHandler(new ClientHandler());
77 connector.getFilterChain().addLast("logger", new LoggingFilter());
78 connector.getFilterChain().addLast("codec",
79 new ProtocolCodecFilter(new TextLineCodecFactory(StandardCharsets.UTF_8)));
80
81
82 ConnectFuture cf = connector.connect(new InetSocketAddress("localhost", 9123));
83 cf.awaitUninterruptibly();
84
85 IoSession session = cf.getSession();
86
87
88 session.write("Hello World!\r");
89
90
91 CountDownLatch latch = (CountDownLatch) session.getAttribute("latch");
92 latch.await();
93
94
95 CloseFuture closeFuture = session.closeOnFlush();
96
97 System.out.println("session.close called");
98
99
100
101 closeFuture.addListener(new IoFutureListener<IoFuture>() {
102
103 public void operationComplete(IoFuture future) {
104 System.out.println("managed session count=" + connector.getManagedSessionCount());
105 System.out.println("Disposing connector ...");
106 connector.dispose(true);
107 System.out.println("Disposing connector ... *finished*");
108
109 }
110 });
111
112 closeFuture.awaitUninterruptibly();
113 acceptor.dispose(true);
114
115 List<String> threadsAfter = getThreadNames();
116
117 System.out.println("threadsBefore = " + threadsBefore);
118 System.out.println("threadsAfter = " + threadsAfter);
119
120
121
122 }
123
124 public static class ClientHandler extends IoHandlerAdapter {
125
126 private static final Logger LOGGER = LoggerFactory.getLogger("CLIENT");
127
128 @Override
129 public void sessionCreated(IoSession session) throws Exception {
130 session.setAttribute("latch", new CountDownLatch(1));
131 }
132
133 @Override
134 public void messageReceived(IoSession session, Object message) throws Exception {
135 LOGGER.info("client: messageReceived(" + session + ", " + message + ")");
136 CountDownLatch latch = (CountDownLatch) session.getAttribute("latch");
137 latch.countDown();
138 }
139
140 @Override
141 public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
142 LOGGER.warn("exceptionCaught:", cause);
143 }
144 }
145
146 public static class ServerHandler extends IoHandlerAdapter {
147
148 private static final Logger LOGGER = LoggerFactory.getLogger("SERVER");
149
150 @Override
151 public void messageReceived(IoSession session, Object message) throws Exception {
152 LOGGER.info("server: messageReceived(" + session + ", " + message + ")");
153 session.write(message.toString());
154 }
155
156 @Override
157 public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
158 LOGGER.warn("exceptionCaught:", cause);
159 }
160
161 }
162
163 public static void main(String[] args) throws IOException, InterruptedException {
164 new AbstractIoServiceTest().testDispose();
165 }
166
167 private List<String> getThreadNames() {
168 List<String> list = new ArrayList<String>();
169 int active = Thread.activeCount();
170 Thread[] threads = new Thread[active];
171 Thread.enumerate(threads);
172 for (Thread thread : threads) {
173 try {
174 String name = thread.getName();
175 list.add(name);
176 } catch (NullPointerException ignore) {
177 }
178 }
179 return list;
180 }
181
182 }