Package org.apache.flink.cep.nfa
Class NFA<T>
java.lang.Object
    org.apache.flink.cep.nfa.NFA<T>
Type Parameters:
T - Type of the processed events
public class NFA<T> extends Object
Non-deterministic finite automaton implementation.
The CEP operator keeps one NFA per key, for keyed input streams, and a single global NFA for non-keyed ones. When an event gets processed, it updates the NFA's internal state machine.
An event that belongs to a partially matched sequence is kept in an internal buffer, which is a memory-optimized data structure built exactly for this purpose. Events in the buffer are removed when all the matched sequences that contain them are:
- emitted (success)
- discarded (patterns containing NOT)
- timed out (windowed patterns)
The implementation is strongly based on the paper "Efficient Pattern Matching over Event Streams".
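The removal rule above can be pictured as reference counting: an event stays buffered for as long as at least one partial match still refers to it. The following is a minimal sketch under that assumption. The class `RefCountedBuffer` and its methods `lock`, `release`, and `contains` are hypothetical names chosen for illustration; they are not the API of Flink's shared buffer, whose internals are not described on this page.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical toy model of a reference-counted event buffer; not Flink's SharedBuffer. */
class RefCountedBuffer<T> {
    // Number of partial matches that currently reference each buffered event.
    private final Map<T, Integer> refCounts = new HashMap<>();

    /** Called when a partial match starts referencing the event. */
    void lock(T event) {
        refCounts.merge(event, 1, Integer::sum);
    }

    /** Called when a match containing the event is emitted, discarded, or timed out. */
    void release(T event) {
        // Decrement the count; returning null from the remapping function removes the entry.
        refCounts.computeIfPresent(event, (e, n) -> n > 1 ? Integer.valueOf(n - 1) : null);
    }

    boolean contains(T event) {
        return refCounts.containsKey(event);
    }

    int size() {
        return refCounts.size();
    }
}
```

In this model an event shared by two partial matches survives the first `release` and disappears only after the second, which mirrors the "all the matched sequences that contain them" condition.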
Method Summary
Modifier and Type | Method | Description
Tuple2<Collection<Map<String,List<T>>>,Collection<Tuple2<Map<String,List<T>>,Long>>> | advanceTime(SharedBufferAccessor<T> sharedBufferAccessor, NFAState nfaState, long timestamp, AfterMatchSkipStrategy afterMatchSkipStrategy) | Prunes states assuming there will be no events with timestamp lower than the given one.
void | close() | Tear-down method for the NFA.
NFAState | createInitialNFAState() |
Collection<State<T>> | getStates() |
long | getWindowTime() |
void | open(RuntimeContext cepRuntimeContext, Configuration conf) | Initialization method for the NFA.
Collection<Map<String,List<T>>> | process(SharedBufferAccessor<T> sharedBufferAccessor, NFAState nfaState, T event, long timestamp, AfterMatchSkipStrategy afterMatchSkipStrategy, TimerService timerService) | Processes the next input event.
Method Detail
-
getWindowTime
public long getWindowTime()
-
getStates
@VisibleForTesting public Collection<State<T>> getStates()
-
createInitialNFAState
public NFAState createInitialNFAState()
-
open
public void open(RuntimeContext cepRuntimeContext, Configuration conf) throws Exception
Initialization method for the NFA. It is called before any element is passed and is thus suitable for one-time setup work.
Parameters:
cepRuntimeContext - runtime context of the enclosing operator
conf - The configuration containing the parameters attached to the contract.
Throws:
Exception
-
process
public Collection<Map<String,List<T>>> process(SharedBufferAccessor<T> sharedBufferAccessor, NFAState nfaState, T event, long timestamp, AfterMatchSkipStrategy afterMatchSkipStrategy, TimerService timerService) throws Exception
Processes the next input event. If some of the computations reach a final state, the resulting event sequences are returned. Timed-out partial matches are not part of this return value; they are emitted by advanceTime.
If computations reach a stop state, the path forward is discarded and the currently constructed path is returned with the element that resulted in the stop state.
Parameters:
sharedBufferAccessor - the accessor to the SharedBuffer object that we need to work upon while processing
nfaState - The NFAState object that we need to affect while processing
event - The current event to be processed or null if only pruning shall be done
timestamp - The timestamp of the current event
afterMatchSkipStrategy - The skip strategy to apply after each match
timerService - gives access to processing time and time characteristic, needed for condition evaluation
Returns:
The collection of matched patterns (i.e. the results of computations which have reached a final state)
Throws:
Exception - Thrown if the system cannot access the state.
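To make the shape of the return value concrete, the sketch below models a two-step pattern ("start" followed by "end") in plain Java. It is a deliberately simplified, hypothetical stand-in: the class `ToyNfa`, its single-argument `process` method, and the prefix-based conditions ("a" for start, "b" for end) are assumptions made for illustration and do not reproduce Flink's NFA, its skip strategies, or its state handling. Each completed match is a map from pattern name to the list of events matched under that name, as in `Collection<Map<String,List<T>>>`.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical toy automaton for the pattern "start" followed by "end"; not Flink's NFA. */
class ToyNfa {
    // Each entry is a partial match: a "start" event still waiting for an "end" event.
    private final List<String> partialMatches = new ArrayList<>();

    /** Feeds one event and returns every match it completes, keyed by pattern name. */
    Collection<Map<String, List<String>>> process(String event) {
        List<Map<String, List<String>>> matches = new ArrayList<>();
        if (event.startsWith("a")) {
            // The event satisfies the "start" condition: open a new partial match.
            partialMatches.add(event);
        } else if (event.startsWith("b")) {
            // The event satisfies the "end" condition: every open partial match reaches the final state.
            for (String start : partialMatches) {
                Map<String, List<String>> match = new LinkedHashMap<>();
                match.put("start", Collections.singletonList(start));
                match.put("end", Collections.singletonList(event));
                matches.add(match);
            }
            // Simplification of this sketch: completed partial matches are dropped.
            partialMatches.clear();
        }
        // Events matching neither condition are ignored.
        return matches;
    }
}
```

Feeding `a1`, `a2`, `b1` to this toy returns nothing for the first two events and two matches for the third, one per open partial match, which illustrates why the result is a collection rather than a single match.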
-
advanceTime
public Tuple2<Collection<Map<String,List<T>>>,Collection<Tuple2<Map<String,List<T>>,Long>>> advanceTime(SharedBufferAccessor<T> sharedBufferAccessor, NFAState nfaState, long timestamp, AfterMatchSkipStrategy afterMatchSkipStrategy) throws Exception
Prunes states assuming there will be no events with timestamp lower than the given one. It clears the sharedBuffer and also emits all timed-out partial matches.
Parameters:
sharedBufferAccessor - the accessor to the SharedBuffer object that we need to work upon while processing
nfaState - The NFAState object that we need to affect while processing
timestamp - timestamp that indicates that there will be no more events with a lower timestamp
afterMatchSkipStrategy - The skip strategy to apply after each match
Returns:
all pending matches and timed-out partial matches
Throws:
Exception - Thrown if the system cannot access the state.
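The pruning step can be sketched as a scan over the open partial matches that removes those whose window has elapsed. The model below is hypothetical: the class `ToyPruner`, the method `startPartialMatch`, and the expiry rule (a partial match expires once `timestamp - startTimestamp >= windowTime`) are assumptions of this sketch, as is pairing each timed-out match with its expiry time to mimic the `Long` in the return type. It does not cover pending matches or the shared buffer.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Hypothetical toy model of time-based pruning; not Flink's NFA. */
class ToyPruner {
    private final long windowTime;
    // Partial matches, each stored as (first event, timestamp of that event).
    private final List<Map.Entry<String, Long>> partialMatches = new ArrayList<>();

    ToyPruner(long windowTime) {
        this.windowTime = windowTime;
    }

    void startPartialMatch(String event, long timestamp) {
        partialMatches.add(new AbstractMap.SimpleEntry<String, Long>(event, timestamp));
    }

    /** Removes expired partial matches and returns each one with its expiry time. */
    List<Map.Entry<String, Long>> advanceTime(long timestamp) {
        List<Map.Entry<String, Long>> timedOut = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = partialMatches.iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> partial = it.next();
            // Assumption of this sketch: a match expires once a full window has elapsed.
            if (timestamp - partial.getValue() >= windowTime) {
                long expiry = partial.getValue() + windowTime;
                timedOut.add(new AbstractMap.SimpleEntry<String, Long>(partial.getKey(), expiry));
                it.remove();
            }
        }
        return timedOut;
    }

    int pendingCount() {
        return partialMatches.size();
    }
}
```

With a window of 10, a partial match started at time 0 is still open when time advances to 9 and is reported as timed out when time advances to 10, while one started at time 5 remains open.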