Skip to content

Commit 8b50557

Browse files
authored
Add support for creating Iceberg datasets in S3 with Glue Catalog (#11)
* Finalize Glue catalog support for iceberg
1 parent 8305098 commit 8b50557

7 files changed

Lines changed: 955 additions & 13 deletions

File tree

README.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,16 @@ Install with the `delta` module: `pip install faker-cli[delta]`
143143
fake -n 10 pyint,user_name,date_this_year -f deltalake -o sample_data
144144
```
145145

146+
### Iceberg
147+
148+
And, of course, Iceberg tables!
149+
150+
Currently supported are writing to a Glue or generic SQL catalog.
151+
152+
```bash
153+
fake -n 10 pyint,user_name,date_this_year -f iceberg -C glue://default.iceberg_sample -o s3://YOUR_BUCKET/iceberg-data/
154+
```
155+
146156
## Templates
147157

148158
The libary includes a couple templates that can be used to generate certain types of fake data easier.

faker_cli/cli.py

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,21 +25,22 @@
2525
@click.option(
2626
"--format",
2727
"-f",
28-
type=click.Choice(["csv", "json", "parquet", "deltalake"]),
28+
type=click.Choice(["csv", "json", "parquet", "deltalake", "iceberg"]),
2929
default="csv",
3030
help="Format of the output",
3131
)
3232
@click.option("--output", "-o", type=click.Path(writable=True))
3333
@click.option("--columns", "-c", help="Column names", default=None, required=False)
3434
@click.option("--template", "-t", help="Template to use", type=click.Choice(["s3access", "cloudfront"]), default=None)
35+
@click.option("--catalog", "-C", help="Catalog URI", default=None, required=False)
3536
@click.argument("column_types", required=False)
3637
@click.option("--provider", "-p", help="Fake data provider", type=click.Choice(["faker", "mimesis"]), default="faker")
37-
def main(num_rows, format, output, columns, template, column_types, provider):
38+
def main(num_rows, format, output, columns, template, catalog, column_types, provider):
3839
"""
3940
Generate fake data, easily.
4041
4142
COLUMN_TYPES is a comma-seperated list of Faker property names, like
42-
pyint,username,date_this_year
43+
pyint,user_name,date_this_year
4344
4445
You can also use --template for real-world synthetic data.
4546
"""
@@ -62,10 +63,12 @@ def main(num_rows, format, output, columns, template, column_types, provider):
6263
raise click.BadArgumentUsage('templates are only supported with the "faker" provider.')
6364

6465
# Parquet output requires a filename
65-
if format in ["parquet", "deltalake"] and output is None:
66-
raise click.BadArgumentUsage("parquet | deltalake formats requires --output/-o filename parameter.")
67-
if output is not None and format not in ["parquet", "deltalake"]:
66+
if format in ["parquet", "deltalake", "iceberg"] and output is None:
67+
raise click.BadArgumentUsage(f"{format} format requires --output/-o filename parameter.")
68+
if output is not None and format not in ["parquet", "deltalake", "iceberg"]:
6869
raise click.BadArgumentUsage("output files not supported for csv/json yet.")
70+
if catalog and format not in ['iceberg']:
71+
raise click.BadArgumentUsage("catalog option is only available for Iceberg formats")
6972

7073
# Optionally load additional features
7174
if format == "parquet":
@@ -90,6 +93,17 @@ def main(num_rows, format, output, columns, template, column_types, provider):
9093
"Make sure to install faker-cli using `pip install faker-cli[delta]`."
9194
)
9295

96+
if format == "iceberg":
97+
try:
98+
from faker_cli.writers.iceberg import IcebergWriter
99+
100+
KLAS_MAPPER["iceberg"] = IcebergWriter
101+
except ImportError:
102+
raise click.ClickException(
103+
"Using Iceberg writer, but the 'iceberg' package is not installed. "
104+
"Make sure to install faker-cli using `pip install faker-cli[iceberg]`."
105+
)
106+
93107
# If the user provides a template, we use that provider and writer and exit.
94108
# We assume a template has a custom writer that may be different than CSV or JSON
95109
if template:
@@ -108,7 +122,8 @@ def main(num_rows, format, output, columns, template, column_types, provider):
108122
format_klas = KLAS_MAPPER.get(format)
109123
if format_klas is None:
110124
raise click.ClickException(f"Format {format} not supported.")
111-
writer = format_klas(sys.stdout, headers, output)
125+
# Fix in a better way - maybe passing **kwargs?
126+
writer = format_klas(sys.stdout, headers, output, catalog)
112127
for i in range(num_rows):
113128
writer.write(fake.generate_row(col_types))
114129
writer.close()

faker_cli/writer.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55

66
class Writer:
7-
def __init__(self, output, headers, filename: Optional[str] = None):
7+
def __init__(self, output, headers, filename: Optional[str] = None, catalog_uri: Optional[str] = None):
88
self.output = output
99
self.headers = headers
1010
self.writer = None
@@ -17,7 +17,7 @@ def close(self):
1717

1818

1919
class CSVWriter(Writer):
20-
def __init__(self, output, headers, filename):
20+
def __init__(self, output, headers, filename, catalog_uri):
2121
super().__init__(output, headers)
2222
self.writer = csv.writer(self.output)
2323
self.write(headers)
@@ -27,7 +27,7 @@ def write(self, row):
2727

2828

2929
class JSONWriter(Writer):
30-
def __init__(self, output, headers, filename):
30+
def __init__(self, output, headers, filename, catalog_uri):
3131
super().__init__(output, headers)
3232
self.writer = self.output
3333

faker_cli/writers/iceberg.py

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
from tempfile import TemporaryDirectory
2+
from urllib.parse import urlparse
3+
4+
import click
5+
import pyarrow as pa
6+
from pyiceberg.catalog import Catalog
7+
from pyiceberg.catalog.sql import SqlCatalog
8+
from pyiceberg.exceptions import NoSuchNamespaceError, NoSuchTableError
9+
10+
from faker_cli.writers.parquet import ParquetWriter
11+
12+
13+
class CatalogManager:
14+
def __init__(self, uri: str, location: str) -> None:
15+
[self.database, self.table] = urlparse(uri).netloc.split(".")
16+
self.catalog = self._from_uri(uri, location)
17+
18+
def _from_uri(self, uri: str, location: str) -> Catalog:
19+
u = urlparse(uri)
20+
if u.scheme == "glue":
21+
try:
22+
from pyiceberg.catalog.glue import GlueCatalog
23+
except ImportError:
24+
raise click.ClickException(
25+
"Using Iceberg writer with Glue catalog, but the 'boto3' package is not installed. "
26+
"Make sure to install faker-cli using `pip install faker-cli[iceberg,glue]`."
27+
)
28+
glue = GlueCatalog(self.database)
29+
try:
30+
glue.load_namespace_properties(self.database)
31+
glue.load_table(u.netloc)
32+
raise Exception("Table already exists, please delete or choose another name.")
33+
except NoSuchNamespaceError:
34+
glue.create_namespace(self.database)
35+
except NoSuchTableError:
36+
pass
37+
38+
return glue
39+
40+
elif u.scheme == "sqlite":
41+
self.temp_path = TemporaryDirectory()
42+
sql = SqlCatalog(
43+
self.database, uri=f"sqlite:////{self.temp_path.name}/pyiceberg_catalog.db", warehouse=location
44+
)
45+
sql.create_namespace(self.database)
46+
return sql
47+
else:
48+
raise Exception("Unsupported catalog type, only glue or sqllite are supported.")
49+
50+
def create_table(self, schema, warehouse_path) -> pa.Table:
51+
if self.catalog is SqlCatalog:
52+
table = self.catalog.create_table(
53+
f"{self.database}.{self.table}",
54+
schema=schema,
55+
)
56+
else:
57+
# location required for GlueCatalog
58+
table = self.catalog.create_table(
59+
f"{self.database}.{self.table}",
60+
schema=schema,
61+
location=warehouse_path.rstrip("/"),
62+
)
63+
return table
64+
65+
66+
class IcebergWriter(ParquetWriter):
67+
def __init__(self, output, headers, filename, catalog_uri):
68+
super().__init__(output, headers, filename, catalog_uri)
69+
self.warehouse_path = filename
70+
self.temp_path = TemporaryDirectory()
71+
self.table: pa.Table = None
72+
self.catalog: CatalogManager = CatalogManager(catalog_uri, filename)
73+
74+
def close(self):
75+
iceberg_table = self.catalog.create_table(self.table.schema, self.warehouse_path)
76+
iceberg_table.overwrite(self.table)
77+
78+
if self.catalog is SqlCatalog:
79+
pa.fs.copy_files(
80+
f"{self.temp_path.name}/pyiceberg_catalog.db", f"{self.warehouse_path}pyiceberg_catalog.db"
81+
)
82+
return super().close()

faker_cli/writers/parquet.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
from faker_cli.writer import Writer
66

77
class ParquetWriter(Writer):
8-
def __init__(self, output, headers, filename):
8+
def __init__(self, output, headers, filename, catalog_uri):
99
super().__init__(output, headers)
1010
self.filename = filename
1111
self.table: pa.Table = None

0 commit comments

Comments
 (0)