View Javadoc
1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *  
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *  
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License. 
18   *  
19   */
20  package org.apache.directory.server.ldap.replication.provider;
21  
22  
23  import org.apache.directory.api.ldap.extras.controls.syncrepl.syncState.SyncStateTypeEnum;
24  import org.apache.directory.api.ldap.extras.controls.syncrepl.syncState.SyncStateValue;
25  import org.apache.directory.api.ldap.extras.controls.syncrepl.syncState.SyncStateValueImpl;
26  import org.apache.directory.api.ldap.model.constants.SchemaConstants;
27  import org.apache.directory.api.ldap.model.entry.Entry;
28  import org.apache.directory.api.ldap.model.exception.LdapInvalidAttributeValueException;
29  import org.apache.directory.api.ldap.model.message.AbandonListener;
30  import org.apache.directory.api.ldap.model.message.AbandonableRequest;
31  import org.apache.directory.api.ldap.model.message.SearchRequest;
32  import org.apache.directory.api.ldap.model.message.SearchResultEntry;
33  import org.apache.directory.api.ldap.model.message.SearchResultEntryImpl;
34  import org.apache.directory.api.ldap.model.message.controls.ChangeType;
35  import org.apache.directory.api.util.Strings;
36  import org.apache.directory.server.constants.ServerDNConstants;
37  import org.apache.directory.server.core.api.DirectoryService;
38  import org.apache.directory.server.core.api.entry.ClonedServerEntry;
39  import org.apache.directory.server.core.api.event.DirectoryListener;
40  import org.apache.directory.server.core.api.event.EventType;
41  import org.apache.directory.server.core.api.interceptor.context.AbstractChangeOperationContext;
42  import org.apache.directory.server.core.api.interceptor.context.AddOperationContext;
43  import org.apache.directory.server.core.api.interceptor.context.DeleteOperationContext;
44  import org.apache.directory.server.core.api.interceptor.context.ModifyOperationContext;
45  import org.apache.directory.server.core.api.interceptor.context.MoveAndRenameOperationContext;
46  import org.apache.directory.server.core.api.interceptor.context.MoveOperationContext;
47  import org.apache.directory.server.core.api.interceptor.context.RenameOperationContext;
48  import org.apache.directory.server.i18n.I18n;
49  import org.apache.directory.server.ldap.LdapProtocolUtils;
50  import org.apache.directory.server.ldap.LdapSession;
51  import org.apache.directory.server.ldap.replication.ReplicaEventMessage;
52  import org.apache.mina.core.future.WriteFuture;
53  import org.slf4j.Logger;
54  import org.slf4j.LoggerFactory;
55  
56  
57  /**
58   * A listener associated with the replication system. It does send the modifications to the 
59   * consumer, if it's connected, or store the data into a queue for a later transmission.
60   * 
61   * Note: we always log the entry irrespective of the client's connection status for guaranteed delivery
62   * 
63   * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a>
64   */
65  public class SyncReplSearchListener implements DirectoryListener, AbandonListener
66  {
67      /** Logger for this class */
68      private static final Logger LOG = LoggerFactory.getLogger( SyncReplSearchListener.class );
69  
70      /** The ldap session */
71      private LdapSession session;
72      
73      /** The search request we are processing */
74      private SearchRequest searchRequest;
75  
76      /** A flag telling if we push the response to the consumer or if we store them in a queue */
77      private volatile boolean pushInRealTime;
78  
79      /** The consumer configuration */
80      private final ReplicaEventLog consumerMsgLog;
81      
82      private static String replConsumerConfigDn = Strings.toLowerCaseAscii( ServerDNConstants.REPL_CONSUMER_CONFIG_DN );
83      private static String schemaDn = Strings.toLowerCaseAscii( SchemaConstants.OU_SCHEMA );
84      private static String replConsumerDn = Strings.toLowerCaseAscii( ServerDNConstants.REPL_CONSUMER_DN_STR );
85      
86      /**
87       * Create a new instance of a consumer listener
88       * 
89       * @param session The LDAP session to use for this listener
90       * @param searchRequest The searchRequest to process
91       * @param consumerMsgLog The consumer configuration
92       * @param pushInRealTime Tells if we push the results to the consumer in real time
93       */
94      SyncReplSearchListener( LdapSession session, SearchRequest searchRequest, ReplicaEventLog consumerMsgLog,
95          boolean pushInRealTime )
96      {
97          this.pushInRealTime = pushInRealTime;
98          setSession( session );
99          setSearchRequest( searchRequest );
100         this.consumerMsgLog = consumerMsgLog;
101     }
102 
103 
104     /**
105      * Store the Ldap session to use
106      * @param session The Ldap Session to use
107      */
108     public void setSession( LdapSession session )
109     {
110         this.session = session;
111     }
112 
113 
114     /**
115      * Stores the SearchRequest, and associate a AbandonListener to it
116      * 
117      * @param searchRequest The SearchRequest instance to store
118      */
119     public void setSearchRequest( SearchRequest searchRequest )
120     {
121         this.searchRequest = searchRequest;
122         
123         if ( searchRequest != null )
124         {
125             searchRequest.addAbandonListener( this );
126         }
127     }
128 
129 
130     @Override
131     public boolean isSynchronous()
132     {
133             return true; // always synchronous
134             }
135 
136 
137     /**
138      * Abandon a SearchRequest
139      * 
140      * @param searchRequest The SearchRequest to abandon
141      */
142     public void requestAbandoned( AbandonableRequest searchRequest )
143     {
144         try
145         {
146             if ( session != null )
147             {
148                 // We first remove the Listener from the session's chain
149                 session.getCoreSession().getDirectoryService().getEventService().removeListener( this );
150             }
151 
152             /*
153              * From RFC 2251 Section 4.11:
154              * 
155              * In the event that a server receives an Abandon Request on a Search  
156              * operation in the midst of transmitting responses to the Search, that
157              * server MUST cease transmitting entry responses to the abandoned
158              * request immediately, and MUST NOT send the SearchResultDone. Of
159              * course, the server MUST ensure that only properly encoded LDAPMessage
160              * PDUs are transmitted. 
161              * 
162              * SO DON'T SEND BACK ANYTHING!!!!!
163              */
164         }
165         catch ( Exception e )
166         {
167             LOG.error( I18n.err( I18n.ERR_164 ), e );
168         }
169     }
170 
171     
172     /**
173      * Create the SyncStateValue control
174      */
175     private SyncStateValue createControl( DirectoryService directoryService, SyncStateTypeEnum operation, Entry entry ) 
176         throws LdapInvalidAttributeValueException
177     {
178         SyncStateValue syncStateValue = new SyncStateValueImpl();
179 
180         syncStateValue.setSyncStateType( operation );
181         String uuidStr = entry.get( SchemaConstants.ENTRY_UUID_AT ).getString();
182         syncStateValue.setEntryUUID( Strings.uuidToBytes( uuidStr ) );
183         syncStateValue.setCookie( getCookie( entry ) );
184         
185         return syncStateValue;
186     }
187     
188     
189     /**
190      * Send the result to the consumer. If the consumer has disconnected, we fail back to the queue.
191      */
192     private void sendResult( SearchResultEntry searchResultEntry, Entry entry, EventType eventType, 
193         SyncStateValue syncStateValue )
194     {
195         searchResultEntry.addControl( syncStateValue );
196 
197         LOG.debug( "sending event {} of entry {}", eventType, entry.getDn() );
198         WriteFuture future = session.getIoSession().write( searchResultEntry );
199 
200         // Now, send the entry to the consumer
201         handleWriteFuture( future, entry, eventType );
202     }
203     
204 
205     /**
206      * Process a ADD operation. The added entry is pushed to the consumer if it's connected,
207      * or stored in the consumer's queue if it's not.
208      * 
209      * @param addContext The Addition operation context
210      */
211     public void entryAdded( AddOperationContext addContext )
212     {
213         Entry entry = addContext.getEntry();
214         
215         if ( isConfigEntry( entry ) || isNotValidForReplication( addContext ) )
216         {
217             return;
218         }
219 
220         try
221         {
222             //System.out.println( "ADD Listener : log " + entry.getDn() );
223             // we log it first
224             consumerMsgLog.log( new ReplicaEventMessage( ChangeType.ADD, entry ) );
225 
226             // We send the added entry directly to the consumer if it's connected
227             if ( pushInRealTime )
228             {
229                 // Construct a new SearchResultEntry
230                 SearchResultEntry resultEntry = new SearchResultEntryImpl( searchRequest.getMessageId() );
231                 resultEntry.setObjectName( entry.getDn() );
232                 resultEntry.setEntry( entry );
233 
234                 // Create the control which will be added to the response.
235                 SyncStateValue syncAdd = createControl( session.getCoreSession().getDirectoryService(), SyncStateTypeEnum.ADD, entry );
236                 
237                 sendResult( resultEntry, entry, EventType.ADD, syncAdd );
238             }
239         }
240         catch ( LdapInvalidAttributeValueException e )
241         {
242             // shouldn't happen
243             LOG.error( e.getMessage(), e );
244         }
245     }
246 
247 
248     /**
249      * Process a Delete operation. A delete event is send to the consumer, or stored in its 
250      * queue if the consumer is not connected.
251      * 
252      * @param deleteContext The delete operation context
253      */
254     public void entryDeleted( DeleteOperationContext deleteContext )
255     {
256         Entry entry = deleteContext.getEntry();
257         
258         if ( isConfigEntry( entry ) || isNotValidForReplication( deleteContext ) )
259         {
260             return;
261         }
262         
263         sendDeletedEntry( ( ( ClonedServerEntry ) entry ).getClonedEntry() );
264     }
265     
266 
267     /**
268      * A helper method, as the delete opertaionis used by the ModDN operations.
269      */
270     private void sendDeletedEntry( Entry entry )
271     {
272         try
273         {
274             //System.out.println( "DELETE Listener : log " + entry.getDn() );
275             consumerMsgLog.log( new ReplicaEventMessage( ChangeType.DELETE, entry ) );
276             
277             if ( pushInRealTime )
278             {
279                 SearchResultEntry resultEntry = new SearchResultEntryImpl( searchRequest.getMessageId() );
280                 resultEntry.setObjectName( entry.getDn() );
281                 resultEntry.setEntry( entry );
282 
283                 SyncStateValue syncDelete = createControl( session.getCoreSession().getDirectoryService(), SyncStateTypeEnum.DELETE, entry );
284 
285                 sendResult( resultEntry, entry, EventType.DELETE, syncDelete );
286             }
287         }
288         catch ( LdapInvalidAttributeValueException e )
289         {
290             // shouldn't happen
291             LOG.error( e.getMessage(), e );
292         }
293     }
294 
295 
296     /**
297      * Process a Modify operation. A modify event is send to the consumer, or stored in its 
298      * queue if the consumer is not connected.
299      * 
300      * @param modifyContext The modify operation context
301      */
302     public void entryModified( ModifyOperationContext modifyContext )
303     {
304         Entry alteredEntry = modifyContext.getAlteredEntry();
305 
306         if ( isConfigEntry( alteredEntry ) || isNotValidForReplication( modifyContext ) )
307         {
308             return;
309         }
310 
311         try
312         {
313             //System.out.println( "MODIFY Listener : log " + alteredEntry.getDn() );
314             consumerMsgLog.log( new ReplicaEventMessage( ChangeType.MODIFY, alteredEntry ) );
315             
316             if ( pushInRealTime )
317             {
318 
319                 SearchResultEntry resultEntry = new SearchResultEntryImpl( searchRequest.getMessageId() );
320                 resultEntry.setObjectName( modifyContext.getDn() );
321                 resultEntry.setEntry( alteredEntry );
322 
323                 SyncStateValue syncModify = createControl( session.getCoreSession().getDirectoryService(), SyncStateTypeEnum.MODIFY, alteredEntry );
324 
325                 sendResult( resultEntry, alteredEntry, EventType.MODIFY, syncModify );
326             }
327         }
328         catch ( Exception e )
329         {
330             LOG.error( e.getMessage(), e );
331         }
332     }
333 
334 
335     /**
336      * Process a Move operation. A MODDN event is send to the consumer, or stored in its 
337      * queue if the consumer is not connected.
338      * 
339      * @param moveContext The move operation context
340      */
341     public void entryMoved( MoveOperationContext moveContext )
342     {
343         // should always send the modified entry cause the consumer perform the modDn operation locally
344         Entry entry = moveContext.getModifiedEntry();
345 
346         if ( isConfigEntry( entry ) || isNotValidForReplication( moveContext ) )
347         {
348             return;
349         }
350 
351         try
352         {
353             if ( !moveContext.getNewSuperior().isDescendantOf( consumerMsgLog.getSearchCriteria().getBase() ) )
354             {
355                 sendDeletedEntry( moveContext.getOriginalEntry() );
356                 return;
357             }
358 
359             //System.out.println( "MOVE Listener : log " + moveContext.getDn() + " moved to " + moveContext.getNewSuperior() );
360             consumerMsgLog.log( new ReplicaEventMessage( ChangeType.MODDN, entry ) );
361             
362             if ( pushInRealTime )
363             {
364                 SearchResultEntry resultEntry = new SearchResultEntryImpl( searchRequest.getMessageId() );
365                 resultEntry.setObjectName( moveContext.getDn() );
366                 resultEntry.setEntry( entry );
367 
368                 SyncStateValue syncModify = createControl( session.getCoreSession().getDirectoryService(), SyncStateTypeEnum.MODDN, entry );
369 
370                 sendResult( resultEntry, entry, EventType.MOVE, syncModify );
371             }
372         }
373         catch ( Exception e )
374         {
375             LOG.error( e.getMessage(), e );
376         }
377     }
378 
379 
380     /**
381      * Process a MoveAndRename operation. A MODDN event is send to the consumer, or stored in its 
382      * queue if the consumer is not connected.
383      * 
384      * @param moveAndRenameContext The move and rename operation context
385      */
386     public void entryMovedAndRenamed( MoveAndRenameOperationContext moveAndRenameContext )
387     {
388         // should always send the modified entry cause the consumer perform the modDn operation locally
389         Entry entry = moveAndRenameContext.getModifiedEntry();
390 
391         if ( isConfigEntry( entry ) || isNotValidForReplication( moveAndRenameContext ) )
392         {
393             return;
394         }
395 
396         try
397         {
398             if ( !moveAndRenameContext.getNewSuperiorDn().isDescendantOf( consumerMsgLog.getSearchCriteria().getBase() ) )
399             {
400                 sendDeletedEntry( entry );
401                 return;
402             }
403 
404 
405             //System.out.println( "MOVE AND RENAME Listener : log " + moveAndRenameContext.getDn() + 
406             //    " moved to " + moveAndRenameContext.getNewSuperiorDn() + " renamed to " + moveAndRenameContext.getNewRdn() );
407             consumerMsgLog.log( new ReplicaEventMessage( ChangeType.MODDN, entry ) );
408             
409             if ( pushInRealTime )
410             {
411                 SearchResultEntry resultEntry = new SearchResultEntryImpl( searchRequest.getMessageId() );
412                 resultEntry.setObjectName( entry.getDn() );
413                 resultEntry.setEntry( entry );
414 
415                 SyncStateValue syncModify = createControl( session.getCoreSession().getDirectoryService(), SyncStateTypeEnum.MODDN, entry );
416 
417                 sendResult( resultEntry, entry, EventType.MOVE_AND_RENAME, syncModify );
418             }
419         }
420         catch ( Exception e )
421         {
422             LOG.error( e.getMessage(), e );
423         }
424     }
425 
426 
427     /**
428      * Process a Rename operation. A MODDN event is send to the consumer, or stored in its 
429      * queue if the consumer is not connected.
430      * 
431      * @param renameContext The rename operation context
432      */
433     public void entryRenamed( RenameOperationContext renameContext )
434     {
435         // should always send the modified entry cause the consumer perform the modDn operation locally
436         Entry entry = renameContext.getModifiedEntry();
437 
438         if ( isConfigEntry( entry ) || isNotValidForReplication( renameContext ) )
439         {
440             return;
441         }
442 
443         try
444         {
445             // should always send the original entry cause the consumer perform the modDn operation there
446             //System.out.println( "RENAME Listener : log " + renameContext.getDn() + " renamed to " + renameContext.getNewRdn() );
447             consumerMsgLog.log( new ReplicaEventMessage( ChangeType.MODDN, entry ) );
448             
449             if ( pushInRealTime )
450             {
451                 SearchResultEntry resultEntry = new SearchResultEntryImpl( searchRequest.getMessageId() );
452                 resultEntry.setObjectName( entry.getDn() );
453                 resultEntry.setEntry( entry );
454 
455                 SyncStateValue syncModify = createControl( session.getCoreSession().getDirectoryService(), SyncStateTypeEnum.MODDN, entry );
456                 
457                 // In this case, the cookie is different
458                 syncModify.setCookie( getCookie( entry ) );
459 
460                 sendResult( resultEntry, entry, EventType.RENAME, syncModify );
461             }
462         }
463         catch ( Exception e )
464         {
465             LOG.error( e.getMessage(), e );
466         }
467     }
468 
469 
470     /**
471      * @return true if the entries are sent to the consumer in real time
472      */
473     public boolean isPushInRealTime()
474     {
475         return pushInRealTime;
476     }
477 
478 
479     /**
480      * Set the pushInRealTime parameter
481      * @param pushInRealTime true if the entries must be push to the consumer directly
482      */
483     public void setPushInRealTime( boolean pushInRealTime )
484     {
485         this.pushInRealTime = pushInRealTime;
486     }
487 
488 
489     /**
490      * Get the cookie from the entry
491      */
492     private byte[] getCookie( Entry entry ) throws LdapInvalidAttributeValueException
493     {
494         String csn = entry.get( SchemaConstants.ENTRY_CSN_AT ).getString();
495 
496         return LdapProtocolUtils.createCookie( consumerMsgLog.getId(), csn );
497     }
498 
499 
500     /**
501      * Process the writing of the replicated entry to the consumer
502      */
503     private void handleWriteFuture( WriteFuture future, Entry entry, EventType event )
504     {
505         // Let the operation be executed.
506         // Note : we wait 10 seconds max
507         future.awaitUninterruptibly( 10000L );
508         
509         if ( !future.isWritten() )
510         {
511             LOG.error( "Failed to write to the consumer {} during the event {} on entry {}", new Object[] { 
512                            consumerMsgLog.getId(), event, entry.getDn() } );
513             LOG.error( "", future.getException() );
514 
515             // set realtime push to false, will be set back to true when the client
516             // comes back and sends another request this flag will be set to true
517             pushInRealTime = false;
518         }
519         else
520         {
521             try
522             {
523                 // if successful update the last sent CSN
524                 consumerMsgLog.setLastSentCsn( entry.get( SchemaConstants.ENTRY_CSN_AT ).getString() );
525             }
526             catch ( Exception e )
527             {
528                 //should never happen
529                 LOG.error( "No entry CSN attribute found", e );
530             }
531         }
532     }
533     
534     
535     /**
536      * checks if the given entry belongs to the ou=config or ou=schema partition
537      * We don't replicate those two partitions
538      * @param entry the entry
539      * @return true if the entry belongs to ou=config partition, false otherwise
540      */
541     private boolean isConfigEntry( Entry entry )
542     {
543         // we can do Dn.isDescendantOf but in this part of the
544         // server the DNs are all normalized and a simple string compare should
545         // do the trick
546         
547         String name = Strings.toLowerCaseAscii( entry.getDn().getName() );
548         
549         if ( name.endsWith( replConsumerConfigDn )
550             || name.endsWith( schemaDn )
551             || name.endsWith( replConsumerDn ) )
552         {
553             return true;
554         }
555         
556         // do not replicate the changes made to transport config entries
557         return name.startsWith( "ads-transportid" ) && name.endsWith( ServerDNConstants.CONFIG_DN );
558     }
559     
560     
561     private boolean isNotValidForReplication( AbstractChangeOperationContext ctx )
562     {
563         if ( ctx.isGenerateNoReplEvt() )
564         {
565             return true;
566         }
567         
568         return isMmrConfiguredToReceiver( ctx );
569     }
570     
571 
572     /**
573      * checks if the sender of this replication event is setup with MMR
574      * (Note: this method is used to prevent sending a replicated event back to the sender after 
575      *  performing local update)
576      * @param ctx the operation's context
577      * @return true if the rid present in operation context is same as the event log's ID, false otherwise
578      */
579     private boolean isMmrConfiguredToReceiver( AbstractChangeOperationContext ctx )
580     {
581         if ( ctx.isReplEvent() )
582         {
583             boolean skip = ( ctx.getRid() == consumerMsgLog.getId() );
584             
585             if ( skip )
586             {
587                 LOG.debug( "RID in operation context matches with the ID of replication event log {} for host {}", consumerMsgLog.getName(), consumerMsgLog.getHostName() );
588             }
589             
590             return skip;
591         }
592         
593         return false;
594     }
595     
596     
597     /**
598      * {@inheritDoc}
599      */
600     public String toString()
601     {
602         StringBuilder sb = new StringBuilder();
603         
604         sb.append( "SyncReplSearchListener : \n" );
605         sb.append( '\'' ).append( searchRequest ).append( "', " );
606         sb.append( '\'' ).append( pushInRealTime ).append( "', \n" );
607         sb.append( consumerMsgLog );
608         sb.append( '\n' );
609         
610         return sb.toString();
611     }
612 }