StreamBase Execution Order, Concurrency Options, and Data Parallelism

This topic focuses on execution order, concurrency options, and data parallelism in StreamBase applications. The underlying principles are similar whether you implement your applications as EventFlows (.sbapp) or text-based StreamSQL applications (.ssql).

Introduction

By default, when StreamBase processes data in an application, operations are executed in a predictable order. That is, each input tuple is processed individually to completion. In an EventFlow, processing proceeds from left to right; in a StreamSQL application, the order in which statements are written determines the order in which they are processed. At StreamBase Server startup time, the application is converted into Java source code, which is then dynamically compiled to Java bytecode, which in turn is executed by a VM/JIT. The generated source code and bytecode are not persisted, and are not visible to users.

If you know that portions of your StreamBase application can run without dependencies on the other streaming data in your application, you may be able to improve the overall throughput by selecting optional concurrency options. Each StreamBase operator (or module reference) that has concurrency enabled is run in its own processing thread. This can result in faster performance on SMP machines.

If you enable concurrency for a StreamBase operator or module, you have an additional option to enable data parallelism. This feature allows you to run multiple instances of the operator or module reference, each in its own thread. At runtime, tuples are dispatched to particular instances based on the value of a key expression, such as customerNumber. This expression must evaluate to an integer; if you wish to dispatch based on a non-integer expression, you can use the hash() function to convert any data type into an integer. Tuples are dispatched to instances based on the value of the key expression modulo the number of threads that you select. If no key expression is specified, tuples are dispatched in a round-robin fashion. On SMP machines, this can result in faster performance, as multiple threads are processed in parallel.
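The dispatch rule described above can be sketched in a few lines of Python. This is only an illustrative model, not StreamBase code: the Dispatcher class and the to_int_key() helper are hypothetical names, and zlib.crc32 merely stands in for the hash() function, whose actual algorithm is not described here.

```python
import zlib

def to_int_key(value):
    """Hypothetical stand-in for hash(): map any value to a non-negative integer."""
    if isinstance(value, int):
        return value
    return zlib.crc32(str(value).encode("utf-8"))

class Dispatcher:
    """Chooses which instance of a data-parallel component receives a tuple."""

    def __init__(self, num_instances):
        self.num_instances = num_instances
        self._next = 0  # round-robin cursor, used only when no key is given

    def pick(self, tup, key_field=None):
        if key_field is None:
            # No key expression: hand out instances in round-robin order.
            idx = self._next
            self._next = (self._next + 1) % self.num_instances
            return idx
        # Key expression: integer key modulo the number of instances.
        return to_int_key(tup[key_field]) % self.num_instances

d = Dispatcher(2)
same_a = d.pick({"customerNumber": "C-1001"}, "customerNumber")
same_b = d.pick({"customerNumber": "C-1001"}, "customerNumber")
```

In this model, two tuples carrying the same key value always resolve to the same instance index (same_a equals same_b), whereas calls without a key field simply alternate between instances.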

Important! There are a number of caveats associated with the concurrency options. In StreamBase Studio, most components have an optional Concurrency tab in the Properties view. Text in the tab cautions you that using this feature requires a thorough analysis of your application. Be sure to read the caveats described in Important Considerations for Using Data Parallelism.

Rules of the StreamBase Execution Order

The StreamBase execution order rules are as follows:

  • Rule 1: Each input tuple is processed to completion, from left to right.

    When a tuple arrives at the input of a module or application, it is processed as far as possible, upstream to downstream, before processing happens on any other tuples. To emphasize the point, the term "processed to completion" means that:

    • Whenever a tuple arrives on an input stream, it is processed to completion by the first operator tied to that input stream, and is then processed by the second operator (if any), and so on, downstream through the application.

    • Whenever a tuple arrives at an operator, the operator does its processing on that tuple, generating zero or more output tuples. The first output tuple is processed to completion by each downstream operator, in order; then the second output tuple is processed to completion by each downstream operator, in order; and so on.

    • Downstream operators finish work before the upstream operator or input stream sees another tuple.

  • Rule 2: Branches are processed sequentially.

    Consider the case where two operators are attached to an input stream, or two operators are attached to another operator's output port. If the path of tuple processing ever splits, then the different branches are always executed one at a time, to completion, and in the same order.

  • Rule 3: Output tuples are processed sequentially.

    If an operator ever generates more than one tuple, then each output tuple is processed to completion one at a time before the next one is considered.

  • Rule 4: Module output is processed to completion immediately.

    Consider the case where you have a module named Inner, which is inside a module named Outer. The execution order is the same as if you had copied the entire contents of Inner into Outer verbatim. In other words, when a tuple is available on one of Inner's output streams, that tuple becomes available in Outer completely and is processed there to completion.

  • Rule 5: One operator is executed at a time.

    Only one operator ever executes at any given time, and processing always happens according to the rules above. This is true even if an operator generates tuples asynchronously; that is, not necessarily in response to input tuples.

In an EventFlow, if there are multiple arrows (called arcs) connected to a single port of an operator, a tuple is always sent to each of those arcs in the same order: that is, the order in which the downstream entities appear in the .sbapp XML file. Similarly, in a StreamSQL application, tuples are sent to streams in the order in which streams are declared.

If a tuple is sent backwards on a loop (a Union operator with an explicitly-declared schema), downstream processing completes before the tuple is cycled again in the loop.
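One way to picture these rules is as a depth-first walk over the operator graph. The following Python sketch is a simplified model under stated assumptions, not the code StreamBase generates: the operator names (Split, A, B, C), the graph layout, and the process() function are all hypothetical.

```python
def process(op, tup, graph, trace):
    """Process one tuple to completion at operator `op`, depth-first."""
    trace.append((op, tup))
    emit, downstream = graph[op]
    for out in emit(tup):        # Rule 3: output tuples are handled one at a time
        for nxt in downstream:   # Rule 2: branches run one at a time, in a fixed order
            process(nxt, out, graph, trace)

# Each entry maps an operator name to (emit function, ordered downstream operators).
graph = {
    "Split": (lambda t: [t + "1", t + "2"], ["A", "B"]),  # two outputs, two branches
    "A": (lambda t: [t], ["C"]),                          # passes the tuple on to C
    "B": (lambda t: [], []),                              # end of a branch
    "C": (lambda t: [], []),                              # end of a branch
}

trace = []
for t in ["x", "y"]:             # Rule 1: "x" finishes completely before "y" starts
    process("Split", t, graph, trace)

for step in trace:
    print(step)
```

In the printed trace, the first output of Split travels through A, then C, then B before the second output is considered, and nothing derived from "y" appears until every step for "x" has finished.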

Concurrency Options in Module References and Operators

If you know that portions of your StreamBase application can run without dependencies on the other streaming data in your application, you may be able to improve the overall throughput by selecting the concurrency options. Each StreamBase module reference or operator that has concurrency enabled is run in its own processing thread. This can result in faster performance on an SMP machine, because the threads are distributed automatically across the available processors.

Note

At runtime, StreamBase creates a module for each thread created for concurrency. The server prefixes its module reference name to the operator name, as shown in this example:

sbc list -m 
MyBsort.MyBsort

In this example, the sbc list command is run with its -m option; without it, the sbc list command does not show modules, so the MyBsort operator would have been omitted.

In StreamSQL applications, you can use the PARALLEL keyword to cause an APPLY statement to run in a separate thread. For more information about concurrency in StreamSQL, see APPLY Statement in the StreamSQL Guide.

In StreamBase Studio, in the Properties View for Module References and most operators, the Concurrency tab includes an option to Run this component in a separate thread. For example:

Run this StreamBase component in a separate thread

This option is available for any module reference and for most operators, except operators that need to be processed sequentially, not concurrently with other components. The option to run in a separate thread is not available for the Lock and Unlock operators, or for Query operators that are connected to Query Table data constructs. By contrast, the option is available for Query operators that are connected to JDBC Table data constructs.

Let's say you have two module references for separate subsets of an application. Each module reference has its own thread when run. In addition, when the StreamBase application runs, one additional thread is created for the processing of the rest of the application; that thread is named main.
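The resulting thread layout can be approximated with ordinary threads and queues. The Python sketch below is only an analogy: the names ModuleA and ModuleB, the run_module() function, and the queue-based hand-off are assumptions made for illustration, and say nothing about how StreamBase passes tuples between threads internally.

```python
import queue
import threading

def run_module(inbox, results):
    """Body of a hypothetical concurrent module: drain its own queue in order."""
    while True:
        tup = inbox.get()
        if tup is None:          # sentinel: no more tuples for this module
            break
        results.append((threading.current_thread().name, tup))

# One inbox and one worker thread per module reference marked for concurrency.
inboxes = {"ModuleA": queue.Queue(), "ModuleB": queue.Queue()}
results = {name: [] for name in inboxes}
threads = [
    threading.Thread(target=run_module, name=name, args=(q, results[name]))
    for name, q in inboxes.items()
]
for t in threads:
    t.start()

# The rest of the application keeps running on the calling (main) thread.
for i in range(3):
    inboxes["ModuleA"].put(("a", i))
    inboxes["ModuleB"].put(("b", i))

for q in inboxes.values():
    q.put(None)
for t in threads:
    t.join()
```

Each module sees its own tuples in arrival order, but no ordering is guaranteed between the two modules. That is why this arrangement suits only portions of an application that have no data dependencies on one another.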

You can specify the concurrency option for an individual operator, which would then get its own thread. If the individual operator you mark for concurrency resides in a subapplication (that is, in a module referenced by a parent application), the operator does get its own thread, but the other operators in the subapplication run in the same thread. Instead, the best practice is to group a set of operators (and any data constructs they use) into a single module reference, and then mark the module reference for concurrency. Do this only if the module reference can run concurrently without data dependencies on other components in your application.

In a text-based StreamSQL application, only modules can be run in separate threads. You enable concurrency by changing APPLY MODULE to APPLY PARALLEL MODULE. If you convert an EventFlow application into a StreamSQL application, each operator marked for concurrency must be placed into a separate module and then marked for concurrency. For more information, see the APPLY Statement description in the StreamSQL Guide.

Using Data Parallelism

In the Data Parallelism section of the Concurrency tab, you can specify that multiple instances of an operator or module should be instantiated, each in a separate thread. At runtime, each tuple is dispatched to a particular instance of the component. When configured this way, all tuples with the same value in a field identified as the Key Expression are sent to the same operator or module instance.

In the example that follows, concurrency has been enabled for a Map operator; the StreamBase application is set to create two instances of this operator. As each tuple is processed, StreamBase uses the value in the customerNumber tuple field to select the instance of the Map operator to process the tuple. The StreamBase runtime guarantees that tuples with the same customerNumber value are processed by the same instance of this Map operator.

Concurrency Option and Data Parallelism

Using data parallelism can improve runtime performance on SMP machines. However, there are important caveats to consider, as described in the next section.

Important Considerations for Using Data Parallelism

Before you enable data parallelism for a component, read this section and determine whether any of the following cautions applies to your StreamBase application.

Critical to Use an Appropriate Key Expression

For operators and modules that maintain state while the application is running, it is critical to use an appropriate Key Expression.

  • The Key Expression field is optional in the Data Parallelism section of the Concurrency tab. If you do not specify a key expression, StreamBase uses a predictable distribution (or round robin) of the tuples among the available instances of the component. For streaming data where the individual tuples do not depend on values in other tuples, this parallel distribution is fine. However, if the nature of your streaming data is that you are looking for correlations between the tuples, round robin distribution could cause unexpected results. If the tuples have data dependencies, be sure to specify a Key Expression.

  • Aggregate, BSort, Gather, Join, Merge, and Query Tables are stateful objects. Any component marked for data parallelism will have multiple instances instantiated. So for those operators that maintain state, the state will be available only to the instance in which it is stored. For example, in a trading application, all tuples with a Symbol of IBM must go to the same instance so that the operator has all the appropriate data for its calculation. You must consider whether the field chosen for the Key Expression is consistent with what you are doing in the operator. For example, if you are grouping by Symbol, then use Symbol to make sure all tuples for the same symbol are sent to the same operator instance.

  • Use a key expression that sets the appropriate scope for each instance's processing. For example, using a key expression on Symbol might not be appropriate if the runtime processing of each instance really needs to consider all the stocks in a particular Sector.

  • The number of parallel threads can alter the processing semantics if the key expression is incorrect. For example, if the key expression is Symbol and the pricing model is sector-based, it is possible that DELL and HPQ might go to the same thread (that is, work correctly) with two threads selected, but go to different threads (that is, work incorrectly) with three threads.
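The thread-count effect in the last point is easy to reproduce with a toy model. In the Python sketch below, toy_hash() is a deliberately simple, hypothetical stand-in for hash() (it just sums character codes), so the specific instance numbers are illustrative only; the SECTOR mapping is likewise invented for the example.

```python
def toy_hash(text):
    """Hypothetical stand-in for hash(): sum of the character codes."""
    return sum(ord(ch) for ch in text)

def instance_for(key, num_threads):
    """Instance index chosen for a key: integer key modulo the thread count."""
    return toy_hash(key) % num_threads

# Invented sector assignments for the two symbols in the example.
SECTOR = {"DELL": "high-tech", "HPQ": "high-tech"}

def colocated(symbols, num_threads, key_of):
    """True if every symbol is dispatched to the same instance."""
    return len({instance_for(key_of(s), num_threads) for s in symbols}) == 1

def by_symbol(symbol):
    return symbol

def by_sector(symbol):
    return SECTOR[symbol]

for n in (2, 3):
    print(n, colocated(["DELL", "HPQ"], n, by_symbol),
          colocated(["DELL", "HPQ"], n, by_sector))
```

With this toy hash, keying on Symbol happens to place DELL and HPQ together when there are two threads but separates them when there are three, while keying on Sector keeps them together for either thread count.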

Modules That Contain Query Tables

If Data Parallelism is enabled for a module, and that module contains a Query Table, remember that the Query Table is a stateful object. It is like the other stateful operators in that each instance will only see a subset of tuples, and you must set the key expression appropriately.

If Data Parallelism is enabled for a module, when you execute a query against a Query Table, take care that your query is applied to the correct table. For example, assume that bank-sector related data is processed by instance 1 and stored in Query Table 1, and high-tech sector related data is processed by instance 2 and stored in Query Table 2. When you attempt to read data from the table, you must be certain that the tuple triggering the read operation has a field corresponding to the Key Expression field, so that the query is directed to the instance with the table storing the desired data. For example, a read for IBM needs to be directed to instance 2 and Query Table 2, so the tuple that triggers this read must have a sector field with the high-tech entry so that the query is sent to the correct instance.

Let's say that the module has an input stream to accept tuples that are written to the Query Table. Each of these tuples has many fields, but we are concerned only with Symbol and Sector. The Key Expression uses Sector to pick the instance that will process the tuple. So when a tuple arrives on another input stream to trigger a read, it too must have a Sector field so that the Key Expression sends the tuple to the correct instance; it also needs a Symbol field so that data on the desired stock can be retrieved.

Note that a read all rows operation returns all the rows for this thread, not all the rows from all the threads.
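The per-instance behavior of Query Tables can be modeled as one dictionary per instance. The Python sketch below is a simplified illustration, not StreamBase code: the ParallelModule class, its method names, and toy_hash() (a hypothetical stand-in for hash()) are assumptions, and instances are numbered from 0 rather than 1.

```python
def toy_hash(text):
    """Hypothetical stand-in for hash(): sum of the character codes."""
    return sum(ord(ch) for ch in text)

class ParallelModule:
    """Models a data-parallel module in which every instance owns a private table."""

    def __init__(self, num_instances):
        self.tables = [dict() for _ in range(num_instances)]

    def _instance(self, sector):
        # The key expression (Sector) selects the instance, and thus the table.
        return toy_hash(sector) % len(self.tables)

    def write(self, tup):
        self.tables[self._instance(tup["Sector"])][tup["Symbol"]] = tup

    def read(self, sector, symbol):
        # The triggering tuple needs Sector to reach the right instance
        # and Symbol to find the row within that instance's table.
        return self.tables[self._instance(sector)].get(symbol)

    def read_all(self, sector):
        # Returns only the rows held by the instance that handles this tuple.
        return sorted(self.tables[self._instance(sector)])

m = ParallelModule(2)
m.write({"Symbol": "IBM", "Sector": "high-tech"})
m.write({"Symbol": "DELL", "Sector": "high-tech"})
m.write({"Symbol": "JPM", "Sector": "bank"})

found = m.read("high-tech", "IBM")    # reaches the instance that stored IBM
missed = m.read("bank", "IBM")        # wrong sector: lands on the other table
```

In this model, a read that carries the wrong Sector value is routed to an instance whose table never received the row, so it finds nothing, and a read of all rows sees only the rows held by one instance.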

Modules That Contain File-based or TCP-based Embedded Adapters

Do not enable data parallelism for modules that contain file-based or TCP-based embedded adapters. For example, you might inadvertently set up multiple instances of the adapter trying to simultaneously read the same file or port, or trying to write to the same file or port. That is, two CSV write adapters could not open the same file for writing simultaneously, which is what enabling data parallelism would try to do. (The name of the file is part of the adapter's configuration, so it would be the same in all instances of the adapter.) To avoid this scenario, do not enable data parallelism for modules that contain these types of embedded adapters.

By contrast, a TCP-based output embedded adapter, such as the StreamBase E-mail Sender Output Adapter, does work in a data-parallel module.

Enabling Data Parallelism in an EventFlow Application

Only operators or modules configured to run in a separate thread can be optionally configured to offer data parallelism. When you consider using this feature, keep in mind the cautions in the previous section.

In an EventFlow application, the Concurrency tab of a component's Properties view enables three controls when you check the Run this component in a separate thread checkbox.

Check the Run in parallel threads checkbox to enable the Number of threads and Key expression fields. Use the Number of threads field to specify the number of instances of the component you want to instantiate. Use the Key expression field to specify an expression derived from fields in the incoming tuple. StreamBase uses the resulting integer to select the instance that will process the tuple.

The key expression must evaluate to an integer value. You can use the hash() function to convert the results of the expression to an int. For example, string fields can be converted into an integer value with the function hash(string_field_name). In this example, your key expression might be hash(customerNumber).

When the application starts, it instantiates multiple instances of each operator or module reference. You can confirm that the multiple instances have been created by running StreamBase Monitor (sbmonitor). In the Monitor displays, look for multiple entries for the operators or module references you have designated to run in parallel. The designation default refers to the container hosting the application.

StreamBase Monitor when multiple copies of an operator are running in separate threads

For more information about StreamBase containers, see Using Containers to Organize and Connect Multiple Applications in the Administration Guide.

Enabling Data Parallelism in a StreamSQL Application

In a StreamSQL application, only modules can be configured to run in a separate thread and use data parallelism. Before using this feature, study the Important Considerations for Using Data Parallelism.

You include modules in a StreamSQL application with the APPLY MODULE statement. To specify data parallelism, use the PARALLEL instance_number BY field_name clause immediately after the APPLY keyword. For instance_number, specify an integer that sets the number of instances of the component you want to instantiate. For field_name, specify the name of an integer field in the incoming tuple to use as the key expression, as described in the previous section.

For example, if customerNumber is an integer value, use the following syntax.

APPLY PARALLEL 2 BY customerNumber MODULE...

If customerNumber is a non-integer value, use one of the type conversion functions int() or hash() to convert the expression into an integer. The conversion function cannot be used directly within the APPLY statement. You must invoke the conversion function within a SELECT statement, adding the resulting integer field to the tuple passed into the module. This process is summarized in the following code example.

CREATE INPUT STREAM in (customerNumber string(10), ...);
CREATE OUTPUT STREAM out;
CREATE STREAM s AS
  SELECT *, hash(customerNumber) AS hash_value
  FROM in;
APPLY PARALLEL 2 BY hash_value MODULE "module_name"
  FROM module_input_stream = s
  INTO module_output_stream = out;

For more information, see the description of the APPLY Statement in the StreamSQL Guide.