The StreamBase C++ API supports custom C++ functions that you
can call directly in StreamBase expressions by using the
callcpp function. There are two
forms of the callcpp function, used in
StreamBase simple and aggregate expressions.
Note
The C++ API for custom functions was changed in StreamBase 3.0. If you use StreamBase applications developed before Version 3.0 that use C++ custom simple or aggregate functions, you must rewrite and recompile the functions. See Creating Custom Simple Functions and Creating Custom Aggregate Functions to learn how the changes affect StreamBase applications that use those functions. For complete reference information, please see the StreamBase C++ API Documentation.
The installed StreamBase kit provides the source files for two custom C++ function samples:
streambase-install-dir/sample/custom-aggregate-function
streambase-install-dir/sample/custom-simple-function
These custom functions were built by extending the sb::PluginAggregate class and the sb::PluginFunction class, respectively. Both classes are described in the C++ API Documentation.
Let's begin with a sample. The Custom Aggregate Sample consists of the following:
-
The source code for the plug-in aggregate function, NthValue.cpp
-
A sample configuration file, nth_value.sbconf, which tells the StreamBase Server to load the custom aggregate function.
-
A sample application, nth_value.sbapp, which uses the aggregate function.
Binaries are provided, as well as their source code. The sample code extends the StreamBase C++ API. For information about the API, see its C++ API Documentation.
On UNIX, a Makefile is provided in each custom C++ function sample's
installation directory to (re)build the sample source code on a supported UNIX machine. On
Windows, a Visual Studio .sln solution file is
provided to (re)build the sample source code on a supported Windows machine.
Note how the custom function is declared in the nth_value.sbconf file. For example:
<streambase-configuration>
  <global>
    <plugin file="./nth_value"/>
  </global>
  <server>
    <param name="tcp-port" value="10000"/>
  </server>
