org.apache.s4.core
Class App

java.lang.Object
  extended by org.apache.s4.core.App
Direct Known Subclasses:
AdapterApp

public abstract class App
extends java.lang.Object

Container base class to hold all processing elements. It is also where one defines the application graph: PE prototypes, internal streams, input and output streams.


Nested Class Summary
static class App.ClockType
          The internal clock can be configured as "wall clock" or "event clock".
 
Constructor Summary
App()
           
 
Method Summary
 void addStream(Streamable<Event> stream)
           
 void close()
           
protected
<T extends Event>
Stream<T>
createInputStream(java.lang.String streamName, KeyFinder<T> finder, ProcessingElement... processingElements)
          Creaters an "input" stream, i.e.
protected
<T extends Event>
Stream<T>
createInputStream(java.lang.String streamName, ProcessingElement... processingElements)
           
protected
<T extends Event>
RemoteStream
createOutputStream(java.lang.String name)
           
protected
<T extends Event>
RemoteStream
createOutputStream(java.lang.String name, KeyFinder<Event> finder)
          Creates a "remote" stream, i.e.
<T extends ProcessingElement>
T
createPE(java.lang.Class<T> type)
          Creates a ProcessingElement prototype.
<T extends ProcessingElement>
T
createPE(java.lang.Class<T> type, java.lang.String name)
          Creates a ProcessingElement prototype.
<T extends AbstractSlidingWindowPE>
T
createSlidingWindowPE(java.lang.Class<T> type, long slotDuration, java.util.concurrent.TimeUnit timeUnit, int numSlots, SlotFactory slotFactory)
           
<T extends Event>
Stream<T>
createStream(java.lang.Class<T> type)
           
protected
<T extends Event>
Stream<T>
createStream(java.lang.String name, KeyFinder<T> finder, java.lang.Class<T> eventType, ProcessingElement... processingElements)
          Creates a stream with a specific key finder.
protected
<T extends Event>
Stream<T>
createStream(java.lang.String name, KeyFinder<T> finder, ProcessingElement... processingElements)
           
protected
<T extends Event>
Stream<T>
createStream(java.lang.String name, ProcessingElement... processingElements)
           
 CheckpointingFramework getCheckpointingFramework()
           
 App.ClockType getClockType()
           
 int getId()
           
 ProcessingElement getPE(java.lang.String name)
           
 Receiver getReceiver()
           
 Sender getSender()
           
 SerializerDeserializer getSerDeser()
           
 java.util.List<Streamable<Event>> getStreams()
           
 long getTime()
          The internal clock is configured as "wall clock" or "event clock" when this object is created.
 long getTime(java.util.concurrent.TimeUnit timeUnit)
          The internal clock is configured as "wall clock" or "event clock" when this object is created.
 void init()
           
protected abstract  void onClose()
          This method is called by the container before unloading the application.
protected abstract  void onInit()
          This method is called by the container to initialize applications.
protected abstract  void onStart()
           
 void setClockType(App.ClockType clockType)
          Set the App.ClockType.
 void setId(int id)
           
 void start()
          This method is called by the container after initialization.
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Constructor Detail

App

public App()
Method Detail

getId

public int getId()
Returns:
the unique app id

setId

public void setId(int id)
Parameters:
id - the unique id for this app

getPE

public ProcessingElement getPE(java.lang.String name)

addStream

public void addStream(Streamable<Event> stream)

getStreams

public java.util.List<Streamable<Event>> getStreams()

onStart

protected abstract void onStart()

start

public final void start()
This method is called by the container after initialization. Once this method is called, threads get started and events start flowing.


onInit

protected abstract void onInit()
This method is called by the container to initialize applications.


init

public final void init()

onClose

protected abstract void onClose()
This method is called by the container before unloading the application.


close

public final void close()

getTime

public long getTime()
The internal clock is configured as "wall clock" or "event clock" when this object is created.

Returns:
the App time in milliseconds.

getTime

