Skip to content
This repository was archived by the owner on Feb 23, 2026. It is now read-only.

Commit 7e24a6a

Browse files
authored
Merge pull request #173 from raphaelrpl/b-0.8
Add minimal support to deal with bitwise mask for Landsat Collection 2 products
2 parents d3f2faf + 57f1f05 commit 7e24a6a

10 files changed

Lines changed: 636 additions & 138 deletions

File tree

CHANGES.rst

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,14 @@ Changes
2121
=======
2222

2323

24+
Version 0.8.4
25+
-------------
26+
27+
- Add support to generate data cube from Landsat Collection 2 (`#172 <https://github.com/brazil-data-cube/cube-builder/issues/172>`_)
28+
- Add support to combine Landsat Collection 2 sensors (L5/L7, L7/L8, L7/L8/L9) using single collection (`#172 <https://github.com/brazil-data-cube/cube-builder/issues/172>`_)
29+
- Review API error when no parameter is set.
30+
31+
2432
Version 0.8.3 (2022-10-03)
2533
--------------------------
2634

Dockerfile

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,18 +15,29 @@
1515
# You should have received a copy of the GNU General Public License
1616
# along with this program. If not, see <https://www.gnu.org/licenses/gpl-3.0.html>.
1717
#
18-
19-
FROM python:3.8
18+
ARG GIT_COMMIT
19+
ARG BASE_IMAGE=python:3.8
20+
FROM ${BASE_IMAGE}
2021

2122
ARG GIT_COMMIT
2223

2324
LABEL "org.brazildatacube.maintainer"="Brazil Data Cube <brazildatacube@inpe.br>"
2425
LABEL "org.brazildatacube.title"="Docker image for Data Cube Builder Service"
26+
LABEL "org.brazildatacube.description"="Docker image for Data Cube Builder application."
2527
LABEL "org.brazildatacube.git_commit"="${GIT_COMMIT}"
2628

27-
ADD . /app
29+
# Build arguments
30+
ARG CUBE_BUILDER_VERSION="0.8.4"
31+
ARG CUBE_BUILDER_INSTALL_PATH="/opt/cube-builder/${CUBE_BUILDER_VERSION}"
32+
33+
ADD . ${CUBE_BUILDER_INSTALL_PATH}
2834

29-
WORKDIR /app
35+
WORKDIR ${CUBE_BUILDER_INSTALL_PATH}
3036

3137
RUN python3 -m pip install pip --upgrade setuptools wheel && \
32-
python3 -m pip install -e .[rabbitmq]
38+
python3 -m pip install -e .[rabbitmq] && \
39+
python3 -m pip install gunicorn
40+
41+
EXPOSE 5000
42+
43+
CMD ["gunicorn", "-w4", "--bind=0.0.0.0:5000", "cube_builder:create_app()"]

cube_builder/celery/tasks.py

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,9 @@ def warp_merge(activity, band_map, mask, force=False, data_dir=None, **kwargs):
8989
Returns:
9090
Validated activity
9191
"""
92-
logging.warning('Executing merge {} - {}'.format(activity.get('warped_collection_id'), activity['band']))
92+
logging.warning('Executing merge {} - {} - {}'.format(activity.get('warped_collection_id'),
93+
activity['band'],
94+
activity['date']))
9395

9496
record = create_execution(activity)
9597

@@ -117,7 +119,7 @@ def warp_merge(activity, band_map, mask, force=False, data_dir=None, **kwargs):
117119
tile_id, version=version, band=record.band,
118120
prefix=data_dir, composed=False, **kwargs)
119121

120-
if activity['band'] == quality_band and len(activity['args']['datasets']):
122+
if activity['band'] == quality_band and (len(activity['args']['datasets']) or kwargs.get('combined')):
121123
kwargs['build_provenance'] = True
122124

123125
reused = False
@@ -304,6 +306,8 @@ def _is_not_stk(merge):
304306
"""
305307
return merge['band'] == quality_band and composite_function not in ('STK', 'LCF')
306308

309+
platforms = []
310+
307311
for _merge in merges:
308312
# Skip quality generation for MEDIAN, AVG
309313
if _merge['band'] in activities and _merge['args']['date'] in activities[_merge['band']]['scenes'] or \
@@ -341,6 +345,9 @@ def _is_not_stk(merge):
341345

