77import glob
88import logging
99import os
10+ import warnings
1011
1112import numpy as np
1213import polars as pl
@@ -280,7 +281,23 @@ def merge_parquet(indir, outdir, deploymentyaml, incremental=False, kind='raw'):
280281 if not files :
281282 _log .warning (f'No *gli*.parquet files found in { indir } ' )
282283 return False
283- gli = pl .read_parquet (indir + '/*.gli.sub.*.parquet' )
284+ # lets figure out what columns to read for situations where the number of columns changes:
285+ gli0 = pl .read_parquet (files [0 ])
286+ gli1 = pl .read_parquet (files [- 1 ])
287+ columns = list (set (gli0 .columns ) & set (gli1 .columns ))
288+ columns = set ([c for c in columns if len (c ) > 0 ])
289+ dfs = []
290+ for f in files :
291+ df = pl .read_parquet (f )
292+ missing = columns - set (df .columns )
293+ for col in missing :
294+ df = df .with_columns (pl .lit (None ).cast (pl .Float64 ).alias (col ))
295+
296+ dfs .append (df )
297+ gli = pl .concat (dfs , rechunk = True )
298+
299+ #gli = pl.read_parquet(indir + '/*.gli.sub.*.parquet', columns=columns,
300+ # missing_columns='insert')
284301 gli = drop_pre_1971_samples (gli )
285302 gli .write_parquet (outgli )
286303 _log .info (f'Done writing { outgli } ' )
@@ -290,7 +307,22 @@ def merge_parquet(indir, outdir, deploymentyaml, incremental=False, kind='raw'):
290307 if not files :
291308 _log .warning (f'No *{ kind } *.parquet files found in { indir } ' )
292309 return False
293- pld = pl .read_parquet (indir + '/*.pld1.' + kind + '.*.parquet' )
310+ gli0 = pl .read_parquet (files [0 ])
311+ gli1 = pl .read_parquet (files [- 1 ])
312+ columns = list (set (gli0 .columns ) | set (gli1 .columns ))
313+ columns = set ([c for c in columns if len (c ) > 0 ])
314+ dfs = []
315+ for f in files :
316+ df = pl .read_parquet (f )
317+ missing = columns - set (df .columns )
318+ for col in missing :
319+ df = df .with_columns (pl .lit (None ).cast (pl .Float64 ).alias (col ))
320+
321+ dfs .append (df .select (sorted (columns )))
322+ pld = pl .concat (dfs , rechunk = True )
323+
324+ # pld = pl.read_parquet(indir + '/*.pld1.' + kind + '.*.parquet',
325+ # missing_columns='insert', columns=columns)
294326 pld = drop_pre_1971_samples (pld )
295327 pld .write_parquet (outpld )
296328
@@ -338,6 +370,29 @@ def _remove_fill_values(df, fill_value=9999):
338370 return df
339371
340372
373+ def _forward_fill (gli , todo = 'Lat' ):
374+ """Forward-fill the specified column (todo) to propagate the last good value at each row."""
375+ gli = gli .with_columns ([
376+ pl .col (todo ).fill_null (strategy = "forward" ).alias ("temp_fill" )
377+ ])
378+ gli = gli .with_columns ([
379+ pl .when (
380+ (pl .col (todo ) == pl .col ("temp_fill" ).shift (1 )) & pl .col (todo ).is_not_null ()
381+ ).then (np .nan ).otherwise (pl .col (todo )).alias (todo )
382+ ])
383+ gli = gli .drop ("temp_fill" )
384+ return gli
385+
386+
387+ def _drop_if (gli , todo = 'Lat' , condit = 'DeadReckoning' , value = 1 ):
388+ """Drop Lat if DeadReckoning is 1"""
389+ gli = gli .with_columns ([
390+ pl .when (pl .col (condit ) == value ).then (np .nan ).otherwise (pl .col (todo )).alias (todo )
391+ ])
392+ return gli
393+
394+
395+
341396def raw_to_timeseries (
342397 indir ,
343398 outdir ,
@@ -348,12 +403,66 @@ def raw_to_timeseries(
348403 maxgap = 10 ,
349404 interpolate = False ,
350405 fnamesuffix = '' ,
406+ deadreckon = False ,
407+ replace_attrs = None
351408):
352409 """
353- A little different than above, for the 4-file version of the data set.
410+ Convert raw seaexplorer data to a timeseries netcdf file.
411+
412+ Parameters
413+ ----------
414+ indir : str
415+ Directory with the raw files are kept.
416+
417+ outdir : str
418+ Directory to write the matching ``*.nc`` files.
419+
420+ deploymentyaml : str
421+ YAML text file with deployment information for this glider.
422+
423+ kind : 'raw' or 'sub'
424+ The type of data to process. 'raw' is the full resolution data, 'sub'
425+ is the sub-sampled data. The default is 'raw'. Note that realtime data is
426+ typically sub-sampled.
427+
428+ profile_filt_time : float
429+ Time in seconds to use for filtering the profiles. Default is 100.
430+
431+ profile_min_time : float
432+ Minimum time in seconds for a profile to be considered a valid profile.
433+ Default is 300.
434+
435+ maxgap : float
436+ Maximum gap in seconds to interpolate over. Default is 10.
437+
438+ interpolate : bool
439+ If *True*, interpolate the data to fill in gaps. Default is False.
440+
441+ fnamesuffix : str
442+ Suffix to add to the output file name. Default is ''.
443+
444+ deadreckon : bool
445+ If *True* use the dead reckoning latitude and longitude data from the glider. Default
446+ is *False*, and latitude and longitude are linearly interpolated between surface fixes.
447+ *False* is the default, and recommended to avoid a-physical underwater jumps.
448+
449+ replace_attrs : dict or None
450+ replace global attributes in the metadata after reading the metadata
451+ file in. Helpful when processing runs with only a couple things that
452+ change.
453+
454+
455+ Returns
456+ -------
457+ outname : str
458+ Name of the output netcdf file.
459+
354460 """
355461
356462 deployment = utils ._get_deployment (deploymentyaml )
463+ if replace_attrs :
464+ for att in replace_attrs :
465+ deployment ['metadata' ][att ] = replace_attrs [att ]
357466
358467 metadata = deployment ['metadata' ]
359468 ncvar = deployment ['netcdf_variables' ]
@@ -365,6 +474,25 @@ def raw_to_timeseries(
365474 sensor = pl .read_parquet (f'{ indir } /{ id } -{ kind } pld.parquet' )
366475 sensor = _remove_fill_values (sensor )
367476
477+ # don't use lat/lon if deadreckoned:
478+ if not deadreckon :
479+ if not ncvar ['latitude' ]['source' ] == 'Lat' :
480+ warnings .warn ("For deadreckon=False, it is suggested to use 'Lat' as the source for latitude." )
481+ if not ncvar ['longitude' ]['source' ] == 'Lon' :
482+ warnings .warn ("For deadreckon=False, it is suggested to use 'Lon' as the source for longitude." )
483+ if 'DeadReckoning' in gli .columns :
484+ _log .info ('Not using deadreckoning; glider has DeadReckoning column' )
485+ gli = _drop_if (gli , todo = 'Lat' , condit = 'DeadReckoning' , value = 1 )
486+ gli = _drop_if (gli , todo = 'Lon' , condit = 'DeadReckoning' , value = 1 )
487+ else :
488+ _log .info ('Not using deadreckoning; glider does not have DeadReckoning column' )
489+ gli = _drop_if (gli , todo = 'Lat' , condit = 'NavState' , value = 116 )
490+ gli = _drop_if (gli , todo = 'Lon' , condit = 'NavState' , value = 116 )
491+ # drop a lat/lon if it is not unique. Happens when there
492+ # are stale fixes.
493+ gli = _forward_fill (gli , todo = 'Lat' )
494+ gli = _forward_fill (gli , todo = 'Lon' )
495+
368496 # build a new data set based on info in `deploymentyaml.`
369497 # We will use ctd as the interpolant
370498 ds = xr .Dataset ()
0 commit comments