33from dask import utils as da_utils
44import numpy as np
55import xarray as xr
6+ from netCDF4 import Dataset as ncDataset
67
78import datetime
89import math
@@ -188,7 +189,12 @@ def __init__(self, *args, **kwargs):
188189
189190
190191class DaskFileBuffer (NetcdfFileBuffer ):
191- _static_name_map = ['time' , 'depth' , 'lat' , 'lon' ]
192+ _static_name_maps = {'time' : ['time' , 'time_count' , 'time_counter' , 'timer_count' , 't' ],
193+ 'depth' : ['depth' , 'depthu' , 'depthv' , 'depthw' , 'depths' , 'deptht' , 'depthx' , 'depthy' ,
194+ 'depthz' , 'z' , 'z_u' , 'z_v' , 'z_w' , 'd' , 'k' , 'w_dep' , 'w_deps' , 'Z' , 'Zp1' ,
195+ 'Zl' , 'Zu' , 'level' ],
196+ 'lat' : ['lat' , 'nav_lat' , 'y' , 'latitude' , 'la' , 'lt' , 'j' , 'YC' , 'YG' ],
197+ 'lon' : ['lon' , 'nav_lon' , 'x' , 'longitude' , 'lo' , 'ln' , 'i' , 'XC' , 'XG' ]}
192198 _min_dim_chunksize = 16
193199
194200 """ Class that encapsulates and manages deferred access to file data. """
@@ -267,6 +273,45 @@ def close(self):
267273 self .chunking_finalized = False
268274 self .chunk_mapping = None
269275
@classmethod
def add_to_dimension_name_map_global(cls, name_map):
    """
    [externally callable]
    Adds entries to the class-wide name map from parcels_dim -> netcdf_dim. This is required if you want to
    use auto-chunking on large fields whose map parameters are not defined. This function must be called before
    entering the filebuffer object. Example:
        DaskFileBuffer.add_to_dimension_name_map_global({'lat': 'nydim',
                                                         'lon': 'nxdim',
                                                         'time': 'ntdim',
                                                         'depth': 'nddim'})
        fieldset = FieldSet(..., chunksize='auto')
        [...]
    Note that not all parcels dimensions need to be present in 'name_map'.

    :param name_map: dict mapping a parcels dimension name (a key of '_static_name_maps') to either a single
                     NetCDF dimension name (str) or a list/tuple of NetCDF dimension names
    :raises TypeError: if 'name_map' is not a dict, or a value is neither a str nor a list/tuple
    :raises KeyError: if a key of 'name_map' is not a known parcels dimension
    """
    if not isinstance(name_map, dict):
        raise TypeError("'name_map' needs to be a dict of parcels_dim -> netcdf_dim name(s), got {}.".format(type(name_map).__name__))

    # Validate and normalise every entry before touching the shared map, so that a faulty entry cannot
    # leave the class-wide map half-updated.
    additions = {}
    for pcls_dim_name, nc_dim_names in name_map.items():
        if pcls_dim_name not in cls._static_name_maps:
            raise KeyError("Unknown parcels dimension '{}' - expected one of {}.".format(pcls_dim_name, list(cls._static_name_maps.keys())))
        if isinstance(nc_dim_names, str):
            additions[pcls_dim_name] = [nc_dim_names]
        elif isinstance(nc_dim_names, (list, tuple)):
            additions[pcls_dim_name] = list(nc_dim_names)
        else:
            raise TypeError("NetCDF dimension name(s) for '{}' need to be a str or a list of str, got {}.".format(pcls_dim_name, type(nc_dim_names).__name__))

    for pcls_dim_name, nc_dim_names in additions.items():
        known_names = cls._static_name_maps[pcls_dim_name]
        for nc_dim_name in nc_dim_names:
            # The map lives on the class and survives across calls; skip names that are already registered
            # so repeated calls do not grow the lists.
            if nc_dim_name not in known_names:
                known_names.append(nc_dim_name)
