View Javadoc
1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.filter.stream;
21  
22  import static org.junit.Assert.assertEquals;
23  import static org.junit.Assert.assertSame;
24  import static org.junit.Assert.assertTrue;
25  import static org.junit.Assert.fail;
26  
27  import java.net.InetSocketAddress;
28  import java.net.SocketAddress;
29  import java.security.MessageDigest;
30  import java.util.LinkedList;
31  import java.util.Queue;
32  import java.util.Random;
33  import java.util.concurrent.CountDownLatch;
34  import java.util.concurrent.TimeUnit;
35  
36  import org.apache.mina.core.buffer.IoBuffer;
37  import org.apache.mina.core.filterchain.IoFilter.NextFilter;
38  import org.apache.mina.core.future.IoFutureListener;
39  import org.apache.mina.core.future.WriteFuture;
40  import org.apache.mina.core.service.IoHandlerAdapter;
41  import org.apache.mina.core.session.DummySession;
42  import org.apache.mina.core.session.IdleStatus;
43  import org.apache.mina.core.session.IoSession;
44  import org.apache.mina.core.write.DefaultWriteRequest;
45  import org.apache.mina.core.write.WriteRequest;
46  import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
47  import org.apache.mina.transport.socket.nio.NioSocketConnector;
48  import org.apache.mina.util.AvailablePortFinder;
49  import org.easymock.IArgumentMatcher;
50  import org.easymock.EasyMock;
51  import org.junit.Test;
52  import org.slf4j.Logger;
53  import org.slf4j.LoggerFactory;
54  
55  /**
56   * TODO Add documentation
57   * 
58   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
59   */
60  public abstract class AbstractStreamWriteFilterTest<M, U extends AbstractStreamWriteFilter<M>> {
61  
62      private static final Logger LOGGER = LoggerFactory.getLogger(AbstractStreamWriteFilterTest.class);
63  
64      protected final IoSession session = new DummySession();
65  
66      abstract protected U createFilter();
67  
68      abstract protected M createMessage(byte[] data) throws Exception;
69  
70      @Test
71      public void testWriteEmptyFile() throws Exception {
72          AbstractStreamWriteFilter<M> filter = createFilter();
73          M message = createMessage(new byte[0]);
74  
75          WriteRequest writeRequest = new DefaultWriteRequest(message, new DummyWriteFuture());
76  
77          NextFilter nextFilter = EasyMock.createMock(NextFilter.class);
78          /*
79            * Record expectations
80            */
81          nextFilter.messageSent(session, writeRequest);
82  
83          /*
84           * Replay.
85           */
86          EasyMock.replay(nextFilter);
87  
88          filter.filterWrite(nextFilter, session, writeRequest);
89  
90          /*
91           * Verify.
92           */
93          EasyMock.verify(nextFilter);
94  
95          assertTrue(writeRequest.getFuture().isWritten());
96      }
97  
98      /**
99       * Tests that the filter just passes objects which aren't FileRegion's
100      * through to the next filter.
101      *
102      * @throws Exception when something goes wrong
103      */
104     @Test
105     public void testWriteNonFileRegionMessage() throws Exception {
106         AbstractStreamWriteFilter<M> filter = createFilter();
107 
108         Object message = new Object();
109         WriteRequest writeRequest = new DefaultWriteRequest(message, new DummyWriteFuture());
110 
111         NextFilter nextFilter = EasyMock.createMock(NextFilter.class);
112         /*
113          * Record expectations
114          */
115         nextFilter.filterWrite(session, writeRequest);
116         nextFilter.messageSent(session, writeRequest);
117 
118         /*
119          * Replay.
120          */
121         EasyMock.replay(nextFilter);
122 
123         filter.filterWrite(nextFilter, session, writeRequest);
124         filter.messageSent(nextFilter, session, writeRequest);
125 
126         /*
127          * Verify.
128          */
129         EasyMock.verify(nextFilter);
130     }
131 
132     /**
133      * Tests when the contents of the file fits into one write buffer.
134      *
135      * @throws Exception when something goes wrong
136      */
137     @Test
138     public void testWriteSingleBufferFile() throws Exception {
139         byte[] data = new byte[] { 1, 2, 3, 4 };
140 
141         AbstractStreamWriteFilter<M> filter = createFilter();
142         M message = createMessage(data);
143 
144         WriteRequest writeRequest = new DefaultWriteRequest(message, new DummyWriteFuture());
145 
146         NextFilter nextFilter = EasyMock.createMock(NextFilter.class);
147         /*
148          * Record expectations
149          */
150         nextFilter.filterWrite(EasyMock.eq(session), eqWriteRequest(new DefaultWriteRequest(IoBuffer.wrap(data))));
151         nextFilter.messageSent(session, writeRequest);
152 
153         /*
154          * Replay.
155          */
156         EasyMock.replay(nextFilter);
157 
158         filter.filterWrite(nextFilter, session, writeRequest);
159         filter.messageSent(nextFilter, session, writeRequest);
160 
161         /*
162          * Verify.
163          */
164         EasyMock.verify(nextFilter);
165 
166         assertTrue(writeRequest.getFuture().isWritten());
167     }
168 
169     /**
170      * Tests when the contents of the file doesn't fit into one write buffer.
171      *
172      * @throws Exception when something goes wrong
173      */
174     @Test
175     public void testWriteSeveralBuffersStream() throws Exception {
176         AbstractStreamWriteFilter<M> filter = createFilter();
177         filter.setWriteBufferSize(4);
178 
179         byte[] data = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
180         byte[] chunk1 = new byte[] { 1, 2, 3, 4 };
181         byte[] chunk2 = new byte[] { 5, 6, 7, 8 };
182         byte[] chunk3 = new byte[] { 9, 10 };
183 
184         M message = createMessage(data);
185         WriteRequest writeRequest = new DefaultWriteRequest(message, new DummyWriteFuture());
186 
187         WriteRequest chunk1Request = new DefaultWriteRequest(IoBuffer.wrap(chunk1));
188         WriteRequest chunk2Request = new DefaultWriteRequest(IoBuffer.wrap(chunk2));
189         WriteRequest chunk3Request = new DefaultWriteRequest(IoBuffer.wrap(chunk3));
190 
191         NextFilter nextFilter = EasyMock.createMock(NextFilter.class);
192         /*
193          * Record expectations
194          */
195         nextFilter.filterWrite(EasyMock.eq(session), eqWriteRequest(chunk1Request));
196         nextFilter.filterWrite(EasyMock.eq(session), eqWriteRequest(chunk2Request));
197         nextFilter.filterWrite(EasyMock.eq(session), eqWriteRequest(chunk3Request));
198         nextFilter.messageSent(EasyMock.eq(session), eqWriteRequest(writeRequest));
199 
200         /*
201          * Replay.
202          */
203         EasyMock.replay(nextFilter);
204 
205         filter.filterWrite(nextFilter, session, writeRequest);
206         filter.messageSent(nextFilter, session, chunk1Request);
207         filter.messageSent(nextFilter, session, chunk2Request);
208         filter.messageSent(nextFilter, session, chunk3Request);
209 
210         /*
211          * Verify.
212          */
213         EasyMock.verify(nextFilter);
214 
215         assertTrue(writeRequest.getFuture().isWritten());
216     }
217 
218     @Test
219     public void testWriteWhileWriteInProgress() throws Exception {
220         AbstractStreamWriteFilter<M> filter = createFilter();
221         M message = createMessage(new byte[5]);
222 
223         Queue<WriteRequest> queue = new LinkedList<WriteRequest>();
224 
225         /*
226          * Make up the situation.
227          */
228         session.setAttribute(filter.CURRENT_STREAM, message);
229         session.setAttribute(filter.WRITE_REQUEST_QUEUE, queue);
230 
231         NextFilter nextFilter = EasyMock.createMock(NextFilter.class);
232         /*
233          * Replay.  (We recorded *nothing* because nothing should occur.)
234          */
235         EasyMock.replay(nextFilter);
236 
237         WriteRequest wr = new DefaultWriteRequest(new Object(), new DummyWriteFuture());
238         filter.filterWrite(nextFilter, session, wr);
239         assertEquals(1, queue.size());
240         assertSame(wr, queue.poll());
241 
242         /*
243          * Verify.
244          */
245         EasyMock.verify(nextFilter);
246 
247         session.removeAttribute(filter.CURRENT_STREAM);
248         session.removeAttribute(filter.WRITE_REQUEST_QUEUE);
249     }
250 
251     @Test
252     public void testWritesWriteRequestQueueWhenFinished() throws Exception {
253         AbstractStreamWriteFilter<M> filter = createFilter();
254         M message = createMessage(new byte[0]);
255 
256         WriteRequest wrs[] = new WriteRequest[] { new DefaultWriteRequest(new Object(), new DummyWriteFuture()),
257                 new DefaultWriteRequest(new Object(), new DummyWriteFuture()),
258                 new DefaultWriteRequest(new Object(), new DummyWriteFuture()) };
259         Queue<WriteRequest> queue = new LinkedList<WriteRequest>();
260         queue.add(wrs[0]);
261         queue.add(wrs[1]);
262         queue.add(wrs[2]);
263 
264         /*
265          * Make up the situation.
266          */
267         session.setAttribute(filter.CURRENT_STREAM, message);
268         session.setAttribute(filter.CURRENT_WRITE_REQUEST, new DefaultWriteRequest(message));
269         session.setAttribute(filter.WRITE_REQUEST_QUEUE, queue);
270 
271         /*
272          * Record expectations
273          */
274         NextFilter nextFilter = EasyMock.createMock(NextFilter.class);
275         nextFilter.filterWrite(session, wrs[0]);
276         nextFilter.filterWrite(session, wrs[1]);
277         nextFilter.filterWrite(session, wrs[2]);
278         nextFilter.messageSent(EasyMock.eq(session), eqWriteRequest(new DefaultWriteRequest(message)));
279 
280         /*
281          * Replay.
282          */
283         EasyMock.replay(nextFilter);
284 
285         filter.messageSent(nextFilter, session, new DefaultWriteRequest(new Object()));
286         assertEquals(0, queue.size());
287 
288         /*
289          * Verify.
290          */
291         EasyMock.verify(nextFilter);
292     }
293 
294     /**
295      * Tests that {@link StreamWriteFilter#setWriteBufferSize(int)} checks the
296      * specified size.
297      */
298     @Test
299     public void testSetWriteBufferSize() {
300         AbstractStreamWriteFilter<M> filter = createFilter();
301 
302         try {
303             filter.setWriteBufferSize(0);
304             fail("0 writeBuferSize specified. IllegalArgumentException expected.");
305         } catch (IllegalArgumentException iae) {
306             // Pass, exception was thrown
307             // Signifies a successful test execution
308             assertTrue(true);
309         }
310 
311         try {
312             filter.setWriteBufferSize(-100);
313             fail("Negative writeBuferSize specified. IllegalArgumentException expected.");
314         } catch (IllegalArgumentException iae) {
315             // Pass, exception was thrown
316             // Signifies a successful test execution
317             assertTrue(true);
318         }
319 
320         filter.setWriteBufferSize(1);
321         assertEquals(1, filter.getWriteBufferSize());
322         filter.setWriteBufferSize(1024);
323         assertEquals(1024, filter.getWriteBufferSize());
324     }
325 
326     @Test
327     public void testWriteUsingSocketTransport() throws Exception {
328         NioSocketAcceptor acceptor = new NioSocketAcceptor();
329         acceptor.setReuseAddress(true);
330         SocketAddress address = new InetSocketAddress("localhost", AvailablePortFinder.getNextAvailable());
331 
332         NioSocketConnector connector = new NioSocketConnector();
333 
334         // Generate 4MB of random data
335         byte[] data = new byte[4 * 1024 * 1024];
336         new Random().nextBytes(data);
337 
338         byte[] expectedMd5 = MessageDigest.getInstance("MD5").digest(data);
339 
340         M message = createMessage(data);
341 
342         SenderHandler sender = new SenderHandler(message);
343         ReceiverHandler receiver = new ReceiverHandler(data.length);
344 
345         acceptor.setHandler(sender);
346         connector.setHandler(receiver);
347 
348         acceptor.bind(address);
349         connector.connect(address);
350         sender.latch.await();
351         receiver.latch.await();
352 
353         acceptor.dispose();
354 
355         assertEquals(data.length, receiver.bytesRead);
356         byte[] actualMd5 = receiver.digest.digest();
357         assertEquals(expectedMd5.length, actualMd5.length);
358         for (int i = 0; i < expectedMd5.length; i++) {
359             assertEquals(expectedMd5[i], actualMd5[i]);
360         }
361     }
362 
363     private class SenderHandler extends IoHandlerAdapter {
364         final CountDownLatch latch = new CountDownLatch(1);
365 
366         private M message;
367 
368         StreamWriteFilter streamWriteFilter = new StreamWriteFilter();
369 
370         SenderHandler(M message) {
371             this.message = message;
372         }
373 
374         @Override
375         public void sessionCreated(IoSession session) throws Exception {
376             super.sessionCreated(session);
377             session.getFilterChain().addLast("codec", streamWriteFilter);
378         }
379 
380         @Override
381         public void sessionOpened(IoSession session) throws Exception {
382             session.write(message);
383         }
384 
385         @Override
386         public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
387             LOGGER.error("SenderHandler: exceptionCaught", cause);
388             latch.countDown();
389         }
390 
391         @Override
392         public void sessionClosed(IoSession session) throws Exception {
393             LOGGER.info("SenderHandler: sessionClosed");
394             latch.countDown();
395         }
396 
397         @Override
398         public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
399             LOGGER.info("SenderHandler: sessionIdle");
400             latch.countDown();
401         }
402 
403         @Override
404         public void messageSent(IoSession session, Object message) throws Exception {
405             LOGGER.info("SenderHandler: messageSent");
406             if (message == this.message) {
407                 LOGGER.info("message == this.message");
408                 latch.countDown();
409             }
410         }
411     }
412 
413     private static class ReceiverHandler extends IoHandlerAdapter {
414         final CountDownLatch latch = new CountDownLatch(1);
415 
416         long bytesRead = 0;
417 
418         long size = 0;
419 
420         MessageDigest digest;
421 
422         ReceiverHandler(long size) throws Exception {
423             this.size = size;
424             digest = MessageDigest.getInstance("MD5");
425         }
426 
427         @Override
428         public void sessionCreated(IoSession session) throws Exception {
429             super.sessionCreated(session);
430 
431             session.getConfig().setIdleTime(IdleStatus.READER_IDLE, 5);
432         }
433 
434         @Override
435         public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
436             LOGGER.info("ReceiverHandler: sessionIdle");
437             session.closeNow();
438         }
439 
440         @Override
441         public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
442             LOGGER.error("ReceiverHandler: exceptionCaught", cause);
443             latch.countDown();
444         }
445 
446         @Override
447         public void sessionClosed(IoSession session) throws Exception {
448             LOGGER.info("ReceiverHandler: sessionClosed");
449             latch.countDown();
450         }
451 
452         @Override
453         public void messageReceived(IoSession session, Object message) throws Exception {
454             LOGGER.info("messageReceived");
455             IoBuffer buf = (IoBuffer) message;
456             while (buf.hasRemaining()) {
457                 digest.update(buf.get());
458                 bytesRead++;
459             }
460             LOGGER.info("messageReceived: bytesRead = {}", bytesRead);
461             if (bytesRead >= size) {
462                 session.closeNow();
463             }
464         }
465     }
466 
467     public static WriteRequest eqWriteRequest(WriteRequest expected) {
468         EasyMock.reportMatcher(new WriteRequestMatcher(expected));
469         return null;
470     }
471 
472     private static class WriteRequestMatcher implements IArgumentMatcher {
473         private final WriteRequest expected;
474 
475         public WriteRequestMatcher(WriteRequest expected) {
476             this.expected = expected;
477         }
478 
479         public boolean matches(Object actual) {
480             if (actual instanceof WriteRequest) {
481                 WriteRequest w2 = (WriteRequest) actual;
482 
483                 return expected.getMessage().equals(w2.getMessage())
484                         && expected.getFuture().isWritten() == w2.getFuture().isWritten();
485             }
486             return false;
487         }
488 
489         public void appendTo(StringBuffer buffer) {
490             buffer.append("Expected a WriteRequest with the message '").append(expected.getMessage()).append("'");
491         }
492     }
493 
494     private static class DummyWriteFuture implements WriteFuture {
495         private boolean written;
496 
497         /**
498          * Default constructor
499          */
500         public DummyWriteFuture() {
501             super();
502         }
503 
504         public boolean isWritten() {
505             return written;
506         }
507 
508         public void setWritten() {
509             this.written = true;
510         }
511 
512         public IoSession getSession() {
513             return null;
514         }
515 
516         public void join() {
517             // Do nothing
518         }
519 
520         public boolean join(long timeoutInMillis) {
521             return true;
522         }
523 
524         public boolean isDone() {
525             return true;
526         }
527 
528         public WriteFuture addListener(IoFutureListener<?> listener) {
529             return this;
530         }
531 
532         public WriteFuture removeListener(IoFutureListener<?> listener) {
533             return this;
534         }
535 
536         public WriteFuture await() throws InterruptedException {
537             return this;
538         }
539 
540         public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
541             return true;
542         }
543 
544         public boolean await(long timeoutMillis) throws InterruptedException {
545             return true;
546         }
547 
548         public WriteFuture awaitUninterruptibly() {
549             return this;
550         }
551 
552         public boolean awaitUninterruptibly(long timeout, TimeUnit unit) {
553             return true;
554         }
555 
556         public boolean awaitUninterruptibly(long timeoutMillis) {
557             return true;
558         }
559 
560         public Throwable getException() {
561             return null;
562         }
563 
564         public void setException(Throwable cause) {
565             throw new IllegalStateException();
566         }
567     }
568 
569 }