View Javadoc
1   /*
2    *   Licensed to the Apache Software Foundation (ASF) under one
3    *   or more contributor license agreements.  See the NOTICE file
4    *   distributed with this work for additional information
5    *   regarding copyright ownership.  The ASF licenses this file
6    *   to you under the Apache License, Version 2.0 (the
7    *   "License"); you may not use this file except in compliance
8    *   with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *   Unless required by applicable law or agreed to in writing,
13   *   software distributed under the License is distributed on an
14   *   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *   KIND, either express or implied.  See the License for the
16   *   specific language governing permissions and limitations
17   *   under the License.
18   *
19   */
20  
21  package org.apache.directory.server.ldap.replication.provider;
22  
23  
24  import java.io.File;
25  import java.io.IOException;
26  
27  import jdbm.RecordManager;
28  import jdbm.recman.BaseRecordManager;
29  import jdbm.recman.TransactionManager;
30  
31  import org.apache.directory.api.ldap.model.constants.Loggers;
32  import org.apache.directory.api.ldap.model.constants.SchemaConstants;
33  import org.apache.directory.api.ldap.model.exception.LdapException;
34  import org.apache.directory.api.ldap.model.name.Dn;
35  import org.apache.directory.api.ldap.model.schema.SchemaManager;
36  import org.apache.directory.api.ldap.model.schema.comparators.SerializableComparator;
37  import org.apache.directory.server.core.api.DirectoryService;
38  import org.apache.directory.server.core.api.event.EventType;
39  import org.apache.directory.server.core.api.event.NotificationCriteria;
40  import org.apache.directory.server.core.api.partition.PartitionTxn;
41  import org.apache.directory.server.core.partition.impl.btree.jdbm.JdbmTable;
42  import org.apache.directory.server.core.partition.impl.btree.jdbm.StringSerializer;
43  import org.apache.directory.server.ldap.replication.ReplicaEventMessage;
44  import org.apache.directory.server.ldap.replication.ReplicaEventMessageSerializer;
45  import org.slf4j.Logger;
46  import org.slf4j.LoggerFactory;
47  
48  
49  /**
50   * A structure storing the configuration on each consumer registered on a producer. It stores 
51   * the following informations :
52   * <ul>
53   * <li>replicaId : the internal ID associated with the consumer on the provider</li>
54   * <li>hostname : the consumer's host</li>
55   * <li>searchFilter : the filter</li>
56   * <li>lastSentCsn : the last CSN sent by the consumer</li>
57   * <li>refreshNPersist : a flag indicating that the consumer is processing in Refresh and persist mode</li>
58   * <li></li>
59   * </ul>
60   * A separate log is maintained for each syncrepl consumer.<br>
61   * We also associate a Queue with each structure, which will store the messages to send to the consumer.
62   *
63   * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a>
64   */
65  public class ReplicaEventLog implements Comparable<ReplicaEventLog>
66  {
67      /** The logger */
68      private static final Logger LOG = LoggerFactory.getLogger( ReplicaEventLog.class );
69  
70      /** A logger for the replication provider */
71      private static final Logger PROVIDER_LOG = LoggerFactory.getLogger( Loggers.PROVIDER_LOG.getName() );
72  
73      /** hostname of the syncrepl consumer */
74      private String hostName;
75  
76      /** the unmodified search filter as it was when received from the client */
77      private String searchFilter;
78  
79      /** the csn that was sent to the client during the last sync session*/
80      private String lastSentCsn;
81  
82      /** the persistent listener */
83      private SyncReplSearchListener persistentListener;
84  
85      /** notification criteria used by the persistent search */
86      private NotificationCriteria searchCriteria;
87  
88      /** the replica id */
89      private int replicaId;
90  
91      /** flag indicating refreshAndPersist mode */
92      private boolean refreshNPersist;
93  
94      /** the duration(in seconds) of consumer inactivity after which this log will be deleted. Defaults to 172800 seconds (i.e. 2 days) */
95      private long maxIdlePeriod = DEFAULT_MAX_IDLE_PERIOD;
96  
97      /** the minimum number of entries to be present for beginning purging entries older than the last sent CSN. Default is 10000 */
98      private int purgeThresholdCount = DEFAULT_PURGE_THRESHOLD_COUNT;
99  
100     // fields that won't be serialized
101     /** The Journal of modifications */
102     private JdbmTable<String, ReplicaEventMessage> journal;
103 
104     /** the underlying file  */
105     private File journalFile;
106 
107     /** The record manager*/
108     private RecordManager recman;
109 
110     /** A flag used to indicate that the consumer is not up to date */
111     private volatile boolean dirty;
112 
113     /** the DN of the entry where this event log details are stored */
114     private Dn consumerEntryDn;
115 
116     public static final String REPLICA_EVENT_LOG_NAME_PREFIX = "REPL_EVENT_LOG.";
117 
118     public static final int DEFAULT_PURGE_THRESHOLD_COUNT = 10000;
119 
120     /** The max delay for an idle replication log with no activity, by default the logs have no idle time period */
121     public static final int DEFAULT_MAX_IDLE_PERIOD = -1;
122     
123     /** The partition transaction */
124     private PartitionTxn partitionTxn;
125 
126 
127     /**
128      * Creates a new instance of EventLog for a replica
129      * 
130      * @param partitionTxn The Transaction to use
131      * @param directoryService The DirectoryService instance
132      * @param replicaId The replica ID
133      * @throws IOException if we weren't able to log the event
134      */
135     public ReplicaEventLog( PartitionTxn partitionTxn, DirectoryService directoryService, int replicaId ) throws IOException
136     {
137         PROVIDER_LOG.debug( "Creating the replication queue for replica {}", replicaId );
138         SchemaManager schemaManager = directoryService.getSchemaManager();
139         this.replicaId = replicaId;
140         this.searchCriteria = new NotificationCriteria( schemaManager );
141         this.searchCriteria.setEventMask( EventType.ALL_EVENT_TYPES_MASK );
142 
143         // Create the journal file, or open if already exists
144         File replDir = directoryService.getInstanceLayout().getReplDirectory();
145         journalFile = new File( replDir, REPLICA_EVENT_LOG_NAME_PREFIX + replicaId );
146         recman = new BaseRecordManager( journalFile.getAbsolutePath() );
147         TransactionManager transactionManager = ( ( BaseRecordManager ) recman ).getTransactionManager();
148         transactionManager.setMaximumTransactionsInLog( 200 );
149 
150         SerializableComparator<String> comparator = new SerializableComparator<>(
151             SchemaConstants.CSN_ORDERING_MATCH_MR_OID );
152         comparator.setSchemaManager( schemaManager );
153 
154         journal = new JdbmTable<>( schemaManager, journalFile.getName(), recman, comparator,
155             StringSerializer.INSTANCE, new ReplicaEventMessageSerializer( schemaManager ) );
156         
157         this.partitionTxn = partitionTxn;
158     }
159 
160 
161     /**
162      * Stores the given message in the queue 
163      *
164      * @param message The message to store
165      */
166     public synchronized void log( ReplicaEventMessage message )
167     {
168         try
169         {
170             LOG.debug( "logging entry with Dn {} with the event {}", message.getEntry().getDn(),
171                 message.getChangeType() );
172             PROVIDER_LOG.debug( "logging entry with Dn {} with the event {}", message.getEntry().getDn(),
173                 message.getChangeType() );
174 
175             String entryCsn = message.getEntry().get( SchemaConstants.ENTRY_CSN_AT ).getString();
176             journal.put( partitionTxn, entryCsn, message );
177         }
178         catch ( Exception e )
179         {
180             LOG.warn( "Failed to insert the entry into syncrepl log", e );
181             PROVIDER_LOG.error( "Failed to insert the entry into syncrepl log", e );
182         }
183     }
184 
185 
186     /**
187      * Deletes the queue (to remove the log) and recreates a new queue instance
188      * with the same queue name. Also creates the corresponding message producer
189      *
190      * @throws Exception If the queue can't be deleted
191      */
192     public void truncate() throws Exception
193     {
194     }
195 
196 
197     /**
198      * Re-create the queue
199      * @throws Exception If the creation has failed
200      */
201     public void recreate() throws Exception
202     {
203         LOG.debug( "recreating the queue for the replica id {}", replicaId );
204     }
205 
206 
207     /**
208      * Stop the EventLog
209      * 
210      * @throws Exception If the stop failed
211      */
212     public void stop() throws Exception
213     {
214         PROVIDER_LOG.debug( "Stopping the EventLog for replicaId {}", replicaId );
215 
216         // Close the producer and session, DO NOT close connection 
217         if ( journal != null )
218         {
219             journal.close( partitionTxn );
220         }
221 
222         journal = null;
223 
224         if ( recman != null )
225         {
226             recman.close();
227         }
228 
229         recman = null;
230     }
231 
232 
233     /**
234      * {@inheritDoc}
235      */
236     @Override
237     public boolean equals( Object obj )
238     {
239         if ( !( obj instanceof ReplicaEventLog ) )
240         {
241             return false;
242         }
243 
244         ReplicaEventLog../../../../org/apache/directory/server/ldap/replication/provider/ReplicaEventLog.html#ReplicaEventLog">ReplicaEventLog other = ( ReplicaEventLog ) obj;
245 
246         return replicaId == other.getId();
247     }
248 
249 
250     /**
251      * {@inheritDoc}
252      */
253     @Override
254     public int hashCode()
255     {
256         int result = 17;
257         result = 31 * result + searchFilter.hashCode();
258         result = 31 * result + hostName.hashCode();
259 
260         return result;
261     }
262 
263 
264     /**
265      * {@inheritDoc}
266      */
267     public int compareTo( ReplicaEventLog o )
268     {
269         if ( this.equals( o ) )
270         {
271             return 0;
272         }
273 
274         return 1;
275     }
276 
277 
278     /**
279      * @return The listener
280      */
281     public SyncReplSearchListener getPersistentListener()
282     {
283         return persistentListener;
284     }
285 
286 
287     /**
288      * Set the listener
289      * @param persistentListener The listener
290      */
291     public void setPersistentListener( SyncReplSearchListener persistentListener )
292     {
293         this.persistentListener = persistentListener;
294     }
295 
296 
297     /**
298      * @return The search criteria
299      */
300     public NotificationCriteria getSearchCriteria()
301     {
302         return searchCriteria;
303     }
304 
305 
306     /**
307      * Stores the search criteria
308      * @param searchCriteria The search criteria
309      */
310     public void setSearchCriteria( NotificationCriteria searchCriteria )
311     {
312         this.searchCriteria = searchCriteria;
313     }
314 
315 
316     /**
317      * @return true if the consumer is in Refresh And Persist mode
318      */
319     public boolean isRefreshNPersist()
320     {
321         return refreshNPersist;
322     }
323 
324 
325     /**
326      * @param refreshNPersist if true, set the EventLog in Refresh and Persist mode
327      */
328     public void setRefreshNPersist( boolean refreshNPersist )
329     {
330         this.refreshNPersist = refreshNPersist;
331     }
332 
333 
334     /**
335      * @return The replica ID
336      */
337     public int getId()
338     {
339         return replicaId;
340     }
341 
342 
343     /**
344      * @return The last CSN sent by the consumer
345      */
346     public String getLastSentCsn()
347     {
348         return lastSentCsn;
349     }
350 
351 
352     /**
353      * Update the last Sent CSN. If it's different from the present one, we
354      * will set the dirty flag to true, and it will be stored in DIT.
355      *  
356      * @param lastSentCsn The new Sent CSN
357      */
358     public void setLastSentCsn( String lastSentCsn )
359     {
360         // set only if there is a change in cookie value
361         // this will avoid setting the dirty flag which eventually is used for
362         // storing the details of this log
363         if ( !lastSentCsn.equals( this.lastSentCsn ) )
364         {
365             this.lastSentCsn = lastSentCsn;
366             dirty = true;
367         }
368     }
369 
370 
371     /**
372      * @return The consumer Hostname
373      */
374     public String getHostName()
375     {
376         return hostName;
377     }
378 
379 
380     /**
381      * Set the consumer hostname
382      * @param hostName The consumer hostname
383      */
384     public void setHostName( String hostName )
385     {
386         this.hostName = hostName;
387     }
388 
389 
390     /**
391      * @return The searchFilter
392      */
393     public String getSearchFilter()
394     {
395         return searchFilter;
396     }
397 
398 
399     /**
400      * Set the searchFilter
401      * @param searchFilter The searchFilter
402      */
403     public void setSearchFilter( String searchFilter )
404     {
405         this.searchFilter = searchFilter;
406     }
407 
408 
409     /**
410      * @return True if the consumer is not up to date
411      */
412     public boolean isDirty()
413     {
414         return dirty;
415     }
416 
417 
418     /**
419      * Set the dirty flag
420      * @param dirty The current consumer status
421      */
422     public void setDirty( boolean dirty )
423     {
424         this.dirty = dirty;
425     }
426 
427 
428     /**
429      * @return The queue name
430      */
431     public String getQueueName()
432     {
433         return "replicaId=" + replicaId;
434     }
435 
436 
437     /**
438      * @param consumerCsn the consumer's CSN extracted from cookie
439      * @return A cursor on top of the queue
440      * @throws Exception If the cursor can't be created
441      */
442     public ReplicaJournalCursor getCursor( String consumerCsn ) throws Exception
443     {
444         return new ReplicaJournalCursor( partitionTxn, journal, consumerCsn );
445     }
446 
447 
448     /**
449      * @return the name of this replica log
450      */
451     public String getName()
452     {
453         return journal.getName();
454     }
455 
456 
457     /**
458      * @return the number of entries present in the replica log
459      */
460     public synchronized long count()
461     {
462         try
463         {
464             return journal.count( partitionTxn );
465         }
466         catch ( LdapException e )
467         {
468             throw new RuntimeException( e );
469         }
470     }
471 
472 
473     public long getMaxIdlePeriod()
474     {
475         return maxIdlePeriod;
476     }
477 
478 
479     public void setMaxIdlePeriod( long maxIdlePeriod )
480     {
481         if ( maxIdlePeriod <= 0 )
482         {
483             maxIdlePeriod = DEFAULT_MAX_IDLE_PERIOD;
484         }
485 
486         this.maxIdlePeriod = maxIdlePeriod;
487     }
488 
489 
490     public int getPurgeThresholdCount()
491     {
492         return purgeThresholdCount;
493     }
494 
495 
496     public void setPurgeThresholdCount( int purgeThresholdCount )
497     {
498         if ( purgeThresholdCount <= 0 )
499         {
500             purgeThresholdCount = DEFAULT_PURGE_THRESHOLD_COUNT;
501         }
502 
503         this.purgeThresholdCount = purgeThresholdCount;
504     }
505 
506 
507     public Dn getConsumerEntryDn()
508     {
509         return consumerEntryDn;
510     }
511 
512 
513     public void setConsumerEntryDn( Dn consumerEntryDn )
514     {
515         this.consumerEntryDn = consumerEntryDn;
516     }
517 
518 
519     @Override
520     public String toString()
521     {
522         return "ReplicaEventLog [hostName=" + hostName + ", searchFilter=" + searchFilter + ", lastSentCsn="
523             + lastSentCsn + ", searchCriteria=" + searchCriteria + ", replicaId=" + replicaId
524             + ", refreshNPersist=" + refreshNPersist + ", maxInactivePeriod=" + maxIdlePeriod
525             + ", purgeThresholdCount=" + purgeThresholdCount + ", journalFile=" + journalFile
526             + ", dirty=" + dirty + ", consumerEntryDn=" + consumerEntryDn + "]";
527     }
528 }