-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathcustom_processes_force.py
More file actions
148 lines (128 loc) · 5 KB
/
custom_processes_force.py
File metadata and controls
148 lines (128 loc) · 5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
import logging
from typing import Dict, Optional, Tuple, Union, List
import pystac
from openeo_driver.ProcessGraphDeserializer import non_standard_process, ProcessSpec, _extract_bbox_extent, _extract_temporal_extent
from openeo_driver.processes import ProcessArgs
from openeo_driver.utils import EvalEnv
from openeo_driver.util.geometry import BoundingBox
from openeogeotrellis.load_stac import _spatiotemporal_extent_from_load_params, construct_item_collection, ItemCollection
logger = logging.getLogger("FORCE_custom_Processes")
logger.info("Loading FORCE custom processes from %s", __file__)

# Accepted forms of a single bound of the temporal interval: a date-time
# string, a date string, or null (presumably an open-ended bound).
_TEMPORAL_BOUND_OPTIONS = [
    {"type": "string", "subtype": "date-time", "format": "date-time"},
    {"type": "string", "subtype": "date", "format": "date"},
    {"type": "null"},
]

# JSON schema of the `temporal_extent` parameter: exactly two bounds.
temporal_extent_schema = {
    "type": "array",
    "subtype": "temporal-interval",
    "uniqueItems": True,
    "minItems": 2,
    "maxItems": 2,
    "items": {"anyOf": _TEMPORAL_BOUND_OPTIONS},
}

# Collection ids accepted by the `force_query` process, mapped to the URL of
# the STAC collection that is searched for them.
STAC_COLLECTION_URLS = {
    "SENTINEL2_L1C": "https://stac.dataspace.copernicus.eu/v1/collections/sentinel-2-l1c",
}
@non_standard_process(
    ProcessSpec(
        id="query_stac",
        description="Returns an item collection",
    )
    .param(name="url", description="URL to the stac collection", schema={"type": "string"}, required=True)
    .param(name="temporal_extent", description="The date range", schema=temporal_extent_schema, required=True)
    .param(name="spatial_extent", description="Area of interest", schema={"type": "object", "subtype": "geojson"}, required=True)
    # The process returns `pystac.ItemCollection.to_dict()`, i.e. a JSON
    # object (GeoJSON FeatureCollection), not an array of strings.
    .returns(
        description="STAC ItemCollection (GeoJSON FeatureCollection)",
        schema={"type": "object"},
    )
)
def query_stac(args: ProcessArgs, env: EvalEnv):
    """Search an arbitrary STAC collection and return the matching items.

    :param args: Process arguments. ``url`` is required; ``temporal_extent``
        and ``spatial_extent`` are only extracted when present (``None``
        otherwise).
    :param env: Evaluation environment (unused).
    :return: The matching items as a ``pystac.ItemCollection`` serialized to a
        dict via ``to_dict()``.
    """
    collection_url = args.get_required("url")

    # process_id only labels validation errors; it must name *this* process.
    temporal_extent = None
    if "temporal_extent" in args:
        temporal_extent = _extract_temporal_extent(
            args, field="temporal_extent", process_id="query_stac"
        )
    spatial_extent = None
    if "spatial_extent" in args:
        spatial_extent = _extract_bbox_extent(
            args, field="spatial_extent", process_id="query_stac"
        )

    item_collection = force_query_stac_catalog(
        url=collection_url,
        spatial_extent=spatial_extent,
        temporal_extent=temporal_extent,
    )
    # Re-wrap the backend's item list in a pystac ItemCollection so it can be
    # serialized to a plain (JSON-compatible) dict.
    return pystac.ItemCollection(item_collection.items).to_dict()
