1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.directory.server.ldap.replication.provider;
21
22
23 import org.apache.directory.api.ldap.extras.controls.syncrepl.syncState.SyncStateTypeEnum;
24 import org.apache.directory.api.ldap.extras.controls.syncrepl.syncState.SyncStateValue;
25 import org.apache.directory.api.ldap.extras.controls.syncrepl.syncState.SyncStateValueImpl;
26 import org.apache.directory.api.ldap.model.constants.SchemaConstants;
27 import org.apache.directory.api.ldap.model.entry.Entry;
28 import org.apache.directory.api.ldap.model.exception.LdapInvalidAttributeValueException;
29 import org.apache.directory.api.ldap.model.message.AbandonListener;
30 import org.apache.directory.api.ldap.model.message.AbandonableRequest;
31 import org.apache.directory.api.ldap.model.message.SearchRequest;
32 import org.apache.directory.api.ldap.model.message.SearchResultEntry;
33 import org.apache.directory.api.ldap.model.message.SearchResultEntryImpl;
34 import org.apache.directory.api.ldap.model.message.controls.ChangeType;
35 import org.apache.directory.api.util.Strings;
36 import org.apache.directory.server.constants.ServerDNConstants;
37 import org.apache.directory.server.core.api.DirectoryService;
38 import org.apache.directory.server.core.api.entry.ClonedServerEntry;
39 import org.apache.directory.server.core.api.event.DirectoryListener;
40 import org.apache.directory.server.core.api.event.EventType;
41 import org.apache.directory.server.core.api.interceptor.context.AbstractChangeOperationContext;
42 import org.apache.directory.server.core.api.interceptor.context.AddOperationContext;
43 import org.apache.directory.server.core.api.interceptor.context.DeleteOperationContext;
44 import org.apache.directory.server.core.api.interceptor.context.ModifyOperationContext;
45 import org.apache.directory.server.core.api.interceptor.context.MoveAndRenameOperationContext;
46 import org.apache.directory.server.core.api.interceptor.context.MoveOperationContext;
47 import org.apache.directory.server.core.api.interceptor.context.RenameOperationContext;
48 import org.apache.directory.server.i18n.I18n;
49 import org.apache.directory.server.ldap.LdapProtocolUtils;
50 import org.apache.directory.server.ldap.LdapSession;
51 import org.apache.directory.server.ldap.replication.ReplicaEventMessage;
52 import org.apache.mina.core.future.WriteFuture;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
55
56
57
58
59
60
61
62
63
64
65 public class SyncReplSearchListener implements DirectoryListener, AbandonListener
66 {
67
68 private static final Logger LOG = LoggerFactory.getLogger( SyncReplSearchListener.class );
69
70
71 private LdapSession session;
72
73
74 private SearchRequest searchRequest;
75
76
77 private volatile boolean pushInRealTime;
78
79
80 private final ReplicaEventLog consumerMsgLog;
81
82 private static String replConsumerConfigDn = Strings.toLowerCaseAscii( ServerDNConstants.REPL_CONSUMER_CONFIG_DN );
83 private static String schemaDn = Strings.toLowerCaseAscii( SchemaConstants.OU_SCHEMA );
84 private static String replConsumerDn = Strings.toLowerCaseAscii( ServerDNConstants.REPL_CONSUMER_DN_STR );
85
86
87
88
89
90
91
92
93
/**
 * Creates a listener that records change events in the given event log and,
 * when requested, pushes them to the consumer as they occur.
 *
 * @param session the LDAP session, stored through {@link #setSession(LdapSession)}
 * @param searchRequest the search request, stored through
 *        {@link #setSearchRequest(SearchRequest)}; when non-null this listener is
 *        registered on it as an abandon listener
 * @param consumerMsgLog the event log the change events are written to
 * @param pushInRealTime whether events are also written to the consumer immediately
 */
SyncReplSearchListener( LdapSession session, SearchRequest searchRequest, ReplicaEventLog consumerMsgLog,
    boolean pushInRealTime )
{
    // Initialise the plain state first: setSearchRequest() hands 'this' to the
    // request as an abandon listener, so every field must already be assigned
    // before that registration takes place.
    this.consumerMsgLog = consumerMsgLog;
    this.pushInRealTime = pushInRealTime;

    setSession( session );
    setSearchRequest( searchRequest );
}
102
103
104
105
106
107
108 public void setSession( LdapSession session )
109 {
110 this.session = session;
111 }
112
113
114
115
116
117
118
119 public void setSearchRequest( SearchRequest searchRequest )
120 {
121 this.searchRequest = searchRequest;
122
123 if ( searchRequest != null )
124 {
125 searchRequest.addAbandonListener( this );
126 }
127 }
128
129
130 @Override
131 public boolean isSynchronous()
132 {
133 return true;
134 }
135
136
137
138
139
140
141
142 public void requestAbandoned( AbandonableRequest searchRequest )
143 {
144 try
145 {
146 if ( session != null )
147 {
148
149 session.getCoreSession().getDirectoryService().getEventService().removeListener( this );
150 }
151
152
153
154
155
156
157
158
159
160
161
162
163
164 }
165 catch ( Exception e )
166 {
167 LOG.error( I18n.err( I18n.ERR_164 ), e );
168 }
169 }
170
171
172
173
174
175 private SyncStateValue createControl( DirectoryService directoryService, SyncStateTypeEnum operation, Entry entry )
176 throws LdapInvalidAttributeValueException
177 {
178 SyncStateValue syncStateValue = new SyncStateValueImpl();
179
180 syncStateValue.setSyncStateType( operation );
181 String uuidStr = entry.get( SchemaConstants.ENTRY_UUID_AT ).getString();
182 syncStateValue.setEntryUUID( Strings.uuidToBytes( uuidStr ) );
183 syncStateValue.setCookie( getCookie( entry ) );
184
185 return syncStateValue;
186 }
187
188
189
190
191
192 private void sendResult( SearchResultEntry searchResultEntry, Entry entry, EventType eventType,
193 SyncStateValue syncStateValue )
194 {
195 searchResultEntry.addControl( syncStateValue );
196
197 LOG.debug( "sending event {} of entry {}", eventType, entry.getDn() );
198 WriteFuture future = session.getIoSession().write( searchResultEntry );
199
200
201 handleWriteFuture( future, entry, eventType );
202 }
203
204
205
206
207
208
209
210
211 public void entryAdded( AddOperationContext addContext )
212 {
213 Entry entry = addContext.getEntry();
214
215 if ( isConfigEntry( entry ) || isNotValidForReplication( addContext ) )
216 {
217 return;
218 }
219
220 try
221 {
222
223
224 consumerMsgLog.log( new ReplicaEventMessage( ChangeType.ADD, entry ) );
225
226
227 if ( pushInRealTime )
228 {
229
230 SearchResultEntry resultEntry = new SearchResultEntryImpl( searchRequest.getMessageId() );
231 resultEntry.setObjectName( entry.getDn() );
232 resultEntry.setEntry( entry );
233
234
235 SyncStateValue syncAdd = createControl( session.getCoreSession().getDirectoryService(), SyncStateTypeEnum.ADD, entry );
236
237 sendResult( resultEntry, entry, EventType.ADD, syncAdd );
238 }
239 }
240 catch ( LdapInvalidAttributeValueException e )
241 {
242
243 LOG.error( e.getMessage(), e );
244 }
245 }
246
247
248
249
250
251
252
253
254 public void entryDeleted( DeleteOperationContext deleteContext )
255 {
256 Entry entry = deleteContext.getEntry();
257
258 if ( isConfigEntry( entry ) || isNotValidForReplication( deleteContext ) )
259 {
260 return;
261 }
262
263 sendDeletedEntry( ( ( ClonedServerEntry ) entry ).getClonedEntry() );
264 }
265
266
267
268
269
270 private void sendDeletedEntry( Entry entry )
271 {
272 try
273 {
274
275 consumerMsgLog.log( new ReplicaEventMessage( ChangeType.DELETE, entry ) );
276
277 if ( pushInRealTime )
278 {
279 SearchResultEntry resultEntry = new SearchResultEntryImpl( searchRequest.getMessageId() );
280 resultEntry.setObjectName( entry.getDn() );
281 resultEntry.setEntry( entry );
282
283 SyncStateValue syncDelete = createControl( session.getCoreSession().getDirectoryService(), SyncStateTypeEnum.DELETE, entry );
284
285 sendResult( resultEntry, entry, EventType.DELETE, syncDelete );
286 }
287 }
288 catch ( LdapInvalidAttributeValueException e )
289 {
290
291 LOG.error( e.getMessage(), e );
292 }
293 }
294
295
296
297
298
299
300
301
302 public void entryModified( ModifyOperationContext modifyContext )
303 {
304 Entry alteredEntry = modifyContext.getAlteredEntry();
305
306 if ( isConfigEntry( alteredEntry ) || isNotValidForReplication( modifyContext ) )
307 {
308 return;
309 }
310
311 try
312 {
313
314 consumerMsgLog.log( new ReplicaEventMessage( ChangeType.MODIFY, alteredEntry ) );
315
316 if ( pushInRealTime )
317 {
318
319 SearchResultEntry resultEntry = new SearchResultEntryImpl( searchRequest.getMessageId() );
320 resultEntry.setObjectName( modifyContext.getDn() );
321 resultEntry.setEntry( alteredEntry );
322
323 SyncStateValue syncModify = createControl( session.getCoreSession().getDirectoryService(), SyncStateTypeEnum.MODIFY, alteredEntry );
324
325 sendResult( resultEntry, alteredEntry, EventType.MODIFY, syncModify );
326 }
327 }
328 catch ( Exception e )
329 {
330 LOG.error( e.getMessage(), e );
331 }
332 }
333
334
335
336
337
338
339
340
341 public void entryMoved( MoveOperationContext moveContext )
342 {
343
344 Entry entry = moveContext.getModifiedEntry();
345
346 if ( isConfigEntry( entry ) || isNotValidForReplication( moveContext ) )
347 {
348 return;
349 }
350
351 try
352 {
353 if ( !moveContext.getNewSuperior().isDescendantOf( consumerMsgLog.getSearchCriteria().getBase() ) )
354 {
355 sendDeletedEntry( moveContext.getOriginalEntry() );
356 return;
357 }
358
359
360 consumerMsgLog.log( new ReplicaEventMessage( ChangeType.MODDN, entry ) );
361
362 if ( pushInRealTime )
363 {
364 SearchResultEntry resultEntry = new SearchResultEntryImpl( searchRequest.getMessageId() );
365 resultEntry.setObjectName( moveContext.getDn() );
366 resultEntry.setEntry( entry );
367
368 SyncStateValue syncModify = createControl( session.getCoreSession().getDirectoryService(), SyncStateTypeEnum.MODDN, entry );
369
370 sendResult( resultEntry, entry, EventType.MOVE, syncModify );
371 }
372 }
373 catch ( Exception e )
374 {
375 LOG.error( e.getMessage(), e );
376 }
377 }
378
379
380
381
382
383
384
385
386 public void entryMovedAndRenamed( MoveAndRenameOperationContext moveAndRenameContext )
387 {
388
389 Entry entry = moveAndRenameContext.getModifiedEntry();
390
391 if ( isConfigEntry( entry ) || isNotValidForReplication( moveAndRenameContext ) )
392 {
393 return;
394 }
395
396 try
397 {
398 if ( !moveAndRenameContext.getNewSuperiorDn().isDescendantOf( consumerMsgLog.getSearchCriteria().getBase() ) )
399 {
400 sendDeletedEntry( entry );
401 return;
402 }
403
404
405
406
407 consumerMsgLog.log( new ReplicaEventMessage( ChangeType.MODDN, entry ) );
408
409 if ( pushInRealTime )
410 {
411 SearchResultEntry resultEntry = new SearchResultEntryImpl( searchRequest.getMessageId() );
412 resultEntry.setObjectName( entry.getDn() );
413 resultEntry.setEntry( entry );
414
415 SyncStateValue syncModify = createControl( session.getCoreSession().getDirectoryService(), SyncStateTypeEnum.MODDN, entry );
416
417 sendResult( resultEntry, entry, EventType.MOVE_AND_RENAME, syncModify );
418 }
419 }
420 catch ( Exception e )
421 {
422 LOG.error( e.getMessage(), e );
423 }
424 }
425
426
427
428
429
430
431
432
433 public void entryRenamed( RenameOperationContext renameContext )
434 {
435
436 Entry entry = renameContext.getModifiedEntry();
437
438 if ( isConfigEntry( entry ) || isNotValidForReplication( renameContext ) )
439 {
440 return;
441 }
442
443 try
444 {
445
446
447 consumerMsgLog.log( new ReplicaEventMessage( ChangeType.MODDN, entry ) );
448
449 if ( pushInRealTime )
450 {
451 SearchResultEntry resultEntry = new SearchResultEntryImpl( searchRequest.getMessageId() );
452 resultEntry.setObjectName( entry.getDn() );
453 resultEntry.setEntry( entry );
454
455 SyncStateValue syncModify = createControl( session.getCoreSession().getDirectoryService(), SyncStateTypeEnum.MODDN, entry );
456
457
458 syncModify.setCookie( getCookie( entry ) );
459
460 sendResult( resultEntry, entry, EventType.RENAME, syncModify );
461 }
462 }
463 catch ( Exception e )
464 {
465 LOG.error( e.getMessage(), e );
466 }
467 }
468
469
470
471
472
473 public boolean isPushInRealTime()
474 {
475 return pushInRealTime;
476 }
477
478
479
480
481
482
483 public void setPushInRealTime( boolean pushInRealTime )
484 {
485 this.pushInRealTime = pushInRealTime;
486 }
487
488
489
490
491
492 private byte[] getCookie( Entry entry ) throws LdapInvalidAttributeValueException
493 {
494 String csn = entry.get( SchemaConstants.ENTRY_CSN_AT ).getString();
495
496 return LdapProtocolUtils.createCookie( consumerMsgLog.getId(), csn );
497 }
498
499
500
501
502
503 private void handleWriteFuture( WriteFuture future, Entry entry, EventType event )
504 {
505
506
507 future.awaitUninterruptibly( 10000L );
508
509 if ( !future.isWritten() )
510 {
511 LOG.error( "Failed to write to the consumer {} during the event {} on entry {}", new Object[] {
512 consumerMsgLog.getId(), event, entry.getDn() } );
513 LOG.error( "", future.getException() );
514
515
516
517 pushInRealTime = false;
518 }
519 else
520 {
521 try
522 {
523
524 consumerMsgLog.setLastSentCsn( entry.get( SchemaConstants.ENTRY_CSN_AT ).getString() );
525 }
526 catch ( Exception e )
527 {
528
529 LOG.error( "No entry CSN attribute found", e );
530 }
531 }
532 }
533
534
535
536
537
538
539
540
541 private boolean isConfigEntry( Entry entry )
542 {
543
544
545
546
547 String name = Strings.toLowerCaseAscii( entry.getDn().getName() );
548
549 if ( name.endsWith( replConsumerConfigDn )
550 || name.endsWith( schemaDn )
551 || name.endsWith( replConsumerDn ) )
552 {
553 return true;
554 }
555
556
557 return name.startsWith( "ads-transportid" ) && name.endsWith( ServerDNConstants.CONFIG_DN );
558 }
559
560
561 private boolean isNotValidForReplication( AbstractChangeOperationContext ctx )
562 {
563 if ( ctx.isGenerateNoReplEvt() )
564 {
565 return true;
566 }
567
568 return isMmrConfiguredToReceiver( ctx );
569 }
570
571
572
573
574
575
576
577
578
579 private boolean isMmrConfiguredToReceiver( AbstractChangeOperationContext ctx )
580 {
581 if ( ctx.isReplEvent() )
582 {
583 boolean skip = ( ctx.getRid() == consumerMsgLog.getId() );
584
585 if ( skip )
586 {
587 LOG.debug( "RID in operation context matches with the ID of replication event log {} for host {}", consumerMsgLog.getName(), consumerMsgLog.getHostName() );
588 }
589
590 return skip;
591 }
592
593 return false;
594 }
595
596
597
598
599
600 public String toString()
601 {
602 StringBuilder sb = new StringBuilder();
603
604 sb.append( "SyncReplSearchListener : \n" );
605 sb.append( '\'' ).append( searchRequest ).append( "', " );
606 sb.append( '\'' ).append( pushInRealTime ).append( "', \n" );
607 sb.append( consumerMsgLog );
608 sb.append( '\n' );
609
610 return sb.toString();
611 }
612 }