View Javadoc
1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.transport;
21  
22  import java.net.SocketAddress;
23  
24  import org.apache.mina.core.buffer.IoBuffer;
25  import org.apache.mina.core.future.ConnectFuture;
26  import org.apache.mina.core.service.IoAcceptor;
27  import org.apache.mina.core.service.IoHandler;
28  import org.apache.mina.core.service.IoHandlerAdapter;
29  import org.apache.mina.core.service.TransportMetadata;
30  import org.apache.mina.core.session.IoSession;
31  import org.junit.After;
32  import org.junit.Before;
33  import org.junit.Test;
34  
35  import static org.junit.Assert.assertEquals;
36  import static org.junit.Assert.assertFalse;
37  import static org.junit.Assert.assertTrue;
38  
39  /**
40   * Abstract base class for testing suspending and resuming reads and
41   * writes.
42   *
43   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
44   */
45  public abstract class AbstractTrafficControlTest {
46  
47      protected int port;
48  
49      protected IoAcceptor acceptor;
50  
51      protected TransportMetadata transportType;
52  
53      public AbstractTrafficControlTest(IoAcceptor acceptor) {
54          this.acceptor = acceptor;
55      }
56  
57      @Before
58      public void setUp() throws Exception {
59          acceptor.setHandler(new ServerIoHandler());
60          acceptor.bind(createServerSocketAddress(0));
61          port = getPort(acceptor.getLocalAddress());
62      }
63  
64      @After
65      public void tearDown() throws Exception {
66          acceptor.unbind();
67          acceptor.dispose();
68      }
69  
70      protected abstract ConnectFuture connect(int port, IoHandler handler) throws Exception;
71  
72      protected abstract SocketAddress createServerSocketAddress(int port);
73  
74      protected abstract int getPort(SocketAddress address);
75  
76      @Test
77      public void testSuspendResumeReadWrite() throws Exception {
78          ConnectFuture future = connect(port, new ClientIoHandler());
79          future.awaitUninterruptibly();
80          IoSession session = future.getSession();
81  
82          // We wait for the sessionCreated() event is fired because we
83          // cannot guarantee that it is invoked already.
84          while (session.getAttribute("lock") == null) {
85              Thread.yield();
86          }
87  
88          Object lock = session.getAttribute("lock");
89          synchronized (lock) {
90  
91              write(session, "1");
92              assertEquals('1', read(session));
93              assertEquals("1", getReceived(session));
94              assertEquals("1", getSent(session));
95  
96              session.suspendRead();
97  
98              Thread.sleep(100);
99  
100             write(session, "2");
101             assertFalse(canRead(session));
102             assertEquals("1", getReceived(session));
103             assertEquals("12", getSent(session));
104 
105             session.suspendWrite();
106 
107             Thread.sleep(100);
108 
109             write(session, "3");
110             assertFalse(canRead(session));
111             assertEquals("1", getReceived(session));
112             assertEquals("12", getSent(session));
113 
114             session.resumeRead();
115 
116             Thread.sleep(100);
117 
118             write(session, "4");
119             assertEquals('2', read(session));
120             assertEquals("12", getReceived(session));
121             assertEquals("12", getSent(session));
122 
123             session.resumeWrite();
124 
125             Thread.sleep(100);
126 
127             assertEquals('3', read(session));
128             assertEquals('4', read(session));
129 
130             write(session, "5");
131             assertEquals('5', read(session));
132             assertEquals("12345", getReceived(session));
133             assertEquals("12345", getSent(session));
134 
135             session.suspendWrite();
136 
137             Thread.sleep(100);
138 
139             write(session, "6");
140             assertFalse(canRead(session));
141             assertEquals("12345", getReceived(session));
142             assertEquals("12345", getSent(session));
143 
144             session.suspendRead();
145             session.resumeWrite();
146 
147             Thread.sleep(100);
148 
149             write(session, "7");
150             assertFalse(canRead(session));
151             assertEquals("12345", getReceived(session));
152             assertEquals("1234567", getSent(session));
153 
154             session.resumeRead();
155 
156             Thread.sleep(100);
157 
158             assertEquals('6', read(session));
159             assertEquals('7', read(session));
160 
161             assertEquals("1234567", getReceived(session));
162             assertEquals("1234567", getSent(session));
163 
164         }
165 
166         session.closeNow().awaitUninterruptibly();
167     }
168 
169     private void write(IoSession session, String s) throws Exception {
170         session.write(IoBuffer.wrap(s.getBytes("ASCII")));
171     }
172 
173     private int read(IoSession session) throws Exception {
174         int pos = ((Integer) session.getAttribute("pos")).intValue();
175         for (int i = 0; i < 10 && pos == getReceived(session).length(); i++) {
176             Object lock = session.getAttribute("lock");
177             lock.wait(200);
178         }
179         session.setAttribute("pos", new Integer(pos + 1));
180         String received = getReceived(session);
181         assertTrue(received.length() > pos);
182         return getReceived(session).charAt(pos);
183     }
184 
185     private boolean canRead(IoSession session) throws Exception {
186         int pos = ((Integer) session.getAttribute("pos")).intValue();
187         Object lock = session.getAttribute("lock");
188         lock.wait(250);
189         String received = getReceived(session);
190         return pos < received.length();
191     }
192 
193     private String getReceived(IoSession session) throws Exception {
194         return session.getAttribute("received").toString();
195     }
196 
197     private String getSent(IoSession session) throws Exception {
198         return session.getAttribute("sent").toString();
199     }
200 
201     private static class ClientIoHandler extends IoHandlerAdapter {
202         /**
203          * Default constructor
204          */
205         public ClientIoHandler() {
206             super();
207         }
208 
209         @Override
210         public void sessionCreated(IoSession session) throws Exception {
211             super.sessionCreated(session);
212             session.setAttribute("pos", new Integer(0));
213             session.setAttribute("received", new StringBuffer());
214             session.setAttribute("sent", new StringBuffer());
215             session.setAttribute("lock", new Object());
216         }
217 
218         @Override
219         public void messageReceived(IoSession session, Object message) throws Exception {
220             IoBuffer buffer = (IoBuffer) message;
221             byte[] data = new byte[buffer.remaining()];
222             buffer.get(data);
223             Object lock = session.getAttribute("lock");
224             synchronized (lock) {
225                 StringBuffer sb = (StringBuffer) session.getAttribute("received");
226                 sb.append(new String(data, "ASCII"));
227                 lock.notifyAll();
228             }
229         }
230 
231         @Override
232         public void messageSent(IoSession session, Object message) throws Exception {
233             IoBuffer buffer = (IoBuffer) message;
234             buffer.rewind();
235             byte[] data = new byte[buffer.remaining()];
236             buffer.get(data);
237             StringBuffer sb = (StringBuffer) session.getAttribute("sent");
238             sb.append(new String(data, "ASCII"));
239         }
240 
241     }
242 
243     private static class ServerIoHandler extends IoHandlerAdapter {
244         /**
245          * Default constructor
246          */
247         public ServerIoHandler() {
248             super();
249         }
250 
251         @Override
252         public void messageReceived(IoSession session, Object message) throws Exception {
253             // Just echo the received bytes.
254             IoBuffer rb = (IoBuffer) message;
255             IoBuffer wb = IoBuffer.allocate(rb.remaining());
256             wb.put(rb);
257             wb.flip();
258             session.write(wb);
259         }
260     }
261 }