|
||||||||||
| PREV CLASS NEXT CLASS | FRAMES NO FRAMES | |||||||||
| SUMMARY: NESTED | FIELD | CONSTR | METHOD | DETAIL: FIELD | CONSTR | METHOD | |||||||||
java.lang.Objectcom.streambase.sb.operator.Operator
public abstract class Operator
Abstract base class for User code that is used as a Java Operator or an embedded Adapter in a StreamBase application. One instance will be created for each Java Operator in a StreamBase application. StreamBase Studio may operate on several StreamBase applications at a time, so Operator subclass instances may be in different applications.
Operator subclasses must have a public default constructor.
In High Availability mode, the Operator will be serialized and sent from a primary server to a secondary server. Operators in the secondary server will be discarded as new operators containing the primary state are instantiated as they are deserialized. So in secondary HA servers, the operators will frequently be destroyed and recreated.
If there is any state that the operator instance needs to maintain beyond the lifetime of the instance, the setSessionState/getSessionState methods can be used. This is the Memento Design Pattern.
An Operator is notified of state changes through callbacks. The
StreamBase runtime 'calls back' an Operator when it changes the runtime
state of the Operator. These callbacks include Operator.resume(),
Operator.resumed(), Operator.suspend(), Operator.suspended() and .
Operator.shutdown().
Operator provides "managed threads", which are threads that run
concurrently with the application, but which can synchronize with
its overall state changes. These threads are started, suspended,
resumed, and shut down with the application. Managed threads are registered
with an Operator using the method registerRunnable. This is
particularly useful for input adapters, which typically have to respond to
external events asynchronously with the application. Managed threads can
call sendOutput at any time.
If an Operator registers one or more managed thread, and all of its managed
threads exit their run() methods, then the Operator itself
will shut down.
The StreamBase runtime blocks while it waits for an Operator's managed
threads to respond to a state change. This can be problematic if a managed
thread is blocked on some event. However, the StreamBase runtime can be
configured to interrupt a thread when it needs to change its state. This
is accomplished by setting the flag shouldInterrupt to true
when registering the thread with registerRunnable.
It may be that an Operator's managed thread does not respond to a state
change even after it has been interrupted. If the Operator's thread does
not respond to the state within a given time interval then it is considered
to be in failure and it is shut down. This time interval is specified by the
server configuration parameter operator-state-change-timeout-ms.
Parameterizable,
Serialized Form| Nested Class Summary | |
|---|---|
static class |
Operator.OperatorStates
The set of runtime states that an Operator can be in. |
static class |
Operator.SuspendBehaviorStates
Suspend behavior defines how an Operator handles tuples when it is suspended; meaning when it is in the SUSPENDED state. |
| Field Summary | |
|---|---|
static int |
DEFAULT_STATE_CHANGE_TIMEOUT
Default value for the timeout for Operator state changes, in milliseconds. |
| Constructor Summary | |
|---|---|
protected |
Operator()
Constructs an operator. |
| Method Summary | |
|---|---|
String |
getDisplayName()
Return the display name of this Operator. |
int |
getInputPortCount()
Return the number of input ports. |
Schema |
getInputSchema(int port)
Returns the schema of an input port. |
String |
getName()
Return the name of this Operator. |
int |
getOutputPortCount()
Returns the number of output ports. |
Schema |
getOutputSchema(int port)
Return the output schema. |
Parameterizable |
getParameters()
Returns the Java Bean that holds the user-configurable parameters visible in the StreamBase Studio Properties View of this operator. |
PortCounts |
getPortCounts()
An optional method that subclasses can override to dynamically tell Studio the number of input and output ports. |
InputStream |
getResourceContents(String name)
Returns an open input stream on the contents of the named resource file. |
boolean |
getReuseTuple()
Get the state of the input Tuple reuse flag. |
protected Object |
getSessionState()
Retreive session state for this operator that has the same object extent as the application, not the instance (which may be repeatedly destroyed and recreated). |
boolean |
hasNotYetStarted()
Returns true if this Operator has not yet started running. |
void |
init()
After the Operator has typechecked successfully, but before the application or any managed threads start, the StreamBase server will call the init method. |
boolean |
isDroppingTuples()
Returns true if the Operator will drop any tuples it receives when it is suspended. |
boolean |
isProcessingTuples()
Returns true if the Operator will process any tuples it receives when it is suspended. |
boolean |
isRunning()
Returns true if this Operator is currently running, false otherwise. |
boolean |
isShutdown()
Returns true if this Operator is currently shut down, false otherwise. |
boolean |
isSuspended()
Returns true if this Operator is currently suspended, false otherwise. |
void |
postShutdown()
postShutdown is called by the StreamBase runtime just after shutting down this Operator. |
abstract void |
processTuple(int inputPort,
Tuple tuple)
This method will be called by the StreamBase server for each Tuple given to the Operator to process. |
void |
registerRunnable(Runnable operatorRunnable)
Deprecated. As of StreamBase version 3.7, replaced by Operator.registerRunnable(Runnable, boolean) |
void |
registerRunnable(Runnable operatorRunnable,
boolean shouldInterrupt)
Register a Runnable object to be managed by this Operator. |
protected void |
requireInputPortCount(int numPorts)
Throws a PortMismatchException if the number of ports is not numPorts. |
void |
resume()
resume() is called when an operator starts or resumes execution, before any registered runnables are started or resumed. |
void |
resumed()
resumed() is called after all registered runnables of the operator have started or resumed. |
void |
sendOutput(int port,
List tuples)
Enqueue a List of Tuples to be sent to downstream operators. |
void |
sendOutput(int port,
Tuple tuple)
Enqueue a Tuple to be sent to downstream operators. |
void |
setDisplayName(String dn)
The display name is the String that's shown in the list of available Operators. |
void |
setOutputSchema(int port,
Schema schema)
Sets the output schema for the given output port (port #'s are zero based). |
void |
setParameters(Parameterizable params)
Sets the Java Bean that holds the user-configurable parameters visible in the StreamBase Studio Properties View of this operator. |
void |
setPortHints(int numInputPorts,
int numOutputPorts)
Used to tell StreamBase Studio how many ports to draw on the Java Operator when the operator is drawn in the IDE's canvas. |
protected void |
setReuseTuple(boolean reuse)
Allow/disallow the runtime to reuse tuples on operator input. |
protected void |
setSessionState(Object state)
Maintain state for this operator that has the same object extent as the application, not the instance (which may be repeatedly destroyed and recreated when High Availability servers are used). |
void |
setSuspendBehavior(int suspendBehavior)
Set the suspend behavior of this Operator. |
boolean |
shouldRun()
Return whether or not calling operator thread is enabled and should continue running. |
void |
shutdown()
shutdown is called by the StreamBase runtime just prior to shutting down this Operator. |
void |
suspend()
suspend() will be called when an operator suspends, before any registered runnables are suspended. |
void |
suspended()
suspended() will be called after all registered runnables of the operator have suspended. |
abstract void |
typecheck()
The typecheck method is called by the StreamBase server to ensure that all the parameters for this operator are correct. |
| Methods inherited from class java.lang.Object |
|---|
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait |
| Field Detail |
|---|
public static final int DEFAULT_STATE_CHANGE_TIMEOUT
An Operator can receive state change events from the StreamBase runtime. In particular, when the runtime starts, suspends, resumes, or shuts down, it, in turn, applies the same state change to any Operators it holds.
When the server applies a state change to an Operator, it blocks until the Operator acknowledges that it has transitioned to the new state.
If an Operator fails to make such an acknowledgement, the StreamBase runtime might wait indefinitely. StateChangeTimeout limits the amount of time the server will wait for an Operator to respond to a requested state change.
If the Operator fails to respond within the timeout, the server will shut down the Operator and continue with its state change.
| Constructor Detail |
|---|
protected Operator()
All Operator subclasses must have a public default constructor, otherwise StreamBase Studio and the StreamBase sbd process will not be able to instantiate the Operator subclass. Nothing substantial should be done in the constructor, except for setting port hints and setting the parameters object.
Operator.setPortHints(int, int),
Operator.setParameters(com.streambase.sb.operator.Parameterizable)| Method Detail |
|---|
public final String getDisplayName()
public String getName()
public final void setDisplayName(String dn)
public void setPortHints(int numInputPorts,
int numOutputPorts)
This method is only effective when called from the Java Operator's constructor. The typecheck method should call requireInputPortCount instead.
To specify the ports at runtime requireInputPortCount and setOutputSchema should be called from the typecheck method.
To specify the ports dynamically within studio, see getPortCounts.
numInputPorts - number of input ports. Must be non-negative.numOutputPorts - number of output ports. Must be non-negative.
IllegalArgumentException - if an argument is less than zeroOperator.requireInputPortCount(int),
Operator.setOutputSchema(int, Schema)protected final void setSessionState(Object state)
When HA mode is on, the instances in the secondary server will be destroyed and recreated with copies from the primary server each time a checkpoint is sent from the primary server to the secondary server. If the Java Operator maintains state that should be initialized once, but not created and copied with each checkpoint, the setSessionState() and getSessionState() methods should be used. Examples of this could be JDBC connections which would be initialized once and maintained for the lifetime of the application.
When the Operator is instantiated in SBStudio, this method does nothing.
state - the object that contains the state that the user wants to
preserve between checkpointsOperator.getSessionState()protected final Object getSessionState()
Operator.setSessionState(Object)
public abstract void typecheck()
throws TypecheckException
If the parameters are not correct, or the input port Schemas are not correct, a TypecheckException should be thrown.
The method requireInputPortCount() should be used to verify that the required input ports are set.
If the Operator changes the number of input ports, this method must call requireInputPortCount.
TypecheckException - when the parameters or input Schemas are not satisfied.Operator.requireInputPortCount(int),
Operator.setOutputSchema(int, Schema)
public void init()
throws StreamBaseException
If this operator wishes to register threads, it should call
Operator.registerRunnable(Runnable) from this method.
StreamBaseException - Prevents the application from starting.public boolean hasNotYetStarted()
Operator.OperatorStates.NOT_YET_STARTEDpublic boolean isDroppingTuples()
Operator.SuspendBehaviorStates.DROPPING_TUPLESpublic boolean isProcessingTuples()
Operator.SuspendBehaviorStates.PROCESSING_TUPLESpublic boolean isRunning()
Operator.OperatorStates.STARTEDpublic boolean isShutdown()
Operator.OperatorStates.SHUTDOWNpublic boolean isSuspended()
Operator.OperatorStates.SUSPENDED
public abstract void processTuple(int inputPort,
Tuple tuple)
throws StreamBaseException
The default implementation does nothing.
inputPort - the input port that the tuple is from (ports are zero based)tuple - the tuple from the given input port
StreamBaseException - Terminates the application.
public void resume()
throws Exception
Operator.shutdown() will be called.
resume() is a callback that is called by the StreamBase runtime.
Exception
public void resumed()
throws Exception
Operator.shouldRun() has unblocked and returned true in all
registered runnables.
resumed() is a callback that is called by the StreamBase runtime.
Exceptionpublic final boolean shouldRun()
This method returns false if the operator has not yet started or has shut down. It blocks if the operator is suspended. It returns true if the operator is running.
This method links managed operator threads with the StreamBase runtime. Every registered runnable object must repeatedly call this during its entire lifetime in order to synchronize with the main StreamBase application.
An operator (and, thus, its containing application) is not considered started until all registered runnables have called this method. Likewise, an operator is not considered suspended until all registered runnables are blocked in this method. Finally, an operator is not considered shutdown until all registered runnables have exited their run() method (either on their own or in response to this method returning false).
UnsupportedOperationException - If this was not called
from an operator thread.public void shutdown()
public void postShutdown()
public void suspend()
throws Exception
Exceptionpublic void setSuspendBehavior(int suspendBehavior)
setSuspendBehavior() might be called in the Operator's
constructor or in init().
suspendBehavior - The suspend behavior to set, either
PROCESSING_TUPLES or DROPPING_TUPLES.Operator.SuspendBehaviorStates.PROCESSING_TUPLES,
Operator.SuspendBehaviorStates.DROPPING_TUPLES
public void suspended()
throws Exception
Operator.shouldRun().
suspended() is a callback that is called by the StreamBase runtime.
Exceptionpublic Parameterizable getParameters()
public void setParameters(Parameterizable params)
params - the Java Bean that holds the user-configurable parameters visible
in the StreamBase Studio Properties View of this operator.
public InputStream getResourceContents(String name)
throws ResourceNotFoundException,
StreamBaseException
ResourceNotFoundException - if the resource could not be
found
StreamBaseException - if the module was found but could
not be opened
public final void registerRunnable(Runnable operatorRunnable,
boolean shouldInterrupt)
throws StreamBaseException
The body of this Runnable must hook into StreamBase's
thread management by repeatedly calling the Operator.shouldRun() method during its entire lifetime. This allows
managed threads to be started, stopped, suspended, and resumed
along with the rest of the application.
This method should be called from the operator's Operator.init()
method.
operatorRunnable - The Runnable to register.shouldInterrupt - Whether this thread should be interrupted when
the operator undergoes a state change. Typically true if the thread
blocks on some event, input for example.
-
StreamBaseException - If it is too late in the Operator's life
cycle to register a Runnable.
public final void registerRunnable(Runnable operatorRunnable)
throws StreamBaseException
Operator.registerRunnable(Runnable, boolean)
The body of this Runnable must hook into StreamBase's
thread management by repeatedly calling the Operator.shouldRun() method during its entire lifetime. This allows
managed threads to be started, shut down, suspended, and resumed
along with the rest of the application.
This method should be called from the adapter's Operator.init()
method.
operatorRunnable - The Runnable to register.
StreamBaseException - If it is too late in the Operator's life
cycle to register a Runnable.protected void requireInputPortCount(int numPorts)
This method should be called by the typecheck method.
numPorts - the number of ports
PortMismatchException - When the number of input ports is incorrect.
public PortCounts getPortCounts()
throws TypecheckException
typecheck.
Note: This is for studio purposes only and should not be used to set any fields that you may want to use in other parts of the Operator.
TypecheckExceptionpublic final int getInputPortCount()
public final int getOutputPortCount()
Operator.setOutputSchema(int, Schema)
public void sendOutput(int port,
Tuple tuple)
throws StreamBaseException
port - The output port the Tuple is enqueued upon (ports are zero
based)tuple - The Tuple to enqueue
StreamBaseException - if the port is invalid or the tuple argument doesn't match
the schema of the output port.
public void sendOutput(int port,
List tuples)
throws StreamBaseException
port - The output port the Tuple is enqueued upon (ports are zero
based)tuples - The List of Tuple objects to enqueue
StreamBaseException - if the port is invalid or any tuple in the tuples argument
doesn't match the schema of the output port.public final Schema getInputSchema(int port)
port - the port to return the schema for (ports are zero based)
IndexOutOfBoundsException - if port not in rangepublic final Schema getOutputSchema(int port)
port - the port to return the schema for (ports are zero based)
IndexOutOfBoundsException - if port not in range
public final void setOutputSchema(int port,
Schema schema)
throws TypecheckException
port - the port to set the given schema to (ports are zero based)schema - the schema to set the given port to
TypecheckExceptionOperator.getOutputPortCount()protected void setReuseTuple(boolean reuse)
reuse - allow/disallow tuple reuse.public boolean getReuseTuple()
|
||||||||||
| PREV CLASS NEXT CLASS | FRAMES NO FRAMES | |||||||||
| SUMMARY: NESTED | FIELD | CONSTR | METHOD | DETAIL: FIELD | CONSTR | METHOD | |||||||||