1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package org.apache.directory.server.ldap.replication.provider;
22
23
24 import java.io.File;
25 import java.io.IOException;
26
27 import jdbm.RecordManager;
28 import jdbm.recman.BaseRecordManager;
29 import jdbm.recman.TransactionManager;
30
31 import org.apache.directory.api.ldap.model.constants.Loggers;
32 import org.apache.directory.api.ldap.model.constants.SchemaConstants;
33 import org.apache.directory.api.ldap.model.exception.LdapException;
34 import org.apache.directory.api.ldap.model.name.Dn;
35 import org.apache.directory.api.ldap.model.schema.SchemaManager;
36 import org.apache.directory.api.ldap.model.schema.comparators.SerializableComparator;
37 import org.apache.directory.server.core.api.DirectoryService;
38 import org.apache.directory.server.core.api.event.EventType;
39 import org.apache.directory.server.core.api.event.NotificationCriteria;
40 import org.apache.directory.server.core.api.partition.PartitionTxn;
41 import org.apache.directory.server.core.partition.impl.btree.jdbm.JdbmTable;
42 import org.apache.directory.server.core.partition.impl.btree.jdbm.StringSerializer;
43 import org.apache.directory.server.ldap.replication.ReplicaEventMessage;
44 import org.apache.directory.server.ldap.replication.ReplicaEventMessageSerializer;
45 import org.slf4j.Logger;
46 import org.slf4j.LoggerFactory;
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65 public class ReplicaEventLog implements Comparable<ReplicaEventLog>
66 {
67
68 private static final Logger LOG = LoggerFactory.getLogger( ReplicaEventLog.class );
69
70
71 private static final Logger PROVIDER_LOG = LoggerFactory.getLogger( Loggers.PROVIDER_LOG.getName() );
72
73
74 private String hostName;
75
76
77 private String searchFilter;
78
79
80 private String lastSentCsn;
81
82
83 private SyncReplSearchListener persistentListener;
84
85
86 private NotificationCriteria searchCriteria;
87
88
89 private int replicaId;
90
91
92 private boolean refreshNPersist;
93
94
95 private long maxIdlePeriod = DEFAULT_MAX_IDLE_PERIOD;
96
97
98 private int purgeThresholdCount = DEFAULT_PURGE_THRESHOLD_COUNT;
99
100
101
102 private JdbmTable<String, ReplicaEventMessage> journal;
103
104
105 private File journalFile;
106
107
108 private RecordManager recman;
109
110
111 private volatile boolean dirty;
112
113
114 private Dn consumerEntryDn;
115
116 public static final String REPLICA_EVENT_LOG_NAME_PREFIX = "REPL_EVENT_LOG.";
117
118 public static final int DEFAULT_PURGE_THRESHOLD_COUNT = 10000;
119
120
121 public static final int DEFAULT_MAX_IDLE_PERIOD = -1;
122
123
124 private PartitionTxn partitionTxn;
125
126
127
128
129
130
131
132
133
134
135 public ReplicaEventLog( PartitionTxn partitionTxn, DirectoryService directoryService, int replicaId ) throws IOException
136 {
137 PROVIDER_LOG.debug( "Creating the replication queue for replica {}", replicaId );
138 SchemaManager schemaManager = directoryService.getSchemaManager();
139 this.replicaId = replicaId;
140 this.searchCriteria = new NotificationCriteria( schemaManager );
141 this.searchCriteria.setEventMask( EventType.ALL_EVENT_TYPES_MASK );
142
143
144 File replDir = directoryService.getInstanceLayout().getReplDirectory();
145 journalFile = new File( replDir, REPLICA_EVENT_LOG_NAME_PREFIX + replicaId );
146 recman = new BaseRecordManager( journalFile.getAbsolutePath() );
147 TransactionManager transactionManager = ( ( BaseRecordManager ) recman ).getTransactionManager();
148 transactionManager.setMaximumTransactionsInLog( 200 );
149
150 SerializableComparator<String> comparator = new SerializableComparator<>(
151 SchemaConstants.CSN_ORDERING_MATCH_MR_OID );
152 comparator.setSchemaManager( schemaManager );
153
154 journal = new JdbmTable<>( schemaManager, journalFile.getName(), recman, comparator,
155 StringSerializer.INSTANCE, new ReplicaEventMessageSerializer( schemaManager ) );
156
157 this.partitionTxn = partitionTxn;
158 }
159
160
161
162
163
164
165
166 public synchronized void log( ReplicaEventMessage message )
167 {
168 try
169 {
170 LOG.debug( "logging entry with Dn {} with the event {}", message.getEntry().getDn(),
171 message.getChangeType() );
172 PROVIDER_LOG.debug( "logging entry with Dn {} with the event {}", message.getEntry().getDn(),
173 message.getChangeType() );
174
175 String entryCsn = message.getEntry().get( SchemaConstants.ENTRY_CSN_AT ).getString();
176 journal.put( partitionTxn, entryCsn, message );
177 }
178 catch ( Exception e )
179 {
180 LOG.warn( "Failed to insert the entry into syncrepl log", e );
181 PROVIDER_LOG.error( "Failed to insert the entry into syncrepl log", e );
182 }
183 }
184
185
186
187
188
189
190
191
192 public void truncate() throws Exception
193 {
194 }
195
196
197
198
199
200
201 public void recreate() throws Exception
202 {
203 LOG.debug( "recreating the queue for the replica id {}", replicaId );
204 }
205
206
207
208
209
210
211
212 public void stop() throws Exception
213 {
214 PROVIDER_LOG.debug( "Stopping the EventLog for replicaId {}", replicaId );
215
216
217 if ( journal != null )
218 {
219 journal.close( partitionTxn );
220 }
221
222 journal = null;
223
224 if ( recman != null )
225 {
226 recman.close();
227 }
228
229 recman = null;
230 }
231
232
233
234
235
236 @Override
237 public boolean equals( Object obj )
238 {
239 if ( !( obj instanceof ReplicaEventLog ) )
240 {
241 return false;
242 }
243
244 ReplicaEventLog../../../../org/apache/directory/server/ldap/replication/provider/ReplicaEventLog.html#ReplicaEventLog">ReplicaEventLog other = ( ReplicaEventLog ) obj;
245
246 return replicaId == other.getId();
247 }
248
249
250
251
252
253 @Override
254 public int hashCode()
255 {
256 int result = 17;
257 result = 31 * result + searchFilter.hashCode();
258 result = 31 * result + hostName.hashCode();
259
260 return result;
261 }
262
263
264
265
266
267 public int compareTo( ReplicaEventLog o )
268 {
269 if ( this.equals( o ) )
270 {
271 return 0;
272 }
273
274 return 1;
275 }
276
277
278
279
280
281 public SyncReplSearchListener getPersistentListener()
282 {
283 return persistentListener;
284 }
285
286
287
288
289
290
291 public void setPersistentListener( SyncReplSearchListener persistentListener )
292 {
293 this.persistentListener = persistentListener;
294 }
295
296
297
298
299
300 public NotificationCriteria getSearchCriteria()
301 {
302 return searchCriteria;
303 }
304
305
306
307
308
309
310 public void setSearchCriteria( NotificationCriteria searchCriteria )
311 {
312 this.searchCriteria = searchCriteria;
313 }
314
315
316
317
318
319 public boolean isRefreshNPersist()
320 {
321 return refreshNPersist;
322 }
323
324
325
326
327
328 public void setRefreshNPersist( boolean refreshNPersist )
329 {
330 this.refreshNPersist = refreshNPersist;
331 }
332
333
334
335
336
337 public int getId()
338 {
339 return replicaId;
340 }
341
342
343
344
345
346 public String getLastSentCsn()
347 {
348 return lastSentCsn;
349 }
350
351
352
353
354
355
356
357
358 public void setLastSentCsn( String lastSentCsn )
359 {
360
361
362
363 if ( !lastSentCsn.equals( this.lastSentCsn ) )
364 {
365 this.lastSentCsn = lastSentCsn;
366 dirty = true;
367 }
368 }
369
370
371
372
373
374 public String getHostName()
375 {
376 return hostName;
377 }
378
379
380
381
382
383
384 public void setHostName( String hostName )
385 {
386 this.hostName = hostName;
387 }
388
389
390
391
392
393 public String getSearchFilter()
394 {
395 return searchFilter;
396 }
397
398
399
400
401
402
403 public void setSearchFilter( String searchFilter )
404 {
405 this.searchFilter = searchFilter;
406 }
407
408
409
410
411
412 public boolean isDirty()
413 {
414 return dirty;
415 }
416
417
418
419
420
421
422 public void setDirty( boolean dirty )
423 {
424 this.dirty = dirty;
425 }
426
427
428
429
430
431 public String getQueueName()
432 {
433 return "replicaId=" + replicaId;
434 }
435
436
437
438
439
440
441
442 public ReplicaJournalCursor getCursor( String consumerCsn ) throws Exception
443 {
444 return new ReplicaJournalCursor( partitionTxn, journal, consumerCsn );
445 }
446
447
448
449
450
451 public String getName()
452 {
453 return journal.getName();
454 }
455
456
457
458
459
460 public synchronized long count()
461 {
462 try
463 {
464 return journal.count( partitionTxn );
465 }
466 catch ( LdapException e )
467 {
468 throw new RuntimeException( e );
469 }
470 }
471
472
473 public long getMaxIdlePeriod()
474 {
475 return maxIdlePeriod;
476 }
477
478
479 public void setMaxIdlePeriod( long maxIdlePeriod )
480 {
481 if ( maxIdlePeriod <= 0 )
482 {
483 maxIdlePeriod = DEFAULT_MAX_IDLE_PERIOD;
484 }
485
486 this.maxIdlePeriod = maxIdlePeriod;
487 }
488
489
490 public int getPurgeThresholdCount()
491 {
492 return purgeThresholdCount;
493 }
494
495
496 public void setPurgeThresholdCount( int purgeThresholdCount )
497 {
498 if ( purgeThresholdCount <= 0 )
499 {
500 purgeThresholdCount = DEFAULT_PURGE_THRESHOLD_COUNT;
501 }
502
503 this.purgeThresholdCount = purgeThresholdCount;
504 }
505
506
507 public Dn getConsumerEntryDn()
508 {
509 return consumerEntryDn;
510 }
511
512
513 public void setConsumerEntryDn( Dn consumerEntryDn )
514 {
515 this.consumerEntryDn = consumerEntryDn;
516 }
517
518
519 @Override
520 public String toString()
521 {
522 return "ReplicaEventLog [hostName=" + hostName + ", searchFilter=" + searchFilter + ", lastSentCsn="
523 + lastSentCsn + ", searchCriteria=" + searchCriteria + ", replicaId=" + replicaId
524 + ", refreshNPersist=" + refreshNPersist + ", maxInactivePeriod=" + maxIdlePeriod
525 + ", purgeThresholdCount=" + purgeThresholdCount + ", journalFile=" + journalFile
526 + ", dirty=" + dirty + ", consumerEntryDn=" + consumerEntryDn + "]";
527 }
528 }