---
sidebar_label: Spark
---

# Using Apache Spark with Ozone

[Apache Spark](https://spark.apache.org/) is a widely used unified analytics engine for large-scale data processing. Ozone can serve as a scalable storage layer for Spark applications, allowing you to read and write data directly from/to Ozone clusters using familiar Spark APIs.

:::note
This guide covers Apache Spark 3.x. Examples were tested with Spark 3.5.x and Apache Ozone 2.2.0.
:::

## Overview

Spark interacts with Ozone primarily through the OzoneFileSystem connector, which allows access using the `ofs://` URI scheme.
Spark can also access Ozone through the S3 Gateway using the `s3a://` protocol, which is useful for porting existing cloud-native Spark applications to Ozone without changing application code.

The older `o3fs://` scheme is supported for legacy compatibility but is not recommended for new deployments.

Key benefits include:

- Storing large datasets generated or consumed by Spark jobs directly in Ozone.
- Leveraging Ozone's scalability and object storage features for Spark workloads.
- Using standard Spark DataFrame and `RDD` APIs to interact with Ozone data.

## Prerequisites

1. **Ozone Cluster:** A running Ozone cluster.
2. **Ozone Client JARs:** The `ozone-filesystem-hadoop3-*.jar` must be available on the Spark driver and executor classpath.
3. **Hadoop 3.4.x runtime (Ozone 2.1.0+):** Ozone 2.1.0 removed bundled copies of several Hadoop classes (`LeaseRecoverable`, `SafeMode`, `SafeModeAction`) and now requires them from the runtime classpath ([HDDS-13574](https://issues.apache.org/jira/browse/HDDS-13574)). Since Spark 3.5.x ships with Hadoop 3.3.4, you must add `hadoop-common-3.4.x.jar` to the Spark classpath alongside the existing Hadoop JARs.
4. **Configuration:** Spark needs access to Ozone configuration (`core-site.xml` and potentially `ozone-site.xml`) to connect to the Ozone cluster.
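
When building a custom image is not an option, the client JARs can also be supplied at submit time with `--jars`. A sketch only; the paths and versions below are assumptions and must be adjusted to your installation:

```bash
# JAR locations are examples; point these at your actual Ozone and Hadoop JARs.
spark-submit \
  --master yarn \
  --jars /opt/ozone/share/ozone/lib/ozone-filesystem-hadoop3-2.2.0.jar,/opt/hadoop/share/hadoop/common/hadoop-common-3.4.2.jar \
  my_spark_job.py
```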

## Configuration

### 1. Core Site (`core-site.xml`)

For `core-site.xml` configuration, refer to the [Ozone File System (ofs) Configuration section](../client-interfaces/ofs#configuration).

### 2. Spark Configuration (`spark-defaults.conf` or `--conf`)

While Spark often picks up settings from `core-site.xml` on the classpath, explicitly setting the implementation can sometimes be necessary:

```properties
spark.hadoop.fs.ofs.impl=org.apache.hadoop.fs.ozone.RootedOzoneFileSystem
```
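
The same setting can also be passed per job on the command line instead of in `spark-defaults.conf`; the application script name below is a placeholder:

```bash
spark-submit \
  --conf spark.hadoop.fs.ofs.impl=org.apache.hadoop.fs.ozone.RootedOzoneFileSystem \
  my_spark_job.py
```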

### 3. Security (Kerberos)

If your Ozone and Spark clusters are Kerberos-enabled, Spark needs permission to obtain delegation tokens for Ozone.

Configure the following property in `spark-defaults.conf` or via `--conf`, specifying your Ozone filesystem URI:

```properties
# For YARN deployments on Spark 3+
spark.kerberos.access.hadoopFileSystems=ofs://ozone1/
```

Replace `ozone1` with your OM Service ID. Ensure the user running the Spark job has a valid Kerberos ticket (`kinit`).
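
A typical submission flow under Kerberos might look like the following; the principal and keytab path are assumptions for illustration:

```bash
# Obtain a Kerberos ticket first (principal and keytab are examples)
kinit -kt /etc/security/keytabs/spark.keytab spark@EXAMPLE.COM

spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --conf spark.kerberos.access.hadoopFileSystems=ofs://ozone1/ \
  my_spark_job.py
```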

## Usage Examples

You can read and write data using `ofs://` URIs like any other Hadoop-compatible filesystem.

**URI Format:** `ofs://<om-service-id>/<volume>/<bucket>/path/to/key`
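
To make the path layout concrete, the URI format can be decomposed with a small helper. This is illustrative only; `parse_ofs_uri` is not part of any Ozone or Spark API:

```python
from urllib.parse import urlparse

def parse_ofs_uri(uri: str):
    """Split an ofs:// URI into (service_id, volume, bucket, key).

    Illustrative helper only, assuming a volume/bucket/key path layout.
    """
    parsed = urlparse(uri)
    if parsed.scheme != "ofs":
        raise ValueError(f"not an ofs URI: {uri}")
    service_id = parsed.netloc                     # OM service ID (or OM host)
    parts = parsed.path.lstrip("/").split("/", 2)  # volume / bucket / key
    volume, bucket = parts[0], parts[1]
    key = parts[2] if len(parts) > 2 else ""
    return service_id, volume, bucket, key
```

For example, `parse_ofs_uri("ofs://ozone1/volume1/bucket1/input/data.csv")` yields the service ID `ozone1`, volume `volume1`, bucket `bucket1`, and key `input/data.csv`.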

### Reading Data (Scala)

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("Ozone Spark Read Example").getOrCreate()

// Read a CSV file from Ozone
val df = spark.read.format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("ofs://ozone1/volume1/bucket1/input/data.csv")

df.show()
```

### Writing Data (Scala)

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("Ozone Spark Write Example").getOrCreate()

// Create a small example DataFrame to write
val data = Seq(("Alice", 1), ("Bob", 2), ("Charlie", 3))
val df = spark.createDataFrame(data).toDF("name", "id")

// Write the DataFrame to Ozone as Parquet files
df.write.mode("overwrite")
  .parquet("ofs://ozone1/volume1/bucket1/output/users.parquet")
```

### Reading Data (Python)

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Ozone Spark Read Example").getOrCreate()

# Read a CSV file from Ozone
df = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("ofs://ozone1/volume1/bucket1/input/data.csv")

df.show()
```

### Writing Data (Python)

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Ozone Spark Write Example").getOrCreate()

# Create a small example DataFrame to write
data = [("Alice", 1), ("Bob", 2), ("Charlie", 3)]
columns = ["name", "id"]
df = spark.createDataFrame(data, columns)

# Write the DataFrame to Ozone as Parquet files
df.write.mode("overwrite") \
    .parquet("ofs://ozone1/volume1/bucket1/output/users.parquet")
```
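
After a job completes, the output can be inspected with any Hadoop-compatible CLI; for example, using the Ozone shell against the paths from the examples above:

```bash
# List the Parquet output written by the example write job
ozone fs -ls ofs://ozone1/volume1/bucket1/output/users.parquet
```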

## Spark on Kubernetes

The recommended approach for running Spark on Kubernetes with Ozone is to bake the `ozone-filesystem-hadoop3-*.jar`, the `hadoop-common-3.4.x.jar` (if using Ozone 2.1.0+), and `core-site.xml` directly into a custom Spark image.

### Build a Custom Spark Image

Place the Ozone client JAR and the Hadoop compatibility JAR in `/opt/spark/jars/`, which is on the default Spark classpath, and `core-site.xml` in `/opt/spark/conf/`:

```dockerfile
FROM apache/spark:3.5.8-scala2.12-java11-python3-ubuntu

USER root

ADD https://repo1.maven.org/maven2/org/apache/ozone/ozone-filesystem-hadoop3/2.2.0/ozone-filesystem-hadoop3-2.2.0.jar \
    /opt/spark/jars/

# Ozone 2.1.0+ requires Hadoop 3.4.x classes (HDDS-13574).
# Add alongside (not replacing) Spark's bundled hadoop-common-3.3.4.jar.
ADD https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-common/3.4.2/hadoop-common-3.4.2.jar \
    /opt/spark/jars/

COPY core-site.xml /opt/spark/conf/core-site.xml
COPY ozone_write.py /opt/spark/work-dir/ozone_write.py

USER spark
```

Where `core-site.xml` contains at minimum:

```xml
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  <property>
    <name>fs.ofs.impl</name>
    <value>org.apache.hadoop.fs.ozone.RootedOzoneFileSystem</value>
  </property>
  <property>
    <name>ozone.om.address</name>
    <value>om-host.example.com:9862</value>
  </property>
</configuration>
```
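
If the Ozone Manager runs in high-availability mode, the single `ozone.om.address` entry is replaced by service-ID-based settings. A sketch assuming a service ID of `ozone1` and three OM nodes; the node IDs and host names are placeholders:

```xml
<property>
  <name>ozone.om.service.ids</name>
  <value>ozone1</value>
</property>
<property>
  <name>ozone.om.nodes.ozone1</name>
  <value>om1,om2,om3</value>
</property>
<property>
  <name>ozone.om.address.ozone1.om1</name>
  <value>om1.example.com:9862</value>
</property>
<!-- Repeat the ozone.om.address entry for om2 and om3 -->
```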

### Submit a Spark Job

```bash
./bin/spark-submit \
  --master k8s://https://YOUR_KUBERNETES_API_SERVER:6443 \
  --deploy-mode cluster \
  --name spark-ozone-example \
  --conf spark.executor.instances=2 \
  --conf spark.kubernetes.container.image=YOUR_REPO/spark-ozone:latest \
  --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
  --conf spark.kubernetes.namespace=YOUR_NAMESPACE \
  local:///opt/spark/work-dir/ozone_write.py
```

Replace `YOUR_KUBERNETES_API_SERVER`, `YOUR_REPO`, and `YOUR_NAMESPACE` with your environment values.

## Using the S3A Protocol

Spark can also access Ozone through the S3 Gateway using the `s3a://` protocol. This is useful for porting existing cloud-native Spark applications to Ozone without changing application code.

For configuration details, refer to the [S3A documentation](../client-interfaces/s3a).
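
As a starting point, the following Spark properties point `s3a://` at an Ozone S3 Gateway. The endpoint host and credentials are placeholders, and the full set of options is covered in the linked page:

```properties
spark.hadoop.fs.s3a.endpoint=http://s3g-host.example.com:9878
spark.hadoop.fs.s3a.access.key=YOUR_ACCESS_KEY
spark.hadoop.fs.s3a.secret.key=YOUR_SECRET_KEY
spark.hadoop.fs.s3a.path.style.access=true
```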