com.streambase.sb.operator
Class Operator

java.lang.Object
  extended by com.streambase.sb.operator.Operator
All Implemented Interfaces:
Serializable
Direct Known Subclasses:
InputAdapter, OutputAdapter

public abstract class Operator
extends Object
implements Serializable

Abstract base class for User code that is used as a Java Operator or an embedded Adapter in a StreamBase application. One instance will be created for each Java Operator in a StreamBase application. StreamBase Studio may operate on several StreamBase applications at a time, so Operator subclass instances may be in different applications.

Operator subclasses must have a public default constructor.

In High Availability mode, the Operator will be serialized and sent from a primary server to a secondary server. Operators in the secondary server will be discarded as new operators containing the primary state are instantiated as they are deserialized. So in secondary HA servers, the operators will frequently be destroyed and recreated.

If there is any state that the operator instance needs to maintain beyond the lifetime of the instance, the setSessionState/getSessionState methods can be used. This is the Memento Design Pattern.

An Operator is notified of state changes through callbacks. The StreamBase runtime 'calls back' an Operator when it changes the runtime state of the Operator. These callbacks include Operator.resume(), Operator.resumed(), Operator.suspend(), Operator.suspended() and . Operator.shutdown().

Operator provides "managed threads", which are threads that run concurrently with the application, but which can synchronize with its overall state changes. These threads are started, suspended, resumed, and shut down with the application. Managed threads are registered with an Operator using the method registerRunnable. This is particularly useful for input adapters, which typically have to respond to external events asynchronously with the application. Managed threads can call sendOutput at any time.

If an Operator registers one or more managed thread, and all of its managed threads exit their run() methods, then the Operator itself will shut down.

The StreamBase runtime blocks while it waits for an Operator's managed threads to respond to a state change. This can be problematic if a managed thread is blocked on some event. However, the StreamBase runtime can be configured to interrupt a thread when it needs to change its state. This is accomplished by setting the flag shouldInterrupt to true when registering the thread with registerRunnable.

It may be that an Operator's managed thread does not respond to a state change even after it has been interrupted. If the Operator's thread does not respond to the state within a given time interval then it is considered to be in failure and it is shut down. This time interval is specified by the server configuration parameter operator-state-change-timeout-ms.

See Also:
Parameterizable, Serialized Form

Nested Class Summary
static class Operator.OperatorStates
          The set of runtime states that an Operator can be in.
static class Operator.SuspendBehaviorStates
          Suspend behavior defines how an Operator handles tuples when it is suspended; meaning when it is in the SUSPENDED state.
 
Field Summary
static int DEFAULT_STATE_CHANGE_TIMEOUT
          Default value for the timeout for Operator state changes, in milliseconds.
 
Constructor Summary
protected Operator()
          Constructs an operator.
 
Method Summary
 String getDisplayName()
          Return the display name of this Operator.
 int getInputPortCount()
          Return the number of input ports.
 Schema getInputSchema(int port)
          Returns the schema of an input port.
 String getName()
          Return the name of this Operator.
 int getOutputPortCount()
          Returns the number of output ports.
 Schema getOutputSchema(int port)
          Return the output schema.
 Parameterizable getParameters()
          Returns the Java Bean that holds the user-configurable parameters visible in the StreamBase Studio Properties View of this operator.
 PortCounts getPortCounts()
          An optional method that subclasses can override to dynamically tell Studio the number of input and output ports.
 InputStream getResourceContents(String name)
          Returns an open input stream on the contents of the named resource file.
 boolean getReuseTuple()
          Get the state of the input Tuple reuse flag.
protected  Object getSessionState()
          Retreive session state for this operator that has the same object extent as the application, not the instance (which may be repeatedly destroyed and recreated).
 boolean hasNotYetStarted()
          Returns true if this Operator has not yet started running.
 void init()
          After the Operator has typechecked successfully, but before the application or any managed threads start, the StreamBase server will call the init method.
 boolean isDroppingTuples()
          Returns true if the Operator will drop any tuples it receives when it is suspended.
 boolean isProcessingTuples()
          Returns true if the Operator will process any tuples it receives when it is suspended.
 boolean isRunning()
          Returns true if this Operator is currently running, false otherwise.
 boolean isShutdown()
          Returns true if this Operator is currently shut down, false otherwise.
 boolean isSuspended()
          Returns true if this Operator is currently suspended, false otherwise.
 void postShutdown()
          postShutdown is called by the StreamBase runtime just after shutting down this Operator.
