1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.core.session;
21
22 import java.io.File;
23 import java.io.FileInputStream;
24 import java.io.IOException;
25 import java.net.SocketAddress;
26 import java.nio.channels.FileChannel;
27 import java.util.Iterator;
28 import java.util.Queue;
29 import java.util.Set;
30 import java.util.concurrent.ConcurrentLinkedQueue;
31 import java.util.concurrent.atomic.AtomicBoolean;
32 import java.util.concurrent.atomic.AtomicInteger;
33 import java.util.concurrent.atomic.AtomicLong;
34
35 import org.apache.mina.core.buffer.IoBuffer;
36 import org.apache.mina.core.file.DefaultFileRegion;
37 import org.apache.mina.core.file.FilenameFileRegion;
38 import org.apache.mina.core.filterchain.IoFilterChain;
39 import org.apache.mina.core.future.CloseFuture;
40 import org.apache.mina.core.future.DefaultCloseFuture;
41 import org.apache.mina.core.future.DefaultReadFuture;
42 import org.apache.mina.core.future.DefaultWriteFuture;
43 import org.apache.mina.core.future.IoFutureListener;
44 import org.apache.mina.core.future.ReadFuture;
45 import org.apache.mina.core.future.WriteFuture;
46 import org.apache.mina.core.service.AbstractIoService;
47 import org.apache.mina.core.service.IoAcceptor;
48 import org.apache.mina.core.service.IoHandler;
49 import org.apache.mina.core.service.IoProcessor;
50 import org.apache.mina.core.service.IoService;
51 import org.apache.mina.core.service.TransportMetadata;
52 import org.apache.mina.core.write.DefaultWriteRequest;
53 import org.apache.mina.core.write.WriteException;
54 import org.apache.mina.core.write.WriteRequest;
55 import org.apache.mina.core.write.WriteRequestQueue;
56 import org.apache.mina.core.write.WriteTimeoutException;
57 import org.apache.mina.core.write.WriteToClosedSessionException;
58 import org.apache.mina.util.ExceptionMonitor;
59
60
61
62
63
64
65 public abstract class AbstractIoSession implements IoSession {
66
67 private final IoHandler handler;
68
69
70 protected IoSessionConfig config;
71
72
73 private final IoService service;
74
75 private static final AttributeKeyAttributeKey">AttributeKey READY_READ_FUTURES_KEY = new AttributeKey(AbstractIoSession.class,
76 "readyReadFutures");
77
78 private static final AttributeKeytributeKey">AttributeKey WAITING_READ_FUTURES_KEY = new AttributeKey(AbstractIoSession.class,
79 "waitingReadFutures");
80
81 private static final IoFutureListener<CloseFuture> SCHEDULED_COUNTER_RESETTER = new IoFutureListener<CloseFuture>() {
82 public void operationComplete(CloseFuture future) {
83 AbstractIoSession../org/apache/mina/core/session/AbstractIoSession.html#AbstractIoSession">AbstractIoSession session = (AbstractIoSession) future.getSession();
84 session.scheduledWriteBytes.set(0);
85 session.scheduledWriteMessages.set(0);
86 session.readBytesThroughput = 0;
87 session.readMessagesThroughput = 0;
88 session.writtenBytesThroughput = 0;
89 session.writtenMessagesThroughput = 0;
90 }
91 };
92
93
94
95
96 public static final WriteRequest CLOSE_REQUEST = new DefaultWriteRequest(new Object());
97
98
99
100
101 public static final WriteRequest MESSAGE_SENT_REQUEST = new DefaultWriteRequest(DefaultWriteRequest.EMPTY_MESSAGE);
102
103 private final Object lock = new Object();
104
105 private IoSessionAttributeMap attributes;
106
107 private WriteRequestQueue writeRequestQueue;
108
109 private WriteRequest currentWriteRequest;
110
111
112 private final long creationTime;
113
114
115 private static AtomicLong idGenerator = new AtomicLong(0);
116
117
118 private long sessionId;
119
120
121
122
123 private final CloseFuture closeFuture = new DefaultCloseFuture(this);
124
125 private volatile boolean closing;
126
127
128 private boolean readSuspended = false;
129
130 private boolean writeSuspended = false;
131
132
133 private final AtomicBoolean scheduledForFlush = new AtomicBoolean();
134
135 private final AtomicInteger scheduledWriteBytes = new AtomicInteger();
136
137 private final AtomicInteger scheduledWriteMessages = new AtomicInteger();
138
139 private long readBytes;
140
141 private long writtenBytes;
142
143 private long readMessages;
144
145 private long writtenMessages;
146
147 private long lastReadTime;
148
149 private long lastWriteTime;
150
151 private long lastThroughputCalculationTime;
152
153 private long lastReadBytes;
154
155 private long lastWrittenBytes;
156
157 private long lastReadMessages;
158
159 private long lastWrittenMessages;
160
161 private double readBytesThroughput;
162
163 private double writtenBytesThroughput;
164
165 private double readMessagesThroughput;
166
167 private double writtenMessagesThroughput;
168
169 private AtomicInteger idleCountForBoth = new AtomicInteger();
170
171 private AtomicInteger idleCountForRead = new AtomicInteger();
172
173 private AtomicInteger idleCountForWrite = new AtomicInteger();
174
175 private long lastIdleTimeForBoth;
176
177 private long lastIdleTimeForRead;
178
179 private long lastIdleTimeForWrite;
180
181 private boolean deferDecreaseReadBuffer = true;
182
183
184
185
186
187
188 protected AbstractIoSession(IoService service) {
189 this.service = service;
190 this.handler = service.getHandler();
191
192
193 long currentTime = System.currentTimeMillis();
194 creationTime = currentTime;
195 lastThroughputCalculationTime = currentTime;
196 lastReadTime = currentTime;
197 lastWriteTime = currentTime;
198 lastIdleTimeForBoth = currentTime;
199 lastIdleTimeForRead = currentTime;
200 lastIdleTimeForWrite = currentTime;
201
202
203 closeFuture.addListener(SCHEDULED_COUNTER_RESETTER);
204
205
206 sessionId = idGenerator.incrementAndGet();
207 }
208
209
210
211
212
213
214 public final long getId() {
215 return sessionId;
216 }
217
218
219
220
221 public abstract IoProcessor getProcessor();
222
223
224
225
226 public final boolean isConnected() {
227 return !closeFuture.isClosed();
228 }
229
230
231
232
233 public boolean isActive() {
234
235 return true;
236 }
237
238
239
240
241 public final boolean isClosing() {
242 return closing || closeFuture.isClosed();
243 }
244
245
246
247
248 public boolean isSecured() {
249
250 return false;
251 }
252
253
254
255
256 public final CloseFuture getCloseFuture() {
257 return closeFuture;
258 }
259
260
261
262
263
264
265 public final boolean isScheduledForFlush() {
266 return scheduledForFlush.get();
267 }
268
269
270
271
272 public final void scheduledForFlush() {
273 scheduledForFlush.set(true);
274 }
275
276
277
278
279 public final void unscheduledForFlush() {
280 scheduledForFlush.set(false);
281 }
282
283
284
285
286
287
288
289
290
291
292 public final boolean setScheduledForFlush(boolean schedule) {
293 if (schedule) {
294
295
296
297 return scheduledForFlush.compareAndSet(false, schedule);
298 }
299
300 scheduledForFlush.set(schedule);
301 return true;
302 }
303
304
305
306
307 public final CloseFuture close(boolean rightNow) {
308 if (rightNow) {
309 return closeNow();
310 } else {
311 return closeOnFlush();
312 }
313 }
314
315
316
317
318 public final CloseFuture close() {
319 return closeNow();
320 }
321
322
323
324
325 public final CloseFuture closeOnFlush() {
326 if (!isClosing()) {
327 getWriteRequestQueue().offer(this, CLOSE_REQUEST);
328 getProcessor().flush(this);
329 }
330
331 return closeFuture;
332 }
333
334
335
336
337 public final CloseFuture closeNow() {
338 synchronized (lock) {
339 if (isClosing()) {
340 return closeFuture;
341 }
342
343 closing = true;
344
345 try {
346 destroy();
347 } catch (Exception e) {
348 IoFilterChain filterChain = getFilterChain();
349 filterChain.fireExceptionCaught(e);
350 }
351 }
352
353 getFilterChain().fireFilterClose();
354
355 return closeFuture;
356 }
357
358
359
360
361 protected void destroy() {
362 if (writeRequestQueue != null) {
363 while (!writeRequestQueue.isEmpty(this)) {
364 WriteRequest writeRequest = writeRequestQueue.poll(this);
365
366 if (writeRequest != null) {
367 WriteFuture writeFuture = writeRequest.getFuture();
368
369
370
371 if (writeFuture != null) {
372 writeFuture.setWritten();
373 }
374 }
375 }
376 }
377 }
378
379
380
381
382 public IoHandler getHandler() {
383 return handler;
384 }
385
386
387
388
389 public IoSessionConfig getConfig() {
390 return config;
391 }
392
393
394
395
396 public final ReadFuture read() {
397 if (!getConfig().isUseReadOperation()) {
398 throw new IllegalStateException("useReadOperation is not enabled.");
399 }
400
401 Queue<ReadFuture> readyReadFutures = getReadyReadFutures();
402 ReadFuture future;
403
404 synchronized (readyReadFutures) {
405 future = readyReadFutures.poll();
406
407 if (future != null) {
408 if (future.isClosed()) {
409
410 readyReadFutures.offer(future);
411 }
412 } else {
413 future = new DefaultReadFuture(this);
414 getWaitingReadFutures().offer(future);
415 }
416 }
417
418 return future;
419 }
420
421
422
423
424
425
426
427 public final void offerReadFuture(Object message) {
428 newReadFuture().setRead(message);
429 }
430
431
432
433
434
435
436 public final void offerFailedReadFuture(Throwable exception) {
437 newReadFuture().setException(exception);
438 }
439
440
441
442
443 public final void offerClosedReadFuture() {
444 Queue<ReadFuture> readyReadFutures = getReadyReadFutures();
445
446 synchronized (readyReadFutures) {
447 newReadFuture().setClosed();
448 }
449 }
450
451
452
453
454 private ReadFuture newReadFuture() {
455 Queue<ReadFuture> readyReadFutures = getReadyReadFutures();
456 Queue<ReadFuture> waitingReadFutures = getWaitingReadFutures();
457 ReadFuture future;
458
459 synchronized (readyReadFutures) {
460 future = waitingReadFutures.poll();
461
462 if (future == null) {
463 future = new DefaultReadFuture(this);
464 readyReadFutures.offer(future);
465 }
466 }
467
468 return future;
469 }
470
471
472
473
474 private Queue<ReadFuture> getReadyReadFutures() {
475 Queue<ReadFuture> readyReadFutures = (Queue<ReadFuture>) getAttribute(READY_READ_FUTURES_KEY);
476
477 if (readyReadFutures == null) {
478 readyReadFutures = new ConcurrentLinkedQueue<>();
479
480 Queue<ReadFuture> oldReadyReadFutures = (Queue<ReadFuture>) setAttributeIfAbsent(READY_READ_FUTURES_KEY,
481 readyReadFutures);
482
483 if (oldReadyReadFutures != null) {
484 readyReadFutures = oldReadyReadFutures;
485 }
486 }
487
488 return readyReadFutures;
489 }
490
491
492
493
494 private Queue<ReadFuture> getWaitingReadFutures() {
495 Queue<ReadFuture> waitingReadyReadFutures = (Queue<ReadFuture>) getAttribute(WAITING_READ_FUTURES_KEY);
496
497 if (waitingReadyReadFutures == null) {
498 waitingReadyReadFutures = new ConcurrentLinkedQueue<>();
499
500 Queue<ReadFuture> oldWaitingReadyReadFutures = (Queue<ReadFuture>) setAttributeIfAbsent(
501 WAITING_READ_FUTURES_KEY, waitingReadyReadFutures);
502
503 if (oldWaitingReadyReadFutures != null) {
504 waitingReadyReadFutures = oldWaitingReadyReadFutures;
505 }
506 }
507
508 return waitingReadyReadFutures;
509 }
510
511
512
513
514 public WriteFuture write(Object message) {
515 return write(message, null);
516 }
517
518
519
520
521 public WriteFuture write(Object message, SocketAddress remoteAddress) {
522 if (message == null) {
523 throw new IllegalArgumentException("Trying to write a null message : not allowed");
524 }
525
526
527
528 if (!getTransportMetadata().isConnectionless() && (remoteAddress != null)) {
529 throw new UnsupportedOperationException();
530 }
531
532
533
534
535 if (isClosing() || !isConnected()) {
536 WriteFuture future = new DefaultWriteFuture(this);
537 WriteRequest request = new DefaultWriteRequest(message, future, remoteAddress);
538 WriteException writeException = new WriteToClosedSessionException(request);
539 future.setException(writeException);
540 return future;
541 }
542
543 FileChannel openedFileChannel = null;
544
545
546
547 try {
548 if ((message instanceof IoBuffer./../../org/apache/mina/core/buffer/IoBuffer.html#IoBuffer">IoBuffer) && !((IoBuffer) message).hasRemaining()) {
549
550 throw new IllegalArgumentException("message is empty. Forgot to call flip()?");
551 } else if (message instanceof FileChannel) {
552 FileChannel fileChannel = (FileChannel) message;
553 message = new DefaultFileRegion(fileChannel, 0, fileChannel.size());
554 } else if (message instanceof File) {
555 File file = (File) message;
556 openedFileChannel = new FileInputStream(file).getChannel();
557 message = new FilenameFileRegion(file, openedFileChannel, 0, openedFileChannel.size());
558 }
559 } catch (IOException e) {
560 ExceptionMonitor.getInstance().exceptionCaught(e);
561 return DefaultWriteFuture.newNotWrittenFuture(this, e);
562 }
563
564
565 WriteFuture writeFuture = new DefaultWriteFuture(this);
566 WriteRequest writeRequest = new DefaultWriteRequest(message, writeFuture, remoteAddress);
567
568
569 IoFilterChain filterChain = getFilterChain();
570 filterChain.fireFilterWrite(writeRequest);
571
572
573
574
575 if (openedFileChannel != null) {
576
577
578 final FileChannel finalChannel = openedFileChannel;
579 writeFuture.addListener(new IoFutureListener<WriteFuture>() {
580 public void operationComplete(WriteFuture future) {
581 try {
582 finalChannel.close();
583 } catch (IOException e) {
584 ExceptionMonitor.getInstance().exceptionCaught(e);
585 }
586 }
587 });
588 }
589
590
591 return writeFuture;
592 }
593
594
595
596
597 public final Object getAttachment() {
598 return getAttribute("");
599 }
600
601
602
603
604 public final Object setAttachment(Object attachment) {
605 return setAttribute("", attachment);
606 }
607
608
609
610
611 public final Object getAttribute(Object key) {
612 return getAttribute(key, null);
613 }
614
615
616
617
618 public final Object getAttribute(Object key, Object defaultValue) {
619 return attributes.getAttribute(this, key, defaultValue);
620 }
621
622
623
624
625 public final Object setAttribute(Object key, Object value) {
626 return attributes.setAttribute(this, key, value);
627 }
628
629
630
631
632 public final Object setAttribute(Object key) {
633 return setAttribute(key, Boolean.TRUE);
634 }
635
636
637
638
639 public final Object setAttributeIfAbsent(Object key, Object value) {
640 return attributes.setAttributeIfAbsent(this, key, value);
641 }
642
643
644
645
646 public final Object setAttributeIfAbsent(Object key) {
647 return setAttributeIfAbsent(key, Boolean.TRUE);
648 }
649
650
651
652
653 public final Object removeAttribute(Object key) {
654 return attributes.removeAttribute(this, key);
655 }
656
657
658
659
660 public final boolean removeAttribute(Object key, Object value) {
661 return attributes.removeAttribute(this, key, value);
662 }
663
664
665
666
667 public final boolean replaceAttribute(Object key, Object oldValue, Object newValue) {
668 return attributes.replaceAttribute(this, key, oldValue, newValue);
669 }
670
671
672
673
674 public final boolean containsAttribute(Object key) {
675 return attributes.containsAttribute(this, key);
676 }
677
678
679
680
681 public final Set<Object> getAttributeKeys() {
682 return attributes.getAttributeKeys(this);
683 }
684
685
686
687
688 public final IoSessionAttributeMap getAttributeMap() {
689 return attributes;
690 }
691
692
693
694
695
696
697 public final void setAttributeMap(IoSessionAttributeMap attributes) {
698 this.attributes = attributes;
699 }
700
701
702
703
704
705
706 public final void setWriteRequestQueue(WriteRequestQueue writeRequestQueue) {
707 this.writeRequestQueue = writeRequestQueue;
708 }
709
710
711
712
713 public final void suspendRead() {
714 readSuspended = true;
715 if (isClosing() || !isConnected()) {
716 return;
717 }
718 getProcessor().updateTrafficControl(this);
719 }
720
721
722
723
724 public final void suspendWrite() {
725 writeSuspended = true;
726 if (isClosing() || !isConnected()) {
727 return;
728 }
729 getProcessor().updateTrafficControl(this);
730 }
731
732
733
734
735 @SuppressWarnings("unchecked")
736 public final void resumeRead() {
737 readSuspended = false;
738 if (isClosing() || !isConnected()) {
739 return;
740 }
741 getProcessor().updateTrafficControl(this);
742 }
743
744
745
746
747 @SuppressWarnings("unchecked")
748 public final void resumeWrite() {
749 writeSuspended = false;
750 if (isClosing() || !isConnected()) {
751 return;
752 }
753 getProcessor().updateTrafficControl(this);
754 }
755
756
757
758
759 public boolean isReadSuspended() {
760 return readSuspended;
761 }
762
763
764
765
766 public boolean isWriteSuspended() {
767 return writeSuspended;
768 }
769
770
771
772
773 public final long getReadBytes() {
774 return readBytes;
775 }
776
777
778
779
780 public final long getWrittenBytes() {
781 return writtenBytes;
782 }
783
784
785
786
787 public final long getReadMessages() {
788 return readMessages;
789 }
790
791
792
793
794 public final long getWrittenMessages() {
795 return writtenMessages;
796 }
797
798
799
800
801 public final double getReadBytesThroughput() {
802 return readBytesThroughput;
803 }
804
805
806
807
808 public final double getWrittenBytesThroughput() {
809 return writtenBytesThroughput;
810 }
811
812
813
814
815 public final double getReadMessagesThroughput() {
816 return readMessagesThroughput;
817 }
818
819
820
821
822 public final double getWrittenMessagesThroughput() {
823 return writtenMessagesThroughput;
824 }
825
826
827
828
829 public final void updateThroughput(long currentTime, boolean force) {
830 int interval = (int) (currentTime - lastThroughputCalculationTime);
831
832 long minInterval = getConfig().getThroughputCalculationIntervalInMillis();
833
834 if (((minInterval == 0) || (interval < minInterval)) && !force) {
835 return;
836 }
837
838 readBytesThroughput = (readBytes - lastReadBytes) * 1000.0 / interval;
839 writtenBytesThroughput = (writtenBytes - lastWrittenBytes) * 1000.0 / interval;
840 readMessagesThroughput = (readMessages - lastReadMessages) * 1000.0 / interval;
841 writtenMessagesThroughput = (writtenMessages - lastWrittenMessages) * 1000.0 / interval;
842
843 lastReadBytes = readBytes;
844 lastWrittenBytes = writtenBytes;
845 lastReadMessages = readMessages;
846 lastWrittenMessages = writtenMessages;
847
848 lastThroughputCalculationTime = currentTime;
849 }
850
851
852
853
854 public final long getScheduledWriteBytes() {
855 return scheduledWriteBytes.get();
856 }
857
858
859
860
861 public final int getScheduledWriteMessages() {
862 return scheduledWriteMessages.get();
863 }
864
865
866
867
868
869
870 protected void setScheduledWriteBytes(int byteCount) {
871 scheduledWriteBytes.set(byteCount);
872 }
873
874
875
876
877
878
879 protected void setScheduledWriteMessages(int messages) {
880 scheduledWriteMessages.set(messages);
881 }
882
883
884
885
886
887
888
889 public final void increaseReadBytes(long increment, long currentTime) {
890 if (increment <= 0) {
891 return;
892 }
893
894 readBytes += increment;
895 lastReadTime = currentTime;
896 idleCountForBoth.set(0);
897 idleCountForRead.set(0);
898
899 if (getService() instanceof AbstractIoService) {
900 ((AbstractIoService) getService()).getStatistics().increaseReadBytes(increment, currentTime);
901 }
902 }
903
904
905
906
907
908
909 public final void increaseReadMessages(long currentTime) {
910 readMessages++;
911 lastReadTime = currentTime;
912 idleCountForBoth.set(0);
913 idleCountForRead.set(0);
914
915 if (getService() instanceof AbstractIoService) {
916 ((AbstractIoService) getService()).getStatistics().increaseReadMessages(currentTime);
917 }
918 }
919
920
921
922
923
924
925
926 public final void increaseWrittenBytes(int increment, long currentTime) {
927 if (increment <= 0) {
928 return;
929 }
930
931 writtenBytes += increment;
932 lastWriteTime = currentTime;
933 idleCountForBoth.set(0);
934 idleCountForWrite.set(0);
935
936 if (getService() instanceof AbstractIoService) {
937 ((AbstractIoService) getService()).getStatistics().increaseWrittenBytes(increment, currentTime);
938 }
939
940 increaseScheduledWriteBytes(-increment);
941 }
942
943
944
945
946
947
948
949 public final void increaseWrittenMessages(WriteRequest request, long currentTime) {
950 Object message = request.getMessage();
951
952 if (message instanceof IoBuffer) {
953 IoBufferf="../../../../../org/apache/mina/core/buffer/IoBuffer.html#IoBuffer">IoBuffer b = (IoBuffer) message;
954
955 if (b.hasRemaining()) {
956 return;
957 }
958 }
959
960 writtenMessages++;
961 lastWriteTime = currentTime;
962
963 if (getService() instanceof AbstractIoService) {
964 ((AbstractIoService) getService()).getStatistics().increaseWrittenMessages(currentTime);
965 }
966
967 decreaseScheduledWriteMessages();
968 }
969
970
971
972
973
974
975 public final void increaseScheduledWriteBytes(int increment) {
976 scheduledWriteBytes.addAndGet(increment);
977 if (getService() instanceof AbstractIoService) {
978 ((AbstractIoService) getService()).getStatistics().increaseScheduledWriteBytes(increment);
979 }
980 }
981
982
983
984
985 public final void increaseScheduledWriteMessages() {
986 scheduledWriteMessages.incrementAndGet();
987
988 if (getService() instanceof AbstractIoService) {
989 ((AbstractIoService) getService()).getStatistics().increaseScheduledWriteMessages();
990 }
991 }
992
993
994
995
996 private void decreaseScheduledWriteMessages() {
997 scheduledWriteMessages.decrementAndGet();
998 if (getService() instanceof AbstractIoService) {
999 ((AbstractIoService) getService()).getStatistics().decreaseScheduledWriteMessages();
1000 }
1001 }
1002
1003
1004
1005
1006
1007
1008 public final void decreaseScheduledBytesAndMessages(WriteRequest request) {
1009 Object message = request.getMessage();
1010
1011 if (message instanceof IoBuffer) {
1012 IoBufferf="../../../../../org/apache/mina/core/buffer/IoBuffer.html#IoBuffer">IoBuffer b = (IoBuffer) message;
1013
1014 if (b.hasRemaining()) {
1015 increaseScheduledWriteBytes(-((IoBuffer) message).remaining());
1016 } else {
1017 decreaseScheduledWriteMessages();
1018 }
1019 } else {
1020 decreaseScheduledWriteMessages();
1021 }
1022 }
1023
1024
1025
1026
1027 public final WriteRequestQueue getWriteRequestQueue() {
1028 if (writeRequestQueue == null) {
1029 throw new IllegalStateException();
1030 }
1031
1032 return writeRequestQueue;
1033 }
1034
1035
1036
1037
1038 public final WriteRequest getCurrentWriteRequest() {
1039 return currentWriteRequest;
1040 }
1041
1042
1043
1044
1045 public final Object getCurrentWriteMessage() {
1046 WriteRequest req = getCurrentWriteRequest();
1047
1048 if (req == null) {
1049 return null;
1050 }
1051 return req.getMessage();
1052 }
1053
1054
1055
1056
1057 public final void setCurrentWriteRequest(WriteRequest currentWriteRequest) {
1058 this.currentWriteRequest = currentWriteRequest;
1059 }
1060
1061
1062
1063
1064 public final void increaseReadBufferSize() {
1065 int newReadBufferSize = getConfig().getReadBufferSize() << 1;
1066 if (newReadBufferSize <= getConfig().getMaxReadBufferSize()) {
1067 getConfig().setReadBufferSize(newReadBufferSize);
1068 } else {
1069 getConfig().setReadBufferSize(getConfig().getMaxReadBufferSize());
1070 }
1071
1072 deferDecreaseReadBuffer = true;
1073 }
1074
1075
1076
1077
1078 public final void decreaseReadBufferSize() {
1079 if (deferDecreaseReadBuffer) {
1080 deferDecreaseReadBuffer = false;
1081 return;
1082 }
1083
1084 if (getConfig().getReadBufferSize() > getConfig().getMinReadBufferSize()) {
1085 getConfig().setReadBufferSize(getConfig().getReadBufferSize() >>> 1);
1086 }
1087
1088 deferDecreaseReadBuffer = true;
1089 }
1090
1091
1092
1093
1094 public final long getCreationTime() {
1095 return creationTime;
1096 }
1097
1098
1099
1100
1101 public final long getLastIoTime() {
1102 return Math.max(lastReadTime, lastWriteTime);
1103 }
1104
1105
1106
1107
1108 public final long getLastReadTime() {
1109 return lastReadTime;
1110 }
1111
1112
1113
1114
1115 public final long getLastWriteTime() {
1116 return lastWriteTime;
1117 }
1118
1119
1120
1121
1122 public final boolean isIdle(IdleStatus status) {
1123 if (status == IdleStatus.BOTH_IDLE) {
1124 return idleCountForBoth.get() > 0;
1125 }
1126
1127 if (status == IdleStatus.READER_IDLE) {
1128 return idleCountForRead.get() > 0;
1129 }
1130
1131 if (status == IdleStatus.WRITER_IDLE) {
1132 return idleCountForWrite.get() > 0;
1133 }
1134
1135 throw new IllegalArgumentException("Unknown idle status: " + status);
1136 }
1137
1138
1139
1140
1141 public final boolean isBothIdle() {
1142 return isIdle(IdleStatus.BOTH_IDLE);
1143 }
1144
1145
1146
1147
1148 public final boolean isReaderIdle() {
1149 return isIdle(IdleStatus.READER_IDLE);
1150 }
1151
1152
1153
1154
1155 public final boolean isWriterIdle() {
1156 return isIdle(IdleStatus.WRITER_IDLE);
1157 }
1158
1159
1160
1161
1162 public final int getIdleCount(IdleStatus status) {
1163 if (getConfig().getIdleTime(status) == 0) {
1164 if (status == IdleStatus.BOTH_IDLE) {
1165 idleCountForBoth.set(0);
1166 }
1167
1168 if (status == IdleStatus.READER_IDLE) {
1169 idleCountForRead.set(0);
1170 }
1171
1172 if (status == IdleStatus.WRITER_IDLE) {
1173 idleCountForWrite.set(0);
1174 }
1175 }
1176
1177 if (status == IdleStatus.BOTH_IDLE) {
1178 return idleCountForBoth.get();
1179 }
1180
1181 if (status == IdleStatus.READER_IDLE) {
1182 return idleCountForRead.get();
1183 }
1184
1185 if (status == IdleStatus.WRITER_IDLE) {
1186 return idleCountForWrite.get();
1187 }
1188
1189 throw new IllegalArgumentException("Unknown idle status: " + status);
1190 }
1191
1192
1193
1194
1195 public final long getLastIdleTime(IdleStatus status) {
1196 if (status == IdleStatus.BOTH_IDLE) {
1197 return lastIdleTimeForBoth;
1198 }
1199
1200 if (status == IdleStatus.READER_IDLE) {
1201 return lastIdleTimeForRead;
1202 }
1203
1204 if (status == IdleStatus.WRITER_IDLE) {
1205 return lastIdleTimeForWrite;
1206 }
1207
1208 throw new IllegalArgumentException("Unknown idle status: " + status);
1209 }
1210
1211
1212
1213
1214
1215
1216
1217 public final void increaseIdleCount(IdleStatus status, long currentTime) {
1218 if (status == IdleStatus.BOTH_IDLE) {
1219 idleCountForBoth.incrementAndGet();
1220 lastIdleTimeForBoth = currentTime;
1221 } else if (status == IdleStatus.READER_IDLE) {
1222 idleCountForRead.incrementAndGet();
1223 lastIdleTimeForRead = currentTime;
1224 } else if (status == IdleStatus.WRITER_IDLE) {
1225 idleCountForWrite.incrementAndGet();
1226 lastIdleTimeForWrite = currentTime;
1227 } else {
1228 throw new IllegalArgumentException("Unknown idle status: " + status);
1229 }
1230 }
1231
1232
1233
1234
1235 public final int getBothIdleCount() {
1236 return getIdleCount(IdleStatus.BOTH_IDLE);
1237 }
1238
1239
1240
1241
1242 public final long getLastBothIdleTime() {
1243 return getLastIdleTime(IdleStatus.BOTH_IDLE);
1244 }
1245
1246
1247
1248
1249 public final long getLastReaderIdleTime() {
1250 return getLastIdleTime(IdleStatus.READER_IDLE);
1251 }
1252
1253
1254
1255
1256 public final long getLastWriterIdleTime() {
1257 return getLastIdleTime(IdleStatus.WRITER_IDLE);
1258 }
1259
1260
1261
1262
1263 public final int getReaderIdleCount() {
1264 return getIdleCount(IdleStatus.READER_IDLE);
1265 }
1266
1267
1268
1269
1270 public final int getWriterIdleCount() {
1271 return getIdleCount(IdleStatus.WRITER_IDLE);
1272 }
1273
1274
1275
1276
1277 public SocketAddress getServiceAddress() {
1278 IoService service = getService();
1279 if (service instanceof IoAcceptor) {
1280 return ((IoAcceptor) service).getLocalAddress();
1281 }
1282
1283 return getRemoteAddress();
1284 }
1285
1286
1287
1288
1289 @Override
1290 public final int hashCode() {
1291 return super.hashCode();
1292 }
1293
1294
1295
1296
1297
1298 @Override
1299 public final boolean equals(Object o) {
1300 return super.equals(o);
1301 }
1302
1303
1304
1305
1306 @Override
1307 public String toString() {
1308 if (isConnected() || isClosing()) {
1309 String remote = null;
1310 String local = null;
1311
1312 try {
1313 remote = String.valueOf(getRemoteAddress());
1314 } catch (Exception e) {
1315 remote = "Cannot get the remote address informations: " + e.getMessage();
1316 }
1317
1318 try {
1319 local = String.valueOf(getLocalAddress());
1320 } catch (Exception e) {
1321 }
1322
1323 if (getService() instanceof IoAcceptor) {
1324 return "(" + getIdAsString() + ": " + getServiceName() + ", server, " + remote + " => " + local + ')';
1325 }
1326
1327 return "(" + getIdAsString() + ": " + getServiceName() + ", client, " + local + " => " + remote + ')';
1328 }
1329
1330 return "(" + getIdAsString() + ") Session disconnected ...";
1331 }
1332
1333
1334
1335
1336 private String getIdAsString() {
1337 String id = Long.toHexString(getId()).toUpperCase();
1338
1339 if (id.length() <= 8) {
1340 return "0x00000000".substring(0, 10 - id.length()) + id;
1341 } else {
1342 return "0x" + id;
1343 }
1344 }
1345
1346
1347
1348
1349 private String getServiceName() {
1350 TransportMetadata tm = getTransportMetadata();
1351 if (tm == null) {
1352 return "null";
1353 }
1354
1355 return tm.getProviderName() + ' ' + tm.getName();
1356 }
1357
1358
1359
1360
1361 public IoService getService() {
1362 return service;
1363 }
1364
1365
1366
1367
1368
1369
1370
1371
1372 public static void notifyIdleness(Iterator<? extends IoSession> sessions, long currentTime) {
1373 while (sessions.hasNext()) {
1374 IoSession session = sessions.next();
1375
1376 if (!session.getCloseFuture().isClosed()) {
1377 notifyIdleSession(session, currentTime);
1378 }
1379 }
1380 }
1381
1382
1383
1384
1385
1386
1387
1388
1389 public static void notifyIdleSession(IoSession session, long currentTime) {
1390 notifyIdleSession0(session, currentTime, session.getConfig().getIdleTimeInMillis(IdleStatus.BOTH_IDLE),
1391 IdleStatus.BOTH_IDLE, Math.max(session.getLastIoTime(), session.getLastIdleTime(IdleStatus.BOTH_IDLE)));
1392
1393 notifyIdleSession0(session, currentTime, session.getConfig().getIdleTimeInMillis(IdleStatus.READER_IDLE),
1394 IdleStatus.READER_IDLE,
1395 Math.max(session.getLastReadTime(), session.getLastIdleTime(IdleStatus.READER_IDLE)));
1396
1397 notifyIdleSession0(session, currentTime, session.getConfig().getIdleTimeInMillis(IdleStatus.WRITER_IDLE),
1398 IdleStatus.WRITER_IDLE,
1399 Math.max(session.getLastWriteTime(), session.getLastIdleTime(IdleStatus.WRITER_IDLE)));
1400
1401 notifyWriteTimeout(session, currentTime);
1402 }
1403
1404 private static void notifyIdleSession0(IoSession session, long currentTime, long idleTime, IdleStatus status,
1405 long lastIoTime) {
1406 if ((idleTime > 0) && (lastIoTime != 0) && (currentTime - lastIoTime >= idleTime)) {
1407 session.getFilterChain().fireSessionIdle(status);
1408 }
1409 }
1410
1411 private static void notifyWriteTimeout(IoSession session, long currentTime) {
1412
1413 long writeTimeout = session.getConfig().getWriteTimeoutInMillis();
1414 if ((writeTimeout > 0) && (currentTime - session.getLastWriteTime() >= writeTimeout)
1415 && !session.getWriteRequestQueue().isEmpty(session)) {
1416 WriteRequest request = session.getCurrentWriteRequest();
1417 if (request != null) {
1418 session.setCurrentWriteRequest(null);
1419 WriteTimeoutExceptionException.html#WriteTimeoutException">WriteTimeoutException cause = new WriteTimeoutException(request);
1420 request.getFuture().setException(cause);
1421 session.getFilterChain().fireExceptionCaught(cause);
1422
1423 session.closeNow();
1424 }
1425 }
1426 }
1427 }