public long getTime(java.util.concurrent.TimeUnit timeUnit)
The internal clock is configured as "wall clock" or "event clock" when this object is created.

Parameters:
timeUnit -
Returns:
the App time in timeUnit

setClockType

public void setClockType(App.ClockType clockType)
Set the App.ClockType.

Parameters:
clockType - the clockTyoe for this app must be App.ClockType.WALL_CLOCK (default) or App.ClockType.EVENT_CLOCK

getClockType

public App.ClockType getClockType()
Returns:
the clock type.

getSender

public Sender getSender()
Returns:
the sender object

getReceiver

public Receiver getReceiver()
Returns:
the receiver object

getSerDeser

public SerializerDeserializer getSerDeser()

getCheckpointingFramework

public CheckpointingFramework getCheckpointingFramework()

createStream

protected <T extends Event> Stream<T> createStream(java.lang.String name,
                                                   KeyFinder<T> finder,
                                                   java.lang.Class<T> eventType,
                                                   ProcessingElement... processingElements)
Creates a stream with a specific key finder. The event is delivered to the PE instances in the target PE prototypes by key.

If the value of the key is "joe" and the target PE prototypes are AddressPE and WorkPE, the event will be delivered to the instances with key="joe" in the PE prototypes AddressPE and WorkPE.

Parameters:
name - the name of the stream
finder - the key finder object
eventType - expected event type
processingElements - the target processing elements
Returns:
the stream

createStream

protected <T extends Event> Stream<T> createStream(java.lang.String name,
                                                   KeyFinder<T> finder,
                                                   ProcessingElement... processingElements)
See Also:
createStream(String, KeyFinder, Class, ProcessingElement...)

createStream

protected <T extends Event> Stream<T> createStream(java.lang.String name,
                                                   ProcessingElement... processingElements)
See Also:
createStream(String, KeyFinder, Class, ProcessingElement...)

createStream

public <T extends Event> Stream<T> createStream(java.lang.Class<T> type)
See Also:
createStream(String, KeyFinder, Class, ProcessingElement...)

createOutputStream

protected <T extends Event> RemoteStream createOutputStream(java.lang.String name,
                                                            KeyFinder<Event> finder)
Creates a "remote" stream, i.e. a stream that forwards events to remote clusters

Parameters:
name - stream name, shared across communicating clusters
finder - key finder
Returns:
a reference to the created remote stream

createOutputStream

protected <T extends Event> RemoteStream createOutputStream(java.lang.String name)
See Also:
createOutputStream(String, KeyFinder)

createInputStream

protected <T extends Event> Stream<T> createInputStream(java.lang.String streamName,
                                                        KeyFinder<T> finder,
                                                        ProcessingElement... processingElements)
Creaters an "input" stream, i.e. a stream that listens to events from remote clusters, and that registers its interest in the stream with the specified name.

Parameters:
streamName - name of the remote stream
finder - key finder
processingElements - target processing elements
Returns:
a reference to the created input stream

createInputStream

protected <T extends Event> Stream<T> createInputStream(java.lang.String streamName,
                                                        ProcessingElement... processingElements)
See Also:
createInputStream(String, KeyFinder, ProcessingElement...)

createPE

public <T extends ProcessingElement> T createPE(java.lang.Class<T> type,
                                                java.lang.String name)
Creates a ProcessingElement prototype.

Parameters:
type - the processing element type.
name - a name for this PE prototype.
Returns:
the processing element prototype.

createPE

public <T extends ProcessingElement> T createPE(java.lang.Class<T> type)
Creates a ProcessingElement prototype.

Parameters:
type - the processing element type.
Returns:
the processing element prototype.

createSlidingWindowPE

public <T extends AbstractSlidingWindowPE> T createSlidingWindowPE(java.lang.Class<T> type,
                                                                   long slotDuration,
                                                                   java.util.concurrent.TimeUnit timeUnit,
                                                                   int numSlots,
                                                                   SlotFactory slotFactory)