View Javadoc
1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.core.service;
21  
import org.apache.mina.core.future.CloseFuture;
import org.apache.mina.core.future.ConnectFuture;
import org.apache.mina.core.future.IoFuture;
import org.apache.mina.core.future.IoFutureListener;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.codec.textline.TextLineCodecFactory;
import org.apache.mina.filter.logging.LoggingFilter;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
import org.apache.mina.transport.socket.nio.NioSocketConnector;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
43  
44  /**
45   * test the AbstractIoService
46   *
47   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
48   */
49  public class AbstractIoServiceTest {
50  
51      private static final int PORT = 9123;
52  
53      @Test
54      public void testDispose() throws IOException, InterruptedException {
55  
56          List<String> threadsBefore = getThreadNames();
57  
58          final IoAcceptor acceptor = new NioSocketAcceptor();
59  
60          acceptor.getFilterChain().addLast("logger", new LoggingFilter());
61          acceptor.getFilterChain().addLast("codec",
62                  new ProtocolCodecFilter(new TextLineCodecFactory(StandardCharsets.UTF_8)));
63  
64          acceptor.setHandler(new ServerHandler());
65  
66          acceptor.getSessionConfig().setReadBufferSize(2048);
67          acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);
68          acceptor.bind(new InetSocketAddress(PORT));
69          System.out.println("Server running ...");
70  
71          final NioSocketConnector connector = new NioSocketConnector();
72  
73          // Set connect timeout.
74          connector.setConnectTimeoutMillis(30 * 1000L);
75  
76          connector.setHandler(new ClientHandler());
77          connector.getFilterChain().addLast("logger", new LoggingFilter());
78          connector.getFilterChain().addLast("codec",
79                  new ProtocolCodecFilter(new TextLineCodecFactory(StandardCharsets.UTF_8)));
80  
81          // Start communication.
82          ConnectFuture cf = connector.connect(new InetSocketAddress("localhost", 9123));
83          cf.awaitUninterruptibly();
84  
85          IoSession session = cf.getSession();
86  
87          // send a message
88          session.write("Hello World!\r");
89  
90          // wait until response is received
91          CountDownLatch latch = (CountDownLatch) session.getAttribute("latch");
92          latch.await();
93  
94          // close the session
95          CloseFuture closeFuture = session.closeOnFlush();
96  
97          System.out.println("session.close called");
98          //Thread.sleep(5);
99  
100         // wait for session close and then dispose the connector
101         closeFuture.addListener(new IoFutureListener<IoFuture>() {
102 
103             public void operationComplete(IoFuture future) {
104                 System.out.println("managed session count=" + connector.getManagedSessionCount());
105                 System.out.println("Disposing connector ...");
106                 connector.dispose(true);
107                 System.out.println("Disposing connector ... *finished*");
108 
109             }
110         });
111 
112         closeFuture.awaitUninterruptibly();
113         acceptor.dispose(true);
114 
115         List<String> threadsAfter = getThreadNames();
116 
117         System.out.println("threadsBefore = " + threadsBefore);
118         System.out.println("threadsAfter  = " + threadsAfter);
119 
120         // Assert.assertEquals(threadsBefore, threadsAfter);
121 
122     }
123 
124     public static class ClientHandler extends IoHandlerAdapter {
125 
126         private static final Logger LOGGER = LoggerFactory.getLogger("CLIENT");
127 
128         @Override
129         public void sessionCreated(IoSession session) throws Exception {
130             session.setAttribute("latch", new CountDownLatch(1));
131         }
132 
133         @Override
134         public void messageReceived(IoSession session, Object message) throws Exception {
135             LOGGER.info("client: messageReceived(" + session + ", " + message + ")");
136             CountDownLatch latch = (CountDownLatch) session.getAttribute("latch");
137             latch.countDown();
138         }
139 
140         @Override
141         public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
142             LOGGER.warn("exceptionCaught:", cause);
143         }
144     }
145 
146     public static class ServerHandler extends IoHandlerAdapter {
147 
148         private static final Logger LOGGER = LoggerFactory.getLogger("SERVER");
149 
150         @Override
151         public void messageReceived(IoSession session, Object message) throws Exception {
152             LOGGER.info("server: messageReceived(" + session + ", " + message + ")");
153             session.write(message.toString());
154         }
155 
156         @Override
157         public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
158             LOGGER.warn("exceptionCaught:", cause);
159         }
160 
161     }
162 
163     public static void main(String[] args) throws IOException, InterruptedException {
164         new AbstractIoServiceTest().testDispose();
165     }
166 
167     private List<String> getThreadNames() {
168         List<String> list = new ArrayList<String>();
169         int active = Thread.activeCount();
170         Thread[] threads = new Thread[active];
171         Thread.enumerate(threads);
172         for (Thread thread : threads) {
173             try {
174                 String name = thread.getName();
175                 list.add(name);
176             } catch (NullPointerException ignore) {
177             }
178         }
179         return list;
180     }
181 
182 }