342346
if _merge['args'].get(DATASOURCE_NAME):
343347
activity['scenes'][_merge['args']['date']]['ARDfiles'][DATASOURCE_NAME] = _merge['args'][DATASOURCE_NAME]
348+
if _merge['args'].get('platforms'):
349+
activity.setdefault('platforms', _merge['args'].get('platforms'))
350+
platforms = _merge['args'].get('platforms')
344351

345352
activities[_merge['band']] = activity
346353

@@ -386,9 +393,6 @@ def _is_not_stk(merge):
386393
for idx, date in enumerate(ordered_dates):
387394
activity['scenes'][date]['efficacy'] = weights[idx]
388395

389-
# Prepare list of activities to dispatch
390-
activity_list = list(activities.values())
391-
392396
# For IDENTITY data cube trigger, just publish
393397
if composite_function == 'IDT':
394398
task = publish.s(list(activities.values()), reuse_data_cube=reuse_data_cube, band_map=band_map, **kwargs)
@@ -401,10 +405,10 @@ def _is_not_stk(merge):
401405
# We must keep track of last activity to run
402406
# Since the Clear Observation must only be execute by single process. It is important
403407
# to avoid concurrent processes to write same data set in disk
404-
last_activity = activity_list[-1]
408+
last_activity = activities.pop(quality_band)
405409

406410
# Trigger all except the last
407-
for activity in activity_list[:-1]:
411+
for activity in list(activities.values()):
408412
# TODO: Persist
409413
blends.append(blend.s(activity, band_map, reuse_data_cube=reuse_data_cube, **kwargs))
410414

cube_builder/cli.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ def worker(ctx: click.Context):
110110
@click.option('--export-files', type=click.Path(writable=True), help='Export Identity Merges in file')
111111
@with_appcontext
112112
def build(datacube: str, collections: str, tiles: str, start: str, end: str, bands: str = None,
113-
stac_url: str = None, force=False, with_rgb=False, shape=None, export_files=None, **kwargs):
113+
stac_url: str = None, force=False, shape=None, export_files=None, **kwargs):
114114
"""Build data cube through command line.
115115
116116
Args:
@@ -138,7 +138,6 @@ def build(datacube: str, collections: str, tiles: str, start: str, end: str, ban
138138
end_date=end,
139139
tiles=tiles.split(','),
140140
force=force,
141-
with_rgb=with_rgb,
142141
stac_url=stac_url,
143142
**kwargs
144143
)
@@ -175,6 +174,7 @@ def build(datacube: str, collections: str, tiles: str, start: str, end: str, ban
175174
@click.option('--mask', type=click.STRING, help='Custom mask values for data cube.')
176175
@click.option('--quality-band', type=click.STRING, help='Quality band name')
177176
@click.option('--cloud-cover', type=click.FLOAT, help='Cloud Cover Factor. Default is 100 to use all.', default=100)
177+
@click.option('--band-map', type=click.STRING, help='Band mapping when combine sensors. Default is None')
178178
@with_appcontext
179179
def _configure_parameters(datacube: str, **kwargs):
180180
"""Configure the default parameters for data cube.

cube_builder/forms.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ class CustomMaskDefinition(Schema):
105105
nodata = fields.Integer(required=True, allow_none=False)
106106
saturated_data = fields.List(fields.Integer, required=False, allow_none=False)
107107
saturated_band = fields.String(required=False, allow_none=False)
108+
bits = fields.Boolean(required=False, allow_none=False, default=False)
108109

109110

110111
class CubeParametersSchema(Schema):
@@ -114,6 +115,7 @@ class CubeParametersSchema(Schema):
114115
reference_day = fields.Integer(required=False, allow_none=False)
115116
histogram_matching = fields.Bool(required=False, allow_none=False)
116117
no_post_process = fields.Bool(required=False, allow_none=False)
118+
band_map = fields.Dict(required=False, allow_none=False)
117119

118120

119121
class DataCubeForm(Schema):

cube_builder/maestro.py

Lines changed: 90 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import json
2323
import logging
2424
import warnings
25+
from collections.abc import Iterable
2526
from contextlib import contextmanager
2627
from copy import deepcopy
2728
from pathlib import Path
@@ -64,6 +65,12 @@ def days_in_month(date):
6465
return td
6566

