1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.filter.stream;
21
22 import static org.junit.Assert.assertEquals;
23 import static org.junit.Assert.assertSame;
24 import static org.junit.Assert.assertTrue;
25 import static org.junit.Assert.fail;
26
27 import java.net.InetSocketAddress;
28 import java.net.SocketAddress;
29 import java.security.MessageDigest;
30 import java.util.LinkedList;
31 import java.util.Queue;
32 import java.util.Random;
33 import java.util.concurrent.CountDownLatch;
34 import java.util.concurrent.TimeUnit;
35
36 import org.apache.mina.core.buffer.IoBuffer;
37 import org.apache.mina.core.filterchain.IoFilter.NextFilter;
38 import org.apache.mina.core.future.IoFutureListener;
39 import org.apache.mina.core.future.WriteFuture;
40 import org.apache.mina.core.service.IoHandlerAdapter;
41 import org.apache.mina.core.session.DummySession;
42 import org.apache.mina.core.session.IdleStatus;
43 import org.apache.mina.core.session.IoSession;
44 import org.apache.mina.core.write.DefaultWriteRequest;
45 import org.apache.mina.core.write.WriteRequest;
46 import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
47 import org.apache.mina.transport.socket.nio.NioSocketConnector;
48 import org.apache.mina.util.AvailablePortFinder;
49 import org.easymock.IArgumentMatcher;
50 import org.easymock.EasyMock;
51 import org.junit.Test;
52 import org.slf4j.Logger;
53 import org.slf4j.LoggerFactory;
54
55
56
57
58
59
60 public abstract class AbstractStreamWriteFilterTest<M, U extends AbstractStreamWriteFilter<M>> {
61
62 private static final Logger LOGGER = LoggerFactory.getLogger(AbstractStreamWriteFilterTest.class);
63
64 protected final IoSession session = new DummySession();
65
66 abstract protected U createFilter();
67
68 abstract protected M createMessage(byte[] data) throws Exception;
69
70 @Test
71 public void testWriteEmptyFile() throws Exception {
72 AbstractStreamWriteFilter<M> filter = createFilter();
73 M message = createMessage(new byte[0]);
74
75 WriteRequest writeRequest = new DefaultWriteRequest(message, new DummyWriteFuture());
76
77 NextFilter nextFilter = EasyMock.createMock(NextFilter.class);
78
79
80
81 nextFilter.messageSent(session, writeRequest);
82
83
84
85
86 EasyMock.replay(nextFilter);
87
88 filter.filterWrite(nextFilter, session, writeRequest);
89
90
91
92
93 EasyMock.verify(nextFilter);
94
95 assertTrue(writeRequest.getFuture().isWritten());
96 }
97
98
99
100
101
102
103
104 @Test
105 public void testWriteNonFileRegionMessage() throws Exception {
106 AbstractStreamWriteFilter<M> filter = createFilter();
107
108 Object message = new Object();
109 WriteRequest writeRequest = new DefaultWriteRequest(message, new DummyWriteFuture());
110
111 NextFilter nextFilter = EasyMock.createMock(NextFilter.class);
112
113
114
115 nextFilter.filterWrite(session, writeRequest);
116 nextFilter.messageSent(session, writeRequest);
117
118
119
120
121 EasyMock.replay(nextFilter);
122
123 filter.filterWrite(nextFilter, session, writeRequest);
124 filter.messageSent(nextFilter, session, writeRequest);
125
126
127
128
129 EasyMock.verify(nextFilter);
130 }
131
132
133
134
135
136
137 @Test
138 public void testWriteSingleBufferFile() throws Exception {
139 byte[] data = new byte[] { 1, 2, 3, 4 };
140
141 AbstractStreamWriteFilter<M> filter = createFilter();
142 M message = createMessage(data);
143
144 WriteRequest writeRequest = new DefaultWriteRequest(message, new DummyWriteFuture());
145
146 NextFilter nextFilter = EasyMock.createMock(NextFilter.class);
147
148
149
150 nextFilter.filterWrite(EasyMock.eq(session), eqWriteRequest(new DefaultWriteRequest(IoBuffer.wrap(data))));
151 nextFilter.messageSent(session, writeRequest);
152
153
154
155
156 EasyMock.replay(nextFilter);
157
158 filter.filterWrite(nextFilter, session, writeRequest);
159 filter.messageSent(nextFilter, session, writeRequest);
160
161
162
163
164 EasyMock.verify(nextFilter);
165
166 assertTrue(writeRequest.getFuture().isWritten());
167 }
168
169
170
171
172
173
174 @Test
175 public void testWriteSeveralBuffersStream() throws Exception {
176 AbstractStreamWriteFilter<M> filter = createFilter();
177 filter.setWriteBufferSize(4);
178
179 byte[] data = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
180 byte[] chunk1 = new byte[] { 1, 2, 3, 4 };
181 byte[] chunk2 = new byte[] { 5, 6, 7, 8 };
182 byte[] chunk3 = new byte[] { 9, 10 };
183
184 M message = createMessage(data);
185 WriteRequest writeRequest = new DefaultWriteRequest(message, new DummyWriteFuture());
186
187 WriteRequest chunk1Request = new DefaultWriteRequest(IoBuffer.wrap(chunk1));
188 WriteRequest chunk2Request = new DefaultWriteRequest(IoBuffer.wrap(chunk2));
189 WriteRequest chunk3Request = new DefaultWriteRequest(IoBuffer.wrap(chunk3));
190
191 NextFilter nextFilter = EasyMock.createMock(NextFilter.class);
192
193
194
195 nextFilter.filterWrite(EasyMock.eq(session), eqWriteRequest(chunk1Request));
196 nextFilter.filterWrite(EasyMock.eq(session), eqWriteRequest(chunk2Request));
197 nextFilter.filterWrite(EasyMock.eq(session), eqWriteRequest(chunk3Request));
198 nextFilter.messageSent(EasyMock.eq(session), eqWriteRequest(writeRequest));
199
200
201
202
203 EasyMock.replay(nextFilter);
204
205 filter.filterWrite(nextFilter, session, writeRequest);
206 filter.messageSent(nextFilter, session, chunk1Request);
207 filter.messageSent(nextFilter, session, chunk2Request);
208 filter.messageSent(nextFilter, session, chunk3Request);
209
210
211
212
213 EasyMock.verify(nextFilter);
214
215 assertTrue(writeRequest.getFuture().isWritten());
216 }
217
218 @Test
219 public void testWriteWhileWriteInProgress() throws Exception {
220 AbstractStreamWriteFilter<M> filter = createFilter();
221 M message = createMessage(new byte[5]);
222
223 Queue<WriteRequest> queue = new LinkedList<WriteRequest>();
224
225
226
227
228 session.setAttribute(filter.CURRENT_STREAM, message);
229 session.setAttribute(filter.WRITE_REQUEST_QUEUE, queue);
230
231 NextFilter nextFilter = EasyMock.createMock(NextFilter.class);
232
233
234
235 EasyMock.replay(nextFilter);
236
237 WriteRequest wr = new DefaultWriteRequest(new Object(), new DummyWriteFuture());
238 filter.filterWrite(nextFilter, session, wr);
239 assertEquals(1, queue.size());
240 assertSame(wr, queue.poll());
241
242
243
244
245 EasyMock.verify(nextFilter);
246
247 session.removeAttribute(filter.CURRENT_STREAM);
248 session.removeAttribute(filter.WRITE_REQUEST_QUEUE);
249 }
250
251 @Test
252 public void testWritesWriteRequestQueueWhenFinished() throws Exception {
253 AbstractStreamWriteFilter<M> filter = createFilter();
254 M message = createMessage(new byte[0]);
255
256 WriteRequest wrs[] = new WriteRequest[] { new DefaultWriteRequest(new Object(), new DummyWriteFuture()),
257 new DefaultWriteRequest(new Object(), new DummyWriteFuture()),
258 new DefaultWriteRequest(new Object(), new DummyWriteFuture()) };
259 Queue<WriteRequest> queue = new LinkedList<WriteRequest>();
260 queue.add(wrs[0]);
261 queue.add(wrs[1]);
262 queue.add(wrs[2]);
263
264
265
266
267 session.setAttribute(filter.CURRENT_STREAM, message);
268 session.setAttribute(filter.CURRENT_WRITE_REQUEST, new DefaultWriteRequest(message));
269 session.setAttribute(filter.WRITE_REQUEST_QUEUE, queue);
270
271
272
273
274 NextFilter nextFilter = EasyMock.createMock(NextFilter.class);
275 nextFilter.filterWrite(session, wrs[0]);
276 nextFilter.filterWrite(session, wrs[1]);
277 nextFilter.filterWrite(session, wrs[2]);
278 nextFilter.messageSent(EasyMock.eq(session), eqWriteRequest(new DefaultWriteRequest(message)));
279
280
281
282
283 EasyMock.replay(nextFilter);
284
285 filter.messageSent(nextFilter, session, new DefaultWriteRequest(new Object()));
286 assertEquals(0, queue.size());
287
288
289
290
291 EasyMock.verify(nextFilter);
292 }
293
294
295
296
297
298 @Test
299 public void testSetWriteBufferSize() {
300 AbstractStreamWriteFilter<M> filter = createFilter();
301
302 try {
303 filter.setWriteBufferSize(0);
304 fail("0 writeBuferSize specified. IllegalArgumentException expected.");
305 } catch (IllegalArgumentException iae) {
306
307
308 assertTrue(true);
309 }
310
311 try {
312 filter.setWriteBufferSize(-100);
313 fail("Negative writeBuferSize specified. IllegalArgumentException expected.");
314 } catch (IllegalArgumentException iae) {
315
316
317 assertTrue(true);
318 }
319
320 filter.setWriteBufferSize(1);
321 assertEquals(1, filter.getWriteBufferSize());
322 filter.setWriteBufferSize(1024);
323 assertEquals(1024, filter.getWriteBufferSize());
324 }
325
326 @Test
327 public void testWriteUsingSocketTransport() throws Exception {
328 NioSocketAcceptor acceptor = new NioSocketAcceptor();
329 acceptor.setReuseAddress(true);
330 SocketAddress address = new InetSocketAddress("localhost", AvailablePortFinder.getNextAvailable());
331
332 NioSocketConnector connector = new NioSocketConnector();
333
334
335 byte[] data = new byte[4 * 1024 * 1024];
336 new Random().nextBytes(data);
337
338 byte[] expectedMd5 = MessageDigest.getInstance("MD5").digest(data);
339
340 M message = createMessage(data);
341
342 SenderHandler sender = new SenderHandler(message);
343 ReceiverHandler receiver = new ReceiverHandler(data.length);
344
345 acceptor.setHandler(sender);
346 connector.setHandler(receiver);
347
348 acceptor.bind(address);
349 connector.connect(address);
350 sender.latch.await();
351 receiver.latch.await();
352
353 acceptor.dispose();
354
355 assertEquals(data.length, receiver.bytesRead);
356 byte[] actualMd5 = receiver.digest.digest();
357 assertEquals(expectedMd5.length, actualMd5.length);
358 for (int i = 0; i < expectedMd5.length; i++) {
359 assertEquals(expectedMd5[i], actualMd5[i]);
360 }
361 }
362
363 private class SenderHandler extends IoHandlerAdapter {
364 final CountDownLatch latch = new CountDownLatch(1);
365
366 private M message;
367
368 StreamWriteFilter streamWriteFilter = new StreamWriteFilter();
369
370 SenderHandler(M message) {
371 this.message = message;
372 }
373
374 @Override
375 public void sessionCreated(IoSession session) throws Exception {
376 super.sessionCreated(session);
377 session.getFilterChain().addLast("codec", streamWriteFilter);
378 }
379
380 @Override
381 public void sessionOpened(IoSession session) throws Exception {
382 session.write(message);
383 }
384
385 @Override
386 public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
387 LOGGER.error("SenderHandler: exceptionCaught", cause);
388 latch.countDown();
389 }
390
391 @Override
392 public void sessionClosed(IoSession session) throws Exception {
393 LOGGER.info("SenderHandler: sessionClosed");
394 latch.countDown();
395 }
396
397 @Override
398 public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
399 LOGGER.info("SenderHandler: sessionIdle");
400 latch.countDown();
401 }
402
403 @Override
404 public void messageSent(IoSession session, Object message) throws Exception {
405 LOGGER.info("SenderHandler: messageSent");
406 if (message == this.message) {
407 LOGGER.info("message == this.message");
408 latch.countDown();
409 }
410 }
411 }
412
413 private static class ReceiverHandler extends IoHandlerAdapter {
414 final CountDownLatch latch = new CountDownLatch(1);
415
416 long bytesRead = 0;
417
418 long size = 0;
419
420 MessageDigest digest;
421
422 ReceiverHandler(long size) throws Exception {
423 this.size = size;
424 digest = MessageDigest.getInstance("MD5");
425 }
426
427 @Override
428 public void sessionCreated(IoSession session) throws Exception {
429 super.sessionCreated(session);
430
431 session.getConfig().setIdleTime(IdleStatus.READER_IDLE, 5);
432 }
433
434 @Override
435 public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
436 LOGGER.info("ReceiverHandler: sessionIdle");
437 session.closeNow();
438 }
439
440 @Override
441 public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
442 LOGGER.error("ReceiverHandler: exceptionCaught", cause);
443 latch.countDown();
444 }
445
446 @Override
447 public void sessionClosed(IoSession session) throws Exception {
448 LOGGER.info("ReceiverHandler: sessionClosed");
449 latch.countDown();
450 }
451
452 @Override
453 public void messageReceived(IoSession session, Object message) throws Exception {
454 LOGGER.info("messageReceived");
455 IoBuffer buf = (IoBuffer) message;
456 while (buf.hasRemaining()) {
457 digest.update(buf.get());
458 bytesRead++;
459 }
460 LOGGER.info("messageReceived: bytesRead = {}", bytesRead);
461 if (bytesRead >= size) {
462 session.closeNow();
463 }
464 }
465 }
466
467 public static WriteRequest eqWriteRequest(WriteRequest expected) {
468 EasyMock.reportMatcher(new WriteRequestMatcher(expected));
469 return null;
470 }
471
472 private static class WriteRequestMatcher implements IArgumentMatcher {
473 private final WriteRequest expected;
474
475 public WriteRequestMatcher(WriteRequest expected) {
476 this.expected = expected;
477 }
478
479 public boolean matches(Object actual) {
480 if (actual instanceof WriteRequest) {
481 WriteRequest w2 = (WriteRequest) actual;
482
483 return expected.getMessage().equals(w2.getMessage())
484 && expected.getFuture().isWritten() == w2.getFuture().isWritten();
485 }
486 return false;
487 }
488
489 public void appendTo(StringBuffer buffer) {
490 buffer.append("Expected a WriteRequest with the message '").append(expected.getMessage()).append("'");
491 }
492 }
493
494 private static class DummyWriteFuture implements WriteFuture {
495 private boolean written;
496
497
498
499
500 public DummyWriteFuture() {
501 super();
502 }
503
504 public boolean isWritten() {
505 return written;
506 }
507
508 public void setWritten() {
509 this.written = true;
510 }
511
512 public IoSession getSession() {
513 return null;
514 }
515
516 public void join() {
517
518 }
519
520 public boolean join(long timeoutInMillis) {
521 return true;
522 }
523
524 public boolean isDone() {
525 return true;
526 }
527
528 public WriteFuture addListener(IoFutureListener<?> listener) {
529 return this;
530 }
531
532 public WriteFuture removeListener(IoFutureListener<?> listener) {
533 return this;
534 }
535
536 public WriteFuture await() throws InterruptedException {
537 return this;
538 }
539
540 public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
541 return true;
542 }
543
544 public boolean await(long timeoutMillis) throws InterruptedException {
545 return true;
546 }
547
548 public WriteFuture awaitUninterruptibly() {
549 return this;
550 }
551
552 public boolean awaitUninterruptibly(long timeout, TimeUnit unit) {
553 return true;
554 }
555
556 public boolean awaitUninterruptibly(long timeoutMillis) {
557 return true;
558 }
559
560 public Throwable getException() {
561 return null;
562 }
563
564 public void setException(Throwable cause) {
565 throw new IllegalStateException();
566 }
567 }
568
569 }