Class NFA<T>

  • Type Parameters:
    T - Type of the processed events

    public class NFA<T>
    extends Object
    Non-deterministic finite automaton implementation.

    For keyed input streams, the CEP operator keeps one NFA per key; for non-keyed streams, it keeps a single global NFA. Each processed event updates the NFA's internal state machine.

    An event that belongs to a partially matched sequence is kept in an internal buffer, a memory-optimized data structure built for this purpose. Events are removed from the buffer once all the matched sequences that contain them are:

    1. emitted (success)
    2. discarded (patterns containing NOT)
    3. timed-out (windowed patterns)
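
    The removal rule above can be illustrated with a small reference-counting sketch. This is not Flink's shared buffer implementation: the class RefCountedBuffer and its methods are hypothetical names, and the sketch assumes that every partial match holds exactly one reference to each event it contains.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch: an event stays buffered while any partial match still references it. */
final class RefCountedBuffer<T> {
    // Number of partial matches that currently contain each buffered event.
    private final Map<T, Integer> refCounts = new HashMap<>();

    /** Called when a partial match starts to include the event. */
    void lock(T event) {
        refCounts.merge(event, 1, Integer::sum);
    }

    /** Called when a match containing the event is emitted, discarded, or timed out. */
    void release(T event) {
        // Returning null from the remapping function removes the entry,
        // so the event leaves the buffer when its last reference is released.
        refCounts.computeIfPresent(event, (e, n) -> n == 1 ? null : n - 1);
    }

    boolean contains(T event) {
        return refCounts.containsKey(event);
    }

    int size() {
        return refCounts.size();
    }
}
```

    In this sketch an event shared by two partial matches survives the first release and is evicted only by the second one.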

    The implementation is strongly based on the paper "Efficient Pattern Matching over Event Streams".

    See Also:
    https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf
    • Method Detail

      • getWindowTime

        public long getWindowTime()
      • createInitialNFAState

        public NFAState createInitialNFAState()
      • open

        public void open(RuntimeContext cepRuntimeContext,
                         Configuration conf)
                  throws Exception
        Initialization method for the NFA. It is called before any element is passed and is thus suitable for one-time setup work.
        Parameters:
        cepRuntimeContext - runtime context of the enclosing operator
        conf - The configuration containing the parameters attached to the contract.
        Throws:
        Exception
      • close

        public void close()
                   throws Exception
        Tear-down method for the NFA.
        Throws:
        Exception
      • process

        public Collection<Map<String,List<T>>> process(SharedBufferAccessor<T> sharedBufferAccessor,
                                                             NFAState nfaState,
                                                             T event,
                                                             long timestamp,
                                                             AfterMatchSkipStrategy afterMatchSkipStrategy,
                                                             TimerService timerService)
                                                      throws Exception
        Processes the next input event. If some of the computations reach a final state, the resulting event sequences are returned. Timed-out partial matches are not part of this method's return value; they are emitted by advanceTime.

        If a computation reaches a stop state, the path forward is discarded and the currently constructed path is returned together with the element that resulted in the stop state.

        Parameters:
        sharedBufferAccessor - The accessor to the SharedBuffer object to work on while processing
        nfaState - The NFAState object to update while processing
        event - The current event to be processed, or null if only pruning should be done
        timestamp - The timestamp of the current event
        afterMatchSkipStrategy - The skip strategy to use after each match
        timerService - gives access to processing time and time characteristic, needed for condition evaluation
        Returns:
        The collection of matched patterns, i.e. the results of computations that have reached a final state
        Throws:
        Exception - Thrown if the system cannot access the state.
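
        To make the shape of the return value concrete, here is a minimal, self-contained sketch of a matcher for one fixed pattern, "start" followed (not necessarily immediately) by "end". It does not use the Flink API: TwoStageMatcher, its predicates, and the stage names are hypothetical, and the real NFA additionally handles the shared buffer, skip strategies, and timers.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

/** Hypothetical two-stage matcher: a "start" event followed later by an "end" event. */
final class TwoStageMatcher<T> {
    private final Predicate<T> isStart;
    private final Predicate<T> isEnd;
    // Events that matched "start" and still wait for an "end" (the partial matches).
    private final List<T> pendingStarts = new ArrayList<>();

    TwoStageMatcher(Predicate<T> isStart, Predicate<T> isEnd) {
        this.isStart = isStart;
        this.isEnd = isEnd;
    }

    /** Feeds one event and returns every match it completes, keyed by stage name. */
    Collection<Map<String, List<T>>> process(T event) {
        List<Map<String, List<T>>> matches = new ArrayList<>();
        if (isEnd.test(event)) {
            // The event drives every pending partial match into the final state.
            for (T start : pendingStarts) {
                Map<String, List<T>> match = new LinkedHashMap<>();
                match.put("start", Collections.singletonList(start));
                match.put("end", Collections.singletonList(event));
                matches.add(match);
            }
            pendingStarts.clear(); // completed partial matches are dropped
        }
        if (isStart.test(event)) {
            pendingStarts.add(event); // the event opens a new partial match
        }
        return matches;
    }
}
```

        Each returned map associates a stage name with the list of events matched in that stage, mirroring the Collection<Map<String,List<T>>> return type of process.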
      • advanceTime

        public Tuple2<Collection<Map<String,List<T>>>,Collection<Tuple2<Map<String,List<T>>,Long>>> advanceTime(SharedBufferAccessor<T> sharedBufferAccessor,
                                                                                                                                        NFAState nfaState,
                                                                                                                                        long timestamp,
                                                                                                                                        AfterMatchSkipStrategy afterMatchSkipStrategy)
                                                                                                                                 throws Exception
        Prunes states, assuming that no events with a timestamp lower than the given one will arrive. It clears the shared buffer and also emits all timed-out partial matches.
        Parameters:
        sharedBufferAccessor - The accessor to the SharedBuffer object to work on while processing
        nfaState - The NFAState object to update while processing
        timestamp - Timestamp indicating that no more events with a lower timestamp will arrive
        afterMatchSkipStrategy - The skip strategy to use after each match
        Returns:
        All pending matches and all timed-out partial matches
        Throws:
        Exception - Thrown if the system cannot access the state.
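
        The pruning step can be sketched without the Flink API as follows. WindowPruner and its methods are hypothetical names. The sketch assumes that a partial match times out once the given timestamp is at least windowTime past the timestamp of its first event, and that each timed-out partial match is reported together with the timestamp at which its window elapsed; both are assumptions made for illustration, not a statement of the exact rule used by this class.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Hypothetical window-based pruning of partial matches. */
final class WindowPruner<T> {
    private final long windowTime;
    // Each entry pairs a partial match with the timestamp of its first event.
    private final List<Map.Entry<List<T>, Long>> partialMatches = new ArrayList<>();

    WindowPruner(long windowTime) {
        this.windowTime = windowTime;
    }

    void addPartialMatch(List<T> events, long startTimestamp) {
        partialMatches.add(new SimpleEntry<List<T>, Long>(events, startTimestamp));
    }

    /** Removes and returns partial matches whose window has elapsed at the given timestamp. */
    List<Map.Entry<List<T>, Long>> advanceTime(long timestamp) {
        List<Map.Entry<List<T>, Long>> timedOut = new ArrayList<>();
        Iterator<Map.Entry<List<T>, Long>> it = partialMatches.iterator();
        while (it.hasNext()) {
            Map.Entry<List<T>, Long> partial = it.next();
            long start = partial.getValue();
            if (timestamp - start >= windowTime) {
                // Report the partial match with the time at which its window ended.
                timedOut.add(new SimpleEntry<List<T>, Long>(partial.getKey(), start + windowTime));
                it.remove();
            }
        }
        return timedOut;
    }

    int pendingCount() {
        return partialMatches.size();
    }
}
```

        Pairing each timed-out partial match with a timestamp mirrors the Tuple2<Map<String,List<T>>,Long> element type in the second component of the advanceTime return value.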