001/*
002 *   Licensed to the Apache Software Foundation (ASF) under one
003 *   or more contributor license agreements.  See the NOTICE file
004 *   distributed with this work for additional information
005 *   regarding copyright ownership.  The ASF licenses this file
006 *   to you under the Apache License, Version 2.0 (the
007 *   "License"); you may not use this file except in compliance
008 *   with the License.  You may obtain a copy of the License at
009 *
010 *     https://www.apache.org/licenses/LICENSE-2.0
011 *
012 *   Unless required by applicable law or agreed to in writing,
013 *   software distributed under the License is distributed on an
014 *   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 *   KIND, either express or implied.  See the License for the
016 *   specific language governing permissions and limitations
017 *   under the License.
018 *
019 */
020package org.apache.directory.ldap.client.api.future;
021
022
023import java.util.concurrent.BlockingQueue;
024import java.util.concurrent.LinkedBlockingQueue;
025import java.util.concurrent.TimeUnit;
026
027import org.apache.directory.api.i18n.I18n;
028import org.apache.directory.api.ldap.model.message.Response;
029import org.apache.directory.ldap.client.api.LdapConnection;
030
031
032/**
033 * A Future implementation used in LdapConnection operations.
034 *
035 * @param <R> The result type returned by this Future's <tt>get</tt> method
036 * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a>
037 */
038public abstract class MultipleResponseFuture<R extends Response> implements ResponseFuture<R>
039{
040    /** the blocking queue holding LDAP responses */
041    protected BlockingQueue<R> queue;
042
043    /** flag to determine if this future is cancelled */
044    protected boolean cancelled = false;
045
046    /** If the request has been cancelled because of an exception  it will be stored here */
047    protected Throwable cause;
048
049    /** The messageID for this future */
050    protected int messageId;
051
052    /** The connection used by the request */
053    protected LdapConnection connection;
054
055
056    /**
057     * Creates a new instance of ResponseFuture.
058     *
059     * @param connection The LdapConnection used by the request
060     * @param messageId The associated message ID
061     */
062    public MultipleResponseFuture( LdapConnection connection, int messageId )
063    {
064        queue = new LinkedBlockingQueue<>();
065        this.messageId = messageId;
066        this.connection = connection;
067    }
068
069
070    /**
071     * {@inheritDoc}
072     */
073    @Override
074    public boolean cancel( boolean mayInterruptIfRunning )
075    {
076        if ( !cancelled )
077        {
078            // set the cancel flag first
079            cancelled = true;
080        
081            // Send an abandonRequest only if this future exists
082            if ( !connection.isRequestCompleted( messageId ) )
083            {
084                connection.abandon( messageId );
085            }
086        
087            // then clear the queue, cause the might be some incoming messages before this abandon request
088            // hits the server
089            queue.clear();
090        }
091
092        return cancelled;
093    }
094
095
096    /**
097     * {@inheritDoc}
098     * @throws InterruptedException if the operation has been cancelled by client
099     */
100    @Override
101    public R get() throws InterruptedException
102    {
103        return queue.take();
104    }
105
106
107    /**
108     * Set the associated Response in this Future
109     * 
110     * @param response The response to add into the Future
111     * @throws InterruptedException if the operation has been cancelled by client
112     */
113    @Override
114    public void set( R response ) throws InterruptedException
115    {
116        queue.add( response );
117    }
118
119
120    /**
121     * {@inheritDoc}
122     * @throws InterruptedException if the operation has been cancelled by client
123     */
124    @Override
125    public R get( long timeout, TimeUnit unit ) throws InterruptedException
126    {
127        return queue.poll( timeout, unit );
128    }
129
130
131    /**
132     * {@inheritDoc}
133     */
134    @Override
135    public boolean isCancelled()
136    {
137        return cancelled;
138    }
139
140
141    /**
142     * This operation is not supported in this implementation of Future.
143     * 
144     * {@inheritDoc}
145     */
146    @Override
147    public boolean isDone()
148    {
149        throw new UnsupportedOperationException( I18n.err( I18n.ERR_04106_OPERATION_NOT_SUPPORTED ) );
150    }
151
152
153    /**
154     * @return the cause
155     */
156    public Throwable getCause()
157    {
158        return cause;
159    }
160
161
162    /**
163     * Associate a cause to the ResponseFuture
164     * @param cause the cause to set
165     */
166    public void setCause( Throwable cause )
167    {
168        this.cause = cause;
169    }
170
171
172    /**
173     * Cancel the Future
174     *
175     */
176    public void cancel()
177    {
178        // set the cancel flag first
179        cancelled = true;
180    }
181
182
183    /**
184     * {@inheritDoc}
185     */
186    @Override
187    public String toString()
188    {
189        StringBuilder sb = new StringBuilder();
190
191        sb.append( "[msgId : " ).append( messageId ).append( ", " );
192        sb.append( "size : " ).append( queue.size() ).append( ", " );
193        sb.append( "Canceled :" ).append( cancelled ).append( "]" );
194
195        return sb.toString();
196    }
197}