View Javadoc
1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.directory.server.ldap.replication.consumer;
21  
22  
23  import java.util.ArrayList;
24  import java.util.Arrays;
25  import java.util.Iterator;
26  import java.util.List;
27  import java.util.Map;
28  
29  import org.apache.commons.collections4.map.LRUMap;
30  import org.apache.directory.api.ldap.extras.controls.SynchronizationModeEnum;
31  import org.apache.directory.api.ldap.extras.controls.syncrepl.syncDone.SyncDoneValue;
32  import org.apache.directory.api.ldap.extras.controls.syncrepl.syncRequest.SyncRequestValue;
33  import org.apache.directory.api.ldap.extras.controls.syncrepl.syncRequest.SyncRequestValueImpl;
34  import org.apache.directory.api.ldap.extras.controls.syncrepl.syncState.SyncStateTypeEnum;
35  import org.apache.directory.api.ldap.extras.controls.syncrepl.syncState.SyncStateValue;
36  import org.apache.directory.api.ldap.extras.intermediate.syncrepl.SyncInfoValue;
37  import org.apache.directory.api.ldap.extras.intermediate.syncrepl.SyncInfoValueImpl;
38  import org.apache.directory.api.ldap.model.constants.Loggers;
39  import org.apache.directory.api.ldap.model.constants.SchemaConstants;
40  import org.apache.directory.api.ldap.model.csn.Csn;
41  import org.apache.directory.api.ldap.model.cursor.Cursor;
42  import org.apache.directory.api.ldap.model.entry.Attribute;
43  import org.apache.directory.api.ldap.model.entry.DefaultAttribute;
44  import org.apache.directory.api.ldap.model.entry.DefaultEntry;
45  import org.apache.directory.api.ldap.model.entry.DefaultModification;
46  import org.apache.directory.api.ldap.model.entry.Entry;
47  import org.apache.directory.api.ldap.model.entry.Modification;
48  import org.apache.directory.api.ldap.model.entry.ModificationOperation;
49  import org.apache.directory.api.ldap.model.exception.LdapException;
50  import org.apache.directory.api.ldap.model.exception.LdapNoSuchObjectException;
51  import org.apache.directory.api.ldap.model.filter.AndNode;
52  import org.apache.directory.api.ldap.model.filter.EqualityNode;
53  import org.apache.directory.api.ldap.model.filter.ExprNode;
54  import org.apache.directory.api.ldap.model.filter.NotNode;
55  import org.apache.directory.api.ldap.model.filter.OrNode;
56  import org.apache.directory.api.ldap.model.filter.PresenceNode;
57  import org.apache.directory.api.ldap.model.message.AliasDerefMode;
58  import org.apache.directory.api.ldap.model.message.IntermediateResponse;
59  import org.apache.directory.api.ldap.model.message.Response;
60  import org.apache.directory.api.ldap.model.message.ResultCodeEnum;
61  import org.apache.directory.api.ldap.model.message.SearchRequest;
62  import org.apache.directory.api.ldap.model.message.SearchRequestImpl;
63  import org.apache.directory.api.ldap.model.message.SearchResultDone;
64  import org.apache.directory.api.ldap.model.message.SearchResultEntry;
65  import org.apache.directory.api.ldap.model.message.SearchResultReference;
66  import org.apache.directory.api.ldap.model.message.SearchScope;
67  import org.apache.directory.api.ldap.model.message.controls.ManageDsaITImpl;
68  import org.apache.directory.api.ldap.model.message.controls.SortKey;
69  import org.apache.directory.api.ldap.model.message.controls.SortRequest;
70  import org.apache.directory.api.ldap.model.message.controls.SortRequestImpl;
71  import org.apache.directory.api.ldap.model.name.Dn;
72  import org.apache.directory.api.ldap.model.name.Rdn;
73  import org.apache.directory.api.ldap.model.schema.AttributeType;
74  import org.apache.directory.api.ldap.model.schema.SchemaManager;
75  import org.apache.directory.api.util.StringConstants;
76  import org.apache.directory.api.util.Strings;
77  import org.apache.directory.ldap.client.api.ConnectionClosedEventListener;
78  import org.apache.directory.ldap.client.api.LdapNetworkConnection;
79  import org.apache.directory.ldap.client.api.future.SearchFuture;
80  import org.apache.directory.server.constants.ApacheSchemaConstants;
81  import org.apache.directory.server.core.api.CoreSession;
82  import org.apache.directory.server.core.api.DirectoryService;
83  import org.apache.directory.server.core.api.OperationManager;
84  import org.apache.directory.server.core.api.interceptor.context.AddOperationContext;
85  import org.apache.directory.server.core.api.interceptor.context.DeleteOperationContext;
86  import org.apache.directory.server.core.api.interceptor.context.LookupOperationContext;
87  import org.apache.directory.server.core.api.interceptor.context.ModifyOperationContext;
88  import org.apache.directory.server.core.api.interceptor.context.MoveAndRenameOperationContext;
89  import org.apache.directory.server.core.api.interceptor.context.MoveOperationContext;
90  import org.apache.directory.server.core.api.interceptor.context.RenameOperationContext;
91  import org.apache.directory.server.core.api.partition.Partition;
92  import org.apache.directory.server.core.api.partition.PartitionTxn;
93  import org.apache.directory.server.ldap.LdapProtocolUtils;
94  import org.apache.directory.server.ldap.replication.ReplicationConsumerConfig;
95  import org.apache.directory.server.ldap.replication.SyncReplConfiguration;
96  import org.slf4j.Logger;
97  import org.slf4j.LoggerFactory;
98  import org.slf4j.MDC;
99  
100 
101 /**
102  * Implementation of syncrepl slave a.k.a consumer.
103  *
104  * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a>
105  */
106 public class ReplicationConsumerImpl implements ConnectionClosedEventListener, ReplicationConsumer
107 {
108     /** A dedicated logger for the consumer */
109     private static final Logger CONSUMER_LOG = LoggerFactory.getLogger( Loggers.CONSUMER_LOG.getName() );
110 
111     /** the syncrepl configuration */
112     private SyncReplConfiguration config;
113 
114     /** the sync cookie sent by the server */
115     private byte[] syncCookie;
116 
117     /** connection to the syncrepl provider */
118     private LdapNetworkConnection connection;
119 
120     /** the search request with control */
121     private SearchRequest searchRequest;
122 
123     /** a reference to the directoryService */
124     private DirectoryService directoryService;
125 
126     /** the schema manager */
127     private SchemaManager schemaManager;
128 
129     /** flag to indicate whether the consumer was disconnected */
130     private volatile boolean disconnected;
131 
132     /** the core session */
133     private CoreSession session;
134 
135     /** attributes on which modification should be ignored */
136     private static final String[] MOD_IGNORE_AT = new String[]
137         {
138             SchemaConstants.ENTRY_UUID_AT,
139             SchemaConstants.ENTRY_DN_AT,
140             SchemaConstants.CREATE_TIMESTAMP_AT,
141             SchemaConstants.CREATORS_NAME_AT,
142             ApacheSchemaConstants.ENTRY_PARENT_ID_AT,
143             SchemaConstants.COLLECTIVE_ATTRIBUTE_SUBENTRIES_AT,
144             SchemaConstants.CONTEXT_CSN_AT,
145             ApacheSchemaConstants.NB_CHILDREN_AT,
146             ApacheSchemaConstants.NB_SUBORDINATES_AT,
147             SchemaConstants.HAS_SUBORDINATES_AT,
148             SchemaConstants.STRUCTURAL_OBJECT_CLASS_AT,
149     };
150 
151     /** the cookie that was saved last time */
152     private byte[] lastSavedCookie;
153 
154     private volatile boolean reload = false;
155 
156     /** The (entrtyUuid=*) filter */
157     private static final PresenceNode ENTRY_UUID_PRESENCE_FILTER = new PresenceNode( SchemaConstants.ENTRY_UUID_AT );
158 
159     private Modification cookieMod;
160 
161     private Modification ridMod;
162 
163     /** AttributeTypes used for replication */
164     private AttributeType adsReplCookieAT;
165     private AttributeType adsDsReplicaIdAT;
166 
167     private static final Map<String, Object> UUID_LOCK_MAP = new LRUMap( 1000 );
168 
169 
170     /**
171      * @return the config
172      */
173     @Override
174     public SyncReplConfiguration getConfig()
175     {
176         return config;
177     }
178 
179 
180     /**
181      * Init the replication service
182      * @param directoryservice The directory service
183      */
184     @Override
185     public void init( DirectoryService directoryservice ) throws Exception
186     {
187         this.directoryService = directoryservice;
188 
189         session = directoryService.getAdminSession();
190 
191         schemaManager = directoryservice.getSchemaManager();
192 
193         adsReplCookieAT = schemaManager.lookupAttributeTypeRegistry( SchemaConstants.ADS_REPL_COOKIE );
194         adsDsReplicaIdAT = schemaManager.lookupAttributeTypeRegistry( SchemaConstants.ADS_DS_REPLICA_ID );
195 
196         Attribute cookieAttr = new DefaultAttribute( adsReplCookieAT );
197         cookieMod = new DefaultModification( ModificationOperation.REPLACE_ATTRIBUTE, cookieAttr );
198 
199         Attribute ridAttr = new DefaultAttribute( adsDsReplicaIdAT );
200         ridMod = new DefaultModification( ModificationOperation.REPLACE_ATTRIBUTE, ridAttr );
201 
202         prepareSyncSearchRequest();
203     }
204 
205 
206     /**
207      * Connect to the remote server. Note that a SyncRepl consumer will be connected to only
208      * one remote server
209      *
210      * @return true if the connections have been successful.
211      */
212     public boolean connect()
213     {
214         String providerHost = config.getRemoteHost();
215         int port = config.getRemotePort();
216 
217         try
218         {
219             // Create a connection
220             if ( connection == null )
221             {
222                 connection = new LdapNetworkConnection( providerHost, port );
223                 connection.setSchemaManager( schemaManager );
224 
225                 if ( config.isUseTls() )
226                 {
227                     connection.getConfig().setTrustManagers( config.getTrustManager() );
228                     connection.getConfig().setUseTls( true );
229                 }
230 
231                 connection.addConnectionClosedEventListener( this );
232             }
233 
234             // Try to connect
235             if ( connection.connect() )
236             {
237                 CONSUMER_LOG.info( "Consumer {} connected to producer {}", config.getReplicaId(), config.getProducer() );
238 
239                 // Do a bind
240                 try
241                 {
242                     connection.bind( config.getReplUserDn(), Strings.utf8ToString( config.getReplUserPassword() ) );
243                     disconnected = false;
244 
245                     return true;
246                 }
247                 catch ( LdapException le )
248                 {
249                     CONSUMER_LOG.warn( "Failed to bind to the producer {} with the given bind Dn {}",
250                         config.getProducer(), config.getReplUserDn() );
251                     CONSUMER_LOG.warn( "", le );
252                     disconnected = true;
253                 }
254             }
255             else
256             {
257                 CONSUMER_LOG.warn( "Consumer {} cannot connect to producer {}", config.getReplicaId(),
258                     config.getProducer() );
259                 disconnected = true;
260 
261                 return false;
262             }
263         }
264         catch ( Exception e )
265         {
266             CONSUMER_LOG.error( "Failed to connect to the producer {}, cause : {}", config.getProducer(),
267                 e.getMessage() );
268             disconnected = true;
269         }
270 
271         return false;
272     }
273 
274 
275     /**
276      *  prepares a SearchRequest for syncing DIT content.
277      *
278      */
279     private void prepareSyncSearchRequest() throws LdapException
280     {
281         String baseDn = config.getBaseDn();
282 
283         searchRequest = new SearchRequestImpl();
284 
285         searchRequest.setBase( new Dn( baseDn ) );
286         searchRequest.setFilter( config.getFilter() );
287         searchRequest.setSizeLimit( config.getSearchSizeLimit() );
288         searchRequest.setTimeLimit( config.getSearchTimeout() );
289 
290         searchRequest.setDerefAliases( config.getAliasDerefMode() );
291         searchRequest.setScope( config.getSearchScope() );
292         searchRequest.setTypesOnly( false );
293 
294         searchRequest.addAttributes( config.getAttributes() );
295 
296         if ( !config.isChaseReferrals() )
297         {
298             searchRequest.addControl( new ManageDsaITImpl() );
299         }
300 
301         if ( CONSUMER_LOG.isDebugEnabled() )
302         {
303             MDC.put( "Replica", Integer.toString( config.getReplicaId() ) );
304             CONSUMER_LOG.debug( "Configuring consumer {}", config );
305         }
306     }
307 
308 
309     private ResultCodeEnum handleSearchResultDone( SearchResultDone searchDone )
310     {
311         CONSUMER_LOG.debug( "///////////////// handleSearchDone //////////////////" );
312 
313         SyncDoneValue ctrl = ( SyncDoneValue ) searchDone.getControls().get( SyncDoneValue.OID );
314 
315         if ( ( ctrl != null ) && ( ctrl.getCookie() != null ) )
316         {
317             syncCookie = ctrl.getCookie();
318             CONSUMER_LOG.debug( "assigning cookie from sync done value control: {}", Strings.utf8ToString( syncCookie ) );
319             storeCookie();
320         }
321 
322         CONSUMER_LOG.debug( "//////////////// END handleSearchDone//////////////////////" );
323 
324         reload = false;
325 
326         return searchDone.getLdapResult().getResultCode();
327     }
328 
329 
330     private void handleSearchReference( SearchResultReference searchRef )
331     {
332         // this method won't be called cause the provider will serve the referrals as
333         // normal entry objects due to the usage of ManageDsaITControl in the search request
334     }
335 
336 
337     /**
338      * Process a SearchResultEntry received from a consumer. We have to handle all the
339      * cases :
340      * - Add
341      * - Modify
342      * - Moddn
343      * - Delete
344      * - Present
345      * @param syncResult
346      */
347     private void handleSearchResultEntry( SearchResultEntry syncResult )
348     {
349         CONSUMER_LOG.debug( "------------- starting handleSearchResult ------------" );
350 
351         SyncStateValue syncStateCtrl = ( SyncStateValue ) syncResult.getControl( SyncStateValue.OID );
352 
353         try
354         {
355             Entry remoteEntry = new DefaultEntry( schemaManager, syncResult.getEntry() );
356             String uuid = remoteEntry.get( directoryService.getAtProvider().getEntryUUID() ).getString();
357             // lock on UUID to serialize the updates when there are multiple consumers
358             // connected to several producers and to the *same* base/partition
359             Object lock = getLockFor( uuid );
360 
361             synchronized ( lock )
362             {
363                 int rid = -1;
364 
365                 if ( syncStateCtrl.getCookie() != null )
366                 {
367                     syncCookie = syncStateCtrl.getCookie();
368                     rid = LdapProtocolUtils.getReplicaId( Strings.utf8ToString( syncCookie ) );
369                     CONSUMER_LOG.debug( "assigning the cookie from sync state value control: {}",
370                         Strings.utf8ToString( syncCookie ) );
371                 }
372 
373                 SyncStateTypeEnum state = syncStateCtrl.getSyncStateType();
374 
375                 // check to avoid conversion of UUID from byte[] to String
376                 if ( CONSUMER_LOG.isDebugEnabled() )
377                 {
378                     CONSUMER_LOG.debug( "state name {}", state.name() );
379                     CONSUMER_LOG.debug( "entryUUID = {}", Strings.uuidToString( syncStateCtrl.getEntryUUID() ) );
380                 }
381 
382                 Dn remoteDn = remoteEntry.getDn();
383 
384                 switch ( state )
385                 {
386                     case ADD:
387                         boolean remoteDnExist = false;
388 
389                         try
390                         {
391                             remoteDnExist = session.exists( remoteDn );
392                         }
393                         catch ( LdapNoSuchObjectException lnsoe )
394                         {
395                             CONSUMER_LOG.error( lnsoe.getMessage() );
396                         }
397 
398                         if ( !remoteDnExist )
399                         {
400                             CONSUMER_LOG.debug( "adding entry with dn {}", remoteDn );
401                             CONSUMER_LOG.debug( remoteEntry.toString() );
402                             AddOperationContext/interceptor/context/AddOperationContext.html#AddOperationContext">AddOperationContext addContext = new AddOperationContext( session, remoteEntry );
403                             addContext.setReplEvent( true );
404                             addContext.setRid( rid );
405 
406                             OperationManager operationManager = directoryService.getOperationManager();
407                             operationManager.add( addContext );
408                         }
409                         else
410                         {
411                             CONSUMER_LOG.debug( "updating entry in refreshOnly mode {}", remoteDn );
412                             modify( remoteEntry, rid );
413                         }
414 
415                         break;
416 
417                     case MODIFY:
418                         CONSUMER_LOG.debug( "modifying entry with dn {}", remoteEntry.getDn().getName() );
419                         modify( remoteEntry, rid );
420 
421                         break;
422 
423                     case MODDN:
424                         String entryUuid = Strings.uuidToString( syncStateCtrl.getEntryUUID() );
425                         applyModDnOperation( remoteEntry, entryUuid, rid );
426 
427                         break;
428 
429                     case DELETE:
430                         CONSUMER_LOG.debug( "deleting entry with dn {}", remoteEntry.getDn().getName() );
431 
432                         if ( !session.exists( remoteDn ) )
433                         {
434                             CONSUMER_LOG
435                                 .debug(
436                                     "looks like entry {} was already deleted in a prior update (possibly from another provider), skipping delete",
437                                     remoteDn );
438                         }
439                         else
440                         {
441                             // incase of a MODDN operation resulting in a branch to be moved out of scope
442                             // ApacheDS replication provider sends a single delete event on the Dn of the moved branch
443                             // so the branch needs to be recursively deleted here
444                             deleteRecursive( remoteEntry.getDn(), rid );
445                         }
446 
447                         break;
448 
449                     case PRESENT:
450                         CONSUMER_LOG.debug( "entry present {}", remoteEntry );
451                         break;
452 
453                     default:
454                         throw new IllegalArgumentException( "Unexpected sync state " + state );
455                 }
456 
457                 // store the cookie only if the above operation was successful
458                 if ( syncStateCtrl.getCookie() != null )
459                 {
460                     storeCookie();
461                 }
462             }
463         }
464         catch ( Exception e )
465         {
466             CONSUMER_LOG.error( e.getMessage(), e );
467         }
468 
469         CONSUMER_LOG.debug( "------------- Ending handleSearchResult ------------" );
470     }
471 
472 
473     /**
474      * {@inheritDoc}
475      */
476     private void handleSyncInfo( IntermediateResponse syncInfoResp )
477     {
478         try
479         {
480             CONSUMER_LOG.debug( "............... inside handleSyncInfo ..............." );
481 
482             byte[] syncInfoBytes = syncInfoResp.getResponseValue();
483 
484             if ( syncInfoBytes == null )
485             {
486                 return;
487             }
488 
489             SyncInfoValue syncInfoValue = new SyncInfoValueImpl();
490 
491             byte[] cookie = syncInfoValue.getCookie();
492 
493             if ( CONSUMER_LOG.isDebugEnabled() )
494             {
495                 CONSUMER_LOG.debug( "Received a SyncInfoValue from producer {} : {}", config.getProducer(),
496                     syncInfoValue );
497             }
498 
499             int replicaId = -1;
500 
501             if ( cookie != null )
502             {
503                 if ( CONSUMER_LOG.isDebugEnabled() )
504                 {
505                     CONSUMER_LOG.debug( "setting the cookie from the sync info: {}", Strings.utf8ToString( cookie ) );
506                     CONSUMER_LOG.debug( "setting the cookie from the sync info: {}", Strings.utf8ToString( cookie ) );
507                 }
508 
509                 syncCookie = cookie;
510 
511                 String cookieString = Strings.utf8ToString( syncCookie );
512                 replicaId = LdapProtocolUtils.getReplicaId( cookieString );
513             }
514 
515             CONSUMER_LOG.info( "refreshDeletes: {}", syncInfoValue.isRefreshDeletes() );
516 
517             List<byte[]> uuidList = syncInfoValue.getSyncUUIDs();
518 
519             // if refreshDeletes set to true then delete all the entries with entryUUID
520             // present in the syncIdSet
521             if ( syncInfoValue.isRefreshDeletes() )
522             {
523                 deleteEntries( uuidList, false, replicaId );
524             }
525             else
526             {
527                 deleteEntries( uuidList, true, replicaId );
528             }
529 
530             CONSUMER_LOG.info( "refreshDone: {}", syncInfoValue.isRefreshDone() );
531 
532             storeCookie();
533         }
534         catch ( Exception de )
535         {
536             CONSUMER_LOG.error( "Failed to handle syncinfo message", de );
537         }
538 
539         CONSUMER_LOG.debug( ".................... END handleSyncInfo ..............." );
540     }
541 
542 
543     /**
544      * {@inheritDoc}
545      */
546     @Override
547     public void connectionClosed()
548     {
549         if ( CONSUMER_LOG.isDebugEnabled() )
550         {
551             MDC.put( "Replica", Integer.toString( config.getReplicaId() ) );
552             CONSUMER_LOG.debug( "Consumer {} session with {} has been closed ", config.getReplicaId(),
553                 config.getProducer() );
554         }
555 
556         disconnect();
557     }
558 
559 
560     /**
561      * Starts the synchronization operation
562      */
563     @Override
564     public ReplicationStatusEnum startSync()
565     {
566         CONSUMER_LOG.debug( "Starting the SyncRepl process for consumer {}", config.getReplicaId() );
567 
568         // read the cookie if persisted
569         readCookie();
570 
571         if ( config.isRefreshNPersist() )
572         {
573             try
574             {
575                 CONSUMER_LOG.debug( "==================== Refresh And Persist ==========" );
576 
577                 return doSyncSearch( SynchronizationModeEnum.REFRESH_AND_PERSIST, reload );
578             }
579             catch ( Exception e )
580             {
581                 CONSUMER_LOG.error( "Failed to sync with refreshAndPersist mode", e );
582                 return ReplicationStatusEnum.DISCONNECTED;
583             }
584         }
585         else
586         {
587             return doRefreshOnly();
588         }
589     }
590 
591 
592     private ReplicationStatusEnum doRefreshOnly()
593     {
594         while ( !disconnected )
595         {
596             CONSUMER_LOG.debug( "==================== Refresh Only ==========" );
597 
598             try
599             {
600                 doSyncSearch( SynchronizationModeEnum.REFRESH_ONLY, reload );
601 
602                 CONSUMER_LOG.debug( "--------------------- Sleep for {} seconds ------------------",
603                     ( config.getRefreshInterval() / 1000 ) );
604                 Thread.sleep( config.getRefreshInterval() );
605                 CONSUMER_LOG.debug( "--------------------- syncing again ------------------" );
606 
607             }
608             catch ( InterruptedException ie )
609             {
610                 CONSUMER_LOG.warn( "refresher thread interrupted" );
611 
612                 return ReplicationStatusEnum.DISCONNECTED;
613             }
614             catch ( Exception e )
615             {
616                 CONSUMER_LOG.error( "Failed to sync with refresh only mode", e );
617                 return ReplicationStatusEnum.DISCONNECTED;
618             }
619         }
620 
621         return ReplicationStatusEnum.STOPPED;
622     }
623 
624 
625     /**
626      * {@inheritDoc}
627      */
628     @Override
629     public void setConfig( ReplicationConsumerConfig config )
630     {
631         this.config = ( SyncReplConfiguration ) config;
632     }
633 
634 
635     /**
636      * {@inheritDoc}
637      */
638     @Override
639     public boolean connect( boolean now )
640     {
641         boolean connected = false;
642 
643         if ( now )
644         {
645             connected = connect();
646         }
647 
648         while ( !connected )
649         {
650             try
651             {
652                 CONSUMER_LOG.debug( "Consumer {} cannot connect to {}, wait 5 seconds.", config.getReplicaId(),
653                     config.getProducer() );
654 
655                 // try to establish a connection for every 5 seconds
656                 Thread.sleep( 5000 );
657             }
658             catch ( InterruptedException e )
659             {
660                 CONSUMER_LOG.warn( "Consumer {} Interrupted while trying to reconnect to the provider {}",
661                     config.getReplicaId(), config.getProducer() );
662             }
663 
664             connected = connect();
665         }
666 
667         // TODO : we may have cases were we get here with the connected flag to false. With the above
668         // code, thi sis not possible
669 
670         return connected;
671     }
672 
673 
674     /**
675      * {@inheritDoc}
676      */
677     @Override
678     public void ping()
679     {
680         boolean connected = !disconnected;
681 
682         boolean restartSync = false;
683 
684         if ( disconnected )
685         {
686             connected = connect();
687             restartSync = connected;
688         }
689 
690         if ( connected )
691         {
692             CONSUMER_LOG.debug( "PING : The consumer {} is alive", config.getReplicaId() );
693 
694             // DIRSERVER-2014
695             if ( restartSync )
696             {
697                 CONSUMER_LOG.warn( "Restarting the disconnected consumer {}", config.getReplicaId() );
698                 disconnected = false;
699                 startSync();
700             }
701         }
702         else
703         {
704             CONSUMER_LOG.debug( "PING : The consumer {} cannot be connected", config.getReplicaId() );
705         }
706     }
707 
708 
709     /**
710      * {@inheritDoc}
711      */
712     @Override
713     public void stop()
714     {
715         if ( !disconnected )
716         {
717             disconnect();
718         }
719     }
720 
721 
722     /**
723      * {@inheritDoc}
724      */
725     @Override
726     public String getId()
727     {
728         return String.valueOf( getConfig().getReplicaId() );
729     }
730 
731 
732     /**
733      * Performs a search on connection with updated syncRequest control. The provider
734      * will initiate an UpdateContant or an initContent depending on the current consumer
735      * status, accordingly to the cookie's content.
736      * If the mode is refreshOnly, the server will send a SearchResultDone when all the modified
737      * entries have been sent.
738      * If the mode is refreshAndPersist, the provider never send a SearchResultDone, so we keep
739      * receiving modifications' notifications on the consumer, and never exit the loop, unless
740      * some communication error occurs.
741      *
742      * @param syncType The synchornization type, either REFRESH_ONLY or REFRESH_AND_PERSIST
743      * @param reloadHint A flag used to tell the server that we want a reload
744      * @return The replication status
745      * @throws Exception in case of any problems encountered while searching
746      */
747     private ReplicationStatusEnum doSyncSearch( SynchronizationModeEnum syncType, boolean reloadHint ) throws Exception
748     {
749         CONSUMER_LOG.debug( "Starting synchronization mode {}, reloadHint {}", syncType, reloadHint );
750         // Prepare the Syncrepl Request
751         SyncRequestValue syncReq = new SyncRequestValueImpl();
752 
753         syncReq.setMode( syncType );
754         syncReq.setReloadHint( reloadHint );
755 
756         // If we have a persisted cookie, send it.
757         if ( syncCookie != null )
758         {
759             CONSUMER_LOG.debug( "searching on {} with searchRequest, cookie '{}'", config.getProducer(),
760                 Strings.utf8ToString( syncCookie ) );
761             syncReq.setCookie( syncCookie );
762         }
763         else
764         {
765             CONSUMER_LOG.debug( "searching on {} with searchRequest, no cookie", config.getProducer() );
766         }
767 
768         searchRequest.addControl( syncReq );
769 
770         // Do the search. We use a searchAsync because we want to get SearchResultDone responses
771         SearchFuture sf = connection.searchAsync( searchRequest );
772 
773         Response resp = sf.get();
774 
775         CONSUMER_LOG.debug( "Response from {} : {}", config.getProducer(), resp );
776 
777         // Now, process the responses. We loop until we have a connection termination or
778         // a SearchResultDone (RefreshOnly mode)
779         while ( !( resp instanceof SearchResultDone ) && !sf.isCancelled() && !disconnected )
780         {
781             if ( resp instanceof SearchResultEntry )
782             {
783                 SearchResultEntry result = ( SearchResultEntry ) resp;
784 
785                 handleSearchResultEntry( result );
786             }
787             else if ( resp instanceof SearchResultReference )
788             {
789                 handleSearchReference( ( SearchResultReference ) resp );
790             }
791             else if ( resp instanceof IntermediateResponse )
792             {
793                 handleSyncInfo( ( IntermediateResponse ) resp );
794             }
795 
796             // Next entry
797             resp = sf.get();
798             CONSUMER_LOG.debug( "Response from {} : {}", config.getProducer(), resp );
799         }
800 
801         if ( sf.isCancelled() )
802         {
803 
804             CONSUMER_LOG.debug( "Search sync on {} has been canceled ", config.getProducer(), sf.getCause() );
805 
806             return ReplicationStatusEnum.DISCONNECTED;
807         }
808         else if ( disconnected )
809         {
810             CONSUMER_LOG.debug( "Disconnected from {}", config.getProducer() );
811 
812             return ReplicationStatusEnum.DISCONNECTED;
813         }
814         else
815         {
816             ResultCodeEnum resultCode = handleSearchResultDone( ( SearchResultDone ) resp );
817 
818             CONSUMER_LOG.debug( "Rsultcode of Sync operation from {} : {}", config.getProducer(), resultCode );
819 
820             if ( resultCode == ResultCodeEnum.NO_SUCH_OBJECT )
821             {
822                 // log the error and handle it appropriately
823                 CONSUMER_LOG.warn( "The base Dn {} is not found on provider {}", config.getBaseDn(),
824                     config.getProducer() );
825 
826                 CONSUMER_LOG.warn( "Disconnecting the Refresh&Persist consumer from provider {}", config.getProducer() );
827                 disconnect();
828 
829                 return ReplicationStatusEnum.DISCONNECTED;
830             }
831             else if ( resultCode == ResultCodeEnum.E_SYNC_REFRESH_REQUIRED )
832             {
833                 CONSUMER_LOG.warn( "Full SYNC_REFRESH required from {}", config.getProducer() );
834 
835                 reload = true;
836 
837                 try
838                 {
839                     CONSUMER_LOG.debug( "Deleting baseDN {}", config.getBaseDn() );
840 
841                     // FIXME taking a backup right before deleting might be a good thing, just to be safe.
842                     // the backup file can be deleted after reload completes successfully
843 
844                     // the 'rid' value is not taken into consideration when 'reload' is set
845                     // so any dummy value is fine
846                     deleteRecursive( new Dn( config.getBaseDn() ), -1000 );
847                 }
848                 catch ( Exception e )
849                 {
850                     CONSUMER_LOG
851                         .error(
852                             "Failed to delete the replica base as part of handling E_SYNC_REFRESH_REQUIRED, disconnecting the consumer",
853                             e );
854                 }
855 
856                 // Do a full update.
857                 removeCookie();
858 
859                 CONSUMER_LOG.debug( "Re-doing a syncRefresh from producer {}", config.getProducer() );
860 
861                 return ReplicationStatusEnum.REFRESH_REQUIRED;
862             }
863             else
864             {
865                 CONSUMER_LOG.debug( "Got result code {} from producer {}. Replication stopped", resultCode,
866                     config.getProducer() );
867                 return ReplicationStatusEnum.DISCONNECTED;
868             }
869         }
870     }
871 
872 
873     /**
874      * Disconnect from the producer
875      */
876     private void disconnect()
877     {
878         disconnected = true;
879 
880         try
881         {
882             if ( ( connection != null ) && connection.isConnected() )
883             {
884                 connection.unBind();
885                 CONSUMER_LOG.info( "Unbound from the server {}", config.getProducer() );
886 
887                 if ( CONSUMER_LOG.isDebugEnabled() )
888                 {
889                     MDC.put( "Replica", Integer.toString( config.getReplicaId() ) );
890                     CONSUMER_LOG.info( "Unbound from the server {}", config.getProducer() );
891                 }
892 
893                 connection.close();
894                 CONSUMER_LOG.info( "Connection closed for the server {}", config.getProducer() );
895 
896                 connection = null;
897             }
898         }
899         catch ( Exception e )
900         {
901             CONSUMER_LOG.error( "Failed to close the connection", e );
902         }
903         finally
904         {
905             // persist the cookie
906             storeCookie();
907 
908             // reset the cookie
909             syncCookie = null;
910         }
911     }
912 
913 
914     /**
915      * stores the cookie.
916      */
917     private void storeCookie()
918     {
919         CONSUMER_LOG.debug( "Storing the cookie '{}'", Strings.utf8ToString( syncCookie ) );
920 
921         if ( syncCookie == null )
922         {
923             return;
924         }
925 
926         if ( ( lastSavedCookie != null ) && Arrays.equals( syncCookie, lastSavedCookie ) )
927         {
928             return;
929         }
930 
931         try
932         {
933             Attribute attr = cookieMod.getAttribute();
934             attr.clear();
935             attr.add( syncCookie );
936 
937             String cookieString = Strings.utf8ToString( syncCookie );
938             int replicaId = LdapProtocolUtils.getReplicaId( cookieString );
939 
940             Attribute ridAt = ridMod.getAttribute();
941             ridAt.clear();
942             ridAt.add( String.valueOf( replicaId ) );
943 
944             CONSUMER_LOG.debug( "Storing the cookie in the DIT : {}", config.getConfigEntryDn() );
945 
946             session.modify( config.getConfigEntryDn(), cookieMod );
947             CONSUMER_LOG.debug( "stored the cookie in entry {}", config.getConfigEntryDn() );
948 
949             lastSavedCookie = new byte[syncCookie.length];
950             System.arraycopy( syncCookie, 0, lastSavedCookie, 0, syncCookie.length );
951         }
952         catch ( Exception e )
953         {
954             CONSUMER_LOG.error( "Failed to store the cookie in consumer entry {}", config.getConfigEntryDn(), e );
955         }
956     }
957 
958 
959     /**
960      * Read the cookie for a consumer
961      */
962     private void readCookie()
963     {
964         try
965         {
966             Entry entry = session.lookup( config.getConfigEntryDn(), SchemaConstants.ADS_REPL_COOKIE );
967 
968             if ( entry != null )
969             {
970                 Attribute attr = entry.get( adsReplCookieAT );
971 
972                 if ( attr != null )
973                 {
974                     syncCookie = attr.getBytes();
975                     lastSavedCookie = syncCookie;
976                     String syncCookieString = Strings.utf8ToString( syncCookie );
977                     CONSUMER_LOG.debug( "Loaded cookie {} for consumer {}", syncCookieString, config.getReplicaId() );
978                 }
979                 else
980                 {
981                     CONSUMER_LOG.debug( "No cookie found for consumer {}", config.getReplicaId() );
982                 }
983             }
984             else
985             {
986                 CONSUMER_LOG.debug( "Cannot find the configuration '{}' in the DIT for consumer {}",
987                     config.getConfigEntryDn(), config.getReplicaId() );
988             }
989         }
990         catch ( Exception e )
991         {
992             // can be ignored, most likely happens if there is no entry with the given Dn
993             // log in debug mode
994             CONSUMER_LOG.debug( "Failed to read the cookie, cannot find the entry '{}' in the DIT for consumer {}",
995                 config.getConfigEntryDn(),
996                 config.getReplicaId() );
997         }
998     }
999 
1000 
1001     /**
1002      * deletes the cookie and resets the syncCookie to null
1003      */
1004     private void removeCookie()
1005     {
1006         try
1007         {
1008             Attribute cookieAttr = new DefaultAttribute( adsReplCookieAT );
1009             Modification deleteCookieMod = new DefaultModification( ModificationOperation.REMOVE_ATTRIBUTE,
1010                 cookieAttr );
1011             session.modify( config.getConfigEntryDn(), deleteCookieMod );
1012             CONSUMER_LOG.info( "resetting sync cookie of the consumer with config entry Dn {}",
1013                 config.getConfigEntryDn() );
1014         }
1015         catch ( Exception e )
1016         {
1017             CONSUMER_LOG.warn( "Failed to delete the cookie from the consumer with config entry Dn {}",
1018                 config.getConfigEntryDn() );
1019             CONSUMER_LOG.warn( "{}", e );
1020         }
1021 
1022         syncCookie = null;
1023         lastSavedCookie = null;
1024     }
1025 
1026 
1027     private void applyModDnOperation( Entry remoteEntry, String entryUuid, int rid ) throws Exception
1028     {
1029         CONSUMER_LOG.debug( "MODDN for entry {}, new entry : {}", entryUuid, remoteEntry );
1030 
1031         // Retrieve locally the moved or renamed entry
1032         String filter = "(entryUuid=" + entryUuid + ")";
1033         SearchRequest searchRequest = new SearchRequestImpl();
1034         searchRequest.setBase( new Dn( schemaManager, config.getBaseDn() ) );
1035         searchRequest.setFilter( filter );
1036         searchRequest.setScope( SearchScope.SUBTREE );
1037         searchRequest.addAttributes( SchemaConstants.ENTRY_UUID_AT, SchemaConstants.ENTRY_CSN_AT,
1038             SchemaConstants.ALL_USER_ATTRIBUTES );
1039 
1040         Cursor<Entry> cursor = session.search( searchRequest );
1041         cursor.beforeFirst();
1042 
1043         Entry localEntry = null;
1044 
1045         if ( cursor.next() )
1046         {
1047             localEntry = cursor.get();
1048         }
1049 
1050         cursor.close();
1051 
1052         // can happen in MMR scenario
1053         if ( localEntry == null )
1054         {
1055             return;
1056         }
1057 
1058         if ( config.isMmrMode() )
1059         {
1060             Csn localCsn = new Csn( localEntry.get( SchemaConstants.ENTRY_CSN_AT ).getString() );
1061             Csn remoteCsn = new Csn( remoteEntry.get( SchemaConstants.ENTRY_CSN_AT ).getString() );
1062 
1063             if ( localCsn.compareTo( remoteCsn ) >= 0 )
1064             {
1065                 // just discard the received modified entry, that is old
1066                 CONSUMER_LOG.debug( "local modification is latest, discarding the modDn operation dn {}",
1067                     remoteEntry.getDn() );
1068                 return;
1069             }
1070         }
1071 
1072         // Compute the DN, parentDn and Rdn for both entries
1073         Dn localDn = localEntry.getDn();
1074         Dn remoteDn = directoryService.getDnFactory().create( remoteEntry.getDn().getName() );
1075 
1076         Dn localParentDn = localDn.getParent();
1077         Dn remoteParentDn = directoryService.getDnFactory().create( remoteDn.getParent().getName() );
1078 
1079         Rdn localRdn = localDn.getRdn();
1080         Rdn remoteRdn = directoryService.getDnFactory().create( remoteDn.getRdn().getName() ).getRdn();
1081 
1082         // Check if the OldRdn has been deleted
1083         boolean deleteOldRdn = !remoteEntry.contains( localRdn.getNormType(), localRdn.getValue() );
1084 
1085         if ( localRdn.equals( remoteRdn ) )
1086         {
1087             // If the RDN are equals, it's a MOVE
1088             CONSUMER_LOG.debug( "moving {} to the new parent {}", localDn, remoteParentDn );
1089             MoveOperationContextapi/interceptor/context/MoveOperationContext.html#MoveOperationContext">MoveOperationContext movCtx = new MoveOperationContext( session, localDn, remoteParentDn );
1090             movCtx.setReplEvent( true );
1091             movCtx.setRid( rid );
1092             directoryService.getOperationManager().move( movCtx );
1093         }
1094         else if ( localParentDn.equals( remoteParentDn ) )
1095         {
1096             // If the parentDn are equals, it's a RENAME
1097             CONSUMER_LOG.debug( "renaming the Dn {} with new Rdn {} and deleteOldRdn flag set to {}",
1098                 localDn.getName(), remoteRdn.getName(), String.valueOf( deleteOldRdn ) );
1099 
1100             RenameOperationContexti/interceptor/context/RenameOperationContext.html#RenameOperationContext">RenameOperationContext renCtx = new RenameOperationContext( session, localDn, remoteRdn,
1101                 deleteOldRdn );
1102             renCtx.setReplEvent( true );
1103             renCtx.setRid( rid );
1104             directoryService.getOperationManager().rename( renCtx );
1105         }
1106         else
1107         {
1108             // Otherwise, it's a MOVE and RENAME
1109             CONSUMER_LOG.debug(
1110                 "moveAndRename on the Dn {} with new newParent Dn {}, new Rdn {} and deleteOldRdn flag set to {}",
1111                 localDn.getName(),
1112                 remoteParentDn.getName(),
1113                 remoteRdn.getName(),
1114                 String.valueOf( deleteOldRdn ) );
1115 
1116             MoveAndRenameOperationContexttor/context/MoveAndRenameOperationContext.html#MoveAndRenameOperationContext">MoveAndRenameOperationContext movRenCtx = new MoveAndRenameOperationContext( session, localDn,
1117                 remoteParentDn, remoteRdn, deleteOldRdn );
1118             movRenCtx.setReplEvent( true );
1119             movRenCtx.setRid( rid );
1120             directoryService.getOperationManager().moveAndRename( movRenCtx );
1121         }
1122     }
1123 
1124 
1125     private void modify( Entry remoteEntry, int rid ) throws Exception
1126     {
1127         String[] attributes = computeAttributes( config.getAttributes(), SchemaConstants.ALL_OPERATIONAL_ATTRIBUTES );
1128 
1129         LookupOperationContext lookupCtx =
1130             new LookupOperationContext( session, remoteEntry.getDn(), attributes );
1131 
1132         lookupCtx.setSyncreplLookup( true );
1133 
1134         Entry localEntry;
1135 
1136         Partition partition = session.getDirectoryService().getPartitionNexus().getPartition( remoteEntry.getDn() );
1137 
1138         try ( PartitionTxn partitionTxn = partition.beginReadTransaction() )
1139         {
1140             lookupCtx.setTransaction( partitionTxn );
1141             localEntry = session.getDirectoryService().getOperationManager().lookup( lookupCtx );
1142         }
1143 
1144         if ( config.isMmrMode() )
1145         {
1146             Csn localCsn = new Csn( localEntry.get( SchemaConstants.ENTRY_CSN_AT ).getString() );
1147             Csn remoteCsn = new Csn( remoteEntry.get( SchemaConstants.ENTRY_CSN_AT ).getString() );
1148 
1149             if ( localCsn.compareTo( remoteCsn ) >= 0 )
1150             {
1151                 // just discard the received modified entry, that is old
1152                 CONSUMER_LOG.debug( "local modification is latest, discarding the modification of dn {}",
1153                     remoteEntry.getDn() );
1154                 return;
1155             }
1156         }
1157 
1158         remoteEntry.removeAttributes( MOD_IGNORE_AT );
1159         localEntry.removeAttributes( MOD_IGNORE_AT );
1160 
1161         List<Modification> mods = new ArrayList<>();
1162         Iterator<Attribute> itr = localEntry.iterator();
1163 
1164         while ( itr.hasNext() )
1165         {
1166             Attribute localAttr = itr.next();
1167             String attrId = localAttr.getId();
1168             Modification mod;
1169             Attribute remoteAttr = remoteEntry.get( attrId );
1170 
1171             if ( remoteAttr != null ) // would be better if we compare the values also? or will it consume more time?
1172             {
1173                 mod = new DefaultModification( ModificationOperation.REPLACE_ATTRIBUTE, remoteAttr );
1174                 remoteEntry.remove( remoteAttr );
1175             }
1176             else
1177             {
1178                 mod = new DefaultModification( ModificationOperation.REMOVE_ATTRIBUTE, localAttr );
1179             }
1180 
1181             mods.add( mod );
1182         }
1183 
1184         if ( remoteEntry.size() > 0 )
1185         {
1186             itr = remoteEntry.iterator();
1187 
1188             while ( itr.hasNext() )
1189             {
1190                 mods.add( new DefaultModification( ModificationOperation.ADD_ATTRIBUTE, itr.next() ) );
1191             }
1192         }
1193 
1194         List<Modification> serverModifications = new ArrayList<>( mods.size() );
1195 
1196         for ( Modification mod : mods )
1197         {
1198             serverModifications.add( new DefaultModification( directoryService.getSchemaManager(), mod ) );
1199         }
1200 
1201         ModifyOperationContextceptor/context/ModifyOperationContext.html#ModifyOperationContext">ModifyOperationContext modifyContext = new ModifyOperationContext( session, remoteEntry.getDn(),
1202             serverModifications );
1203         modifyContext.setReplEvent( true );
1204         modifyContext.setRid( rid );
1205 
1206         OperationManager operationManager = directoryService.getOperationManager();
1207         operationManager.modify( modifyContext );
1208     }
1209 
1210 
1211     /**
1212      * Create a new list combining a list and a newly added attribute
1213      */
1214     private String[] computeAttributes( String[] attributes, String addedAttribute )
1215     {
1216         if ( attributes != null )
1217         {
1218             if ( addedAttribute != null )
1219             {
1220                 String[] combinedAttributes = new String[attributes.length + 1];
1221 
1222                 System.arraycopy( attributes, 0, combinedAttributes, 0, attributes.length );
1223                 combinedAttributes[attributes.length] = addedAttribute;
1224 
1225                 return combinedAttributes;
1226             }
1227             else
1228             {
1229                 return attributes;
1230             }
1231         }
1232         else
1233         {
1234             if ( addedAttribute != null )
1235             {
1236                 return new String[]
1237                     { addedAttribute };
1238             }
1239             else
1240             {
1241                 return StringConstants.EMPTY_STRINGS;
1242             }
1243         }
1244     }
1245 
1246 
1247     /**
1248      * deletes the entries having the UUID given in the list
1249      *
1250      * @param uuidList the list of UUIDs
1251      * @param replicaId TODO
1252      * @throws Exception in case of any problems while deleting the entries
1253      */
1254     private void deleteEntries( List<byte[]> uuidList, boolean isRefreshPresent, int replicaId ) throws Exception
1255     {
1256         if ( uuidList == null || uuidList.isEmpty() )
1257         {
1258             return;
1259         }
1260 
1261         // if it is refreshPresent list then send all the UUIDs for
1262         // filtering, otherwise breaking the list will cause the
1263         // other present entries to be deleted from DIT
1264         if ( isRefreshPresent )
1265         {
1266             CONSUMER_LOG.debug( "refresh present syncinfo list has {} UUIDs", uuidList.size() );
1267             processDelete( uuidList, isRefreshPresent, replicaId );
1268             return;
1269         }
1270 
1271         int nodeLimit = 10;
1272 
1273         int count = uuidList.size() / nodeLimit;
1274 
1275         int startIndex = 0;
1276         int i = 0;
1277         for ( ; i < count; i++ )
1278         {
1279             startIndex = i * nodeLimit;
1280             processDelete( uuidList.subList( startIndex, startIndex + nodeLimit ), isRefreshPresent, replicaId );
1281         }
1282 
1283         if ( ( uuidList.size() % nodeLimit ) != 0 )
1284         {
1285             // remove the remaining entries
1286             if ( count > 0 )
1287             {
1288                 startIndex = i * nodeLimit;
1289             }
1290 
1291             processDelete( uuidList.subList( startIndex, uuidList.size() ), isRefreshPresent, replicaId );
1292         }
1293     }
1294 
1295 
1296     /**
1297      * do not call this method directly, instead call deleteEntries()
1298      *
1299      * @param limitedUuidList a list of UUIDs whose size is less than or equal to #NODE_LIMIT (node limit applies only for refreshDeletes list)
1300      * @param isRefreshPresent a flag indicating the type of entries present in the UUID list
1301      * @param replicaId TODO
1302      */
1303     private void processDelete( List<byte[]> limitedUuidList, boolean isRefreshPresent, int replicaId )
1304         throws Exception
1305     {
1306         ExprNode filter = null;
1307         int size = limitedUuidList.size();
1308         if ( size == 1 )
1309         {
1310             String uuid = Strings.uuidToString( limitedUuidList.get( 0 ) );
1311 
1312             filter = new EqualityNode<String>( SchemaConstants.ENTRY_UUID_AT, uuid );
1313             if ( isRefreshPresent )
1314             {
1315                 filter = new NotNode( filter );
1316             }
1317         }
1318         else
1319         {
1320             if ( isRefreshPresent )
1321             {
1322                 filter = new AndNode();
1323             }
1324             else
1325             {
1326                 filter = new OrNode();
1327             }
1328 
1329             for ( int i = 0; i < size; i++ )
1330             {
1331                 String uuid = Strings.uuidToString( limitedUuidList.get( i ) );
1332                 ExprNode uuidEqNode = new EqualityNode<String>( SchemaConstants.ENTRY_UUID_AT, uuid );
1333 
1334                 if ( isRefreshPresent )
1335                 {
1336                     uuidEqNode = new NotNode( uuidEqNode );
1337                     ( ( AndNode ) filter ).addNode( uuidEqNode );
1338                 }
1339                 else
1340                 {
1341                     ( ( OrNode ) filter ).addNode( uuidEqNode );
1342                 }
1343             }
1344         }
1345 
1346         Dn dn = new Dn( schemaManager, config.getBaseDn() );
1347 
1348         if ( CONSUMER_LOG.isDebugEnabled() )
1349         {
1350             CONSUMER_LOG.debug( "selecting entries to be deleted using filter {}", filter );
1351         }
1352 
1353         SearchRequest req = new SearchRequestImpl();
1354         req.setBase( dn );
1355         req.setFilter( filter );
1356         req.setScope( SearchScope.SUBTREE );
1357         req.setDerefAliases( AliasDerefMode.NEVER_DEREF_ALIASES );
1358         // the ENTRY_DN_AT must be in the attribute list, otherwise sorting fails
1359         req.addAttributes( SchemaConstants.ENTRY_DN_AT );
1360 
1361         SortKey sk = new SortKey( SchemaConstants.ENTRY_DN_AT, SchemaConstants.DISTINGUISHED_NAME_MATCH_MR_OID );
1362         SortRequest ctrl = new SortRequestImpl();
1363         ctrl.addSortKey( sk );
1364         req.addControl( ctrl );
1365 
1366         OperationManager operationManager = directoryService.getOperationManager();
1367 
1368         Cursor<Entry> cursor = session.search( req );
1369         cursor.beforeFirst();
1370 
1371         while ( cursor.next() )
1372         {
1373             Entry entry = cursor.get();
1374 
1375             DeleteOperationContext/api/interceptor/context/DeleteOperationContext.html#DeleteOperationContext">DeleteOperationContext ctx = new DeleteOperationContext( session );
1376             ctx.setReplEvent( true );
1377             ctx.setRid( replicaId );
1378 
1379             // DO NOT generate replication event if this is being deleted as part of
1380             // e_sync_refresh_required
1381             if ( reload )
1382             {
1383                 ctx.setGenerateNoReplEvt( true );
1384             }
1385 
1386             ctx.setDn( entry.getDn() );
1387             operationManager.delete( ctx );
1388         }
1389 
1390         cursor.close();
1391     }
1392 
1393 
1394     private synchronized Object getLockFor( String uuid )
1395     {
1396         Object lock = UUID_LOCK_MAP.get( uuid );
1397 
1398         if ( lock == null )
1399         {
1400             lock = new Object();
1401             UUID_LOCK_MAP.put( uuid, lock );
1402         }
1403 
1404         return lock;
1405     }
1406 
1407 
1408     /**
1409      * removes all child entries present under the given Dn and finally the Dn itself
1410      *
1411      * @param rootDn the Dn which will be removed after removing its children
1412      * @param rid the replica ID
1413      * @throws Exception If the Dn is not valid or if the deletion failed
1414      */
1415     private void deleteRecursive( Dn rootDn, int rid ) throws Exception
1416     {
1417         CONSUMER_LOG.debug( "searching for Dn {} and its children before deleting", rootDn.getName() );
1418         Cursor<Entry> cursor = null;
1419 
1420         try
1421         {
1422             SearchRequest req = new SearchRequestImpl();
1423             req.setBase( rootDn );
1424             req.setFilter( ENTRY_UUID_PRESENCE_FILTER );
1425             req.setScope( SearchScope.SUBTREE );
1426             req.setDerefAliases( AliasDerefMode.NEVER_DEREF_ALIASES );
1427             // the ENTRY_DN_AT must be in the attribute list, otherwise sorting fails
1428             req.addAttributes( SchemaConstants.ENTRY_DN_AT );
1429 
1430             SortKey sk = new SortKey( SchemaConstants.ENTRY_DN_AT, SchemaConstants.DISTINGUISHED_NAME_MATCH_MR_OID );
1431 
1432             SortRequest ctrl = new SortRequestImpl();
1433             ctrl.addSortKey( sk );
1434             req.addControl( ctrl );
1435 
1436             cursor = session.search( req );
1437             cursor.beforeFirst();
1438 
1439             OperationManager operationManager = directoryService.getOperationManager();
1440 
1441             while ( cursor.next() )
1442             {
1443                 Entry e = cursor.get();
1444 
1445                 DeleteOperationContext/api/interceptor/context/DeleteOperationContext.html#DeleteOperationContext">DeleteOperationContext ctx = new DeleteOperationContext( session );
1446                 ctx.setReplEvent( true );
1447                 ctx.setRid( rid );
1448 
1449                 // DO NOT generate replication event if this is being deleted as part of
1450                 // e_sync_refresh_required
1451                 if ( reload )
1452                 {
1453                     ctx.setGenerateNoReplEvt( true );
1454                 }
1455 
1456                 ctx.setDn( e.getDn() );
1457 
1458                 operationManager.delete( ctx );
1459             }
1460         }
1461         catch ( Exception e )
1462         {
1463             String msg = "Failed to delete the Dn " + rootDn.getName() + " and its children (if any present)";
1464             CONSUMER_LOG.error( msg, e );
1465             throw e;
1466         }
1467         finally
1468         {
1469             if ( cursor != null )
1470             {
1471                 cursor.close();
1472             }
1473         }
1474     }
1475 
1476 
1477     /**
1478      * @see Object#toString()
1479      */
1480     @Override
1481     public String toString()
1482     {
1483         StringBuilder sb = new StringBuilder();
1484 
1485         sb.append( "Consumer " ).append( config );
1486 
1487         return sb.toString();
1488     }
1489 }