era5_rechunking: add script to rechunk and concatenate ERA5 forcing files#131
era5_rechunking: add script to rechunk and concatenate ERA5 forcing files#131ezhilsabareesh8 wants to merge 8 commits into
Conversation
anton-seaice
left a comment
There was a problem hiding this comment.
This appears somewhat complicated
- are using xarray (with Dask) or nco/ncks not good options ?
- do we need this level of configuration in input options ?
|
I was thinking something like this: is that too slow ? |
Changed to xarray in this commit 4b0efbf, it is taking 3 minutes per variable now |
anton-seaice
left a comment
There was a problem hiding this comment.
ill get back to this tomorrow
| #PBS -l storage=gdata/tm70+gdata/xp65+gdata/rt52 | ||
| #PBS -l wd | ||
| #PBS -j oe | ||
| #PBS -o /g/data/tm70/ek4684/era5_rechunked_1h/logs/yearly/ |
There was a problem hiding this comment.
for sharability, probably best not to set this
| #PBS -o /g/data/tm70/ek4684/era5_rechunked_1h/logs/yearly/ |
| # ── user configuration ───────────────────────────────────────────────────────── | ||
| # Directory where yearly rechunked files will be written: | ||
| # {OUTPUT_DIR}/{stream}/{stream}_era5_oper_sfc_{YYYYMMDD}-{YYYYMMDD}.nc | ||
| OUTPUT_DIR="/g/data/tm70/ek4684/era5_rechunked_1h_yearly" |
There was a problem hiding this comment.
| OUTPUT_DIR="/g/data/tm70/ek4684/era5_rechunked_1h_yearly" | |
| OUTPUT_DIR="/g/data/${PROJECT}/${USER}/era5_rechunked_1h_yearly" |
or similar might me more shareable
There was a problem hiding this comment.
I think put it on /scratch rather than /g/data/ ?
There was a problem hiding this comment.
I changed the default submit-script output path to /scratch/${PROJECT}/${USER}/era5_rechunked_1h_yearly
| @@ -0,0 +1,31 @@ | |||
| #!/usr/bin/bash | |||
| # Copyright 2025 ACCESS-NRI and contributors. See the top-level COPYRIGHT file for details. | |||
There was a problem hiding this comment.
| # Copyright 2025 ACCESS-NRI and contributors. See the top-level COPYRIGHT file for details. | |
| # Copyright 2026 ACCESS-NRI and contributors. See the top-level COPYRIGHT file for details. |
There was a problem hiding this comment.
is submit_era5_yearly_rechuncked.sh a clearer filename ? It's consistent with https://github.com/ACCESS-NRI/om3-scripts/blob/main/external_tidal_generation/submit_bottom_roughness.sh then ?
There was a problem hiding this comment.
Yes, agreed. I renamed it to submit_era5_yearly_rechunked.sh for consistency with the other submit scripts
| module use /g/data/xp65/public/modules | ||
| module load conda/analysis3-26.02 | ||
|
|
||
| python3 "${SCRIPT_DIR}/make_era5_yearly_rechunked.py" \ |
There was a problem hiding this comment.
SCRIPT_DIR is only used in one place, so may as just use PBS_O_WORKDIR directly here
There was a problem hiding this comment.
Removed SCRIPT_DIR and used ${PBS_O_WORKDIR} directly
| @@ -0,0 +1,372 @@ | |||
| #!/usr/bin/env python3 | |||
| # Copyright 2025 ACCESS-NRI and contributors. See the top-level COPYRIGHT file for details. | |||
There was a problem hiding this comment.
| # Copyright 2025 ACCESS-NRI and contributors. See the top-level COPYRIGHT file for details. | |
| # Copyright 2026 ACCESS-NRI and contributors. See the top-level COPYRIGHT file for details. |
| m0 = re.search(r"(\d{8})-(\d{8})", source_files[0].name) | ||
| ml = re.search(r"(\d{8})-(\d{8})", source_files[-1].name) | ||
| start = m0.group(1) if m0 else f"{year}0101" | ||
| end = ml.group(2) if ml else f"{year}1231" |
There was a problem hiding this comment.
end = ml.group(2) if ml else f"{year}1231"
In what case does this else statement get reached ?
There was a problem hiding this comment.
That fallback was defensive for filenames that did not match the expected YYYYMMDD-YYYYMMDD pattern, but it is no longer needed because the output dates now come from the dataset time coordinate.
| logging.error(f"No .nc files in {source_dir}") | ||
| return False | ||
|
|
||
| # Derive output filename from first/last source file date stamps |
There was a problem hiding this comment.
| # Derive output filename from first/last source file date stamps | |
| # In case this year is a partially year, find first/last source file date stamps |
| m0 = re.search(r"(\d{8})-(\d{8})", source_files[0].name) | ||
| ml = re.search(r"(\d{8})-(\d{8})", source_files[-1].name) | ||
| start = m0.group(1) if m0 else f"{year}0101" | ||
| end = ml.group(2) if ml else f"{year}1231" |
There was a problem hiding this comment.
Why are we getting dates from the filenames rather than the data ?
e.g. would
start = str(ds.time.min().dt.strftime("%Y%m%d").values)
end = str(ds.time.max().dt.strftime("%Y%m%d").values)
be better ?
There was a problem hiding this comment.
Yes, using the data is better. I changed this to derive start and end from ds.time.min() and ds.time.max(), so partial-year files reflect their actual time coverage instead of relying on filename parsing
|
@anton-seaice Test output of the latest commit is here |
anton-seaice
left a comment
There was a problem hiding this comment.
This didn't work for me (using the submission script).
Requesting one node on sapphirerapids and then only running 12 workers at once was ok - but i don't really undersand why it needs that much RAM to succeed
| args = parser.parse_args() | ||
|
|
||
| # ── single-task mode ────────────────────────────────────────────────────── | ||
| if args.stream and args.year: |
There was a problem hiding this comment.
Do we need this section?
Can args.stream be a list of streams, and args.year be a range of years to process? And then just treat them the same as "multi-task mode"
| if args.workers == 1: | ||
| for task in tasks: | ||
| stream, year, rc, _ = _subprocess_task(task) | ||
| if rc != 0: | ||
| failures.append((stream, year)) |
There was a problem hiding this comment.
for task in tasks:
stream, year, rc, _ = _subprocess_task(task)
if rc != 0:
failures.append((stream, year))
Do we need this?
It appears to work fine to comment this out and use multiprocessing.Pool(args.workers) with 1 worker
| ALL_STREAMS = [ | ||
| "10u", | ||
| "10v", | ||
| "2d", | ||
| "2t", | ||
| "ci", | ||
| "cp", | ||
| "csf", | ||
| "lsf", | ||
| "lsp", | ||
| "msl", | ||
| "ssr", | ||
| "ssrd", | ||
| "strd", | ||
| ] | ||
|
|
||
| STREAM_TO_VARNAME = { | ||
| "10u": "u10", | ||
| "10v": "v10", | ||
| "2d": "d2m", | ||
| "2t": "t2m", | ||
| "ci": "siconc", | ||
| "cp": "cp", | ||
| "csf": "csf", | ||
| "lsf": "lsf", | ||
| "lsp": "lsp", | ||
| "msl": "msl", | ||
| "ssr": "ssr", | ||
| "ssrd": "ssrd", | ||
| "strd": "strd", | ||
| } | ||
|
|
There was a problem hiding this comment.
| ALL_STREAMS = [ | |
| "10u", | |
| "10v", | |
| "2d", | |
| "2t", | |
| "ci", | |
| "cp", | |
| "csf", | |
| "lsf", | |
| "lsp", | |
| "msl", | |
| "ssr", | |
| "ssrd", | |
| "strd", | |
| ] | |
| STREAM_TO_VARNAME = { | |
| "10u": "u10", | |
| "10v": "v10", | |
| "2d": "d2m", | |
| "2t": "t2m", | |
| "ci": "siconc", | |
| "cp": "cp", | |
| "csf": "csf", | |
| "lsf": "lsf", | |
| "lsp": "lsp", | |
| "msl": "msl", | |
| "ssr": "ssr", | |
| "ssrd": "ssrd", | |
| "strd": "strd", | |
| } | |
| # Streams used by ERA5 datamode in OM3 | |
| # Stream : variable name mapping in ERA5 data | |
| STREAM_TO_VARNAME = { | |
| "10u": "u10", | |
| "10v": "v10", | |
| "2d": "d2m", | |
| "2t": "t2m", | |
| "ci": "siconc", | |
| "cp": "cp", | |
| "csf": "csf", | |
| "lsf": "lsf", | |
| "lsp": "lsp", | |
| "msl": "msl", | |
| "ssr": "ssr", | |
| "ssrd": "ssrd", | |
| "strd": "strd", | |
| } | |
| ALL_STREAMS = list(STREAM_TO_VARNAME.keys()) | |
| "10v": "v10", | ||
| "2d": "d2m", | ||
| "2t": "t2m", | ||
| "ci": "siconc", |
There was a problem hiding this comment.
why is siconc in this list ?
| ) | ||
|
|
||
| print(f"Tasks to run: {len(tasks)}") | ||
| print(f"Tasks skipped: {skipped} (output already exists)") |
There was a problem hiding this comment.
| print(f"Tasks skipped: {skipped} (output already exists)") | |
| if not args.overwrite: | |
| print(f"Tasks skipped: {skipped} (output already exists)") |
| status = "OK" if result.returncode == 0 else "FAIL" | ||
| if result.returncode != 0: | ||
| tail = (result.stderr or result.stdout or "").strip()[-500:] | ||
| print(f"[{status}] {stream}/{year} ({elapsed:.0f}s)\n {tail}", flush=True) |
There was a problem hiding this comment.
When it fails it didn't give enough useful information to understand why:
This is the log extract
[FAIL] 10u/1954 (161s)
/g/data/tm70/as2285/making_inputs/om3-scripts/scripts_common.py:118: UserWarning: /g/data/tm70/as2285/making_inputs/om3-scripts/era5_rechunking/make_era5_yearly_rechunked.py contains uncommitted changes! Commit and push your changes before generating any production output.
warn(
| [str(f) for f in source_files], | ||
| combine="by_coords", | ||
| mask_and_scale=False, | ||
| chunks={"time": 744, "latitude": 721, "longitude": 1440}, |
There was a problem hiding this comment.
There isn't enough RAM in the qsub job to do this operation - it would need >10GB per task. by the time each whole file is uncompressed.
There was a problem hiding this comment.
Not really sure what to suggest
|
|
||
| # Provenance | ||
| this_file = os.path.normpath(__file__) | ||
| now_iso = datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ") |
There was a problem hiding this comment.
| now_iso = datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ") | |
| date_str = datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%SZ") |
we don't normally use utc time for stuff
| def _output_exists(stream, year, output_base): | ||
| return bool( | ||
| glob.glob( | ||
| os.path.join(output_base, stream, f"{stream}_era5_oper_sfc_{year}*-*.nc") |
There was a problem hiding this comment.
This method a bit dangerous because it doesn't detect if the file is complete or not, but i dont have a better suggestion
Adds
era5_rechunking/with a Python script and PBS script to rechunk ERA5 single-level monthly files from thert52archive into yearly files optimised for ACCESS-OM3 / DATM.Related issue ACCESS-NRI/access-om3-configs#1381
What it does:
Test output is here
/g/data/tm70/ek4684/era5_rechunk_test/