Skip to content

Commit 4908455

Browse files
Merge pull request #1199 from OceanParcels/dump_to_zarr
Saving Parcels output directly in zarr format
2 parents 366d470 + 0326f77 commit 4908455

42 files changed

Lines changed: 891 additions & 1230 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

environment_py3_win.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ dependencies:
1919
- scipy>=0.16.0
2020
- tqdm
2121
- xarray>=0.5.1
22-
- dask>=2.0
22+
- dask<=2022.9.0
2323
- cftime>=1.3.1
2424
- ipykernel
2525
- pytest

parcels/collection/collectionaos.py

Lines changed: 24 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
from datetime import timedelta as delta
21
from operator import attrgetter # NOQA
32

43
from ctypes import c_void_p
@@ -28,24 +27,6 @@
2827
__all__ = ['ParticleCollectionAOS', 'ParticleCollectionIterableAOS', 'ParticleCollectionIteratorAOS']
2928

3029

31-
def _to_write_particles(pd, time):
32-
"""We don't want to write a particle that is not started yet.
33-
Particle will be written if particle.time is between time-dt/2 and time+dt (/2)
34-
"""
35-
return [i for i, p in enumerate(pd) if (((time - np.abs(p.dt/2) <= p.time < time + np.abs(p.dt))
36-
or (np.isnan(p.dt) and np.equal(time, p.time)))
37-
and np.isfinite(p.id))]
38-
39-
40-
def _is_particle_started_yet(particle, time):
41-
"""We don't want to write a particle that is not started yet.
42-
Particle will be written if:
43-
* particle.time is equal to time argument of pfile.write()
44-
* particle.time is before time (in case particle was deleted between previous export and current one)
45-
"""
46-
return (particle.dt*particle.time <= particle.dt*time or np.isclose(particle.time, time))
47-
48-
4930
def _convert_to_flat_array(var):
5031
"""Convert lists and single integers/floats to one-dimensional numpy arrays
5132
@@ -890,64 +871,33 @@ def cstruct(self):
890871
cstruct = self._data_c.ctypes.data_as(c_void_p)
891872
return cstruct
892873

893-
def toDictionary(self, pfile, time, deleted_only=False):
894-
"""
895-
Convert all Particle data from one time step to a python dictionary.
896-
:param pfile: ParticleFile object requesting the conversion
897-
:param time: Time at which to write ParticleSet
898-
:param deleted_only: Flag to write only the deleted Particles
899-
returns two dictionaries: one for all variables to be written each outputdt,
900-
and one for all variables to be written once
901-
902-
This function depends on the specific collection in question and thus needs to be specified in specific
903-
derivative classes.
874+
def _to_write_particles(self, pd, time):
875+
"""We don't want to write a particle that is not started yet.
876+
Particle will be written if particle.time is between time-dt/2 and time+dt (/2)
904877
"""
905-
data_dict = {}
906-
data_dict_once = {}
907-
908-
time = time.total_seconds() if isinstance(time, delta) else time
878+
return np.array([i for i, p in enumerate(pd) if (((time - np.abs(p.dt/2) <= p.time < time + np.abs(p.dt))
879+
or (np.isnan(p.dt) and np.equal(time, p.time)))
880+
and np.isfinite(p.id))])
909881

