1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.directory.server.ldap.replication.consumer;
21
22
23 import java.util.ArrayList;
24 import java.util.Arrays;
25 import java.util.Iterator;
26 import java.util.List;
27 import java.util.Map;
28
29 import org.apache.commons.collections4.map.LRUMap;
30 import org.apache.directory.api.ldap.extras.controls.SynchronizationModeEnum;
31 import org.apache.directory.api.ldap.extras.controls.syncrepl.syncDone.SyncDoneValue;
32 import org.apache.directory.api.ldap.extras.controls.syncrepl.syncRequest.SyncRequestValue;
33 import org.apache.directory.api.ldap.extras.controls.syncrepl.syncRequest.SyncRequestValueImpl;
34 import org.apache.directory.api.ldap.extras.controls.syncrepl.syncState.SyncStateTypeEnum;
35 import org.apache.directory.api.ldap.extras.controls.syncrepl.syncState.SyncStateValue;
36 import org.apache.directory.api.ldap.extras.intermediate.syncrepl.SyncInfoValue;
37 import org.apache.directory.api.ldap.extras.intermediate.syncrepl.SyncInfoValueImpl;
38 import org.apache.directory.api.ldap.model.constants.Loggers;
39 import org.apache.directory.api.ldap.model.constants.SchemaConstants;
40 import org.apache.directory.api.ldap.model.csn.Csn;
41 import org.apache.directory.api.ldap.model.cursor.Cursor;
42 import org.apache.directory.api.ldap.model.entry.Attribute;
43 import org.apache.directory.api.ldap.model.entry.DefaultAttribute;
44 import org.apache.directory.api.ldap.model.entry.DefaultEntry;
45 import org.apache.directory.api.ldap.model.entry.DefaultModification;
46 import org.apache.directory.api.ldap.model.entry.Entry;
47 import org.apache.directory.api.ldap.model.entry.Modification;
48 import org.apache.directory.api.ldap.model.entry.ModificationOperation;
49 import org.apache.directory.api.ldap.model.exception.LdapException;
50 import org.apache.directory.api.ldap.model.exception.LdapNoSuchObjectException;
51 import org.apache.directory.api.ldap.model.filter.AndNode;
52 import org.apache.directory.api.ldap.model.filter.EqualityNode;
53 import org.apache.directory.api.ldap.model.filter.ExprNode;
54 import org.apache.directory.api.ldap.model.filter.NotNode;
55 import org.apache.directory.api.ldap.model.filter.OrNode;
56 import org.apache.directory.api.ldap.model.filter.PresenceNode;
57 import org.apache.directory.api.ldap.model.message.AliasDerefMode;
58 import org.apache.directory.api.ldap.model.message.IntermediateResponse;
59 import org.apache.directory.api.ldap.model.message.Response;
60 import org.apache.directory.api.ldap.model.message.ResultCodeEnum;
61 import org.apache.directory.api.ldap.model.message.SearchRequest;
62 import org.apache.directory.api.ldap.model.message.SearchRequestImpl;
63 import org.apache.directory.api.ldap.model.message.SearchResultDone;
64 import org.apache.directory.api.ldap.model.message.SearchResultEntry;
65 import org.apache.directory.api.ldap.model.message.SearchResultReference;
66 import org.apache.directory.api.ldap.model.message.SearchScope;
67 import org.apache.directory.api.ldap.model.message.controls.ManageDsaITImpl;
68 import org.apache.directory.api.ldap.model.message.controls.SortKey;
69 import org.apache.directory.api.ldap.model.message.controls.SortRequest;
70 import org.apache.directory.api.ldap.model.message.controls.SortRequestImpl;
71 import org.apache.directory.api.ldap.model.name.Dn;
72 import org.apache.directory.api.ldap.model.name.Rdn;
73 import org.apache.directory.api.ldap.model.schema.AttributeType;
74 import org.apache.directory.api.ldap.model.schema.SchemaManager;
75 import org.apache.directory.api.util.StringConstants;
76 import org.apache.directory.api.util.Strings;
77 import org.apache.directory.ldap.client.api.ConnectionClosedEventListener;
78 import org.apache.directory.ldap.client.api.LdapNetworkConnection;
79 import org.apache.directory.ldap.client.api.future.SearchFuture;
80 import org.apache.directory.server.constants.ApacheSchemaConstants;
81 import org.apache.directory.server.core.api.CoreSession;
82 import org.apache.directory.server.core.api.DirectoryService;
83 import org.apache.directory.server.core.api.OperationManager;
84 import org.apache.directory.server.core.api.interceptor.context.AddOperationContext;
85 import org.apache.directory.server.core.api.interceptor.context.DeleteOperationContext;
86 import org.apache.directory.server.core.api.interceptor.context.LookupOperationContext;
87 import org.apache.directory.server.core.api.interceptor.context.ModifyOperationContext;
88 import org.apache.directory.server.core.api.interceptor.context.MoveAndRenameOperationContext;
89 import org.apache.directory.server.core.api.interceptor.context.MoveOperationContext;
90 import org.apache.directory.server.core.api.interceptor.context.RenameOperationContext;
91 import org.apache.directory.server.core.api.partition.Partition;
92 import org.apache.directory.server.core.api.partition.PartitionTxn;
93 import org.apache.directory.server.ldap.LdapProtocolUtils;
94 import org.apache.directory.server.ldap.replication.ReplicationConsumerConfig;
95 import org.apache.directory.server.ldap.replication.SyncReplConfiguration;
96 import org.slf4j.Logger;
97 import org.slf4j.LoggerFactory;
98 import org.slf4j.MDC;
99
100
101
102
103
104
105
106 public class ReplicationConsumerImpl implements ConnectionClosedEventListener, ReplicationConsumer
107 {
108
109 private static final Logger CONSUMER_LOG = LoggerFactory.getLogger( Loggers.CONSUMER_LOG.getName() );
110
111
112 private SyncReplConfiguration config;
113
114
115 private byte[] syncCookie;
116
117
118 private LdapNetworkConnection connection;
119
120
121 private SearchRequest searchRequest;
122
123
124 private DirectoryService directoryService;
125
126
127 private SchemaManager schemaManager;
128
129
130 private volatile boolean disconnected;
131
132
133 private CoreSession session;
134
135
136 private static final String[] MOD_IGNORE_AT = new String[]
137 {
138 SchemaConstants.ENTRY_UUID_AT,
139 SchemaConstants.ENTRY_DN_AT,
140 SchemaConstants.CREATE_TIMESTAMP_AT,
141 SchemaConstants.CREATORS_NAME_AT,
142 ApacheSchemaConstants.ENTRY_PARENT_ID_AT,
143 SchemaConstants.COLLECTIVE_ATTRIBUTE_SUBENTRIES_AT,
144 SchemaConstants.CONTEXT_CSN_AT,
145 ApacheSchemaConstants.NB_CHILDREN_AT,
146 ApacheSchemaConstants.NB_SUBORDINATES_AT,
147 SchemaConstants.HAS_SUBORDINATES_AT,
148 SchemaConstants.STRUCTURAL_OBJECT_CLASS_AT,
149 };
150
151
152 private byte[] lastSavedCookie;
153
154 private volatile boolean reload = false;
155
156
157 private static final PresenceNode ENTRY_UUID_PRESENCE_FILTER = new PresenceNode( SchemaConstants.ENTRY_UUID_AT );
158
159 private Modification cookieMod;
160
161 private Modification ridMod;
162
163
164 private AttributeType adsReplCookieAT;
165 private AttributeType adsDsReplicaIdAT;
166
167 private static final Map<String, Object> UUID_LOCK_MAP = new LRUMap( 1000 );
168
169
170
171
172
173 @Override
174 public SyncReplConfiguration getConfig()
175 {
176 return config;
177 }
178
179
180
181
182
183
184 @Override
185 public void init( DirectoryService directoryservice ) throws Exception
186 {
187 this.directoryService = directoryservice;
188
189 session = directoryService.getAdminSession();
190
191 schemaManager = directoryservice.getSchemaManager();
192
193 adsReplCookieAT = schemaManager.lookupAttributeTypeRegistry( SchemaConstants.ADS_REPL_COOKIE );
194 adsDsReplicaIdAT = schemaManager.lookupAttributeTypeRegistry( SchemaConstants.ADS_DS_REPLICA_ID );
195
196 Attribute cookieAttr = new DefaultAttribute( adsReplCookieAT );
197 cookieMod = new DefaultModification( ModificationOperation.REPLACE_ATTRIBUTE, cookieAttr );
198
199 Attribute ridAttr = new DefaultAttribute( adsDsReplicaIdAT );
200 ridMod = new DefaultModification( ModificationOperation.REPLACE_ATTRIBUTE, ridAttr );
201
202 prepareSyncSearchRequest();
203 }
204
205
206
207
208
209
210
211
212 public boolean connect()
213 {
214 String providerHost = config.getRemoteHost();
215 int port = config.getRemotePort();
216
217 try
218 {
219
220 if ( connection == null )
221 {
222 connection = new LdapNetworkConnection( providerHost, port );
223 connection.setSchemaManager( schemaManager );
224
225 if ( config.isUseTls() )
226 {
227 connection.getConfig().setTrustManagers( config.getTrustManager() );
228 connection.getConfig().setUseTls( true );
229 }
230
231 connection.addConnectionClosedEventListener( this );
232 }
233
234
235 if ( connection.connect() )
236 {
237 CONSUMER_LOG.info( "Consumer {} connected to producer {}", config.getReplicaId(), config.getProducer() );
238
239
240 try
241 {
242 connection.bind( config.getReplUserDn(), Strings.utf8ToString( config.getReplUserPassword() ) );
243 disconnected = false;
244
245 return true;
246 }
247 catch ( LdapException le )
248 {
249 CONSUMER_LOG.warn( "Failed to bind to the producer {} with the given bind Dn {}",
250 config.getProducer(), config.getReplUserDn() );
251 CONSUMER_LOG.warn( "", le );
252 disconnected = true;
253 }
254 }
255 else
256 {
257 CONSUMER_LOG.warn( "Consumer {} cannot connect to producer {}", config.getReplicaId(),
258 config.getProducer() );
259 disconnected = true;
260
261 return false;
262 }
263 }
264 catch ( Exception e )
265 {
266 CONSUMER_LOG.error( "Failed to connect to the producer {}, cause : {}", config.getProducer(),
267 e.getMessage() );
268 disconnected = true;
269 }
270
271 return false;
272 }
273
274
275
276
277
278
279 private void prepareSyncSearchRequest() throws LdapException
280 {
281 String baseDn = config.getBaseDn();
282
283 searchRequest = new SearchRequestImpl();
284
285 searchRequest.setBase( new Dn( baseDn ) );
286 searchRequest.setFilter( config.getFilter() );
287 searchRequest.setSizeLimit( config.getSearchSizeLimit() );
288 searchRequest.setTimeLimit( config.getSearchTimeout() );
289
290 searchRequest.setDerefAliases( config.getAliasDerefMode() );
291 searchRequest.setScope( config.getSearchScope() );
292 searchRequest.setTypesOnly( false );
293
294 searchRequest.addAttributes( config.getAttributes() );
295
296 if ( !config.isChaseReferrals() )
297 {
298 searchRequest.addControl( new ManageDsaITImpl() );
299 }
300
301 if ( CONSUMER_LOG.isDebugEnabled() )
302 {
303 MDC.put( "Replica", Integer.toString( config.getReplicaId() ) );
304 CONSUMER_LOG.debug( "Configuring consumer {}", config );
305 }
306 }
307
308
309 private ResultCodeEnum handleSearchResultDone( SearchResultDone searchDone )
310 {
311 CONSUMER_LOG.debug( "///////////////// handleSearchDone //////////////////" );
312
313 SyncDoneValue ctrl = ( SyncDoneValue ) searchDone.getControls().get( SyncDoneValue.OID );
314
315 if ( ( ctrl != null ) && ( ctrl.getCookie() != null ) )
316 {
317 syncCookie = ctrl.getCookie();
318 CONSUMER_LOG.debug( "assigning cookie from sync done value control: {}", Strings.utf8ToString( syncCookie ) );
319 storeCookie();
320 }
321
322 CONSUMER_LOG.debug( "//////////////// END handleSearchDone//////////////////////" );
323
324 reload = false;
325
326 return searchDone.getLdapResult().getResultCode();
327 }
328
329
330 private void handleSearchReference( SearchResultReference searchRef )
331 {
332
333
334 }
335
336
337
338
339
340
341
342
343
344
345
346
347 private void handleSearchResultEntry( SearchResultEntry syncResult )
348 {
349 CONSUMER_LOG.debug( "------------- starting handleSearchResult ------------" );
350
351 SyncStateValue syncStateCtrl = ( SyncStateValue ) syncResult.getControl( SyncStateValue.OID );
352
353 try
354 {
355 Entry remoteEntry = new DefaultEntry( schemaManager, syncResult.getEntry() );
356 String uuid = remoteEntry.get( directoryService.getAtProvider().getEntryUUID() ).getString();
357
358
359 Object lock = getLockFor( uuid );
360
361 synchronized ( lock )
362 {
363 int rid = -1;
364
365 if ( syncStateCtrl.getCookie() != null )
366 {
367 syncCookie = syncStateCtrl.getCookie();
368 rid = LdapProtocolUtils.getReplicaId( Strings.utf8ToString( syncCookie ) );
369 CONSUMER_LOG.debug( "assigning the cookie from sync state value control: {}",
370 Strings.utf8ToString( syncCookie ) );
371 }
372
373 SyncStateTypeEnum state = syncStateCtrl.getSyncStateType();
374
375
376 if ( CONSUMER_LOG.isDebugEnabled() )
377 {
378 CONSUMER_LOG.debug( "state name {}", state.name() );
379 CONSUMER_LOG.debug( "entryUUID = {}", Strings.uuidToString( syncStateCtrl.getEntryUUID() ) );
380 }
381
382 Dn remoteDn = remoteEntry.getDn();
383
384 switch ( state )
385 {
386 case ADD:
387 boolean remoteDnExist = false;
388
389 try
390 {
391 remoteDnExist = session.exists( remoteDn );
392 }
393 catch ( LdapNoSuchObjectException lnsoe )
394 {
395 CONSUMER_LOG.error( lnsoe.getMessage() );
396 }
397
398 if ( !remoteDnExist )
399 {
400 CONSUMER_LOG.debug( "adding entry with dn {}", remoteDn );
401 CONSUMER_LOG.debug( remoteEntry.toString() );
402 AddOperationContext/interceptor/context/AddOperationContext.html#AddOperationContext">AddOperationContext addContext = new AddOperationContext( session, remoteEntry );
403 addContext.setReplEvent( true );
404 addContext.setRid( rid );
405
406 OperationManager operationManager = directoryService.getOperationManager();
407 operationManager.add( addContext );
408 }
409 else
410 {
411 CONSUMER_LOG.debug( "updating entry in refreshOnly mode {}", remoteDn );
412 modify( remoteEntry, rid );
413 }
414
415 break;
416
417 case MODIFY:
418 CONSUMER_LOG.debug( "modifying entry with dn {}", remoteEntry.getDn().getName() );
419 modify( remoteEntry, rid );
420
421 break;
422
423 case MODDN:
424 String entryUuid = Strings.uuidToString( syncStateCtrl.getEntryUUID() );
425 applyModDnOperation( remoteEntry, entryUuid, rid );
426
427 break;
428
429 case DELETE:
430 CONSUMER_LOG.debug( "deleting entry with dn {}", remoteEntry.getDn().getName() );
431
432 if ( !session.exists( remoteDn ) )
433 {
434 CONSUMER_LOG
435 .debug(
436 "looks like entry {} was already deleted in a prior update (possibly from another provider), skipping delete",
437 remoteDn );
438 }
439 else
440 {
441
442
443
444 deleteRecursive( remoteEntry.getDn(), rid );
445 }
446
447 break;
448
449 case PRESENT:
450 CONSUMER_LOG.debug( "entry present {}", remoteEntry );
451 break;
452
453 default:
454 throw new IllegalArgumentException( "Unexpected sync state " + state );
455 }
456
457
458 if ( syncStateCtrl.getCookie() != null )
459 {
460 storeCookie();
461 }
462 }
463 }
464 catch ( Exception e )
465 {
466 CONSUMER_LOG.error( e.getMessage(), e );
467 }
468
469 CONSUMER_LOG.debug( "------------- Ending handleSearchResult ------------" );
470 }
471
472
473
474
475
476 private void handleSyncInfo( IntermediateResponse syncInfoResp )
477 {
478 try
479 {
480 CONSUMER_LOG.debug( "............... inside handleSyncInfo ..............." );
481
482 byte[] syncInfoBytes = syncInfoResp.getResponseValue();
483
484 if ( syncInfoBytes == null )
485 {
486 return;
487 }
488
489 SyncInfoValue syncInfoValue = new SyncInfoValueImpl();
490
491 byte[] cookie = syncInfoValue.getCookie();
492
493 if ( CONSUMER_LOG.isDebugEnabled() )
494 {
495 CONSUMER_LOG.debug( "Received a SyncInfoValue from producer {} : {}", config.getProducer(),
496 syncInfoValue );
497 }
498
499 int replicaId = -1;
500
501 if ( cookie != null )
502 {
503 if ( CONSUMER_LOG.isDebugEnabled() )
504 {
505 CONSUMER_LOG.debug( "setting the cookie from the sync info: {}", Strings.utf8ToString( cookie ) );
506 CONSUMER_LOG.debug( "setting the cookie from the sync info: {}", Strings.utf8ToString( cookie ) );
507 }
508
509 syncCookie = cookie;
510
511 String cookieString = Strings.utf8ToString( syncCookie );
512 replicaId = LdapProtocolUtils.getReplicaId( cookieString );
513 }
514
515 CONSUMER_LOG.info( "refreshDeletes: {}", syncInfoValue.isRefreshDeletes() );
516
517 List<byte[]> uuidList = syncInfoValue.getSyncUUIDs();
518
519
520
521 if ( syncInfoValue.isRefreshDeletes() )
522 {
523 deleteEntries( uuidList, false, replicaId );
524 }
525 else
526 {
527 deleteEntries( uuidList, true, replicaId );
528 }
529
530 CONSUMER_LOG.info( "refreshDone: {}", syncInfoValue.isRefreshDone() );
531
532 storeCookie();
533 }
534 catch ( Exception de )
535 {
536 CONSUMER_LOG.error( "Failed to handle syncinfo message", de );
537 }
538
539 CONSUMER_LOG.debug( ".................... END handleSyncInfo ..............." );
540 }
541
542
543
544
545
546 @Override
547 public void connectionClosed()
548 {
549 if ( CONSUMER_LOG.isDebugEnabled() )
550 {
551 MDC.put( "Replica", Integer.toString( config.getReplicaId() ) );
552 CONSUMER_LOG.debug( "Consumer {} session with {} has been closed ", config.getReplicaId(),
553 config.getProducer() );
554 }
555
556 disconnect();
557 }
558
559
560
561
562
563 @Override
564 public ReplicationStatusEnum startSync()
565 {
566 CONSUMER_LOG.debug( "Starting the SyncRepl process for consumer {}", config.getReplicaId() );
567
568
569 readCookie();
570
571 if ( config.isRefreshNPersist() )
572 {
573 try
574 {
575 CONSUMER_LOG.debug( "==================== Refresh And Persist ==========" );
576
577 return doSyncSearch( SynchronizationModeEnum.REFRESH_AND_PERSIST, reload );
578 }
579 catch ( Exception e )
580 {
581 CONSUMER_LOG.error( "Failed to sync with refreshAndPersist mode", e );
582 return ReplicationStatusEnum.DISCONNECTED;
583 }
584 }
585 else
586 {
587 return doRefreshOnly();
588 }
589 }
590
591
592 private ReplicationStatusEnum doRefreshOnly()
593 {
594 while ( !disconnected )
595 {
596 CONSUMER_LOG.debug( "==================== Refresh Only ==========" );
597
598 try
599 {
600 doSyncSearch( SynchronizationModeEnum.REFRESH_ONLY, reload );
601
602 CONSUMER_LOG.debug( "--------------------- Sleep for {} seconds ------------------",
603 ( config.getRefreshInterval() / 1000 ) );
604 Thread.sleep( config.getRefreshInterval() );
605 CONSUMER_LOG.debug( "--------------------- syncing again ------------------" );
606
607 }
608 catch ( InterruptedException ie )
609 {
610 CONSUMER_LOG.warn( "refresher thread interrupted" );
611
612 return ReplicationStatusEnum.DISCONNECTED;
613 }
614 catch ( Exception e )
615 {
616 CONSUMER_LOG.error( "Failed to sync with refresh only mode", e );
617 return ReplicationStatusEnum.DISCONNECTED;
618 }
619 }
620
621 return ReplicationStatusEnum.STOPPED;
622 }
623
624
625
626
627
628 @Override
629 public void setConfig( ReplicationConsumerConfig config )
630 {
631 this.config = ( SyncReplConfiguration ) config;
632 }
633
634
635
636
637
638 @Override
639 public boolean connect( boolean now )
640 {
641 boolean connected = false;
642
643 if ( now )
644 {
645 connected = connect();
646 }
647
648 while ( !connected )
649 {
650 try
651 {
652 CONSUMER_LOG.debug( "Consumer {} cannot connect to {}, wait 5 seconds.", config.getReplicaId(),
653 config.getProducer() );
654
655
656 Thread.sleep( 5000 );
657 }
658 catch ( InterruptedException e )
659 {
660 CONSUMER_LOG.warn( "Consumer {} Interrupted while trying to reconnect to the provider {}",
661 config.getReplicaId(), config.getProducer() );
662 }
663
664 connected = connect();
665 }
666
667
668
669
670 return connected;
671 }
672
673
674
675
676
677 @Override
678 public void ping()
679 {
680 boolean connected = !disconnected;
681
682 boolean restartSync = false;
683
684 if ( disconnected )
685 {
686 connected = connect();
687 restartSync = connected;
688 }
689
690 if ( connected )
691 {
692 CONSUMER_LOG.debug( "PING : The consumer {} is alive", config.getReplicaId() );
693
694
695 if ( restartSync )
696 {
697 CONSUMER_LOG.warn( "Restarting the disconnected consumer {}", config.getReplicaId() );
698 disconnected = false;
699 startSync();
700 }
701 }
702 else
703 {
704 CONSUMER_LOG.debug( "PING : The consumer {} cannot be connected", config.getReplicaId() );
705 }
706 }
707
708
709
710
711
712 @Override
713 public void stop()
714 {
715 if ( !disconnected )
716 {
717 disconnect();
718 }
719 }
720
721
722
723
724
725 @Override
726 public String getId()
727 {
728 return String.valueOf( getConfig().getReplicaId() );
729 }
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747 private ReplicationStatusEnum doSyncSearch( SynchronizationModeEnum syncType, boolean reloadHint ) throws Exception
748 {
749 CONSUMER_LOG.debug( "Starting synchronization mode {}, reloadHint {}", syncType, reloadHint );
750
751 SyncRequestValue syncReq = new SyncRequestValueImpl();
752
753 syncReq.setMode( syncType );
754 syncReq.setReloadHint( reloadHint );
755
756
757 if ( syncCookie != null )
758 {
759 CONSUMER_LOG.debug( "searching on {} with searchRequest, cookie '{}'", config.getProducer(),
760 Strings.utf8ToString( syncCookie ) );
761 syncReq.setCookie( syncCookie );
762 }
763 else
764 {
765 CONSUMER_LOG.debug( "searching on {} with searchRequest, no cookie", config.getProducer() );
766 }
767
768 searchRequest.addControl( syncReq );
769
770
771 SearchFuture sf = connection.searchAsync( searchRequest );
772
773 Response resp = sf.get();
774
775 CONSUMER_LOG.debug( "Response from {} : {}", config.getProducer(), resp );
776
777
778
779 while ( !( resp instanceof SearchResultDone ) && !sf.isCancelled() && !disconnected )
780 {
781 if ( resp instanceof SearchResultEntry )
782 {
783 SearchResultEntry result = ( SearchResultEntry ) resp;
784
785 handleSearchResultEntry( result );
786 }
787 else if ( resp instanceof SearchResultReference )
788 {
789 handleSearchReference( ( SearchResultReference ) resp );
790 }
791 else if ( resp instanceof IntermediateResponse )
792 {
793 handleSyncInfo( ( IntermediateResponse ) resp );
794 }
795
796
797 resp = sf.get();
798 CONSUMER_LOG.debug( "Response from {} : {}", config.getProducer(), resp );
799 }
800
801 if ( sf.isCancelled() )
802 {
803
804 CONSUMER_LOG.debug( "Search sync on {} has been canceled ", config.getProducer(), sf.getCause() );
805
806 return ReplicationStatusEnum.DISCONNECTED;
807 }
808 else if ( disconnected )
809 {
810 CONSUMER_LOG.debug( "Disconnected from {}", config.getProducer() );
811
812 return ReplicationStatusEnum.DISCONNECTED;
813 }
814 else
815 {
816 ResultCodeEnum resultCode = handleSearchResultDone( ( SearchResultDone ) resp );
817
818 CONSUMER_LOG.debug( "Rsultcode of Sync operation from {} : {}", config.getProducer(), resultCode );
819
820 if ( resultCode == ResultCodeEnum.NO_SUCH_OBJECT )
821 {
822
823 CONSUMER_LOG.warn( "The base Dn {} is not found on provider {}", config.getBaseDn(),
824 config.getProducer() );
825
826 CONSUMER_LOG.warn( "Disconnecting the Refresh&Persist consumer from provider {}", config.getProducer() );
827 disconnect();
828
829 return ReplicationStatusEnum.DISCONNECTED;
830 }
831 else if ( resultCode == ResultCodeEnum.E_SYNC_REFRESH_REQUIRED )
832 {
833 CONSUMER_LOG.warn( "Full SYNC_REFRESH required from {}", config.getProducer() );
834
835 reload = true;
836
837 try
838 {
839 CONSUMER_LOG.debug( "Deleting baseDN {}", config.getBaseDn() );
840
841
842
843
844
845
846 deleteRecursive( new Dn( config.getBaseDn() ), -1000 );
847 }
848 catch ( Exception e )
849 {
850 CONSUMER_LOG
851 .error(
852 "Failed to delete the replica base as part of handling E_SYNC_REFRESH_REQUIRED, disconnecting the consumer",
853 e );
854 }
855
856
857 removeCookie();
858
859 CONSUMER_LOG.debug( "Re-doing a syncRefresh from producer {}", config.getProducer() );
860
861 return ReplicationStatusEnum.REFRESH_REQUIRED;
862 }
863 else
864 {
865 CONSUMER_LOG.debug( "Got result code {} from producer {}. Replication stopped", resultCode,
866 config.getProducer() );
867 return ReplicationStatusEnum.DISCONNECTED;
868 }
869 }
870 }
871
872
873
874
875
876 private void disconnect()
877 {
878 disconnected = true;
879
880 try
881 {
882 if ( ( connection != null ) && connection.isConnected() )
883 {
884 connection.unBind();
885 CONSUMER_LOG.info( "Unbound from the server {}", config.getProducer() );
886
887 if ( CONSUMER_LOG.isDebugEnabled() )
888 {
889 MDC.put( "Replica", Integer.toString( config.getReplicaId() ) );
890 CONSUMER_LOG.info( "Unbound from the server {}", config.getProducer() );
891 }
892
893 connection.close();
894 CONSUMER_LOG.info( "Connection closed for the server {}", config.getProducer() );
895
896 connection = null;
897 }
898 }
899 catch ( Exception e )
900 {
901 CONSUMER_LOG.error( "Failed to close the connection", e );
902 }
903 finally
904 {
905
906 storeCookie();
907
908
909 syncCookie = null;
910 }
911 }
912
913
914
915
916
917 private void storeCookie()
918 {
919 CONSUMER_LOG.debug( "Storing the cookie '{}'", Strings.utf8ToString( syncCookie ) );
920
921 if ( syncCookie == null )
922 {
923 return;
924 }
925
926 if ( ( lastSavedCookie != null ) && Arrays.equals( syncCookie, lastSavedCookie ) )
927 {
928 return;
929 }
930
931 try
932 {
933 Attribute attr = cookieMod.getAttribute();
934 attr.clear();
935 attr.add( syncCookie );
936
937 String cookieString = Strings.utf8ToString( syncCookie );
938 int replicaId = LdapProtocolUtils.getReplicaId( cookieString );
939
940 Attribute ridAt = ridMod.getAttribute();
941 ridAt.clear();
942 ridAt.add( String.valueOf( replicaId ) );
943
944 CONSUMER_LOG.debug( "Storing the cookie in the DIT : {}", config.getConfigEntryDn() );
945
946 session.modify( config.getConfigEntryDn(), cookieMod );
947 CONSUMER_LOG.debug( "stored the cookie in entry {}", config.getConfigEntryDn() );
948
949 lastSavedCookie = new byte[syncCookie.length];
950 System.arraycopy( syncCookie, 0, lastSavedCookie, 0, syncCookie.length );
951 }
952 catch ( Exception e )
953 {
954 CONSUMER_LOG.error( "Failed to store the cookie in consumer entry {}", config.getConfigEntryDn(), e );
955 }
956 }
957
958
959
960
961
962 private void readCookie()
963 {
964 try
965 {
966 Entry entry = session.lookup( config.getConfigEntryDn(), SchemaConstants.ADS_REPL_COOKIE );
967
968 if ( entry != null )
969 {
970 Attribute attr = entry.get( adsReplCookieAT );
971
972 if ( attr != null )
973 {
974 syncCookie = attr.getBytes();
975 lastSavedCookie = syncCookie;
976 String syncCookieString = Strings.utf8ToString( syncCookie );
977 CONSUMER_LOG.debug( "Loaded cookie {} for consumer {}", syncCookieString, config.getReplicaId() );
978 }
979 else
980 {
981 CONSUMER_LOG.debug( "No cookie found for consumer {}", config.getReplicaId() );
982 }
983 }
984 else
985 {
986 CONSUMER_LOG.debug( "Cannot find the configuration '{}' in the DIT for consumer {}",
987 config.getConfigEntryDn(), config.getReplicaId() );
988 }
989 }
990 catch ( Exception e )
991 {
992
993
994 CONSUMER_LOG.debug( "Failed to read the cookie, cannot find the entry '{}' in the DIT for consumer {}",
995 config.getConfigEntryDn(),
996 config.getReplicaId() );
997 }
998 }
999
1000
1001
1002
1003
1004 private void removeCookie()
1005 {
1006 try
1007 {
1008 Attribute cookieAttr = new DefaultAttribute( adsReplCookieAT );
1009 Modification deleteCookieMod = new DefaultModification( ModificationOperation.REMOVE_ATTRIBUTE,
1010 cookieAttr );
1011 session.modify( config.getConfigEntryDn(), deleteCookieMod );
1012 CONSUMER_LOG.info( "resetting sync cookie of the consumer with config entry Dn {}",
1013 config.getConfigEntryDn() );
1014 }
1015 catch ( Exception e )
1016 {
1017 CONSUMER_LOG.warn( "Failed to delete the cookie from the consumer with config entry Dn {}",
1018 config.getConfigEntryDn() );
1019 CONSUMER_LOG.warn( "{}", e );
1020 }
1021
1022 syncCookie = null;
1023 lastSavedCookie = null;
1024 }
1025
1026
1027 private void applyModDnOperation( Entry remoteEntry, String entryUuid, int rid ) throws Exception
1028 {
1029 CONSUMER_LOG.debug( "MODDN for entry {}, new entry : {}", entryUuid, remoteEntry );
1030
1031
1032 String filter = "(entryUuid=" + entryUuid + ")";
1033 SearchRequest searchRequest = new SearchRequestImpl();
1034 searchRequest.setBase( new Dn( schemaManager, config.getBaseDn() ) );
1035 searchRequest.setFilter( filter );
1036 searchRequest.setScope( SearchScope.SUBTREE );
1037 searchRequest.addAttributes( SchemaConstants.ENTRY_UUID_AT, SchemaConstants.ENTRY_CSN_AT,
1038 SchemaConstants.ALL_USER_ATTRIBUTES );
1039
1040 Cursor<Entry> cursor = session.search( searchRequest );
1041 cursor.beforeFirst();
1042
1043 Entry localEntry = null;
1044
1045 if ( cursor.next() )
1046 {
1047 localEntry = cursor.get();
1048 }
1049
1050 cursor.close();
1051
1052
1053 if ( localEntry == null )
1054 {
1055 return;
1056 }
1057
1058 if ( config.isMmrMode() )
1059 {
1060 Csn localCsn = new Csn( localEntry.get( SchemaConstants.ENTRY_CSN_AT ).getString() );
1061 Csn remoteCsn = new Csn( remoteEntry.get( SchemaConstants.ENTRY_CSN_AT ).getString() );
1062
1063 if ( localCsn.compareTo( remoteCsn ) >= 0 )
1064 {
1065
1066 CONSUMER_LOG.debug( "local modification is latest, discarding the modDn operation dn {}",
1067 remoteEntry.getDn() );
1068 return;
1069 }
1070 }
1071
1072
1073 Dn localDn = localEntry.getDn();
1074 Dn remoteDn = directoryService.getDnFactory().create( remoteEntry.getDn().getName() );
1075
1076 Dn localParentDn = localDn.getParent();
1077 Dn remoteParentDn = directoryService.getDnFactory().create( remoteDn.getParent().getName() );
1078
1079 Rdn localRdn = localDn.getRdn();
1080 Rdn remoteRdn = directoryService.getDnFactory().create( remoteDn.getRdn().getName() ).getRdn();
1081
1082
1083 boolean deleteOldRdn = !remoteEntry.contains( localRdn.getNormType(), localRdn.getValue() );
1084
1085 if ( localRdn.equals( remoteRdn ) )
1086 {
1087
1088 CONSUMER_LOG.debug( "moving {} to the new parent {}", localDn, remoteParentDn );
1089 MoveOperationContextapi/interceptor/context/MoveOperationContext.html#MoveOperationContext">MoveOperationContext movCtx = new MoveOperationContext( session, localDn, remoteParentDn );
1090 movCtx.setReplEvent( true );
1091 movCtx.setRid( rid );
1092 directoryService.getOperationManager().move( movCtx );
1093 }
1094 else if ( localParentDn.equals( remoteParentDn ) )
1095 {
1096
1097 CONSUMER_LOG.debug( "renaming the Dn {} with new Rdn {} and deleteOldRdn flag set to {}",
1098 localDn.getName(), remoteRdn.getName(), String.valueOf( deleteOldRdn ) );
1099
1100 RenameOperationContexti/interceptor/context/RenameOperationContext.html#RenameOperationContext">RenameOperationContext renCtx = new RenameOperationContext( session, localDn, remoteRdn,
1101 deleteOldRdn );
1102 renCtx.setReplEvent( true );
1103 renCtx.setRid( rid );
1104 directoryService.getOperationManager().rename( renCtx );
1105 }
1106 else
1107 {
1108
1109 CONSUMER_LOG.debug(
1110 "moveAndRename on the Dn {} with new newParent Dn {}, new Rdn {} and deleteOldRdn flag set to {}",
1111 localDn.getName(),
1112 remoteParentDn.getName(),
1113 remoteRdn.getName(),
1114 String.valueOf( deleteOldRdn ) );
1115
1116 MoveAndRenameOperationContexttor/context/MoveAndRenameOperationContext.html#MoveAndRenameOperationContext">MoveAndRenameOperationContext movRenCtx = new MoveAndRenameOperationContext( session, localDn,
1117 remoteParentDn, remoteRdn, deleteOldRdn );
1118 movRenCtx.setReplEvent( true );
1119 movRenCtx.setRid( rid );
1120 directoryService.getOperationManager().moveAndRename( movRenCtx );
1121 }
1122 }
1123
1124
1125 private void modify( Entry remoteEntry, int rid ) throws Exception
1126 {
1127 String[] attributes = computeAttributes( config.getAttributes(), SchemaConstants.ALL_OPERATIONAL_ATTRIBUTES );
1128
1129 LookupOperationContext lookupCtx =
1130 new LookupOperationContext( session, remoteEntry.getDn(), attributes );
1131
1132 lookupCtx.setSyncreplLookup( true );
1133
1134 Entry localEntry;
1135
1136 Partition partition = session.getDirectoryService().getPartitionNexus().getPartition( remoteEntry.getDn() );
1137
1138 try ( PartitionTxn partitionTxn = partition.beginReadTransaction() )
1139 {
1140 lookupCtx.setTransaction( partitionTxn );
1141 localEntry = session.getDirectoryService().getOperationManager().lookup( lookupCtx );
1142 }
1143
1144 if ( config.isMmrMode() )
1145 {
1146 Csn localCsn = new Csn( localEntry.get( SchemaConstants.ENTRY_CSN_AT ).getString() );
1147 Csn remoteCsn = new Csn( remoteEntry.get( SchemaConstants.ENTRY_CSN_AT ).getString() );
1148
1149 if ( localCsn.compareTo( remoteCsn ) >= 0 )
1150 {
1151
1152 CONSUMER_LOG.debug( "local modification is latest, discarding the modification of dn {}",
1153 remoteEntry.getDn() );
1154 return;
1155 }
1156 }
1157
1158 remoteEntry.removeAttributes( MOD_IGNORE_AT );
1159 localEntry.removeAttributes( MOD_IGNORE_AT );
1160
1161 List<Modification> mods = new ArrayList<>();
1162 Iterator<Attribute> itr = localEntry.iterator();
1163
1164 while ( itr.hasNext() )
1165 {
1166 Attribute localAttr = itr.next();
1167 String attrId = localAttr.getId();
1168 Modification mod;
1169 Attribute remoteAttr = remoteEntry.get( attrId );
1170
1171 if ( remoteAttr != null )
1172 {
1173 mod = new DefaultModification( ModificationOperation.REPLACE_ATTRIBUTE, remoteAttr );
1174 remoteEntry.remove( remoteAttr );
1175 }
1176 else
1177 {
1178 mod = new DefaultModification( ModificationOperation.REMOVE_ATTRIBUTE, localAttr );
1179 }
1180
1181 mods.add( mod );
1182 }
1183
1184 if ( remoteEntry.size() > 0 )
1185 {
1186 itr = remoteEntry.iterator();
1187
1188 while ( itr.hasNext() )
1189 {
1190 mods.add( new DefaultModification( ModificationOperation.ADD_ATTRIBUTE, itr.next() ) );
1191 }
1192 }
1193
1194 List<Modification> serverModifications = new ArrayList<>( mods.size() );
1195
1196 for ( Modification mod : mods )
1197 {
1198 serverModifications.add( new DefaultModification( directoryService.getSchemaManager(), mod ) );
1199 }
1200
1201 ModifyOperationContextceptor/context/ModifyOperationContext.html#ModifyOperationContext">ModifyOperationContext modifyContext = new ModifyOperationContext( session, remoteEntry.getDn(),
1202 serverModifications );
1203 modifyContext.setReplEvent( true );
1204 modifyContext.setRid( rid );
1205
1206 OperationManager operationManager = directoryService.getOperationManager();
1207 operationManager.modify( modifyContext );
1208 }
1209
1210
1211
1212
1213
1214 private String[] computeAttributes( String[] attributes, String addedAttribute )
1215 {
1216 if ( attributes != null )
1217 {
1218 if ( addedAttribute != null )
1219 {
1220 String[] combinedAttributes = new String[attributes.length + 1];
1221
1222 System.arraycopy( attributes, 0, combinedAttributes, 0, attributes.length );
1223 combinedAttributes[attributes.length] = addedAttribute;
1224
1225 return combinedAttributes;
1226 }
1227 else
1228 {
1229 return attributes;
1230 }
1231 }
1232 else
1233 {
1234 if ( addedAttribute != null )
1235 {
1236 return new String[]
1237 { addedAttribute };
1238 }
1239 else
1240 {
1241 return StringConstants.EMPTY_STRINGS;
1242 }
1243 }
1244 }
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254 private void deleteEntries( List<byte[]> uuidList, boolean isRefreshPresent, int replicaId ) throws Exception
1255 {
1256 if ( uuidList == null || uuidList.isEmpty() )
1257 {
1258 return;
1259 }
1260
1261
1262
1263
1264 if ( isRefreshPresent )
1265 {
1266 CONSUMER_LOG.debug( "refresh present syncinfo list has {} UUIDs", uuidList.size() );
1267 processDelete( uuidList, isRefreshPresent, replicaId );
1268 return;
1269 }
1270
1271 int nodeLimit = 10;
1272
1273 int count = uuidList.size() / nodeLimit;
1274
1275 int startIndex = 0;
1276 int i = 0;
1277 for ( ; i < count; i++ )
1278 {
1279 startIndex = i * nodeLimit;
1280 processDelete( uuidList.subList( startIndex, startIndex + nodeLimit ), isRefreshPresent, replicaId );
1281 }
1282
1283 if ( ( uuidList.size() % nodeLimit ) != 0 )
1284 {
1285
1286 if ( count > 0 )
1287 {
1288 startIndex = i * nodeLimit;
1289 }
1290
1291 processDelete( uuidList.subList( startIndex, uuidList.size() ), isRefreshPresent, replicaId );
1292 }
1293 }
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303 private void processDelete( List<byte[]> limitedUuidList, boolean isRefreshPresent, int replicaId )
1304 throws Exception
1305 {
1306 ExprNode filter = null;
1307 int size = limitedUuidList.size();
1308 if ( size == 1 )
1309 {
1310 String uuid = Strings.uuidToString( limitedUuidList.get( 0 ) );
1311
1312 filter = new EqualityNode<String>( SchemaConstants.ENTRY_UUID_AT, uuid );
1313 if ( isRefreshPresent )
1314 {
1315 filter = new NotNode( filter );
1316 }
1317 }
1318 else
1319 {
1320 if ( isRefreshPresent )
1321 {
1322 filter = new AndNode();
1323 }
1324 else
1325 {
1326 filter = new OrNode();
1327 }
1328
1329 for ( int i = 0; i < size; i++ )
1330 {
1331 String uuid = Strings.uuidToString( limitedUuidList.get( i ) );
1332 ExprNode uuidEqNode = new EqualityNode<String>( SchemaConstants.ENTRY_UUID_AT, uuid );
1333
1334 if ( isRefreshPresent )
1335 {
1336 uuidEqNode = new NotNode( uuidEqNode );
1337 ( ( AndNode ) filter ).addNode( uuidEqNode );
1338 }
1339 else
1340 {
1341 ( ( OrNode ) filter ).addNode( uuidEqNode );
1342 }
1343 }
1344 }
1345
1346 Dn dn = new Dn( schemaManager, config.getBaseDn() );
1347
1348 if ( CONSUMER_LOG.isDebugEnabled() )
1349 {
1350 CONSUMER_LOG.debug( "selecting entries to be deleted using filter {}", filter );
1351 }
1352
1353 SearchRequest req = new SearchRequestImpl();
1354 req.setBase( dn );
1355 req.setFilter( filter );
1356 req.setScope( SearchScope.SUBTREE );
1357 req.setDerefAliases( AliasDerefMode.NEVER_DEREF_ALIASES );
1358
1359 req.addAttributes( SchemaConstants.ENTRY_DN_AT );
1360
1361 SortKey sk = new SortKey( SchemaConstants.ENTRY_DN_AT, SchemaConstants.DISTINGUISHED_NAME_MATCH_MR_OID );
1362 SortRequest ctrl = new SortRequestImpl();
1363 ctrl.addSortKey( sk );
1364 req.addControl( ctrl );
1365
1366 OperationManager operationManager = directoryService.getOperationManager();
1367
1368 Cursor<Entry> cursor = session.search( req );
1369 cursor.beforeFirst();
1370
1371 while ( cursor.next() )
1372 {
1373 Entry entry = cursor.get();
1374
1375 DeleteOperationContext/api/interceptor/context/DeleteOperationContext.html#DeleteOperationContext">DeleteOperationContext ctx = new DeleteOperationContext( session );
1376 ctx.setReplEvent( true );
1377 ctx.setRid( replicaId );
1378
1379
1380
1381 if ( reload )
1382 {
1383 ctx.setGenerateNoReplEvt( true );
1384 }
1385
1386 ctx.setDn( entry.getDn() );
1387 operationManager.delete( ctx );
1388 }
1389
1390 cursor.close();
1391 }
1392
1393
1394 private synchronized Object getLockFor( String uuid )
1395 {
1396 Object lock = UUID_LOCK_MAP.get( uuid );
1397
1398 if ( lock == null )
1399 {
1400 lock = new Object();
1401 UUID_LOCK_MAP.put( uuid, lock );
1402 }
1403
1404 return lock;
1405 }
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415 private void deleteRecursive( Dn rootDn, int rid ) throws Exception
1416 {
1417 CONSUMER_LOG.debug( "searching for Dn {} and its children before deleting", rootDn.getName() );
1418 Cursor<Entry> cursor = null;
1419
1420 try
1421 {
1422 SearchRequest req = new SearchRequestImpl();
1423 req.setBase( rootDn );
1424 req.setFilter( ENTRY_UUID_PRESENCE_FILTER );
1425 req.setScope( SearchScope.SUBTREE );
1426 req.setDerefAliases( AliasDerefMode.NEVER_DEREF_ALIASES );
1427
1428 req.addAttributes( SchemaConstants.ENTRY_DN_AT );
1429
1430 SortKey sk = new SortKey( SchemaConstants.ENTRY_DN_AT, SchemaConstants.DISTINGUISHED_NAME_MATCH_MR_OID );
1431
1432 SortRequest ctrl = new SortRequestImpl();
1433 ctrl.addSortKey( sk );
1434 req.addControl( ctrl );
1435
1436 cursor = session.search( req );
1437 cursor.beforeFirst();
1438
1439 OperationManager operationManager = directoryService.getOperationManager();
1440
1441 while ( cursor.next() )
1442 {
1443 Entry e = cursor.get();
1444
1445 DeleteOperationContext/api/interceptor/context/DeleteOperationContext.html#DeleteOperationContext">DeleteOperationContext ctx = new DeleteOperationContext( session );
1446 ctx.setReplEvent( true );
1447 ctx.setRid( rid );
1448
1449
1450
1451 if ( reload )
1452 {
1453 ctx.setGenerateNoReplEvt( true );
1454 }
1455
1456 ctx.setDn( e.getDn() );
1457
1458 operationManager.delete( ctx );
1459 }
1460 }
1461 catch ( Exception e )
1462 {
1463 String msg = "Failed to delete the Dn " + rootDn.getName() + " and its children (if any present)";
1464 CONSUMER_LOG.error( msg, e );
1465 throw e;
1466 }
1467 finally
1468 {
1469 if ( cursor != null )
1470 {
1471 cursor.close();
1472 }
1473 }
1474 }
1475
1476
1477
1478
1479
1480 @Override
1481 public String toString()
1482 {
1483 StringBuilder sb = new StringBuilder();
1484
1485 sb.append( "Consumer " ).append( config );
1486
1487 return sb.toString();
1488 }
1489 }