Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
178 changes: 178 additions & 0 deletions .github/workflows/spark-namespace-insert.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

name: Spark Namespace Insert Docker

on:
pull_request:
types:
- opened
- synchronize
- ready_for_review
- reopened
paths:
- ".github/workflows/spark-namespace-insert.yml"
- "Makefile"
- "docker/**"
- "integration-tests/**"
- "lance-spark-base_2.12/src/main/java/org/lance/spark/LanceSparkWriteOptions.java"
- "lance-spark-base_2.12/src/main/java/org/lance/spark/write/**"
- "lance-spark-base_2.12/src/test/java/org/lance/spark/LanceSparkWriteOptionsTest.java"
- "pom.xml"
- "*/pom.xml"
workflow_dispatch:
inputs:
spark-version:
description: "Spark version to test"
required: true
default: "3.5"
scala-version:
description: "Scala version to test"
required: true
default: "2.13"
backends:
description: "Comma-separated test backends: local or local,rest-dir"
required: true
default: "local,rest-dir"
rest-uri:
description: "Optional REST namespace URI. If omitted, tests start a local REST directory namespace."
required: false
default: ""
rest-database:
description: "Optional database header value for an external REST namespace"
required: false
default: ""
docker-run-args:
description: "Extra docker run args for docker-test"
required: false
default: ""

permissions:
contents: read

concurrency:
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
cancel-in-progress: true

env:
SPARK_VERSION: ${{ github.event.inputs['spark-version'] || '3.5' }}
SCALA_VERSION: ${{ github.event.inputs['scala-version'] || '2.13' }}
NAMESPACE_INSERT_TEST_BACKENDS: ${{ github.event_name == 'workflow_dispatch' && github.event.inputs.backends || 'local,rest-dir' }}
NAMESPACE_INSERT_PYTEST_CMD: >-
pytest /home/lance/tests/test_lance_spark.py::TestDMLNamespaceInsert
-v --timeout=180

jobs:
namespace-insert-docker-test:
name: Namespace Insert Docker Test
runs-on: ubuntu-24.04
timeout-minutes: 90
steps:
- name: Checkout
uses: actions/checkout@v4
with:
ref: ${{ github.event.pull_request.head.sha || github.sha }}
- name: Set up Java
uses: actions/setup-java@v4
with:
distribution: temurin
java-version: 17
cache: "maven"
- name: Resolve Docker build args
id: docker-args
run: |
make print-docker-build-args SPARK_VERSION=${SPARK_VERSION} SCALA_VERSION=${SCALA_VERSION} >> $GITHUB_OUTPUT
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
- name: Build test-base image (cached)
uses: docker/build-push-action@v6
with:
context: docker
file: docker/Dockerfile.test-base
load: true
tags: lance-spark-test-base:${{ env.SPARK_VERSION }}_${{ env.SCALA_VERSION }}
build-args: |
SPARK_DOWNLOAD_VERSION=${{ steps.docker-args.outputs.spark-download-version }}
SPARK_MAJOR_VERSION=${{ env.SPARK_VERSION }}
SCALA_VERSION=${{ env.SCALA_VERSION }}
PY4J_VERSION=${{ steps.docker-args.outputs.py4j-version }}
SPARK_SCALA_SUFFIX=${{ steps.docker-args.outputs.spark-scala-suffix }}
cache-from: type=gha,scope=namespace-insert-test-base-${{ env.SPARK_VERSION }}_${{ env.SCALA_VERSION }}
cache-to: type=gha,mode=max,scope=namespace-insert-test-base-${{ env.SPARK_VERSION }}_${{ env.SCALA_VERSION }}
- name: Build bundle
run: make bundle SPARK_VERSION=${SPARK_VERSION} SCALA_VERSION=${SCALA_VERSION}
- name: Build test image
run: |
make docker-build-test \
SPARK_VERSION=${SPARK_VERSION} \
SCALA_VERSION=${SCALA_VERSION} \
LANCE_NAMESPACE_IMPL_VERSION=${{ steps.docker-args.outputs.lance-namespace-impl-version }}
- name: Run directory namespace insert tests
if: ${{ contains(env.NAMESPACE_INSERT_TEST_BACKENDS, 'local') }}
run: |
make docker-test \
SPARK_VERSION=${SPARK_VERSION} \
SCALA_VERSION=${SCALA_VERSION} \
TEST_BACKENDS=local \
PYTEST_CMD="${NAMESPACE_INSERT_PYTEST_CMD}"
- name: Resolve REST namespace URI
id: rest
if: ${{ contains(env.NAMESPACE_INSERT_TEST_BACKENDS, 'rest-dir') }}
env:
INPUT_REST_URI: ${{ github.event.inputs['rest-uri'] }}
INPUT_DOCKER_RUN_ARGS: ${{ github.event.inputs['docker-run-args'] }}
run: |
rest_uri="${INPUT_REST_URI}"
docker_run_args="${INPUT_DOCKER_RUN_ARGS}"
start_rest_dir="false"
rest_dir_root=""
rest_dir_port=""

