
Run Spark Jobs on Kubernetes

Apr 16, 2019

Hello All,


In this article, I'm going to show you how we moved our ETL processes to Spark jobs that run as Kubernetes pods.

Before that, we used custom Python code for our ETLs.
The problem with that project is the need for a distributed key-value store: when we pick a solution like Redis, it creates too much internal I/O between the slave Docker containers and Redis. The performance with Spark is much better.
Also, the master creates a number of slaves and manages the containers. Sometimes the docker-py library fails to communicate with the Docker engine and the master can't delete the slave or Redis containers. This causes idempotency problems.
You also have to distribute the slave containers across your Docker cluster yourself, which means you have to put too many cross-functional requirements next to your business code.


We inspected the Spark documentation for Kubernetes, because we were already using Kubernetes for our production environment.
We use version 2.3.3 of Spark on Kubernetes.
You can have a look at it here: https://spark.apache.org/docs/2.3.3/running-on-kubernetes.html
Even though the Spark documentation says the feature is experimental for now, we started to run Spark jobs on our Kubernetes cluster.

This feature allows us to run Spark across our cluster.

Easy to use.

Secured, because you have to create a specific user (service account) for the Spark driver and executors (see the sketch after this list).

Enough parameters for Kubernetes (node selector for computation, core limits, number of executors, etc.).
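
For reference, the service account can be created with the RBAC commands from the Spark 2.3.3 documentation. This is only a minimal sketch: the account name spark, the default namespace and the built-in edit cluster role are assumptions you may want to adjust for your cluster.

# Create a "spark" service account and give it enough rights to create pods,
# so the driver can spawn its executors.
kubectl create serviceaccount spark
kubectl create clusterrolebinding spark-role --clusterrole=edit \
   --serviceaccount=default:spark --namespace=default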

 

We bundled the spark-submit command with our artifact jar.
After this step, the Docker container can make a request to the Kubernetes master, start the driver pod, and the driver pod creates the executors from the same image.
This allows us to bundle everything in one image. If the code changes, CI creates a new bundle and publishes it to the registry.
The image below describes the architecture.

[architecture diagram]
First of all, you have to create a base image.
Download "spark-2.3.3-bin-hadoop2.7" from https://spark.apache.org/downloads.html and extract it.
Then build and push the base image from it:

./bin/docker-image-tool.sh -r internal-registry-url.com:5000 -t base build
./bin/docker-image-tool.sh -r internal-registry-url.com:5000 -t base push


We created a multi-stage Dockerfile like this:

# Build stage: compile the assembly jar with sbt
FROM hseeberger/scala-sbt:11.0.1_2.12.7_1.2.6 AS build-env

COPY . /app

WORKDIR /app
ENV SPARK_APPLICATION_MAIN_CLASS Main

RUN sbt update && \
    sbt clean assembly

# Stage the assembly jar under /publish for the next stage
RUN SPARK_APPLICATION_JAR_LOCATION=`find /app/target -iname '*-assembly-*.jar' | head -n1` && \
    export SPARK_APPLICATION_JAR_LOCATION && \
    mkdir /publish && \
    cp -R ${SPARK_APPLICATION_JAR_LOCATION} /publish/ && \
    ls -la ${SPARK_APPLICATION_JAR_LOCATION} && \
    ls -la /publish

# Runtime stage: start from the Spark base image and add the jar, the secrets and the run script
FROM internal-registry-url.com:5000/spark:base

# Set the container time zone
RUN apk add --no-cache tzdata
ENV TZ=Europe/Istanbul
RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone

COPY --from=build-env /publish/* /opt/spark/examples/jars/
COPY --from=build-env /app/secrets/* /opt/spark/secrets/
COPY --from=build-env /app/run.sh /opt/spark/

WORKDIR /opt/spark

CMD [ "/opt/spark/run.sh" ]
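
On top of this Dockerfile, the CI step that builds and publishes the bundle looks roughly like this. It is only a sketch: the exact commands and the ${VERSION} variable are assumptions, while the image name matches the one used in run.sh below.

# Hypothetical CI step: build the application image and push it to the internal registry.
docker build -t internal-registry-url.com:5000/coveragecalculator:${VERSION} .
docker push internal-registry-url.com:5000/coveragecalculator:${VERSION}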


And our run.sh script is like this:

#!/bin/bash

bin/spark-submit \
   --master k8s://https://${KUBERNETES_MASTER}:6443 \
   --deploy-mode cluster \
   --name coverage-${MORDOR_ENV} \
   --class Main \
   --conf spark.executor.instances=${NUMBER_OF_EXECUTORS} \
   --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
   --conf spark.kubernetes.driverEnv.MORDOR_ENV=${MORDOR_ENV} \
   --conf spark.kubernetes.driver.label.app=coverage-${MORDOR_ENV} \
   --conf spark.kubernetes.container.image.pullPolicy=Always \
   --conf spark.kubernetes.container.image=internal-registry-url.com:5000/coveragecalculator:${VERSION} \
   --conf spark.kubernetes.driver.pod.name=coverage-${MORDOR_ENV} \
   --conf spark.kubernetes.authenticate.submission.caCertFile=/opt/spark/secrets/${CRT_FILE} \
   --conf spark.kubernetes.authenticate.submission.oauthToken=${CRT_TOKEN} \
   --conf spark.kubernetes.driver.limit.cores=${DRIVER_CORE_LIMIT} \
   --conf spark.kubernetes.executor.limit.cores=${EXECUTOR_CORE_LIMIT} \
   local:///opt/spark/examples/jars/CoverageCalculator-assembly-0.1.jar 

 
Notice that you have to place the secrets in the secrets/ folder in order to create the pods from a single image.
After the driver pod is created, it uses the internal executor-pod creation scripts that also come with the spark:base image, as described in the Spark-on-Kubernetes documentation.
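
The CA certificate and the token used in run.sh belong to that spark service account. Here is a minimal sketch of how they might be extracted with kubectl; it assumes a cluster version where the token secret is created automatically for the service account, and the output file names are just examples.

# Hypothetical extraction of the submission credentials from the service account secret.
SECRET_NAME=$(kubectl get serviceaccount spark -o jsonpath='{.secrets[0].name}')
kubectl get secret ${SECRET_NAME} -o jsonpath='{.data.token}' | base64 --decode > spark.token
kubectl get secret ${SECRET_NAME} -o jsonpath='{.data.ca\.crt}' | base64 --decode > secrets/non_prod_ca.crt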

We created the pipelines as build-push -> run-on-qa-cluster -> run-on-preprod-cluster -> run-on-prod-cluster




The run scripts placed in the pipeline pass the parameters to run.sh, and we run it like this:

docker run -i --entrypoint /bin/bash \
   -e KUBERNETES_MASTER='yourkubernetesmasterip' \
   -e NUMBER_OF_EXECUTORS=5 \
   -e MORDOR_ENV='qa' \
   -e VERSION=$GO_PIPELINE_LABEL \
   -e CRT_FILE='non_prod_ca.crt' \
   -e CRT_TOKEN='THE_USER_CRT_TOKEN' \
   -e DRIVER_CORE_LIMIT=2 \
   -e EXECUTOR_CORE_LIMIT=2 \
   -v /etc/resolv.conf:/etc/resolv.conf:ro \
   -v /etc/localtime:/etc/localtime:ro \
   192.168.57.20:5000/coveragecalculator:$GO_PIPELINE_LABEL /opt/spark/run.sh

 
This command creates one driver pod with a core limit of 2.
After that, 5 executor pods are created by the driver (using the scripts from spark:base), each of them also with a core limit of 2.
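
You can follow the job with kubectl, for example like this (the label and pod name come from run.sh with MORDOR_ENV=qa; spark-role is a label Spark itself puts on the pods it creates):

# The driver pod is named and labeled coverage-qa by run.sh; list it and follow its logs.
kubectl get pods -l app=coverage-qa
kubectl logs -f coverage-qa

# The executor pods created by the driver carry Spark's own labels, e.g. spark-role=executor.
kubectl get pods -l spark-role=executor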



boranseref@gmail.com