Skip to content

Commit 56607ec

Browse files
authored
Improve Iceberg Blueprint and documentation and Tests (apache#35961)
* Improve Iceberg pubsub Blueprint and documentation * Fix tests * fix readme * Add Iceberg extended test
1 parent 56dac75 commit 56607ec

3 files changed

Lines changed: 71 additions & 5 deletions

File tree

sdks/python/apache_beam/yaml/examples/README.md

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,8 @@
3535

3636
## Prerequistes
3737

38-
Build this jar for running with the run command in the next stage:
38+
Build the expansion service jar required for your YAML code.
39+
IO mapping is available in standard_io.yaml, so use this example run command:
3940

4041
```
4142
cd <PATH_TO_BEAM_REPO>/beam; ./gradlew sdks:java:io:google-cloud-platform:expansion-service:shadowJar
@@ -163,7 +164,9 @@ python -m apache_beam.yaml.main \
163164
--num_workers $NUM_WORKERS \
164165
--job_name $JOB_NAME \
165166
--jinja_variables '{ "BOOTSTRAP_SERVERS": "123.45.67.89:9092",
166-
"TOPIC": "MY-TOPIC", "USERNAME": "USERNAME", "PASSWORD": "PASSWORD" }'
167+
"TOPIC": "MY-TOPIC", "USERNAME": "USERNAME", "PASSWORD": "PASSWORD" }'\
168+
--sdk_location container \
169+
--sdk_harness_container_image_overrides ".*java.*,gcr.io/apache-beam-testing/beam-sdk/beam_java11_sdk:latest"
167170
```
168171

169172
**_Optional_**: If Kafka cluster is set up with no SASL/PLAINTEXT authentication

sdks/python/apache_beam/yaml/examples/transforms/blueprint/pubsub_to_iceberg.yaml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
# limitations under the License.
1717
#
1818

19-
# A pipeline that both writes to and reads from the same Kafka topic.
19+
# A pipeline that reads from pubsub topic and writes to Iceberg table.
2020

2121
pipeline:
2222
type: chain
@@ -27,7 +27,7 @@ pipeline:
2727
config:
2828
topic: "projects/apache-beam-testing/topics/my-topic"
2929
format: JSON
30-
schema:
30+
schema:
3131
type: object
3232
properties:
3333
data: {type: BYTES}
@@ -39,6 +39,7 @@ pipeline:
3939
# Dynamic destinations
4040
table: "db.users.{zip}"
4141
catalog_name: "hadoop_catalog"
42+
triggering_frequency_seconds: "20"
4243
catalog_properties:
4344
type: "hadoop"
4445
warehouse: "gs://MY-WAREHOUSE"
@@ -56,4 +57,3 @@ options:
5657
# Row(label='37b', rank=4)
5758
# Row(label='37c', rank=3)
5859
# Row(label='37d', rank=2)
59-
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
fixtures:
19+
- name: TEMP_DIR
20+
type: "tempfile.TemporaryDirectory"
21+
22+
pipelines:
23+
- name: write
24+
pipeline:
25+
type: chain
26+
transforms:
27+
- type: Create
28+
config:
29+
elements:
30+
- {label: "11a", rank: 0}
31+
- {label: "37a", rank: 1}
32+
- {label: "389a", rank: 2}
33+
- type: WriteToIceberg
34+
config:
35+
table: db.labels
36+
catalog_name: hadoop_catalog
37+
catalog_properties:
38+
type: hadoop
39+
warehouse: "{TEMP_DIR}"
40+
options:
41+
project: "apache-beam-testing"
42+
temp_location: "{TEMP_DIR}"
43+
44+
- name: read
45+
pipeline:
46+
type: chain
47+
transforms:
48+
- type: ReadFromIceberg
49+
config:
50+
table: db.labels
51+
catalog_name: hadoop_catalog
52+
catalog_properties:
53+
type: hadoop
54+
warehouse: "{TEMP_DIR}"
55+
- type: AssertEqual
56+
config:
57+
elements:
58+
- {label: "11a", rank: 0}
59+
- {label: "37a", rank: 1}
60+
- {label: "389a", rank: 2}
61+
options:
62+
project: "apache-beam-testing"
63+
temp_location: "{TEMP_DIR}"

0 commit comments

Comments
 (0)