Deploying Spark Streaming with Delta on Kubernetes using Terraform
Spark. Kubernetes. Terraform.
This is a guest post by Junaid Effendi. You can read and check out his Substack here.
Looking to deploy a Spark Streaming app on Kubernetes? Then this article is for you. It focuses on deploying the components using Terraform, with scripts provided. I provide an easy-to-use template: if you have the right foundation, it is a copy-paste approach. Just update the required fields and you should see your app running shortly after.
Spark Structured Streaming is a unified engine, meaning you can use largely the same code for batch and streaming jobs. You only need to update the reader and the writer, which I cover later.
Prerequisites:
You have a Kubernetes cluster up and running with all the required setup, and you have basic knowledge of K8s.
You have a Spark application (even a batch one) and working knowledge of Spark.
Architecture
Kubernetes Infra
Namespace
A dedicated namespace helps with separation and makes ownership easier:
resource "kubernetes_namespace" "<my_namespace>" {
  metadata {
    name = "<my_namespace>"
  }
}
Service Account
The service account is attached to an IAM role that handles all permissions, e.g. writing to Delta Lake (object store) and, in our case, reading from the source, Apache Kafka.
resource "kubernetes_service_account" "<my_service_account>" {
  metadata {
    name      = "<my_service_account>"
    namespace = "<my_namespace>"
    annotations = {
      // Mapping of the cloud role, e.g. if you are using AWS:
      "eks.amazonaws.com/role-arn" = "<aws_iam_arn>"
    }
  }
}
Cluster Role
To let the Spark Operator and the Spark driver manage the life cycle of a Spark job, the service account needs access to Kubernetes resources:
resource "kubernetes_cluster_role" "<my_cluster_role>" {
  metadata {
    name = "<my_cluster_role>"
  }
  rule {
    api_groups = ["", "extensions", "apps"]
    resources  = ["secrets", "configmaps", "pods", "pods/log", "services", "persistentvolumeclaims"]
    verbs      = ["create", "get", "list", "update", "watch", "patch", "delete", "deletecollection"]
  }
  rule {
    api_groups = ["sparkoperator.k8s.io"]
    resources  = ["sparkapplications"]
    verbs      = ["create", "get", "list", "update", "watch", "patch", "delete"]
  }
}
resource "kubernetes_cluster_role_binding" "<my_cluster_role_binding>" {
  metadata {
    name = "<my_cluster_role_binding>"
  }
  role_ref {
    api_group = "rbac.authorization.k8s.io"
    kind      = "ClusterRole"
    name      = kubernetes_cluster_role.<my_cluster_role>.metadata[0].name
  }
  subject {
    kind      = "ServiceAccount"
    name      = kubernetes_service_account.<my_service_account>.metadata[0].name
    namespace = "<my_namespace>"
  }
}
Spark Operator
The Spark Operator has changed the game. Managing the Spark life cycle used to be tedious: you had to set up a lot of things just to make it work on EKS. With the Spark Operator, you deploy the operator using the template below and it then manages your Spark applications. This works for both batch and streaming jobs.
resource "helm_release" "spark" {
  repository       = "https://kubeflow.github.io/spark-operator"
  chart            = "spark-operator"
  version          = "1.2.5"
  name             = "spark"
  namespace        = "<my_namespace>"
  create_namespace = false
  wait             = false
  set {
    name  = "sparkJobNamespace"
    value = "<my_namespace>"
  }
  set {
    name  = "enableBatchScheduler"
    value = true
  }
  set {
    name  = "enableWebhook"
    value = true
  }
  set {
    name  = "webhook.enable"
    value = true
  }
  set {
    name  = "webhook.port"
    value = 443
  }
  set {
    name  = "podMonitor.enable"
    value = false
  }
  set {
    name  = "metrics.enable"
    value = false
  }
  set {
    name  = "image.tag"
    value = "v1beta2-1.4.2-3.5.0"
  }
  set {
    name  = "image.repository"
    value = "ghcr.io/kubeflow/spark-operator"
  }
  set {
    name  = "uiService.enable"
    value = true
  }
}
Data Pipeline
The pipeline consists of just one streaming job, but you can easily add multiple layers of streaming jobs if needed, such as Raw and Processed.
💡 The example is based on Scala; it's similar for Python.
Data Source
Source can be any Messaging System like Apache Kafka. You can use the Kafka Data Source in the Spark App to read the messages. Make sure to install the connector library, e.g. spark-sql-kafka.
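As a rough sketch of what "installing the connector library" can look like, here is a build.sbt fragment. It assumes the project is built with sbt, which the article does not state, and the version numbers are assumptions that should be aligned with the Spark version in your Docker image. The delta-spark entry is included because the writer later in this section uses the Delta format.

```scala
// build.sbt (fragment). Assumption: the app is built with sbt.
// Versions are illustrative; keep them compatible with your Spark image.
val sparkVersion = "3.5.0" // same value as "sparkVersion" in the SparkApplication
val deltaVersion = "3.1.0" // assumption: a Delta release built for Spark 3.5.x

libraryDependencies ++= Seq(
  // Spark itself is normally provided by the Docker image at runtime
  "org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
  // Kafka source and sink for Structured Streaming
  "org.apache.spark" %% "spark-sql-kafka-0-10" % sparkVersion,
  // Delta Lake reader and writer
  "io.delta" %% "delta-spark" % deltaVersion
)
```

If you prefer not to bundle these into the application jar, the same artifacts could instead be listed under "deps" in the SparkApplication shown later.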
If you are reusing the same source code that is used for batch, only the reader and writer need to change, e.g. you can read and write like this:
// Subscribe to one Kafka topic
val events = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "<my_topic>")
  .load()
// Real-time transformation and logic goes here
// Write the data to the Delta path and block until the query stops
events.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "<checkpoint_path>")
  .start("<output_path>")
  .awaitTermination()
📖 More examples can be found here
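To make the "same code for batch and streaming" point concrete, the sketch below keeps the transformation in one function and swaps only the reader and writer. The object name, method names, and the choice of columns are hypothetical and only for illustration; the transformation simply casts the binary Kafka key and value to strings.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

// Hypothetical helper object: names are illustrative, not from the article.
object UnifiedPipeline {

  // Shared logic: the Kafka source delivers key and value as binary,
  // so cast both to strings and keep the record timestamp.
  def transform(raw: DataFrame): DataFrame =
    raw.select(
      col("key").cast("string").as("key"),
      col("value").cast("string").as("value"),
      col("timestamp")
    )

  // Batch variant: reads the topic once and appends the result to Delta.
  def runBatch(spark: SparkSession, servers: String, topic: String,
               outputPath: String): Unit = {
    val raw = spark.read
      .format("kafka")
      .option("kafka.bootstrap.servers", servers)
      .option("subscribe", topic)
      .load()
    transform(raw).write.format("delta").mode("append").save(outputPath)
  }

  // Streaming variant: same transform; only readStream/writeStream
  // and the checkpoint location differ.
  def runStream(spark: SparkSession, servers: String, topic: String,
                outputPath: String, checkpointPath: String): Unit = {
    val raw = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", servers)
      .option("subscribe", topic)
      .load()
    transform(raw).writeStream
      .format("delta")
      .outputMode("append")
      .option("checkpointLocation", checkpointPath)
      .start(outputPath)
      .awaitTermination() // keep the driver alive while the query runs
  }
}
```

Keeping the business logic in a plain DataFrame-to-DataFrame function also makes it easy to unit test with a small local DataFrame, without Kafka or Delta.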
Streaming App
Now we deploy the actual Spark application. This assumes you already have a working artifact for the Spark app that applies the business logic: transformation, filtering, PII handling, etc.
Leveraging the K8s infra above:
resource "kubernetes_manifest" "sparkapplication_<my_namespace>_<my_job>" {
  manifest = {
    "apiVersion" = "sparkoperator.k8s.io/v1beta2"
    "kind"       = "SparkApplication"
    "metadata" = {
      "name"      = "<job_name>"
      "namespace" = kubernetes_namespace.<my_namespace>.metadata[0].name
    }
    "spec" = {
      "arguments" = [] // Optional
      "restartPolicy" = {
        "type"                             = "Always"
        "onFailureRetryInterval"           = 10
        "onSubmissionFailureRetryInterval" = 20
      }
      "deps" = {
        "jars" = [] // If you want to set dependencies during runtime
      }
      "driver" = {
        "cores"          = 2
        "memory"         = "2g"
        "serviceAccount" = kubernetes_service_account.<my_service_account>.metadata[0].name
      }
      "executor" = {
        "cores"  = 2
        "memory" = "2g"
      }
      "image"               = "<path_to_docker_image>"
      "imagePullPolicy"     = "Always"
      "mainApplicationFile" = "<path_to_jar_artifact>"
      "mainClass"           = "<path_to_class>"
      "mode"                = "cluster"
      "sparkConf" = {
        // This section contains the Spark config, e.g.:
        "spark.kubernetes.executor.podNamePrefix" = "<job_name>"
        // Delta configs also go here, e.g. "spark.databricks.delta...."
      }
      "hadoopConf" = {
        // This section contains all Hadoop config, e.g.:
        "mapreduce.fileoutputcommitter.algorithm.version" = "2"
      }
      "sparkVersion" = "3.5.0"
      "type"         = "Scala"
    }
  }
  depends_on = [helm_release.spark]
}
📖 Read more: Spark Operator Api Docs
⭐ Consider using Kafka to Delta Connector if no transformation/logic is needed in real time.
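The optional "arguments" list in the SparkApplication above is handed to the application's main method, which is a convenient way to pass values such as the topic or output path instead of hard-coding them. Below is a minimal sketch of parsing those arguments; the object name, the Config fields, and the positional order are assumptions made for illustration.

```scala
// Hypothetical argument parser: the field names and their order are
// assumptions, chosen to match the reader/writer options used earlier.
object JobArgs {

  final case class Config(
    bootstrapServers: String,
    topic: String,
    outputPath: String,
    checkpointPath: String
  )

  // Expects exactly four positional arguments; anything else is an error.
  def parse(args: Array[String]): Either[String, Config] = args match {
    case Array(servers, topic, output, checkpoint) =>
      Right(Config(servers, topic, output, checkpoint))
    case _ =>
      Left("usage: <bootstrap_servers> <topic> <output_path> <checkpoint_path>")
  }
}
```

With this in place, the manifest would carry four strings in "arguments", and the main method could fail fast with the usage message when they are missing.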
Delta
Delta is pretty easy to set up: add the Spark configs below to the sparkConf section above and install the delta-spark library.
💡Make sure to use compatible versions of Spark and Delta.
spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog
spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension
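When running outside the operator, for example in a local test, the same two settings can be applied in code instead. The sketch below is one way to do that; the object and method names are hypothetical, and it assumes delta-spark is on the classpath.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical helper: adds the two Delta settings listed above to a builder.
object DeltaSession {
  def withDelta(builder: SparkSession.Builder): SparkSession.Builder =
    builder
      // Registers Delta's SQL extensions
      .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
      // Routes the default catalog through Delta's catalog implementation
      .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
}
```

Usage would look like `DeltaSession.withDelta(SparkSession.builder().appName("<job_name>")).getOrCreate()`. If the configs are already set in sparkConf, there is no need to repeat them in code.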
📖 For more, check out the Quick Start Guide
💡It is worth considering an orchestrator like Airflow or ArgoCD to deploy Spark Streaming Jobs.
Post Deploy
Some handy commands for debugging and monitoring:
Check that the Spark app is running:
kubectl get SparkApplication -n <my_namespace>
kubectl get pods -n <my_namespace>
Port forward the Spark UI (the driver pod is named after the SparkApplication):
kubectl port-forward <job_name>-driver -n <my_namespace> 4040:4040
Ideally, the Spark UI should be up and running independently. This can also be achieved via the Spark Operator, which is not covered in this article; see the References section for more information.
References
Additional resources and references can be found below:
Kubernetes: https://developer.hashicorp.com/terraform/tutorials/kubernetes/kubernetes-provider
Spark Operator Docs: https://github.com/kubeflow/spark-operator/blob/master/docs/api-docs.md
Spark with Kafka: https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html
Everything about Delta: https://docs.delta.io/latest/delta-intro.html