1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.filter.executor;
21
22 import org.apache.mina.core.filterchain.IoFilter;
23 import org.apache.mina.core.session.DummySession;
24 import org.apache.mina.core.session.IdleStatus;
25 import org.apache.mina.core.session.IoSession;
26 import org.apache.mina.core.write.WriteRequest;
27 import org.apache.mina.filter.FilterEvent;
28 import org.junit.Ignore;
29 import org.junit.Test;
30
31 import java.util.ArrayList;
32 import java.util.Comparator;
33 import java.util.List;
34 import java.util.concurrent.ExecutorService;
35 import java.util.concurrent.TimeUnit;
36
37 import static org.junit.Assert.assertEquals;
38 import static org.junit.Assert.assertTrue;
39
40
41
42
43
44
45
46 public class PriorityThreadPoolExecutorTest {
47
48
49
50
51
52
53
54
55 @Test
56 public void fifoEntryTestNoComparatorSameSession() throws Exception {
57
58 IoSession session = new DummySession();
59 PriorityThreadPoolExecutor.SessionEntry first = new PriorityThreadPoolExecutor.SessionEntry(session, null);
60 PriorityThreadPoolExecutor.SessionEntry last = new PriorityThreadPoolExecutor.SessionEntry(session, null);
61
62
63 int result = first.compareTo(last);
64
65
66 assertEquals("Without a comparator, entries of the same session are expected to be equal.", 0, result);
67 }
68
69
70
71
72
73
74
75
76
77 @Test
78 public void fifoEntryTestNoComparatorDifferentSession() throws Exception {
79
80
81 PriorityThreadPoolExecutor.SessionEntry first = new PriorityThreadPoolExecutor.SessionEntry(new DummySession(), null);
82 PriorityThreadPoolExecutor.SessionEntry last = new PriorityThreadPoolExecutor.SessionEntry(new DummySession(), null);
83
84
85 int result = first.compareTo(last);
86
87
88 assertTrue("Without a comparator, the first entry created should be the first entry out. Expected a negative result, instead, got: " + result, result < 0);
89 }
90
91
92
93
94
95
96
97
98
99
100 @Test
101 public void fifoEntryTestWithComparatorSameSession() throws Exception {
102
103 IoSession session = new DummySession();
104 final int predeterminedResult = 3853;
105
106 Comparator<IoSession> comparator = new Comparator<IoSession>() {
107 @Override
108 public int compare(IoSession o1, IoSession o2) {
109 return predeterminedResult;
110 }
111 };
112
113 PriorityThreadPoolExecutor.SessionEntry first = new PriorityThreadPoolExecutor.SessionEntry(session, comparator);
114 PriorityThreadPoolExecutor.SessionEntry last = new PriorityThreadPoolExecutor.SessionEntry(session, comparator);
115
116
117 int result = first.compareTo(last);
118
119
120 assertEquals("With a comparator, entries of the same session are expected to be equal.", 0, result);
121 }
122
123
124
125
126
127
128
129
130
131
132 @Test
133 public void fifoEntryTestComparatorDifferentSession() throws Exception {
134
135
136 final int predeterminedResult = 3853;
137
138 Comparator<IoSession> comparator = new Comparator<IoSession>() {
139 @Override
140 public int compare(IoSession o1, IoSession o2) {
141 return predeterminedResult;
142 }
143 };
144
145 PriorityThreadPoolExecutor.SessionEntry first = new PriorityThreadPoolExecutor.SessionEntry(new DummySession(), comparator);
146 PriorityThreadPoolExecutor.SessionEntry last = new PriorityThreadPoolExecutor.SessionEntry(new DummySession(), comparator);
147
148
149 int result = first.compareTo(last);
150
151
152 assertEquals("With a comparator, comparing entries of different sessions is expected to yield the comparator result.", predeterminedResult, result);
153 }
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169 @Test
170 @Ignore("This test faiuls randomly")
171 public void testPrioritisation() throws Throwable {
172
173 MockWorkFilter nextFilter = new MockWorkFilter();
174 List<LastActivityTracker> sessions = new ArrayList<>();
175
176 for (int i = 0; i < 10; i++) {
177 sessions.add(new LastActivityTracker());
178 }
179
180 LastActivityTracker preferredSession = sessions.get(4);
181
182
183 Comparator<IoSession> comparator = new UnfairComparator(preferredSession);
184 int maximumPoolSize = 1;
185 int amountOfTasks = 400;
186
187 ExecutorService executor = new PriorityThreadPoolExecutor(maximumPoolSize, comparator);
188 ExecutorFilter filter = new ExecutorFilter(executor);
189
190
191 for (int i = 0; i < amountOfTasks; i++) {
192 int sessionIndex = i % sessions.size();
193
194 LastActivityTracker currentSession = sessions.get(sessionIndex);
195 filter.messageReceived(nextFilter, currentSession, null);
196
197 if (nextFilter.throwable != null) {
198 throw nextFilter.throwable;
199 }
200 }
201
202 executor.shutdown();
203
204
205 executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
206
207 for (LastActivityTracker session : sessions) {
208 if (session != preferredSession) {
209 assertTrue("All other sessions should have finished later than the preferred session (but at least one did not).",
210 session.lastActivity > preferredSession.lastActivity);
211 }
212 }
213 }
214
215
216
217
218 private static class UnfairComparator implements Comparator<IoSession> {
219 private IoSession preferred;
220
221 public UnfairComparator(IoSession preferred) {
222 this.preferred = preferred;
223 }
224
225 @Override
226 public int compare(IoSession o1, IoSession o2) {
227 if (o1 == preferred) {
228 return -1;
229 }
230
231 if (o2 == preferred) {
232 return 1;
233 }
234
235 return 0;
236 }
237 }
238
239
240
241
242 private static class LastActivityTracker extends DummySession {
243 long lastActivity = System.currentTimeMillis();
244
245 public synchronized void setLastActivity() {
246 lastActivity = System.currentTimeMillis();
247 }
248 }
249
250
251
252
253 private static class MockWorkFilter implements IoFilter.NextFilter {
254 Throwable throwable;
255
256 public void sessionOpened(IoSession session) {
257
258 }
259
260 public void sessionClosed(IoSession session) {
261
262 }
263
264 public void sessionIdle(IoSession session, IdleStatus status) {
265
266 }
267
268 public void exceptionCaught(IoSession session, Throwable cause) {
269
270 }
271
272 public void inputClosed(IoSession session) {
273
274 }
275
276 public void messageReceived(IoSession session, Object message) {
277 try {
278 Thread.sleep(20);
279 ((LastActivityTracker) session).setLastActivity();
280 } catch (Exception e) {
281 if (this.throwable == null) {
282 this.throwable = e;
283 }
284 }
285 }
286
287 public void messageSent(IoSession session, WriteRequest writeRequest) {
288
289 }
290
291 public void filterWrite(IoSession session, WriteRequest writeRequest) {
292
293 }
294
295 public void filterClose(IoSession session) {
296
297 }
298
299 public void sessionCreated(IoSession session) {
300
301 }
302
303 @Override
304 public void event(IoSession session, FilterEvent event) {
305
306 }
307 }
308 }