ScaDS Logo


Using Apache Flink CEP for real-time logistics monitoring - Connecting to socket stream


Connecting to socket stream

Before starting with data processing and monitoring a connection to the data stream has to set up. In this example it is a TCP socket connection on port 9999. Like in every Flink application an ExecutionEnvironment has to be initialized.

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(15000, CheckpointingMode.EXACTLY_ONCE);
SingleOutputStreamOperator<SensorEvent> eventStream = env.socketTextStream("localhost", 9999, '\n').flatMap(new splitMap());

In this example the degree of parallelism is set to 4, which means that all operators, data sources and data sinks are distributed on four nodes in the cluster. In line 3 checkpoints are enabled and created every second. The mode guarantees that every data element is processed exactly one time. In line 4 the characteristic for the internal event time is defined. This configuration parameter is needed to give every incoming data stream element a timestamp e.g. for window computations. Since no timestamps are provided in the data stream the TimeCharacteristic is set to ProcessingTime, so each data element gets the system time of the node which first processes the element. Further this mode is most efficient concerning performance and latency opposed to the IngestionTime or EventTime modes. In line 5 lastly the connection to the streaming server on localhost, port 9999 is established and a flatMap function is called which transform the incoming JSON events into SensorEvent POJOs.