Spark on Kubernetes: Setting Up MinIO as Object Storage
Object storage servers such as Amazon S3 and MinIO have become the de facto hard drives for storing data in cloud native applications, machine learning, and Big Data.
MinIO, in particular, has been adopted by organizations that need high-performance, cost-effective storage, as well as those that choose to host and manage their own applications. With nearly 290 million known deployments, it is particularly well suited to providing storage in Docker and Kubernetes environments, supporting microservice architectures, and helping manage multi-tenancy.
In this series we've been looking at what it takes to run Spark on top of Kubernetes. In the first article, we focused on building images and configuring Kubernetes permissions to allow Spark to launch container-based drivers (running in both cluster and client mode) and executors. In the second article, we looked at extending the driver container to integrate with Jupyter and a broader set of Data Science tools.
In this article, we turn our attention to an important second piece of data infrastructure: where to store your data. We're going to look at how you can get up and running with MinIO and at the customization Spark requires in order to read and write data to a self-hosted object store.
Up and Running with MinIO
To follow along with this tutorial, you will need a running MinIO installation. There are several ways to install it:
- As a standalone (or distributed) instance running in a Docker container
- As a service deployed to a virtual or physical machine via a Linux package
- As a binary compiled from source
The easiest way to get up and running quickly is to use Docker.
docker pull minio/minio
docker run -p 9000:9000 minio/minio server /data
A convenient and closely related alternative is to deploy MinIO into Kubernetes as a set of containers.
The Oak-Tree MinIO cluster is deployed in our in-house Kubernetes cluster as a distributed-mode DaemonSet. The code listing below shows the general configuration. If you wish to customize the options, the MinIO documentation is an excellent resource on the various ways in which MinIO can be deployed into Kubernetes.
apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: minio
  labels:
    app: minio
spec:
  selector:
    matchLabels:
      app: minio
  template:
    metadata:
      labels:
        app: minio
    spec:
      # We only deploy minio to nodes specified as a minio server.
      # The label is applied to nodes using `kubectl label node hostname1 minio-server=true`
      nodeSelector:
        minio-server: "true"
      hostNetwork: true
      # The storage is provided using a hostPath, which must be the same on all servers.
      volumes:
      - name: storage
        hostPath:
          path: /ext/storage/object-storage/
      containers:
      - name: minio
        env:
        - name: MINIO_ACCESS_KEY
          value: "minio-access-key"
        - name: MINIO_SECRET_KEY
          value: "minio-secret-long-and-random"
        image: minio/minio:RELEASE.2019-10-12T01-39-57Z
        # Servers must be manually defined.
        args:
        - server
        - http://storage{1...4}.example.com/data/minio
        ports:
        - containerPort: 9000
        volumeMounts:
        - name: storage
          mountPath: /data/minio/
We then use a service to route internal traffic and an Ingress to manage outside connections.
apiVersion: v1
kind: Service
metadata:
  name: object-storage
spec:
  ports:
  - port: 9000
    targetPort: 9000
    protocol: TCP
  selector:
    # Looks for labels `app:minio` in the namespace and applies the spec
    app: minio
apiVersion: extensions/v1beta1
kind: Ingress
metadata:
  name: object-storage-ingress
  annotations:
    nginx.ingress.kubernetes.io/rewrite-target: /
    kubernetes.io/ingress.class: "nginx"
    ingress.kubernetes.io/force-ssl-redirect: "true"
    ingress.kubernetes.io/proxy-body-size: "800m"
    nginx.ingress.kubernetes.io/proxy-body-size: "800m"
    certmanager.k8s.io/issuer: "letsencrypt-prod"
spec:
  tls:
  - hosts:
    - storage.example.com
    secretName: object-storage-tls
  rules:
  - host: storage.example.com
    http:
      paths:
      - path: /
        backend:
          serviceName: object-storage
          servicePort: 9000
Connecting to MinIO from Spark
The Java libraries that Spark uses to connect to S3-compatible object services are tightly coupled, and special care needs to be taken when declaring dependencies. If you search for "MinIO Spark S3 Errors" you will find a whole host of people looking for help and guidance (or offering their suggestions on how to configure a Spark install that can communicate with MinIO). Getting the right set of libraries and components can be tricky.
For that reason, while the general Spark containers that we created in Part 1 of the series may work for many environments, we will create a special set of containers that can communicate with MinIO. In these containers, we will install the following software:
- A recent version of Hadoop (version 3.1.3)
- A version of Spark that is compiled without Hadoop (version 2.4.4)
- A set of Java client libraries required to make the interfaces work (most of these are part of the AWS Hadoop client)
- A set of Apache common libraries required by the Spark runtime to allow it to interface with Hadoop 3 without crashing
Copies of the Docker files and Kubernetes manifests in this article are available from the Oak-Tree Data Ops Examples repository. The specific versions of the libraries that we package were determined by a great deal of trial and error. Building Spark container images is generally one of those things you do carefully and then commit to version control, so that you can readily refresh your memory about how it was done.
Container Images
We'll use the same progression of containers seen in the first and second articles:
- A general "executor" or base container which contains the Spark and Hadoop runtimes
- A "driver" container that builds on top of the base which contains
kubectl
and is capable of integrating with the cluster - A more general "Jupyter" container that contains general use Data Science libraries and the Jupyter lab/notebook interface
Executor Container
The executor image is the most involved of the three. Because the build has two distinct stages, downloading and extracting the dependencies and then preparing the final container, we use a multi-stage build (just as with the original image).
In the first stage, we fetch Spark (without the associated Hadoop libraries, since we will be installing those by themselves) and Hadoop. We extract them to a working directory in preparation for copying them to the final container.
In the second stage, we copy the components to program folders for Hadoop and Spark, set the entrypoint script, and download a whole host of additional dependencies required by the environment. In the last step of the automation we configure environment variables and specify the container working directory, entrypoint, and runtime user.
# Dependencies Container Image
# Install wget to retrieve Spark runtime components,
# extract to temporary directory, copy to the desired image
FROM ubuntu:18.04 AS deps

RUN apt-get update && apt-get -y install wget
WORKDIR /tmp
RUN wget http://us.mirrors.quenda.co/apache/spark/spark-2.4.4/spark-2.4.4-bin-without-hadoop.tgz \
  && tar xvzf spark-2.4.4-bin-without-hadoop.tgz
RUN wget http://us.mirrors.quenda.co/apache/hadoop/common/hadoop-3.1.3/hadoop-3.1.3.tar.gz \
  && tar xvzf hadoop-3.1.3.tar.gz

# Runtime Container Image. Adapted from the official Spark runtime
# image from the project repository at https://github.com/apache/spark.
FROM openjdk:8-jdk-slim AS build

# Install Spark Dependencies and Prepare Spark Runtime Environment
RUN set -ex && \
    apt-get update && \
    ln -s /lib /lib64 && \
    apt install -y bash tini libc6 libpam-modules libnss3 wget python3 python3-pip && \
    mkdir -p /opt/hadoop && \
    mkdir -p /opt/spark && \
    mkdir -p /opt/spark/examples && \
    mkdir -p /opt/spark/work-dir && \
    touch /opt/spark/RELEASE && \
    rm /bin/sh && \
    ln -sv /bin/bash /bin/sh && \
    ln -sv /usr/bin/tini /sbin/tini && \
    echo "auth required pam_wheel.so use_uid" >> /etc/pam.d/su && \
    chgrp root /etc/passwd && chmod ug+rw /etc/passwd && \
    ln -sv /usr/bin/python3 /usr/bin/python && \
    ln -sv /usr/bin/pip3 /usr/bin/pip && \
    rm -rf /var/cache/apt/*

# Install Kerberos Client and Auth Components
ENV DEBIAN_FRONTEND=noninteractive
RUN apt-get update \
  && apt install -yqq krb5-user \
  && rm -rf /var/cache/apt/*

# Hadoop: Copy previously fetched runtime components
COPY --from=deps /tmp/hadoop-3.1.3/bin /opt/hadoop/bin
COPY --from=deps /tmp/hadoop-3.1.3/etc /opt/hadoop/etc
COPY --from=deps /tmp/hadoop-3.1.3/include /opt/hadoop/include
COPY --from=deps /tmp/hadoop-3.1.3/lib /opt/hadoop/lib
COPY --from=deps /tmp/hadoop-3.1.3/libexec /opt/hadoop/libexec
COPY --from=deps /tmp/hadoop-3.1.3/sbin /opt/hadoop/sbin
COPY --from=deps /tmp/hadoop-3.1.3/share /opt/hadoop/share

# Spark: Copy previously fetched runtime components
COPY --from=deps /tmp/spark-2.4.4-bin-without-hadoop/bin /opt/spark/bin
COPY --from=deps /tmp/spark-2.4.4-bin-without-hadoop/jars /opt/spark/jars
COPY --from=deps /tmp/spark-2.4.4-bin-without-hadoop/python /opt/spark/python
COPY --from=deps /tmp/spark-2.4.4-bin-without-hadoop/R /opt/spark/R
COPY --from=deps /tmp/spark-2.4.4-bin-without-hadoop/sbin /opt/spark/sbin
COPY --from=deps /tmp/spark-2.4.4-bin-without-hadoop/yarn /opt/spark/yarn

# Spark: Copy Docker entry script
COPY --from=deps /tmp/spark-2.4.4-bin-without-hadoop/kubernetes/dockerfiles/spark/entrypoint.sh /opt/

# Spark: Copy examples, data, and tests
COPY --from=deps /tmp/spark-2.4.4-bin-without-hadoop/examples /opt/spark/examples
COPY --from=deps /tmp/spark-2.4.4-bin-without-hadoop/data /opt/spark/data
COPY --from=deps /tmp/spark-2.4.4-bin-without-hadoop/kubernetes/tests /opt/spark/tests

# Replace out of date dependencies causing a 403 error on job launch
WORKDIR /tmp
RUN cd /tmp && mkdir -p /tmp/s3deps \
  && wget https://oak-tree.tech/documents/71/commons-logging-1.1.3.jar \
  && wget https://oak-tree.tech/documents/81/commons-pool-1.5.4.jar \
  && wget https://oak-tree.tech/documents/80/commons-beanutils-1.9.3.jar \
  && wget https://oak-tree.tech/documents/79/commons-cli-1.2.jar \
  && wget https://oak-tree.tech/documents/78/commons-collections-3.2.2.jar \
  && wget https://oak-tree.tech/documents/77/commons-configuration-1.6.jar \
  && wget https://oak-tree.tech/documents/76/commons-dbcp-1.4.jar \
  && wget https://oak-tree.tech/documents/75/commons-digester-1.8.jar \
  && wget https://oak-tree.tech/documents/74/commons-httpclient-3.1.jar \
  && wget https://oak-tree.tech/documents/73/commons-io-2.4.jar \
  && wget https://oak-tree.tech/documents/70/log4j-1.2.17.jar \
  && wget https://oak-tree.tech/documents/72/apache-log4j-extras-1.2.17.jar \
  && wget https://oak-tree.tech/documents/59/kubernetes-client-4.6.4.jar \
  && wget https://oak-tree.tech/documents/58/kubernetes-model-4.6.4.jar \
  && wget https://oak-tree.tech/documents/57/kubernetes-model-common-4.6.4.jar \
  && cd /tmp/s3deps \
  && wget https://oak-tree.tech/documents/60/joda-time-2.9.9.jar \
  && wget https://oak-tree.tech/documents/61/httpclient-4.5.3.jar \
  && wget https://oak-tree.tech/documents/62/aws-java-sdk-s3-1.11.534.jar \
  && wget https://oak-tree.tech/documents/63/aws-java-sdk-kms-1.11.534.jar \
  && wget https://oak-tree.tech/documents/64/aws-java-sdk-dynamodb-1.11.534.jar \
  && wget https://oak-tree.tech/documents/65/aws-java-sdk-core-1.11.534.jar \
  && wget https://oak-tree.tech/documents/66/aws-java-sdk-1.11.534.jar \
  && wget https://oak-tree.tech/documents/67/hadoop-aws-3.1.2.jar \
  && wget https://oak-tree.tech/documents/68/slf4j-api-1.7.29.jar \
  && wget https://oak-tree.tech/documents/69/slf4j-log4j12-1.7.29.jar \
  && rm -rf /opt/spark/jars/kubernetes-client-* \
  && rm -rf /opt/spark/jars/kubernetes-model-* \
  && rm -rf /opt/spark/jars/kubernetes-model-common-* \
  && mv /tmp/commons-logging-* /opt/spark/jars \
  && mv /tmp/log4j-* /opt/spark/jars/ \
  && mv /tmp/apache-log4j-* /opt/spark/jars \
  && mv /tmp/kubernetes-* /opt/spark/jars/ \
  && mv /tmp/s3deps/* /opt/spark/jars/

# Set Hadoop environment
ENV HADOOP_HOME /opt/hadoop
ENV LD_LIBRARY_PATH $HADOOP_HOME/lib/native

# Set Spark environment
ENV SPARK_HOME /opt/spark
ENV PATH $PATH:$SPARK_HOME/bin:$HADOOP_HOME/bin
ENV SPARK_DIST_CLASSPATH /opt/hadoop/etc/hadoop:/opt/hadoop/share/hadoop/common/lib/*:/opt/hadoop/share/hadoop/common/*:/opt/hadoop/share/hadoop/hdfs:/opt/hadoop/share/hadoop/hdfs/lib/*:/opt/hadoop/share/hadoop/hdfs/*:/opt/hadoop/share/hadoop/mapreduce/lib/*:/opt/hadoop/share/hadoop/mapreduce/*:/opt/hadoop/share/hadoop/yarn:/opt/hadoop/share/hadoop/yarn/lib/*:/opt/hadoop/share/hadoop/yarn/*
ENV SPARK_CLASSPATH /opt/spark/jars/*:$SPARK_DIST_CLASSPATH

WORKDIR /opt/spark/work-dir
RUN chmod g+w /opt/spark/work-dir

ENTRYPOINT [ "/opt/entrypoint.sh" ]

# Specify the User that the actual main process will run as
USER ${spark_uid}
Driver Container
The driver container is essentially identical to the driver of the first article. The primary difference between the two is that this version inherits from the updated executor image. As with the first image, we configure a package repository from which to download kubectl and install the binary.
FROM code.oak-tree.tech:5005/courseware/oak-tree/dataops-examples/spark-k8s-minio-base

# Install kubectl
USER root
RUN apt install -y apt-transport-https apt-utils gnupg curl \
  && curl -s https://packages.cloud.google.com/apt/doc/apt-key.gpg | apt-key add - \
  && echo "deb https://apt.kubernetes.io/ kubernetes-xenial main" | tee -a /etc/apt/sources.list.d/kubernetes.list \
  && apt update \
  && apt install -y kubectl

# Switch back to Spark USER uid
USER ${spark_uid}
Jupyter Container
Like the driver, the Jupyter container is similar to the previous Jupyter image and extends the driver container. We use pip to add a set of data science, visualization, and machine learning tools, and then download a set of Jupyter extensions that make it easier to monitor the progress of Spark jobs. Finally, we configure a non-root user for Jupyter to run under.
FROM code.oak-tree.tech:5005/courseware/oak-tree/dataops-examples/spark-k8s-minio-driver

# Install Jupyter and Data Science libraries
USER root
RUN ln -sv /usr/bin/pip3 /usr/bin/pip \
  && pip install numpy pandas
RUN pip install notedown plotly seaborn matplotlib
RUN pip install bokeh xlrd yellowbrick
RUN pip install scikit-learn scikit-image
RUN pip install scipy
RUN pip install jupyterlab s3contents \
  && mkdir -p /home/public && chmod 777 /home/public
RUN pip install py4j \
  && ln -s /opt/spark/python/pyspark /usr/local/lib/python3.7/dist-packages/pyspark \
  && ln -s /opt/spark/python/pylintrc /usr/local/lib/python3.7/dist-packages/pylintrc

# Install Jupyter Spark extension
RUN pip install jupyter-spark \
  && jupyter serverextension enable --py jupyter_spark \
  && jupyter nbextension install --py jupyter_spark \
  && jupyter nbextension enable --py jupyter_spark \
  && jupyter nbextension enable --py widgetsnbextension

# Configure Jupyter User
ARG NB_USER="jovyan"
ARG NB_UID="1000"
ARG NB_GROUP="analytics"
ARG NB_GID="777"
RUN groupadd -g $NB_GID $NB_GROUP \
  && useradd -m -s /bin/bash -N -u $NB_UID -g $NB_GID $NB_USER \
  && mkdir -p /home/$NB_USER/work \
  && mkdir -p /home/$NB_USER/.jupyter \
  && chown -R $NB_USER:$NB_GROUP /home/$NB_USER

# Configure Working Directory
USER $NB_USER
WORKDIR /home/$NB_USER/work
Building the Container Images
The code listing below shows the commands to build and push the container images to the registry. The Dockerfiles are referenced by their names as found in the examples repository.
# Build Spark Kubernetes MinIO Container Image for the Executor
docker build -f Dockerfile.k8s-minio.executor \
  -t code.oak-tree.tech:5005/courseware/oak-tree/dataops-examples/spark-k8s-minio-base .

# Build Spark Kubernetes MinIO Container Image for the Driver
docker build -f Dockerfile.k8s-minio.driver \
  -t code.oak-tree.tech:5005/courseware/oak-tree/dataops-examples/spark-k8s-minio-driver .

# Build Spark Kubernetes MinIO Container Image for Jupyter
docker build -f Dockerfile.k8s-minio.jupyter \
  -t code.oak-tree.tech:5005/courseware/oak-tree/dataops-examples/spark-k8s-minio-jupyter .

# Push container images to the registry
docker push code.oak-tree.tech:5005/courseware/oak-tree/dataops-examples/spark-k8s-minio-base
docker push code.oak-tree.tech:5005/courseware/oak-tree/dataops-examples/spark-k8s-minio-driver
docker push code.oak-tree.tech:5005/courseware/oak-tree/dataops-examples/spark-k8s-minio-jupyter
Deploying to Kubernetes
Once the container images have been built and pushed, they need to be deployed to Kubernetes for testing. Ultimately you need a pod instance to host Jupyter, a headless service to provide it with a stable network location, and an ingress to manage external access.
The deployment process is identical to the one described in part two of this series. Briefly:
- Create a ConfigMap with the Jupyter password and the access ID/secret needed to store Jupyter files in object storage. Storing Jupyter files in an external storage system allows them to persist between container restarts or migrations.
- Deploy a pod instance using the Jupyter container image, which mounts the configuration file as a read-only file within the pod.
- Create the headless service and ingress.
Please refer to the example repository or previous article for the YAML deployment manifests and implementation details.
Testing the Configuration
Once the Jupyter pod has been deployed and you can access it, you're ready to test the configuration. There are two pieces of functionality that you need to verify:
- That the pod is able to launch Spark jobs into the cluster. This verifies that your Spark and Hadoop installs work as expected.
- That you are able to read and write files to MinIO. This verifies that you've installed the Amazon client libraries correctly, that the environment is sane, and that you've managed to configure everything correctly.
Verify Spark Configuration
To verify that the Spark configuration works as expected, we can use a variation of the code we saw in the second article with a set of additions. Our notebook setup needs the following:
- Import the dependencies required to connect to Amazon S3 (which must be done prior to the initialization of the SparkContext).
- Configure the Spark connection and executor environment.
- Initialize the SparkContext and cluster connection.
- Verify that all of the components are visible to one another and available by submitting a small processing job.
Spark Environment Options and Dependencies
The code listing below configures Spark to use the extra dependencies required to read and write data to MinIO. These dependencies are included in the container image we built above, but must be referenced at runtime. We do this by providing a --jars option as part of the PySpark submit arguments.
In addition to declaring dependencies via the --jars option, it is possible to download and install dependencies dynamically from Apache Maven. To use the dynamic functionality, you provide a list of Spark packages via --packages. The code in the listing shows how to construct a comma-separated string for both; only the --jars option is actually passed to the submit arguments, however.
# Manage PySpark Runtime Options
import os

PACKAGE_OPTIONS = '--packages %s ' % ','.join((
    # 'org.apache.spark:spark-avro_2.12:2.4.4',
))

JAR_OPTIONS = '--jars %s ' % ','.join((
    '/opt/spark/jars/joda-time-2.9.9.jar',
    '/opt/spark/jars/httpclient-4.5.3.jar',
    '/opt/spark/jars/aws-java-sdk-s3-1.11.534.jar',
    '/opt/spark/jars/aws-java-sdk-kms-1.11.534.jar',
    '/opt/spark/jars/aws-java-sdk-dynamodb-1.11.534.jar',
    '/opt/spark/jars/aws-java-sdk-core-1.11.534.jar',
    '/opt/spark/jars/aws-java-sdk-1.11.534.jar',
    '/opt/spark/jars/hadoop-aws-3.1.2.jar',
    '/opt/spark/jars/slf4j-api-1.7.29.jar',
    '/opt/spark/jars/slf4j-log4j12-1.7.29.jar',
))

os.environ['PYSPARK_SUBMIT_ARGS'] = JAR_OPTIONS + ' pyspark-shell'
os.environ.get('PYSPARK_SUBMIT_ARGS')
Configure Spark to Connect to Kubernetes and MinIO
The code in the second listing defines the parameters needed by Spark to connect to the cluster and launch worker instances. These parameters include the URL of the Spark master, the container image (note that here we use the MinIO executor prepared earlier), the service account, and the driver hostname and port.
We further configure the parameters needed to connect to the MinIO instance and read/write data. Those parameters include the endpoint (in our environment, MinIO is available at http://object-storage:9000), the access ID, the secret key, the environment parameters (such as enabling "fast" uploads, which tells Spark that MinIO will not be returning an MD5 hash of the file contents), and which file system Spark should use to read and write data. Finally, we initialize the Spark context and session.
In many Spark programs you will have a reference to both a context and session. The context is the main entry point for Spark functionality and controls the connection to the server. The session serves as the entry point for data frames, the primary way you work with structured data and Spark's SQL functionality.
Spark's SQL library is not just the interface to databases. It is the primary module Spark uses for working with any type of structured data, regardless of whether it originated from a database, a NoSQL store, a data warehouse, or a set of flat files.
import pyspark

conf = pyspark.SparkConf()

# Kubernetes is a Spark master in our setup.
# It creates pods with Spark workers, orchestrates those
# workers and returns final results to the Spark driver
# ("k8s://https://" is NOT a typo, this is how Spark knows the "provider" type).
conf.setMaster("k8s://https://kubernetes.default:443")

# Worker pods are created from the base Spark docker image.
# If you use another image, specify its name instead.
conf.set(
    "spark.kubernetes.container.image",
    "code.oak-tree.tech:5005/courseware/oak-tree/dataops-examples/spark-k8s-minio-base")

# Authentication certificate and token (required to create worker pods):
conf.set(
    "spark.kubernetes.authenticate.caCertFile",
    "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt")
conf.set(
    "spark.kubernetes.authenticate.oauthTokenFile",
    "/var/run/secrets/kubernetes.io/serviceaccount/token")

# Service account which should be used for the driver
conf.set(
    "spark.kubernetes.authenticate.driver.serviceAccountName",
    "spark-driver")

# 4 pods/workers will be created. Can be expanded for larger workloads.
conf.set("spark.executor.instances", "4")

# The DNS alias for the Spark driver. Required by executors to report status.
conf.set(
    "spark.driver.host", "oaktree-jupyter")

# Port which the Spark shell should bind to and to which executors will report progress
conf.set("spark.driver.port", "20020")

# Configure S3 Object Storage as filesystem, pass MinIO credentials
conf.set("spark.hadoop.fs.s3a.endpoint", 'http://object-storage:9000') \
    .set("spark.hadoop.fs.s3a.access.key", 'minio-access-key') \
    .set("spark.hadoop.fs.s3a.secret.key", 'minio-secret-long-and-random') \
    .set("spark.hadoop.fs.s3a.fast.upload", True) \
    .set("spark.hadoop.fs.s3a.path.style.access", True) \
    .set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")

# Initialize spark context, create executors
conf.setAppName('spark-iotest')
sc = pyspark.SparkContext(conf=conf)

# Initialize Spark Session
from pyspark.sql import SparkSession
spark = SparkSession(sc)
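Before turning to object storage, it is worth confirming that executor pods actually launch and that work round-trips through the cluster, as the checklist above suggests. A minimal sanity check might look like the following sketch (the data is arbitrary and only used for illustration):

# Quick sanity check: push a trivial job through the cluster.
# The SparkContext (sc) drives the low-level RDD API, while the SparkSession
# (spark) is the entry point for DataFrames and SQL.
numbers = sc.parallelize(range(1000))
print(numbers.map(lambda x: x * x).sum())   # sum of squares of 0..999 = 332833500

# The same check through the DataFrame/SQL interface
df = spark.createDataFrame([(i, i % 3) for i in range(100)], ['value', 'bucket'])
df.groupBy('bucket').count().show()

If the driver host and port settings are correct, the job should complete and the executor pods will be visible in the namespace with kubectl get pods.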
Verify the MinIO Configuration
Write Data to MinIO
The final code listing shows how to connect to MinIO and write a text file, which we then turn around and read. File paths in Spark reference the scheme (s3a:// for the Hadoop S3A connector), the bucket, and the key name.
For the code to work, you need to have previously created a container/bucket called "test-container". Spark does not create containers automatically.
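If the bucket does not already exist, one option is to create it from the notebook using boto3 pointed at the MinIO endpoint. This is only a sketch: it assumes boto3 is available in the Jupyter image (pip install boto3 if it is not) and reuses the endpoint and credentials from the Spark configuration above. The MinIO web console or the mc command line client work just as well.

# Sketch: create the bucket Spark will write to (assumes boto3 is installed).
import boto3

s3 = boto3.client(
    's3',
    endpoint_url='http://object-storage:9000',
    aws_access_key_id='minio-access-key',
    aws_secret_access_key='minio-secret-long-and-random')

existing = [b['Name'] for b in s3.list_buckets().get('Buckets', [])]
if 'test-container' not in existing:
    s3.create_bucket(Bucket='test-container')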
# Write a two column table to MinIO as CSV
OBJECTURL_TEST = 's3a://test-container/playground/colors-test.csv'
rdd = sc.parallelize([('Mario', 'Red'), ('Luigi', 'Green'), ('Princess', 'Pink')])
rdd.toDF(['name', 'color']).write.csv(OBJECTURL_TEST, header=True)

# Read the data back from MinIO
gnames_df = spark.read.format('csv').option('header', True) \
    .load(OBJECTURL_TEST)
gnames_df.show()
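CSV is convenient for inspecting results, but the same DataFrame API writes any format Spark supports. As an illustrative follow-on (the Parquet path below is hypothetical), the data can be round-tripped as Parquet, which preserves the schema and is generally a better fit for analytical workloads:

# Illustrative follow-on: round-trip the same data as Parquet.
PARQUET_TEST = 's3a://test-container/playground/colors-test.parquet'
gnames_df.write.mode('overwrite').parquet(PARQUET_TEST)
spark.read.parquet(PARQUET_TEST).printSchema()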
Et Voilà
MinIO is an enormously powerful solution for storing large amounts of data. While it is lightweight enough to be used in a development environment, it can also scale to handle petabytes in a data center setting. It's an ideal foundation upon which to build storage infrastructure.
At this point in our series, we've reached an important milestone: Spark and MinIO (compute and storage) lay the foundation of a Big Data infrastructure in Kubernetes. Having both in the same cluster gives us something useful to start playing with. From here, we have a springboard into the fun parts of Big Data Science, such as exploring information, training machine learning models, analyzing graph databases, and processing streaming data, with less time spent on the Big Iron. Stay tuned!