910-
indices_to_write = []
911-
if pfile.lasttime_written != time and \
912-
(pfile.write_ondelete is False or deleted_only):
913-
if self._ncount == 0:
914-
logger.warning("ParticleSet is empty on writing as array at time %g" % time)
915-
else:
916-
if deleted_only:
917-
if type(deleted_only) not in [list, np.ndarray] and deleted_only in [True, 1]:
918-
data_states = [p.state for p in self._data]
919-
indices_to_write = np.where(np.isin(data_states, [OperationCode.Delete]))[0]
920-
elif type(deleted_only) in [list, np.ndarray] and len(deleted_only) > 0:
921-
if type(deleted_only[0]) in [int, np.int32, np.uint32]:
922-
indices_to_write = deleted_only
923-
elif isinstance(deleted_only[0], ScipyParticle):
924-
indices_to_write = [i for i, p in self._data if p in deleted_only]
925-
else:
926-
indices_to_write = _to_write_particles(self._data, time)
927-
if len(indices_to_write) > 0:
928-
for var in pfile.var_names:
929-
if 'id' in var:
930-
data_dict[var] = np.array([np.int64(getattr(p, var)) for p in self._data[indices_to_write]])
931-
else:
932-
data_dict[var] = np.array([getattr(p, var) for p in self._data[indices_to_write]])
933-
934-
pset_errs = [p for p in self._data[indices_to_write] if p.state != OperationCode.Delete and abs(time-p.time) > 1e-3 and np.isfinite(p.time)]
935-
for p in pset_errs:
936-
logger.warning_once('time argument in pfile.write() is %g, but a particle has time % g.' % (time, p.time))
937-
938-
if len(pfile.var_names_once) > 0:
939-
# _to_write_particles(self._data, time)
940-
first_write = [p for p in self._data if _is_particle_started_yet(p, time) and (np.int64(p.id) not in pfile.written_once)]
941-
if np.any(first_write):
942-
data_dict_once['id'] = np.array([p.id for p in first_write]).astype(dtype=np.int64)
943-
for var in pfile.var_names_once:
944-
data_dict_once[var] = np.array([getattr(p, var) for p in first_write])
945-
pfile.written_once.extend(np.array(data_dict_once['id']).astype(dtype=np.int64).tolist())
946-
947-
if deleted_only is False:
948-
pfile.lasttime_written = time
882+
def getvardata(self, var, indices=None):
883+
if indices is None:
884+
return np.array([getattr(p, var) for p in self._data])
885+
else:
886+
try:
887+
return np.array([getattr(p, var) for p in self._data[indices]])
888+
except: # Can occur for zero-length ParticleSets
889+
return None
890+
891+
def setvardata(self, var, index, val):
892+
if isinstance(index, (np.int64, int, np.int32)):
893+
setattr(self._data[index], var, val)
894+
else:
895+
for i, v in zip(index, val):
896+
setattr(self._data[i], var, v)
949897

950-
return data_dict, data_dict_once
898+
def setallvardata(self, var, val):
899+
for i in range(len(self._data)):
900+
setattr(self._data[i], var, val)
951901

952902
def toArray(self):
953903
"""

parcels/collection/collections.py

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -898,19 +898,18 @@ def __getattr__(self, name):
898898
else:
899899
return False
900900

901+
def has_write_once_variables(self):
902+
for var in self.ptype.variables:
903+
if var.to_write == 'once':
904+
return True
905+
return False
906+
901907
@abstractmethod
902-
def toDictionary(self):
903-
"""
904-
Convert all Particle data from one time step to a python dictionary.
905-
:param pfile: ParticleFile object requesting the conversion
906-
:param time: Time at which to write ParticleSet
907-
:param deleted_only: Flag to write only the deleted Particles
908-
returns two dictionaries: one for all variables to be written each outputdt,
909-
and one for all variables to be written once
908+
def getvardata(self, var, indices=None):
909+
pass
910910

