ScaDS Logo

COMPETENCE CENTER
FOR SCALABLE DATA SERVICES
AND SOLUTIONS

Using Apache Flink CEP for real-time logistics monitoring - Find dropped packages

Beitragsseiten

Find dropped packages

For logistics providers as well as for recipients of important and fragile goods it’s very important to detect if a package was thrown or dropped. In case of a quickly detected damage of an indispensable good, this part can be reordered and the downtime can be reduced.

To observe our data stream on a certain pattern it can be necessary to define an appropriate partition strategy. This can be done via physically partition the data on a low-level with a user-defined strategy, or to use the higher level keyBy operator for a logical separation into disjoint groups of data elements. Result is a KeyedDataStream which is also necessary to build data windows (see later) or to use the aggregation functions. To detect an incident within one package the shipment_number attribute is used for partitioning, as this attribute serves as a unique identifier for each parcel.

The Pattern API is used to describe the event patterns the data stream has to be observed. Each event pattern consists of different stages; each has its own conditions. The order of the stages defines the processing order. Each stage must have a unique name for identification. The pattern for the detection of dropped packages looks like this. First, a stage named orientationEvent is defined, which also describes the begin of the event pattern (line 2). The where-clause in the next line determines the event conditions needed to accomplish the first state of the pattern. For simplification it is assumed that a dropped package is characterized by a value > 100 of the orientation_l_r attribute4. Very important for CEP application is the definition of event sequences, e.g. to describe that event A has to be followed by event B. the followedBy function states that the first stage of our pattern has to be followed by a second stage and appends a stage called accelerationEvent. The followedBy operator has the characteristic that other events can occur between events. There is also a next operator which implies that event B has to directly succeed the previous event A and no other events are between them. The second part of the pattern states that the second characteristic of a dropped package is acceleration. So the event condition for this part is that acceleration is higher than 10. The within-part of the pattern definition determines the time window in which the events have to occur.

KeyedStream< SensorEvent, Tuple> keyedData = eventData.keyBy("shipment_number");
Pattern< SensorEvent, ?> dropPattern = Pattern.< SensorEvent >begin("orientationEvent")
.where(new FilterFunction< SensorEvent >() {
public boolean filter(SensorEvent orientationEvent) throws Exception {
return orientationEvent.getOrientation_l_r() > 100.0;
}
})
.followedBy("accelerationEvent")
.where(new FilterFunction< SensorEvent >() {
public boolean filter(SensorEvent accelerationEvent) throws Exception {
return accelerationEvent.getAcceleration() > 10.0;
}
})
.within(Time.seconds(3));

In the last part of this simple CEP application the observing data stream and the described event pattern are put together. Then, if the event pattern occurred in the data stream a PatternSelectFunction is used to extract the values which triggered the event and to throw a warning or initiate another action, e.g. launch another application or send a message to a system.

PatternStream<SensorEvent> FlowFirstPatternStream = CEP.pattern(keyedData, dropPattern);
DataStream<String> warning = FlowFirstPatternStream.select(new PatternSelectFunction<SensorEvent, String>() {
public String select(Map<String, SensorEvent > pattern) throws Exception {
ShipmentEvent orientEvent = pattern.get("orientationEvent");
ShipmentEvent accelEvent = pattern.get("accelerationEvent");

if (orientEvent.getShipment_number().equals(accelEvent.getShipment_number())) {
return "Shipment : " + orientEvent.getShipment_number() + " was dropped!";
}
}
});
warning.print();