View Javadoc
1   /*
2    *   Licensed to the Apache Software Foundation (ASF) under one
3    *   or more contributor license agreements.  See the NOTICE file
4    *   distributed with this work for additional information
5    *   regarding copyright ownership.  The ASF licenses this file
6    *   to you under the Apache License, Version 2.0 (the
7    *   "License"); you may not use this file except in compliance
8    *   with the License.  You may obtain a copy of the License at
9    *
10   *     https://www.apache.org/licenses/LICENSE-2.0
11   *
12   *   Unless required by applicable law or agreed to in writing,
13   *   software distributed under the License is distributed on an
14   *   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *   KIND, either express or implied.  See the License for the
16   *   specific language governing permissions and limitations
17   *   under the License.
18   *
19   */
20  package org.apache.directory.ldap.client.api.future;
21  
22  
23  import java.util.concurrent.BlockingQueue;
24  import java.util.concurrent.LinkedBlockingQueue;
25  import java.util.concurrent.TimeUnit;
26  
27  import org.apache.directory.api.i18n.I18n;
28  import org.apache.directory.api.ldap.model.message.Response;
29  import org.apache.directory.ldap.client.api.LdapConnection;
30  
31  
32  /**
33   * A Future implementation used in LdapConnection operations.
34   *
35   * @param <R> The result type returned by this Future's <tt>get</tt> method
36   * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a>
37   */
38  public abstract class MultipleResponseFuture<R extends Response> implements ResponseFuture<R>
39  {
40      /** the blocking queue holding LDAP responses */
41      protected BlockingQueue<R> queue;
42  
43      /** flag to determine if this future is cancelled */
44      protected boolean cancelled = false;
45  
46      /** If the request has been cancelled because of an exception  it will be stored here */
47      protected Throwable cause;
48  
49      /** The messageID for this future */
50      protected int messageId;
51  
52      /** The connection used by the request */
53      protected LdapConnection connection;
54  
55  
56      /**
57       * Creates a new instance of ResponseFuture.
58       *
59       * @param connection The LdapConnection used by the request
60       * @param messageId The associated message ID
61       */
62      public MultipleResponseFuture( LdapConnection connection, int messageId )
63      {
64          queue = new LinkedBlockingQueue<>();
65          this.messageId = messageId;
66          this.connection = connection;
67      }
68  
69  
70      /**
71       * {@inheritDoc}
72       */
73      @Override
74      public boolean cancel( boolean mayInterruptIfRunning )
75      {
76          if ( !cancelled )
77          {
78              // set the cancel flag first
79              cancelled = true;
80          
81              // Send an abandonRequest only if this future exists
82              if ( !connection.isRequestCompleted( messageId ) )
83              {
84                  connection.abandon( messageId );
85              }
86          
87              // then clear the queue, cause the might be some incoming messages before this abandon request
88              // hits the server
89              queue.clear();
90          }
91  
92          return cancelled;
93      }
94  
95  
96      /**
97       * {@inheritDoc}
98       * @throws InterruptedException if the operation has been cancelled by client
99       */
100     @Override
101     public R get() throws InterruptedException
102     {
103         return queue.take();
104     }
105 
106 
107     /**
108      * Set the associated Response in this Future
109      * 
110      * @param response The response to add into the Future
111      * @throws InterruptedException if the operation has been cancelled by client
112      */
113     @Override
114     public void set( R response ) throws InterruptedException
115     {
116         queue.add( response );
117     }
118 
119 
120     /**
121      * {@inheritDoc}
122      * @throws InterruptedException if the operation has been cancelled by client
123      */
124     @Override
125     public R get( long timeout, TimeUnit unit ) throws InterruptedException
126     {
127         return queue.poll( timeout, unit );
128     }
129 
130 
131     /**
132      * {@inheritDoc}
133      */
134     @Override
135     public boolean isCancelled()
136     {
137         return cancelled;
138     }
139 
140 
141     /**
142      * This operation is not supported in this implementation of Future.
143      * 
144      * {@inheritDoc}
145      */
146     @Override
147     public boolean isDone()
148     {
149         throw new UnsupportedOperationException( I18n.err( I18n.ERR_04106_OPERATION_NOT_SUPPORTED ) );
150     }
151 
152 
153     /**
154      * @return the cause
155      */
156     public Throwable getCause()
157     {
158         return cause;
159     }
160 
161 
162     /**
163      * Associate a cause to the ResponseFuture
164      * @param cause the cause to set
165      */
166     public void setCause( Throwable cause )
167     {
168         this.cause = cause;
169     }
170 
171 
172     /**
173      * Cancel the Future
174      *
175      */
176     public void cancel()
177     {
178         // set the cancel flag first
179         cancelled = true;
180     }
181 
182 
183     /**
184      * {@inheritDoc}
185      */
186     @Override
187     public String toString()
188     {
189         StringBuilder sb = new StringBuilder();
190 
191         sb.append( "[msgId : " ).append( messageId ).append( ", " );
192         sb.append( "size : " ).append( queue.size() ).append( ", " );
193         sb.append( "Canceled :" ).append( cancelled ).append( "]" );
194 
195         return sb.toString();
196     }
197 }