This repository was archived by the owner on Apr 1, 2026. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 67
Expand file tree
/
Copy pathio.py
More file actions
87 lines (75 loc) · 3.1 KB
/
io.py
File metadata and controls
87 lines (75 loc) · 3.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
# Copyright 2026 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
from typing import Mapping, Optional, Union
def _format_option_value(value: Union[str, int, float, bool, list]) -> str:
    """Renders a single Python value as a SQL option-value literal.

    Strings become quoted literals, booleans become TRUE/FALSE, lists become
    SQL array literals (elements rendered recursively), and other values
    (ints, floats) are emitted verbatim.
    """
    # bool must be tested before anything numeric falls through to str():
    # bool is a subclass of int, and SQL spells booleans TRUE/FALSE.
    if isinstance(value, bool):
        return str(value).upper()
    if isinstance(value, str):
        # NOTE(review): repr() yields Python-style quoting/escaping, which
        # BigQuery accepts for common cases ('x', "x", backslash escapes) —
        # confirm for exotic strings (raw backslashes, triple quotes).
        return repr(value)
    if isinstance(value, list):
        # Recurse per element so e.g. bools inside a list are also emitted
        # as TRUE/FALSE rather than Python's True/False.
        elements = ", ".join(_format_option_value(v) for v in value)
        return f"[{elements}]"
    return str(value)


def _format_options(
    options: Mapping[str, Union[str, int, float, bool, list]],
) -> str:
    """Renders an options mapping as a comma-separated `key = value` list."""
    return ", ".join(
        f"{key} = {_format_option_value(value)}" for key, value in options.items()
    )


def load_data_ddl(
    table_name: str,
    *,
    write_disposition: str = "INTO",
    columns: Optional[Mapping[str, str]] = None,
    partition_by: Optional[list[str]] = None,
    cluster_by: Optional[list[str]] = None,
    table_options: Optional[Mapping[str, Union[str, int, float, bool, list]]] = None,
    from_files_options: Mapping[str, Union[str, int, float, bool, list]],
    with_partition_columns: Optional[Mapping[str, str]] = None,
    connection_name: Optional[str] = None,
) -> str:
    """Generates a BigQuery LOAD DATA DDL statement.

    Args:
        table_name: Destination table name, used verbatim in the statement.
        write_disposition: Keyword placed after LOAD DATA, e.g. "INTO" or
            "OVERWRITE".
        columns: Optional mapping of column name -> column type for an
            explicit table schema.
        partition_by: Optional PARTITION BY expressions.
        cluster_by: Optional CLUSTER BY column names.
        table_options: Optional OPTIONS (...) clause entries for the table.
        from_files_options: Required FROM FILES (...) clause entries
            (e.g. format, uris).
        with_partition_columns: Optional mapping of partition column
            name -> type for WITH PARTITION COLUMNS (...).
        connection_name: Optional connection for WITH CONNECTION `...`.

    Returns:
        The assembled DDL statement as a single space-joined string.
    """
    statement = ["LOAD DATA", write_disposition, table_name]
    if columns:
        column_defs = ", ".join(f"{name} {typ}" for name, typ in columns.items())
        statement.append(f"({column_defs})")
    if partition_by:
        statement.append(f"PARTITION BY {', '.join(partition_by)}")
    if cluster_by:
        statement.append(f"CLUSTER BY {', '.join(cluster_by)}")
    if table_options:
        statement.append(f"OPTIONS ({_format_options(table_options)})")
    # FROM FILES is mandatory in LOAD DATA, hence no guard here.
    statement.append(f"FROM FILES ({_format_options(from_files_options)})")
    if with_partition_columns:
        part_defs = ", ".join(
            f"{name} {typ}" for name, typ in with_partition_columns.items()
        )
        statement.append(f"WITH PARTITION COLUMNS ({part_defs})")
    if connection_name:
        statement.append(f"WITH CONNECTION `{connection_name}`")
    return " ".join(statement)