org.apache.s4.core.window
Class AbstractSlidingWindowPE<T extends Slot<U>,U,V>

java.lang.Object
  extended by org.apache.s4.core.ProcessingElement
      extended by org.apache.s4.core.window.AbstractSlidingWindowPE<T,U,V>
Type Parameters:
T - type of the slot implementation used for this window
U - type of the values added to the window slots
V - type of the result returned when the window is evaluated
All Implemented Interfaces:
java.lang.Cloneable

public abstract class AbstractSlidingWindowPE<T extends Slot<U>,U,V>
extends ProcessingElement

Abstract ProcessingElement that can store historical values using a sliding window. Each set of values is called a slot. Each slot represents a segment of time or a fixed number of events. Slots are consecutive in time or events. Users are expected to provide a factory for creating new slots and a method to perform a global computation on the current window. Slots are added automatically. When using time-based slots, use this implementation only if you expect most slots to have values; it is not efficient for sparse event streams.
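
The following is a minimal, library-independent sketch of the idea described above, not the S4 implementation. All names in it (SumSlot, WindowSketch, update, get, size) are hypothetical stand-ins. A slot factory creates slots, addSlot opens a new slot and evicts the oldest one once the window holds numSlots slots, and evaluateWindow performs a global computation over the current slots.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Supplier;

// Hypothetical slot that sums the values added to it (not an S4 class).
class SumSlot {
    private long sum;
    void update(long value) { sum += value; }
    long get() { return sum; }
}

// Hypothetical stand-in for a sliding window of at most numSlots slots.
class WindowSketch {
    private final int numSlots;
    private final Supplier<SumSlot> slotFactory; // plays the role of a slot factory
    private final Deque<SumSlot> slots = new ArrayDeque<>();

    WindowSketch(int numSlots, Supplier<SumSlot> slotFactory) {
        this.numSlots = numSlots;
        this.slotFactory = slotFactory;
        addSlot(); // start with one open slot
    }

    // Open a new slot; drop the oldest one when the window is full.
    synchronized void addSlot() {
        if (slots.size() == numSlots) {
            slots.removeFirst();
        }
        slots.addLast(slotFactory.get());
    }

    // Add a value to the most recently opened slot.
    synchronized void updateOpenSlot(long value) {
        slots.getLast().update(value);
    }

    // Global computation over the whole window: here, the sum of all slots.
    synchronized long evaluateWindow() {
        long total = 0;
        for (SumSlot slot : slots) {
            total += slot.get();
        }
        return total;
    }

    synchronized int size() { return slots.size(); }
}

class WindowSketchDemo {
    public static void main(String[] args) {
        WindowSketch w = new WindowSketch(2, SumSlot::new);
        w.updateOpenSlot(1);
        w.addSlot();
        w.updateOpenSlot(2);
        w.addSlot();          // window is full: the slot holding 1 is evicted
        w.updateOpenSlot(3);
        System.out.println(w.evaluateWindow()); // prints 5
    }
}
```

The methods are synchronized so that iterating over the slots during evaluation cannot interleave with a slot being added; this is one simple way to obtain the locking the class description assumes.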


Field Summary
 
Fields inherited from class org.apache.s4.core.ProcessingElement
app, id
 
Constructor Summary
AbstractSlidingWindowPE(App app, int numSlots, long slotCapacity, SlotFactory<T> slotFactory)
          Constructor for event-based slots.
AbstractSlidingWindowPE(App app, long slotDuration, java.util.concurrent.TimeUnit timeUnit, int numSlots, SlotFactory<T> slotFactory)
          Constructor for time-based slots.
 
Method Summary
protected  void addSlot()
          Add a slot to the sliding window.
protected abstract  V evaluateWindow(java.util.Collection<T> slots)
          User-provided function that evaluates the whole content of the window.
protected  T getOldestSlot()
           
protected  T getOpenSlot()
           
protected  java.util.Collection<T> getSlots()
           
protected  void initPEPrototypeInternal()
           
protected  void onCreate()
          This method is called after a PE instance is created.
protected  void onRemove()
          This method is called before a PE instance is removed.
 void onTrigger(Event event)
          For count-based windows, we use a trigger that adds a new slot when the current one reaches its maximum capacity.
protected  void stop()
          Stops the sliding window.
protected  void updateOpenSlot(U data)
           
 
Methods inherited from class org.apache.s4.core.ProcessingElement
checkpoint, clearDirty, clone, close, deserializeState, emit, getApp, getCheckpointingConfig, getEventCount, getId, getInstanceForKey, getInstances, getName, getNumPEInstances, getPEInstances, getPrototype, getRemoteInstances, getRemoteInstancesForKey, getTimerInterval, handleInputEvent, isCheckpointable, isDirty, isSingleton, isThreadSafe, onTime, recover, removeAll, restoreState, serializeState, setApp, setCheckpointingConfig, setName, setPECache, setPECache, setSingleton, setTimerInterval, setTrigger, toString
 
Methods inherited from class java.lang.Object
equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
 

Constructor Detail

AbstractSlidingWindowPE

public AbstractSlidingWindowPE(App app,
                               int numSlots,
                               long slotCapacity,
                               SlotFactory<T> slotFactory)
Constructor for event-based slots. The method addSlot() must be called by the concrete class.

Parameters:
app - the application
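numSlots - the number of slots to be stored
slotCapacity - the maximum number of events per slot
slotFactory - the factory used to create new slots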

AbstractSlidingWindowPE

public AbstractSlidingWindowPE(App app,
                               long slotDuration,
                               java.util.concurrent.TimeUnit timeUnit,
                               int numSlots,
                               SlotFactory<T> slotFactory)
Constructor for time-based slots. The method addSlot() is called periodically.

Parameters:
app - the application
slotDuration - the slot duration in timeUnit
timeUnit - the unit of time
numSlots - the number of slots to be stored
slotFactory - the factory used to create new slots
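
To illustrate why time-based slots suit dense streams better than sparse ones, here is a sketch with hypothetical names (TimedWindowSketch, tick, add, emptySlots) in which a manual tick() stands in for the periodic call to addSlot(). A slot is opened on every tick whether or not any event arrived, so a sparse stream leaves the window filled mostly with empty slots. How the real class schedules the periodic call is not shown here.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.TimeUnit;

// Hypothetical time-based window; tick() stands in for a periodic timer.
class TimedWindowSketch {
    private final int numSlots;
    private final long slotMillis;
    private final Deque<int[]> slots = new ArrayDeque<>(); // each slot holds one event counter

    TimedWindowSketch(long slotDuration, TimeUnit timeUnit, int numSlots) {
        this.slotMillis = timeUnit.toMillis(slotDuration); // slot duration expressed in timeUnit
        this.numSlots = numSlots;
        tick(); // open the first slot
    }

    long slotMillis() { return slotMillis; }

    // Called once per slot duration: opens a slot even if no event arrived.
    void tick() {
        if (slots.size() == numSlots) {
            slots.removeFirst();
        }
        slots.addLast(new int[1]);
    }

    // Record one event in the open slot.
    void add() { slots.getLast()[0]++; }

    // Number of slots currently in the window that received no events.
    int emptySlots() {
        int empty = 0;
        for (int[] slot : slots) {
            if (slot[0] == 0) {
                empty++;
            }
        }
        return empty;
    }
}

class TimedWindowSketchDemo {
    public static void main(String[] args) {
        TimedWindowSketch w = new TimedWindowSketch(1, TimeUnit.SECONDS, 5);
        w.add();                      // one event in the first second
        for (int i = 0; i < 4; i++) {
            w.tick();                 // four more seconds without events
        }
        System.out.println(w.emptySlots()); // prints 4
    }
}
```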
Method Detail

onRemove

protected void onRemove()
Description copied from class: ProcessingElement
This method is called before a PE instance is removed. Use it to close resources and clean up.

Specified by:
onRemove in class ProcessingElement

onTrigger

public final void onTrigger(Event event)
For count-based windows, we use a trigger that adds a new slot when the current one reaches its maximum capacity.
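
A rough sketch of the count-based behavior described above: once the open slot has received slotCapacity events, a new slot is opened. The class and method names (CountWindowSketch, onEvent, slotSizes) are hypothetical, and the exact point at which the real trigger fires may differ.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical count-based window (not an S4 class).
class CountWindowSketch {
    private final int numSlots;
    private final long slotCapacity;
    private final Deque<List<String>> slots = new ArrayDeque<>();

    CountWindowSketch(int numSlots, long slotCapacity) {
        this.numSlots = numSlots;
        this.slotCapacity = slotCapacity;
        addSlot(); // start with one open slot
    }

    // Open a new slot; drop the oldest one when the window is full.
    private void addSlot() {
        if (slots.size() == numSlots) {
            slots.removeFirst();
        }
        slots.addLast(new ArrayList<>());
    }

    // Store the event, then roll over if the open slot is now full.
    void onEvent(String event) {
        List<String> open = slots.getLast();
        open.add(event);
        if (open.size() == slotCapacity) {
            addSlot();
        }
    }

    // Sizes of the slots, oldest first.
    List<Integer> slotSizes() {
        List<Integer> sizes = new ArrayList<>();
        for (List<String> slot : slots) {
            sizes.add(slot.size());
        }
        return sizes;
    }
}

class CountWindowSketchDemo {
    public static void main(String[] args) {
        CountWindowSketch w = new CountWindowSketch(3, 2);
        for (String e : new String[] {"a", "b", "c", "d", "e"}) {
            w.onEvent(e);
        }
        System.out.println(w.slotSizes()); // prints [2, 2, 1]
    }
}
```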


initPEPrototypeInternal

protected void initPEPrototypeInternal()
Overrides:
initPEPrototypeInternal in class ProcessingElement

evaluateWindow

protected abstract V evaluateWindow(java.util.Collection<T> slots)
User-provided function that evaluates the whole content of the window. It must iterate across all slots. The current slots are passed as a parameter, and the PE instance is expected to be locked so that iteration over the slots is safe.


addSlot

protected final void addSlot()
Add a slot to the sliding window. Called automatically for periodic slots. Call it explicitly when the window is not periodic.


onCreate

protected void onCreate()
Description copied from class: ProcessingElement
This method is called after a PE instance is created. Use it to initialize fields that are PE instance specific. PE instances are created using clone().

Fields initialized in the class constructor are shared by all PE instances.

Specified by:
onCreate in class ProcessingElement
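
The distinction between fields initialized in the constructor and fields initialized in onCreate() follows from Object.clone() making a shallow copy. The sketch below uses a hypothetical PrototypeSketch class, not ProcessingElement itself, to show a constructor-initialized list being shared by all clones while a list created in an onCreate-style hook is private to each instance.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical prototype class illustrating shallow cloning (not an S4 class).
class PrototypeSketch implements Cloneable {
    final List<String> shared = new ArrayList<>(); // set at construction; clones share this object
    List<String> perInstance;                      // set in onCreate(); one list per instance

    // Hypothetical hook, called after an instance is created by cloning.
    void onCreate() {
        perInstance = new ArrayList<>();
    }

    // Create a new instance from this prototype via a shallow copy.
    PrototypeSketch newInstance() {
        try {
            PrototypeSketch copy = (PrototypeSketch) super.clone();
            copy.onCreate();
            return copy;
        } catch (CloneNotSupportedException e) {
            throw new AssertionError(e); // cannot happen: the class is Cloneable
        }
    }
}

class PrototypeSketchDemo {
    public static void main(String[] args) {
        PrototypeSketch prototype = new PrototypeSketch();
        PrototypeSketch a = prototype.newInstance();
        PrototypeSketch b = prototype.newInstance();
        a.shared.add("x");        // visible through b as well
        a.perInstance.add("y");   // visible through a only
        System.out.println(b.shared.size());      // prints 1
        System.out.println(b.perInstance.size()); // prints 0
    }
}
```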

updateOpenSlot

protected void updateOpenSlot(U data)

getOldestSlot

protected T getOldestSlot()
Returns:
the least recently inserted slot

stop

protected void stop()
Stops the sliding window.


getSlots

protected java.util.Collection<T> getSlots()
Returns:
the collection of slots

getOpenSlot

protected T getOpenSlot()