298+
def add_to_dimension_name_map(self, name_map):
    """
    [externally callable]
    Adds entries to the name map from parcels_dim -> netcdf_dim for this filebuffer object only. This is
    required if you want to use auto-chunking on large fields whose map parameters are not defined. This
    function must be called after constructing a filebuffer object and before entering the filebuffer. Example:
        fb = DaskFileBuffer(...)
        fb.add_to_dimension_name_map({'lat': 'nydim', 'lon': 'nxdim', 'time': 'ntdim', 'depth': 'nddim'})
        with fb:
            [do_stuff]
    Note that not all parcels dimensions need to be present in 'name_map'.

    :param name_map: dict mapping a parcels dimension name (a key of '_static_name_maps') to either a single
                     NetCDF dimension name (str) or a list/tuple of NetCDF dimension names
    :raises TypeError: if 'name_map' is not a dict, or a value is neither a str nor a list/tuple
    :raises KeyError: if a key of 'name_map' is not a known parcels dimension
    """
    if not isinstance(name_map, dict):
        raise TypeError("'name_map' needs to be a dict of parcels_dim -> netcdf_dim name(s), got {}.".format(type(name_map).__name__))

    # Validate and normalise first, so a faulty entry leaves the object untouched.
    additions = {}
    for pcls_dim_name, nc_dim_names in name_map.items():
        if pcls_dim_name not in self._static_name_maps:
            raise KeyError("Unknown parcels dimension '{}' - expected one of {}.".format(pcls_dim_name, list(self._static_name_maps.keys())))
        if isinstance(nc_dim_names, str):
            additions[pcls_dim_name] = [nc_dim_names]
        elif isinstance(nc_dim_names, (list, tuple)):
            # Extend with the individual names; appending the list itself would nest it inside the name list.
            additions[pcls_dim_name] = list(nc_dim_names)
        else:
            raise TypeError("NetCDF dimension name(s) for '{}' need to be a str or a list of str, got {}.".format(pcls_dim_name, type(nc_dim_names).__name__))

    # '_static_name_maps' is declared on the class and hence shared by all filebuffers. Take a private copy
    # on first use so that the additions stay local to this object (the class-wide variant is
    # 'add_to_dimension_name_map_global').
    if '_static_name_maps' not in self.__dict__:
        self._static_name_maps = {dim_name: list(names) for dim_name, names in self._static_name_maps.items()}

    for pcls_dim_name, nc_dim_names in additions.items():
        known_names = self._static_name_maps[pcls_dim_name]
        for nc_dim_name in nc_dim_names:
            if nc_dim_name not in known_names:
                known_names.append(nc_dim_name)
314+
270315 def _get_available_dims_indices_by_request (self ):
271316 """
272317 [private function - not to be called from outside the class]
@@ -278,7 +323,7 @@ def _get_available_dims_indices_by_request(self):
278323 neg_offset = 0
279324 tpl_offset = 0
280325 for name in ['time' , 'depth' , 'lat' , 'lon' ]:
281- i = self ._static_name_map .index (name )
326+ i = list ( self ._static_name_maps . keys ()) .index (name )
282327 if (name not in self .dimensions ):
283328 result [name ] = None
284329 tpl_offset += 1
@@ -300,7 +345,32 @@ def _get_available_dims_indices_by_namemap(self):
300345 """
301346 result = {}
302347 for name in ['time' , 'depth' , 'lat' , 'lon' ]:
303- result [name ] = self ._static_name_map .index (name )
348+ result [name ] = list (self ._static_name_maps .keys ()).index (name )
349+ return result
350+
def _get_available_dims_indices_by_netcdf_file(self):
    """
    [private function - not to be called from outside the class]
    [File needs to be open (i.e. self.dataset is not None) for this to work - otherwise generating an error]
    Returns a dict mapping 'parcels_dimname' -> [None, int32_index_data_array].
    This dictionary is based on the dimensions found in the opened file: for each parcels dimension, the
    names listed in '_static_name_maps' are tried in order and the first one present in the file determines
    the index. Parcels dimensions without any matching file dimension map to None.
    Example: {'time': 0, 'depth': 5, 'lat': 3, 'lon': 1}
    for NetCDF with dimensions:
        time_counter: 1
        x: [0 4000]
        xr: [0 3999]
        y: [0 2140]
        yr: [0 2139]
        z: [0 75]
    """
    if self.dataset is None:
        raise IOError("Trying to parse NetCDF header information before opening the file.")
    # Build the name -> position lookup once instead of re-listing the dimension keys for every match.
    nc_dim_indices = {nc_dimname: index for index, nc_dimname in enumerate(self.dataset.dims.keys())}
    result = {}
    for pcls_dimname in ['time', 'depth', 'lat', 'lon']:
        result[pcls_dimname] = None
        for nc_dimname in self._static_name_maps[pcls_dimname]:
            if nc_dimname in nc_dim_indices:
                result[pcls_dimname] = nc_dim_indices[nc_dimname]
                # First match wins, in line with the lookup order used by '_is_dimension_in_dataset'.
                break
    return result
