The nth.sbapp sample application demonstrates a
custom Java aggregate function, Nth, which is invoked through calljava and
performs an aggregation on three tuples. The function was built by
extending the AggregateWindow class, which is
documented in the StreamBase Java API.
In the application, each input tuple contains a single numeric field, val (defined as a double in the input schema). The Aggregate operator has a window size of 3, so it waits until three tuples are input before emitting a tuple. The output tuple contains four fields, each generated by an expression in the Aggregate operator:
-
firstval(val)
-
calljava('com.streambase.sb.sample.Nth', 2, val)
-
calljava('com.streambase.sb.sample.Nth', 3, val)
-
calljava('com.streambase.sb.sample.Nth', 4, val)
In the first expression, the built-in function firstval is used to return the value in the first tuple.
In the remaining expressions, calljava is used
to do something similar: it returns the value from the tuple at the specified position in the window.
Notice that the last expression references a fourth tuple. However, only three tuples are in the window when it closes, because the aggregate window size is 3. This reference to a nonexistent tuple is a deliberate mistake; read on and run the sample to see the result.
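To make the out-of-range case concrete, here is a minimal plain-Java sketch of an nth-value aggregate. It does not use the StreamBase API: the class name NthSketch and its init, accumulate, and calculate methods are hypothetical stand-ins chosen for this illustration, and the shipped Nth.java may be structured differently. The sketch only shows one plausible way a request for a position beyond the window's contents could yield null.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative stand-in for an nth-value aggregate. This is NOT the shipped
// Nth.java and does not extend the real StreamBase AggregateWindow class.
class NthSketch {
    private final int n;                                  // 1-based position to return
    private final List<Double> seen = new ArrayList<>();  // values in the current window

    NthSketch(int n) {
        this.n = n;
    }

    // Reset state when a new window opens.
    void init() {
        seen.clear();
    }

    // Called once for each tuple that enters the window.
    void accumulate(double val) {
        seen.add(val);
    }

    // Called when the window closes: the nth value, or null if fewer
    // than n values arrived.
    Double calculate() {
        return (n >= 1 && n <= seen.size()) ? seen.get(n - 1) : null;
    }

    public static void main(String[] args) {
        double[] window = {1.0, 2.0, 3.0};
        for (int n = 2; n <= 4; n++) {
            NthSketch f = new NthSketch(n);
            f.init();
            for (double v : window) {
                f.accumulate(v);
            }
            System.out.println("n=" + n + " -> " + f.calculate());
        }
    }
}
```

With a three-value window, positions 2 and 3 return stored values, while position 4 falls outside the list and the sketch returns null instead of throwing an exception.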
This topic describes how to load and run the nth.sbapp sample. For more information about custom Java
functions, see the topic, Using the StreamBase Java Function Wizard, in the API Guide.
By default, the sample files are installed in:
-
On Windows:
C:\Program Files\StreamBase Systems\StreamBase.n.m\sample\custom-java-aggregate
-
On UNIX:
/opt/streambase/sample/custom-java-aggregate
The sample has the following files:
-
The source code for the function, Nth.java
-
A sample configuration file, sbd.sbconf, which tells the StreamBase Server to load the custom Java aggregate function.
-
A sample application, nth.sbapp, which uses the function.
-
An ant build.xml file that can be used to create the function's JAR file.
-
Created the custom Java function:
-
Edited and saved Nth.java in a text editor.
-
Built Nth.java using our favorite Java development tools, to generate Nth.jar.
Note
The installation includes a build file, build.xml, which you can use to build Nth.java if ant is installed on your system.
-
-
Launched StreamBase Studio
-
Created the custom-java-aggregate project. Selected the option to Create template server configuration file.
-
From the top menu, in the Authoring perspective, selected → → . Selected the custom-java-aggregate project, and entered nth.sbapp for the diagram name.
-
Created an input stream:
-
Dragged an input stream from the palette to the canvas.
-
Clicked it on the canvas, which invoked the Input Stream Properties view.
-
On the Edit Schema tab, added:
-
Field Name: val
-
Type: double
-
Size: 8
-
-
-
Set up the Aggregate operator:
-
Dragged an Aggregate operator from the palette to the canvas and opened its Properties view.
-
Dragged a connector from the Input Stream to the input of the Aggregate1 operator.
-
On the Dimensions tab, clicked the button.
In the Edit Dimension dialog, entered the following:
-
Name: val
-
Window size: Close and emit after 3 tuples
Clicked .
-
-
On the Aggregate Functions tab, unchecked the delta option, Output all input fields. Then used the button to add these functions:
Aggregate Functions
Output Field Name    Expression
first                firstval(val)
second               calljava('com.streambase.sb.sample.Nth', 2, val)
third                calljava('com.streambase.sb.sample.Nth', 3, val)
fourth               calljava('com.streambase.sb.sample.Nth', 4, val)
-
-
Created an output stream:
-
Dragged an output stream from the palette to the canvas.
-
Connected the Aggregate1 operator to the output stream.
-
-
Double-clicked sbd.sbconf in the Resources folder to open it in the StreamBase editor. In the file, removed unnecessary template code and added a java-vm element referring to the Nth.jar custom Java program.
-
In the Package Explorer, double-click to open the nth.sbapp application. Make sure the application is the currently active tab in the EventFlow Editor.
-
Click the Run StreamBase Application (Default Launch) button. (Not the Run button.) This opens the Test/Debug perspective and starts the application.
-
When the server starts, StreamBase Studio switches to the Test/Debug perspective.
-
On the Manual Input view, enter the following values in the val field, pressing after each entry:
1
2
3
-
Observe the tuples in the Application Output view. No tuples are output until three input tuples are sent, because the aggregate's window size is set to 3. The output you should observe is:
first=1.0, second=2.0, third=3.0, fourth=null
These values are returned by the firstval and calljava expressions, one for each field. Notice that the fourth field is always null because there are only three tuples in the window.
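The "no output until three tuples arrive" behavior can be sketched in plain Java. This is an illustration only, not StreamBase code: the class TupleWindowSketch and its offer method are hypothetical names, and the sketch assumes that windows do not overlap (the window is emptied after each emit), which the sample's settings do not state explicitly.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative model of a tuple-count window that closes and emits after
// `size` tuples. Hypothetical names; not part of the StreamBase API.
class TupleWindowSketch {
    private final int size;
    private final List<Double> buffer = new ArrayList<>();

    TupleWindowSketch(int size) {
        this.size = size;
    }

    // 1-based lookup; null when the window holds fewer than n values.
    private static Double nth(List<Double> window, int n) {
        return n <= window.size() ? window.get(n - 1) : null;
    }

    // Adds one value. Returns the output line when the window closes,
    // or null while the window is still filling.
    String offer(double val) {
        buffer.add(val);
        if (buffer.size() < size) {
            return null;
        }
        String out = "first=" + nth(buffer, 1)
                + ", second=" + nth(buffer, 2)
                + ", third=" + nth(buffer, 3)
                + ", fourth=" + nth(buffer, 4);
        buffer.clear(); // assumption: windows do not overlap
        return out;
    }

    public static void main(String[] args) {
        TupleWindowSketch window = new TupleWindowSketch(3);
        for (double v : new double[] {1, 2, 3}) {
            String out = window.offer(v);
            System.out.println(v + " -> " + (out == null ? "(no output yet)" : out));
        }
    }
}
```

In this sketch the first two values produce nothing, and the third closes the window and produces a line in the same shape as the sample's output, with the fourth field left null.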
To run this sample on a machine where StreamBase is installed:
-
Open three terminal windows on UNIX, or three Command Prompt windows on Windows. In each window, change directory to streambase-install-dir/sample/custom-java-aggregate.
-
In the first window, launch the StreamBase Server for the sample application:
sbd -f sbd.sbconf nth.sbapp -p 10000
-
In the second window, run a dequeuer so that you can see the output that will be produced:
sbc dequeue
-
In the third window:
-
Run an enqueuer:
sbc enqueue InputStream1
-
Type these numbers into the enqueuer, one number per line:
1
2
3
-
-
In the second (dequeuer) window, you should see this output from the application:
first=1.0, second=2.0, third=3.0, fourth=null
These values are returned by the firstval and calljava expressions, one for each field. Notice that the fourth field is always null because there are only three tuples in the window.
-
In the third (enqueuer) window, type Control-Z (Windows) or Control-D (UNIX). The sbc enqueue command will exit.
-
In window 3, type:
sbadmin shutdown. The sbadmin shutdown command will terminate the daemon and the dequeuer.
