22from __future__ import annotations
33
44import json
5+ import multiprocessing as mp
56from multiprocessing .sharedctypes import Synchronized
67import re
7- from typing import List , Literal
8+ from typing import Literal
89
910import h5py
1011import h5py .h5o
2122
2223def rtdc_copy (src_h5file : h5py .Group ,
2324 dst_h5file : h5py .Group ,
24- features : List [str ] | Literal ['all' , 'scalar' , 'none' ] = "all" ,
25+ features : list [str ] | Literal ['all' , 'scalar' , 'none' ] = "all" ,
2526 include_basins : bool = True ,
2627 include_logs : bool = True ,
2728 include_tables : bool = True ,
2829 meta_prefix : str = "" ,
29- bytes_total : Synchronized [int ] = None ,
30- bytes_written : Synchronized [int ] = None ,
30+ bytes_total : Synchronized [int ] | None = None ,
31+ bytes_written : Synchronized [int ] | None = None ,
3132 ):
3233 """Create a compressed copy of an RT-DC file
3334
@@ -63,14 +64,11 @@ def rtdc_copy(src_h5file: h5py.Group,
6364
6465 # identify features in source file
6566 if "events" in src_h5file :
66- events_src = list (src_h5file ["events" ].keys ())
67+ events_src = (list (src_h5file .get ("events" , {}).keys ())
68+ + list (src_h5file .get ("basin_events" , {}).keys ()))
6769 else :
6870 events_src = []
6971
70- if include_basins and "basin_events" in src_h5file :
71- events_src += list (src_h5file ["basin_events" ].keys ())
72- events_src = sorted (set (events_src ))
73-
7472 # actual features to copy
7573 if isinstance (features , list ):
7674 feature_iter = features
@@ -86,17 +84,27 @@ def rtdc_copy(src_h5file: h5py.Group,
8684 f"or one of 'all', 'scalar' or 'none', got "
8785 f"'{ features } '" )
8886
89- # additional check for basin features.
87+ # copy internal basins
88+ basin_feat , basin_bytes = internal_basin_events_copy (
89+ src_h5file = src_h5file ,
90+ dst_h5file = dst_h5file ,
91+ features = feature_iter ,
92+ )
93+
94+ # remove features that are already written to the output file
95+ for feat in basin_feat :
96+ if feat in feature_iter :
97+ feature_iter .remove (feat )
98+
9099 bn_regexp = re .compile ("^basinmap[0-9]*$" ) # future-proof regexp
91100 src_basin_feats = [f for f in events_src if bn_regexp .match (f )]
92101 if include_basins :
93- # Make sure all ' basinmap?' features are included in the output file .
102+ # Explicitly include all remaining basinmap features.
94103 for feat in src_basin_feats :
95- if feat not in feature_iter :
104+ if feat not in feature_iter and feat not in basin_feat :
96105 feature_iter .append (feat )
97106 else :
98- # We do not need the basinmap features, because basins are
99- # stripped from the output file.
107+ # Remove all remaining basinmap features from the list.
100108 for feat in src_basin_feats :
101109 if feat in feature_iter :
102110 feature_iter .remove (feat )
@@ -106,12 +114,15 @@ def rtdc_copy(src_h5file: h5py.Group,
106114 if include_tables and "tables" in src_h5file :
107115 bytes_total .value += get_size (src_h5file ["tables" ])
108116
109- for feat in feature_iter :
117+ for feat in feature_iter + basin_feat :
110118 if feat in src_h5file ["events" ]:
111119 bytes_total .value += get_size (src_h5file ["events" ][feat ])
112- elif include_basins and f"basin_events/{ feat } " in src_h5file :
120+ elif f"basin_events/{ feat } " in src_h5file :
113121 bytes_total .value += get_size (src_h5file ["basin_events" ][feat ])
114122
123+ if bytes_written is not None :
124+ bytes_written .value += basin_bytes
125+
115126 # copy logs
116127 if include_logs and "logs" in src_h5file :
117128 dst_h5file .require_group ("logs" )
@@ -148,7 +159,7 @@ def rtdc_copy(src_h5file: h5py.Group,
148159 dst_h5file = dst_h5file ,
149160 features_iter = feature_iter )
150161
151- # copy features
162+ # copy regular event features
152163 if feature_iter :
153164 dst_h5file .require_group ("events" )
154165 for feat in feature_iter :
@@ -185,16 +196,57 @@ def rtdc_copy(src_h5file: h5py.Group,
185196 if attr not in dst .attrs :
186197 dst .attrs [attr ] = ufunc (dst )
187198
188- elif include_basins and f"basin_events/{ feat } " in src_h5file :
189- # Also copy internal basins which should have been defined
190- # in the "basin_events" group.
191- if feat in src_h5file ["basin_events" ]:
192- h5ds_copy (src_loc = src_h5file ["basin_events" ],
193- src_name = feat ,
194- dst_loc = dst_h5file .require_group ("basin_events" ),
195- dst_name = feat ,
196- bytes_written = bytes_written ,
197- )
199+
def internal_basin_events_copy(
        src_h5file: h5py.Group,
        dst_h5file: h5py.Group,
        features: list[str],
) -> tuple[list[str], int]:
    """Copy internal basin data from the input to the output file

    The basin dictionaries are read from `src_h5file`. For every basin
    of type "internal", those basin features that are listed in
    `features` and present in the "basin_events" group of the source
    are copied to the "basin_events" group of `dst_h5file`. If at least
    one feature of a basin was copied, the basin's mapping feature (if
    its name starts with "basinmap") is copied to the "events" group
    and a basin definition restricted to the copied features is written
    to the "basins" group of `dst_h5file`.

    Every dataset is copied at most once per call, even if it is
    referenced by several internal basins (e.g. basins sharing the
    same `basinmap` feature).

    Parameters
    ----------
    src_h5file: h5py.Group
        Input HDF5 file (or group) containing the basin definitions,
        the "basin_events" group and the "events" group
    dst_h5file: h5py.Group
        Output HDF5 file (or group); the groups "basin_events",
        "events", and "basins" are created as needed
    features: list of str
        Names of the features that should be available in the output;
        internal basin features not in this list are not copied

    Returns
    -------
    basin_feat: list of str
        Unique names of all basin features and basinmap features that
        were copied, in the order in which they were first copied
    basin_bytes: int
        Final value of the counter that was passed as `bytes_written`
        to :func:`h5ds_copy` for every copied dataset
    """
    # O(1) membership tests instead of scanning the list for each feature
    wanted = set(features)
    # dict used as an ordered set: unique names in order of first copy
    basin_feat: dict[str, None] = {}
    # (group name, dataset name) pairs already handed to `h5ds_copy`
    copied: set[tuple[str, str]] = set()
    # "Q" (unsigned long long) is at least 8 bytes on every platform;
    # "L" (unsigned long) is only 4 bytes on e.g. Windows, so a counter
    # for more than 4 GiB of data could wrap around.
    byte_counter = mp.Value("Q")

    def _copy_once(group: str, name: str) -> None:
        """Copy dataset `name` in `group` from source to destination once"""
        if (group, name) in copied:
            return
        copied.add((group, name))
        h5ds_copy(
            src_loc=src_h5file[group],
            src_name=name,
            dst_loc=dst_h5file.require_group(group),
            dst_name=name,
            bytes_written=byte_counter,
        )

    for bn in RTDC_HDF5.basin_get_dicts_from_h5file(src_h5file):
        if bn["type"] != "internal":
            # all other basin types are not handled here
            continue

        # requested features of this basin that exist in the source
        bn_feats = []
        for feat in bn["features"]:
            if feat in wanted and f"basin_events/{feat}" in src_h5file:
                bn_feats.append(feat)
                _copy_once("basin_events", feat)

        if not bn_feats:
            # None of this basin's features were requested; write
            # neither its mapping nor its definition.
            continue

        # Note down features that we added
        basin_feat.update(dict.fromkeys(bn_feats))

        # Write basinmap feature
        mapping = bn["mapping"]
        if mapping.startswith("basinmap"):
            basin_feat[mapping] = None
            _copy_once("events", mapping)

        # Rewrite basin definition so that it only lists copied features
        bn["features"] = bn_feats
        # Convert edited `bn` to JSON and write feature data
        b_lines = json.dumps(bn, indent=2).split("\n")
        key = hashobj(b_lines)
        if key not in dst_h5file.require_group("basins"):
            with RTDCWriter(dst_h5file) as hw:
                hw.write_text(dst_h5file["basins"], key, b_lines)

    return list(basin_feat), byte_counter.value
198250
199251
200252def basin_definition_copy (src_h5file , dst_h5file , features_iter ):
@@ -207,11 +259,17 @@ def basin_definition_copy(src_h5file, dst_h5file, features_iter):
207259
208260 The `features_iter` list of features defines which features are
209261 relevant for the internal basin.
262+
263+ To copy internal basins, use :func:`internal_basin_events_copy`.
210264 """
211265 dst_h5file .require_group ("basins" )
212266 # Load the basin information
213267 basin_dicts = RTDC_HDF5 .basin_get_dicts_from_h5file (src_h5file )
214268 for bn in basin_dicts :
269+ if bn ["type" ] == "internal" :
270+ # handled in `internal_basin_events_copy`
271+ continue
272+
215273 b_key = bn ["key" ]
216274
217275 if b_key in dst_h5file ["basins" ]:
@@ -225,44 +283,20 @@ def basin_definition_copy(src_h5file, dst_h5file, features_iter):
225283 f"{ src_h5file } does not contain basin { b_key } which I got "
226284 f"from `RTDC_HDF5.basin_get_dicts_from_h5file`." )
227285
228- if bn ["type" ] == "internal" :
229- # Make sure we define the internal features selected
230- feat_used = [f for f in bn ["features" ] if f in features_iter ]
231- if len (feat_used ) == 0 :
232- # We don't have any internal features, don't write anything
233- continue
234- elif feat_used != bn ["features" ]:
235- bn ["features" ] = feat_used
236- rewrite = True
237- else :
238- rewrite = False
239- else :
240- # We do not have an internal basin, just copy everything
241- rewrite = False
242-
243- if rewrite :
244- # Convert edited `bn` to JSON and write feature data
245- b_lines = json .dumps (bn , indent = 2 ).split ("\n " )
246- key = hashobj (b_lines )
247- if key not in dst_h5file ["basins" ]:
248- with RTDCWriter (dst_h5file ) as hw :
249- hw .write_text (dst_h5file ["basins" ], key , b_lines )
250- else :
251- # copy only
252- h5ds_copy (src_loc = src_h5file ["basins" ],
253- src_name = b_key ,
254- dst_loc = dst_h5file ["basins" ],
255- dst_name = b_key ,
256- recursive = False )
286+ h5ds_copy (src_loc = src_h5file ["basins" ],
287+ src_name = b_key ,
288+ dst_loc = dst_h5file ["basins" ],
289+ dst_name = b_key ,
290+ recursive = False )
257291
258292
259293def h5ds_copy (src_loc : h5py .Group ,
260294 src_name : str ,
261295 dst_loc : h5py .Group ,
262- dst_name : str = None ,
296+ dst_name : str | None = None ,
263297 ensure_compression : bool = True ,
264298 recursive : bool = True ,
265- bytes_written : Synchronized [int ] = None ,
299+ bytes_written : Synchronized [int ] | None = None ,
266300 ):
267301 """Copy an HDF5 Dataset from one group to another
268302
0 commit comments