1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package org.apache.directory.server.ldap.replication.provider;
22
23
24 import static java.lang.Math.min;
25 import static org.apache.directory.server.ldap.LdapServer.NO_SIZE_LIMIT;
26 import static org.apache.directory.server.ldap.LdapServer.NO_TIME_LIMIT;
27
28 import java.io.File;
29 import java.io.FilenameFilter;
30 import java.io.IOException;
31 import java.net.InetSocketAddress;
32 import java.util.HashSet;
33 import java.util.List;
34 import java.util.Map;
35 import java.util.Set;
36 import java.util.concurrent.ConcurrentHashMap;
37 import java.util.concurrent.CountDownLatch;
38 import java.util.concurrent.TimeUnit;
39 import java.util.concurrent.atomic.AtomicInteger;
40
41 import org.apache.directory.api.ldap.extras.controls.SynchronizationModeEnum;
42 import org.apache.directory.api.ldap.extras.controls.syncrepl.syncDone.SyncDoneValue;
43 import org.apache.directory.api.ldap.extras.controls.syncrepl.syncDone.SyncDoneValueImpl;
44 import org.apache.directory.api.ldap.extras.controls.syncrepl.syncRequest.SyncRequestValue;
45 import org.apache.directory.api.ldap.extras.controls.syncrepl.syncState.SyncStateTypeEnum;
46 import org.apache.directory.api.ldap.extras.controls.syncrepl.syncState.SyncStateValue;
47 import org.apache.directory.api.ldap.extras.controls.syncrepl.syncState.SyncStateValueImpl;
48 import org.apache.directory.api.ldap.extras.intermediate.syncrepl.SyncInfoValue;
49 import org.apache.directory.api.ldap.extras.intermediate.syncrepl.SyncInfoValueImpl;
50 import org.apache.directory.api.ldap.extras.intermediate.syncrepl.SynchronizationInfoEnum;
51 import org.apache.directory.api.ldap.model.constants.Loggers;
52 import org.apache.directory.api.ldap.model.constants.SchemaConstants;
53 import org.apache.directory.api.ldap.model.cursor.Cursor;
54 import org.apache.directory.api.ldap.model.entry.Attribute;
55 import org.apache.directory.api.ldap.model.entry.Entry;
56 import org.apache.directory.api.ldap.model.entry.Modification;
57 import org.apache.directory.api.ldap.model.entry.Value;
58 import org.apache.directory.api.ldap.model.exception.LdapException;
59 import org.apache.directory.api.ldap.model.exception.LdapInvalidAttributeValueException;
60 import org.apache.directory.api.ldap.model.exception.LdapURLEncodingException;
61 import org.apache.directory.api.ldap.model.filter.AndNode;
62 import org.apache.directory.api.ldap.model.filter.EqualityNode;
63 import org.apache.directory.api.ldap.model.filter.ExprNode;
64 import org.apache.directory.api.ldap.model.filter.GreaterEqNode;
65 import org.apache.directory.api.ldap.model.filter.LessEqNode;
66 import org.apache.directory.api.ldap.model.filter.OrNode;
67 import org.apache.directory.api.ldap.model.filter.PresenceNode;
68 import org.apache.directory.api.ldap.model.message.LdapResult;
69 import org.apache.directory.api.ldap.model.message.ReferralImpl;
70 import org.apache.directory.api.ldap.model.message.Response;
71 import org.apache.directory.api.ldap.model.message.ResultCodeEnum;
72 import org.apache.directory.api.ldap.model.message.SearchRequest;
73 import org.apache.directory.api.ldap.model.message.SearchResultDone;
74 import org.apache.directory.api.ldap.model.message.SearchResultEntry;
75 import org.apache.directory.api.ldap.model.message.SearchResultEntryImpl;
76 import org.apache.directory.api.ldap.model.message.SearchResultReference;
77 import org.apache.directory.api.ldap.model.message.SearchResultReferenceImpl;
78 import org.apache.directory.api.ldap.model.message.SearchScope;
79 import org.apache.directory.api.ldap.model.message.controls.ChangeType;
80 import org.apache.directory.api.ldap.model.message.controls.ManageDsaIT;
81 import org.apache.directory.api.ldap.model.message.controls.SortKey;
82 import org.apache.directory.api.ldap.model.message.controls.SortRequest;
83 import org.apache.directory.api.ldap.model.message.controls.SortRequestImpl;
84 import org.apache.directory.api.ldap.model.name.Dn;
85 import org.apache.directory.api.ldap.model.schema.AttributeType;
86 import org.apache.directory.api.ldap.model.url.LdapUrl;
87 import org.apache.directory.api.util.Strings;
88 import org.apache.directory.server.constants.ServerDNConstants;
89 import org.apache.directory.server.core.api.DirectoryService;
90 import org.apache.directory.server.core.api.event.DirectoryListenerAdapter;
91 import org.apache.directory.server.core.api.event.EventService;
92 import org.apache.directory.server.core.api.event.EventType;
93 import org.apache.directory.server.core.api.event.NotificationCriteria;
94 import org.apache.directory.server.core.api.interceptor.context.DeleteOperationContext;
95 import org.apache.directory.server.core.api.interceptor.context.ModifyOperationContext;
96 import org.apache.directory.server.core.api.interceptor.context.OperationContext;
97 import org.apache.directory.server.core.api.partition.Partition;
98 import org.apache.directory.server.core.api.partition.PartitionTxn;
99 import org.apache.directory.server.i18n.I18n;
100 import org.apache.directory.server.ldap.LdapProtocolUtils;
101 import org.apache.directory.server.ldap.LdapServer;
102 import org.apache.directory.server.ldap.LdapSession;
103 import org.apache.directory.server.ldap.handlers.SearchAbandonListener;
104 import org.apache.directory.server.ldap.handlers.SearchTimeLimitingMonitor;
105 import org.apache.directory.server.ldap.replication.ReplicaEventMessage;
106 import org.slf4j.Logger;
107 import org.slf4j.LoggerFactory;
108
109
110
111
112
113
114
115 public class SyncReplRequestHandler implements ReplicationRequestHandler
116 {
117
118 private static final Logger PROVIDER_LOG = LoggerFactory.getLogger( Loggers.PROVIDER_LOG.getName() );
119
120
121 private boolean initialized = false;
122
123
124 private DirectoryService dirService;
125
126
127 protected LdapServer ldapServer;
128
129
130 private AttributeType objectClassAT;
131
132
133 private AttributeType csnAT;
134
135 private Map<Integer, ReplicaEventLog> replicaLogMap = new ConcurrentHashMap<>();
136
137 private File syncReplData;
138
139 private AtomicInteger replicaCount = new AtomicInteger( 0 );
140
141 private ReplConsumerManager replicaUtil;
142
143 private ConsumerLogEntryChangeListener cledListener;
144
145 private ReplicaEventLogJanitor logJanitor;
146
147 private AttributeType replLogMaxIdleAT;
148
149 private AttributeType replLogPurgeThresholdCountAT;
150
151
152 private Thread consumerInfoUpdateThread;
153
154
155
156
/**
 * Creates a new instance of SyncReplRequestHandler. Nothing is set up here,
 * the handler only becomes usable once start() has been called.
 */
public SyncReplRequestHandler()
{
    super();
}
160
161
162
163
164
165 public void start( LdapServer server )
166 {
167
168 if ( initialized )
169 {
170 PROVIDER_LOG.warn( "syncrepl provider was already initialized" );
171
172 return;
173 }
174
175 try
176 {
177 PROVIDER_LOG.debug( "initializing the syncrepl provider" );
178
179 this.ldapServer = server;
180 this.dirService = server.getDirectoryService();
181
182 csnAT = dirService.getSchemaManager()
183 .lookupAttributeTypeRegistry( SchemaConstants.ENTRY_CSN_AT );
184
185 objectClassAT = dirService.getSchemaManager()
186 .lookupAttributeTypeRegistry( SchemaConstants.OBJECT_CLASS_AT );
187
188 replLogMaxIdleAT = dirService.getSchemaManager()
189 .lookupAttributeTypeRegistry( SchemaConstants.ADS_REPL_LOG_MAX_IDLE );
190
191 replLogPurgeThresholdCountAT = dirService.getSchemaManager()
192 .lookupAttributeTypeRegistry( SchemaConstants.ADS_REPL_LOG_PURGE_THRESHOLD_COUNT );
193
194
195 syncReplData = dirService.getInstanceLayout().getReplDirectory();
196
197 if ( !syncReplData.exists() && !syncReplData.mkdirs() )
198 {
199 throw new IOException( I18n.err( I18n.ERR_112_COULD_NOT_CREATE_DIRECTORY, syncReplData ) );
200 }
201
202
203 replicaUtil = new ReplConsumerManager( dirService );
204
205 loadReplicaInfo();
206
207 logJanitor = new ReplicaEventLogJanitor( dirService, replicaLogMap );
208 logJanitor.start();
209
210 registerPersistentSearches();
211
212 cledListener = new ConsumerLogEntryChangeListener();
213 NotificationCriteriai/event/NotificationCriteria.html#NotificationCriteria">NotificationCriteria criteria = new NotificationCriteria( dirService.getSchemaManager() );
214 criteria.setBase( new Dn( dirService.getSchemaManager(), ServerDNConstants.REPL_CONSUMER_DN_STR ) );
215 criteria.setEventMask( EventType.DELETE );
216
217 dirService.getEventService().addListener( cledListener, criteria );
218
219 CountDownLatch latch = new CountDownLatch( 1 );
220
221 consumerInfoUpdateThread = new Thread( createConsumerInfoUpdateTask( latch ) );
222 consumerInfoUpdateThread.setDaemon( true );
223 consumerInfoUpdateThread.start();
224
225
226
227 boolean threadInitDone = latch.await( 5, TimeUnit.MINUTES );
228
229 if ( !threadInitDone )
230 {
231
232 PROVIDER_LOG.error( "The consumer replica thread has not been initialized in time" );
233 throw new RuntimeException( "Cannot initialize the Provider replica listener" );
234 }
235
236 initialized = true;
237 PROVIDER_LOG.debug( "syncrepl provider initialized successfully" );
238 }
239 catch ( Exception e )
240 {
241 PROVIDER_LOG.error( "Failed to initialize the log files required by the syncrepl provider", e );
242 throw new RuntimeException( e );
243 }
244 }
245
246
247
248
249
250 public void stop()
251 {
252 EventService evtSrv = dirService.getEventService();
253
254 evtSrv.removeListener( cledListener );
255
256 logJanitor.stopCleaning();
257
258 logJanitor.interrupt();
259
260
261 consumerInfoUpdateThread.interrupt();
262
263 for ( ReplicaEventLog log : replicaLogMap.values() )
264 {
265 try
266 {
267 PROVIDER_LOG.debug( "Stopping the logging for replica {}", log.getId() );
268 evtSrv.removeListener( log.getPersistentListener() );
269 log.stop();
270 }
271 catch ( Exception e )
272 {
273 PROVIDER_LOG.error( "Failed to close the event log {}", log.getId(), e );
274 }
275 }
276
277
278 storeReplicaInfo();
279
280 initialized = false;
281 }
282
283
284
285
286
287
288
289
290 public void handleSyncRequest( LdapSession session, SearchRequest request ) throws LdapException
291 {
292 PROVIDER_LOG.debug( "Received a Syncrepl request : {} from {}", request, session );
293 try
294 {
295 if ( !request.getAttributes().contains( SchemaConstants.ALL_OPERATIONAL_ATTRIBUTES ) )
296 {
297
298 request.addAttributes( SchemaConstants.ALL_OPERATIONAL_ATTRIBUTES );
299 }
300
301
302 SyncRequestValue syncControl = ( SyncRequestValue ) request.getControls().get(
303 SyncRequestValue.OID );
304
305
306 byte[] cookieBytes = syncControl.getCookie();
307
308 if ( cookieBytes == null )
309 {
310 PROVIDER_LOG.debug( "Received a replication request with no cookie" );
311
312
313 doInitialRefresh( session, request );
314 }
315 else
316 {
317 String cookieString = Strings.utf8ToString( cookieBytes );
318
319 PROVIDER_LOG.debug( "Received a replication request {} with a cookie '{}'", request, cookieString );
320
321 if ( !LdapProtocolUtils.isValidCookie( cookieString ) )
322 {
323 PROVIDER_LOG.error( "received an invalid cookie {} from the consumer with session {}",
324 cookieString,
325 session );
326 sendESyncRefreshRequired( session, request );
327 }
328 else
329 {
330 ReplicaEventLog clientMsgLog = getReplicaEventLog( cookieString );
331
332 if ( clientMsgLog == null )
333 {
334 PROVIDER_LOG.debug(
335 "received a valid cookie {} but there is no event log associated with this replica",
336 cookieString );
337 sendESyncRefreshRequired( session, request );
338 }
339 else
340 {
341 String consumerCsn = LdapProtocolUtils.getCsn( cookieString );
342 doContentUpdate( session, request, clientMsgLog, consumerCsn );
343 }
344 }
345 }
346 }
347 catch ( Exception e )
348 {
349 PROVIDER_LOG.error( "Failed to handle the syncrepl request", e );
350
351 throw new LdapException( e.getMessage(), e );
352 }
353 }
354
355
356
357
358
359 private void sendContentFromLog( LdapSession session, SearchRequest req, ReplicaEventLog clientMsgLog,
360 String fromCsn )
361 throws Exception
362 {
363
364 String lastSentCsn = fromCsn;
365
366 ReplicaJournalCursor cursor = clientMsgLog.getCursor( fromCsn );
367
368 PROVIDER_LOG.debug( "Processing the log for replica {}", clientMsgLog.getId() );
369
370 try
371 {
372 while ( cursor.next() )
373 {
374 ReplicaEventMessage replicaEventMessage = cursor.get();
375 Entry entry = replicaEventMessage.getEntry();
376 PROVIDER_LOG.debug( "Read message from the queue {}", entry );
377
378 lastSentCsn = entry.get( csnAT ).getString();
379
380 ChangeType changeType = replicaEventMessage.getChangeType();
381
382 SyncStateTypeEnum syncStateType = null;
383
384 switch ( changeType )
385 {
386 case ADD:
387 syncStateType = SyncStateTypeEnum.ADD;
388 break;
389
390 case MODIFY:
391 syncStateType = SyncStateTypeEnum.MODIFY;
392 break;
393
394 case MODDN:
395 syncStateType = SyncStateTypeEnum.MODDN;
396 break;
397
398 case DELETE:
399 syncStateType = SyncStateTypeEnum.DELETE;
400 break;
401
402 default:
403 throw new IllegalStateException( I18n.err( I18n.ERR_686 ) );
404 }
405
406 sendSearchResultEntry( session, req, entry, syncStateType );
407
408 clientMsgLog.setLastSentCsn( lastSentCsn );
409
410 PROVIDER_LOG.debug( "The latest entry sent to the consumer {} has this CSN : {}", clientMsgLog.getId(),
411 lastSentCsn );
412 }
413
414 PROVIDER_LOG.debug( "All pending modifciations for replica {} processed", clientMsgLog.getId() );
415 }
416 finally
417 {
418 cursor.close();
419 }
420 }
421
422
423
424
425
426
427 private void doContentUpdate( LdapSession session, SearchRequest req, ReplicaEventLog replicaLog, String consumerCsn )
428 throws Exception
429 {
430 synchronized ( replicaLog )
431 {
432 boolean refreshNPersist = isRefreshNPersist( req );
433
434
435
436
437 if ( refreshNPersist )
438 {
439 SyncReplSearchListener handler = replicaLog.getPersistentListener();
440 handler.setSearchRequest( req );
441 handler.setSession( session );
442 }
443
444 sendContentFromLog( session, req, replicaLog, consumerCsn );
445
446 String lastSentCsn = replicaLog.getLastSentCsn();
447
448 byte[] cookie = LdapProtocolUtils.createCookie( replicaLog.getId(), lastSentCsn );
449
450 if ( refreshNPersist )
451 {
452 SyncInfoValue syncInfoValue = new SyncInfoValueImpl();
453 syncInfoValue.setSyncInfoValueType( SynchronizationInfoEnum.NEW_COOKIE );
454 syncInfoValue.setMessageId( req.getMessageId() );
455 syncInfoValue.setCookie( cookie );
456
457 PROVIDER_LOG.debug( "Sent the intermediate response to the {} consumer, {}", replicaLog.getId(),
458 syncInfoValue );
459 session.getIoSession().write( syncInfoValue );
460
461 replicaLog.getPersistentListener().setPushInRealTime( refreshNPersist );
462 }
463 else
464 {
465 SearchResultDone searchDoneResp = ( SearchResultDone ) req.getResultResponse();
466 searchDoneResp.getLdapResult().setResultCode( ResultCodeEnum.SUCCESS );
467 SyncDoneValue syncDone = new SyncDoneValueImpl();
468 syncDone.setCookie( cookie );
469 searchDoneResp.addControl( syncDone );
470
471 PROVIDER_LOG.debug( "Send a SearchResultDone response to the {} consumer", replicaLog.getId(),
472 searchDoneResp );
473
474 session.getIoSession().write( searchDoneResp );
475 }
476 }
477 }
478
479
480
481
482
483 private void doInitialRefresh( LdapSession session, SearchRequest request ) throws Exception
484 {
485 PROVIDER_LOG.debug( "Starting an initial refresh" );
486
487 SortRequest ctrl = ( SortRequest ) request.getControl( SortRequest.OID );
488
489 if ( ctrl != null )
490 {
491 PROVIDER_LOG
492 .warn( "Removing the received sort control from the syncrepl search request during initial refresh" );
493 request.removeControl( ctrl );
494 }
495
496 PROVIDER_LOG
497 .debug( "Adding sort control to sort the entries by entryDn attribute to preserve order of insertion" );
498 SortKey sk = new SortKey( SchemaConstants.ENTRY_DN_AT );
499
500 sk.setMatchingRuleId( SchemaConstants.DISTINGUISHED_NAME_MATCH_MR_OID );
501 sk.setReverseOrder( true );
502
503 ctrl = new SortRequestImpl();
504 ctrl.addSortKey( sk );
505
506 request.addControl( ctrl );
507
508 String originalFilter = request.getFilter().toString();
509 InetSocketAddress address = ( InetSocketAddress ) session.getIoSession().getRemoteAddress();
510 String hostName = address.getAddress().getHostName();
511
512 ExprNode modifiedFilter = modifyFilter( session, request );
513
514 Partition partition = dirService.getPartitionNexus().getPartition( request.getBase() );
515 String contextCsn;
516
517 try ( PartitionTxn partitionTxn = partition.beginReadTransaction() )
518 {
519 contextCsn = partition.getContextCsn( partitionTxn );
520 }
521
522 boolean refreshNPersist = isRefreshNPersist( request );
523
524
525
526 ReplicaEventLog replicaLog = null;
527
528 try ( PartitionTxn partitionTxn = partition.beginReadTransaction() )
529 {
530 replicaLog = createReplicaEventLog( partitionTxn, hostName, originalFilter );
531 }
532
533 replicaLog.setRefreshNPersist( refreshNPersist );
534 Value contexCsnValue = new Value( dirService.getAtProvider().getEntryCSN(), contextCsn );
535
536
537 GreaterEqNode csnGeNode = new GreaterEqNode( csnAT, contexCsnValue );
538 ExprNode postInitContentFilter = new AndNode( modifiedFilter, csnGeNode );
539 request.setFilter( postInitContentFilter );
540
541
542
543
544 PROVIDER_LOG.debug( "Starting the replicaLog {}", replicaLog );
545
546
547 SyncReplSearchListenervider/SyncReplSearchListener.html#SyncReplSearchListener">SyncReplSearchListener replicationListener = new SyncReplSearchListener( session, request, replicaLog, false );
548 replicaLog.setPersistentListener( replicationListener );
549
550
551
552
553 NotificationCriteriai/event/NotificationCriteria.html#NotificationCriteria">NotificationCriteria criteria = new NotificationCriteria( dirService.getSchemaManager() );
554 criteria.setAliasDerefMode( request.getDerefAliases() );
555 criteria.setBase( request.getBase() );
556 criteria.setFilter( request.getFilter() );
557 criteria.setScope( request.getScope() );
558 criteria.setEventMask( EventType.ALL_EVENT_TYPES_MASK );
559
560 replicaLog.setSearchCriteria( criteria );
561
562 dirService.getEventService().addListener( replicationListener, criteria );
563
564
565 LessEqNode csnNode = new LessEqNode( csnAT, contexCsnValue );
566
567
568 ExprNode initialContentFilter = new AndNode( modifiedFilter, csnNode );
569 request.setFilter( initialContentFilter );
570
571
572 SearchResultDone searchDoneResp = doSimpleSearch( session, request, replicaLog );
573
574 if ( searchDoneResp.getLdapResult().getResultCode() == ResultCodeEnum.SUCCESS )
575 {
576 if ( replicaLog.getLastSentCsn() == null )
577 {
578 replicaLog.setLastSentCsn( contextCsn );
579 }
580
581 if ( refreshNPersist )
582 {
583 PROVIDER_LOG
584 .debug( "Refresh&Persist requested : send the data being modified since the initial refresh" );
585
586 sendContentFromLog( session, request, replicaLog, contextCsn );
587
588 byte[] cookie = LdapProtocolUtils.createCookie( replicaLog.getId(), replicaLog.getLastSentCsn() );
589
590 SyncInfoValue syncInfoValue = new SyncInfoValueImpl();
591 syncInfoValue.setSyncInfoValueType( SynchronizationInfoEnum.NEW_COOKIE );
592 syncInfoValue.setMessageId( request.getMessageId() );
593 syncInfoValue.setCookie( cookie );
594
595 PROVIDER_LOG.info( "Sending the intermediate response to consumer {}, {}",
596 replicaLog, syncInfoValue );
597
598 session.getIoSession().write( syncInfoValue );
599
600
601 replicationListener.setPushInRealTime( refreshNPersist );
602 PROVIDER_LOG.debug( "e waiting for any modification for {}", replicaLog );
603 }
604 else
605 {
606 PROVIDER_LOG.debug( "RefreshOnly requested" );
607 byte[] cookie = LdapProtocolUtils.createCookie( replicaLog.getId(), contextCsn );
608
609
610 SyncDoneValue syncDone = new SyncDoneValueImpl();
611 syncDone.setCookie( cookie );
612 searchDoneResp.addControl( syncDone );
613 PROVIDER_LOG.info( "Sending the searchResultDone response to consumer {}, {}", replicaLog,
614 searchDoneResp );
615
616 session.getIoSession().write( searchDoneResp );
617 }
618 }
619 else
620
621 {
622 PROVIDER_LOG.warn( "initial content refresh didn't succeed due to {}", searchDoneResp.getLdapResult()
623 .getResultCode() );
624 replicaLog.stop();
625 replicaLog = null;
626
627
628 dirService.getEventService().removeListener( replicationListener );
629
630 return;
631 }
632
633
634 replicaUtil.addConsumerEntry( replicaLog );
635
636
637 replicaLogMap.put( replicaLog.getId(), replicaLog );
638 }
639
640
641
642
643
644
645 private SearchResultDone doSimpleSearch( LdapSession session, SearchRequest req, ReplicaEventLog replicaLog )
646 throws Exception
647 {
648 PROVIDER_LOG.debug( "Simple Search {} for {}", req, session );
649 SearchResultDone searchDoneResp = ( SearchResultDone ) req.getResultResponse();
650 LdapResult ldapResult = searchDoneResp.getLdapResult();
651
652
653
654
655 Cursor<Entry> cursor = session.getCoreSession().search( req );
656
657
658 cursor.beforeFirst();
659
660
661
662
663
664 try
665 {
666
667
668 long serverLimit = getServerSizeLimit( session, req );
669
670 long requestLimit = req.getSizeLimit() == 0L ? Long.MAX_VALUE : req.getSizeLimit();
671
672 req.addAbandonListener( new SearchAbandonListener( ldapServer, cursor ) );
673 setTimeLimitsOnCursor( req, session, cursor );
674 PROVIDER_LOG.debug( "search operation requested size limit {}, server size limit {}", requestLimit,
675 serverLimit );
676 long sizeLimit = min( requestLimit, serverLimit );
677
678 readResults( session, req, ldapResult, cursor, sizeLimit, replicaLog );
679 }
680 finally
681 {
682 if ( cursor != null )
683 {
684 try
685 {
686 cursor.close();
687 }
688 catch ( Exception e )
689 {
690 PROVIDER_LOG.error( I18n.err( I18n.ERR_168 ), e );
691 }
692 }
693 }
694
695 PROVIDER_LOG.debug( "Search done" );
696
697 return searchDoneResp;
698 }
699
700
701
702
703
704 private void readResults( LdapSession session, SearchRequest req, LdapResult ldapResult,
705 Cursor<Entry> cursor, long sizeLimit, ReplicaEventLog replicaLog ) throws Exception
706 {
707 long count = 0;
708
709 while ( ( count < sizeLimit ) && cursor.next() )
710 {
711
712 if ( session.getIoSession().isClosing() )
713 {
714
715 PROVIDER_LOG.debug( "Request terminated for message {}, the client has closed the session",
716 req.getMessageId() );
717 break;
718 }
719
720 if ( req.isAbandoned() )
721 {
722
723 PROVIDER_LOG.debug( "Request terminated by an AbandonRequest for message {}", req.getMessageId() );
724 break;
725 }
726
727 Entry entry = cursor.get();
728
729 sendSearchResultEntry( session, req, entry, SyncStateTypeEnum.ADD );
730
731 String lastSentCsn = entry.get( csnAT ).getString();
732 replicaLog.setLastSentCsn( lastSentCsn );
733
734 count++;
735 }
736
737 PROVIDER_LOG.debug( "Sent {} entries for {}", count, replicaLog );
738
739
740 ldapResult.setResultCode( ResultCodeEnum.SUCCESS );
741
742 if ( ( count >= sizeLimit ) && ( cursor.next() ) )
743 {
744
745
746
747 cursor.previous();
748
749 ldapResult.setResultCode( ResultCodeEnum.SIZE_LIMIT_EXCEEDED );
750 }
751 }
752
753
754
755
756
757
758 private void sendSearchResultEntry( LdapSession session, SearchRequest req, Entry entry,
759 SyncStateTypeEnum syncStateType ) throws Exception
760 {
761 Attribute uuid = entry.get( SchemaConstants.ENTRY_UUID_AT );
762
763
764 SyncStateValue syncStateControl = new SyncStateValueImpl();
765 syncStateControl.setSyncStateType( syncStateType );
766 syncStateControl.setEntryUUID( Strings.uuidToBytes( uuid.getString() ) );
767
768 if ( syncStateType == SyncStateTypeEnum.DELETE )
769 {
770
771 entry.clear();
772 entry.add( uuid );
773 }
774
775 Response resp = generateResponse( session, req, entry );
776 resp.addControl( syncStateControl );
777
778 PROVIDER_LOG.debug( "Sending the entry:\n {}", resp );
779 session.getIoSession().write( resp );
780 }
781
782
783
784
785
786 private Response generateResponse( LdapSession session, SearchRequest req, Entry entry ) throws Exception
787 {
788 Attribute ref = entry.get( SchemaConstants.REF_AT );
789 boolean hasManageDsaItControl = req.getControls().containsKey( ManageDsaIT.OID );
790
791 if ( ( ref != null ) && !hasManageDsaItControl )
792 {
793
794 SearchResultReference respRef;
795 respRef = new SearchResultReferenceImpl( req.getMessageId() );
796 respRef.setReferral( new ReferralImpl() );
797
798 for ( Value val : ref )
799 {
800 String url = val.getString();
801
802 if ( !url.startsWith( "ldap" ) )
803 {
804 respRef.getReferral().addLdapUrl( url );
805 }
806
807 LdapUrl ldapUrl = null;
808
809 try
810 {
811 ldapUrl = new LdapUrl( url );
812 ldapUrl.setForceScopeRendering( true );
813 }
814 catch ( LdapURLEncodingException e )
815 {
816 PROVIDER_LOG.error( I18n.err( I18n.ERR_165, url, entry ) );
817 }
818
819 switch ( req.getScope() )
820 {
821 case SUBTREE:
822 ldapUrl.setScope( SearchScope.SUBTREE.getScope() );
823 break;
824
825 case ONELEVEL:
826 ldapUrl.setScope( SearchScope.OBJECT.getScope() );
827 break;
828
829 default:
830 throw new IllegalStateException( I18n.err( I18n.ERR_686 ) );
831 }
832
833 respRef.getReferral().addLdapUrl( ldapUrl.toString() );
834 }
835
836 return respRef;
837 }
838 else
839 {
840
841 SearchResultEntry respEntry;
842 respEntry = new SearchResultEntryImpl( req.getMessageId() );
843 respEntry.setEntry( entry );
844 respEntry.setObjectName( entry.getDn() );
845
846 return respEntry;
847 }
848 }
849
850
851
852
853
854 private long getServerSizeLimit( LdapSession session, SearchRequest request )
855 {
856 if ( session.getCoreSession().isAnAdministrator() )
857 {
858 if ( request.getSizeLimit() == NO_SIZE_LIMIT )
859 {
860 return Long.MAX_VALUE;
861 }
862 else
863 {
864 return request.getSizeLimit();
865 }
866 }
867 else
868 {
869 if ( ldapServer.getMaxSizeLimit() == NO_SIZE_LIMIT )
870 {
871 return Long.MAX_VALUE;
872 }
873 else
874 {
875 return ldapServer.getMaxSizeLimit();
876 }
877 }
878 }
879
880
881 private void setTimeLimitsOnCursor( SearchRequest req, LdapSession session,
882 final Cursor<Entry> cursor )
883 {
884
885 if ( session.getCoreSession().isAnAdministrator() && req.getTimeLimit() == NO_TIME_LIMIT )
886 {
887 return;
888 }
889
890
891
892
893
894
895 if ( ldapServer.getMaxTimeLimit() == NO_TIME_LIMIT && req.getTimeLimit() == NO_TIME_LIMIT )
896 {
897 return;
898 }
899
900
901
902
903
904
905 if ( req.getTimeLimit() == 0 )
906 {
907 cursor.setClosureMonitor( new SearchTimeLimitingMonitor( ldapServer.getMaxTimeLimit(), TimeUnit.SECONDS ) );
908 return;
909 }
910
911
912
913
914
915
916 if ( ldapServer.getMaxTimeLimit() >= req.getTimeLimit() )
917 {
918 cursor.setClosureMonitor( new SearchTimeLimitingMonitor( req.getTimeLimit(), TimeUnit.SECONDS ) );
919 return;
920 }
921
922
923
924
925
926
927 cursor.setClosureMonitor( new SearchTimeLimitingMonitor( ldapServer.getMaxTimeLimit(), TimeUnit.SECONDS ) );
928 }
929
930
931 public ExprNode modifyFilter( LdapSession session, SearchRequest req ) throws Exception
932 {
933
934
935
936
937
938
939
940
941 boolean isOcPresenceFilter = false;
942
943 if ( req.getFilter() instanceof PresenceNode )
944 {
945 PresenceNode presenceNode = ( PresenceNode ) req.getFilter();
946
947 AttributeType at = session.getCoreSession().getDirectoryService().getSchemaManager()
948 .lookupAttributeTypeRegistry( presenceNode.getAttribute() );
949
950 if ( at.getOid().equals( SchemaConstants.OBJECT_CLASS_AT_OID ) )
951 {
952 isOcPresenceFilter = true;
953 }
954 }
955
956 ExprNode filter = req.getFilter();
957
958 if ( !req.hasControl( ManageDsaIT.OID ) && !isOcPresenceFilter )
959 {
960 filter = new OrNode( req.getFilter(), newIsReferralEqualityNode( session ) );
961 }
962
963 return filter;
964 }
965
966
967 public ReplicaEventLogJanitor getLogJanitor()
968 {
969 return logJanitor;
970 }
971
972
973 public Map<Integer, ReplicaEventLog> getReplicaLogMap()
974 {
975 return replicaLogMap;
976 }
977
978
979 private EqualityNode<String> newIsReferralEqualityNode( LdapSession session ) throws Exception
980 {
981 return new EqualityNode<>( SchemaConstants.OBJECT_CLASS_AT,
982 new Value( objectClassAT, SchemaConstants.REFERRAL_OC ).getString() );
983 }
984
985
986
987
988
989
990 private void storeReplicaInfo()
991 {
992 try
993 {
994 for ( Map.Entry<Integer, ReplicaEventLog> e : replicaLogMap.entrySet() )
995 {
996 ReplicaEventLog replica = e.getValue();
997
998 if ( replica.isDirty() )
999 {
1000 PROVIDER_LOG.debug( "updating the details of replica {}", replica );
1001 replicaUtil.updateReplicaLastSentCsn( replica );
1002 replica.setDirty( false );
1003 }
1004 }
1005 }
1006 catch ( Exception e )
1007 {
1008 PROVIDER_LOG.error( "Failed to store the replica information", e );
1009 }
1010 }
1011
1012
1013
1014
1015
1016 private void loadReplicaInfo()
1017 {
1018 try
1019 {
1020 List<ReplicaEventLog> eventLogs = replicaUtil.getReplicaEventLogs();
1021 Set<String> eventLogNames = new HashSet<>();
1022
1023 if ( !eventLogs.isEmpty() )
1024 {
1025 for ( ReplicaEventLog replica : eventLogs )
1026 {
1027 PROVIDER_LOG.debug( "initializing the replica log from {}", replica.getId() );
1028 replicaLogMap.put( replica.getId(), replica );
1029 eventLogNames.add( replica.getName() );
1030
1031
1032 if ( replicaCount.get() < replica.getId() )
1033 {
1034 replicaCount.set( replica.getId() );
1035 }
1036 }
1037 }
1038 else
1039 {
1040 PROVIDER_LOG.debug( "no replica logs found to initialize" );
1041 }
1042
1043
1044 for ( File f : getAllReplJournalNames() )
1045 {
1046 if ( !eventLogNames.contains( f.getName() ) )
1047 {
1048 f.delete();
1049 PROVIDER_LOG.info( "removed unused replication event log {}", f );
1050 }
1051 }
1052 }
1053 catch ( Exception e )
1054 {
1055 PROVIDER_LOG.error( "Failed to load the replica information", e );
1056 }
1057 }
1058
1059
1060
1061
1062
1063 private void registerPersistentSearches() throws Exception
1064 {
1065 for ( Map.Entry<Integer, ReplicaEventLog> e : replicaLogMap.entrySet() )
1066 {
1067 ReplicaEventLog log = e.getValue();
1068
1069 if ( log.getSearchCriteria() != null )
1070 {
1071 PROVIDER_LOG.debug( "registering persistent search for the replica {}", log.getId() );
1072 SyncReplSearchListenerlication/provider/SyncReplSearchListener.html#SyncReplSearchListener">SyncReplSearchListener handler = new SyncReplSearchListener( null, null, log, false );
1073 log.setPersistentListener( handler );
1074
1075 dirService.getEventService().addListener( handler, log.getSearchCriteria() );
1076 }
1077 else
1078 {
1079 PROVIDER_LOG.warn( "invalid persistent search criteria {} for the replica {}", log.getSearchCriteria(),
1080 log
1081 .getId() );
1082 }
1083 }
1084 }
1085
1086
1087
1088
1089
1090 private Runnable createConsumerInfoUpdateTask( final CountDownLatch latch )
1091 {
1092 return new Runnable()
1093 {
1094 public void run()
1095 {
1096 try
1097 {
1098 while ( true )
1099 {
1100 storeReplicaInfo();
1101
1102 latch.countDown();
1103 Thread.sleep( 10000 );
1104 }
1105 }
1106 catch ( InterruptedException e )
1107 {
1108
1109 PROVIDER_LOG.debug( "thread storing the replica information was interrupted", e );
1110 }
1111 }
1112 };
1113 }
1114
1115
1116
1117
1118
1119 private ReplicaEventLog getReplicaEventLog( String cookieString )
1120 {
1121 ReplicaEventLog replicaLog = null;
1122
1123 if ( LdapProtocolUtils.isValidCookie( cookieString ) )
1124 {
1125 int clientId = LdapProtocolUtils.getReplicaId( cookieString );
1126 replicaLog = replicaLogMap.get( clientId );
1127 }
1128
1129 return replicaLog;
1130 }
1131
1132
1133
1134
1135
1136 private ReplicaEventLog createReplicaEventLog( PartitionTxn partitionTxn, String hostName, String filter ) throws Exception
1137 {
1138 int replicaId = replicaCount.incrementAndGet();
1139
1140 PROVIDER_LOG.debug( "creating a new event log for the replica with id {}", replicaId );
1141
1142 ReplicaEventLog/replication/provider/ReplicaEventLog.html#ReplicaEventLog">ReplicaEventLog replicaLog = new ReplicaEventLog( partitionTxn, dirService, replicaId );
1143 replicaLog.setHostName( hostName );
1144 replicaLog.setSearchFilter( filter );
1145
1146 return replicaLog;
1147 }
1148
1149
1150
1151
1152
1153 private void sendESyncRefreshRequired( LdapSession session, SearchRequest req )
1154 {
1155 SearchResultDone searchDoneResp = ( SearchResultDone ) req.getResultResponse();
1156 searchDoneResp.getLdapResult().setResultCode( ResultCodeEnum.E_SYNC_REFRESH_REQUIRED );
1157 SyncDoneValue syncDone = new SyncDoneValueImpl();
1158 searchDoneResp.addControl( syncDone );
1159
1160 session.getIoSession().write( searchDoneResp );
1161 }
1162
1163
1164
1165
1166
1167 private boolean isRefreshNPersist( SearchRequest req )
1168 {
1169 SyncRequestValue control = ( SyncRequestValue ) req.getControls().get( SyncRequestValue.OID );
1170
1171 return control.getMode() == SynchronizationModeEnum.REFRESH_AND_PERSIST;
1172 }
1173
1174
1175 private File[] getAllReplJournalNames()
1176 {
1177 File replDir = dirService.getInstanceLayout().getReplDirectory();
1178 FilenameFilter filter = new FilenameFilter()
1179 {
1180 @Override
1181 public boolean accept( File dir, String name )
1182 {
1183 return name.startsWith( ReplicaEventLog.REPLICA_EVENT_LOG_NAME_PREFIX );
1184 }
1185 };
1186
1187 return replDir.listFiles( filter );
1188 }
1189
1190
1191
1192
1193 private class ConsumerLogEntryChangeListener extends DirectoryListenerAdapter
1194 {
1195
1196 private ReplicaEventLog getEventLog( OperationContext opCtx )
1197 {
1198 Dn consumerLogDn = opCtx.getDn();
1199 String name = ReplicaEventLog.REPLICA_EVENT_LOG_NAME_PREFIX + consumerLogDn.getRdn().getValue();
1200
1201 for ( ReplicaEventLog log : replicaLogMap.values() )
1202 {
1203 if ( name.equalsIgnoreCase( log.getName() ) )
1204 {
1205 return log;
1206 }
1207 }
1208
1209 return null;
1210 }
1211
1212
1213 @Override
1214 public void entryDeleted( DeleteOperationContext deleteContext )
1215 {
1216
1217 synchronized ( this )
1218 {
1219 ReplicaEventLog log = getEventLog( deleteContext );
1220 if ( log != null )
1221 {
1222 logJanitor.removeEventLog( log );
1223 }
1224 }
1225 }
1226
1227
1228 @Override
1229 public void entryModified( ModifyOperationContext modifyContext )
1230 {
1231 List<Modification> mods = modifyContext.getModItems();
1232
1233
1234 synchronized ( this )
1235 {
1236 for ( Modification m : mods )
1237 {
1238 try
1239 {
1240 Attribute at = m.getAttribute();
1241
1242 if ( at.isInstanceOf( replLogMaxIdleAT ) )
1243 {
1244 ReplicaEventLog log = getEventLog( modifyContext );
1245 if ( log != null )
1246 {
1247 int maxIdlePeriod = Integer.parseInt( m.getAttribute().getString() );
1248 log.setMaxIdlePeriod( maxIdlePeriod );
1249 }
1250 }
1251 else if ( at.isInstanceOf( replLogPurgeThresholdCountAT ) )
1252 {
1253 ReplicaEventLog log = getEventLog( modifyContext );
1254 if ( log != null )
1255 {
1256 int purgeThreshold = Integer.parseInt( m.getAttribute().getString() );
1257 log.setPurgeThresholdCount( purgeThreshold );
1258 }
1259 }
1260 }
1261 catch ( LdapInvalidAttributeValueException e )
1262 {
1263 PROVIDER_LOG.warn( "Invalid attribute type", e );
1264 }
1265 }
1266 }
1267 }
1268 }
1269 }