View Javadoc
1   /*
2    *   Licensed to the Apache Software Foundation (ASF) under one
3    *   or more contributor license agreements.  See the NOTICE file
4    *   distributed with this work for additional information
5    *   regarding copyright ownership.  The ASF licenses this file
6    *   to you under the Apache License, Version 2.0 (the
7    *   "License"); you may not use this file except in compliance
8    *   with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *   Unless required by applicable law or agreed to in writing,
13   *   software distributed under the License is distributed on an
14   *   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *   KIND, either express or implied.  See the License for the
16   *   specific language governing permissions and limitations
17   *   under the License.
18   *
19   */
20  
21  package org.apache.directory.server.ldap.replication.provider;
22  
23  
24  import java.io.IOException;
25  import java.util.Iterator;
26  
27  import org.apache.directory.api.ldap.model.constants.Loggers;
28  import org.apache.directory.api.ldap.model.constants.SchemaConstants;
29  import org.apache.directory.api.ldap.model.cursor.AbstractCursor;
30  import org.apache.directory.api.ldap.model.cursor.Cursor;
31  import org.apache.directory.api.ldap.model.cursor.CursorException;
32  import org.apache.directory.api.ldap.model.cursor.Tuple;
33  import org.apache.directory.api.ldap.model.exception.LdapException;
34  import org.apache.directory.api.ldap.model.message.controls.ChangeType;
35  import org.apache.directory.server.core.api.partition.PartitionTxn;
36  import org.apache.directory.server.core.partition.impl.btree.jdbm.JdbmTable;
37  import org.apache.directory.server.ldap.replication.ReplicaEventMessage;
38  import org.slf4j.Logger;
39  import org.slf4j.LoggerFactory;
40  
41  
42  /**
43   * Define a cursor on top of a replication journal.
44   *
45   * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a>
46   */
47  public class ReplicaJournalCursor extends AbstractCursor<ReplicaEventMessage>
48  {
49      /** Logger for this class */
50      private static final Logger LOG = LoggerFactory.getLogger( ReplicaJournalCursor.class );
51  
52      /** A dedicated log for cursors */
53      private static final Logger LOG_CURSOR = LoggerFactory.getLogger( Loggers.CURSOR_LOG.getName() );
54  
55      /** Speedup for logs */
56      private static final boolean IS_DEBUG = LOG_CURSOR.isDebugEnabled();
57  
58      /** the underlying journal's cursor */
59      private Cursor<Tuple<String, ReplicaEventMessage>> tupleCursor;
60  
61      /** the event log journal */
62      private JdbmTable<String, ReplicaEventMessage> journal;
63  
64      /** the consumer's CSN based on which messages will be qualified for sending */
65      private String consumerCsn;
66  
67      private ReplicaEventMessage qualifiedEvtMsg;
68  
69      /** used while cleaning up the log */
70      private boolean skipQualifying;
71      
72      /** The partition transaction */
73      private PartitionTxn partitionTxn;
74      
75  
76  
77      /**
78       * Creates a cursor on top of the given journal
79       * 
80       * @param partitionTxn The Transaction to use
81       * @param journal the log journal
82       * @param consumerCsn the consumer's CSN taken from cookie
83       * @throws Exception If the cursor creation failed
84       */
85      public ReplicaJournalCursor( PartitionTxn partitionTxn, JdbmTable<String, ReplicaEventMessage> journal, 
86              String consumerCsn ) throws Exception
87      {
88          if ( IS_DEBUG )
89          {
90              LOG_CURSOR.debug( "Creating ReplicaJournalCursor {}", this );
91          }
92  
93          this.journal = journal;
94          this.tupleCursor = journal.cursor();
95          this.consumerCsn = consumerCsn;
96          this.partitionTxn = partitionTxn;
97      }
98  
99  
100     /**
101      * {@inheritDoc}
102      */
103     public void after( ReplicaEventMessage arg0 ) throws LdapException, CursorException
104     {
105         throw new UnsupportedOperationException();
106     }
107 
108 
109     /**
110      * {@inheritDoc}
111      */
112     public void afterLast() throws LdapException, CursorException
113     {
114         throw new UnsupportedOperationException();
115     }
116 
117 
118     /**
119      * {@inheritDoc}
120      */
121     public boolean available()
122     {
123         return ( qualifiedEvtMsg != null );
124     }
125 
126 
127     /**
128      * {@inheritDoc}
129      */
130     public void before( ReplicaEventMessage arg0 ) throws LdapException, CursorException
131     {
132         throw new UnsupportedOperationException();
133     }
134 
135 
136     /**
137      * {@inheritDoc}
138      */
139     public void beforeFirst() throws LdapException, CursorException
140     {
141     }
142 
143 
144     /**
145      * {@inheritDoc}
146      */
147     public boolean first() throws LdapException, CursorException
148     {
149         throw new UnsupportedOperationException();
150     }
151 
152 
153     /**
154      * {@inheritDoc}
155      */
156     public ReplicaEventMessage get() throws CursorException
157     {
158         return qualifiedEvtMsg;
159     }
160 
161 
162     /**
163      * selects the current queue entry if qualified for sending to the consumer
164      * 
165      * @throws Exception
166      */
167     private boolean isQualified( String csn, ReplicaEventMessage evtMsg ) throws LdapException
168     {
169         LOG.debug( "ReplicaEventMessage: {}", evtMsg );
170 
171         if ( evtMsg.isEventOlderThan( consumerCsn ) )
172         {
173             if ( LOG.isDebugEnabled() )
174             {
175                 String evt = "MODDN"; // take this as default cause the event type for MODDN is null
176 
177                 ChangeType changeType = evtMsg.getChangeType();
178 
179                 if ( changeType != null )
180                 {
181                     evt = changeType.name();
182                 }
183 
184                 LOG.debug( "event {} for dn {} is not qualified for sending", evt, evtMsg.getEntry().getDn() );
185             }
186 
187             return false;
188         }
189 
190         return true;
191     }
192 
193 
194     /**
195      * {@inheritDoc}
196      */
197     public boolean last() throws LdapException, CursorException
198     {
199         throw new UnsupportedOperationException();
200     }
201 
202 
203     /**
204      * {@inheritDoc}
205      */
206     public boolean next() throws LdapException, CursorException
207     {
208         while ( tupleCursor.next() )
209         {
210             Tuple<String, ReplicaEventMessage> tuple = tupleCursor.get();
211 
212             String csn = tuple.getKey();
213             ReplicaEventMessage message = tuple.getValue();
214 
215             if ( skipQualifying )
216             {
217                 qualifiedEvtMsg = message;
218                 return true;
219             }
220 
221             boolean qualified = isQualified( csn, message );
222 
223             if ( qualified )
224             {
225                 qualifiedEvtMsg = message;
226                 return true;
227             }
228             else
229             {
230                 journal.remove( partitionTxn, csn );
231             }
232         }
233 
234         qualifiedEvtMsg = null;
235 
236         return false;
237     }
238 
239 
240     /**
241      * {@inheritDoc}
242      */
243     public boolean previous() throws LdapException, CursorException
244     {
245         throw new UnsupportedOperationException();
246     }
247 
248 
249     /**
250      * {@inheritDoc}
251      */
252     @Override
253     public void close() throws IOException
254     {
255         if ( IS_DEBUG )
256         {
257             LOG_CURSOR.debug( "Closing ReplicaJournalCursor {}", this );
258         }
259 
260         tupleCursor.close();
261         super.close();
262     }
263 
264 
265     /**
266      * {@inheritDoc}
267      */
268     @Override
269     public void close( Exception cause ) throws IOException
270     {
271         if ( IS_DEBUG )
272         {
273             LOG_CURSOR.debug( "Closing ReplicaJournalCursor {}", this );
274         }
275 
276         tupleCursor.close();
277         super.close( cause );
278     }
279 
280 
281     /**
282      * sets the flag to skip CSN based checking while traversing
283      * used for internal log cleanup ONLY 
284      */
285     protected void skipQualifyingWhileFetching()
286     {
287         skipQualifying = true;
288     }
289 
290 
291     /**
292      * delete the current message
293      * used for internal log cleanup ONLY
294      */
295     protected void delete()
296     {
297         try
298         {
299             if ( qualifiedEvtMsg != null )
300             {
301                 journal.remove( partitionTxn, qualifiedEvtMsg.getEntry().get( SchemaConstants.ENTRY_CSN_AT ).getString() );
302             }
303         }
304         catch ( Exception e )
305         {
306         }
307     }
308 
309 
310     /**
311      * {@inheritDoc}
312      */
313     @Override
314     public Iterator<ReplicaEventMessage> iterator()
315     {
316         throw new UnsupportedOperationException();
317     }
318 }