View Javadoc
1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.core.session;
21  
22  import java.io.File;
23  import java.io.FileInputStream;
24  import java.io.IOException;
25  import java.net.SocketAddress;
26  import java.nio.channels.FileChannel;
27  import java.util.Iterator;
28  import java.util.Queue;
29  import java.util.Set;
30  import java.util.concurrent.ConcurrentLinkedQueue;
31  import java.util.concurrent.atomic.AtomicBoolean;
32  import java.util.concurrent.atomic.AtomicInteger;
33  import java.util.concurrent.atomic.AtomicLong;
34  
35  import org.apache.mina.core.buffer.IoBuffer;
36  import org.apache.mina.core.file.DefaultFileRegion;
37  import org.apache.mina.core.file.FilenameFileRegion;
38  import org.apache.mina.core.filterchain.IoFilterChain;
39  import org.apache.mina.core.future.CloseFuture;
40  import org.apache.mina.core.future.DefaultCloseFuture;
41  import org.apache.mina.core.future.DefaultReadFuture;
42  import org.apache.mina.core.future.DefaultWriteFuture;
43  import org.apache.mina.core.future.IoFutureListener;
44  import org.apache.mina.core.future.ReadFuture;
45  import org.apache.mina.core.future.WriteFuture;
46  import org.apache.mina.core.service.AbstractIoService;
47  import org.apache.mina.core.service.IoAcceptor;
48  import org.apache.mina.core.service.IoHandler;
49  import org.apache.mina.core.service.IoProcessor;
50  import org.apache.mina.core.service.IoService;
51  import org.apache.mina.core.service.TransportMetadata;
52  import org.apache.mina.core.write.DefaultWriteRequest;
53  import org.apache.mina.core.write.WriteException;
54  import org.apache.mina.core.write.WriteRequest;
55  import org.apache.mina.core.write.WriteRequestQueue;
56  import org.apache.mina.core.write.WriteTimeoutException;
57  import org.apache.mina.core.write.WriteToClosedSessionException;
58  import org.apache.mina.util.ExceptionMonitor;
59  
60  /**
61   * Base implementation of {@link IoSession}.
62   * 
63   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
64   */
65  public abstract class AbstractIoSession implements IoSession {
66      /** The associated handler */
67      private final IoHandler handler;
68  
69      /** The session config */
70      protected IoSessionConfig config;
71  
72      /** The service which will manage this session */
73      private final IoService service;
74  
75      private static final AttributeKeyAttributeKey">AttributeKey READY_READ_FUTURES_KEY = new AttributeKey(AbstractIoSession.class,
76              "readyReadFutures");
77  
78      private static final AttributeKeytributeKey">AttributeKey WAITING_READ_FUTURES_KEY = new AttributeKey(AbstractIoSession.class,
79              "waitingReadFutures");
80  
81      private static final IoFutureListener<CloseFuture> SCHEDULED_COUNTER_RESETTER = new IoFutureListener<CloseFuture>() {
82          public void operationComplete(CloseFuture future) {
83              AbstractIoSession../org/apache/mina/core/session/AbstractIoSession.html#AbstractIoSession">AbstractIoSession session = (AbstractIoSession) future.getSession();
84              session.scheduledWriteBytes.set(0);
85              session.scheduledWriteMessages.set(0);
86              session.readBytesThroughput = 0;
87              session.readMessagesThroughput = 0;
88              session.writtenBytesThroughput = 0;
89              session.writtenMessagesThroughput = 0;
90          }
91      };
92  
93      /**
94       * An internal write request object that triggers session close.
95       */
96      public static final WriteRequest CLOSE_REQUEST = new DefaultWriteRequest(new Object());
97  
98      /**
99       * An internal write request object that triggers message sent events.
100      */
101     public static final WriteRequest MESSAGE_SENT_REQUEST = new DefaultWriteRequest(DefaultWriteRequest.EMPTY_MESSAGE);
102 
103     private final Object lock = new Object();
104 
105     private IoSessionAttributeMap attributes;
106 
107     private WriteRequestQueue writeRequestQueue;
108 
109     private WriteRequest currentWriteRequest;
110 
111     /** The Session creation's time */
112     private final long creationTime;
113 
114     /** An id generator guaranteed to generate unique IDs for the session */
115     private static AtomicLong idGenerator = new AtomicLong(0);
116 
117     /** The session ID */
118     private long sessionId;
119 
120     /**
121      * A future that will be set 'closed' when the connection is closed.
122      */
123     private final CloseFuture closeFuture = new DefaultCloseFuture(this);
124 
125     private volatile boolean closing;
126 
127     // traffic control
128     private boolean readSuspended = false;
129 
130     private boolean writeSuspended = false;
131 
132     // Status variables
133     private final AtomicBoolean scheduledForFlush = new AtomicBoolean();
134 
135     private final AtomicInteger scheduledWriteBytes = new AtomicInteger();
136 
137     private final AtomicInteger scheduledWriteMessages = new AtomicInteger();
138 
139     private long readBytes;
140 
141     private long writtenBytes;
142 
143     private long readMessages;
144 
145     private long writtenMessages;
146 
147     private long lastReadTime;
148 
149     private long lastWriteTime;
150 
151     private long lastThroughputCalculationTime;
152 
153     private long lastReadBytes;
154 
155     private long lastWrittenBytes;
156 
157     private long lastReadMessages;
158 
159     private long lastWrittenMessages;
160 
161     private double readBytesThroughput;
162 
163     private double writtenBytesThroughput;
164 
165     private double readMessagesThroughput;
166 
167     private double writtenMessagesThroughput;
168 
169     private AtomicInteger idleCountForBoth = new AtomicInteger();
170 
171     private AtomicInteger idleCountForRead = new AtomicInteger();
172 
173     private AtomicInteger idleCountForWrite = new AtomicInteger();
174 
175     private long lastIdleTimeForBoth;
176 
177     private long lastIdleTimeForRead;
178 
179     private long lastIdleTimeForWrite;
180 
181     private boolean deferDecreaseReadBuffer = true;
182 
183     /**
184      * Create a Session for a service
185      * 
186      * @param service the Service for this session
187      */
188     protected AbstractIoSession(IoService service) {
189         this.service = service;
190         this.handler = service.getHandler();
191 
192         // Initialize all the Session counters to the current time
193         long currentTime = System.currentTimeMillis();
194         creationTime = currentTime;
195         lastThroughputCalculationTime = currentTime;
196         lastReadTime = currentTime;
197         lastWriteTime = currentTime;
198         lastIdleTimeForBoth = currentTime;
199         lastIdleTimeForRead = currentTime;
200         lastIdleTimeForWrite = currentTime;
201 
202         // TODO add documentation
203         closeFuture.addListener(SCHEDULED_COUNTER_RESETTER);
204 
205         // Set a new ID for this session
206         sessionId = idGenerator.incrementAndGet();
207     }
208 
209     /**
210      * {@inheritDoc}
211      * 
212      * We use an AtomicLong to guarantee that the session ID are unique.
213      */
214     public final long getId() {
215         return sessionId;
216     }
217 
218     /**
219      * @return The associated IoProcessor for this session
220      */
221     public abstract IoProcessor getProcessor();
222 
223     /**
224      * {@inheritDoc}
225      */
226     public final boolean isConnected() {
227         return !closeFuture.isClosed();
228     }
229 
230     /**
231      * {@inheritDoc}
232      */
233     public boolean isActive() {
234         // Return true by default
235         return true;
236     }
237 
238     /**
239      * {@inheritDoc}
240      */
241     public final boolean isClosing() {
242         return closing || closeFuture.isClosed();
243     }
244 
245     /**
246      * {@inheritDoc}
247      */
248     public boolean isSecured() {
249         // Always false...
250         return false;
251     }
252 
253     /**
254      * {@inheritDoc}
255      */
256     public final CloseFuture getCloseFuture() {
257         return closeFuture;
258     }
259 
260     /**
261      * Tells if the session is scheduled for flushed
262      * 
263      * @return true if the session is scheduled for flush
264      */
265     public final boolean isScheduledForFlush() {
266         return scheduledForFlush.get();
267     }
268 
269     /**
270      * Schedule the session for flushed
271      */
272     public final void scheduledForFlush() {
273         scheduledForFlush.set(true);
274     }
275 
276     /**
277      * Change the session's status : it's not anymore scheduled for flush
278      */
279     public final void unscheduledForFlush() {
280         scheduledForFlush.set(false);
281     }
282 
283     /**
284      * Set the scheduledForFLush flag. As we may have concurrent access to this
285      * flag, we compare and set it in one call.
286      * 
287      * @param schedule
288      *            the new value to set if not already set.
289      * @return true if the session flag has been set, and if it wasn't set
290      *         already.
291      */
292     public final boolean setScheduledForFlush(boolean schedule) {
293         if (schedule) {
294             // If the current tag is set to false, switch it to true,
295             // otherwise, we do nothing but return false : the session
296             // is already scheduled for flush
297             return scheduledForFlush.compareAndSet(false, schedule);
298         }
299 
300         scheduledForFlush.set(schedule);
301         return true;
302     }
303 
304     /**
305      * {@inheritDoc}
306      */
307     public final CloseFuture close(boolean rightNow) {
308         if (rightNow) {
309             return closeNow();
310         } else {
311             return closeOnFlush();
312         }
313     }
314 
315     /**
316      * {@inheritDoc}
317      */
318     public final CloseFuture close() {
319         return closeNow();
320     }
321 
322     /**
323      * {@inheritDoc}
324      */
325     public final CloseFuture closeOnFlush() {
326         if (!isClosing()) {
327             getWriteRequestQueue().offer(this, CLOSE_REQUEST);
328             getProcessor().flush(this);
329         }
330         
331         return closeFuture;
332     }
333 
334     /**
335      * {@inheritDoc}
336      */
337     public final CloseFuture closeNow() {
338         synchronized (lock) {
339             if (isClosing()) {
340                 return closeFuture;
341             }
342 
343             closing = true;
344             
345             try {
346                 destroy();
347             } catch (Exception e) {
348                 IoFilterChain filterChain = getFilterChain();
349                 filterChain.fireExceptionCaught(e);
350             }
351         }
352 
353         getFilterChain().fireFilterClose();
354 
355         return closeFuture;
356     }
357     
358     /**
359      * Destroy the session
360      */
361     protected void destroy() {
362         if (writeRequestQueue != null) {
363             while (!writeRequestQueue.isEmpty(this)) {
364                 WriteRequest writeRequest = writeRequestQueue.poll(this);
365                 
366                 if (writeRequest != null) {
367                     WriteFuture writeFuture = writeRequest.getFuture();
368                     
369                     // The WriteRequest may not always have a future : The CLOSE_REQUEST
370                     // and MESSAGE_SENT_REQUEST don't.
371                     if (writeFuture != null) {
372                         writeFuture.setWritten();
373                     }
374                 }
375             }
376         }
377     }
378 
379     /**
380      * {@inheritDoc}
381      */
382     public IoHandler getHandler() {
383         return handler;
384     }
385 
386     /**
387      * {@inheritDoc}
388      */
389     public IoSessionConfig getConfig() {
390         return config;
391     }
392 
393     /**
394      * {@inheritDoc}
395      */
396     public final ReadFuture read() {
397         if (!getConfig().isUseReadOperation()) {
398             throw new IllegalStateException("useReadOperation is not enabled.");
399         }
400 
401         Queue<ReadFuture> readyReadFutures = getReadyReadFutures();
402         ReadFuture future;
403         
404         synchronized (readyReadFutures) {
405             future = readyReadFutures.poll();
406             
407             if (future != null) {
408                 if (future.isClosed()) {
409                     // Let other readers get notified.
410                     readyReadFutures.offer(future);
411                 }
412             } else {
413                 future = new DefaultReadFuture(this);
414                 getWaitingReadFutures().offer(future);
415             }
416         }
417 
418         return future;
419     }
420 
421     /**
422      * Associates a message to a ReadFuture
423      * 
424      * @param message the message to associate to the ReadFuture
425      * 
426      */
427     public final void offerReadFuture(Object message) {
428         newReadFuture().setRead(message);
429     }
430 
431     /**
432      * Associates a failure to a ReadFuture
433      * 
434      * @param exception the exception to associate to the ReadFuture
435      */
436     public final void offerFailedReadFuture(Throwable exception) {
437         newReadFuture().setException(exception);
438     }
439 
440     /**
441      * Inform the ReadFuture that the session has been closed
442      */
443     public final void offerClosedReadFuture() {
444         Queue<ReadFuture> readyReadFutures = getReadyReadFutures();
445         
446         synchronized (readyReadFutures) {
447             newReadFuture().setClosed();
448         }
449     }
450 
451     /**
452      * @return a readFuture get from the waiting ReadFuture
453      */
454     private ReadFuture newReadFuture() {
455         Queue<ReadFuture> readyReadFutures = getReadyReadFutures();
456         Queue<ReadFuture> waitingReadFutures = getWaitingReadFutures();
457         ReadFuture future;
458         
459         synchronized (readyReadFutures) {
460             future = waitingReadFutures.poll();
461             
462             if (future == null) {
463                 future = new DefaultReadFuture(this);
464                 readyReadFutures.offer(future);
465             }
466         }
467         
468         return future;
469     }
470 
471     /**
472      * @return a queue of ReadFuture
473      */
474     private Queue<ReadFuture> getReadyReadFutures() {
475         Queue<ReadFuture> readyReadFutures = (Queue<ReadFuture>) getAttribute(READY_READ_FUTURES_KEY);
476         
477         if (readyReadFutures == null) {
478             readyReadFutures = new ConcurrentLinkedQueue<>();
479 
480             Queue<ReadFuture> oldReadyReadFutures = (Queue<ReadFuture>) setAttributeIfAbsent(READY_READ_FUTURES_KEY,
481                     readyReadFutures);
482             
483             if (oldReadyReadFutures != null) {
484                 readyReadFutures = oldReadyReadFutures;
485             }
486         }
487         
488         return readyReadFutures;
489     }
490 
491     /**
492      * @return the queue of waiting ReadFuture
493      */
494     private Queue<ReadFuture> getWaitingReadFutures() {
495         Queue<ReadFuture> waitingReadyReadFutures = (Queue<ReadFuture>) getAttribute(WAITING_READ_FUTURES_KEY);
496         
497         if (waitingReadyReadFutures == null) {
498             waitingReadyReadFutures = new ConcurrentLinkedQueue<>();
499 
500             Queue<ReadFuture> oldWaitingReadyReadFutures = (Queue<ReadFuture>) setAttributeIfAbsent(
501                     WAITING_READ_FUTURES_KEY, waitingReadyReadFutures);
502             
503             if (oldWaitingReadyReadFutures != null) {
504                 waitingReadyReadFutures = oldWaitingReadyReadFutures;
505             }
506         }
507         
508         return waitingReadyReadFutures;
509     }
510 
511     /**
512      * {@inheritDoc}
513      */
514     public WriteFuture write(Object message) {
515         return write(message, null);
516     }
517 
518     /**
519      * {@inheritDoc}
520      */
521     public WriteFuture write(Object message, SocketAddress remoteAddress) {
522         if (message == null) {
523             throw new IllegalArgumentException("Trying to write a null message : not allowed");
524         }
525 
526         // We can't send a message to a connected session if we don't have
527         // the remote address
528         if (!getTransportMetadata().isConnectionless() && (remoteAddress != null)) {
529             throw new UnsupportedOperationException();
530         }
531 
532         // If the session has been closed or is closing, we can't either
533         // send a message to the remote side. We generate a future
534         // containing an exception.
535         if (isClosing() || !isConnected()) {
536             WriteFuture future = new DefaultWriteFuture(this);
537             WriteRequest request = new DefaultWriteRequest(message, future, remoteAddress);
538             WriteException writeException = new WriteToClosedSessionException(request);
539             future.setException(writeException);
540             return future;
541         }
542 
543         FileChannel openedFileChannel = null;
544 
545         // TODO: remove this code as soon as we use InputStream
546         // instead of Object for the message.
547         try {
548             if ((message instanceof IoBuffer./../../org/apache/mina/core/buffer/IoBuffer.html#IoBuffer">IoBuffer) && !((IoBuffer) message).hasRemaining()) {
549                 // Nothing to write : probably an error in the user code
550                 throw new IllegalArgumentException("message is empty. Forgot to call flip()?");
551             } else if (message instanceof FileChannel) {
552                 FileChannel fileChannel = (FileChannel) message;
553                 message = new DefaultFileRegion(fileChannel, 0, fileChannel.size());
554             } else if (message instanceof File) {
555                 File file = (File) message;
556                 openedFileChannel = new FileInputStream(file).getChannel();
557                 message = new FilenameFileRegion(file, openedFileChannel, 0, openedFileChannel.size());
558             }
559         } catch (IOException e) {
560             ExceptionMonitor.getInstance().exceptionCaught(e);
561             return DefaultWriteFuture.newNotWrittenFuture(this, e);
562         }
563 
564         // Now, we can write the message. First, create a future
565         WriteFuture writeFuture = new DefaultWriteFuture(this);
566         WriteRequest writeRequest = new DefaultWriteRequest(message, writeFuture, remoteAddress);
567 
568         // Then, get the chain and inject the WriteRequest into it
569         IoFilterChain filterChain = getFilterChain();
570         filterChain.fireFilterWrite(writeRequest);
571 
572         // TODO : This is not our business ! The caller has created a
573         // FileChannel,
574         // he has to close it !
575         if (openedFileChannel != null) {
576             // If we opened a FileChannel, it needs to be closed when the write
577             // has completed
578             final FileChannel finalChannel = openedFileChannel;
579             writeFuture.addListener(new IoFutureListener<WriteFuture>() {
580                 public void operationComplete(WriteFuture future) {
581                     try {
582                         finalChannel.close();
583                     } catch (IOException e) {
584                         ExceptionMonitor.getInstance().exceptionCaught(e);
585                     }
586                 }
587             });
588         }
589 
590         // Return the WriteFuture.
591         return writeFuture;
592     }
593 
594     /**
595      * {@inheritDoc}
596      */
597     public final Object getAttachment() {
598         return getAttribute("");
599     }
600 
601     /**
602      * {@inheritDoc}
603      */
604     public final Object setAttachment(Object attachment) {
605         return setAttribute("", attachment);
606     }
607 
608     /**
609      * {@inheritDoc}
610      */
611     public final Object getAttribute(Object key) {
612         return getAttribute(key, null);
613     }
614 
615     /**
616      * {@inheritDoc}
617      */
618     public final Object getAttribute(Object key, Object defaultValue) {
619         return attributes.getAttribute(this, key, defaultValue);
620     }
621 
622     /**
623      * {@inheritDoc}
624      */
625     public final Object setAttribute(Object key, Object value) {
626         return attributes.setAttribute(this, key, value);
627     }
628 
629     /**
630      * {@inheritDoc}
631      */
632     public final Object setAttribute(Object key) {
633         return setAttribute(key, Boolean.TRUE);
634     }
635 
636     /**
637      * {@inheritDoc}
638      */
639     public final Object setAttributeIfAbsent(Object key, Object value) {
640         return attributes.setAttributeIfAbsent(this, key, value);
641     }
642 
643     /**
644      * {@inheritDoc}
645      */
646     public final Object setAttributeIfAbsent(Object key) {
647         return setAttributeIfAbsent(key, Boolean.TRUE);
648     }
649 
650     /**
651      * {@inheritDoc}
652      */
653     public final Object removeAttribute(Object key) {
654         return attributes.removeAttribute(this, key);
655     }
656 
657     /**
658      * {@inheritDoc}
659      */
660     public final boolean removeAttribute(Object key, Object value) {
661         return attributes.removeAttribute(this, key, value);
662     }
663 
664     /**
665      * {@inheritDoc}
666      */
667     public final boolean replaceAttribute(Object key, Object oldValue, Object newValue) {
668         return attributes.replaceAttribute(this, key, oldValue, newValue);
669     }
670 
671     /**
672      * {@inheritDoc}
673      */
674     public final boolean containsAttribute(Object key) {
675         return attributes.containsAttribute(this, key);
676     }
677 
678     /**
679      * {@inheritDoc}
680      */
681     public final Set<Object> getAttributeKeys() {
682         return attributes.getAttributeKeys(this);
683     }
684 
685     /**
686      * @return The map of attributes associated with the session
687      */
688     public final IoSessionAttributeMap getAttributeMap() {
689         return attributes;
690     }
691 
692     /**
693      * Set the map of attributes associated with the session
694      * 
695      * @param attributes The Map of attributes
696      */
697     public final void setAttributeMap(IoSessionAttributeMap attributes) {
698         this.attributes = attributes;
699     }
700 
701     /**
702      * Create a new close aware write queue, based on the given write queue.
703      * 
704      * @param writeRequestQueue The write request queue
705      */
706     public final void setWriteRequestQueue(WriteRequestQueue writeRequestQueue) {
707         this.writeRequestQueue = writeRequestQueue;
708     }
709 
710     /**
711      * {@inheritDoc}
712      */
713     public final void suspendRead() {
714         readSuspended = true;
715         if (isClosing() || !isConnected()) {
716             return;
717         }
718         getProcessor().updateTrafficControl(this);
719     }
720 
721     /**
722      * {@inheritDoc}
723      */
724     public final void suspendWrite() {
725         writeSuspended = true;
726         if (isClosing() || !isConnected()) {
727             return;
728         }
729         getProcessor().updateTrafficControl(this);
730     }
731 
732     /**
733      * {@inheritDoc}
734      */
735     @SuppressWarnings("unchecked")
736     public final void resumeRead() {
737         readSuspended = false;
738         if (isClosing() || !isConnected()) {
739             return;
740         }
741         getProcessor().updateTrafficControl(this);
742     }
743 
744     /**
745      * {@inheritDoc}
746      */
747     @SuppressWarnings("unchecked")
748     public final void resumeWrite() {
749         writeSuspended = false;
750         if (isClosing() || !isConnected()) {
751             return;
752         }
753         getProcessor().updateTrafficControl(this);
754     }
755 
756     /**
757      * {@inheritDoc}
758      */
759     public boolean isReadSuspended() {
760         return readSuspended;
761     }
762 
763     /**
764      * {@inheritDoc}
765      */
766     public boolean isWriteSuspended() {
767         return writeSuspended;
768     }
769 
770     /**
771      * {@inheritDoc}
772      */
773     public final long getReadBytes() {
774         return readBytes;
775     }
776 
777     /**
778      * {@inheritDoc}
779      */
780     public final long getWrittenBytes() {
781         return writtenBytes;
782     }
783 
784     /**
785      * {@inheritDoc}
786      */
787     public final long getReadMessages() {
788         return readMessages;
789     }
790 
791     /**
792      * {@inheritDoc}
793      */
794     public final long getWrittenMessages() {
795         return writtenMessages;
796     }
797 
798     /**
799      * {@inheritDoc}
800      */
801     public final double getReadBytesThroughput() {
802         return readBytesThroughput;
803     }
804 
805     /**
806      * {@inheritDoc}
807      */
808     public final double getWrittenBytesThroughput() {
809         return writtenBytesThroughput;
810     }
811 
812     /**
813      * {@inheritDoc}
814      */
815     public final double getReadMessagesThroughput() {
816         return readMessagesThroughput;
817     }
818 
819     /**
820      * {@inheritDoc}
821      */
822     public final double getWrittenMessagesThroughput() {
823         return writtenMessagesThroughput;
824     }
825 
826     /**
827      * {@inheritDoc}
828      */
829     public final void updateThroughput(long currentTime, boolean force) {
830         int interval = (int) (currentTime - lastThroughputCalculationTime);
831 
832         long minInterval = getConfig().getThroughputCalculationIntervalInMillis();
833 
834         if (((minInterval == 0) || (interval < minInterval)) && !force) {
835             return;
836         }
837 
838         readBytesThroughput = (readBytes - lastReadBytes) * 1000.0 / interval;
839         writtenBytesThroughput = (writtenBytes - lastWrittenBytes) * 1000.0 / interval;
840         readMessagesThroughput = (readMessages - lastReadMessages) * 1000.0 / interval;
841         writtenMessagesThroughput = (writtenMessages - lastWrittenMessages) * 1000.0 / interval;
842 
843         lastReadBytes = readBytes;
844         lastWrittenBytes = writtenBytes;
845         lastReadMessages = readMessages;
846         lastWrittenMessages = writtenMessages;
847 
848         lastThroughputCalculationTime = currentTime;
849     }
850 
851     /**
852      * {@inheritDoc}
853      */
854     public final long getScheduledWriteBytes() {
855         return scheduledWriteBytes.get();
856     }
857 
858     /**
859      * {@inheritDoc}
860      */
861     public final int getScheduledWriteMessages() {
862         return scheduledWriteMessages.get();
863     }
864 
865     /**
866      * Set the number of scheduled write bytes
867      * 
868      * @param byteCount The number of scheduled bytes for write
869      */
870     protected void setScheduledWriteBytes(int byteCount) {
871         scheduledWriteBytes.set(byteCount);
872     }
873 
874     /**
875      * Set the number of scheduled write messages
876      * 
877      * @param messages The number of scheduled messages for write
878      */
879     protected void setScheduledWriteMessages(int messages) {
880         scheduledWriteMessages.set(messages);
881     }
882 
883     /**
884      * Increase the number of read bytes
885      * 
886      * @param increment The number of read bytes
887      * @param currentTime The current time
888      */
889     public final void increaseReadBytes(long increment, long currentTime) {
890         if (increment <= 0) {
891             return;
892         }
893 
894         readBytes += increment;
895         lastReadTime = currentTime;
896         idleCountForBoth.set(0);
897         idleCountForRead.set(0);
898 
899         if (getService() instanceof AbstractIoService) {
900             ((AbstractIoService) getService()).getStatistics().increaseReadBytes(increment, currentTime);
901         }
902     }
903 
904     /**
905      * Increase the number of read messages
906      * 
907      * @param currentTime The current time
908      */
909     public final void increaseReadMessages(long currentTime) {
910         readMessages++;
911         lastReadTime = currentTime;
912         idleCountForBoth.set(0);
913         idleCountForRead.set(0);
914 
915         if (getService() instanceof AbstractIoService) {
916             ((AbstractIoService) getService()).getStatistics().increaseReadMessages(currentTime);
917         }
918     }
919 
920     /**
921      * Increase the number of written bytes
922      * 
923      * @param increment The number of written bytes
924      * @param currentTime The current time
925      */
926     public final void increaseWrittenBytes(int increment, long currentTime) {
927         if (increment <= 0) {
928             return;
929         }
930 
931         writtenBytes += increment;
932         lastWriteTime = currentTime;
933         idleCountForBoth.set(0);
934         idleCountForWrite.set(0);
935 
936         if (getService() instanceof AbstractIoService) {
937             ((AbstractIoService) getService()).getStatistics().increaseWrittenBytes(increment, currentTime);
938         }
939 
940         increaseScheduledWriteBytes(-increment);
941     }
942 
943     /**
944      * Increase the number of written messages
945      * 
946      * @param request The written message
947      * @param currentTime The current tile
948      */
949     public final void increaseWrittenMessages(WriteRequest request, long currentTime) {
950         Object message = request.getMessage();
951 
952         if (message instanceof IoBuffer) {
953             IoBufferf="../../../../../org/apache/mina/core/buffer/IoBuffer.html#IoBuffer">IoBuffer b = (IoBuffer) message;
954 
955             if (b.hasRemaining()) {
956                 return;
957             }
958         }
959 
960         writtenMessages++;
961         lastWriteTime = currentTime;
962 
963         if (getService() instanceof AbstractIoService) {
964             ((AbstractIoService) getService()).getStatistics().increaseWrittenMessages(currentTime);
965         }
966 
967         decreaseScheduledWriteMessages();
968     }
969 
970     /**
971      * Increase the number of scheduled write bytes for the session
972      * 
973      * @param increment The number of newly added bytes to write
974      */
975     public final void increaseScheduledWriteBytes(int increment) {
976         scheduledWriteBytes.addAndGet(increment);
977         if (getService() instanceof AbstractIoService) {
978             ((AbstractIoService) getService()).getStatistics().increaseScheduledWriteBytes(increment);
979         }
980     }
981 
982     /**
983      * Increase the number of scheduled message to write
984      */
985     public final void increaseScheduledWriteMessages() {
986         scheduledWriteMessages.incrementAndGet();
987         
988         if (getService() instanceof AbstractIoService) {
989             ((AbstractIoService) getService()).getStatistics().increaseScheduledWriteMessages();
990         }
991     }
992 
993     /**
994      * Decrease the number of scheduled message written
995      */
996     private void decreaseScheduledWriteMessages() {
997         scheduledWriteMessages.decrementAndGet();
998         if (getService() instanceof AbstractIoService) {
999             ((AbstractIoService) getService()).getStatistics().decreaseScheduledWriteMessages();
1000         }
1001     }
1002 
1003     /**
1004      * Decrease the counters of written messages and written bytes when a message has been written
1005      * 
1006      * @param request The written message
1007      */
1008     public final void decreaseScheduledBytesAndMessages(WriteRequest request) {
1009         Object message = request.getMessage();
1010         
1011         if (message instanceof IoBuffer) {
1012             IoBufferf="../../../../../org/apache/mina/core/buffer/IoBuffer.html#IoBuffer">IoBuffer b = (IoBuffer) message;
1013             
1014             if (b.hasRemaining()) {
1015                 increaseScheduledWriteBytes(-((IoBuffer) message).remaining());
1016             } else {
1017                 decreaseScheduledWriteMessages();
1018             }
1019         } else {
1020             decreaseScheduledWriteMessages();
1021         }
1022     }
1023 
1024     /**
1025      * {@inheritDoc}
1026      */
1027     public final WriteRequestQueue getWriteRequestQueue() {
1028         if (writeRequestQueue == null) {
1029             throw new IllegalStateException();
1030         }
1031         
1032         return writeRequestQueue;
1033     }
1034 
1035     /**
1036      * {@inheritDoc}
1037      */
1038     public final WriteRequest getCurrentWriteRequest() {
1039         return currentWriteRequest;
1040     }
1041 
1042     /**
1043      * {@inheritDoc}
1044      */
1045     public final Object getCurrentWriteMessage() {
1046         WriteRequest req = getCurrentWriteRequest();
1047         
1048         if (req == null) {
1049             return null;
1050         }
1051         return req.getMessage();
1052     }
1053 
1054     /**
1055      * {@inheritDoc}
1056      */
1057     public final void setCurrentWriteRequest(WriteRequest currentWriteRequest) {
1058         this.currentWriteRequest = currentWriteRequest;
1059     }
1060 
1061     /**
1062      * Increase the ReadBuffer size (it will double)
1063      */
1064     public final void increaseReadBufferSize() {
1065         int newReadBufferSize = getConfig().getReadBufferSize() << 1;
1066         if (newReadBufferSize <= getConfig().getMaxReadBufferSize()) {
1067             getConfig().setReadBufferSize(newReadBufferSize);
1068         } else {
1069             getConfig().setReadBufferSize(getConfig().getMaxReadBufferSize());
1070         }
1071 
1072         deferDecreaseReadBuffer = true;
1073     }
1074 
1075     /**
1076      * Decrease the ReadBuffer size (it will be divided by a factor 2)
1077      */
1078     public final void decreaseReadBufferSize() {
1079         if (deferDecreaseReadBuffer) {
1080             deferDecreaseReadBuffer = false;
1081             return;
1082         }
1083 
1084         if (getConfig().getReadBufferSize() > getConfig().getMinReadBufferSize()) {
1085             getConfig().setReadBufferSize(getConfig().getReadBufferSize() >>> 1);
1086         }
1087 
1088         deferDecreaseReadBuffer = true;
1089     }
1090 
1091     /**
1092      * {@inheritDoc}
1093      */
1094     public final long getCreationTime() {
1095         return creationTime;
1096     }
1097 
1098     /**
1099      * {@inheritDoc}
1100      */
1101     public final long getLastIoTime() {
1102         return Math.max(lastReadTime, lastWriteTime);
1103     }
1104 
1105     /**
1106      * {@inheritDoc}
1107      */
1108     public final long getLastReadTime() {
1109         return lastReadTime;
1110     }
1111 
1112     /**
1113      * {@inheritDoc}
1114      */
1115     public final long getLastWriteTime() {
1116         return lastWriteTime;
1117     }
1118 
1119     /**
1120      * {@inheritDoc}
1121      */
1122     public final boolean isIdle(IdleStatus status) {
1123         if (status == IdleStatus.BOTH_IDLE) {
1124             return idleCountForBoth.get() > 0;
1125         }
1126 
1127         if (status == IdleStatus.READER_IDLE) {
1128             return idleCountForRead.get() > 0;
1129         }
1130 
1131         if (status == IdleStatus.WRITER_IDLE) {
1132             return idleCountForWrite.get() > 0;
1133         }
1134 
1135         throw new IllegalArgumentException("Unknown idle status: " + status);
1136     }
1137 
1138     /**
1139      * {@inheritDoc}
1140      */
1141     public final boolean isBothIdle() {
1142         return isIdle(IdleStatus.BOTH_IDLE);
1143     }
1144 
1145     /**
1146      * {@inheritDoc}
1147      */
1148     public final boolean isReaderIdle() {
1149         return isIdle(IdleStatus.READER_IDLE);
1150     }
1151 
1152     /**
1153      * {@inheritDoc}
1154      */
1155     public final boolean isWriterIdle() {
1156         return isIdle(IdleStatus.WRITER_IDLE);
1157     }
1158 
1159     /**
1160      * {@inheritDoc}
1161      */
1162     public final int getIdleCount(IdleStatus status) {
1163         if (getConfig().getIdleTime(status) == 0) {
1164             if (status == IdleStatus.BOTH_IDLE) {
1165                 idleCountForBoth.set(0);
1166             }
1167 
1168             if (status == IdleStatus.READER_IDLE) {
1169                 idleCountForRead.set(0);
1170             }
1171 
1172             if (status == IdleStatus.WRITER_IDLE) {
1173                 idleCountForWrite.set(0);
1174             }
1175         }
1176 
1177         if (status == IdleStatus.BOTH_IDLE) {
1178             return idleCountForBoth.get();
1179         }
1180 
1181         if (status == IdleStatus.READER_IDLE) {
1182             return idleCountForRead.get();
1183         }
1184 
1185         if (status == IdleStatus.WRITER_IDLE) {
1186             return idleCountForWrite.get();
1187         }
1188 
1189         throw new IllegalArgumentException("Unknown idle status: " + status);
1190     }
1191 
1192     /**
1193      * {@inheritDoc}
1194      */
1195     public final long getLastIdleTime(IdleStatus status) {
1196         if (status == IdleStatus.BOTH_IDLE) {
1197             return lastIdleTimeForBoth;
1198         }
1199 
1200         if (status == IdleStatus.READER_IDLE) {
1201             return lastIdleTimeForRead;
1202         }
1203 
1204         if (status == IdleStatus.WRITER_IDLE) {
1205             return lastIdleTimeForWrite;
1206         }
1207 
1208         throw new IllegalArgumentException("Unknown idle status: " + status);
1209     }
1210 
1211     /**
1212      * Increase the count of the various Idle counter
1213      * 
1214      * @param status The current status
1215      * @param currentTime The current time
1216      */
1217     public final void increaseIdleCount(IdleStatus status, long currentTime) {
1218         if (status == IdleStatus.BOTH_IDLE) {
1219             idleCountForBoth.incrementAndGet();
1220             lastIdleTimeForBoth = currentTime;
1221         } else if (status == IdleStatus.READER_IDLE) {
1222             idleCountForRead.incrementAndGet();
1223             lastIdleTimeForRead = currentTime;
1224         } else if (status == IdleStatus.WRITER_IDLE) {
1225             idleCountForWrite.incrementAndGet();
1226             lastIdleTimeForWrite = currentTime;
1227         } else {
1228             throw new IllegalArgumentException("Unknown idle status: " + status);
1229         }
1230     }
1231 
1232     /**
1233      * {@inheritDoc}
1234      */
1235     public final int getBothIdleCount() {
1236         return getIdleCount(IdleStatus.BOTH_IDLE);
1237     }
1238 
1239     /**
1240      * {@inheritDoc}
1241      */
1242     public final long getLastBothIdleTime() {
1243         return getLastIdleTime(IdleStatus.BOTH_IDLE);
1244     }
1245 
1246     /**
1247      * {@inheritDoc}
1248      */
1249     public final long getLastReaderIdleTime() {
1250         return getLastIdleTime(IdleStatus.READER_IDLE);
1251     }
1252 
1253     /**
1254      * {@inheritDoc}
1255      */
1256     public final long getLastWriterIdleTime() {
1257         return getLastIdleTime(IdleStatus.WRITER_IDLE);
1258     }
1259 
1260     /**
1261      * {@inheritDoc}
1262      */
1263     public final int getReaderIdleCount() {
1264         return getIdleCount(IdleStatus.READER_IDLE);
1265     }
1266 
1267     /**
1268      * {@inheritDoc}
1269      */
1270     public final int getWriterIdleCount() {
1271         return getIdleCount(IdleStatus.WRITER_IDLE);
1272     }
1273 
1274     /**
1275      * {@inheritDoc}
1276      */
1277     public SocketAddress getServiceAddress() {
1278         IoService service = getService();
1279         if (service instanceof IoAcceptor) {
1280             return ((IoAcceptor) service).getLocalAddress();
1281         }
1282 
1283         return getRemoteAddress();
1284     }
1285 
1286     /**
1287      * {@inheritDoc}
1288      */
1289     @Override
1290     public final int hashCode() {
1291         return super.hashCode();
1292     }
1293 
1294     /**
1295      * {@inheritDoc} TODO This is a ridiculous implementation. Need to be
1296      * replaced.
1297      */
1298     @Override
1299     public final boolean equals(Object o) {
1300         return super.equals(o);
1301     }
1302 
1303     /**
1304      * {@inheritDoc}
1305      */
1306     @Override
1307     public String toString() {
1308         if (isConnected() || isClosing()) {
1309             String remote = null;
1310             String local = null;
1311 
1312             try {
1313                 remote = String.valueOf(getRemoteAddress());
1314             } catch (Exception e) {
1315                 remote = "Cannot get the remote address informations: " + e.getMessage();
1316             }
1317 
1318             try {
1319                 local = String.valueOf(getLocalAddress());
1320             } catch (Exception e) {
1321             }
1322 
1323             if (getService() instanceof IoAcceptor) {
1324                 return "(" + getIdAsString() + ": " + getServiceName() + ", server, " + remote + " => " + local + ')';
1325             }
1326 
1327             return "(" + getIdAsString() + ": " + getServiceName() + ", client, " + local + " => " + remote + ')';
1328         }
1329 
1330         return "(" + getIdAsString() + ") Session disconnected ...";
1331     }
1332 
1333     /**
1334      * Get the Id as a String
1335      */
1336     private String getIdAsString() {
1337         String id = Long.toHexString(getId()).toUpperCase();
1338         
1339         if (id.length() <= 8) {
1340             return "0x00000000".substring(0, 10 - id.length()) + id;
1341         } else {
1342             return "0x" + id;
1343         }
1344     }
1345 
1346     /**
1347      * TGet the Service name
1348      */
1349     private String getServiceName() {
1350         TransportMetadata tm = getTransportMetadata();
1351         if (tm == null) {
1352             return "null";
1353         }
1354 
1355         return tm.getProviderName() + ' ' + tm.getName();
1356     }
1357 
1358     /**
1359      * {@inheritDoc}
1360      */
1361     public IoService getService() {
1362         return service;
1363     }
1364 
1365     /**
1366      * Fires a {@link IoEventType#SESSION_IDLE} event to any applicable sessions
1367      * in the specified collection.
1368      * 
1369      * @param sessions The sessions that are notified
1370      * @param currentTime the current time (i.e. {@link System#currentTimeMillis()})
1371      */
1372     public static void notifyIdleness(Iterator<? extends IoSession> sessions, long currentTime) {
1373         while (sessions.hasNext()) {
1374             IoSession session = sessions.next();
1375             
1376             if (!session.getCloseFuture().isClosed()) {
1377                 notifyIdleSession(session, currentTime);
1378             }
1379         }
1380     }
1381 
1382     /**
1383      * Fires a {@link IoEventType#SESSION_IDLE} event if applicable for the
1384      * specified {@code session}.
1385      * 
1386      * @param session The session that is notified
1387      * @param currentTime the current time (i.e. {@link System#currentTimeMillis()})
1388      */
1389     public static void notifyIdleSession(IoSession session, long currentTime) {
1390         notifyIdleSession0(session, currentTime, session.getConfig().getIdleTimeInMillis(IdleStatus.BOTH_IDLE),
1391                 IdleStatus.BOTH_IDLE, Math.max(session.getLastIoTime(), session.getLastIdleTime(IdleStatus.BOTH_IDLE)));
1392 
1393         notifyIdleSession0(session, currentTime, session.getConfig().getIdleTimeInMillis(IdleStatus.READER_IDLE),
1394                 IdleStatus.READER_IDLE,
1395                 Math.max(session.getLastReadTime(), session.getLastIdleTime(IdleStatus.READER_IDLE)));
1396 
1397         notifyIdleSession0(session, currentTime, session.getConfig().getIdleTimeInMillis(IdleStatus.WRITER_IDLE),
1398                 IdleStatus.WRITER_IDLE,
1399                 Math.max(session.getLastWriteTime(), session.getLastIdleTime(IdleStatus.WRITER_IDLE)));
1400 
1401         notifyWriteTimeout(session, currentTime);
1402     }
1403 
1404     private static void notifyIdleSession0(IoSession session, long currentTime, long idleTime, IdleStatus status,
1405             long lastIoTime) {
1406         if ((idleTime > 0) && (lastIoTime != 0) && (currentTime - lastIoTime >= idleTime)) {
1407             session.getFilterChain().fireSessionIdle(status);
1408         }
1409     }
1410 
1411     private static void notifyWriteTimeout(IoSession session, long currentTime) {
1412 
1413         long writeTimeout = session.getConfig().getWriteTimeoutInMillis();
1414         if ((writeTimeout > 0) && (currentTime - session.getLastWriteTime() >= writeTimeout)
1415                 && !session.getWriteRequestQueue().isEmpty(session)) {
1416             WriteRequest request = session.getCurrentWriteRequest();
1417             if (request != null) {
1418                 session.setCurrentWriteRequest(null);
1419                 WriteTimeoutExceptionException.html#WriteTimeoutException">WriteTimeoutException cause = new WriteTimeoutException(request);
1420                 request.getFuture().setException(cause);
1421                 session.getFilterChain().fireExceptionCaught(cause);
1422                 // WriteException is an IOException, so we close the session.
1423                 session.closeNow();
1424             }
1425         }
1426     }
1427 }