Apache Flink is a popular distributed processing engine for unbounded and bounded data streams. Flink is designed to run streaming applications at large scale and integrates with cluster resource managers such as Hadoop YARN, Apache Mesos, and Kubernetes.
Flink offers very efficient out-of-the-box memory management compared to other popular streaming engines such as Spark Streaming, Apache Samza, or Apache Storm, and a Flink application can run as thousands of concurrent tasks distributed across a cluster. Flink's asynchronous checkpointing, inspired by the Chandy-Lamport algorithm, draws consistent snapshots of the distributed data stream and is the backbone of Flink's fault-tolerance mechanism. It works by injecting stream barriers into the data stream; a barrier separates the records that belong to the current snapshot from the records that will belong to the next one. In a partitioned stream, barriers are aligned across the parallel input channels of an operator, and records arriving on channels that have already delivered their barrier are buffered until alignment completes. This keeps the snapshot of application state and operator state consistent without stopping the stream, which is how Flink sustains high processing throughput while checkpointing.
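To make this concrete, here is a minimal sketch of how barrier-based, exactly-once checkpointing is enabled on the execution environment (assuming a recent Flink release; the 30-second interval is an arbitrary choice for illustration):

import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment

object CheckpointingExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Inject a checkpoint barrier into the stream every 30 seconds and
    // draw an exactly-once consistent snapshot of the operator state.
    env.enableCheckpointing(30000, CheckpointingMode.EXACTLY_ONCE)
  }
}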
One of the challenges we faced while running a Flink application in a containerized environment such as Kubernetes was injecting application configuration dynamically at runtime. We use Typesafe Config to manage application configuration, and the configuration file gets bundled into the application jar, which in turn is packaged into a Docker image built from the base Flink image. The puzzle was how to mount a different application configuration in each environment while keeping the Docker image unchanged.
Kubernetes offers ConfigMaps, an API object for defining application configuration as a set of key-value pairs. A sample Flink application configuration demonstrating a ConfigMap is shown below.
apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-streaming-config
  namespace: flink-app
  labels:
    app: flink
data:
  flink-streaming: |+
    kafka {
      broker-servers = "localhost:9092"
      input.topic = flink.streaming.input
      output.success.topic = flink.streaming.output
    }
    task {
      consumer.parallelism = 2
      downstream.operators.parallelism = 2
    }
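The value of the flink-streaming key is plain HOCON, so once Kubernetes mounts it as a file the application can parse it directly with Typesafe Config. A minimal sketch (ConfigMapReadExample is an illustrative name; the mount path matches the manifest shown next):

import com.typesafe.config.ConfigFactory
import java.io.File

object ConfigMapReadExample {
  def main(args: Array[String]): Unit = {
    // Parse the HOCON file that Kubernetes mounted from the ConfigMap.
    val config = ConfigFactory
      .parseFile(new File("/data/flink/conf/flink-streaming.conf"))
      .resolve()
    println(config.getString("kafka.input.topic"))      // flink.streaming.input
    println(config.getInt("task.consumer.parallelism")) // 2
  }
}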
The following manifest shows how to mount this ConfigMap inside the Flink JobManager container.
apiVersion: batch/v1
kind: Job
metadata:
  name: flink-streaming-jobmanager
  namespace: flink-app
spec:
  template:
    metadata:
      labels:
        app: flink
        component: flink-streaming-jobmanager
    spec:
      volumes:
        - name: flink-config-volume
          configMap:
            name: flink-streaming-config
            items:
              - key: flink-streaming
                path: flink-streaming.conf
      restartPolicy: OnFailure
      containers:
        - name: flink-streaming-jobmanager
          image: "flink-streaming-app-docker-image"
          imagePullPolicy: Always
          workingDir: /opt/flink
          command: ["/opt/flink/bin/standalone-job.sh"]
          args: ["start-foreground",
                 "--job-classname=com.anand.flink.FlinkStreamingApp",
                 "-Djobmanager.rpc.address=flink-streaming-jobmanager",
                 "-Djobmanager.rpc.port=6123",
                 "-Dblob.server.port=6124",
                 "-Dqueryable-state.server.ports=6125",
                 "--config.file.path",
                 "/data/flink/conf/flink-streaming.conf"]
          volumeMounts:
            - name: flink-config-volume
              mountPath: /data/flink/conf/flink-streaming.conf
              subPath: flink-streaming.conf
While mounting the ConfigMap is pretty straightforward, getting the config file path into the application is not. Flink provides a utility named ParameterTool that reads the command-line parameters passed to the main function of a Java application. The config.file.path property is passed in from the Kubernetes manifest and read with ParameterTool's fromArgs function. This simple trick lets the application pick up the external configuration file mounted into the Docker container.
import com.typesafe.config.ConfigFactory
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaProducer}
import java.io.File
import java.util.Properties

object FlinkStreamTask {
  def main(args: Array[String]): Unit = {
    // Read the config file path passed in from the Kubernetes manifest;
    // fall back to the configuration bundled with the jar when it is absent.
    val configFilePath = Option(ParameterTool.fromArgs(args).get("config.file.path"))
    val config = configFilePath.map { path =>
      ConfigFactory.parseFile(new File(path)).resolve()
    }.getOrElse(ConfigFactory.load("flink-streaming.conf")
      .withFallback(ConfigFactory.systemEnvironment()))
    val task = new FlinkStreamTask(config)
    task.process()
  }
}

class FlinkStreamTask(config: com.typesafe.config.Config) {
  def process(): Unit = {
    val properties = new Properties()
    // Take the broker list from the mounted configuration instead of
    // hard-coding it, so it can differ per environment.
    properties.setProperty("bootstrap.servers", config.getString("kafka.broker-servers"))
    properties.setProperty("group.id", "flink.streaming.group")

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // The EXACTLY_ONCE Kafka producer requires checkpointing to be enabled.
    env.enableCheckpointing(30000)

    val stream = env
      .addSource(new FlinkKafkaConsumer[String](config.getString("kafka.input.topic"),
        new SimpleStringSchema(), properties))

    // Note: the key matches output.success.topic in the sample ConfigMap.
    val kafkaSink = new FlinkKafkaProducer[String](config.getString("kafka.output.success.topic"),
      new SimpleStringSchema(), properties, FlinkKafkaProducer.Semantic.EXACTLY_ONCE)

    stream.addSink(kafkaSink)
    env.execute("FlinkStreamingJob")
  }
}
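The sample ConfigMap also carries task parallelism settings that the job above never reads. As a hypothetical extension (buildSource is an illustrative helper, not part of the original job), the same config can drive operator parallelism:

import com.typesafe.config.Config
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.datastream.DataStreamSource
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import java.util.Properties

object ParallelismFromConfig {
  // Hypothetical helper: build the Kafka source with its parallelism
  // taken from the task block of the mounted configuration.
  def buildSource(env: StreamExecutionEnvironment, config: Config,
                  properties: Properties): DataStreamSource[String] =
    env.addSource(new FlinkKafkaConsumer[String](config.getString("kafka.input.topic"),
        new SimpleStringSchema(), properties))
      .setParallelism(config.getInt("task.consumer.parallelism"))
}

Keeping parallelism in the ConfigMap, rather than in code, means it can be tuned per environment just like the Kafka settings, without rebuilding the Docker image.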