305375
306376 def _is_dimension_available (self , dimension_name ):
@@ -346,6 +416,14 @@ def _is_dimension_in_dataset(self, parcels_dimension_name, netcdf_dimension_name
346416 if netcdf_dimension_name is not None and netcdf_dimension_name in self .dataset .dims .keys ():
347417 value = self .dataset .dims [netcdf_dimension_name ]
348418 k , dname , dvalue = i , netcdf_dimension_name , value
419+ elif self .dimensions is None or self .dataset is None :
420+ return k , dname , dvalue
421+ else :
422+ for name in self ._static_name_maps [dimension_name ]:
423+ if name in self .dataset .dims :
424+ value = self .dataset .dims [name ]
425+ k , dname , dvalue = i , name , value
426+ break
349427 return k , dname , dvalue
350428
351429 def _is_dimension_in_chunksize_request (self , parcels_dimension_name ):
@@ -467,6 +545,53 @@ def _get_initial_chunk_dictionary_by_dict_(self):
467545 self .chunksize .pop ('lon' )
468546 return chunk_dict , chunk_index_map
469547
def _failsafe_parse_(self):
    """
    [private function - not to be called from outside the class]
    Fallback used when the requested chunking could not be applied: the file is re-opened directly with
    netCDF4, the variable spanning the most dimensions is selected, and its dimension names are matched
    against '_static_name_maps' to derive a chunk dictionary.

    Side effects:
      - a previously held 'self.dataset' is closed and replaced by the netCDF4 dataset, which is left open
        on return (the visible call site closes 'self.dataset' in its 'finally' clause)
      - 'self.name' is overwritten with the name of the selected variable

    Returns a dict mapping 'netcdf_dimname' -> chunk size. The dict stays empty unless 'self.chunksize'
    is a dict.
    """
    # ==== fail - open it as a normal array and deduce the dimensions from the variable-function names ==== #
    # ==== done by parsing ALL variables in the NetCDF, and comparing their call-parameters with the   ==== #
    # ==== name map available here.                                                                    ==== #
    init_chunk_dict = {}
    # Release the handle that is about to be replaced; otherwise it can no longer be reached and leaks.
    if self.dataset is not None:
        self.dataset.close()
    self.dataset = ncDataset(str(self.filename))
    nc_dimensions = self.dataset.dimensions

    # Select the variable that references the largest number of file dimensions (first one wins on a tie).
    max_field = ""
    max_dim_names = ()
    for vname, var in self.dataset.variables.items():
        coincide_dims = tuple(vdname for vdname in var.dimensions if vdname in nc_dimensions)
        if len(coincide_dims) > len(max_dim_names):
            max_field = vname
            max_dim_names = coincide_dims
    self.name = max_field

    for nc_dname in max_dim_names:
        # Translate the NetCDF dimension name back to its parcels dimension (None if it is unknown).
        pcls_dname = None
        for dname, nc_aliases in self._static_name_maps.items():
            if nc_dname in nc_aliases:
                pcls_dname = dname
                break
        if pcls_dname is None:
            continue
        pcls_dim_chunksize = None
        if pcls_dname in self.dimensions:
            pcls_dim_chunksize = self._min_dim_chunksize
        if not isinstance(self.chunksize, dict):
            # Without a dict-style chunksize request no entry is derived for this dimension.
            continue
        nc_dimsize = nc_dimensions[nc_dname].size
        if pcls_dname in self.chunksize:
            # NOTE(review): presumably each request entry is a (netcdf_dimname, chunksize) pair - confirm.
            pcls_dim_chunksize = self.chunksize[pcls_dname][1]
        if nc_dimsize is not None and pcls_dim_chunksize is not None:
            init_chunk_dict[nc_dname] = pcls_dim_chunksize

    # ==== because in this case it has shown that the requested chunksize setup cannot be used,   ==== #
    # ==== replace the requested chunksize with this auto-derived version.                        ==== #
    return init_chunk_dict
594+
470595 def _get_initial_chunk_dictionary (self ):
471596 """
472597 [private function - not to be called from outside the class]
@@ -532,8 +657,10 @@ def _get_initial_chunk_dictionary(self):
532657 except :
533658 logger .warning ("Chunking with init_chunk_dict = {} failed - Executing Dask chunking 'failsafe'..." .format (init_chunk_dict ))
534659 self .autochunkingfailed = True
535- self .dataset .close ()
536- raise DaskChunkingError (self .__class__ .__name__ , "No correct mapping found between Parcels- and NetCDF dimensions! Please correct the 'FieldSet(..., chunksize={...})' parameter and try again." )
660+ if not self .autochunkingfailed :
661+ init_chunk_dict = self ._failsafe_parse_ ()
662+ if isinstance (self .chunksize , dict ):
663+ self .chunksize = init_chunk_dict
537664 finally :
538665 self .dataset .close ()
539666 self .chunk_mapping = init_chunk_map
@@ -572,8 +699,6 @@ def data_access(self):
572699 self .rechunk_callback_fields ()
573700 self .chunking_finalized = True
574701 else :
575- if not self .autochunkingfailed :
576- data = data .rechunk (self .chunk_mapping )
577702 self .chunking_finalized = True
578703 else :
579704 da_data = da .from_array (data , chunks = self .chunksize )
0 commit comments