911-
This function depends on the specific collection in question and thus needs to be specified in specific
912-
derivatives classes.
913-
"""
911+
@abstractmethod
912+
def setvardata(self, var, index, val):
914913
pass
915914

916915
@abstractmethod

parcels/collection/collectionsoa.py

Lines changed: 21 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
from datetime import timedelta as delta
21
from operator import attrgetter
32
from ctypes import Structure, POINTER
43
from bisect import bisect_left
@@ -26,26 +25,6 @@
2625
'See http://oceanparcels.org/#parallel_install for more information')
2726

2827

29-
def _to_write_particles(pd, time):
30-
"""We don't want to write a particle that is not started yet.
31-
Particle will be written if particle.time is between time-dt/2 and time+dt (/2)
32-
"""
33-
return ((np.less_equal(time - np.abs(pd['dt']/2), pd['time'], where=np.isfinite(pd['time']))
34-
& np.greater_equal(time + np.abs(pd['dt'] / 2), pd['time'], where=np.isfinite(pd['time']))
35-
| ((np.isnan(pd['dt'])) & np.equal(time, pd['time'], where=np.isfinite(pd['time']))))
36-
& (np.isfinite(pd['id']))
37-
& (np.isfinite(pd['time'])))
38-
39-
40-
def _is_particle_started_yet(pd, time):
41-
"""We don't want to write a particle that is not started yet.
42-
Particle will be written if:
43-
* particle.time is equal to time argument of pfile.write()
44-
* particle.time is before time (in case particle was deleted between previous export and current one)
45-
"""
46-
return np.less_equal(pd['dt']*pd['time'], pd['dt']*time) | np.isclose(pd['time'], time)
47-
48-
4928
def _convert_to_flat_array(var):
5029
"""Convert lists and single integers/floats to one-dimensional numpy arrays
5130
@@ -153,7 +132,7 @@ def __init__(self, pclass, lon, lat, depth, time, lonlatdepth_dtype, pid_orig, p
153132
self._data['depth'][:] = depth
154133
self._data['time'][:] = time
155134
self._data['id'][:] = pid
156-
self._data['fileid'][:] = -1
135+
self._data['once_written'][:] = 0
157136

158137
# special case for exceptions which can only be handled from scipy
159138
self._data['exception'] = np.empty(self.ncount, dtype=object)
@@ -815,58 +794,30 @@ def flatten_dense_data_array(vname):
815794
cstruct = CParticles(*cdata)
816795
return cstruct
817796

818-
def toDictionary(self, pfile, time, deleted_only=False):
797+
def _to_write_particles(self, pd, time):
798+
"""We don't want to write a particle that is not started yet.
799+
Particle will be written if particle.time is between time-dt/2 and time+dt (/2)
819800
"""
820-
Convert all Particle data from one time step to a python dictionary.
821-
:param pfile: ParticleFile object requesting the conversion
822-
:param time: Time at which to write ParticleSet
823-
:param deleted_only: Flag to write only the deleted Particles
824-
returns two dictionaries: one for all variables to be written each outputdt,
825-
and one for all variables to be written once
801+
return np.where((np.less_equal(time - np.abs(pd['dt'] / 2), pd['time'], where=np.isfinite(pd['time']))
802+
& np.greater_equal(time + np.abs(pd['dt'] / 2), pd['time'], where=np.isfinite(pd['time']))
803+
| ((np.isnan(pd['dt'])) & np.equal(time, pd['time'], where=np.isfinite(pd['time']))))
804+
& (np.isfinite(pd['id']))
805+
& (np.isfinite(pd['time'])))[0]
826806

827-
This function depends on the specific collection in question and thus needs to be specified in specific
828-
derivative classes.
829-
"""
830-
831-
data_dict = {}
832-
data_dict_once = {}
807+
def getvardata(self, var, indices=None):
808+
if indices is None:
809+
return self._data[var]
810+
else:
811+
try:
812+
return self._data[var][indices]
813+
except: # Can occur for zero-length ParticleSets
814+
return None
833815

834-
time = time.total_seconds() if isinstance(time, delta) else time
816+
def setvardata(self, var, index, val):
817+
self._data[var][index] = val
835818

836-
indices_to_write = []
837-
if pfile.lasttime_written != time and \
838-
(pfile.write_ondelete is False or deleted_only is not False):
839-
if self._data['id'].size == 0:
840-
logger.warning("ParticleSet is empty on writing as array at time %g" % time)
841-
else:
842-
if deleted_only is not False:
843-
if type(deleted_only) not in [list, np.ndarray] and deleted_only in [True, 1]:
844-
indices_to_write = np.where(np.isin(self._data['state'],
845-
[OperationCode.Delete]))[0]
846-
elif type(deleted_only) in [list, np.ndarray]:
847-
indices_to_write = deleted_only
848-
else:
849-
indices_to_write = _to_write_particles(self._data, time)
850-
if np.any(indices_to_write):
851-
for var in pfile.var_names:
852-
data_dict[var] = self._data[var][indices_to_write]
853-
854-
pset_errs = ((self._data['state'][indices_to_write] != OperationCode.Delete) & np.greater(np.abs(time - self._data['time'][indices_to_write]), 1e-3, where=np.isfinite(self._data['time'][indices_to_write])))
855-
if np.count_nonzero(pset_errs) > 0:
856-
logger.warning_once('time argument in pfile.write() is {}, but particles have time {}'.format(time, self._data['time'][pset_errs]))
857-
858-
if len(pfile.var_names_once) > 0:
859-
first_write = (_to_write_particles(self._data, time) & _is_particle_started_yet(self._data, time) & np.isin(self._data['id'], pfile.written_once, invert=True))
860-
if np.any(first_write):
861-
data_dict_once['id'] = np.array(self._data['id'][first_write]).astype(dtype=np.int64)
862-
for var in pfile.var_names_once:
863-
data_dict_once[var] = self._data[var][first_write]
864-
pfile.written_once.extend(np.array(self._data['id'][first_write]).astype(dtype=np.int64).tolist())
865-
866-
if deleted_only is False:
867-
pfile.lasttime_written = time
868-
869-
return data_dict, data_dict_once
819+
def setallvardata(self, var, val):
820+
self._data[var][:] = val
870821

871822
def toArray(self):
872823
"""

0 commit comments

Comments
 (0)