6667

68+
def _valid_channel_limit(value, expects):
69+
if not isinstance(value, Iterable) or len(value) != expects:
70+
raise ValueError('Invalid type for "channel_limits". '
71+
f'Expects Iterable of {expects} elements, but got "{value}"')
72+
73+
6774
@contextmanager
6875
def timing(description: str) -> None:
6976
"""Measure execution time of context.
@@ -131,10 +138,10 @@ def get_stac(self, collection: str) -> STAC:
131138
When collection not found, search on INPE STAC.
132139
133140
Args:
134-
collection - Collection name to search
141+
collection (str): Collection name to search
135142
136143
Returns:
137-
STAC client
144+
STAC: The STAC which offers the given collection.
138145
"""
139146
if self.properties.get('stac_url'):
140147
return self._stac(collection, self.properties['stac_url'], **self.properties)
@@ -158,7 +165,7 @@ def _stac(self, collection: str, url: str, **kwargs) -> STAC:
158165
url - STAC URL
159166
160167
Returns:
161-
STAC client
168+
STAC: STAC instance
162169
"""
163170
try:
164171
options = dict()
@@ -208,6 +215,13 @@ def orchestrate(self):
208215
# Validate parameters
209216
cube_parameters.validate()
210217

218+
if self.properties.get('channel_limits'):
219+
channel_limits = self.properties['channel_limits']
220+
# Validate the entire signature (Tuple of 3 limit elements)
221+
_valid_channel_limit(channel_limits, expects=3)
222+
# Validate each RGB channel (Tuple of 2 int elements)
223+
_ = [_valid_channel_limit(channel, expects=2) for channel in channel_limits]
224+
211225
# Pass the cube parameters to the data cube functions arguments
212226
props = deepcopy(cube_parameters.metadata_)
213227
props.update(self.properties)
@@ -235,8 +249,13 @@ def orchestrate(self):
235249
self.bands = Band.query().filter(Band.collection_id == self.warped_datacube.id).all()
236250

237251
bands = self.datacube_bands
238-
self.band_map = {b.name: dict(name=b.name, data_type=b.data_type, nodata=b.nodata,
239-
min_value=b.min_value, max_value=b.max_value) for b in bands}
252+
self.band_map = {
253+
b.name: dict(name=b.name, data_type=b.data_type, nodata=b.nodata,
254+
min_value=b.min_value, max_value=b.max_value,
255+
scale=b.scale,
256+
scale_add=b._metadata.get('scale_add') if b._metadata else None)
257+
for b in bands
258+
}
240259

241260
if self.properties.get('reuse_from'):
242261
warnings.warn(
@@ -404,6 +423,13 @@ def dispatch_celery(self):
404423

405424
warped_datacube = self.warped_datacube.name
406425

426+
if self.properties.get('with_rgb'):
427+
input_range = self.properties.get('input_range')
428+
if input_range is None:
429+
raise ValueError('Missing valid range for RGB.')
430+
if type(input_range) not in (list, tuple,):
431+
raise TypeError(f'Invalid input range for RGB. Expects Tuple[int, int], got {type(input_range)}')
432+
407433
quality_band = None
408434
stac_kwargs = self.properties.get('stac_kwargs', dict())
409435
if self.datacube.composite_function.alias != 'IDT':
@@ -509,6 +535,7 @@ def dispatch_celery(self):
509535
srs=grid_crs,
510536
tile_id=tileid,
511537
assets=assets,
538+
platforms=self.platforms,
512539
nodata=float(band.nodata),
513540
bands=band_str_list,
514541
version=self.datacube.version,
@@ -563,6 +590,9 @@ def search_images(self, feature: dict, start: str, end: str, tile_id: str, **kwa
563590

564591
bands = self.datacube_bands
565592

593+
band_mapping = self.properties.get('band_map', dict())
594+
platforms = set()
595+
566596
# Retrieve band definition in dict format.
567597
# TODO: Should we use from STAC?
568598
collection_bands = dict()
@@ -581,6 +611,11 @@ def search_images(self, feature: dict, start: str, end: str, tile_id: str, **kwa
581611
for dataset in self.params['collections']:
582612
options['collections'] = [dataset]
583613
stac = self.get_stac(dataset)
614+
stac_collection = stac.collection(dataset)
615+
if stac_collection.get('summaries') and stac_collection['summaries'].get('platform'):
616+
platforms = platforms.union(set(stac_collection['summaries'].get('platform')))
617+
elif stac_collection.properties.get('platform'):
618+
platforms = platforms.union(set(stac_collection.properties.get('platform')))
584619

585620
token = ''
586621

@@ -593,21 +628,40 @@ def search_images(self, feature: dict, start: str, end: str, tile_id: str, **kwa
593628
for feature in items['features']:
594629
if feature['type'] == 'Feature':
595630
date = feature['properties']['datetime'][0:10]
596-
identifier = feature['id']
597631
stac_bands = feature['properties'].get('eo:bands', [])
632+
identifier = feature['id']
633+
# TODO: Add handler to deal with parse result serializer.
634+
platform = feature['properties'].get('platform')
635+
if stac.url.startswith('https://landsatlook.usgs.gov'):
636+
# Remove last SR sentence.
637+
identifier = f'{identifier[:-3]}{identifier[-3:].replace("_SR", "")}'
638+
# Special treatment for missing/invalid platform values
639+
if (platform is None or isinstance(platform, list)) and _is_landsat(identifier):
640+
platform = _detect_landsat_platform(identifier)
598641

599642
for band in bands:
600643
band_name_href = band.name
644+
645+
if platform and band_mapping:
646+
platforms.add(platform)
647+
if platform not in band_mapping or not band_mapping[platform].get(band_name_href):
648+
continue
649+
650+
band_name_href = band_mapping[platform].get(band_name_href)
651+
601652
if 'CBERS' in dataset and band.common_name not in ('evi', 'ndvi'):
602653
band_name_href = band.common_name
603654

604655
elif band.name not in feature['assets']:
605-
if f'sr_{band.name}' not in feature['assets']:
656+
# TODO: Implement asset resolver
657+
if f'{band_name_href}.TIF' in feature['assets']:
658+
band_name_href = f'{band_name_href}.TIF'
659+
elif f'sr_{band_name_href}' not in feature['assets']:
606660
continue
607661
else:
608662
band_name_href = f'sr_{band.name}'
609663

610-
feature_band = list(filter(lambda b: b['name'] == band_name_href,stac_bands))
664+
feature_band = list(filter(lambda b: b['name'] == band_name_href, stac_bands))
611665
feature_band = feature_band[0] if len(feature_band) > 0 else dict()
612666

613667
scenes[band.name].setdefault(date, dict())
@@ -620,6 +674,9 @@ def search_images(self, feature: dict, start: str, end: str, tile_id: str, **kwa
620674
scene['sceneid'] = identifier
621675
scene['band'] = band.name
622676
scene['dataset'] = dataset
677+
scene['platform'] = platform
678+
if band_mapping:
679+
scene['original_band_name'] = band_name_href
623680

624681
link = link.replace(Config.CBERS_SOURCE_URL_PREFIX, Config.CBERS_TARGET_URL_PREFIX)
625682

@@ -634,4 +691,29 @@ def search_images(self, feature: dict, start: str, end: str, tile_id: str, **kwa
634691
scenes[band.name][date].setdefault(dataset, [])
635692
scenes[band.name][date][dataset].append(scene)
636693

694+
self.platforms = sorted(list(platforms))
695+
637696
return scenes
697+
698+
699+
def _is_landsat(identifier: str) -> bool:
700+
try:
701+
_parse_landsat(identifier)
702+
return True
703+
except ValueError:
704+
return False
705+
706+
707+
def _parse_landsat(identifier: str) -> List[str]:
708+
entities = identifier.split('_')
709+
if len(entities) != 7 or not entities[0].startswith('L') or not entities[0][-2:].isnumeric():
710+
raise ValueError(f'Invalid landsat scene {identifier}')
711+
return entities
712+
713+
714+
def _detect_landsat_platform(identifier: str):
715+
entities = _parse_landsat(identifier)
716+
satellite = int(entities[0][-2:])
717+
for value in [4, 5, 7, 8, 9]:
718+
if satellite == value:
719+
return f'Landsat-{value}'

0 commit comments

Comments
 (0)