001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * https://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 * 019 */ 020package org.apache.directory.ldap.client.api.future; 021 022 023import java.util.concurrent.BlockingQueue; 024import java.util.concurrent.LinkedBlockingQueue; 025import java.util.concurrent.TimeUnit; 026 027import org.apache.directory.api.i18n.I18n; 028import org.apache.directory.api.ldap.model.message.Response; 029import org.apache.directory.ldap.client.api.LdapConnection; 030 031 032/** 033 * A Future implementation used in LdapConnection operations. 034 * 035 * @param <R> The result type returned by this Future's <tt>get</tt> method 036 * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a> 037 */ 038public abstract class MultipleResponseFuture<R extends Response> implements ResponseFuture<R> 039{ 040 /** the blocking queue holding LDAP responses */ 041 protected BlockingQueue<R> queue; 042 043 /** flag to determine if this future is cancelled */ 044 protected boolean cancelled = false; 045 046 /** If the request has been cancelled because of an exception it will be stored here */ 047 protected Throwable cause; 048 049 /** The messageID for this future */ 050 protected int messageId; 051 052 /** The connection used by the request */ 053 protected LdapConnection connection; 054 055 056 /** 057 * Creates a new instance of ResponseFuture. 058 * 059 * @param connection The LdapConnection used by the request 060 * @param messageId The associated message ID 061 */ 062 public MultipleResponseFuture( LdapConnection connection, int messageId ) 063 { 064 queue = new LinkedBlockingQueue<>(); 065 this.messageId = messageId; 066 this.connection = connection; 067 } 068 069 070 /** 071 * {@inheritDoc} 072 */ 073 @Override 074 public boolean cancel( boolean mayInterruptIfRunning ) 075 { 076 if ( !cancelled ) 077 { 078 // set the cancel flag first 079 cancelled = true; 080 081 // Send an abandonRequest only if this future exists 082 if ( !connection.isRequestCompleted( messageId ) ) 083 { 084 connection.abandon( messageId ); 085 } 086 087 // then clear the queue, cause the might be some incoming messages before this abandon request 088 // hits the server 089 queue.clear(); 090 } 091 092 return cancelled; 093 } 094 095 096 /** 097 * {@inheritDoc} 098 * @throws InterruptedException if the operation has been cancelled by client 099 */ 100 @Override 101 public R get() throws InterruptedException 102 { 103 return queue.take(); 104 } 105 106 107 /** 108 * Set the associated Response in this Future 109 * 110 * @param response The response to add into the Future 111 * @throws InterruptedException if the operation has been cancelled by client 112 */ 113 @Override 114 public void set( R response ) throws InterruptedException 115 { 116 queue.add( response ); 117 } 118 119 120 /** 121 * {@inheritDoc} 122 * @throws InterruptedException if the operation has been cancelled by client 123 */ 124 @Override 125 public R get( long timeout, TimeUnit unit ) throws InterruptedException 126 { 127 return queue.poll( timeout, unit ); 128 } 129 130 131 /** 132 * {@inheritDoc} 133 */ 134 @Override 135 public boolean isCancelled() 136 { 137 return cancelled; 138 } 139 140 141 /** 142 * This operation is not supported in this implementation of Future. 143 * 144 * {@inheritDoc} 145 */ 146 @Override 147 public boolean isDone() 148 { 149 throw new UnsupportedOperationException( I18n.err( I18n.ERR_04106_OPERATION_NOT_SUPPORTED ) ); 150 } 151 152 153 /** 154 * @return the cause 155 */ 156 public Throwable getCause() 157 { 158 return cause; 159 } 160 161 162 /** 163 * Associate a cause to the ResponseFuture 164 * @param cause the cause to set 165 */ 166 public void setCause( Throwable cause ) 167 { 168 this.cause = cause; 169 } 170 171 172 /** 173 * Cancel the Future 174 * 175 */ 176 public void cancel() 177 { 178 // set the cancel flag first 179 cancelled = true; 180 } 181 182 183 /** 184 * {@inheritDoc} 185 */ 186 @Override 187 public String toString() 188 { 189 StringBuilder sb = new StringBuilder(); 190 191 sb.append( "[msgId : " ).append( messageId ).append( ", " ); 192 sb.append( "size : " ).append( queue.size() ).append( ", " ); 193 sb.append( "Canceled :" ).append( cancelled ).append( "]" ); 194 195 return sb.toString(); 196 } 197}