This topic focuses on execution order, concurrency options, and data
parallelism in StreamBase applications. The underlying
principles are similar whether you implement your applications as EventFlows
(.sbapp) or text-based StreamSQL applications
(.ssql).
By default, StreamBase executes the operations in an application in a predictable order. Each input tuple is processed individually to completion. In an EventFlow, processing proceeds from left to right; in a StreamSQL application, statements are processed in the order in which they are written. At StreamBase Server startup time, the application is converted into Java source code, which is then dynamically compiled to Java bytecode, which in turn is executed by a VM/JIT. The generated source code and bytecode are not persisted, and are not visible to users.
If you know that portions of your StreamBase application can run without dependencies on the other streaming data in your application, you may be able to improve the overall throughput by selecting optional concurrency options. Each StreamBase operator (or module reference) that has concurrency enabled is run in its own processing thread. This can result in faster performance on SMP machines.
If you enable concurrency for a StreamBase operator or module,
you have an additional option to enable data parallelism. This feature
allows you to run multiple instances of the operator or module reference,
and each instance runs in its own thread. At runtime, tuples are dispatched
to particular instances based on the value of a key field expression, such
as customerNumber. This expression must be an
integer; if you wish to dispatch based on a non-integer expression, you can
use the hash() function to convert any data
type into an integer. Tuples are dispatched to instances based on the value
of the key expression modulo the number of threads that you select. If no
key expression is specified, tuples are dispatched in a round-robin
fashion. On SMP machines, this can result in faster performance, as
multiple threads are processed in parallel.
Important! There are a number of caveats associated with the concurrency options. In StreamBase Studio, most components have an optional Concurrency tab in the Properties view. Text in the tab cautions you that using this feature requires a thorough analysis of your application. Be sure to read the caveats described in Important Considerations for Using Data Parallelism.
The StreamBase execution order rules are as follows:
-
Rule 1: Each input tuple is processed to completion, from left to right.
When a tuple arrives at the input of a module or application, it is processed as far as possible, upstream to downstream, before processing happens on any other tuples. To emphasize the point, the term "processed to completion" means that:
-
Whenever a tuple arrives on an input stream, it is processed to completion by the first operator tied to that input stream, and is then processed by the second operator (if any), and so on, downstream through the application.
-
Whenever a tuple arrives at an operator, the operator does its processing on that tuple, generating zero or more output tuples. The first output tuple is processed to completion by each downstream operator, in order; then the second output tuple is processed to completion by each downstream operator, in order; and so on.
-
Downstream operators finish work before the upstream operator or input stream sees another tuple.
-
Rule 2: Branches are processed sequentially.
Consider the case where two operators are attached to an input stream, or two operators are attached to another operator's output port. If the path of tuple processing ever splits, then the different branches are always executed one at a time, to completion, and in the same order.
-
Rule 3: Output tuples are processed sequentially.
If an operator ever generates more than one tuple, then each output tuple is processed to completion one at a time before the next one is considered.
-
Rule 4: Module output is processed to completion immediately.
Consider the case where you have a module named Inner, which is inside a module named Outer. The execution order is the same as if you had copied the entire contents of Inner into Outer verbatim. In other words, when a tuple is available on one of Inner's output streams, that tuple becomes available in Outer completely and is processed there to completion.
-
Rule 5: One operator is executed at a time.
Only one operator ever executes at any given time, and processing always happens according to the rules above. This is true even if an operator generates tuples asynchronously; that is, not necessarily in response to input tuples.
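Rules 1 through 3 amount to a depth-first traversal of the operator graph. The following Python sketch models that traversal under stated assumptions: the Operator class and process function are hypothetical illustrations, not part of any StreamBase API, and each operator is reduced to a function that returns a list of output tuples.

```python
class Operator:
    """A toy operator: fn maps one input tuple to a list of output tuples."""

    def __init__(self, name, fn, downstream=()):
        self.name = name
        self.fn = fn
        self.downstream = list(downstream)  # branches, in declaration order


def process(op, tup, trace):
    """Process one tuple to completion, recording the order of execution."""
    trace.append((op.name, tup))
    for out in op.fn(tup):               # Rule 3: one output tuple at a time
        for child in op.downstream:      # Rule 2: branches run one at a time
            process(child, out, trace)   # Rule 1: finish downstream work first


# Split emits two tuples per input and feeds two branches, A and B.
sink_a = Operator("A", lambda t: [])
sink_b = Operator("B", lambda t: [])
split = Operator("Split", lambda t: [t * 10, t * 10 + 1], [sink_a, sink_b])

trace = []
for incoming in (1, 2):
    process(split, incoming, trace)

for step in trace:
    print(step)
```

The trace shows that input 1 is fully handled (A and B each see 10, then 11) before Split sees input 2.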
In an EventFlow, if there are multiple arrows (called arcs) connected to a single port of an operator,
a tuple is always sent to each of those arcs in the same order: that is,
the order in which the downstream entities appear in the .sbapp XML file. Similarly, in a StreamSQL application,
tuples are sent to streams in the order in which streams are declared.
If a tuple is sent backwards on a loop (a Union operator with an explicitly-declared schema), downstream processing completes before the tuple is cycled again in the loop.
If you know that portions of your StreamBase application can run without dependencies on the other streaming data in your application, you may be able to improve the overall throughput by selecting the concurrency options. Each StreamBase module reference or operator that has concurrency enabled is run in its own processing thread. This can result in faster performance on an SMP machine, because the threads are distributed automatically across the available processors.
Note
At runtime, StreamBase creates a module for each thread created for concurrency. The server prefixes its module reference name to the operator name, as shown in this example:
sbc list -m
MyBsort.MyBsort
In this example, the sbc list command is run with its -m option; without it, the sbc list command does not show modules, so the MyBsort operator would have been omitted.
In StreamSQL applications, you can use the PARALLEL keyword to cause an APPLY statement to run in a separate thread. For more
information about concurrency in StreamSQL, see APPLY Statement in the
StreamSQL Guide.
In StreamBase Studio, in the Properties view for module references and most operators, the Concurrency tab includes an option to Run this component in a separate thread.
This option is available for any module reference and for most operators, except operators that need to be processed sequentially, not concurrently with other components. The option to run in a separate thread is not available for the Lock and Unlock operators, and Query operators when connected to Query Table data constructs. By contrast, the option is available for Query operators that are connected to JDBC Table data constructs.
Let's say you have two module references for separate subsets of an
application. Each module reference has its own thread when run. In
addition, when the StreamBase application runs, one additional
thread is created for the processing of the rest of the application; that
thread is named main.
You can specify the concurrency option for an individual operator, which would then get its own thread. If the individual operator you mark for concurrency resides in a subapplication (that is, in a module referenced by a parent application), the operator does get its own thread, but the other operators in the subapplication run in the same thread. Instead, the best practice is to group a set of operators (and any data constructs they use) into a single module reference, and then mark the module reference for concurrency. Do this only if the module reference can run concurrently without data dependencies on other components in your application.
In a text-based StreamSQL application, only modules can be run in separate
threads. You enable concurrency by changing APPLY
MODULE to APPLY PARALLEL MODULE. If you
convert an EventFlow application into a StreamSQL application, each
operator marked for concurrency must be placed into a separate module and
then marked for concurrency. For more information, see the APPLY Statement description in
the StreamSQL Guide.
In the Data Parallelism section of the Concurrency tab, you can specify that multiple instances of an operator or module should be instantiated, each in a separate thread. At runtime, each tuple is dispatched to a particular instance of the component. When configured this way, all tuples with the same value in a field identified as the Key Expression are sent to the same operator or module instance.
In the example that follows, concurrency has been enabled for a Map
operator; the StreamBase application is set to create two
instances of this operator. As each tuple is processed,
StreamBase uses the value in the customerNumber tuple field to select the instance of the
Map operator to process the tuple. The StreamBase runtime
guarantees that tuples with the same customerNumber value are processed by the same instance of
this Map operator.
Using data parallelism can improve runtime performance on SMP machines. However, there are important caveats to consider, as described in the next section.
Before you enable data parallelism for a component, read this section and determine whether any of the following cautions applies to your StreamBase application.
For operators and modules that maintain state while the application is running, it is critical to use an appropriate Key Expression.
-
The Key Expression field is optional in the Data Parallelism section of the Concurrency tab. If you do not specify a key expression, StreamBase uses a predictable distribution (or round robin) of the tuples among the available instances of the component. For streaming data where the individual tuples do not depend on values in other tuples, this parallel distribution is fine. However, if the nature of your streaming data is that you are looking for correlations between the tuples, round robin distribution could cause unexpected results. If the tuples have data dependencies, be sure to specify a Key Expression.
-
Aggregate, BSort, Gather, Join, Merge, and Query Tables are stateful objects. Any component marked for data parallelism will have multiple instances instantiated. So for those operators that maintain state, the state will be available only to the instance in which it is stored. For example, in a trading application, all tuples with a Symbol of IBM must go to the same instance so that the operator has all the appropriate data for its calculation. You must consider whether the field chosen for the Key Expression is consistent with what you are doing in the operator. For example, if you are grouping by Symbol, then use Symbol to make sure all stocks for the same symbol are sent to the same operator instance.
-
Use a key expression that sets the appropriate scope for each instance's processing. For example, using a key expression on Symbol might not be appropriate if the runtime processing of each instance really needs to consider all the stocks in a particular Sector.
-
The number of parallel threads can alter the processing semantics if the key expression is incorrect. For example, if the key expression is Symbol and the pricing model is sector-based, it is possible that DELL and HPQ might go to the same thread (that is, work correctly) with two threads selected, but go to different threads (that is, work incorrectly) with three threads.
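The DELL and HPQ case can be worked through with a little modular arithmetic. In this Python sketch the integer key values 4 and 6 are hypothetical stand-ins for whatever the key expression on Symbol would produce; they are chosen only to show the effect, and the function names are illustrative.

```python
# Hypothetical integer key values standing in for the result of the
# key expression on Symbol. Real values would differ.
symbol_key = {"DELL": 4, "HPQ": 6}


def instance_for(symbol, num_threads):
    """Instance index: key value modulo the number of threads."""
    return symbol_key[symbol] % num_threads


def colocated(num_threads):
    """True if DELL and HPQ are dispatched to the same instance."""
    return instance_for("DELL", num_threads) == instance_for("HPQ", num_threads)


two = colocated(2)    # 4 % 2 == 0 and 6 % 2 == 0: same instance
three = colocated(3)  # 4 % 3 == 1 and 6 % 3 == 0: different instances
print(two, three)     # True False
```

A sector-based model that happens to work with two threads silently breaks with three, which is why the key expression should match the scope of the computation (Sector here) rather than rely on a coincidence of the thread count.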
If Data Parallelism is enabled for a module, and that module contains a Query Table, remember that the Query Table is a stateful object. It is like the other stateful operators in that each instance will only see a subset of tuples, and you must set the key expression appropriately.
If Data Parallelism is enabled for a module, when you execute a query
against a Query Table, take care that your query is applied to the
correct table. For example, assume that bank-sector related data is
processed by instance 1 and stored in Query Table 1, and high-tech
sector related data is processed by instance 2 and stored in Query
Table 2. You must be certain that, when you attempt to read data from
the table, the tuple triggering the read operation has a field
corresponding to the Key Expression field, so that the query is
directed to the instance with the table storing the desired data. For
example, a read for IBM needs to be
directed to instance 2 and Query Table 2, so the tuple that triggers
this read must have a sector field with the high-tech entry so that the
query is sent to the correct instance.
Let's say that the module has an input stream to accept tuples that are
written to the Query Table. This tuple has many fields but we are
concerned only with Symbol and
Sector. The Key Expression uses
Sector to pick the instance that will
process the tuple. So now when a tuple arrives on another input stream
to trigger a read, it too must have a Sector field so the Key Expression sends the tuple to
the correct instance; then it also needs a Symbol field so that data on
the desired stock can be retrieved.
Note that a read all rows operation
returns all the rows for this thread, not all the rows from all the
threads.
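The interaction between the key expression and per-instance Query Tables can be modeled as follows. This is a hedged Python sketch, not StreamBase code: the tables are plain dictionaries, the sector_key mapping is a hypothetical stand-in for the integer value of the key expression, and instance indexes are zero-based, so instance 1 and instance 2 in the text correspond to indexes 0 and 1 here.

```python
NUM_INSTANCES = 2

# Hypothetical integer key per sector, standing in for the key expression.
sector_key = {"bank": 0, "high-tech": 1}

# One private table per instance, keyed by Symbol.
tables = [dict() for _ in range(NUM_INSTANCES)]


def instance_for(sector):
    return sector_key[sector] % NUM_INSTANCES


def write(tup):
    """Store a tuple in the table of the instance selected by its Sector."""
    tables[instance_for(tup["Sector"])][tup["Symbol"]] = tup


def read(sector, symbol):
    """A read needs Sector to find the instance and Symbol to find the row."""
    return tables[instance_for(sector)].get(symbol)


def read_all_rows(sector):
    """Returns the rows held by one instance only, not by all instances."""
    return list(tables[instance_for(sector)].values())


write({"Symbol": "IBM", "Sector": "high-tech", "Price": 100.0})
write({"Symbol": "JPM", "Sector": "bank", "Price": 50.0})

hit = read("high-tech", "IBM")   # routed to the instance that holds IBM
miss = read("bank", "IBM")       # routed to the other instance: no row found
bank_rows = read_all_rows("bank")
print(hit, miss, len(bank_rows))
```

The read that carries the wrong sector finds nothing even though the IBM row exists, and the read-all-rows call sees one row out of the two stored overall.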
Do not enable data parallelism for modules that contain file-based or TCP-based embedded adapters. For example, you might inadvertently set up multiple instances of the adapter trying to simultaneously read the same file or port, or trying to write to the same file or port. For instance, two CSV write adapters cannot open the same file for writing simultaneously, which is what enabling data parallelism would try to do. (The name of the file is part of the adapter's configuration, so it would be the same in all instances of the adapter.) To avoid this scenario, do not enable data parallelism for modules that contain these types of embedded adapters.
By contrast, a TCP-based output embedded adapter, such as the StreamBase E-mail Sender Output Adapter, does work in a data-parallel module.
Only operators or modules configured to run in a separate thread can be optionally configured to offer data parallelism. When you consider using this feature, keep in mind the cautions in the previous section.
In an EventFlow application, the Concurrency tab of a component's Properties view has
three controls enabled if you check the Run the
component in a separate thread checkbox.
Check the Run in parallel threads checkbox to enable the Number of threads and Key expression fields. Use the first option to specify the number of instances of the component you want to instantiate. Use the Key expression field to specify an expression derived from fields in the incoming tuple. StreamBase uses the resulting integer to select the instance that will process the tuple.
The key expression must evaluate to an integer value. You can use the
hash() function to convert the results of the
expression to an int. For example, string
fields can be converted into an integer value with the function
hash(string_field_name). In this example, your
key expression might be hash(customerNumber).
When the application starts, it instantiates multiple instances of each
operator or module reference. You can confirm that the multiple instances
have been created by running StreamBase Monitor
(sbmonitor). In the Monitor
displays, look for multiple entries for the operators or module
references you have designated to run parallel. The designation
default refers to the container hosting the
application.
For more information about StreamBase containers, see Using Containers to Organize and Connect Multiple Applications in the Administration Guide.
In a StreamSQL application, only modules can be configured to run in a separate thread and use data parallelism. Before using this feature, study the Important Considerations for Using Data Parallelism.
You include modules in a StreamSQL application with the APPLY MODULE statement. To specify data parallelism, use
the PARALLEL instance_number BY field_name clause immediately
after the APPLY keyword. In the instance_number field, specify an
integer value for the number of instances of the component you
want to instantiate. The field_name field is the
name of an integer field in the incoming tuple to use as the key
expression, as described in the previous section.
For example, if customerNumber is an integer
value, use the following syntax:
APPLY PARALLEL 2 BY customerNumber MODULE...
If customerNumber is a non-integer value,
use one of the type conversion functions int()
or hash() to convert the expression into an
integer. The conversion function cannot be used directly within the
APPLY statement. You must invoke the conversion
function within a SELECT statement, adding the
resulting integer field to the tuple passed into the module. This process
is summarized in the following code example.
CREATE INPUT STREAM in (customerNumber string(10), ...);
CREATE OUTPUT STREAM out;
CREATE STREAM s AS
  SELECT *, hash(customerNumber) AS hash_value FROM in;
APPLY PARALLEL 2 BY hash_value MODULE "module_name"
  FROM module_input_stream = s
  INTO module_output_stream = out;
For more information, see the description of the APPLY Statement in the StreamSQL Guide.