abstract  void processTuple(int inputPort, Tuple tuple)
          This method will be called by the StreamBase server for each Tuple given to the Operator to process.
 void registerRunnable(Runnable operatorRunnable)
          Deprecated. As of StreamBase version 3.7, replaced by Operator.registerRunnable(Runnable, boolean)
 void registerRunnable(Runnable operatorRunnable, boolean shouldInterrupt)
          Register a Runnable object to be managed by this Operator.
protected  void requireInputPortCount(int numPorts)
          Throws a PortMismatchException if the number of ports is not numPorts.
 void resume()
          resume() is called when an operator starts or resumes execution, before any registered runnables are started or resumed.
 void resumed()
          resumed() is called after all registered runnables of the operator have started or resumed.
 void sendOutput(int port, List tuples)
          Enqueue a List of Tuples to be sent to downstream operators.
 void sendOutput(int port, Tuple tuple)
          Enqueue a Tuple to be sent to downstream operators.
 void setDisplayName(String dn)
          The display name is the String that's shown in the list of available Operators.
 void setOutputSchema(int port, Schema schema)
          Sets the output schema for the given output port (port #'s are zero based).
 void setParameters(Parameterizable params)
          Sets the Java Bean that holds the user-configurable parameters visible in the StreamBase Studio Properties View of this operator.
 void setPortHints(int numInputPorts, int numOutputPorts)
          Used to tell StreamBase Studio how many ports to draw on the Java Operator when the operator is drawn in the IDE's canvas.
protected  void setReuseTuple(boolean reuse)
          Allow/disallow the runtime to reuse tuples on operator input.
protected  void setSessionState(Object state)
          Maintain state for this operator that has the same object extent as the application, not the instance (which may be repeatedly destroyed and recreated when High Availability servers are used).
 void setSuspendBehavior(int suspendBehavior)
          Set the suspend behavior of this Operator.
 boolean shouldRun()
          Return whether or not calling operator thread is enabled and should continue running.
 void shutdown()
          shutdown is called by the StreamBase runtime just prior to shutting down this Operator.
 void suspend()
          suspend() will be called when an operator suspends, before any registered runnables are suspended.
 void suspended()
          suspended() will be called after all registered runnables of the operator have suspended.
abstract  void typecheck()
          The typecheck method is called by the StreamBase server to ensure that all the parameters for this operator are correct.
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Field Detail

DEFAULT_STATE_CHANGE_TIMEOUT

public static final int DEFAULT_STATE_CHANGE_TIMEOUT
Default value for the timeout for Operator state changes, in milliseconds.

An Operator can receive state change events from the StreamBase runtime. In particular, when the runtime starts, suspends, resumes, or shuts down, it, in turn, applies the same state change to any Operators it holds.

When the server applies a state change to an Operator, it blocks until the Operator acknowledges that it has transitioned to the new state.

If an Operator fails to make such an acknowledgement, the StreamBase runtime might wait indefinitely. StateChangeTimeout limits the amount of time the server will wait for an Operator to respond to a requested state change.

If the Operator fails to respond within the timeout, the server will shut down the Operator and continue with its state change.

See Also:
Constant Field Values
Constructor Detail

Operator

protected Operator()
Constructs an operator.

All Operator subclasses must have a public default constructor, otherwise StreamBase Studio and the StreamBase sbd process will not be able to instantiate the Operator subclass. Nothing substantial should be done in the constructor, except for setting port hints and setting the parameters object.

See Also:
Operator.setPortHints(int, int), Operator.setParameters(com.streambase.sb.operator.Parameterizable)
Method Detail

getDisplayName

public final String getDisplayName()
Return the display name of this Operator. The display name is the String that's shown in the list of available Operators.

Returns:
The display name of the Operator.

getName

public String getName()
Return the name of this Operator. Operators are named and can be managed by name.

Returns:
The name of the operator.

setDisplayName

public final void setDisplayName(String dn)
The display name is the String that's shown in the list of available Operators. If The subclass doesn't override it, use the class name.


setPortHints

public void setPortHints(int numInputPorts,
                         int numOutputPorts)
Used to tell StreamBase Studio how many ports to draw on the Java Operator when the operator is drawn in the IDE's canvas. The default is 1 input port, 0 output ports.

This method is only effective when called from the Java Operator's constructor. The typecheck method should call requireInputPortCount instead.

To specify the ports at runtime requireInputPortCount and setOutputSchema should be called from the typecheck method.

To specify the ports dynamically within studio, see getPortCounts.

Parameters:
numInputPorts - number of input ports. Must be non-negative.
numOutputPorts - number of output ports. Must be non-negative.
Throws:
IllegalArgumentException - if an argument is less than zero
See Also:
Operator.requireInputPortCount(int), Operator.setOutputSchema(int, Schema)

setSessionState

protected final void setSessionState(Object state)
Maintain state for this operator that has the same object extent as the application, not the instance (which may be repeatedly destroyed and recreated when High Availability servers are used).

When HA mode is on, the instances in the secondary server will be destroyed and recreated with copies from the primary server each time a checkpoint is sent from the primary server to the secondary server. If the Java Operator maintains state that should be initialized once, but not created and copied with each checkpoint, the setSessionState() and getSessionState() methods should be used. Examples of this could be JDBC connections which would be initialized once and maintained for the lifetime of the application.

When the Operator is instantiated in SBStudio, this method does nothing.

Parameters:
state - the object that contains the state that the user wants to preserve between checkpoints
See Also:
Operator.getSessionState()

getSessionState

protected final Object getSessionState()
Retreive session state for this operator that has the same object extent as the application, not the instance (which may be repeatedly destroyed and recreated). "State" does not imply fault tolerance and is not related to StreamBase high availability.

See Also:
Operator.setSessionState(Object)

typecheck

public abstract void typecheck()
                        throws TypecheckException
The typecheck method is called by the StreamBase server to ensure that all the parameters for this operator are correct. This method is also responsible for verifying input Schemas and setting output Schemas. This method must always call the setOutputSchema() method if there are any output ports.

If the parameters are not correct, or the input port Schemas are not correct, a TypecheckException should be thrown.

The method requireInputPortCount() should be used to verify that the required input ports are set.

If the Operator changes the number of input ports, this method must call requireInputPortCount.

Throws:
TypecheckException - when the parameters or input Schemas are not satisfied.
See Also:
Operator.requireInputPortCount(int), Operator.setOutputSchema(int, Schema)

init

public void init()
          throws StreamBaseException
After the Operator has typechecked successfully, but before the application or any managed threads start, the StreamBase server will call the init method. Operators should override this method to perform custom initialization.

If this operator wishes to register threads, it should call Operator.registerRunnable(Runnable) from this method.

Throws:
StreamBaseException - Prevents the application from starting.

hasNotYetStarted

public boolean hasNotYetStarted()
Returns true if this Operator has not yet started running. hasNotYetStarted() returns true if the Operator is in the runtime state of NOT_YET_STARTED, which is the initial runtime state of an Operator.

See Also:
Operator.OperatorStates.NOT_YET_STARTED

isDroppingTuples

public boolean isDroppingTuples()
Returns true if the Operator will drop any tuples it receives when it is suspended.

See Also:
Operator.SuspendBehaviorStates.DROPPING_TUPLES

isProcessingTuples

public boolean isProcessingTuples()
Returns true if the Operator will process any tuples it receives when it is suspended.

See Also:
Operator.SuspendBehaviorStates.PROCESSING_TUPLES

isRunning

public boolean isRunning()
Returns true if this Operator is currently running, false otherwise. isRunning() returns true if the Operator is in a runtime state of STARTED.

See Also:
Operator.OperatorStates.STARTED

isShutdown

public boolean isShutdown()
Returns true if this Operator is currently shut down, false otherwise. An Operator is shut down when it is in the runtime state of SHUTDOWN, which is the terminal state for Operators.

See Also:
Operator.OperatorStates.SHUTDOWN

isSuspended

public boolean isSuspended()
Returns true if this Operator is currently suspended, false otherwise. An Operator is suspended when it is in the runtime state of SUSPENDED.

See Also:
Operator.OperatorStates.SUSPENDED

processTuple

public abstract void processTuple(int inputPort,
                                  Tuple tuple)
                           throws StreamBaseException
This method will be called by the StreamBase server for each Tuple given to the Operator to process.

The default implementation does nothing.

Parameters:
inputPort - the input port that the tuple is from (ports are zero based)
tuple - the tuple from the given input port
Throws:
StreamBaseException - Terminates the application.

resume

public void resume()
            throws Exception
resume() is called when an operator starts or resumes execution, before any registered runnables are started or resumed. Note that if the application is shutdown directly from a suspended state, this will not be called; instead Operator.shutdown() will be called. resume() is a callback that is called by the StreamBase runtime.

Throws:
Exception

resumed

public void resumed()
             throws Exception
resumed() is called after all registered runnables of the operator have started or resumed. That is, once Operator.shouldRun() has unblocked and returned true in all registered runnables. resumed() is a callback that is called by the StreamBase runtime.

Throws:
Exception

shouldRun

public final boolean shouldRun()
Return whether or not calling operator thread is enabled and should continue running.

This method returns false if the operator has not yet started or has shut down. It blocks if the operator is suspended. It returns true if the operator is running.

This method links managed operator threads with the StreamBase runtime. Every registered runnable object must repeatedly call this during its entire lifetime in order to synchronize with the main StreamBase application.

An operator (and, thus, its containing application) is not considered started until all registered runnables have called this method. Likewise, an operator is not considered suspended until all registered runnables are blocked in this method. Finally, an operator is not considered shutdown until all registered runnables have exited their run() method (either on their own or in response to this method returning false).

Returns:
true is the operator is running, false otherwise.
Throws:
UnsupportedOperationException - If this was not called from an operator thread.

shutdown

public void shutdown()
shutdown is called by the StreamBase runtime just prior to shutting down this Operator. An implementation of shutdown should include any behavior needed to shut down the Operator - freeing resources, etc. shutdown() is a callback that is called by the StreamBase runtime.


postShutdown

public void postShutdown()
postShutdown is called by the StreamBase runtime just after shutting down this Operator. postShutdown() is a callback that is called by the StreamBase runtime.


suspend

public void suspend()
             throws Exception
suspend() will be called when an operator suspends, before any registered runnables are suspended. suspend() is a callback that is called by the StreamBase runtime.

Throws:
Exception

setSuspendBehavior

public void setSuspendBehavior(int suspendBehavior)
Set the suspend behavior of this Operator. The Operator can either process or drop tuples when suspended. setSuspendBehavior() might be called in the Operator's constructor or in init().

Parameters:
suspendBehavior - The suspend behavior to set, either PROCESSING_TUPLES or DROPPING_TUPLES.
See Also:
Operator.SuspendBehaviorStates.PROCESSING_TUPLES, Operator.SuspendBehaviorStates.DROPPING_TUPLES

suspended

public void suspended()
               throws Exception
suspended() will be called after all registered runnables of the operator have suspended. That is, once all registered runnables have called and are blocked in Operator.shouldRun(). suspended() is a callback that is called by the StreamBase runtime.

Throws:
Exception

getParameters

public Parameterizable getParameters()
Returns the Java Bean that holds the user-configurable parameters visible in the StreamBase Studio Properties View of this operator.

Returns:
The Java Bean that holds the user-configurable parameters visible in the StreamBase Studio Properties View of this operator.

setParameters

public void setParameters(Parameterizable params)
Sets the Java Bean that holds the user-configurable parameters visible in the StreamBase Studio Properties View of this operator.

Parameters:
params - the Java Bean that holds the user-configurable parameters visible in the StreamBase Studio Properties View of this operator.

getResourceContents

public InputStream getResourceContents(String name)
                                throws ResourceNotFoundException,
                                       StreamBaseException
Returns an open input stream on the contents of the named resource file. The client is responsible for closing the stream when finished. This method should be called during typecheck (as opposed to waiting for init or similar run-time methods) in order to be able to surface to the authoring environment any failures locating the resource.

Returns:
an input stream containing the contents of the resource
Throws:
ResourceNotFoundException - if the resource could not be found
StreamBaseException - if the module was found but could not be opened

registerRunnable

public final void registerRunnable(Runnable operatorRunnable,
                                   boolean shouldInterrupt)
                            throws StreamBaseException
Register a Runnable object to be managed by this Operator. This Runnable will be started in a Thread when the Operator starts, suspended and resumed with the Operator, and stopped with the Operator.

The body of this Runnable must hook into StreamBase's thread management by repeatedly calling the Operator.shouldRun() method during its entire lifetime. This allows managed threads to be started, stopped, suspended, and resumed along with the rest of the application.

This method should be called from the operator's Operator.init() method.

Parameters:
operatorRunnable - The Runnable to register.
shouldInterrupt - Whether this thread should be interrupted when the operator undergoes a state change. Typically true if the thread blocks on some event, input for example. -
Throws:
StreamBaseException - If it is too late in the Operator's life cycle to register a Runnable.

registerRunnable

public final void registerRunnable(Runnable operatorRunnable)
                            throws StreamBaseException
Deprecated. As of StreamBase version 3.7, replaced by Operator.registerRunnable(Runnable, boolean)

Register a Runnable object to be managed by this Operator. This Runnable will be started in a Thread when the Operator starts, suspended and resumed with the Operator, and stopped with the Operator.

The body of this Runnable must hook into StreamBase's thread management by repeatedly calling the Operator.shouldRun() method during its entire lifetime. This allows managed threads to be started, shut down, suspended, and resumed along with the rest of the application.

This method should be called from the adapter's Operator.init() method.

Parameters:
operatorRunnable - The Runnable to register.
Throws:
StreamBaseException - If it is too late in the Operator's life cycle to register a Runnable.

requireInputPortCount

protected void requireInputPortCount(int numPorts)
Throws a PortMismatchException if the number of ports is not numPorts. The StreamBase Studio IDE will recognize this and draw the operator box with the appropriate number of input ports.

This method should be called by the typecheck method.

Parameters:
numPorts - the number of ports
Throws:
PortMismatchException - When the number of input ports is incorrect.

getPortCounts

public PortCounts getPortCounts()
                         throws TypecheckException
An optional method that subclasses can override to dynamically tell Studio the number of input and output ports. Clients should expect this method to be called after all setters, but prior to typecheck.

Note: This is for studio purposes only and should not be used to set any fields that you may want to use in other parts of the Operator.

Returns:
PortCounts record type
Throws:
TypecheckException

getInputPortCount

public final int getInputPortCount()
Return the number of input ports.

Returns:
number of Input ports, if not in an application 0.

getOutputPortCount

public final int getOutputPortCount()
Returns the number of output ports. The number of output ports are set by the setOutputSchema method.

See Also:
Operator.setOutputSchema(int, Schema)

sendOutput

public void sendOutput(int port,
                       Tuple tuple)
                throws StreamBaseException
Enqueue a Tuple to be sent to downstream operators. A note about reusing/caching the sent tuple. If the given tuple is a tuple that this operator received from a processTuple() call, then the tuple can be reused if and only if getReuseTuple() is FALSE. If the given tuple is created by this operator then it can be reused when the call to sendOutput() returns. StreamBase is done with the tuple once control returns to this operator.

Parameters:
port - The output port the Tuple is enqueued upon (ports are zero based)
tuple - The Tuple to enqueue
Throws:
StreamBaseException - if the port is invalid or the tuple argument doesn't match the schema of the output port.

sendOutput

public void sendOutput(int port,
                       List tuples)
                throws StreamBaseException
Enqueue a List of Tuples to be sent to downstream operators. A note about reusing/caching the sent tuples. If the tuples in the given list are tuples that this operator received from processTuple() calls, then these tuples can be reused if and only if getReuseTuple() is FALSE. If the given list of tuples is created by this operator, then the tuples in this list can be reused when the call to sendOutput() returns. StreamBase is done with the tuples once control returns to this operator.

Parameters:
port - The output port the Tuple is enqueued upon (ports are zero based)
tuples - The List of Tuple objects to enqueue
Throws:
StreamBaseException - if the port is invalid or any tuple in the tuples argument doesn't match the schema of the output port.

getInputSchema

public final Schema getInputSchema(int port)
Returns the schema of an input port.

Parameters:
port - the port to return the schema for (ports are zero based)
Returns:
Schema for the port, null if not running in an appication
Throws:
IndexOutOfBoundsException - if port not in range

getOutputSchema

public final Schema getOutputSchema(int port)
Return the output schema.

Parameters:
port - the port to return the schema for (ports are zero based)
Returns:
Schema for the port, null if not running in an appication
Throws:
IndexOutOfBoundsException - if port not in range

setOutputSchema

public final void setOutputSchema(int port,
                                  Schema schema)
                           throws TypecheckException
Sets the output schema for the given output port (port #'s are zero based). Will ensure that the output port count is large enough for the given port so that OutputPortCount > port. This method should be called from the typecheck method of a Java operator or adapter.

Parameters:
port - the port to set the given schema to (ports are zero based)
schema - the schema to set the given port to
Throws:
TypecheckException
See Also:
Operator.getOutputPortCount()

setReuseTuple

protected void setReuseTuple(boolean reuse)
Allow/disallow the runtime to reuse tuples on operator input. The default is to disallow tuple reuse. Reusing operator input tuples will create fewer objects which may increase performance of the operator. To enable call: setReuseTuple(true) from the derived constructor. Note: that if you plan to store the input Tuple as state in your operator you must copy the Tuple before you store it.

Parameters:
reuse - allow/disallow tuple reuse.

getReuseTuple

public boolean getReuseTuple()
Get the state of the input Tuple reuse flag.

Returns:
boolean if input Tuple reuse is allowed/disallowed Do we allow runtime to reuse tuples?