|
5 | 5 | specified bounding box in a few different file formats. |
6 | 6 |
|
7 | 7 | """ |
| 8 | + |
8 | 9 | import json |
9 | 10 | import os |
10 | 11 | import sys |
|
15 | 16 | import pyarrow.dataset as ds |
16 | 17 | import pyarrow.compute as pc |
17 | 18 | import pyarrow.fs as fs |
18 | | -import pyarrow.parquet as pq |
19 | | -import shapely.wkb |
20 | | - |
21 | | -from . core import record_batch_reader, get_all_overture_types |
22 | | - |
23 | 19 |
|
24 | | -def get_writer(output_format, path, schema): |
25 | | - if output_format == "geojson": |
26 | | - writer = GeoJSONWriter(path) |
27 | | - elif output_format == "geojsonseq": |
28 | | - writer = GeoJSONSeqWriter(path) |
29 | | - elif output_format == "geoparquet": |
30 | | - # Update the geoparquet metadata to remove the file-level bbox which |
31 | | - # will no longer apply to this file. Since we cannot write the field at |
32 | | - # the end, just remove it as it's optional. Let the per-row bounding |
33 | | - # boxes do all the work. |
34 | | - metadata = schema.metadata |
35 | | - geo = json.loads(metadata[b"geo"]) |
36 | | - for column in geo["columns"].keys(): |
37 | | - column.pop("bbox") |
38 | | - metadata[b"geo"] = json.dumps(geo).encode("utf-8") |
39 | | - schema = schema.with_metadata(metadata) |
| 20 | +from geoarrow.rust.core import write_geojson, write_geojson_lines, write_parquet |
40 | 21 |
|
41 | | - writer = pq.ParquetWriter(path, schema) |
42 | | - return writer |
| 22 | +from .core import record_batch_reader, get_all_overture_types |
43 | 23 |
|
44 | 24 |
|
45 | 25 | class BboxParamType(click.ParamType): |
@@ -89,99 +69,12 @@ def download(bbox, output_format, output, type_): |
89 | 69 | if reader is None: |
90 | 70 | return |
91 | 71 |
|
92 | | - with get_writer(output_format, output, schema=reader.schema) as writer: |
93 | | - copy(reader, writer) |
94 | | - |
95 | | - |
96 | | -def copy(reader, writer): |
97 | | - while True: |
98 | | - try: |
99 | | - batch = reader.read_next_batch() |
100 | | - except StopIteration: |
101 | | - break |
102 | | - if batch.num_rows > 0: |
103 | | - writer.write_batch(batch) |
104 | | - |
105 | | - |
106 | | -class BaseGeoJSONWriter: |
107 | | - """ |
108 | | - A base feature writer that manages either a file handle |
109 | | - or output stream. Subclasses should implement write_feature() |
110 | | - and finalize() if needed |
111 | | - """ |
112 | | - |
113 | | - def __init__(self, where): |
114 | | - self.file_handle = None |
115 | | - if isinstance(where, str): |
116 | | - self.file_handle = open(os.path.expanduser(where), "w") |
117 | | - self.writer = self.file_handle |
118 | | - else: |
119 | | - self.writer = where |
120 | | - self.is_open = True |
121 | | - |
122 | | - def __enter__(self): |
123 | | - return self |
124 | | - |
125 | | - def __exit__(self, exc_type, value, traceback): |
126 | | - self.close() |
127 | | - |
128 | | - def close(self): |
129 | | - if not self.is_open: |
130 | | - return |
131 | | - self.finalize() |
132 | | - if self.file_handle: |
133 | | - self.file_handle.close() |
134 | | - self.is_open = False |
135 | | - |
136 | | - def write_batch(self, batch): |
137 | | - if batch.num_rows == 0: |
138 | | - return |
139 | | - |
140 | | - for row in batch.to_pylist(): |
141 | | - feature = self.row_to_feature(row) |
142 | | - self.write_feature(feature) |
143 | | - |
144 | | - def write_feature(self, feature): |
145 | | - pass |
146 | | - |
147 | | - def finalize(self): |
148 | | - pass |
149 | | - |
150 | | - def row_to_feature(self, row): |
151 | | - geometry = shapely.wkb.loads(row.pop("geometry")) |
152 | | - row.pop("bbox") |
153 | | - |
154 | | - # This only removes null values in the top-level dictionary but will leave in |
155 | | - # nulls in sub-properties |
156 | | - properties = {k: v for k, v in row.items() if k != "bbox" and v is not None} |
157 | | - return { |
158 | | - "type": "Feature", |
159 | | - "geometry": geometry.__geo_interface__, |
160 | | - "properties": properties, |
161 | | - } |
162 | | - |
163 | | - |
164 | | -class GeoJSONSeqWriter(BaseGeoJSONWriter): |
165 | | - def write_feature(self, feature): |
166 | | - self.writer.write(json.dumps(feature, separators=(",", ":"))) |
167 | | - self.writer.write("\n") |
168 | | - |
169 | | - |
170 | | -class GeoJSONWriter(BaseGeoJSONWriter): |
171 | | - def __init__(self, *args, **kwargs): |
172 | | - super().__init__(*args, **kwargs) |
173 | | - self._has_written_feature = False |
174 | | - |
175 | | - self.writer.write('{"type": "FeatureCollection", "features": [\n') |
176 | | - |
177 | | - def write_feature(self, feature): |
178 | | - if self._has_written_feature: |
179 | | - self.writer.write(",\n") |
180 | | - self.writer.write(json.dumps(feature, separators=(",", ":"))) |
181 | | - self._has_written_feature = True |
182 | | - |
183 | | - def finalize(self): |
184 | | - self.writer.write("]}") |
| 72 | + if output_format == "geojson": |
| 73 | + write_geojson(reader, output) |
| 74 | + elif output_format == "geojsonseq": |
| 75 | + write_geojson_lines(reader, output) |
| 76 | + elif output_format == "geoparquet": |
| 77 | + write_parquet(reader, output) |
185 | 78 |
|
186 | 79 |
|
187 | 80 | if __name__ == "__main__": |
|
0 commit comments