View Javadoc
1   /*
2    *   Licensed to the Apache Software Foundation (ASF) under one
3    *   or more contributor license agreements.  See the NOTICE file
4    *   distributed with this work for additional information
5    *   regarding copyright ownership.  The ASF licenses this file
6    *   to you under the Apache License, Version 2.0 (the
7    *   "License"); you may not use this file except in compliance
8    *   with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *   Unless required by applicable law or agreed to in writing,
13   *   software distributed under the License is distributed on an
14   *   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *   KIND, either express or implied.  See the License for the
16   *   specific language governing permissions and limitations
17   *   under the License.
18   *
19   */
20  package org.apache.directory.server.ldap.replication.provider;
21  
22  
23  import java.io.File;
24  import java.util.Map;
25  
26  import org.apache.directory.api.ldap.model.constants.SchemaConstants;
27  import org.apache.directory.api.ldap.model.csn.Csn;
28  import org.apache.directory.api.ldap.model.exception.LdapException;
29  import org.apache.directory.server.core.api.DirectoryService;
30  import org.apache.directory.server.ldap.replication.ReplicaEventMessage;
31  import org.slf4j.Logger;
32  import org.slf4j.LoggerFactory;
33  
34  
35  /**
36   * Deletes old entries from the replication event logs that are configured in refreshNPersist mode.
37   * 
38   * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a>
39   */
40  public class ReplicaEventLogJanitor extends Thread
41  {
42      private static final Logger LOG = LoggerFactory.getLogger( ReplicaEventLogJanitor.class );
43  
44      private DirectoryService directoryService;
45  
46      private Map<Integer, ReplicaEventLog> replicaLogMap;
47  
48      private volatile boolean stop = false;
49  
50      /** A lock used to wait */
51      final Object lock = new Object();
52  
53      /** time the janitor thread sleeps before successive cleanup attempts. Default value is 5 minutes */
54      private long sleepTime = 5 * 60 * 1000L;
55  
56      private long thresholdTime = 2 * 60 * 60 * 1000L;
57  
58  
59      public ReplicaEventLogJanitor( final DirectoryService directoryService,
60          final Map<Integer, ReplicaEventLog> replicaLogMap )
61      {
62          // if log is in refreshNpersist mode, has more entries than the log's threshold count then 
63          // all the entries before the last sent CSN and older than 2 hours will be purged
64          this.directoryService = directoryService;
65          this.replicaLogMap = replicaLogMap;
66          setDaemon( true );
67      }
68  
69  
70      @Override
71      public void run()
72      {
73          while ( !stop )
74          {
75              for ( ReplicaEventLog log : replicaLogMap.values() )
76              {
77                  synchronized ( log ) // lock the log and clean
78                  {
79                      try
80                      {
81                          String lastSentCsn = log.getLastSentCsn();
82  
83                          if ( lastSentCsn == null )
84                          {
85                              LOG.debug( "last sent CSN is null for the replica {}, skipping cleanup", log.getName() );
86                              return;
87                          }
88  
89                          long now = directoryService.getTimeProvider().currentIimeMillis();
90  
91                          long maxIdleTime = log.getMaxIdlePeriod() * 1000L;
92  
93                          long lastUpdatedTime = new Csn( lastSentCsn ).getTimestamp();
94  
95                          LOG.debug( "checking log idle time now={} lastUpdatedTime={} maxIdleTime={}", now,
96                              lastUpdatedTime, maxIdleTime );
97  
98                          // DO NOT delete those with maxIdleTime <= 0
99                          if ( ( maxIdleTime > 0 ) && ( now - lastUpdatedTime ) >= maxIdleTime )
100                         {
101                             //max idle time of the event log reached, delete it
102                             removeEventLog( log );
103 
104                             // delete the associated entry from DiT, note that ConsumerLogEntryDeleteListener 
105                             // will get called eventually but removeEventLog() will not be called cause by 
106                             // that time this log will not be present in replicaLogMap
107                             // The reason we don't call this method first is to guard against any rename
108                             // operation performed on the log's entry in DiT
109                             try
110                             {
111                                 directoryService.getAdminSession().delete( log.getConsumerEntryDn() );
112                             }
113                             catch ( LdapException e )
114                             {
115                                 LOG.warn( "Failed to delete the entry {} of replica event log {}",
116                                     log.getConsumerEntryDn(), log.getName(), e );
117                             }
118 
119                             continue;
120                         }
121 
122                         long thresholdCount = log.getPurgeThresholdCount();
123 
124                         if ( log.count() < thresholdCount )
125                         {
126                             continue;
127                         }
128 
129                         LOG.debug( "starting to purge the log entries that are older than {} milliseconds",
130                             thresholdTime );
131 
132                         long deleteCount = 0;
133 
134                         ReplicaJournalCursor cursor = log.getCursor( null ); // pass no CSN
135                         cursor.skipQualifyingWhileFetching();
136 
137                         while ( cursor.next() )
138                         {
139                             ReplicaEventMessage message = cursor.get();
140                             String csnVal = message.getEntry().get( SchemaConstants.ENTRY_CSN_AT ).getString();
141 
142                             // skip if we reach the lastSentCsn or got past it
143                             if ( csnVal.compareTo( lastSentCsn ) >= 0 )
144                             {
145                                 break;
146                             }
147 
148                             Csn csn = new Csn( csnVal );
149 
150                             if ( ( now - csn.getTimestamp() ) >= thresholdTime )
151                             {
152                                 cursor.delete();
153                                 deleteCount++;
154                             }
155                         }
156 
157                         cursor.close();
158 
159                         LOG.debug( "purged {} messages from the log {}", deleteCount, log.getName() );
160                     }
161                     catch ( Exception e )
162                     {
163                         LOG.warn( "Failed to purge old entries from the log {}", log.getName(), e );
164                     }
165                 }
166             }
167 
168             try
169             {
170                 synchronized ( lock )
171                 {
172                     lock.wait( sleepTime );
173                 }
174             }
175             catch ( InterruptedException e )
176             {
177                 LOG.warn( "ReplicaEventLogJanitor thread was interrupted, processing logs for cleanup", e );
178             }
179         }
180     }
181 
182 
183     public synchronized void removeEventLog( ReplicaEventLog replicaEventLog )
184     {
185         directoryService.getEventService().removeListener( replicaEventLog.getPersistentListener() );
186         String name = replicaEventLog.getName();
187         LOG.debug( "removed the persistent listener for replication event log {}", name );
188 
189         replicaLogMap.remove( replicaEventLog.getId() );
190 
191         try
192         {
193             replicaEventLog.stop();
194 
195             new File( directoryService.getInstanceLayout().getReplDirectory(), name + ".db" ).delete();
196             new File( directoryService.getInstanceLayout().getReplDirectory(), name + ".lg" ).delete();
197             LOG.info( "successfully removed replication event log {}", name );
198         }
199         catch ( Exception e )
200         {
201             LOG.warn(
202                 "Closing the replication event log of the entry {} was not successful, will be removed anyway",
203                 name, e );
204         }
205     }
206 
207 
208     public void setSleepTime( long sleepTime )
209     {
210         this.sleepTime = sleepTime;
211     }
212 
213 
214     public long getSleepTime()
215     {
216         return sleepTime;
217     }
218 
219 
220     public void stopCleaning()
221     {
222         stop = true;
223 
224         synchronized ( lock )
225         {
226             lock.notify();
227         }
228     }
229 }