if [ -z "${rest_uri}" ]; then
rest_dir_port="10024"
rest_dir_root="/home/lance/rest-data"
rest_uri="http://127.0.0.1:${rest_dir_port}"
start_rest_dir="true"
fi

echo "uri=${rest_uri}" >> "$GITHUB_OUTPUT"
echo "start_rest_dir=${start_rest_dir}" >> "$GITHUB_OUTPUT"
echo "rest_dir_root=${rest_dir_root}" >> "$GITHUB_OUTPUT"
echo "rest_dir_port=${rest_dir_port}" >> "$GITHUB_OUTPUT"
{
echo "docker_run_args<<EOF"
echo "${docker_run_args}"
echo "EOF"
} >> "$GITHUB_OUTPUT"
- name: Run REST directory namespace insert tests
if: ${{ contains(env.NAMESPACE_INSERT_TEST_BACKENDS, 'rest-dir') }}
env:
LANCE_SPARK_REST_URI: ${{ steps.rest.outputs.uri }}
LANCE_SPARK_REST_API_KEY: ${{ secrets.LANCE_SPARK_REST_API_KEY }}
LANCE_SPARK_REST_DATABASE: ${{ github.event.inputs['rest-database'] }}
LANCE_SPARK_START_REST_DIR: ${{ steps.rest.outputs.start_rest_dir }}
LANCE_SPARK_REST_DIR_ROOT: ${{ steps.rest.outputs.rest_dir_root }}
LANCE_SPARK_REST_DIR_PORT: ${{ steps.rest.outputs.rest_dir_port }}
DOCKER_RUN_ARGS: ${{ steps.rest.outputs.docker_run_args }}
run: |
make docker-test \
SPARK_VERSION=${SPARK_VERSION} \
SCALA_VERSION=${SCALA_VERSION} \
TEST_BACKENDS=rest-dir \
LANCE_SPARK_REST_URI="${LANCE_SPARK_REST_URI}" \
LANCE_SPARK_REST_API_KEY="${LANCE_SPARK_REST_API_KEY}" \
LANCE_SPARK_REST_DATABASE="${LANCE_SPARK_REST_DATABASE}" \
LANCE_SPARK_START_REST_DIR="${LANCE_SPARK_START_REST_DIR}" \
LANCE_SPARK_REST_DIR_ROOT="${LANCE_SPARK_REST_DIR_ROOT}" \
LANCE_SPARK_REST_DIR_PORT="${LANCE_SPARK_REST_DIR_PORT}" \
DOCKER_RUN_ARGS="${DOCKER_RUN_ARGS}" \
PYTEST_CMD="${NAMESPACE_INSERT_PYTEST_CMD}"
17 changes: 17 additions & 0 deletions docs/src/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,23 @@ and namespace-specific options:
| `spark.sql.catalog.{name}.parent` | String | ✗ | Parent prefix for multi-level namespaces. See [Note on Namespace Levels](#note-on-namespace-levels). |
| `spark.sql.catalog.{name}.parent_delimiter` | String | ✗ | Delimiter for parent prefix (default: `.`). See [Note on Namespace Levels](#note-on-namespace-levels). |

## Write Options

Write options can be set on DataFrame writes. Catalog-level values are also used as defaults when
they are present in the Spark catalog configuration.

| Option | Type | Default | Description |
|--------------------------------|---------|---------|----------------------------------------------------------------------------------------------------------|
| `use_namespace_insert` | Boolean | `false` | Use the Lance Namespace insert API for eligible append writes to namespace-backed tables. |
| `namespace_insert_parallelism` | Integer | `0` | Number of writer tasks to request from Spark for namespace insert writes. `0` preserves Spark's plan. For sharded tables Spark uses the sharding distribution; for unsharded tables Spark repartitions by the first output column. |
| `batch_size` | Integer | `8192` | Maximum rows per Arrow batch/request before flushing. |
| `max_batch_bytes` | Long | `268435456` | Maximum approximate bytes per Arrow batch/request before flushing. |

Namespace insert writes are intended for append ingestion through a namespace implementation,
including REST namespaces. Each insert request is committed by the namespace as it runs, so this mode
does not provide the same Spark driver-side atomic commit behavior as the default writer. If a Spark
task or driver fails after some requests complete, those rows may already be visible.

## Example Namespace Implementations

### Directory Namespace
Expand Down
44 changes: 44 additions & 0 deletions docs/src/operations/dml/insert-into.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,50 @@ Add data to existing Lance tables using SQL or DataFrames.
newDF.write().mode("append").saveAsTable("users");
```

## Namespace Insert Writes

For namespace-backed tables, append writes can use the Lance Namespace insert API. This is useful
when the namespace implementation can execute ingestion close to the table storage, such as a REST
namespace service.

=== "Python"
```python
df.writeTo("users") \
.option("use_namespace_insert", "true") \
.option("namespace_insert_parallelism", "8") \
.option("batch_size", "4096") \
.append()
```

=== "Scala"
```scala
df.writeTo("users")
.option("use_namespace_insert", "true")
.option("namespace_insert_parallelism", "8")
.option("batch_size", "4096")
.append()
```

`use_namespace_insert` applies to append writes to existing namespace-backed tables. Create,
replace, overwrite, path-based writes, and schema backfill operations use the default writer.

### Expected Behavior

For users, namespace insert writes look like a normal DataFrame append with two optional write
options. Existing `INSERT INTO` statements and `.append()` calls keep using the default Spark writer
unless `use_namespace_insert` is set. When enabled, Spark still plans executor-side writer tasks, but
each task sends Arrow batches to the configured Lance namespace instead of committing Lance fragments
directly from the driver. This lets directory and REST namespaces handle ingestion through the same
namespace API.

When `namespace_insert_parallelism` is greater than `0`, Spark creates that many writer tasks. For
sharded tables Spark uses the table sharding distribution; for unsharded tables Spark repartitions
by the first output column.

Each namespace insert request is committed as it runs. If a Spark task or driver fails after some
requests complete, those rows may already be visible. Use the default writer when you need Spark's
driver-side atomic commit behavior.

## Insert with Column Specification

=== "SQL"
Expand Down
102 changes: 101 additions & 1 deletion integration-tests/test_lance_spark.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,16 @@
import time
import pytest
from packaging.version import Version
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType, BinaryType
from pyspark.sql.types import (
ArrayType,
BinaryType,
DoubleType,
FloatType,
IntegerType,
StringType,
StructField,
StructType,
)

SPARK_VERSION = Version(os.environ.get("SPARK_VERSION", "3.5"))

Expand Down Expand Up @@ -123,6 +132,12 @@ def _require_sql_search_backend(spark):
pytest.skip("SQL search table functions are covered on local dir and rest-dir backends")


def _require_namespace_insert_backend(spark):
backend = getattr(spark, "_lance_backend", None)
if backend not in ("local", "rest-dir"):
pytest.skip("Namespace insert writes are covered on local dir and rest-dir backends")


# =============================================================================
# DDL (Data Definition Language) Tests
# =============================================================================
Expand Down Expand Up @@ -1989,6 +2004,91 @@ def test_insert_append_data(self, spark):
assert count == 4


@pytest.mark.rest_dir_compatible
class TestDMLNamespaceInsert:
"""Test append writes through the Lance namespace insert API."""

def test_namespace_insert_append_data(self, spark):
"""Test DataFrame append through namespace insert with multiple writer tasks."""
_require_namespace_insert_backend(spark)

spark.sql("""
CREATE TABLE default.test_table (
id INT,
name STRING,
value DOUBLE
)
""")

df = (
spark.range(0, 24)
.repartition(6)
.selectExpr(
"CAST(id AS INT) AS id",
"concat('name-', id) AS name",
"CAST(id * 1.5 AS DOUBLE) AS value",
)
)

(
df.writeTo("default.test_table")
.option("use_namespace_insert", "true")
.option("namespace_insert_parallelism", "3")
.option("batch_size", "5")
.append()
)

rows = spark.sql("""
SELECT COUNT(*) AS count_rows, SUM(id) AS sum_id, SUM(value) AS sum_value
FROM default.test_table
""").collect()

assert rows[0].count_rows == 24
assert rows[0].sum_id == sum(range(24))
assert rows[0].sum_value == pytest.approx(sum(i * 1.5 for i in range(24)))

def test_namespace_insert_append_vector_data(self, spark):
"""Test namespace insert preserves fixed-size-list vector writes."""
_require_namespace_insert_backend(spark)

spark.sql("""
CREATE TABLE default.test_table (
id INT,
vector ARRAY<FLOAT>
) USING lance
TBLPROPERTIES ('vector.arrow.fixed-size-list.size' = '4')
""")

schema = StructType([
StructField("id", IntegerType(), True),
StructField("vector", ArrayType(FloatType()), True),
])
df = spark.createDataFrame(
[
(1, [1.0, 0.0, 0.0, 0.0]),
(2, [0.0, 1.0, 0.0, 0.0]),
(3, [0.0, 0.0, 1.0, 0.0]),
(4, [0.0, 0.0, 0.0, 1.0]),
],
schema,
).repartition(2)

(
df.writeTo("default.test_table")
.option("use_namespace_insert", "true")
.option("namespace_insert_parallelism", "2")
.append()
)

rows = spark.sql("""
SELECT id, vector FROM default.test_table ORDER BY id
""").collect()

assert [row.id for row in rows] == [1, 2, 3, 4]
assert rows[0].vector == [1.0, 0.0, 0.0, 0.0]
assert rows[3].vector == [0.0, 0.0, 0.0, 1.0]


@requires_update_or_merge
class TestDMLUpdate:
"""Test DML UPDATE SET operations."""
Expand Down
Loading
Loading