View Javadoc
1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.filter.executor;
21  
22  import org.apache.mina.core.filterchain.IoFilter;
23  import org.apache.mina.core.session.DummySession;
24  import org.apache.mina.core.session.IdleStatus;
25  import org.apache.mina.core.session.IoSession;
26  import org.apache.mina.core.write.WriteRequest;
27  import org.apache.mina.filter.FilterEvent;
28  import org.junit.Ignore;
29  import org.junit.Test;
30  
31  import java.util.ArrayList;
32  import java.util.Comparator;
33  import java.util.List;
34  import java.util.concurrent.ExecutorService;
35  import java.util.concurrent.TimeUnit;
36  
37  import static org.junit.Assert.assertEquals;
38  import static org.junit.Assert.assertTrue;
39  
40  /**
41   * Tests that verify the functionality provided by the implementation of
42   * {@link PriorityThreadPoolExecutor}.
43   *
44   * @author Guus der Kinderen, guus.der.kinderen@gmail.com
45   */
46  public class PriorityThreadPoolExecutorTest {
47      /**
48       * Tests that verify the functionality provided by the implementation of
49       * {@link org.apache.mina.filter.executor.PriorityThreadPoolExecutor.SessionEntry}
50       * .
51       *
52       * This test asserts that, without a provided comparator, entries are
53       * considered equal, when they reference the same session.
54       */
55      @Test
56      public void fifoEntryTestNoComparatorSameSession() throws Exception {
57          // Set up fixture.
58          IoSession session = new DummySession();
59          PriorityThreadPoolExecutor.SessionEntry first = new PriorityThreadPoolExecutor.SessionEntry(session, null);
60          PriorityThreadPoolExecutor.SessionEntry last = new PriorityThreadPoolExecutor.SessionEntry(session, null);
61      
62          // Execute system under test.
63          int result = first.compareTo(last);
64      
65          // Verify results.
66          assertEquals("Without a comparator, entries of the same session are expected to be equal.", 0, result);
67      }
68  
69      /**
70       * Tests that verify the functionality provided by the implementation of
71       * {@link org.apache.mina.filter.executor.PriorityThreadPoolExecutor.SessionEntry}
72       * .
73       *
74       * This test asserts that, without a provided comparator, the first entry
75       * created is 'less than' an entry that is created later.
76       */
77      @Test
78      public void fifoEntryTestNoComparatorDifferentSession() throws Exception {
79          // Set up fixture (the order in which the entries are created is
80          // relevant here!)
81          PriorityThreadPoolExecutor.SessionEntry first = new PriorityThreadPoolExecutor.SessionEntry(new DummySession(), null);
82          PriorityThreadPoolExecutor.SessionEntry last = new PriorityThreadPoolExecutor.SessionEntry(new DummySession(), null);
83          
84          // Execute system under test.
85          int result = first.compareTo(last);
86          
87          // Verify results.
88          assertTrue("Without a comparator, the first entry created should be the first entry out. Expected a negative result, instead, got: " + result, result < 0);
89      }
90  
91      /**
92       * Tests that verify the functionality provided by the implementation of
93       * {@link org.apache.mina.filter.executor.PriorityThreadPoolExecutor.SessionEntry}
94       * .
95       *
96       * This test asserts that, with a provided comparator, entries are
97       * considered equal, when they reference the same session (the provided
98       * comparator is ignored).
99       */
100     @Test
101     public void fifoEntryTestWithComparatorSameSession() throws Exception {
102         // Set up fixture.
103         IoSession session = new DummySession();
104         final int predeterminedResult = 3853;
105         
106         Comparator<IoSession> comparator = new Comparator<IoSession>() {
107             @Override
108             public int compare(IoSession o1, IoSession o2) {
109                 return predeterminedResult;
110             }
111         };
112         
113         PriorityThreadPoolExecutor.SessionEntry first = new PriorityThreadPoolExecutor.SessionEntry(session, comparator);
114         PriorityThreadPoolExecutor.SessionEntry last = new PriorityThreadPoolExecutor.SessionEntry(session, comparator);
115         
116         // Execute system under test.
117         int result = first.compareTo(last);
118         
119         // Verify results.
120         assertEquals("With a comparator, entries of the same session are expected to be equal.", 0, result);
121     }
122 
123     /**
124      * Tests that verify the functionality provided by the implementation of
125      * {@link org.apache.mina.filter.executor.PriorityThreadPoolExecutor.SessionEntry}
126      * .
127      *
128      * This test asserts that a provided comparator is used instead of the
129      * (fallback) default behavior (when entries are referring different
130      * sessions).
131      */
132     @Test
133     public void fifoEntryTestComparatorDifferentSession() throws Exception {
134         // Set up fixture (the order in which the entries are created is
135         // relevant here!)
136         final int predeterminedResult = 3853;
137         
138         Comparator<IoSession> comparator = new Comparator<IoSession>() {
139             @Override
140             public int compare(IoSession o1, IoSession o2) {
141                 return predeterminedResult;
142             }
143         };
144         
145         PriorityThreadPoolExecutor.SessionEntry first = new PriorityThreadPoolExecutor.SessionEntry(new DummySession(), comparator);
146         PriorityThreadPoolExecutor.SessionEntry last = new PriorityThreadPoolExecutor.SessionEntry(new DummySession(), comparator);
147         
148         // Execute system under test.
149         int result = first.compareTo(last);
150         
151         // Verify results.
152         assertEquals("With a comparator, comparing entries of different sessions is expected to yield the comparator result.", predeterminedResult, result);
153     }
154 
155     /**
156      * Asserts that, when enough work is being submitted to the executor for it
157      * to start queuing work, prioritisation of work starts to occur.
158      *
159      * This implementation starts a number of sessions, and evenly distributes a
160      * number of messages to them. Processing each message is artificially made
161      * 'expensive', while the executor pool is kept small. This causes work to
162      * be queued in the executor.
163      *
164      * The executor that is used is configured to prefer one specific session.
165      * Each session records the timestamp of its last activity. After all work
166      * has been processed, the test asserts that the last activity of all
167      * sessions was later than the last activity of the preferred session.
168      */
169     @Test
170     @Ignore("This test faiuls randomly")
171     public void testPrioritisation() throws Throwable {
172         // Set up fixture.
173         MockWorkFilter nextFilter = new MockWorkFilter();
174         List<LastActivityTracker> sessions = new ArrayList<>();
175         
176         for (int i = 0; i < 10; i++) {
177             sessions.add(new LastActivityTracker());
178         }
179         
180         LastActivityTracker preferredSession = sessions.get(4); // prefer an arbitrary session
181                                                                 // (but not the first or last
182                                                                 // session, for good measure).
183         Comparator<IoSession> comparator = new UnfairComparator(preferredSession);
184         int maximumPoolSize = 1; // keep this low, to force resource contention.
185         int amountOfTasks = 400;
186         
187         ExecutorService executor = new PriorityThreadPoolExecutor(maximumPoolSize, comparator);
188         ExecutorFilter filter = new ExecutorFilter(executor);
189         
190         // Execute system under test.
191         for (int i = 0; i < amountOfTasks; i++) {
192             int sessionIndex = i % sessions.size();
193             
194             LastActivityTracker currentSession = sessions.get(sessionIndex);
195             filter.messageReceived(nextFilter, currentSession, null);
196         
197             if (nextFilter.throwable != null) {
198                 throw nextFilter.throwable;
199             }
200         }
201         
202         executor.shutdown();
203         
204         // Verify results.
205         executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
206         
207         for (LastActivityTracker session : sessions) {
208             if (session != preferredSession) {
209                 assertTrue("All other sessions should have finished later than the preferred session (but at least one did not).", 
210                     session.lastActivity > preferredSession.lastActivity);
211             }
212         }
213     }
214 
215     /**
216      * A comparator that prefers a particular session.
217      */
218     private static class UnfairComparator implements Comparator<IoSession> {
219         private IoSession preferred;
220         
221         public UnfairComparator(IoSession preferred) {
222             this.preferred = preferred;
223         }
224         
225         @Override
226         public int compare(IoSession o1, IoSession o2) {
227             if (o1 == preferred) {
228                 return -1;
229             }
230         
231             if (o2 == preferred) {
232                 return 1;
233             }
234         
235             return 0;
236         }
237     }
238 
239     /**
240      * A session that tracks the timestamp of last activity.
241      */
242     private static class LastActivityTracker extends DummySession {
243         long lastActivity = System.currentTimeMillis();
244 
245         public synchronized void setLastActivity() {
246             lastActivity = System.currentTimeMillis();
247         }
248     }
249 
250     /**
251      * A filter that simulates a non-negligible amount of work.
252      */
253     private static class MockWorkFilter implements IoFilter.NextFilter {
254         Throwable throwable;
255         
256         public void sessionOpened(IoSession session) {
257             // Do nothing
258         }
259         
260         public void sessionClosed(IoSession session) {
261             // Do nothing
262         }
263         
264         public void sessionIdle(IoSession session, IdleStatus status) {
265             // Do nothing
266         }
267         
268         public void exceptionCaught(IoSession session, Throwable cause) {
269             // Do nothing
270         }
271         
272         public void inputClosed(IoSession session) {
273             // Do nothing
274         }
275         
276         public void messageReceived(IoSession session, Object message) {
277             try {
278                 Thread.sleep(20); // mimic work.
279                 ((LastActivityTracker) session).setLastActivity();
280             } catch (Exception e) {
281                 if (this.throwable == null) {
282                     this.throwable = e;
283                 }
284             }
285         }
286         
287         public void messageSent(IoSession session, WriteRequest writeRequest) {
288             // Do nothing
289         }
290         
291         public void filterWrite(IoSession session, WriteRequest writeRequest) {
292             // Do nothing
293         }
294         
295         public void filterClose(IoSession session) {
296             // Do nothing
297         }
298         
299         public void sessionCreated(IoSession session) {
300             // Do nothing
301         }
302         
303         @Override
304         public void event(IoSession session, FilterEvent event) {
305             // TODO Auto-generated method stub
306         }
307     }
308 }