@non_standard_process(
    ProcessSpec(
        id="force_query",
        description="Returns a list of S3 paths",
    )
    .param(name="id", description="Collection to load", schema={"type": "string"}, required=True)
    .param(name="temporal_extent", description="The date range", schema=temporal_extent_schema, required=True)
    .param(name="spatial_extent", description="Area of interest", schema={"type": "object", "subtype": "geojson"}, required=True)
    .returns(
        description="List of outputs",
        schema={"type": "array", "items": {"type": "string"}},
    )
)
def force_query(args: ProcessArgs, env: EvalEnv):
    """Query a known collection and return the ``enclosure`` link targets.

    The collection ``id`` is resolved to a STAC collection URL through
    ``STAC_COLLECTION_URLS``. The collection is searched with the given
    extents, and the targets of the items' ``enclosure`` links are returned.
    The process description calls these "S3 paths"; NOTE(review): confirm
    that all enclosure links are S3 URLs.

    :param args: Process arguments. ``id`` is required; ``temporal_extent``
        and ``spatial_extent`` are only extracted when present (``None``
        otherwise).
    :param env: Evaluation environment (unused).
    :return: List of enclosure link targets.
    :raises ValueError: If ``id`` is not a key of ``STAC_COLLECTION_URLS``.
    """
    collection_id = args.get_required("id")

    temporal_extent = None
    if "temporal_extent" in args:
        temporal_extent = _extract_temporal_extent(
            args, field="temporal_extent", process_id="force_query"
        )
    spatial_extent = None
    if "spatial_extent" in args:
        spatial_extent = _extract_bbox_extent(
            args, field="spatial_extent", process_id="force_query"
        )

    url = STAC_COLLECTION_URLS.get(collection_id)
    if url is None:
        # Render the known ids readably instead of as "dict_keys([...])".
        known = ", ".join(f"'{known_id}'" for known_id in STAC_COLLECTION_URLS)
        raise ValueError(
            f"Unknown collection '{collection_id}'. Known collections are: {known}"
        )

    item_collection = force_query_stac_catalog(
        url=url,
        spatial_extent=spatial_extent,
        temporal_extent=temporal_extent,
    )
    return extract_urls_from_item_collection(item_collection)
def force_query_stac_catalog(
    url: str,
    spatial_extent: Union[Dict, BoundingBox, None],
    temporal_extent: Tuple[Optional[str], Optional[str]],
) -> ItemCollection:
    """Search a STAC collection for items within a spatio-temporal extent.

    :param url: URL of the STAC collection to search.
    :param spatial_extent: Area of interest (dict, ``BoundingBox`` or ``None``),
        forwarded to ``_spatiotemporal_extent_from_load_params``.
    :param temporal_extent: ``(start, end)`` pair. Callers in this module pass
        ``None`` when no temporal extent was supplied.
    :return: The first value returned by ``construct_item_collection``; any
        further returned values are discarded.
    """
    extent = _spatiotemporal_extent_from_load_params(
        spatial_extent=spatial_extent,
        temporal_extent=temporal_extent,
    )
    # No property filter is applied (property_filter_pg_map=None).
    item_collection, *_ = construct_item_collection(
        url=url,
        spatiotemporal_extent=extent,
        property_filter_pg_map=None,
    )
    logger.info(
        "Query to '%s' with spatial_extent '%s' and temporal_extent '%s'",
        url,
        spatial_extent,
        temporal_extent,
    )
    return item_collection
def extract_urls_from_item_collection(item_collection: ItemCollection):
    """Collect the targets of all ``enclosure`` links of the queried items.

    :param item_collection: Item collection as returned by
        ``force_query_stac_catalog``. Its ``iter_items_with_band_assets()`` is
        expected to yield tuples whose first element is a pystac ``Item``.
        NOTE(review): this is inferred from the ``.links`` access; confirm
        against ``openeogeotrellis.load_stac``.
    :return: List of ``link.target`` for every link with ``rel == "enclosure"``,
        in iteration order. The list is empty when no items matched; the
        previous debug logging raised IndexError in that case.
    """
    urls = [
        link.target
        for entry in item_collection.iter_items_with_band_assets()
        for link in entry[0].links
        if link.rel == "enclosure"
    ]
    logger.info(f"Query returned '{len(urls)}' results.")
    return urls