</streambase-configuration>
However, in StreamBase Studio, you can import the samples and the IDE will take care of sending the proper configuration settings to the server (see next section).
-
Launch StreamBase Studio on Windows or Linux.
-
From the top menu, click → , and select the custom-aggregate-function and custom-simple-function samples from the Extending StreamBase section of the list. StreamBase Studio creates two projects for you.
Note
On Windows, the custom functions nth_value.dll and log.dll are imported into the aggregate and simple projects, respectively; on Linux, the files nth_value.so and log.so are imported instead.
-
In the Authoring perspective, open the nth_value.sbapp application for the Custom Aggregate Function sample.
-
With the nth_value.sbapp editor window active, click the button in the toolbar (or select → → from the menu). StreamBase Studio uploads the custom function and its configuration setting to the StreamBase Server.
-
When the server starts, StreamBase Studio switches to the Test/Debug perspective. In the Application Output view, make sure All Streams is selected in the Output Stream drop-down list.
-
In the Manual Input view, select from the Input Stream list. For this example, uncheck the Log to Application Input option, and minimize or close the Application Input view.
-
The Field Values field is initially filled with "null". Enter 1 and click . No output is shown in the Application Output view.
-
Now enter 10 and click . This time, in the Application Output view, notice the output from output_window_size_2.
-
Enter 50 and click . Notice the output from output_window_size_3 as well as output_window_size_2.
-
Enter the number 100 and click .
To see what happens when the aggregate function throws an exception, try
enqueueing a number to the input_that_will_cause_an_error stream.
To run this sample on a machine where StreamBase is installed:
-
Open three terminal windows on UNIX, or three StreamBase Command Prompt windows on Windows. In each window, navigate to the custom-aggregate-function directory in your StreamBase installation area. For example:
-
On UNIX:
cd /opt/streambase/sample/custom-aggregate-function
-
On Windows:
cd "C:\Program Files\StreamBase Systems\StreamBase .n.m\sample\custom-aggregate-function"
-
In the first window, launch the StreamBase Server for the sample application:
sbd -f nth_value.sbconf nth_value.sbapp
-
In the second window, run a dequeuer so that you can see the output that will be produced:
sbc dequeue
-
In the third window:
-
Run an enqueuer:
sbc enqueue input
-
Type numbers into the enqueuer, one number per line.
-
In the second (dequeuer) window, look for output from output_window_size_2.
-
When you enter the third number in the enqueuer window, look for output in the dequeuer window from output_window_size_3 as well as output_window_size_2. For example, if you enter the numbers 1, 10, 50, and 100, then look for output like the following from the dequeuer:
output_window_size_2,10
output_window_size_2,50
output_window_size_3,10
output_window_size_2,100
output_window_size_3,50
To see what happens when the aggregate function throws an exception, try
enqueueing a number to the input_that_will_cause_an_error stream.
The Custom Simple Function Sample consists of the following:
-
The source code for the plug-in function, LogFunction.cpp
-
A sample configuration file, log.sbconf, which tells the StreamBase Server to load the custom simple function.
-
A sample application, log.sbapp, which uses the function.
Binaries are provided, as well as their source code. A Makefile is provided to rebuild the sample source code.
To run this sample from StreamBase Studio, use steps that are similar to the ones outlined in the previous section.
To run this sample on a supported machine where StreamBase is installed:
-
Launch a StreamBase Server on the sample application:
sbd -f log.sbconf log.sbapp
-
Run a dequeuer so that you can see the output produced:
sbc dequeue output
-
Run an enqueuer:
sbc enqueue input
-
Type numbers into the enqueuer, one number per line. For example, if you enter the numbers 1, 10, 50, and 100, then you should see the following output from the dequeuer:
output,0
output,2.30258509299405
output,3.91202300542815
output,4.60517018598809
-
To see what happens when the function throws an exception, try enqueueing a negative number.
The following sections explain how to create custom C++ functions for your StreamBase applications, by extending the StreamBase C++ Custom Function API.
Notes:
-
A StreamBase custom C++ function is mapped to a class. See the sections below for details.
-
Multiple classes (multiple StreamBase custom C++ functions that you write) may be added to a single DLL on Windows, or to a single shared library on UNIX.
As the simple function sample shows, the first step in writing a simple function is to include the required headers:
#include "StreamBase.hpp"
#include "PluginFunction.hpp"
Next, bring the required namespaces into scope:
using namespace std;
using namespace sb;
Then define the class name and have it inherit from PluginFunction. For example:
class MySimpleFunction : public PluginFunction
A simple function class must implement two virtual methods:
-
virtual void typecheck(const Schema &arg_types)
Validates the number and types of the arguments the simple function accepts. StreamBase Studio calls typecheck during authoring, and the StreamBase Server calls it when the application starts.
Note
In Version 3.0, this replaced virtual DataType typecheck(const PluginTypeList &arg_types)
-
virtual void eval(Tuple &retval, ConstTuple &args)
Called each time the simple function is executed in a StreamBase application.
Note
In Version 3.0, this replaced virtual void eval(sb::Value &retval, const PluginValueList &args)
Within the simple function class, you must declare the class to StreamBase:
STREAMBASE_DECLARE_PLUGIN_FUNCTION(MySimpleFunction);
This declaration provides infrastructure that aids in registering the simple function class for use by StreamBase.
Finally, outside of the simple function class definition, you must "define" the class and what it will be known as in the StreamBase application:
STREAMBASE_DEFINE_PLUGIN_FUNCTION(MySimpleFunction, "MyApplicationCalculation");
This definition provides a means to register your simple function and the mapping between the simple function class name and what you want to call this function in your StreamBase application.
This section describes guidelines for writing custom C++ aggregate functions.
Creating an aggregate function is very similar to creating a simple function. Again, you need to include some headers:
#include "StreamBase.hpp"
#include "PluginAggregate.hpp"
Then bring the required namespaces into scope:
using namespace std;
using namespace sb;
Then define the class name and have it inherit from PluginAggregate (instead of PluginFunction):
class MyAggregateFunction : public PluginAggregate
An aggregate function class must implement four methods:
-
virtual void typecheck(const Schema &arg_types)
Validates the argument types.
New in 3.0, replaces: virtual DataType typecheck(const PluginTypeList &arg_types)
-
virtual void initialize()
Clears any window state the aggregate function keeps.
-
virtual void increment(ConstTuple &args)
Adds values to the window state.
New in 3.0, replaces: virtual void increment(const PluginValueList &args)
-
virtual void calculate(Tuple &retval)
Calculates the value of the aggregate over the values currently in the window.
New in 3.0, replaces: virtual void calculate(Value &retval)
Within the aggregate function class, you must "declare" the class to StreamBase:
STREAMBASE_DECLARE_PLUGIN_AGGREGATE(MyAggregateFunction);
Finally, outside of the aggregate function class definition, you must "define" the class and what it will be known as in the StreamBase application:
STREAMBASE_DEFINE_PLUGIN_AGGREGATE(MyAggregateFunction, "MyApplicationAggregate");
When you write custom functions by extending the StreamBase C++ API classes, your function object may use internal fields to record the data types with which it was typechecked. Note that functions can be polymorphic (take different argument data types), and they can also take a variable number of arguments.
To accept a variable number of arguments, call arg_types.size() in your typecheck method to find out how many arguments were passed.
If your function is polymorphic, you will probably need to record the type information provided at typecheck time. The eval() method can then use this recorded information to vary its behavior based on the types of the arguments. The object is guaranteed to have been typechecked before eval() is called, and the arguments passed to eval() are guaranteed to match the types passed to typecheck().
The following code example returns the sum of its integer or double arguments as a double. If there are no arguments, it returns 0. The typecheck method verifies that every argument is an int or a double, and the eval method checks each argument's type to choose the matching accessor:
#include "StreamBase.hpp"
#include "PluginFunction.hpp"

using namespace sb;

class IntDoubleSumFunction : public PluginFunction {
private:
    // Number of arguments, recorded at typecheck time for use in eval().
    unsigned int _arg_count;

public:
    // Accepts any number of arguments; each must be an int or a double.
    virtual void typecheck(const Schema &argSchema)
    {
        _arg_count = argSchema.getNumFields();
        for (unsigned int i = 0; i < _arg_count; ++i)
            requireType(argSchema, i, DataType::INT, DataType::DOUBLE);
        setReturnType(DataType::DOUBLE);
    }

