1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package org.apache.directory.server.ldap.replication.provider;
22
23
24 import java.io.IOException;
25 import java.util.Iterator;
26
27 import org.apache.directory.api.ldap.model.constants.Loggers;
28 import org.apache.directory.api.ldap.model.constants.SchemaConstants;
29 import org.apache.directory.api.ldap.model.cursor.AbstractCursor;
30 import org.apache.directory.api.ldap.model.cursor.Cursor;
31 import org.apache.directory.api.ldap.model.cursor.CursorException;
32 import org.apache.directory.api.ldap.model.cursor.Tuple;
33 import org.apache.directory.api.ldap.model.exception.LdapException;
34 import org.apache.directory.api.ldap.model.message.controls.ChangeType;
35 import org.apache.directory.server.core.api.partition.PartitionTxn;
36 import org.apache.directory.server.core.partition.impl.btree.jdbm.JdbmTable;
37 import org.apache.directory.server.ldap.replication.ReplicaEventMessage;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
40
41
42
43
44
45
46
47 public class ReplicaJournalCursor extends AbstractCursor<ReplicaEventMessage>
48 {
49
50 private static final Logger LOG = LoggerFactory.getLogger( ReplicaJournalCursor.class );
51
52
53 private static final Logger LOG_CURSOR = LoggerFactory.getLogger( Loggers.CURSOR_LOG.getName() );
54
55
56 private static final boolean IS_DEBUG = LOG_CURSOR.isDebugEnabled();
57
58
59 private Cursor<Tuple<String, ReplicaEventMessage>> tupleCursor;
60
61
62 private JdbmTable<String, ReplicaEventMessage> journal;
63
64
65 private String consumerCsn;
66
67 private ReplicaEventMessage qualifiedEvtMsg;
68
69
70 private boolean skipQualifying;
71
72
73 private PartitionTxn partitionTxn;
74
75
76
77
78
79
80
81
82
83
84
85 public ReplicaJournalCursor( PartitionTxn partitionTxn, JdbmTable<String, ReplicaEventMessage> journal,
86 String consumerCsn ) throws Exception
87 {
88 if ( IS_DEBUG )
89 {
90 LOG_CURSOR.debug( "Creating ReplicaJournalCursor {}", this );
91 }
92
93 this.journal = journal;
94 this.tupleCursor = journal.cursor();
95 this.consumerCsn = consumerCsn;
96 this.partitionTxn = partitionTxn;
97 }
98
99
100
101
102
103 public void after( ReplicaEventMessage arg0 ) throws LdapException, CursorException
104 {
105 throw new UnsupportedOperationException();
106 }
107
108
109
110
111
112 public void afterLast() throws LdapException, CursorException
113 {
114 throw new UnsupportedOperationException();
115 }
116
117
118
119
120
121 public boolean available()
122 {
123 return ( qualifiedEvtMsg != null );
124 }
125
126
127
128
129
130 public void before( ReplicaEventMessage arg0 ) throws LdapException, CursorException
131 {
132 throw new UnsupportedOperationException();
133 }
134
135
136
137
138
139 public void beforeFirst() throws LdapException, CursorException
140 {
141 }
142
143
144
145
146
147 public boolean first() throws LdapException, CursorException
148 {
149 throw new UnsupportedOperationException();
150 }
151
152
153
154
155
156 public ReplicaEventMessage get() throws CursorException
157 {
158 return qualifiedEvtMsg;
159 }
160
161
162
163
164
165
166
167 private boolean isQualified( String csn, ReplicaEventMessage evtMsg ) throws LdapException
168 {
169 LOG.debug( "ReplicaEventMessage: {}", evtMsg );
170
171 if ( evtMsg.isEventOlderThan( consumerCsn ) )
172 {
173 if ( LOG.isDebugEnabled() )
174 {
175 String evt = "MODDN";
176
177 ChangeType changeType = evtMsg.getChangeType();
178
179 if ( changeType != null )
180 {
181 evt = changeType.name();
182 }
183
184 LOG.debug( "event {} for dn {} is not qualified for sending", evt, evtMsg.getEntry().getDn() );
185 }
186
187 return false;
188 }
189
190 return true;
191 }
192
193
194
195
196
197 public boolean last() throws LdapException, CursorException
198 {
199 throw new UnsupportedOperationException();
200 }
201
202
203
204
205
206 public boolean next() throws LdapException, CursorException
207 {
208 while ( tupleCursor.next() )
209 {
210 Tuple<String, ReplicaEventMessage> tuple = tupleCursor.get();
211
212 String csn = tuple.getKey();
213 ReplicaEventMessage message = tuple.getValue();
214
215 if ( skipQualifying )
216 {
217 qualifiedEvtMsg = message;
218 return true;
219 }
220
221 boolean qualified = isQualified( csn, message );
222
223 if ( qualified )
224 {
225 qualifiedEvtMsg = message;
226 return true;
227 }
228 else
229 {
230 journal.remove( partitionTxn, csn );
231 }
232 }
233
234 qualifiedEvtMsg = null;
235
236 return false;
237 }
238
239
240
241
242
243 public boolean previous() throws LdapException, CursorException
244 {
245 throw new UnsupportedOperationException();
246 }
247
248
249
250
251
252 @Override
253 public void close() throws IOException
254 {
255 if ( IS_DEBUG )
256 {
257 LOG_CURSOR.debug( "Closing ReplicaJournalCursor {}", this );
258 }
259
260 tupleCursor.close();
261 super.close();
262 }
263
264
265
266
267
268 @Override
269 public void close( Exception cause ) throws IOException
270 {
271 if ( IS_DEBUG )
272 {
273 LOG_CURSOR.debug( "Closing ReplicaJournalCursor {}", this );
274 }
275
276 tupleCursor.close();
277 super.close( cause );
278 }
279
280
281
282
283
284
285 protected void skipQualifyingWhileFetching()
286 {
287 skipQualifying = true;
288 }
289
290
291
292
293
294
295 protected void delete()
296 {
297 try
298 {
299 if ( qualifiedEvtMsg != null )
300 {
301 journal.remove( partitionTxn, qualifiedEvtMsg.getEntry().get( SchemaConstants.ENTRY_CSN_AT ).getString() );
302 }
303 }
304 catch ( Exception e )
305 {
306 }
307 }
308
309
310
311
312
313 @Override
314 public Iterator<ReplicaEventMessage> iterator()
315 {
316 throw new UnsupportedOperationException();
317 }
318 }