View Javadoc
1   /*
2    *   Licensed to the Apache Software Foundation (ASF) under one
3    *   or more contributor license agreements.  See the NOTICE file
4    *   distributed with this work for additional information
5    *   regarding copyright ownership.  The ASF licenses this file
6    *   to you under the Apache License, Version 2.0 (the
7    *   "License"); you may not use this file except in compliance
8    *   with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *   Unless required by applicable law or agreed to in writing,
13   *   software distributed under the License is distributed on an
14   *   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *   KIND, either express or implied.  See the License for the
16   *   specific language governing permissions and limitations
17   *   under the License.
18   *
19   */
20  
21  package org.apache.directory.server.ldap.replication.provider;
22  
23  
24  import static java.lang.Math.min;
25  import static org.apache.directory.server.ldap.LdapServer.NO_SIZE_LIMIT;
26  import static org.apache.directory.server.ldap.LdapServer.NO_TIME_LIMIT;
27  
28  import java.io.File;
29  import java.io.FilenameFilter;
30  import java.io.IOException;
31  import java.net.InetSocketAddress;
32  import java.util.HashSet;
33  import java.util.List;
34  import java.util.Map;
35  import java.util.Set;
36  import java.util.concurrent.ConcurrentHashMap;
37  import java.util.concurrent.CountDownLatch;
38  import java.util.concurrent.TimeUnit;
39  import java.util.concurrent.atomic.AtomicInteger;
40  
41  import org.apache.directory.api.ldap.extras.controls.SynchronizationModeEnum;
42  import org.apache.directory.api.ldap.extras.controls.syncrepl.syncDone.SyncDoneValue;
43  import org.apache.directory.api.ldap.extras.controls.syncrepl.syncDone.SyncDoneValueImpl;
44  import org.apache.directory.api.ldap.extras.controls.syncrepl.syncRequest.SyncRequestValue;
45  import org.apache.directory.api.ldap.extras.controls.syncrepl.syncState.SyncStateTypeEnum;
46  import org.apache.directory.api.ldap.extras.controls.syncrepl.syncState.SyncStateValue;
47  import org.apache.directory.api.ldap.extras.controls.syncrepl.syncState.SyncStateValueImpl;
48  import org.apache.directory.api.ldap.extras.intermediate.syncrepl.SyncInfoValue;
49  import org.apache.directory.api.ldap.extras.intermediate.syncrepl.SyncInfoValueImpl;
50  import org.apache.directory.api.ldap.extras.intermediate.syncrepl.SynchronizationInfoEnum;
51  import org.apache.directory.api.ldap.model.constants.Loggers;
52  import org.apache.directory.api.ldap.model.constants.SchemaConstants;
53  import org.apache.directory.api.ldap.model.cursor.Cursor;
54  import org.apache.directory.api.ldap.model.entry.Attribute;
55  import org.apache.directory.api.ldap.model.entry.Entry;
56  import org.apache.directory.api.ldap.model.entry.Modification;
57  import org.apache.directory.api.ldap.model.entry.Value;
58  import org.apache.directory.api.ldap.model.exception.LdapException;
59  import org.apache.directory.api.ldap.model.exception.LdapInvalidAttributeValueException;
60  import org.apache.directory.api.ldap.model.exception.LdapURLEncodingException;
61  import org.apache.directory.api.ldap.model.filter.AndNode;
62  import org.apache.directory.api.ldap.model.filter.EqualityNode;
63  import org.apache.directory.api.ldap.model.filter.ExprNode;
64  import org.apache.directory.api.ldap.model.filter.GreaterEqNode;
65  import org.apache.directory.api.ldap.model.filter.LessEqNode;
66  import org.apache.directory.api.ldap.model.filter.OrNode;
67  import org.apache.directory.api.ldap.model.filter.PresenceNode;
68  import org.apache.directory.api.ldap.model.message.LdapResult;
69  import org.apache.directory.api.ldap.model.message.ReferralImpl;
70  import org.apache.directory.api.ldap.model.message.Response;
71  import org.apache.directory.api.ldap.model.message.ResultCodeEnum;
72  import org.apache.directory.api.ldap.model.message.SearchRequest;
73  import org.apache.directory.api.ldap.model.message.SearchResultDone;
74  import org.apache.directory.api.ldap.model.message.SearchResultEntry;
75  import org.apache.directory.api.ldap.model.message.SearchResultEntryImpl;
76  import org.apache.directory.api.ldap.model.message.SearchResultReference;
77  import org.apache.directory.api.ldap.model.message.SearchResultReferenceImpl;
78  import org.apache.directory.api.ldap.model.message.SearchScope;
79  import org.apache.directory.api.ldap.model.message.controls.ChangeType;
80  import org.apache.directory.api.ldap.model.message.controls.ManageDsaIT;
81  import org.apache.directory.api.ldap.model.message.controls.SortKey;
82  import org.apache.directory.api.ldap.model.message.controls.SortRequest;
83  import org.apache.directory.api.ldap.model.message.controls.SortRequestImpl;
84  import org.apache.directory.api.ldap.model.name.Dn;
85  import org.apache.directory.api.ldap.model.schema.AttributeType;
86  import org.apache.directory.api.ldap.model.url.LdapUrl;
87  import org.apache.directory.api.util.Strings;
88  import org.apache.directory.server.constants.ServerDNConstants;
89  import org.apache.directory.server.core.api.DirectoryService;
90  import org.apache.directory.server.core.api.event.DirectoryListenerAdapter;
91  import org.apache.directory.server.core.api.event.EventService;
92  import org.apache.directory.server.core.api.event.EventType;
93  import org.apache.directory.server.core.api.event.NotificationCriteria;
94  import org.apache.directory.server.core.api.interceptor.context.DeleteOperationContext;
95  import org.apache.directory.server.core.api.interceptor.context.ModifyOperationContext;
96  import org.apache.directory.server.core.api.interceptor.context.OperationContext;
97  import org.apache.directory.server.core.api.partition.Partition;
98  import org.apache.directory.server.core.api.partition.PartitionTxn;
99  import org.apache.directory.server.i18n.I18n;
100 import org.apache.directory.server.ldap.LdapProtocolUtils;
101 import org.apache.directory.server.ldap.LdapServer;
102 import org.apache.directory.server.ldap.LdapSession;
103 import org.apache.directory.server.ldap.handlers.SearchAbandonListener;
104 import org.apache.directory.server.ldap.handlers.SearchTimeLimitingMonitor;
105 import org.apache.directory.server.ldap.replication.ReplicaEventMessage;
106 import org.slf4j.Logger;
107 import org.slf4j.LoggerFactory;
108 
109 
110 /**
111  * Class used to process the incoming synchronization request from the consumers.
112  *
113  * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a>
114  */
115 public class SyncReplRequestHandler implements ReplicationRequestHandler
116 {
    /** A logger for the replication provider */
    private static final Logger PROVIDER_LOG = LoggerFactory.getLogger( Loggers.PROVIDER_LOG.getName() );

    /** Tells if the replication handler is already started */
    private boolean initialized = false;

    /** The directory service instance */
    private DirectoryService dirService;

    /** The reference on the Ldap server instance */
    protected LdapServer ldapServer;

    /** An ObjectClass AT instance */
    private AttributeType objectClassAT;

    /** The CSN AttributeType instance */
    private AttributeType csnAT;

    /** The known replica event logs, keyed by the replica identifier */
    private Map<Integer, ReplicaEventLog> replicaLogMap = new ConcurrentHashMap<>();

    /** The replication directory, taken from the instance layout when the handler starts */
    private File syncReplData;

    /** A counter initialized to 0 (presumably used to allocate replica identifiers - TODO confirm) */
    private AtomicInteger replicaCount = new AtomicInteger( 0 );

    /** The manager used to store the consumer entries in the DIT */
    private ReplConsumerManager replicaUtil;

    /** The listener notified when an entry is deleted under the replication consumers base DN */
    private ConsumerLogEntryChangeListener cledListener;

    /** The janitor working on the replica event logs; started in start() and interrupted in stop() */
    private ReplicaEventLogJanitor logJanitor;

    /** The AttributeType instance looked up for SchemaConstants.ADS_REPL_LOG_MAX_IDLE */
    private AttributeType replLogMaxIdleAT;

    /** The AttributeType instance looked up for SchemaConstants.ADS_REPL_LOG_PURGE_THRESHOLD_COUNT */
    private AttributeType replLogPurgeThresholdCountAT;

    /** thread used for updating consumer info */
    private Thread consumerInfoUpdateThread;
153 
    /**
     * Create a SyncReplRequestHandler empty instance. The constructor does no
     * work : the instance only becomes usable once {@link #start(LdapServer)}
     * has been called.
     */
    public SyncReplRequestHandler()
    {
    }
160 
161 
162     /**
163      * {@inheritDoc}
164      */
165     public void start( LdapServer server )
166     {
167         // Check that the handler is not already started : we don't want to start it twice...
168         if ( initialized )
169         {
170             PROVIDER_LOG.warn( "syncrepl provider was already initialized" );
171 
172             return;
173         }
174 
175         try
176         {
177             PROVIDER_LOG.debug( "initializing the syncrepl provider" );
178 
179             this.ldapServer = server;
180             this.dirService = server.getDirectoryService();
181 
182             csnAT = dirService.getSchemaManager()
183                 .lookupAttributeTypeRegistry( SchemaConstants.ENTRY_CSN_AT );
184 
185             objectClassAT = dirService.getSchemaManager()
186                 .lookupAttributeTypeRegistry( SchemaConstants.OBJECT_CLASS_AT );
187 
188             replLogMaxIdleAT = dirService.getSchemaManager()
189                 .lookupAttributeTypeRegistry( SchemaConstants.ADS_REPL_LOG_MAX_IDLE );
190 
191             replLogPurgeThresholdCountAT = dirService.getSchemaManager()
192                 .lookupAttributeTypeRegistry( SchemaConstants.ADS_REPL_LOG_PURGE_THRESHOLD_COUNT );
193 
194             // Get and create the replication directory if it does not exist
195             syncReplData = dirService.getInstanceLayout().getReplDirectory();
196 
197             if ( !syncReplData.exists() && !syncReplData.mkdirs() )
198             {
199                 throw new IOException( I18n.err( I18n.ERR_112_COULD_NOT_CREATE_DIRECTORY, syncReplData ) );
200             }
201 
202             // Create the replication manager
203             replicaUtil = new ReplConsumerManager( dirService );
204 
205             loadReplicaInfo();
206 
207             logJanitor = new ReplicaEventLogJanitor( dirService, replicaLogMap );
208             logJanitor.start();
209 
210             registerPersistentSearches();
211 
212             cledListener = new ConsumerLogEntryChangeListener();
213             NotificationCriteriai/event/NotificationCriteria.html#NotificationCriteria">NotificationCriteria criteria = new NotificationCriteria( dirService.getSchemaManager() );
214             criteria.setBase( new Dn( dirService.getSchemaManager(), ServerDNConstants.REPL_CONSUMER_DN_STR ) );
215             criteria.setEventMask( EventType.DELETE );
216 
217             dirService.getEventService().addListener( cledListener, criteria );
218 
219             CountDownLatch latch = new CountDownLatch( 1 );
220 
221             consumerInfoUpdateThread = new Thread( createConsumerInfoUpdateTask( latch ) );
222             consumerInfoUpdateThread.setDaemon( true );
223             consumerInfoUpdateThread.start();
224 
225             // Wait for the thread to be ready. We wait 5 minutes, it should be way more
226             // than necessary
227             boolean threadInitDone = latch.await( 5, TimeUnit.MINUTES );
228 
229             if ( !threadInitDone )
230             {
231                 // We have had a time out : just get out
232                 PROVIDER_LOG.error( "The consumer replica thread has not been initialized in time" );
233                 throw new RuntimeException( "Cannot initialize the Provider replica listener" );
234             }
235 
236             initialized = true;
237             PROVIDER_LOG.debug( "syncrepl provider initialized successfully" );
238         }
239         catch ( Exception e )
240         {
241             PROVIDER_LOG.error( "Failed to initialize the log files required by the syncrepl provider", e );
242             throw new RuntimeException( e );
243         }
244     }
245 
246 
247     /**
248      * {@inheritDoc}
249      */
250     public void stop()
251     {
252         EventService evtSrv = dirService.getEventService();
253 
254         evtSrv.removeListener( cledListener );
255         //first set the 'stop' flag
256         logJanitor.stopCleaning();
257         //then interrupt the janitor
258         logJanitor.interrupt();
259 
260         //then stop the consumerInfoUpdateThread
261         consumerInfoUpdateThread.interrupt();
262         
263         for ( ReplicaEventLog log : replicaLogMap.values() )
264         {
265             try
266             {
267                 PROVIDER_LOG.debug( "Stopping the logging for replica {}", log.getId() );
268                 evtSrv.removeListener( log.getPersistentListener() );
269                 log.stop();
270             }
271             catch ( Exception e )
272             {
273                 PROVIDER_LOG.error( "Failed to close the event log {}", log.getId(), e );
274             }
275         }
276 
277         // flush the dirty repos
278         storeReplicaInfo();
279 
280         initialized = false;
281     }
282 
283 
284     /**
285      * Process the incoming search request sent by a remote server when trying to replicate.
286      *
287      * @param session The used LdapSession. Should be the dedicated user
288      * @param request The search request
289      */
290     public void handleSyncRequest( LdapSession session, SearchRequest request ) throws LdapException
291     {
292         PROVIDER_LOG.debug( "Received a Syncrepl request : {} from {}", request, session );
293         try
294         {
295             if ( !request.getAttributes().contains( SchemaConstants.ALL_OPERATIONAL_ATTRIBUTES ) )
296             {
297                 // this is needed for accessing entryUUID and entryCSN attributes for internal purpose
298                 request.addAttributes( SchemaConstants.ALL_OPERATIONAL_ATTRIBUTES );
299             }
300 
301             // First extract the Sync control from the request
302             SyncRequestValue syncControl = ( SyncRequestValue ) request.getControls().get(
303                 SyncRequestValue.OID );
304 
305             // cookie is in the format <replicaId>;<Csn value>
306             byte[] cookieBytes = syncControl.getCookie();
307 
308             if ( cookieBytes == null )
309             {
310                 PROVIDER_LOG.debug( "Received a replication request with no cookie" );
311                 // No cookie ? We have to get all the entries from the provider
312                 // This is an initiate Content Poll action (RFC 4533, 3.3.1)
313                 doInitialRefresh( session, request );
314             }
315             else
316             {
317                 String cookieString = Strings.utf8ToString( cookieBytes );
318 
319                 PROVIDER_LOG.debug( "Received a replication request {} with a cookie '{}'", request, cookieString );
320 
321                 if ( !LdapProtocolUtils.isValidCookie( cookieString ) )
322                 {
323                     PROVIDER_LOG.error( "received an invalid cookie {} from the consumer with session {}",
324                         cookieString,
325                         session );
326                     sendESyncRefreshRequired( session, request );
327                 }
328                 else
329                 {
330                     ReplicaEventLog clientMsgLog = getReplicaEventLog( cookieString );
331 
332                     if ( clientMsgLog == null )
333                     {
334                         PROVIDER_LOG.debug(
335                             "received a valid cookie {} but there is no event log associated with this replica",
336                             cookieString );
337                         sendESyncRefreshRequired( session, request );
338                     }
339                     else
340                     {
341                         String consumerCsn = LdapProtocolUtils.getCsn( cookieString );
342                         doContentUpdate( session, request, clientMsgLog, consumerCsn );
343                     }
344                 }
345             }
346         }
347         catch ( Exception e )
348         {
349             PROVIDER_LOG.error( "Failed to handle the syncrepl request", e );
350 
351             throw new LdapException( e.getMessage(), e );
352         }
353     }
354 
355 
356     /**
357      * Send all the stored modifications to the consumer
358      */
359     private void sendContentFromLog( LdapSession session, SearchRequest req, ReplicaEventLog clientMsgLog,
360         String fromCsn )
361         throws Exception
362     {
363         // do the search from the log
364         String lastSentCsn = fromCsn;
365 
366         ReplicaJournalCursor cursor = clientMsgLog.getCursor( fromCsn );
367 
368         PROVIDER_LOG.debug( "Processing the log for replica {}", clientMsgLog.getId() );
369 
370         try
371         {
372             while ( cursor.next() )
373             {
374                 ReplicaEventMessage replicaEventMessage = cursor.get();
375                 Entry entry = replicaEventMessage.getEntry();
376                 PROVIDER_LOG.debug( "Read message from the queue {}", entry );
377 
378                 lastSentCsn = entry.get( csnAT ).getString();
379 
380                 ChangeType changeType = replicaEventMessage.getChangeType();
381 
382                 SyncStateTypeEnum syncStateType = null;
383 
384                 switch ( changeType )
385                 {
386                     case ADD:
387                         syncStateType = SyncStateTypeEnum.ADD;
388                         break;
389 
390                     case MODIFY:
391                         syncStateType = SyncStateTypeEnum.MODIFY;
392                         break;
393 
394                     case MODDN:
395                         syncStateType = SyncStateTypeEnum.MODDN;
396                         break;
397 
398                     case DELETE:
399                         syncStateType = SyncStateTypeEnum.DELETE;
400                         break;
401 
402                     default:
403                         throw new IllegalStateException( I18n.err( I18n.ERR_686 ) );
404                 }
405 
406                 sendSearchResultEntry( session, req, entry, syncStateType );
407 
408                 clientMsgLog.setLastSentCsn( lastSentCsn );
409 
410                 PROVIDER_LOG.debug( "The latest entry sent to the consumer {} has this CSN : {}", clientMsgLog.getId(),
411                     lastSentCsn );
412             }
413 
414             PROVIDER_LOG.debug( "All pending modifciations for replica {} processed", clientMsgLog.getId() );
415         }
416         finally
417         {
418             cursor.close();
419         }
420     }
421 
422 
423     /**
424      * process the update of the consumer, starting from the given LastEntryCSN the consumer
425      * has sent with the sync request.
426      */
427     private void doContentUpdate( LdapSession session, SearchRequest req, ReplicaEventLog replicaLog, String consumerCsn )
428         throws Exception
429     {
430         synchronized ( replicaLog )
431         {
432             boolean refreshNPersist = isRefreshNPersist( req );
433 
434             // if this method is called with refreshAndPersist
435             // means the client was offline after it initiated a persistent synch session
436             // we need to update the handler's session
437             if ( refreshNPersist )
438             {
439                 SyncReplSearchListener handler = replicaLog.getPersistentListener();
440                 handler.setSearchRequest( req );
441                 handler.setSession( session );
442             }
443 
444             sendContentFromLog( session, req, replicaLog, consumerCsn );
445 
446             String lastSentCsn = replicaLog.getLastSentCsn();
447 
448             byte[] cookie = LdapProtocolUtils.createCookie( replicaLog.getId(), lastSentCsn );
449 
450             if ( refreshNPersist )
451             {
452                 SyncInfoValue syncInfoValue = new SyncInfoValueImpl();
453                 syncInfoValue.setSyncInfoValueType( SynchronizationInfoEnum.NEW_COOKIE );
454                 syncInfoValue.setMessageId( req.getMessageId() );
455                 syncInfoValue.setCookie( cookie );
456 
457                 PROVIDER_LOG.debug( "Sent the intermediate response to the {} consumer, {}", replicaLog.getId(),
458                     syncInfoValue );
459                 session.getIoSession().write( syncInfoValue );
460 
461                 replicaLog.getPersistentListener().setPushInRealTime( refreshNPersist );
462             }
463             else
464             {
465                 SearchResultDone searchDoneResp = ( SearchResultDone ) req.getResultResponse();
466                 searchDoneResp.getLdapResult().setResultCode( ResultCodeEnum.SUCCESS );
467                 SyncDoneValue syncDone = new SyncDoneValueImpl();
468                 syncDone.setCookie( cookie );
469                 searchDoneResp.addControl( syncDone );
470 
471                 PROVIDER_LOG.debug( "Send a SearchResultDone response to the {} consumer", replicaLog.getId(),
472                     searchDoneResp );
473 
474                 session.getIoSession().write( searchDoneResp );
475             }
476         }
477     }
478 
479 
480     /**
481      * Process the initial refresh : we will send all the entries
482      */
483     private void doInitialRefresh( LdapSession session, SearchRequest request ) throws Exception
484     {
485         PROVIDER_LOG.debug( "Starting an initial refresh" );
486 
487         SortRequest ctrl = ( SortRequest ) request.getControl( SortRequest.OID );
488 
489         if ( ctrl != null )
490         {
491             PROVIDER_LOG
492                 .warn( "Removing the received sort control from the syncrepl search request during initial refresh" );
493             request.removeControl( ctrl );
494         }
495 
496         PROVIDER_LOG
497             .debug( "Adding sort control to sort the entries by entryDn attribute to preserve order of insertion" );
498         SortKey sk = new SortKey( SchemaConstants.ENTRY_DN_AT );
499         // matchingrule for "entryDn"
500         sk.setMatchingRuleId( SchemaConstants.DISTINGUISHED_NAME_MATCH_MR_OID );
501         sk.setReverseOrder( true );
502 
503         ctrl = new SortRequestImpl();
504         ctrl.addSortKey( sk );
505 
506         request.addControl( ctrl );
507 
508         String originalFilter = request.getFilter().toString();
509         InetSocketAddress address = ( InetSocketAddress ) session.getIoSession().getRemoteAddress();
510         String hostName = address.getAddress().getHostName();
511 
512         ExprNode modifiedFilter = modifyFilter( session, request );
513 
514         Partition partition = dirService.getPartitionNexus().getPartition( request.getBase() );
515         String contextCsn;
516         
517         try ( PartitionTxn partitionTxn = partition.beginReadTransaction() )
518         {
519             contextCsn = partition.getContextCsn( partitionTxn );
520         }
521 
522         boolean refreshNPersist = isRefreshNPersist( request );
523 
524         // first register a ReplicaEventLog before starting the initial content refresh
525         // this is to log all the operations happen on DIT during initial content refresh
526         ReplicaEventLog replicaLog = null;
527         
528         try ( PartitionTxn partitionTxn = partition.beginReadTransaction() )
529         {
530             replicaLog = createReplicaEventLog( partitionTxn, hostName, originalFilter );
531         }
532 
533         replicaLog.setRefreshNPersist( refreshNPersist );
534         Value contexCsnValue = new Value( dirService.getAtProvider().getEntryCSN(), contextCsn );
535 
536         // modify the filter to include the context Csn
537         GreaterEqNode csnGeNode = new GreaterEqNode( csnAT, contexCsnValue );
538         ExprNode postInitContentFilter = new AndNode( modifiedFilter, csnGeNode );
539         request.setFilter( postInitContentFilter );
540 
541         // now we process entries forever as they change
542         // irrespective of the sync mode set the 'isRealtimePush' to false initially so that we can
543         // store the modifications in the queue and later if it is a persist mode
544         PROVIDER_LOG.debug( "Starting the replicaLog {}", replicaLog );
545 
546         // we push this queue's content and switch to realtime mode
547         SyncReplSearchListenervider/SyncReplSearchListener.html#SyncReplSearchListener">SyncReplSearchListener replicationListener = new SyncReplSearchListener( session, request, replicaLog, false );
548         replicaLog.setPersistentListener( replicationListener );
549 
550         // compose notification criteria and add the listener to the event
551         // service using that notification criteria to determine which events
552         // are to be delivered to the persistent search issuing client
553         NotificationCriteriai/event/NotificationCriteria.html#NotificationCriteria">NotificationCriteria criteria = new NotificationCriteria( dirService.getSchemaManager() );
554         criteria.setAliasDerefMode( request.getDerefAliases() );
555         criteria.setBase( request.getBase() );
556         criteria.setFilter( request.getFilter() );
557         criteria.setScope( request.getScope() );
558         criteria.setEventMask( EventType.ALL_EVENT_TYPES_MASK );
559 
560         replicaLog.setSearchCriteria( criteria );
561 
562         dirService.getEventService().addListener( replicationListener, criteria );
563 
564         // then start pushing initial content
565         LessEqNode csnNode = new LessEqNode( csnAT, contexCsnValue );
566 
567         // modify the filter to include the context Csn
568         ExprNode initialContentFilter = new AndNode( modifiedFilter, csnNode );
569         request.setFilter( initialContentFilter );
570 
571         // Now, do a search to get all the entries
572         SearchResultDone searchDoneResp = doSimpleSearch( session, request, replicaLog );
573 
574         if ( searchDoneResp.getLdapResult().getResultCode() == ResultCodeEnum.SUCCESS )
575         {
576             if ( replicaLog.getLastSentCsn() == null )
577             {
578                 replicaLog.setLastSentCsn( contextCsn );
579             }
580 
581             if ( refreshNPersist ) // refreshAndPersist mode
582             {
583                 PROVIDER_LOG
584                     .debug( "Refresh&Persist requested : send the data being modified since the initial refresh" );
585                 // Now, send the modified entries since the search has started
586                 sendContentFromLog( session, request, replicaLog, contextCsn );
587 
588                 byte[] cookie = LdapProtocolUtils.createCookie( replicaLog.getId(), replicaLog.getLastSentCsn() );
589 
590                 SyncInfoValue syncInfoValue = new SyncInfoValueImpl();
591                 syncInfoValue.setSyncInfoValueType( SynchronizationInfoEnum.NEW_COOKIE );
592                 syncInfoValue.setMessageId( request.getMessageId() );
593                 syncInfoValue.setCookie( cookie );
594 
595                 PROVIDER_LOG.info( "Sending the intermediate response to consumer {}, {}", 
596                     replicaLog, syncInfoValue );
597 
598                 session.getIoSession().write( syncInfoValue );
599 
600                 // switch the handler mode to realtime push
601                 replicationListener.setPushInRealTime( refreshNPersist );
602                 PROVIDER_LOG.debug( "e waiting for any modification for {}", replicaLog );
603             }
604             else
605             {
606                 PROVIDER_LOG.debug( "RefreshOnly requested" );
607                 byte[] cookie = LdapProtocolUtils.createCookie( replicaLog.getId(), contextCsn );
608 
609                 // no need to send from the log, that will be done in the next refreshOnly session
610                 SyncDoneValue syncDone = new SyncDoneValueImpl();
611                 syncDone.setCookie( cookie );
612                 searchDoneResp.addControl( syncDone );
613                 PROVIDER_LOG.info( "Sending the searchResultDone response to consumer {}, {}", replicaLog,
614                     searchDoneResp );
615 
616                 session.getIoSession().write( searchDoneResp );
617             }
618         }
619         else
620         // if not succeeded return
621         {
622             PROVIDER_LOG.warn( "initial content refresh didn't succeed due to {}", searchDoneResp.getLdapResult()
623                 .getResultCode() );
624             replicaLog.stop();
625             replicaLog = null;
626 
627             // remove the listener
628             dirService.getEventService().removeListener( replicationListener );
629 
630             return;
631         }
632 
633         // if all is well then store the consumer information
634         replicaUtil.addConsumerEntry( replicaLog );
635 
636         // add to the map only after storing in the DIT, else the Replica update thread barfs
637         replicaLogMap.put( replicaLog.getId(), replicaLog );
638     }
639 
640 
641     /**
642      * Process a search on the provider to get all the modified entries. We then send all
643      * of them to the consumer
644      */
645     private SearchResultDone doSimpleSearch( LdapSession session, SearchRequest req, ReplicaEventLog replicaLog )
646         throws Exception
647     {
648         PROVIDER_LOG.debug( "Simple Search {} for {}", req, session );
649         SearchResultDone searchDoneResp = ( SearchResultDone ) req.getResultResponse();
650         LdapResult ldapResult = searchDoneResp.getLdapResult();
651 
652         // A normal search
653         // Check that we have a cursor or not.
654         // No cursor : do a search.
655         Cursor<Entry> cursor = session.getCoreSession().search( req );
656 
657         // Position the cursor at the beginning
658         cursor.beforeFirst();
659 
660         /*
661          * Iterate through all search results building and sending back responses
662          * for each search result returned.
663          */
664         try
665         {
666             // Get the size limits
667             // Don't bother setting size limits for administrators that don't ask for it
668             long serverLimit = getServerSizeLimit( session, req );
669 
670             long requestLimit = req.getSizeLimit() == 0L ? Long.MAX_VALUE : req.getSizeLimit();
671 
672             req.addAbandonListener( new SearchAbandonListener( ldapServer, cursor ) );
673             setTimeLimitsOnCursor( req, session, cursor );
674             PROVIDER_LOG.debug( "search operation requested size limit {}, server size limit {}", requestLimit,
675                 serverLimit );
676             long sizeLimit = min( requestLimit, serverLimit );
677 
678             readResults( session, req, ldapResult, cursor, sizeLimit, replicaLog );
679         }
680         finally
681         {
682             if ( cursor != null )
683             {
684                 try
685                 {
686                     cursor.close();
687                 }
688                 catch ( Exception e )
689                 {
690                     PROVIDER_LOG.error( I18n.err( I18n.ERR_168 ), e );
691                 }
692             }
693         }
694 
695         PROVIDER_LOG.debug( "Search done" );
696 
697         return searchDoneResp;
698     }
699 
700 
701     /**
702      * Process the results get from a search request. We will send them to the client.
703      */
704     private void readResults( LdapSession session, SearchRequest req, LdapResult ldapResult,
705         Cursor<Entry> cursor, long sizeLimit, ReplicaEventLog replicaLog ) throws Exception
706     {
707         long count = 0;
708 
709         while ( ( count < sizeLimit ) && cursor.next() )
710         {
711             // Handle closed session
712             if ( session.getIoSession().isClosing() )
713             {
714                 // The client has closed the connection
715                 PROVIDER_LOG.debug( "Request terminated for message {}, the client has closed the session",
716                     req.getMessageId() );
717                 break;
718             }
719 
720             if ( req.isAbandoned() )
721             {
722                 // The cursor has been closed by an abandon request.
723                 PROVIDER_LOG.debug( "Request terminated by an AbandonRequest for message {}", req.getMessageId() );
724                 break;
725             }
726 
727             Entry entry = cursor.get();
728 
729             sendSearchResultEntry( session, req, entry, SyncStateTypeEnum.ADD );
730 
731             String lastSentCsn = entry.get( csnAT ).getString();
732             replicaLog.setLastSentCsn( lastSentCsn );
733 
734             count++;
735         }
736 
737         PROVIDER_LOG.debug( "Sent {} entries for {}", count, replicaLog );
738 
739         // DO NOT WRITE THE RESPONSE - JUST RETURN IT
740         ldapResult.setResultCode( ResultCodeEnum.SUCCESS );
741 
742         if ( ( count >= sizeLimit ) && ( cursor.next() ) )
743         {
744             // We have reached the limit
745             // Move backward on the cursor to restore the previous position, as we moved forward
746             // to check if there is one more entry available
747             cursor.previous();
748             // Special case if the user has requested more elements than the request size limit
749             ldapResult.setResultCode( ResultCodeEnum.SIZE_LIMIT_EXCEEDED );
750         }
751     }
752 
753 
754     /**
755      * Prepare and send a search result entry response, with the associated
756      * SyncState control.
757      */
758     private void sendSearchResultEntry( LdapSession session, SearchRequest req, Entry entry,
759         SyncStateTypeEnum syncStateType ) throws Exception
760     {
761         Attribute uuid = entry.get( SchemaConstants.ENTRY_UUID_AT );
762 
763         // Create the SyncState control
764         SyncStateValue syncStateControl = new SyncStateValueImpl();
765         syncStateControl.setSyncStateType( syncStateType );
766         syncStateControl.setEntryUUID( Strings.uuidToBytes( uuid.getString() ) );
767 
768         if ( syncStateType == SyncStateTypeEnum.DELETE )
769         {
770             // clear the entry's all attributes except the Dn and entryUUID
771             entry.clear();
772             entry.add( uuid );
773         }
774 
775         Response resp = generateResponse( session, req, entry );
776         resp.addControl( syncStateControl );
777 
778         PROVIDER_LOG.debug( "Sending the entry:\n {}", resp );
779         session.getIoSession().write( resp );
780     }
781 
782 
783     /**
784      * Build the response to be sent to the client
785      */
786     private Response generateResponse( LdapSession session, SearchRequest req, Entry entry ) throws Exception
787     {
788         Attribute ref = entry.get( SchemaConstants.REF_AT );
789         boolean hasManageDsaItControl = req.getControls().containsKey( ManageDsaIT.OID );
790 
791         if ( ( ref != null ) && !hasManageDsaItControl )
792         {
793             // The entry is a referral.
794             SearchResultReference respRef;
795             respRef = new SearchResultReferenceImpl( req.getMessageId() );
796             respRef.setReferral( new ReferralImpl() );
797 
798             for ( Value val : ref )
799             {
800                 String url = val.getString();
801 
802                 if ( !url.startsWith( "ldap" ) )
803                 {
804                     respRef.getReferral().addLdapUrl( url );
805                 }
806 
807                 LdapUrl ldapUrl = null;
808 
809                 try
810                 {
811                     ldapUrl = new LdapUrl( url );
812                     ldapUrl.setForceScopeRendering( true );
813                 }
814                 catch ( LdapURLEncodingException e )
815                 {
816                     PROVIDER_LOG.error( I18n.err( I18n.ERR_165, url, entry ) );
817                 }
818 
819                 switch ( req.getScope() )
820                 {
821                     case SUBTREE:
822                         ldapUrl.setScope( SearchScope.SUBTREE.getScope() );
823                         break;
824 
825                     case ONELEVEL: // one level here is object level on remote server
826                         ldapUrl.setScope( SearchScope.OBJECT.getScope() );
827                         break;
828 
829                     default:
830                         throw new IllegalStateException( I18n.err( I18n.ERR_686 ) );
831                 }
832 
833                 respRef.getReferral().addLdapUrl( ldapUrl.toString() );
834             }
835 
836             return respRef;
837         }
838         else
839         {
840             // The entry is not a referral, or the ManageDsaIt control is set
841             SearchResultEntry respEntry;
842             respEntry = new SearchResultEntryImpl( req.getMessageId() );
843             respEntry.setEntry( entry );
844             respEntry.setObjectName( entry.getDn() );
845 
846             return respEntry;
847         }
848     }
849 
850 
851     /**
852      * Return the server size limit
853      */
854     private long getServerSizeLimit( LdapSession session, SearchRequest request )
855     {
856         if ( session.getCoreSession().isAnAdministrator() )
857         {
858             if ( request.getSizeLimit() == NO_SIZE_LIMIT )
859             {
860                 return Long.MAX_VALUE;
861             }
862             else
863             {
864                 return request.getSizeLimit();
865             }
866         }
867         else
868         {
869             if ( ldapServer.getMaxSizeLimit() == NO_SIZE_LIMIT )
870             {
871                 return Long.MAX_VALUE;
872             }
873             else
874             {
875                 return ldapServer.getMaxSizeLimit();
876             }
877         }
878     }
879 
880 
881     private void setTimeLimitsOnCursor( SearchRequest req, LdapSession session,
882         final Cursor<Entry> cursor )
883     {
884         // Don't bother setting time limits for administrators
885         if ( session.getCoreSession().isAnAdministrator() && req.getTimeLimit() == NO_TIME_LIMIT )
886         {
887             return;
888         }
889 
890         /*
891          * Non administrator based searches are limited by time if the server
892          * has been configured with unlimited time and the request specifies
893          * unlimited search time
894          */
895         if ( ldapServer.getMaxTimeLimit() == NO_TIME_LIMIT && req.getTimeLimit() == NO_TIME_LIMIT )
896         {
897             return;
898         }
899 
900         /*
901          * If the non-administrator user specifies unlimited time but the server
902          * is configured to limit the search time then we limit by the max time
903          * allowed by the configuration
904          */
905         if ( req.getTimeLimit() == 0 )
906         {
907             cursor.setClosureMonitor( new SearchTimeLimitingMonitor( ldapServer.getMaxTimeLimit(), TimeUnit.SECONDS ) );
908             return;
909         }
910 
911         /*
912          * If the non-administrative user specifies a time limit equal to or
913          * less than the maximum limit configured in the server then we
914          * constrain search by the amount specified in the request
915          */
916         if ( ldapServer.getMaxTimeLimit() >= req.getTimeLimit() )
917         {
918             cursor.setClosureMonitor( new SearchTimeLimitingMonitor( req.getTimeLimit(), TimeUnit.SECONDS ) );
919             return;
920         }
921 
922         /*
923          * Here the non-administrative user's requested time limit is greater
924          * than what the server's configured maximum limit allows so we limit
925          * the search to the configured limit
926          */
927         cursor.setClosureMonitor( new SearchTimeLimitingMonitor( ldapServer.getMaxTimeLimit(), TimeUnit.SECONDS ) );
928     }
929 
930 
931     public ExprNode modifyFilter( LdapSession session, SearchRequest req ) throws Exception
932     {
933         /*
934          * Most of the time the search filter is just (objectClass=*) and if
935          * this is the case then there's no reason at all to OR this with an
936          * (objectClass=referral).  If we detect this case then we leave it
937          * as is to represent the OR condition:
938          *
939          *  (| (objectClass=referral)(objectClass=*)) == (objectClass=*)
940          */
941         boolean isOcPresenceFilter = false;
942 
943         if ( req.getFilter() instanceof PresenceNode )
944         {
945             PresenceNode presenceNode = ( PresenceNode ) req.getFilter();
946 
947             AttributeType at = session.getCoreSession().getDirectoryService().getSchemaManager()
948                 .lookupAttributeTypeRegistry( presenceNode.getAttribute() );
949 
950             if ( at.getOid().equals( SchemaConstants.OBJECT_CLASS_AT_OID ) )
951             {
952                 isOcPresenceFilter = true;
953             }
954         }
955 
956         ExprNode filter = req.getFilter();
957 
958         if ( !req.hasControl( ManageDsaIT.OID ) && !isOcPresenceFilter )
959         {
960             filter = new OrNode( req.getFilter(), newIsReferralEqualityNode( session ) );
961         }
962 
963         return filter;
964     }
965 
966 
967     public ReplicaEventLogJanitor getLogJanitor()
968     {
969         return logJanitor;
970     }
971 
972 
973     public Map<Integer, ReplicaEventLog> getReplicaLogMap()
974     {
975         return replicaLogMap;
976     }
977 
978 
979     private EqualityNode<String> newIsReferralEqualityNode( LdapSession session ) throws Exception
980     {
981         return new EqualityNode<>( SchemaConstants.OBJECT_CLASS_AT, 
982             new Value( objectClassAT, SchemaConstants.REFERRAL_OC ).getString() );
983     }
984 
985 
986     /**
987      * Update the consumer configuration entries if they are 'dirty' (ie, if
988      * the consumer lastCSN is not up to date)
989      */
990     private void storeReplicaInfo()
991     {
992         try
993         {
994             for ( Map.Entry<Integer, ReplicaEventLog> e : replicaLogMap.entrySet() )
995             {
996                 ReplicaEventLog replica = e.getValue();
997 
998                 if ( replica.isDirty() )
999                 {
1000                     PROVIDER_LOG.debug( "updating the details of replica {}", replica );
1001                     replicaUtil.updateReplicaLastSentCsn( replica );
1002                     replica.setDirty( false );
1003                 }
1004             }
1005         }
1006         catch ( Exception e )
1007         {
1008             PROVIDER_LOG.error( "Failed to store the replica information", e );
1009         }
1010     }
1011 
1012 
1013     /**
1014      * Read and store the consumer's informations
1015      */
1016     private void loadReplicaInfo()
1017     {
1018         try
1019         {
1020             List<ReplicaEventLog> eventLogs = replicaUtil.getReplicaEventLogs();
1021             Set<String> eventLogNames = new HashSet<>();
1022 
1023             if ( !eventLogs.isEmpty() )
1024             {
1025                 for ( ReplicaEventLog replica : eventLogs )
1026                 {
1027                     PROVIDER_LOG.debug( "initializing the replica log from {}", replica.getId() );
1028                     replicaLogMap.put( replica.getId(), replica );
1029                     eventLogNames.add( replica.getName() );
1030 
1031                     // update the replicaCount's value to assign a correct value to the new replica(s)
1032                     if ( replicaCount.get() < replica.getId() )
1033                     {
1034                         replicaCount.set( replica.getId() );
1035                     }
1036                 }
1037             }
1038             else
1039             {
1040                 PROVIDER_LOG.debug( "no replica logs found to initialize" );
1041             }
1042 
1043             // remove unused logs
1044             for ( File f : getAllReplJournalNames() )
1045             {
1046                 if ( !eventLogNames.contains( f.getName() ) )
1047                 {
1048                     f.delete();
1049                     PROVIDER_LOG.info( "removed unused replication event log {}", f );
1050                 }
1051             }
1052         }
1053         catch ( Exception e )
1054         {
1055             PROVIDER_LOG.error( "Failed to load the replica information", e );
1056         }
1057     }
1058 
1059 
1060     /**
1061      * Register the listeners for each existing consumers
1062      */
1063     private void registerPersistentSearches() throws Exception
1064     {
1065         for ( Map.Entry<Integer, ReplicaEventLog> e : replicaLogMap.entrySet() )
1066         {
1067             ReplicaEventLog log = e.getValue();
1068 
1069             if ( log.getSearchCriteria() != null )
1070             {
1071                 PROVIDER_LOG.debug( "registering persistent search for the replica {}", log.getId() );
1072                 SyncReplSearchListenerlication/provider/SyncReplSearchListener.html#SyncReplSearchListener">SyncReplSearchListener handler = new SyncReplSearchListener( null, null, log, false );
1073                 log.setPersistentListener( handler );
1074 
1075                 dirService.getEventService().addListener( handler, log.getSearchCriteria() );
1076             }
1077             else
1078             {
1079                 PROVIDER_LOG.warn( "invalid persistent search criteria {} for the replica {}", log.getSearchCriteria(),
1080                     log
1081                         .getId() );
1082             }
1083         }
1084     }
1085 
1086 
1087     /**
1088      * Create a thread to process replication communication with a consumer
1089      */
1090     private Runnable createConsumerInfoUpdateTask( final CountDownLatch latch )
1091     {
1092         return new Runnable()
1093         {
1094             public void run()
1095             {
1096                 try
1097                 {
1098                     while ( true )
1099                     {
1100                         storeReplicaInfo();
1101                         
1102                         latch.countDown();
1103                         Thread.sleep( 10000 );
1104                     }
1105                 }
1106                 catch ( InterruptedException e )
1107                 {
1108                     // log at debug level, this will be interrupted during stop
1109                     PROVIDER_LOG.debug( "thread storing the replica information was interrupted", e );
1110                 }
1111             }
1112         };
1113     }
1114 
1115 
1116     /**
1117      * Get the Replica event log from the replica ID found in the cookie
1118      */
1119     private ReplicaEventLog getReplicaEventLog( String cookieString )
1120     {
1121         ReplicaEventLog replicaLog = null;
1122 
1123         if ( LdapProtocolUtils.isValidCookie( cookieString ) )
1124         {
1125             int clientId = LdapProtocolUtils.getReplicaId( cookieString );
1126             replicaLog = replicaLogMap.get( clientId );
1127         }
1128 
1129         return replicaLog;
1130     }
1131 
1132 
1133     /**
1134      * Create a new ReplicaEventLog. Each replica will have a unique ID, created by the provider.
1135      */
1136     private ReplicaEventLog createReplicaEventLog( PartitionTxn partitionTxn, String hostName, String filter ) throws Exception
1137     {
1138         int replicaId = replicaCount.incrementAndGet();
1139 
1140         PROVIDER_LOG.debug( "creating a new event log for the replica with id {}", replicaId );
1141 
1142         ReplicaEventLog/replication/provider/ReplicaEventLog.html#ReplicaEventLog">ReplicaEventLog replicaLog = new ReplicaEventLog( partitionTxn, dirService, replicaId );
1143         replicaLog.setHostName( hostName );
1144         replicaLog.setSearchFilter( filter );
1145 
1146         return replicaLog;
1147     }
1148 
1149 
1150     /**
1151      * Send an error response to he consue r: it has to send a SYNC_REFRESH request first.
1152      */
1153     private void sendESyncRefreshRequired( LdapSession session, SearchRequest req )
1154     {
1155         SearchResultDone searchDoneResp = ( SearchResultDone ) req.getResultResponse();
1156         searchDoneResp.getLdapResult().setResultCode( ResultCodeEnum.E_SYNC_REFRESH_REQUIRED );
1157         SyncDoneValue syncDone = new SyncDoneValueImpl();
1158         searchDoneResp.addControl( syncDone );
1159 
1160         session.getIoSession().write( searchDoneResp );
1161     }
1162 
1163 
1164     /**
1165      * Tells if the control contains the REFRESHNPERSIST mode
1166      */
1167     private boolean isRefreshNPersist( SearchRequest req )
1168     {
1169         SyncRequestValue control = ( SyncRequestValue ) req.getControls().get( SyncRequestValue.OID );
1170 
1171         return control.getMode() == SynchronizationModeEnum.REFRESH_AND_PERSIST;
1172     }
1173 
1174 
1175     private File[] getAllReplJournalNames()
1176     {
1177         File replDir = dirService.getInstanceLayout().getReplDirectory();
1178         FilenameFilter filter = new FilenameFilter()
1179         {
1180             @Override
1181             public boolean accept( File dir, String name )
1182             {
1183                 return name.startsWith( ReplicaEventLog.REPLICA_EVENT_LOG_NAME_PREFIX );
1184             }
1185         };
1186 
1187         return replDir.listFiles( filter );
1188     }
1189 
1190     /**
1191      * an event listener for handling deletions and updates of replication event log entries present under ou=consumers,ou=system
1192      */
1193     private class ConsumerLogEntryChangeListener extends DirectoryListenerAdapter
1194     {
1195 
1196         private ReplicaEventLog getEventLog( OperationContext opCtx )
1197         {
1198             Dn consumerLogDn = opCtx.getDn();
1199             String name = ReplicaEventLog.REPLICA_EVENT_LOG_NAME_PREFIX + consumerLogDn.getRdn().getValue();
1200 
1201             for ( ReplicaEventLog log : replicaLogMap.values() )
1202             {
1203                 if ( name.equalsIgnoreCase( log.getName() ) )
1204                 {
1205                     return log;
1206                 }
1207             } // end of for
1208 
1209             return null;
1210         }
1211 
1212 
1213         @Override
1214         public void entryDeleted( DeleteOperationContext deleteContext )
1215         {
1216             // lock this listener instance
1217             synchronized ( this )
1218             {
1219                 ReplicaEventLog log = getEventLog( deleteContext );
1220                 if ( log != null )
1221                 {
1222                     logJanitor.removeEventLog( log );
1223                 }
1224             } // end of synchronized block
1225         } // end of delete method
1226 
1227 
1228         @Override
1229         public void entryModified( ModifyOperationContext modifyContext )
1230         {
1231             List<Modification> mods = modifyContext.getModItems();
1232 
1233             // lock this listener instance
1234             synchronized ( this )
1235             {
1236                 for ( Modification m : mods )
1237                 {
1238                     try
1239                     {
1240                         Attribute at = m.getAttribute();
1241 
1242                         if ( at.isInstanceOf( replLogMaxIdleAT ) )
1243                         {
1244                             ReplicaEventLog log = getEventLog( modifyContext );
1245                             if ( log != null )
1246                             {
1247                                 int maxIdlePeriod = Integer.parseInt( m.getAttribute().getString() );
1248                                 log.setMaxIdlePeriod( maxIdlePeriod );
1249                             }
1250                         }
1251                         else if ( at.isInstanceOf( replLogPurgeThresholdCountAT ) )
1252                         {
1253                             ReplicaEventLog log = getEventLog( modifyContext );
1254                             if ( log != null )
1255                             {
1256                                 int purgeThreshold = Integer.parseInt( m.getAttribute().getString() );
1257                                 log.setPurgeThresholdCount( purgeThreshold );
1258                             }
1259                         }
1260                     }
1261                     catch ( LdapInvalidAttributeValueException e )
1262                     {
1263                         PROVIDER_LOG.warn( "Invalid attribute type", e );
1264                     }
1265                 }
1266             }
1267         }
1268     } // end of listener class
1269 }