    // Sums the arguments, reading each one with the accessor for its type.
    virtual void eval(Tuple &retval, ConstTuple &args)
    {
        if (_arg_count == 0)
            retval.setDouble(0, 0);
        else {
            double ret = 0.0;
            for (unsigned int i = 0; i < _arg_count; ++i) {
                DataType dt = args.getSchema().getField(i).getType();
                if (DataType::DOUBLE == dt) {
                    ret += args.getDouble(i);
                } else if (DataType::INT == dt) {
                    ret += args.getInt(i);
                }
            }
            retval.setDouble(0, ret);
        }
    }

    STREAMBASE_DECLARE_PLUGIN_FUNCTION(IntDoubleSumFunction);
};

STREAMBASE_DEFINE_PLUGIN_FUNCTION(IntDoubleSumFunction, "int_double_sum");
Strings passed between the runtime and C++ plug-ins are of fixed size. In your C++ custom function, you can use one of these two methods to control string length:
-
setReturnType(DataType retType)
Allows the maximum string length to be specified at run time in the sbconf file's runtime cpp-string-field-size parameter. That value is used when the runtime cannot determine the maximum size of a string passed between the runtime and a C++ plug-in.
Example
In this example from the simple C++ function sample, a DOUBLE data type is specified, but the string length will be determined at run time:
setReturnType(DataType::DOUBLE);
-
setReturnString(int size)
Explicitly specifies the maximum string size (as an integer) at compile time. The runtime cpp-string-field-size value in the sbconf file is ignored.
Example
In the example below, if a string longer than 10K is returned, an exception is thrown.
setReturnString(10000);
To understand how to build custom C++ functions on UNIX, please see the
Makefile installed in the following
StreamBase directories. The default locations are shown
here:
/opt/streambase/sample/custom-aggregate-function
/opt/streambase/sample/custom-simple-function
Also refer to the related section, Configuration Steps, in this topic.
This section explains how to build StreamBase custom C++ functions on Windows.
Note
In the following description, streambase-install-dir is a token for
the directory in which you installed StreamBase. See the
Installation Guide
for the default locations of streambase-install-dir.
On a Windows machine where StreamBase is installed, if you use Microsoft Visual Studio, configure it as follows:
-
In Microsoft Visual Studio, right-click the project that contains your StreamBase custom C++ functions. Select .
-
On the pull-down menu, select .
-
On the General property sheet, set the Use Managed Extensions option to No.
-
Select the C/C++ folder. On the General property sheet, fill in the Additional Include Directories area with the following entry:
streambase-install-dir\include
-
Select the Linker folder. Select the Input property sheet, and then fill in the Additional Dependencies area with the following entries:
LibsbClient.lib
pthreads.lib
WS2_32.lib
xercesLib.lib
Dbghelp.lib
sbd.lib
-
-
On the Configuration pull-down menu, select the Debug configuration (only if you want Debug capabilities).
-
In the C/C++ folder, select the Preprocessor property sheet. Add STREAMBASE_PTHREAD_DLL to the Preprocessor Definitions.
-
Select the Code Generation property sheet. On it, click the pull-down menu, and then select the option.
-
Select the Linker folder. Select the General property sheet and fill in Additional Library Directories area with:
streambase-install-dir\lib\Debug
-
-
On the Configuration pull-down menu, select the Release configuration (if you want the Release option).
-
In the C/C++ folder, select the Preprocessor property sheet. Add STREAMBASE_PTHREAD_DLL to the Preprocessor Definitions.
-
Select the Code Generation property sheet. Click the pull-down menu and then select .
-
Select the Linker folder. Select the General property sheet and fill in the Additional Library Directories area with:
streambase-install-dir\lib\Release
-
Also see the Configuration Steps for an important reminder.
-
Before running your application in StreamBase Studio:
Register the plug-in function name in a StreamBase Server configuration file. If an .sbconf file does not already exist in your project, you can click → to create one.
Edit the custom-functions section in your *.sbconf, adding a <custom-function> element for each C++ custom function that you built. For example:
<custom-functions>
  <custom-function name="func1" type="simple" >
    <args>
      <arg type="int" />
      <arg type="string" />
    </args>
    <return type="string" size="16" />
  </custom-function>
</custom-functions>
-
When you deploy your application:
Edit the StreamBase Server configuration file that will be used to run your application. In addition to declaring the function names in the custom-functions section, identify the location of your custom functions in the global section. For example:
<streambase-configuration>
  <global>
    <plugin directory="${STREAMBASE_HOME}/plugin"/>
  </global>
  <server>
    <param name="tcp-port" value="10000"/>
  </server>
</streambase-configuration>
This example assumes that the environment variable STREAMBASE_HOME is defined and that its plugin subdirectory contains the shared libraries for your plug-ins. The StreamBase Server automatically loads all the files in that directory.
Finally, identify your customized *.sbconf when you start the server. For example:
sbd -f MyAppsConfig.sbconf MyApp.sbapp
See the StreamBase Server Configuration XML topic for details.
