org.apache.s4.core
Class ProcessingElement

java.lang.Object
  extended by org.apache.s4.core.ProcessingElement
All Implemented Interfaces:
java.lang.Cloneable
Direct Known Subclasses:
AbstractSlidingWindowPE

public abstract class ProcessingElement
extends java.lang.Object
implements java.lang.Cloneable

Base class for implementing processing in S4. All instances are organized as follows:


Field Summary
protected  App app
           
protected  java.lang.String id
           
 
Constructor Summary
protected ProcessingElement()
           
  ProcessingElement(App app)
          Create a PE prototype.
 
Method Summary
 void checkpoint()
           
 void clearDirty()
          Dirty state is cleared after the PE has been serialized.
protected  java.lang.Object clone()
          This method exists simply to make clone() protected.
protected  void close()
           
 ProcessingElement deserializeState(byte[] loadedState)
           
protected
<T extends Event>
void
emit(T event, Stream<T>[] streamArray)
          Helper method to be used by PE implementation classes.
 App getApp()
          PE objects must be associated with one and only one App object.
 CheckpointingConfig getCheckpointingConfig()
           
 long getEventCount()
           
 java.lang.String getId()
          Unique ID for a PE instance.
 ProcessingElement getInstanceForKey(java.lang.String id)
          This method is designed to be used within the package.
 java.util.Collection<ProcessingElement> getInstances()
          Get all the local instances.
protected  java.lang.String getName()
           
 long getNumPEInstances()
          Returns the approximate number of PE instances from the cache.
 java.util.Map<java.lang.String,ProcessingElement> getPEInstances()
           
 ProcessingElement getPrototype()
          The ProcessingElement prototype for this object.
 java.util.Map<java.lang.String,ProcessingElement> getRemoteInstances()
          This method returns an immutable map that contains all the PE instances for this prototype.
 ProcessingElement getRemoteInstancesForKey()
          This method returns a remote PE instance for key.
 long getTimerInterval(java.util.concurrent.TimeUnit timeUnit)
          The duration of the periodic task controlled by the embedded timer.
protected  void handleInputEvent(Event event)
           
protected  void initPEPrototypeInternal()
           
protected  boolean isCheckpointable()
           
 boolean isDirty()
          By default, the state of a PE instance is considered dirty whenever it processed an event.
 boolean isSingleton()
           
 boolean isThreadSafe()
          Set to true if the concrete PE class has the ThreadSafe annotation.
protected abstract  void onCreate()
          This method is called after a PE instance is created.
protected abstract  void onRemove()
          This method is called before a PE instance is removed.
protected  void onTime()
          This method is called by the PE timer.
protected  void recover()
           
protected  void removeAll()
           
 void restoreState(ProcessingElement oldState)
           
 byte[] serializeState()
           
 void setApp(App app)
           
 void setCheckpointingConfig(CheckpointingConfig checkpointingConfig)
           
protected  void setName(java.lang.String name)
           
 ProcessingElement setPECache(int maximumSize)
          Sets the max size of the PE cache.
 ProcessingElement setPECache(int maximumSize, long duration, java.util.concurrent.TimeUnit timeUnit)
          Set PE expiration and cache size.
 ProcessingElement setSingleton(boolean isSingleton)
          Makes this PE a singleton.
 ProcessingElement setTimerInterval(long interval, java.util.concurrent.TimeUnit timeUnit)
          Set a timer that calls onTime().
 ProcessingElement setTrigger(java.lang.Class<? extends Event> eventType, int numEvents, long interval, java.util.concurrent.TimeUnit timeUnit)
          This trigger is fired when the following conditions occur: an event of eventType arrived at the PE instance, and numEvents have arrived since the last time this trigger was fired -OR- time since last event is greater than interval.
 java.lang.String toString()
           
 
Methods inherited from class java.lang.Object
equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
 

Field Detail

app

protected transient App app

id

protected java.lang.String id
Constructor Detail

ProcessingElement

protected ProcessingElement()

ProcessingElement

public ProcessingElement(App app)
Create a PE prototype. By default, PE instances will never expire. Use setPECache to configure expiration and cache size.

Parameters:
app - the app that contains this PE
Method Detail

onTime

protected void onTime()
This method is called by the PE timer. By default it is synchronized with the onEvent() and onTrigger() methods. To execute concurrently with other methods, the ProcessingElement subclass must be annotated with ThreadSafe. Override this method to implement a periodic process.


onCreate

protected abstract void onCreate()
This method is called after a PE instance is created. Use it to initialize fields that are PE instance specific. PE instances are created using clone().

Fields initialized in the class constructor are shared by all PE instances.
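The distinction between constructor-initialized and onCreate()-initialized fields follows from how a shallow clone copies references. The sketch below illustrates this with a hypothetical stand-in class (CounterPESketch, with an invented newInstance() helper); it does not extend the real ProcessingElement and only assumes that instances are produced by a shallow clone() followed by onCreate().

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a PE; it does not extend the real ProcessingElement.
class CounterPESketch implements Cloneable {
    // Initialized in the constructor: clone() copies only the reference,
    // so the prototype and every instance share this one list.
    final List<String> sharedLog;

    // Instance-specific state: assigned in onCreate(), once per instance.
    List<String> ownEvents;

    CounterPESketch() {
        sharedLog = new ArrayList<>();
    }

    // Plays the role of onCreate(): runs after an instance has been cloned.
    void onCreate() {
        ownEvents = new ArrayList<>();
    }

    // Hypothetical helper standing in for the framework's instance creation.
    CounterPESketch newInstance() {
        try {
            CounterPESketch pe = (CounterPESketch) super.clone(); // shallow copy
            pe.onCreate();
            return pe;
        } catch (CloneNotSupportedException e) {
            throw new AssertionError(e); // cannot happen: the class is Cloneable
        }
    }
}
```

In this sketch, two instances obtained from the same prototype see each other's writes to sharedLog but keep separate ownEvents lists.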


onRemove

protected abstract void onRemove()
This method is called before a PE instance is removed. Use it to close resources and clean up.


getApp

public App getApp()
PE objects must be associated with one and only one App object.

Returns:
the app

setApp

public void setApp(App app)

getNumPEInstances

public long getNumPEInstances()
Returns the approximate number of PE instances from the cache.

Returns:
the approximate number of PE instances.

getPEInstances

public java.util.Map<java.lang.String,ProcessingElement> getPEInstances()

setPECache

public ProcessingElement setPECache(int maximumSize,
                                    long duration,
                                    java.util.concurrent.TimeUnit timeUnit)
Set PE expiration and cache size.

PE instances will be automatically removed from the cache once a fixed duration has elapsed after the PE's creation or last access.

Least accessed PEs will automatically be removed from the cache when the number of PEs approaches maximumSize.

When this method is called all existing PE instances are destroyed.

Parameters:
maximumSize - the approximate maximum number of PEs in the cache.
duration - the time after creation or last access after which a PE instance is removed from the cache
timeUnit - the time unit
Returns:
the PE prototype

setPECache

public ProcessingElement setPECache(int maximumSize)
Sets the max size of the PE cache.

Least accessed PEs will automatically be removed from the cache when the number of PEs approaches maximumSize.

When this method is called all existing PE instances are destroyed.

Parameters:
maximumSize - the approximate maximum number of PEs in the cache.
Returns:
the PE prototype
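For intuition about size-based eviction, the sketch below uses a plain java.util.LinkedHashMap in access order as a stand-in. This is an assumption made for illustration: PECacheSketch is a hypothetical class, it evicts the least recently accessed entry once the size exceeds the limit, and it does not model time-based expiration or the actual cache used by S4.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for a size-bounded PE cache, keyed by PE id.
class PECacheSketch<V> extends LinkedHashMap<String, V> {
    private final int maximumSize;

    PECacheSketch(int maximumSize) {
        super(16, 0.75f, true); // true = iterate in access order (oldest access first)
        this.maximumSize = maximumSize;
    }

    // Called by LinkedHashMap after each insertion; returning true
    // removes the least recently accessed entry.
    @Override
    protected boolean removeEldestEntry(Map.Entry<String, V> eldest) {
        return size() > maximumSize;
    }
}
```

With a limit of two, inserting "a" and "b", reading "a", and then inserting "c" removes "b", because "b" is the entry that was accessed least recently.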

setTrigger

public ProcessingElement setTrigger(java.lang.Class<? extends Event> eventType,
                                    int numEvents,
                                    long interval,
                                    java.util.concurrent.TimeUnit timeUnit)
This trigger is fired when the following conditions occur:
  • An event of eventType arrived at the PE instance
  • numEvents have arrived since the last time this trigger was fired -OR- time since last event is greater than interval.

When the trigger fires, the method onTrigger(EventType event) is called, where EventType matches the argument eventType.

Parameters:
eventType - the type of event on which this trigger will fire.
numEvents - number of events since last trigger activation. Must be greater than zero. (Set to one to trigger on every input event.)
interval - minimum time between triggers. Set to zero if no time interval needed.
timeUnit - the TimeUnit for the argument interval. Can set to null if no time interval needed.
Returns:
the PE prototype
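The firing condition can be modelled in a few lines. The sketch below is one reading of the description above, not the S4 implementation: TriggerSketch and onMatchingEvent are hypothetical names, timestamps are passed in explicitly, and the time condition is interpreted as "the gap since the previous matching event exceeds interval".

```java
import java.util.concurrent.TimeUnit;

// Hypothetical model of the trigger condition; not the S4 implementation.
class TriggerSketch {
    private final int numEvents;
    private final long intervalMillis; // 0 means "no time condition"
    private int eventsSinceFire = 0;
    private long lastEventMillis = 0;
    private boolean seenEvent = false;

    TriggerSketch(int numEvents, long interval, TimeUnit timeUnit) {
        if (numEvents < 1) {
            throw new IllegalArgumentException("numEvents must be greater than zero");
        }
        this.numEvents = numEvents;
        // A zero interval or a null unit disables the time condition.
        this.intervalMillis = (interval == 0 || timeUnit == null) ? 0 : timeUnit.toMillis(interval);
    }

    // Call once per event of the configured type; returns true when the trigger fires.
    boolean onMatchingEvent(long nowMillis) {
        eventsSinceFire++;
        boolean countReached = eventsSinceFire >= numEvents;
        boolean gapExceeded = intervalMillis > 0 && seenEvent
                && nowMillis - lastEventMillis > intervalMillis;
        lastEventMillis = nowMillis;
        seenEvent = true;
        if (countReached || gapExceeded) {
            eventsSinceFire = 0; // start counting again after firing
            return true;
        }
        return false;
    }
}
```

Under these assumptions, new TriggerSketch(1, 0, null) fires on every matching event, which corresponds to the "set to one to trigger on every input event" note in the parameter list.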

isSingleton

public boolean isSingleton()
Returns:
true if this PE is a singleton

setSingleton

public ProcessingElement setSingleton(boolean isSingleton)
Makes this PE a singleton. A single PE instance is eagerly created and ready to receive events.

Parameters:
isSingleton - true to make this PE a singleton
Throws:
java.util.concurrent.ExecutionException

getTimerInterval

public long getTimerInterval(java.util.concurrent.TimeUnit timeUnit)
The duration of the periodic task controlled by the embedded timer.

Parameters:
timeUnit - the timeUnit of the returned value.
Returns:
the timer interval.

setTimerInterval

public ProcessingElement setTimerInterval(long interval,
                                          java.util.concurrent.TimeUnit timeUnit)
Set a timer that calls onTime(). If interval==0 the timer is disabled.

Parameters:
interval - the timer interval, expressed in timeUnit. Zero disables the timer.
timeUnit - the timeUnit of interval
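The interplay between setTimerInterval and getTimerInterval comes down to unit conversion. The sketch below assumes the interval is stored internally in milliseconds; that storage choice, the class name TimerIntervalSketch, and the isTimerEnabled() helper are hypothetical and are not taken from the S4 source.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical model of the timer interval accessors; it schedules nothing.
class TimerIntervalSketch {
    private long intervalMillis = 0; // assumption: stored in milliseconds, 0 = disabled

    TimerIntervalSketch setTimerInterval(long interval, TimeUnit timeUnit) {
        // interval == 0 disables the timer, so the unit is not consulted.
        intervalMillis = (interval == 0) ? 0 : timeUnit.toMillis(interval);
        return this; // returned to allow chained configuration calls
    }

    long getTimerInterval(TimeUnit timeUnit) {
        // TimeUnit.convert truncates toward zero when converting to a coarser unit.
        return timeUnit.convert(intervalMillis, TimeUnit.MILLISECONDS);
    }

    boolean isTimerEnabled() {
        return intervalMillis > 0;
    }
}
```

Note that reading the interval back in a coarser unit than it was set in can lose precision: two minutes read back in hours is zero.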

isThreadSafe

public boolean isThreadSafe()
Set to true if the concrete PE class has the ThreadSafe annotation. The default is false (no annotation). In general, application developers don't need to worry about thread safety in the concrete PEs. In some cases the PE needs to be thread safe to avoid deadlocks. For example, if the application graph has cycles and the queues are allowed to block, then some critical PEs with multiple incoming streams need to be made thread safe to avoid locking the entire PE instance.

Returns:
true if the PE implementation is considered thread safe.
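An annotation-driven flag of this kind can be derived by reflection on the concrete class. The sketch below defines its own marker annotation (ThreadSafeSketch) and stand-in classes, all hypothetical; it shows the general technique only and makes no claim about which ThreadSafe annotation S4 uses or how the framework reads it.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Hypothetical marker annotation; it must be retained at runtime to be visible to reflection.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface ThreadSafeSketch {}

// Hypothetical stand-in for the PE base class.
class BasePESketch {
    // True only if the concrete class itself carries the marker annotation.
    boolean isThreadSafe() {
        return getClass().isAnnotationPresent(ThreadSafeSketch.class);
    }
}

// No annotation: the default (false) applies.
class PlainPE extends BasePESketch {}

// Annotated: the author asserts that this class handles concurrent calls itself.
@ThreadSafeSketch
class ConcurrentPE extends BasePESketch {}
```

Because the sketch's annotation is not marked @Inherited, a subclass of ConcurrentPE would have to repeat the annotation to be reported as thread safe.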

handleInputEvent

protected void handleInputEvent(Event event)

isCheckpointable

protected boolean isCheckpointable()

checkpoint

public void checkpoint()

removeAll

protected void removeAll()

close

protected void close()

initPEPrototypeInternal

protected void initPEPrototypeInternal()

getInstanceForKey

public ProcessingElement getInstanceForKey(java.lang.String id)
This method is intended for use within the package, even though it is declared public. The returned instances are all in the same JVM. Do not use it to access remote objects.

Throws:
java.util.concurrent.ExecutionException

getInstances

public java.util.Collection<ProcessingElement> getInstances()
Get all the local instances. See the notes in getInstanceForKey.


getRemoteInstancesForKey

public ProcessingElement getRemoteInstancesForKey()
This method returns a remote PE instance for key. TODO: not implemented for cluster configuration yet; use it only in single-node configuration, for testing apps.

Returns:
the PE instance for key, or null if it doesn't exist.

getRemoteInstances

public java.util.Map<java.lang.String,ProcessingElement> getRemoteInstances()
This method returns an immutable map that contains all the PE instances for this prototype. PE instances may be located anywhere in the cluster. Be aware that this could be an expensive operation. TODO: not implemented for cluster configuration yet; use it only in single-node configuration, for testing apps.


getId

public java.lang.String getId()
Unique ID for a PE instance.

Returns:
the id

getPrototype

public ProcessingElement getPrototype()
The ProcessingElement prototype for this object.

Returns:
the corresponding ProcessingElement for this instance.

clone

protected java.lang.Object clone()
This method exists simply to make clone() protected.

Overrides:
clone in class java.lang.Object

emit

protected <T extends Event> void emit(T event,
                                      Stream<T>[] streamArray)
Helper method to be used by PE implementation classes. Sends an event to all the target streams.
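The fan-out behaviour can be sketched as a loop over the target array. StreamSketch and EmitSketch below are hypothetical stand-ins for illustration; the real Stream class and the Event bound on T are omitted, and the only assumption is that each stream exposes some way to accept an event.

```java
// Hypothetical stand-in for a stream: anything that can accept an event.
interface StreamSketch<T> {
    void put(T event);
}

// Hypothetical helper mirroring the shape of emit(T event, Stream<T>[] streamArray).
class EmitSketch {
    // Sends the same event object to every target stream, in array order.
    static <T> void emit(T event, StreamSketch<T>[] streamArray) {
        for (StreamSketch<T> stream : streamArray) {
            stream.put(event);
        }
    }
}
```

Every target receives the same event reference in this sketch, so a receiver that mutates the event would affect what the other targets see.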


getName

protected java.lang.String getName()
Returns:
the PE name

setName

protected void setName(java.lang.String name)
Parameters:
name - PE name

getCheckpointingConfig

public CheckpointingConfig getCheckpointingConfig()

setCheckpointingConfig

public void setCheckpointingConfig(CheckpointingConfig checkpointingConfig)

isDirty

public boolean isDirty()
By default, the state of a PE instance is considered dirty whenever it has processed an event. Some events may actually leave the state of the PE unchanged. PE implementations can therefore override this method to accommodate specific behaviors, by managing a custom "dirty" flag. If this method is overridden, the clearDirty() method must also be overridden in order to correctly reflect the "dirty" state of the PE.


clearDirty

public void clearDirty()
Dirty state is cleared after the PE has been serialized. PE implementations that maintain their own "dirty" flag must override this method to clear that internally managed flag. If this method is overridden, isDirty() must also be overridden in order to correctly reflect the "dirty" state of the PE.
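The paired override of isDirty() and clearDirty() can be sketched as follows. DirtyBaseSketch and MaxTrackerSketch are hypothetical stand-ins, not S4 classes: the base class models the default "any event makes the state dirty" behaviour, and the subclass reports dirty only when an event actually changed its state.

```java
// Hypothetical stand-in base class modelling the default behaviour.
class DirtyBaseSketch {
    private boolean dirty = false;

    // Default: processing any event marks the state dirty.
    void handleEvent(int value) {
        dirty = true;
        process(value);
    }

    void process(int value) {}

    boolean isDirty() { return dirty; }

    void clearDirty() { dirty = false; }
}

// Tracks a running maximum; smaller values leave the state unchanged.
class MaxTrackerSketch extends DirtyBaseSketch {
    private int max = Integer.MIN_VALUE;
    private boolean stateChanged = false; // custom "dirty" flag

    @Override
    void process(int value) {
        if (value > max) {
            max = value;
            stateChanged = true;
        }
    }

    // Both methods are overridden together so they refer to the same flag.
    @Override
    boolean isDirty() { return stateChanged; }

    @Override
    void clearDirty() { stateChanged = false; }
}
```

Overriding only one of the two methods would leave the other reading or clearing the base-class flag, so the reported state would no longer match the custom flag.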


serializeState

public byte[] serializeState()

deserializeState

public ProcessingElement deserializeState(byte[] loadedState)

restoreState

public void restoreState(ProcessingElement oldState)

recover

protected void recover()

getEventCount

public long getEventCount()

toString

public java.lang.String toString()
Overrides:
toString in class java.lang.Object