-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathtimemanagement.py
More file actions
426 lines (339 loc) · 13.5 KB
/
Copy pathtimemanagement.py
File metadata and controls
426 lines (339 loc) · 13.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
# ==============================================================================
# Copyright (c) 2024 Botts Innovative Research, Inc.
# Date: 2024/6/26
# Author: Ian Patterson
# Contact Email: ian@botts-inc.com
# ==============================================================================
from __future__ import annotations
import re
import time
from datetime import datetime, timezone
from enum import Enum
from typing import Any
from pydantic import BaseModel, ConfigDict, Field, model_serializer, model_validator
class TemporalModes(Enum):
REAL_TIME = "realtime"
ARCHIVE = "archive"
BATCH = "batch"
RT_SYNC = "realtimesync"
ARCHIVE_SYNC = "archivesync",
class State(Enum):
UNINITIALIZED = 0
INITIALIZED = 1
STOPPED = 2
BUFFERING = 3
PLAYING = 4
FAST_FORWARDING = 5
REWINDING = 6
class TimeUtils:
iso_format = '%Y-%m-%dT%H:%M:%S.%fZ'
@staticmethod
def to_epoch_time(a_time: datetime | str) -> float:
"""
Convert a datetime or string to epoch time
:param a_time:
:return:
"""
if isinstance(a_time, str):
return time.mktime(
datetime.strptime(a_time, "%Y-%m-%d %H:%M:%S.%fZ").timetuple())
elif isinstance(a_time, datetime):
return time.mktime(a_time.timetuple())
@staticmethod
def to_utc_time(a_time: float | str) -> datetime:
"""
Convert epoch time or string to UTC time object
:param a_time:
:return:
"""
if isinstance(a_time, str):
if re.match(r'(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.)(\d+)(Z)', a_time):
return datetime.strptime(a_time, "%Y-%m-%dT%H:%M:%S.%fZ").replace(tzinfo=timezone.utc)
else:
return datetime.strptime(a_time, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=timezone.utc)
elif isinstance(a_time, float):
return datetime.fromtimestamp(a_time, tz=timezone.utc)
@staticmethod
def current_epoch_time():
"""
Get the current time in epoch format
:return:
"""
return time.time()
@staticmethod
def current_utc_time() -> datetime:
"""
Get the current time in UTC timezone
:return:
"""
return datetime.now(timezone.utc)
@staticmethod
def time_to_iso(a_time: datetime | float) -> str:
"""
Convert a datetime object to iso format
:param a_time: datetime object in UTC timezone or epoch time (float)
:return:
"""
if isinstance(a_time, float):
return datetime.fromtimestamp(a_time).strftime(TimeUtils.iso_format)
elif isinstance(a_time, datetime):
return a_time.strftime(TimeUtils.iso_format)
@staticmethod
def compare_time_instants_or_indeterminate(time1: TimeInstant | str, time2: TimeInstant | str) -> int:
"""
Compare two time instants or indeterminate times. This coerces the indeterminate time 'now' to the current time.
This may cause unexpected behavior if the times are very close together.
:param time1: TimeInstant or IndeterminateTime
:param time2: TimeInstant or IndeterminateTime
:return: 0 if equal, -1 if time1 < time2, 1 if time1 > time2
"""
if isinstance(time1, TimeInstant) and isinstance(time2, TimeInstant):
if time1 < time2:
return -1
elif time1 > time2:
return 1
else:
return 0
elif isinstance(time1, TimeInstant) and isinstance(time2, str):
if time2 == "now":
now_as_time = TimeInstant.now_as_time_instant()
return TimeUtils.compare_time_instants_or_indeterminate(time1, now_as_time)
else:
t2_as_time = TimeInstant.from_string(time2)
return TimeUtils.compare_time_instants_or_indeterminate(time1, t2_as_time)
elif isinstance(time1, str) and isinstance(time2, TimeInstant):
if time1 == "now":
now_as_ti = TimeInstant.now_as_time_instant()
return TimeUtils.compare_time_instants_or_indeterminate(now_as_ti, time2)
else:
t1_as_ti = TimeInstant.from_string(time1)
return TimeUtils.compare_time_instants_or_indeterminate(t1_as_ti, time2)
elif isinstance(time1, str) and isinstance(time2, str):
if time1 == "now" and time2 == "now":
raise ValueError("Both times cannot be 'now'")
elif time1 == "now":
t1_as_ti = TimeInstant.now_as_time_instant()
t2_as_ti = TimeInstant.from_string(time2)
elif time2 == "now":
t2_as_ti = TimeInstant.now_as_time_instant()
t1_as_ti = TimeInstant.from_string(time1)
else:
t1_as_ti = TimeInstant.from_string(time1)
t2_as_ti = TimeInstant.from_string(time2)
return TimeUtils.compare_time_instants_or_indeterminate(t1_as_ti, t2_as_ti)
class TimeInstant:
_epoch_time: float | None
def __init__(self, epoch_time: float = None, utc_time: datetime = None):
if epoch_time is not None:
self._epoch_time = epoch_time
elif utc_time is not None:
self._epoch_time = TimeUtils.to_epoch_time(utc_time)
@property
def epoch_time(self):
return self._epoch_time
@epoch_time.setter
def epoch_time(self, epoch_time: float):
if hasattr(self, "_epoch_time"):
raise AttributeError("Epoch time should not be changed once set")
def has_epoch_time(self):
return self._epoch_time is not None
def get_utc_time(self):
return TimeUtils.to_utc_time(self._epoch_time)
def get_iso_time(self):
return TimeUtils.time_to_iso(self._epoch_time)
@staticmethod
def from_string(utc_time: str):
# TODO: handle timezones
if re.match(r'(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.)(\d+)(Z)', utc_time):
dt = datetime.strptime(utc_time, "%Y-%m-%dT%H:%M:%S.%fZ")
else:
dt = datetime.strptime(utc_time, "%Y-%m-%dT%H:%M:%SZ")
return TimeInstant(utc_time=dt)
@staticmethod
def now_as_time_instant():
return TimeInstant(epoch_time=TimeUtils.current_epoch_time())
def __lt__(self, other: TimeInstant) -> bool:
return self.epoch_time < other.epoch_time
def __gt__(self, other: TimeInstant) -> bool:
return self.epoch_time > other.epoch_time
def __eq__(self, other: TimeInstant) -> bool:
return self.epoch_time == other.epoch_time
def __le__(self, other: TimeInstant) -> bool:
return self.epoch_time <= other.epoch_time
def __ge__(self, other: TimeInstant) -> bool:
return self.epoch_time >= other.epoch_time
def __ne__(self, other: TimeInstant) -> bool:
return self.epoch_time != other.epoch_time
def __repr__(self):
return f'{self.get_iso_time()}'
# class DateTimeSchema(BaseModel):
# is_instant: bool = Field(True, description="Whether the date time is an instant or a period.")
# iso_date: str = Field(None, description="The ISO formatted date time.")
# time_period: tuple = Field(None, description="The time period of the date time.")
#
# @model_validator(mode='before')
# def valid_datetime_type(self) -> Self:
# print("DEBUGGING DateTimeSchema valid_datetime_type")
# if self.is_instant:
# if self.iso_date is None:
# raise ValueError("Instant date time must have a valid ISO8601 date.")
# return self
#
# @field_validator('iso_date')
# @classmethod
# def check_iso_date(cls, v) -> str:
# if not v:
# raise ValueError("Instant date time must have a valid ISO8601 date.")
# return v
class IndeterminateTime(Enum):
NOW = "now"
LATEST = "latest"
FIRST = "first"
class TimePeriod(BaseModel):
model_config = ConfigDict(arbitrary_types_allowed=True)
start: TimeInstant | str = Field(...)
end: TimeInstant | str = Field(...)
@model_validator(mode='before')
@classmethod
def valid_time_period(cls, data) -> Any:
data_dict = {}
if isinstance(data, list):
data_dict['start'] = cls.check_mbr_type(data[0])
data_dict['end'] = cls.check_mbr_type(data[1])
elif isinstance(data, dict):
data_dict['start'] = cls.check_mbr_type(data['start'])
data_dict['end'] = cls.check_mbr_type(data['end'])
if not cls.compare_start_lt_end(data_dict['start'], data_dict['end']):
raise ValueError("Start time must be less than or equal to end time")
return data_dict
@model_serializer
def ser_model(self):
return [str(self.start), str(self.end)]
@staticmethod
def check_mbr_type(value):
if isinstance(value, str):
if value == "now":
return value
else:
tp = TimeInstant.from_string(value)
return tp
elif isinstance(value, TimeInstant):
return value
return None
@classmethod
def compare_start_lt_end(cls, start: TimeInstant | str, end: TimeInstant | str) -> bool:
if isinstance(start, TimeInstant) and isinstance(end, TimeInstant):
return start <= end
elif isinstance(start, str) and isinstance(end, str):
if start == "now" and end == "now":
raise ValueError("Start and end cannot both be 'now'")
else:
raise ValueError("Check the time strings for validity. This should not occur.")
elif isinstance(start, str) and start == "now" and isinstance(end, TimeInstant):
return TimeInstant.now_as_time_instant() < end
elif isinstance(start, TimeInstant) and isinstance(end, str) and end == "now":
return start < TimeInstant.now_as_time_instant()
def __repr__(self):
return f'{[self.start, self.end]}'
def does_timeperiod_overlap(self, checked_timeperiod: TimePeriod) -> bool:
"""
Checks if the provided TimePeriod overlaps with the TimePeriod instance.
**Note**: This method does not check for some edge cases, but the TimePeriods *should never* be valid
in those situations.
:param checked_timeperiod:
:return: True if the TimePeriods overlap, False otherwise
"""
# check that start of checked is not after end of this instance
start_check_lt_end_inst = TimeUtils.compare_time_instants_or_indeterminate(checked_timeperiod.start, self.end)
# check that end of checked is not before start of this instance
end_check_gt_start_inst = TimeUtils.compare_time_instants_or_indeterminate(self.end, checked_timeperiod.start)
if start_check_lt_end_inst == -1 and end_check_gt_start_inst == 1:
return True
class TimeManagement:
time_range: TimePeriod
time_controller: TimeController
def __init__(self, time_range: TimePeriod):
self.time_range = time_range
self.time_controller = TimeController()
def get_time_range(self):
return self.time_range
class TemporalMode:
pass
class TimeController:
_instance = None
_temporal_mode: TemporalMode
_status: str
_playback_speed: int
_timeline_begin: TimeInstant
_timeline_end: TimeInstant
_current_time: TimeInstant
_synchronizer: Synchronizer
def __new__(cls, *args, **kwargs):
if cls._instance is None:
cls._instance = super(TimeController, cls).__new__(cls)
return cls._instance
def set_temporal_mode(self, mode: TemporalMode):
self._temporal_mode = mode
def get_temporal_mode(self):
return self._temporal_mode
def start(self):
pass
def pause(self):
pass
def stop(self):
pass
def fast_forward(self, speed: int):
self._playback_speed = speed
def rewind(self, speed: int):
self._playback_speed = speed
def skip(self, a_time: TimeInstant):
self._current_time = a_time
def get_status(self):
return self._status
def set_timeline_start(self, a_time: TimeInstant):
self._timeline_begin = a_time
def set_timeline_end(self, a_time: TimeInstant):
self._timeline_end = a_time
def set_current_time(self, a_time: TimeInstant):
if a_time < self._timeline_begin:
self._current_time = self._timeline_begin
elif a_time > self._timeline_end:
self._timeline_end = a_time
self._current_time = a_time
def get_timeline_start(self):
return self._timeline_begin
def get_timeline_end(self):
return self._timeline_end
def get_current_time(self):
return self._current_time
def play_from_start(self):
pass
def skip_to_end(self):
pass
def add_listener(self, datastream, event_listener) -> str:
pass
def remove_listener(self, stream_id):
pass
def clear_streams(self):
pass
def reset(self):
self.clear_streams()
self._temporal_mode = None
self._status = None
self._playback_speed = None
self._timeline_begin = None
self._timeline_end = None
self._current_time = None
def set_buffer_time(self, time: int):
pass
def get_buffer_time(self):
pass
def _compute_time_range(self):
pass
class Synchronizer:
_buffer: any
_buffering_time: int
def synchronize(self, systems: list):
pass
def check_in_sync(self):
pass