java.lang.Object
  org.apache.s4.core.ProcessingElement
public abstract class ProcessingElement
Base class for implementing processing in S4. All instances are organized as follows:
- Stream defines the topology of the application graph.
- ProcessingElement dynamically matches an event type to a processing method. See OverloadDispatcher. There are two types of processing methods:
  - onEvent(SomeEvent event): when implemented, input events of type SomeEvent will be dispatched to this method.
  - onTrigger(AnotherEvent event): when implemented, input events of type AnotherEvent will be dispatched to this method when certain conditions are met. See setTrigger(Class, int, long, TimeUnit).
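- A periodic task can be implemented by overriding the onTime() method. See setTimerInterval(long, TimeUnit).
- To allow concurrency within a PE instance, annotate the ProcessingElement subclass with ThreadSafe.
- PE instances must be initialized by implementing the onCreate() method.
- Fields initialized in the class constructor are shared by all PE instances; all other fields must be initialized in the onCreate() method. For example, if each instance requires a Map object, the PE should implement the following: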
    public class MyPE extends ProcessingElement {

        private Map<String, Integer> wordCount;

        ...

        @Override
        protected void onCreate() {
            wordCount = new HashMap<String, Integer>();
            logger.trace("Created a map for instance PE with id {}", getId());
        }
    }
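The type-based dispatch mentioned in the class description can be illustrated with a small reflective sketch. This is not how OverloadDispatcher is implemented (its internals are not shown on this page); SketchEvent, WordEvent, CountingPE, and DispatchSketch are hypothetical stand-ins that only show how an event's runtime class can select an onEvent overload.

```java
import java.lang.reflect.Method;

// Hypothetical stand-ins; none of these are S4 classes.
class SketchEvent {}

class WordEvent extends SketchEvent {
    final String word;
    WordEvent(String word) { this.word = word; }
}

class CountingPE {
    int words = 0;
    int others = 0;

    // Selected when the event's runtime class is WordEvent.
    public void onEvent(WordEvent event) { words++; }

    // Fallback for any other SketchEvent.
    public void onEvent(SketchEvent event) { others++; }
}

class DispatchSketch {
    // Walks up the event's class hierarchy until a matching onEvent overload is found.
    static void dispatch(Object pe, SketchEvent event) {
        for (Class<?> type = event.getClass(); type != null; type = type.getSuperclass()) {
            try {
                Method m = pe.getClass().getMethod("onEvent", type);
                m.setAccessible(true);
                m.invoke(pe, event);
                return;
            } catch (NoSuchMethodException e) {
                // No overload for this exact type; try the superclass.
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException(e);
            }
        }
        throw new IllegalArgumentException("No onEvent method for " + event.getClass());
    }
}
```

A PE written this way only declares the overloads it cares about; the most specific matching overload wins, and unrelated event types fall through to the broader one.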
Field Summary | |
---|---|
protected App | app |
protected java.lang.String | id |
Constructor Summary | |
---|---|
protected | ProcessingElement() |
public | ProcessingElement(App app) Create a PE prototype. |
Method Summary | | |
---|---|---|
void | checkpoint() | |
void | clearDirty() | Dirty state is cleared after the PE has been serialized. |
protected java.lang.Object | clone() | This method exists simply to make clone() protected. |
protected void | close() | |
ProcessingElement | deserializeState(byte[] loadedState) | |
protected <T extends Event> void | emit(T event, Stream<T>[] streamArray) | Helper method to be used by PE implementation classes. |
App | getApp() | PE objects must be associated with one and only one App object. |
CheckpointingConfig | getCheckpointingConfig() | |
long | getEventCount() | |
java.lang.String | getId() | Unique ID for a PE instance. |
ProcessingElement | getInstanceForKey(java.lang.String id) | This method is designed to be used within the package. |
java.util.Collection<ProcessingElement> | getInstances() | Get all the local instances. |
protected java.lang.String | getName() | |
long | getNumPEInstances() | Returns the approximate number of PE instances from the cache. |
java.util.Map<java.lang.String,ProcessingElement> | getPEInstances() | |
ProcessingElement | getPrototype() | The ProcessingElement prototype for this object. |
java.util.Map<java.lang.String,ProcessingElement> | getRemoteInstances() | This method returns an immutable map that contains all the PE instances for this prototype. |
ProcessingElement | getRemoteInstancesForKey() | This method returns a remote PE instance for key. |
long | getTimerInterval(java.util.concurrent.TimeUnit timeUnit) | The duration of the periodic task controlled by the embedded timer. |
protected void | handleInputEvent(Event event) | |
protected void | initPEPrototypeInternal() | |
protected boolean | isCheckpointable() | |
boolean | isDirty() | By default, the state of a PE instance is considered dirty whenever it processed an event. |
boolean | isSingleton() | |
boolean | isThreadSafe() | Set to true if the concrete PE class has the ThreadSafe annotation. |
protected abstract void | onCreate() | This method is called after a PE instance is created. |
protected abstract void | onRemove() | This method is called before a PE instance is removed. |
protected void | onTime() | This method is called by the PE timer. |
protected void | recover() | |
protected void | removeAll() | |
void | restoreState(ProcessingElement oldState) | |
byte[] | serializeState() | |
void | setApp(App app) | |
void | setCheckpointingConfig(CheckpointingConfig checkpointingConfig) | |
protected void | setName(java.lang.String name) | |
ProcessingElement | setPECache(int maximumSize) | Sets the max size of the PE cache. |
ProcessingElement | setPECache(int maximumSize, long duration, java.util.concurrent.TimeUnit timeUnit) | Set PE expiration and cache size. |
ProcessingElement | setSingleton(boolean isSingleton) | Makes this PE a singleton. |
ProcessingElement | setTimerInterval(long interval, java.util.concurrent.TimeUnit timeUnit) | Set a timer that calls onTime(). |
ProcessingElement | setTrigger(java.lang.Class<? extends Event> eventType, int numEvents, long interval, java.util.concurrent.TimeUnit timeUnit) | This trigger is fired when an event of eventType arrives at the PE instance and either numEvents have arrived since the last time this trigger was fired, or the time since the last event is greater than interval. |
java.lang.String | toString() | |
Methods inherited from class java.lang.Object |
---|
equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait |
Field Detail |
---|
protected transient App app
protected java.lang.String id
Constructor Detail |
---|
protected ProcessingElement()
public ProcessingElement(App app)
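Create a PE prototype. Use #configurePECache to configure the PE cache; see setPECache(int) and setPECache(int, long, TimeUnit).
Parameters:
app - the app that contains this PE
Method Detail |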
---|
protected void onTime()
This method is called by the PE timer. For it to run concurrently with other methods, the ProcessingElement subclass must be annotated with ThreadSafe.
Override this method to implement a periodic process.
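The periodic callback can be pictured with a small stand-alone sketch. It assumes a java.util.concurrent scheduler drives onTime(); this page does not show how S4 implements its timer, and TimedSketchPE and TickCounter are hypothetical names rather than S4 classes. Note that the scheduler thread belongs to the framework stand-in, not to the PE subclass.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a PE with a periodic task; not an S4 class.
class TimedSketchPE {
    private ScheduledExecutorService timer;

    // Subclasses override this to implement the periodic work.
    protected void onTime() {}

    // An interval of 0 disables the timer, mirroring the documented contract.
    TimedSketchPE setTimerInterval(long interval, TimeUnit unit) {
        stop();
        if (interval > 0) {
            timer = Executors.newSingleThreadScheduledExecutor();
            timer.scheduleAtFixedRate(this::onTime, interval, interval, unit);
        }
        return this;
    }

    // Cancels the periodic task if one is running.
    void stop() {
        if (timer != null) {
            timer.shutdownNow();
            timer = null;
        }
    }
}

class TickCounter extends TimedSketchPE {
    private final CountDownLatch ticks = new CountDownLatch(3);

    @Override
    protected void onTime() { ticks.countDown(); }

    // Waits until onTime() has been called three times or the timeout expires.
    boolean awaitTicks(long timeout, TimeUnit unit) {
        try {
            return ticks.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

A subclass such as TickCounter only overrides onTime(); starting, rescheduling, and stopping the timer stay in the base class.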
protected abstract void onCreate()
Fields initialized in the class constructor are shared by all PE instances.
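The difference between constructor-initialized and onCreate()-initialized fields can be sketched as follows. The sketch assumes instances are produced by shallow-cloning a prototype (this page lists a protected clone() and getPrototype() but does not spell out the mechanism); ProtoSketchPE and newInstance() are hypothetical names, not part of S4.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the prototype/instance relationship; not an S4 class.
class ProtoSketchPE implements Cloneable {
    // Set in the constructor, so every clone refers to the same list.
    final List<String> sharedLog = new ArrayList<>();

    // Set in onCreate(), so every instance gets its own map.
    Map<String, Integer> wordCount;

    protected void onCreate() {
        wordCount = new HashMap<>();
    }

    // Instances are shallow clones of the prototype; the constructor does not run again.
    ProtoSketchPE newInstance() {
        try {
            ProtoSketchPE instance = (ProtoSketchPE) super.clone();
            instance.onCreate();
            return instance;
        } catch (CloneNotSupportedException e) {
            throw new AssertionError(e); // cannot happen: the class is Cloneable
        }
    }
}
```

Because the clone is shallow, anything assigned in the constructor is the same object in every instance, so it should only hold state that is meant to be shared.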
protected abstract void onRemove()
public App getApp()
PE objects must be associated with one and only one App object.
public void setApp(App app)
public long getNumPEInstances()
public java.util.Map<java.lang.String,ProcessingElement> getPEInstances()
public ProcessingElement setPECache(int maximumSize, long duration, java.util.concurrent.TimeUnit timeUnit)
PE instances will be automatically removed from the cache once a fixed duration has elapsed after the PE's creation or last access.
Least accessed PEs will automatically be removed from the cache when the number of PEs approaches maximumSize.
When this method is called, all existing PE instances are destroyed.
Parameters:
maximumSize - the approximate maximum number of PEs in the cache.
duration - the PE duration
timeUnit - the time unit
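The size bound can be illustrated with a minimal sketch built on java.util.LinkedHashMap in access order. This is an assumption made for illustration only: the cache S4 actually uses is not shown on this page, PECacheSketch is a hypothetical name, and the sketch covers only the maximumSize eviction, not the duration-based expiry.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical size-bounded cache keyed by PE id; not the S4 implementation.
class PECacheSketch<V> extends LinkedHashMap<String, V> {
    private final int maximumSize;

    PECacheSketch(int maximumSize) {
        super(16, 0.75f, true); // true = access order, least recently used entry first
        this.maximumSize = maximumSize;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<String, V> eldest) {
        // Called after each insertion; returning true evicts the least recently accessed entry.
        return size() > maximumSize;
    }
}
```

With a maximum size of 2, inserting a third key evicts whichever of the first two was accessed least recently.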
public ProcessingElement setPECache(int maximumSize)
Least accessed PEs will automatically be removed from the cache when the number of PEs approaches maximumSize.
When this method is called, all existing PE instances are destroyed.
Parameters:
maximumSize - the approximate maximum number of PEs in the cache.
public ProcessingElement setTrigger(java.lang.Class<? extends Event> eventType, int numEvents, long interval, java.util.concurrent.TimeUnit timeUnit)
When the trigger fires, the method onTrigger(EventType event) is called, where EventType matches the argument eventType.
Parameters:
eventType - the type of event on which this trigger will fire.
numEvents - number of events since last trigger activation. Must be greater than zero. (Set to one to trigger on every input event.)
interval - minimum time between triggers. Set to zero if no time interval needed.
timeUnit - the TimeUnit for the argument interval. Can be set to null if no time interval needed.
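The firing rule (enough events counted, or enough time elapsed) can be sketched with an explicit clock so the behaviour is easy to follow. This is one reading of the documented conditions, not the S4 implementation: the sketch measures time since the last firing, which is an assumption, and TriggerSketch and onMatchingEvent are hypothetical names.

```java
// Hypothetical helper that decides when a trigger fires; not part of S4.
class TriggerSketch {
    private final int numEvents;
    private final long intervalMillis; // 0 means no time condition
    private int eventsSinceFire = 0;
    private long lastFireMillis;

    TriggerSketch(int numEvents, long intervalMillis, long nowMillis) {
        if (numEvents < 1) {
            throw new IllegalArgumentException("numEvents must be greater than zero");
        }
        this.numEvents = numEvents;
        this.intervalMillis = intervalMillis;
        this.lastFireMillis = nowMillis;
    }

    // Records one matching event at time nowMillis and reports whether the trigger fires.
    boolean onMatchingEvent(long nowMillis) {
        eventsSinceFire++;
        boolean countReached = eventsSinceFire >= numEvents;
        boolean timeElapsed = intervalMillis > 0 && nowMillis - lastFireMillis > intervalMillis;
        if (countReached || timeElapsed) {
            eventsSinceFire = 0;
            lastFireMillis = nowMillis;
            return true;
        }
        return false;
    }
}
```

Passing the current time in as an argument keeps the rule deterministic; a real caller would supply System.currentTimeMillis() or a similar clock.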
public boolean isSingleton()
public ProcessingElement setSingleton(boolean isSingleton)
Makes this PE a singleton.
Parameters:
isSingleton -
Throws:
java.util.concurrent.ExecutionException
public long getTimerInterval(java.util.concurrent.TimeUnit timeUnit)
The duration of the periodic task controlled by the embedded timer.
Parameters:
timeUnit - the timeUnit of the returned value.
public ProcessingElement setTimerInterval(long interval, java.util.concurrent.TimeUnit timeUnit)
Set a timer that calls onTime().
If interval==0 the timer is disabled.
Parameters:
interval - in timeUnit
timeUnit - the timeUnit of interval
public boolean isThreadSafe()
Set to true if the concrete PE class has the ThreadSafe annotation. The default is false (no annotation).
In general, application developers don't need to worry about thread safety in the concrete PEs. In some cases the
PE needs to be thread safe to avoid deadlocks. For example, if the application graph has cycles and the queues
are allowed to block, then some critical PEs with multiple incoming streams need to be made thread safe to avoid
locking the entire PE instance.
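The effect of the annotation can be sketched as a framework-side check: lock the PE instance around event handling unless the class has opted out. This is a simplified illustration under that assumption, not S4 code; SketchThreadSafe stands in for S4's ThreadSafe annotation, and PlainPE, AnnotatedPE, and SyncSketch are hypothetical names.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Hypothetical marker annotation standing in for S4's ThreadSafe.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface SketchThreadSafe {}

// No annotation: the framework stand-in serializes access to each instance.
class PlainPE {}

// Annotated: the class promises to handle concurrent calls itself.
@SketchThreadSafe
class AnnotatedPE {}

class SyncSketch {
    // True if the concrete class carries the marker annotation.
    static boolean isThreadSafe(Object pe) {
        return pe.getClass().isAnnotationPresent(SketchThreadSafe.class);
    }

    // Runs the handler under the PE's lock unless the PE opted out via the annotation.
    static void handle(Object pe, Runnable handler) {
        if (isThreadSafe(pe)) {
            handler.run();
        } else {
            synchronized (pe) {
                handler.run();
            }
        }
    }
}
```

The annotation must have runtime retention for the reflective check to see it; an annotated PE then becomes responsible for protecting its own state.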
protected void handleInputEvent(Event event)
protected boolean isCheckpointable()
public void checkpoint()
protected void removeAll()
protected void close()
protected void initPEPrototypeInternal()
public ProcessingElement getInstanceForKey(java.lang.String id)
This method is designed to be used within the package.
Throws:
java.util.concurrent.ExecutionException
public java.util.Collection<ProcessingElement> getInstances()
Get all the local instances. See getLocalInstanceForKey.
public ProcessingElement getRemoteInstancesForKey()
public java.util.Map<java.lang.String,ProcessingElement> getRemoteInstances()
public java.lang.String getId()
public ProcessingElement getPrototype()
The ProcessingElement prototype for this object.
Returns:
the ProcessingElement for this instance.
protected java.lang.Object clone()
This method exists simply to make clone() protected.
Overrides:
clone in class java.lang.Object
protected <T extends Event> void emit(T event, Stream<T>[] streamArray)
protected java.lang.String getName()
protected void setName(java.lang.String name)
Parameters:
name - PE name
public CheckpointingConfig getCheckpointingConfig()
public void setCheckpointingConfig(CheckpointingConfig checkpointingConfig)
public boolean isDirty()
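By default, the state of a PE instance is considered dirty whenever it processed an event.
If this method is overridden, the clearDirty() method must also be overridden in order to correctly reflect
the "dirty" state of the PE.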
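A PE that tracks its own dirty state might look like the following minimal sketch. DirtySketchPE is a hypothetical stand-alone class (it does not extend the S4 ProcessingElement), and the rule that a zero increment leaves the state unchanged is invented purely for illustration.

```java
// Hypothetical stand-in showing isDirty() and clearDirty() overridden together; not an S4 class.
class DirtySketchPE {
    private int total = 0;
    private boolean dirty = false;

    // Only a non-zero increment changes the state, so only that marks the PE dirty.
    void onEvent(int increment) {
        if (increment != 0) {
            total += increment;
            dirty = true;
        }
    }

    // Reports the custom flag instead of "dirty after every event".
    public boolean isDirty() { return dirty; }

    // Resets the same flag once the state has been serialized.
    public void clearDirty() { dirty = false; }

    int total() { return total; }
}
```

Both methods read and write the same flag; overriding only one of them would leave the reported state out of step with the checkpointing logic.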
public void clearDirty()
Dirty state is cleared after the PE has been serialized.
If this method is overridden, isDirty() must also be overridden in order to correctly reflect the
"dirty" state of the PE.
public byte[] serializeState()
public ProcessingElement deserializeState(byte[] loadedState)
public void restoreState(ProcessingElement oldState)
protected void recover()
public long getEventCount()
public java.lang.String toString()
Overrides:
toString in class java.lang.Object