Skip to content

Commit 49abdcf

Browse files
authored
add reshuffle as a first class yaml transform and a test (#38046)
1 parent d8d12c7 commit 49abdcf

2 files changed

Lines changed: 65 additions & 0 deletions

File tree

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
# Round-trip test for the YAML Reshuffle transform: ten records are created,
# passed through Reshuffle, and the test expects the same ten records out.
pipelines:
  - pipeline:
      type: chain
      transforms:
        # Source: a fixed set of ten {id, value} records.
        - type: Create
          config:
            elements:
              - {id: 1, value: "a"}
              - {id: 2, value: "b"}
              - {id: 3, value: "c"}
              - {id: 4, value: "d"}
              - {id: 5, value: "e"}
              - {id: 6, value: "f"}
              - {id: 7, value: "g"}
              - {id: 8, value: "h"}
              - {id: 9, value: "i"}
              - {id: 10, value: "j"}
        # Transform under test, used with no config (num_buckets left unset).
        - type: Reshuffle
        # Expected output: the same records that Create emitted.
        # NOTE(review): presumably AssertEqual ignores element order, since
        # Reshuffle redistributes elements -- confirm.
        - type: AssertEqual
          config:
            elements:
              - {id: 1, value: "a"}
              - {id: 2, value: "b"}
              - {id: 3, value: "c"}
              - {id: 4, value: "d"}
              - {id: 5, value: "e"}
              - {id: 6, value: "f"}
              - {id: 7, value: "g"}
              - {id: 8, value: "h"}
              - {id: 9, value: "i"}
              - {id: 10, value: "j"}

sdks/python/apache_beam/yaml/yaml_provider.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1207,6 +1207,22 @@ def log_and_return(x):
12071207

12081208
return pcoll | "LogForTesting" >> beam.Map(log_and_return)
12091209

1210+
class Reshuffle(beam.PTransform):
  """Reshuffles the elements of a PCollection.

  Redistributes the elements of a PCollection to prevent fusion or balance
  load.

  Args:
    num_buckets: (optional) Specifies the maximum random keys that would be
      generated. If not set, a default value is used.
  """
  def __init__(self, num_buckets: Optional[int] = None):
    # Run the base PTransform initializer so inherited state is set up
    # before this transform records its own configuration.
    super().__init__()
    self.num_buckets = num_buckets

  def expand(self, pcoll):
    # Delegate to the core Reshuffle transform, forwarding the configured
    # bucket limit (None means it was not set in the YAML config).
    return pcoll | beam.Reshuffle(num_buckets=self.num_buckets)
1225+
12101226
@staticmethod
12111227
def create_builtin_provider():
12121228
return InlineProvider({
@@ -1216,6 +1232,7 @@ def create_builtin_provider():
12161232
'PyTransform': YamlProviders.fully_qualified_named_transform,
12171233
'Flatten': YamlProviders.Flatten,
12181234
'WindowInto': YamlProviders.WindowInto,
1235+
'Reshuffle': YamlProviders.Reshuffle,
12191236
},
12201237
no_input_transforms=('Create', ))
12211238

0 commit comments

Comments
 (0)