Custom Java Aggregate Function Sample

The nth.sbapp sample application demonstrates a custom Java aggregate function, calljava, that performs an aggregation on three tuples. The calljava function for aggregate expressions was built by extending the AggregateWindow class, which is documented in the StreamBase Java API.

In the application, each input tuple contains a single integer field. The Aggregate operator has a window size of 3, so it It waits until three tuples are input before emitting a tuple. The output tuple contains four fields, each generated by an expression in the Aggregate operator:

  • firstval(val)

  • calljava('com.streambase.sb.sample.Nth', 2, val)

  • calljava('com.streambase.sb.sample.Nth', 3, val)

  • calljava('com.streambase.sb.sample.Nth', 4, val)

In the first expression, the built-in function firstval is used to return the value in the first tuple.

In the remaining expressions calljava is used to do the same thing: it returns the value of a specified tuple.

Notice that the last expression references a fourth tuple. However, only three tuples should be in the window when it closes, because the aggregate window size size is 3. This reference to a non-existent tuple is a deliberate mistake - read on and run the sample to find out the result.

This topic describes how to load and run the nth.sbapp sample. For more information about custom Java functions, see the topic, Using the StreamBase Java Function Wizard, in the API Guide.

Installed Sample File Locations

By default, the sample files are installed in:

  • On Windows: C:\Program Files\StreamBase Systems\StreamBase.n.m\sample\custom-java-aggregate

  • On UNIX: /opt/streambase/sample/custom-java-aggregate

The sample has the following files:

  • The source code for the function, Nth.java

  • A sample configuration file, sbd.sbconf, which tells the StreamBase Server to load the custom simple function.

  • A sample application, nth.sbapp, which uses the function.

  • An ant build.xml file that can be used to create the function's JAR file.

How We Created the Custom Java Aggregate Function Sample

  1. Created the custom Java application:

    1. Edited and saved Nth.java in a text editor.

    2. Built Nth.java using our favorite Java development tools, to generate Nth.jar:

      Note

      The installation includes a build file, build.xml, which you can use to build Nth.java if ant is installed on your system.

  2. Launched StreamBase Studio

  3. Created the custom-java-aggregate project. Selected the option to Create template server configuration file.

  4. From the top menu, in the Authoring perspective, selected FileNewEventFlow Application. Selected the custom-customjava-aggregate project, and entered nth.sbapp for the diagram name.

  5. Created an input stream:

    1. Dragged an input stream from the palette to the canvas.

    2. Clicked it on the canvas, which invoked the Input Stream Properties view.

    3. On the Edit Schema tab, added:

      • Field Name: val

      • Type: double

      • Size: 8

  6. Set up the Aggregate operator:

    1. Dragged an Aggregate operator from the palette to the canvas and opened its Properties view.

    2. Dragged a connector from the Input Stream to the input of the Aggregate1 operator.

    3. On the Dimensions tab, clicked the Add button.

      In the Edit Diminsion dialog, entered the following:

      • Name: val

      • Window size: Close and emit after 3 tuples

      Clicked OK.

    4. On the Aggregate Functions tab, unchecked the delta option, Output all input fields. Then used the Add button to add these functions:

      Aggregate Functions

      Output Field Name Expression
      first firstval(val)
      second calljava('com.streambase.sb.sample.Nth', 2, val)
      third calljava('com.streambase.sb.sample.Nth', 3, val)
      fourth calljava('com.streambase.sb.sample.Nth', 4, val)


  7. Created an output stream:

    1. Dragged an output stream from the palette to the canvas.

    2. Connected the Map1 operator to the output stream.

  8. Double-clicked sbd.sbconf in the Resources folder to open it in the StreamBase editor. In the file, removed unnecessary template code and added a java-vm element referring to the Random.jar custom Java program.

Back to Top ^

Running the Custom Java Aggregate Function Application

Running nth.sbapp in StreamBase Studio

  1. In the Package Explorer, double-click to open the nth.sbapp application. Make sure the application is the currently active tab in the EventFlow Editor.

  2. Click the Run StreamBase Application (Default Launch) button. (Not the Run button.) This opens the Test/Debug perspective and starts the application.

  3. When the server starts, StreamBase Studio switches to the Test/Debug perspective.

  4. On the Manual Input view, enter the following values in the val field, pressing Send Data after each entry:

    1
    2
    3

  5. Observe the tuples in the Application Output view. No tuples are output until three input tuples are sent, because the aggregate's window size is set to 3. The output you should observe is:

    first=1.0, second=2.0, third=3.0, fourth=null

    These values are returned by the calljava functions for each field. Notice that the fourth field is always null because there are only three tuples in the window.

Running nth.sbapp in Terminal Windows

To run this sample on a machine where StreamBase is installed:

  1. Open three terminal windows on UNIX, or three Command Prompt windows on Windows. In each window, change directory to streambase-install-dir/sample/custom-java-aggregate.

  2. In the first window, launch the StreamBase Server for the sample application:

    sbd -f sbd.sbconf nth.sbapp -p 10000

  3. In the second window, run a dequeuer so that you can see the output that will be produced:

    sbc dequeue

  4. In the third window:

    1. Run an enqueuer:

      sbc enqueue InputStream1

    2. Type these numbers into the enqueuer, one number per line:

      1
      2
      3

  5. In the second (dequeuer) window, you should see this output from the application:

    first=1.0, second=2.0, third=3.0, fourth=null

    These values are returned by the calljava functions for each field. Notice that the fourth field is always null because there are only three tuples in the window.

  6. Type: Control-Z (Windows) or Control-D (UNIX). The sbc command will exit.

  7. In window 3, type: sbadmin shutdown. The sbadmin shutdown command will terminate the daemon and the dequeuer.

Back to Top ^