1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.directory.server.ldap.replication.provider;
21
22
23 import java.io.File;
24 import java.util.Map;
25
26 import org.apache.directory.api.ldap.model.constants.SchemaConstants;
27 import org.apache.directory.api.ldap.model.csn.Csn;
28 import org.apache.directory.api.ldap.model.exception.LdapException;
29 import org.apache.directory.server.core.api.DirectoryService;
30 import org.apache.directory.server.ldap.replication.ReplicaEventMessage;
31 import org.slf4j.Logger;
32 import org.slf4j.LoggerFactory;
33
34
35
36
37
38
39
40 public class ReplicaEventLogJanitor extends Thread
41 {
42 private static final Logger LOG = LoggerFactory.getLogger( ReplicaEventLogJanitor.class );
43
44 private DirectoryService directoryService;
45
46 private Map<Integer, ReplicaEventLog> replicaLogMap;
47
48 private volatile boolean stop = false;
49
50
51 final Object lock = new Object();
52
53
54 private long sleepTime = 5 * 60 * 1000L;
55
56 private long thresholdTime = 2 * 60 * 60 * 1000L;
57
58
59 public ReplicaEventLogJanitor( final DirectoryService directoryService,
60 final Map<Integer, ReplicaEventLog> replicaLogMap )
61 {
62
63
64 this.directoryService = directoryService;
65 this.replicaLogMap = replicaLogMap;
66 setDaemon( true );
67 }
68
69
70 @Override
71 public void run()
72 {
73 while ( !stop )
74 {
75 for ( ReplicaEventLog log : replicaLogMap.values() )
76 {
77 synchronized ( log )
78 {
79 try
80 {
81 String lastSentCsn = log.getLastSentCsn();
82
83 if ( lastSentCsn == null )
84 {
85 LOG.debug( "last sent CSN is null for the replica {}, skipping cleanup", log.getName() );
86 return;
87 }
88
89 long now = directoryService.getTimeProvider().currentIimeMillis();
90
91 long maxIdleTime = log.getMaxIdlePeriod() * 1000L;
92
93 long lastUpdatedTime = new Csn( lastSentCsn ).getTimestamp();
94
95 LOG.debug( "checking log idle time now={} lastUpdatedTime={} maxIdleTime={}", now,
96 lastUpdatedTime, maxIdleTime );
97
98
99 if ( ( maxIdleTime > 0 ) && ( now - lastUpdatedTime ) >= maxIdleTime )
100 {
101
102 removeEventLog( log );
103
104
105
106
107
108
109 try
110 {
111 directoryService.getAdminSession().delete( log.getConsumerEntryDn() );
112 }
113 catch ( LdapException e )
114 {
115 LOG.warn( "Failed to delete the entry {} of replica event log {}",
116 log.getConsumerEntryDn(), log.getName(), e );
117 }
118
119 continue;
120 }
121
122 long thresholdCount = log.getPurgeThresholdCount();
123
124 if ( log.count() < thresholdCount )
125 {
126 continue;
127 }
128
129 LOG.debug( "starting to purge the log entries that are older than {} milliseconds",
130 thresholdTime );
131
132 long deleteCount = 0;
133
134 ReplicaJournalCursor cursor = log.getCursor( null );
135 cursor.skipQualifyingWhileFetching();
136
137 while ( cursor.next() )
138 {
139 ReplicaEventMessage message = cursor.get();
140 String csnVal = message.getEntry().get( SchemaConstants.ENTRY_CSN_AT ).getString();
141
142
143 if ( csnVal.compareTo( lastSentCsn ) >= 0 )
144 {
145 break;
146 }
147
148 Csn csn = new Csn( csnVal );
149
150 if ( ( now - csn.getTimestamp() ) >= thresholdTime )
151 {
152 cursor.delete();
153 deleteCount++;
154 }
155 }
156
157 cursor.close();
158
159 LOG.debug( "purged {} messages from the log {}", deleteCount, log.getName() );
160 }
161 catch ( Exception e )
162 {
163 LOG.warn( "Failed to purge old entries from the log {}", log.getName(), e );
164 }
165 }
166 }
167
168 try
169 {
170 synchronized ( lock )
171 {
172 lock.wait( sleepTime );
173 }
174 }
175 catch ( InterruptedException e )
176 {
177 LOG.warn( "ReplicaEventLogJanitor thread was interrupted, processing logs for cleanup", e );
178 }
179 }
180 }
181
182
183 public synchronized void removeEventLog( ReplicaEventLog replicaEventLog )
184 {
185 directoryService.getEventService().removeListener( replicaEventLog.getPersistentListener() );
186 String name = replicaEventLog.getName();
187 LOG.debug( "removed the persistent listener for replication event log {}", name );
188
189 replicaLogMap.remove( replicaEventLog.getId() );
190
191 try
192 {
193 replicaEventLog.stop();
194
195 new File( directoryService.getInstanceLayout().getReplDirectory(), name + ".db" ).delete();
196 new File( directoryService.getInstanceLayout().getReplDirectory(), name + ".lg" ).delete();
197 LOG.info( "successfully removed replication event log {}", name );
198 }
199 catch ( Exception e )
200 {
201 LOG.warn(
202 "Closing the replication event log of the entry {} was not successful, will be removed anyway",
203 name, e );
204 }
205 }
206
207
208 public void setSleepTime( long sleepTime )
209 {
210 this.sleepTime = sleepTime;
211 }
212
213
214 public long getSleepTime()
215 {
216 return sleepTime;
217 }
218
219
220 public void stopCleaning()
221 {
222 stop = true;
223
224 synchronized ( lock )
225 {
226 lock.notify();
227 }
228 }
229 }