ScaDS Logo


Using Apache Flink CEP for real-time logistics monitoring - Extend the program


Extend the program

Another important part of event patterns is the observation of sensor values on deviations compared with certain thresholds. For that aggregations on time windows have to be applied. Because not all aggregation functions of the Batch API are usable in the Streaming API a fold-operator is applied to compute the average value of a sensor value within a time window. The fold function comes from higher-order functions of functional programming. It is started with an initial value and applies for every new data element in the stream the corresponding processing steps. In this case it is used for sum up the values and counting the number of orientation events in a time window. Both values are then used for computing an average value of the orientation attribute which then can be used for identifying events where e.g. the orientation value changed within the last 10 seconds by 20 percent. The fold stream contains elements for the shipment_number (for identification), then for the current orientation value and the computed values. The function returns a new SensorEvent which contain the attributes mentioned previously.

private static class AvgFold implements FoldFunction<SensorEvent, Tuple5<String, Double, Double, Double, Double>> {
public Tuple5<String, Double, Double, Double, Double> fold(Tuple5<String, Double, Double, Double, Double> total, SensorEvent current) throws Exception {
double count = total.f4 + 1.0d;
double sum = total.f3 + Double.parseDouble(current. orientation);
double avg = sum / count;

return new Tuple5<String, Double, Double, Double, Double> (current.shipment_number, Double.parseDouble(current.orientation), sum, count, avg);

The fold is applied on a keyed window to process stream elements of the same package together. And a sliding window of 10 seconds is used, which uses the processing time of the elements for timestamps. A characteristic of the fold is that it can be initialized with voluntary values. In the code example the event pattern from above is adapted with the fold function and the event pattern is changed in order that it triggers events, where the current orientation value is more than 20 percent higher than the aggregated average value of the time window.

SingleOutputStreamOperator< Tuple5<String, Double, Double, Double, Double >> aggregatedEventStream = eventData
.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.fold(new Tuple5<String, Double, Double, Double, Double >("", 0.0d, 0.0d, 0.0d, 0.0d), new AvgFold());

Pattern< Tuple5<String, Double, Double, Double, Double >, ?> examplePattern = Pattern.< Tuple5<String, Double, Double, Double, Double >>begin("orientationEvent")
.where(new FilterFunction< Tuple5<String, Double, Double, Double, Double >>(){
public boolean filter(Tuple5<String, Double, Double, Double, Double > event) throws Exception {

return event.f1 > event.f4*1.20;
. followedBy("accelerationEvent")
.where(new FilterFunction< SensorEvent >() {
public boolean filter(SensorEvent accelerationEvent) throws Exception {
return accelerationEvent.getAcceleration() > 10.0;