1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.transport;
21
22 import java.net.SocketAddress;
23
24 import org.apache.mina.core.buffer.IoBuffer;
25 import org.apache.mina.core.future.ConnectFuture;
26 import org.apache.mina.core.service.IoAcceptor;
27 import org.apache.mina.core.service.IoHandler;
28 import org.apache.mina.core.service.IoHandlerAdapter;
29 import org.apache.mina.core.service.TransportMetadata;
30 import org.apache.mina.core.session.IoSession;
31 import org.junit.After;
32 import org.junit.Before;
33 import org.junit.Test;
34
35 import static org.junit.Assert.assertEquals;
36 import static org.junit.Assert.assertFalse;
37 import static org.junit.Assert.assertTrue;
38
39
40
41
42
43
44
45 public abstract class AbstractTrafficControlTest {
46
47 protected int port;
48
49 protected IoAcceptor acceptor;
50
51 protected TransportMetadata transportType;
52
53 public AbstractTrafficControlTest(IoAcceptor acceptor) {
54 this.acceptor = acceptor;
55 }
56
57 @Before
58 public void setUp() throws Exception {
59 acceptor.setHandler(new ServerIoHandler());
60 acceptor.bind(createServerSocketAddress(0));
61 port = getPort(acceptor.getLocalAddress());
62 }
63
64 @After
65 public void tearDown() throws Exception {
66 acceptor.unbind();
67 acceptor.dispose();
68 }
69
70 protected abstract ConnectFuture connect(int port, IoHandler handler) throws Exception;
71
72 protected abstract SocketAddress createServerSocketAddress(int port);
73
74 protected abstract int getPort(SocketAddress address);
75
76 @Test
77 public void testSuspendResumeReadWrite() throws Exception {
78 ConnectFuture future = connect(port, new ClientIoHandler());
79 future.awaitUninterruptibly();
80 IoSession session = future.getSession();
81
82
83
84 while (session.getAttribute("lock") == null) {
85 Thread.yield();
86 }
87
88 Object lock = session.getAttribute("lock");
89 synchronized (lock) {
90
91 write(session, "1");
92 assertEquals('1', read(session));
93 assertEquals("1", getReceived(session));
94 assertEquals("1", getSent(session));
95
96 session.suspendRead();
97
98 Thread.sleep(100);
99
100 write(session, "2");
101 assertFalse(canRead(session));
102 assertEquals("1", getReceived(session));
103 assertEquals("12", getSent(session));
104
105 session.suspendWrite();
106
107 Thread.sleep(100);
108
109 write(session, "3");
110 assertFalse(canRead(session));
111 assertEquals("1", getReceived(session));
112 assertEquals("12", getSent(session));
113
114 session.resumeRead();
115
116 Thread.sleep(100);
117
118 write(session, "4");
119 assertEquals('2', read(session));
120 assertEquals("12", getReceived(session));
121 assertEquals("12", getSent(session));
122
123 session.resumeWrite();
124
125 Thread.sleep(100);
126
127 assertEquals('3', read(session));
128 assertEquals('4', read(session));
129
130 write(session, "5");
131 assertEquals('5', read(session));
132 assertEquals("12345", getReceived(session));
133 assertEquals("12345", getSent(session));
134
135 session.suspendWrite();
136
137 Thread.sleep(100);
138
139 write(session, "6");
140 assertFalse(canRead(session));
141 assertEquals("12345", getReceived(session));
142 assertEquals("12345", getSent(session));
143
144 session.suspendRead();
145 session.resumeWrite();
146
147 Thread.sleep(100);
148
149 write(session, "7");
150 assertFalse(canRead(session));
151 assertEquals("12345", getReceived(session));
152 assertEquals("1234567", getSent(session));
153
154 session.resumeRead();
155
156 Thread.sleep(100);
157
158 assertEquals('6', read(session));
159 assertEquals('7', read(session));
160
161 assertEquals("1234567", getReceived(session));
162 assertEquals("1234567", getSent(session));
163
164 }
165
166 session.closeNow().awaitUninterruptibly();
167 }
168
169 private void write(IoSession session, String s) throws Exception {
170 session.write(IoBuffer.wrap(s.getBytes("ASCII")));
171 }
172
173 private int read(IoSession session) throws Exception {
174 int pos = ((Integer) session.getAttribute("pos")).intValue();
175 for (int i = 0; i < 10 && pos == getReceived(session).length(); i++) {
176 Object lock = session.getAttribute("lock");
177 lock.wait(200);
178 }
179 session.setAttribute("pos", new Integer(pos + 1));
180 String received = getReceived(session);
181 assertTrue(received.length() > pos);
182 return getReceived(session).charAt(pos);
183 }
184
185 private boolean canRead(IoSession session) throws Exception {
186 int pos = ((Integer) session.getAttribute("pos")).intValue();
187 Object lock = session.getAttribute("lock");
188 lock.wait(250);
189 String received = getReceived(session);
190 return pos < received.length();
191 }
192
193 private String getReceived(IoSession session) throws Exception {
194 return session.getAttribute("received").toString();
195 }
196
197 private String getSent(IoSession session) throws Exception {
198 return session.getAttribute("sent").toString();
199 }
200
201 private static class ClientIoHandler extends IoHandlerAdapter {
202
203
204
205 public ClientIoHandler() {
206 super();
207 }
208
209 @Override
210 public void sessionCreated(IoSession session) throws Exception {
211 super.sessionCreated(session);
212 session.setAttribute("pos", new Integer(0));
213 session.setAttribute("received", new StringBuffer());
214 session.setAttribute("sent", new StringBuffer());
215 session.setAttribute("lock", new Object());
216 }
217
218 @Override
219 public void messageReceived(IoSession session, Object message) throws Exception {
220 IoBuffer buffer = (IoBuffer) message;
221 byte[] data = new byte[buffer.remaining()];
222 buffer.get(data);
223 Object lock = session.getAttribute("lock");
224 synchronized (lock) {
225 StringBuffer sb = (StringBuffer) session.getAttribute("received");
226 sb.append(new String(data, "ASCII"));
227 lock.notifyAll();
228 }
229 }
230
231 @Override
232 public void messageSent(IoSession session, Object message) throws Exception {
233 IoBuffer buffer = (IoBuffer) message;
234 buffer.rewind();
235 byte[] data = new byte[buffer.remaining()];
236 buffer.get(data);
237 StringBuffer sb = (StringBuffer) session.getAttribute("sent");
238 sb.append(new String(data, "ASCII"));
239 }
240
241 }
242
243 private static class ServerIoHandler extends IoHandlerAdapter {
244
245
246
247 public ServerIoHandler() {
248 super();
249 }
250
251 @Override
252 public void messageReceived(IoSession session, Object message) throws Exception {
253
254 IoBuffer rb = (IoBuffer) message;
255 IoBuffer wb = IoBuffer.allocate(rb.remaining());
256 wb.put(rb);
257 wb.flip();
258 session.write(wb);
259 }
260 }
261 }