Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
5df7bbd
add a check and update for observation status
moira-andrews Feb 18, 2026
0c2c80d
update dynamic cadence on obs complete
moira-andrews Feb 18, 2026
25298c3
remove excess whitespace
moira-andrews Feb 18, 2026
93dc66b
Merge branch 'dev' into bugfix/fix-single-obs-cadence-strategy
jchate6 Mar 4, 2026
982fe86
mock status check
jchate6 Mar 4, 2026
aed35bf
Cleaning up logic and pulling changes from snex2
moira-andrews Mar 4, 2026
834339d
fix linting
jchate6 Mar 4, 2026
c55dece
missed one
jchate6 Mar 4, 2026
ee97c71
update advance window to take the scheduled start and end times inste…
moira-andrews Apr 1, 2026
5684153
simplifying variables
moira-andrews Apr 1, 2026
303c587
Merge branch 'dev' into bugfix/update-start-and-end-windows
moira-andrews Apr 1, 2026
2382974
Updated logic to be a shorter window
moira-andrews Apr 1, 2026
4df78e9
changed 24 hour to a settings variable call
moira-andrews Apr 1, 2026
b78a9e7
Merge branch 'TOMToolkit:dev' into bugfix/fix-single-obs-cadence-stra…
moira-andrews Apr 1, 2026
666c48d
updated logic and handling to be more direct and have runcadencestrat…
moira-andrews Apr 1, 2026
52743dc
updated testing to follow the resume cadence strategy of setting no v…
moira-andrews Apr 1, 2026
6cab1e5
adds the window minimum to the form clean function
moira-andrews Apr 1, 2026
b9e8e36
updated min window to be consistent and fall back to the cadence freq…
moira-andrews Apr 2, 2026
2d858ce
updated window length call in facility
moira-andrews Apr 2, 2026
7977cb7
Merge branch 'bugfix/fix-single-obs-cadence-strategy' into bugfix/upd…
moira-andrews Jun 24, 2026
331c270
Merge branch 'dev' into bugfix/update-start-and-end-windows
moira-andrews Jun 24, 2026
e081cf0
updated functions to match what is used in SNEx for sequencing
moira-andrews Jun 25, 2026
a3a8080
fixed canceled cadence not turning off dc, updated tests
moira-andrews Jun 25, 2026
46e2549
added cadence tests
moira-andrews Jun 25, 2026
800e514
fixed some test bugs
moira-andrews Jun 25, 2026
135cd5d
linting fix
moira-andrews Jun 25, 2026
bd29b2c
fixed linting
moira-andrews Jun 25, 2026
9c4187c
fixed data bug
moira-andrews Jun 25, 2026
ba1d188
added extra line
moira-andrews Jun 25, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions tom_base/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,9 +285,12 @@

TOM_CADENCE_STRATEGIES = [
'tom_observations.cadences.retry_failed_observations.RetryFailedObservationsStrategy',
'tom_observations.cadences.retry_failed_observations.RetryUntilDeadlineStrategy',
'tom_observations.cadences.resume_cadence_after_failure.ResumeCadenceAfterFailureStrategy'
]

OBS_WINDOW_MINIMUM = 24

# Define extra target fields here. Types can be any of "number", "string", "boolean" or "datetime"
# See https://tomtoolkit.github.io/docs/target_fields for documentation on this feature
# For example:
Expand Down
79 changes: 61 additions & 18 deletions tom_observations/cadences/resume_cadence_after_failure.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from datetime import datetime, timedelta
from datetime import timedelta
from dateutil.parser import parse
from django.conf import settings
import logging
from django.utils import timezone

from tom_observations.cadence import BaseCadenceForm, CadenceStrategy
from tom_observations.models import ObservationRecord
Expand Down Expand Up @@ -40,25 +42,53 @@ def update_observation_payload(self, observation_payload):
def run(self):
# gets the most recent observation because the next observation is just going to modify these parameters
last_obs = self.dynamic_cadence.observation_group.observation_records.order_by('-created').first()
if not last_obs:
return

# Make a call to the facility to get the current status of the observation
facility = get_service_class(last_obs.facility)()
start_keyword, end_keyword = facility.get_start_end_keywords()
facility.update_observation_status(last_obs.observation_id) # Updates the DB record
last_obs.refresh_from_db() # Gets the record updates

# Boilerplate to get necessary properties for future calls
start_keyword, end_keyword = facility.get_start_end_keywords()
observation_payload = last_obs.parameters

# Cadence logic
# If the observation hasn't finished, do nothing
if not last_obs.terminal:
return
elif last_obs.failed: # If the observation failed

if last_obs.status == 'CANCELED':
self.dynamic_cadence.active = False
self.dynamic_cadence.save()
logger.info(f'Observation {last_obs} was canceled, stopping dynamic cadence')
return

# Boilerplate to get necessary properties for future calls
observation_payload = last_obs.parameters.copy()
scheduled_end = last_obs.scheduled_end
if not scheduled_end:
logger.info(f'No observation end scheduled yet, falling back to end: {observation_payload[end_keyword]}')
scheduled_end = parse(observation_payload[end_keyword])

if isinstance(scheduled_end, str):
scheduled_end = parse(scheduled_end)

if timezone.is_naive(scheduled_end):
scheduled_end = timezone.make_aware(scheduled_end)

observation_payload['scheduled_end'] = scheduled_end.isoformat()
logger.info(f'Scheduled observation end: {scheduled_end}')

if last_obs.failed: # If the observation failed
# Submit next observation to be taken as soon as possible with the same window length
window_length = parse(observation_payload[end_keyword]) - parse(observation_payload[start_keyword])
observation_payload[start_keyword] = datetime.now().isoformat()
observation_payload[end_keyword] = (parse(observation_payload[start_keyword]) + window_length).isoformat()
cadence_frequency = self.dynamic_cadence.cadence_parameters.get('cadence_frequency')
if cadence_frequency is None:
raise Exception(f'The {self.name} strategy requires a cadence_frequency cadence_parameter.')
window_min = getattr(settings, 'OBS_WINDOW_MINIMUM', 24)
window_length = min(cadence_frequency, window_min)
now = timezone.now()
observation_payload[start_keyword] = now.isoformat()
observation_payload[end_keyword] = (now + timedelta(hours=window_length)).isoformat()

else: # If the observation succeeded
# Advance window normally according to cadence parameters
observation_payload = self.advance_window(
Expand All @@ -70,10 +100,14 @@ def run(self):
# Submission of the new observation to the facility
obs_type = last_obs.parameters.get('observation_type')
form = facility.get_form(obs_type)(data=observation_payload)
logger.info(f'obs payload: {observation_payload}')
if form.is_valid():
observation_ids = facility.submit_observation(form.observation_payload())
else:
logger.error(msg=f'Unable to submit next cadenced observation: {form.errors}')
logger.error(
msg=f'Unable to submit next cadenced observation: {form.errors} '
f'for ObservationRecord.id: {last_obs.id}'
)
raise Exception(f'Unable to submit next cadenced observation: {form.errors}')

# Creation of corresponding ObservationRecord objects for the observations
Expand All @@ -88,27 +122,36 @@ def run(self):
)
# Add ObservationRecords to the DynamicCadence
self.dynamic_cadence.observation_group.observation_records.add(record)
self.dynamic_cadence.observation_group.save()
new_observations.append(record)

self.dynamic_cadence.observation_group.save()
# Update the status of the ObservationRecords in the DB
for obsr in new_observations:
facility = get_service_class(obsr.facility)()
facility.update_observation_status(obsr.observation_id)
obsr.refresh_from_db() # commit the updated observation status

return new_observations

def advance_window(self, observation_payload, start_keyword='start', end_keyword='end'):
cadence_frequency = self.dynamic_cadence.cadence_parameters.get('cadence_frequency')
if not cadence_frequency:
if cadence_frequency is None:
raise Exception(f'The {self.name} strategy requires a cadence_frequency cadence_parameter.')
advance_window_hours = cadence_frequency
window_length = parse(observation_payload[end_keyword]) - parse(observation_payload[start_keyword])
window_min = getattr(settings, 'OBS_WINDOW_MINIMUM', 24)
window_length = min(cadence_frequency, window_min)

scheduled_end = observation_payload['scheduled_end']

if isinstance(scheduled_end, str):
scheduled_end = parse(scheduled_end)

if timezone.is_naive(scheduled_end):
scheduled_end = timezone.make_aware(scheduled_end)

new_start = parse(observation_payload[start_keyword]) + timedelta(hours=advance_window_hours)
if new_start < datetime.now(): # Ensure that the new window isn't in the past
new_start = datetime.now()
new_end = new_start + window_length
new_start = scheduled_end + timedelta(hours=advance_window_hours)
if new_start < timezone.now(): # Ensure that the new window isn't in the past
new_start = timezone.now()
new_end = new_start + timedelta(hours=window_length)
observation_payload[start_keyword] = new_start.isoformat()
observation_payload[end_keyword] = new_end.isoformat()

Expand Down
215 changes: 179 additions & 36 deletions tom_observations/cadences/retry_failed_observations.py
Original file line number Diff line number Diff line change
@@ -1,66 +1,209 @@
from datetime import timedelta
from dateutil.parser import parse
import logging
from django.conf import settings
from django.utils import timezone

from tom_observations.cadence import BaseCadenceForm, CadenceStrategy
from tom_observations.models import ObservationRecord
from tom_observations.facility import get_service_class

logger = logging.getLogger(__name__)


class RetryFailedObservationsForm(BaseCadenceForm):
pass


class RetryFailedObservationsStrategy(CadenceStrategy):
class BaseRetryFailedObservationsStrategy(CadenceStrategy):
"""
The RetryFailedObservationsStrategy immediately re-submits all observations within an observation group a certain
number of hours later, as specified by ``advance_window_hours``.
The BaseRetryFailedObservationsStrategy immediately re-submits all observations within an observation
group a certain number of hours later, as specified by ``advance_window_hours``.

This strategy requires the DynamicCadence to have a parameter ``cadence_frequency``.
"""
name = 'Retry Failed Observations'
description = """This strategy immediately re-submits a cadenced observation without amending any other part of the
cadence."""
description = """This strategy immediately re-submits a cadenced observation without amending any
other part of the cadence."""
form = RetryFailedObservationsForm

def retry_observation(self, first_obs, last_obs, facility):
'''
Default retry observations, for BaseRetry strategy (retry until successful), the default is to always
retry the observations until obtained
'''
return True

def notify_success(self, obs):
'''
Function to add a call to slack or email on successful single time observations
'''
return

def run(self):
failed_observations = [obsr for obsr
in self.dynamic_cadence.observation_group.observation_records.all()
if obsr.failed]
records = self.dynamic_cadence.observation_group.observation_records.all().order_by('created')
first_obs = records.first()
last_obs = records.last()

if not first_obs or not last_obs:
return

facility = get_service_class(last_obs.facility)()
facility.update_observation_status(last_obs.observation_id)
last_obs.refresh_from_db()

if not last_obs.terminal:
return

if last_obs.status == 'COMPLETED':
self.dynamic_cadence.active = False
self.dynamic_cadence.save()
logger.info(f'Observation {last_obs} completed; turned off dynamic cadence')
return self.notify_success(last_obs)

if last_obs.status == 'CANCELED':
self.dynamic_cadence.active = False
self.dynamic_cadence.save()
logger.info(f'Observation {last_obs} was canceled, stopping dynamic cadence')
return

if not self.retry_observation(first_obs, last_obs, facility):
self.dynamic_cadence.active = False
self.dynamic_cadence.save()
logger.info(f'Stopping retry cadence for observation group {self.dynamic_cadence.observation_group.id}')
return

return self.submit_retry_observation(first_obs, last_obs, facility)

def submit_retry_observation(self, first_obs, last_obs, facility):
observation_payload = last_obs.parameters.copy()

start_keyword, end_keyword = facility.get_start_end_keywords()
observation_payload = self.advance_window(
observation_payload, start_keyword=start_keyword, end_keyword=end_keyword,
first_obs=first_obs, facility=facility
)

if observation_payload is None:
self.dynamic_cadence.active = False
self.dynamic_cadence.save()
logger.info(
f'No retry window remaining for observation group '
f'{self.dynamic_cadence.observation_group.id}; deactivated silently'
)
return

obs_type = observation_payload.get('observation_type')
form = facility.get_form(obs_type)(data=observation_payload)

if not form.is_valid():
logger.error(
msg=f'Unable to submit next observation: {form.errors} '
f'for ObservationRecord.id: {last_obs.id}'
)
raise Exception(f'Unable to submit next observation: {form.errors}')

observation_ids = facility.submit_observation(form.observation_payload())
new_observations = []
for obs in failed_observations:
observation_payload = obs.parameters
facility = get_service_class(obs.facility)()
start_keyword, end_keyword = facility.get_start_end_keywords()
observation_payload = self.advance_window(
observation_payload, start_keyword=start_keyword, end_keyword=end_keyword

for observation_id in observation_ids:
record = ObservationRecord.objects.create(
target=last_obs.target,
facility=facility.name,
parameters=observation_payload,
observation_id=observation_id,
)
obs_type = obs.parameters.get('observation_type', None)
form = facility.get_form(obs_type)(data=observation_payload)
form.is_valid()
observation_ids = facility.submit_observation(form.observation_payload())

for observation_id in observation_ids:
# Create Observation record
record = ObservationRecord.objects.create(
target=obs.target,
facility=facility.name,
parameters=observation_payload,
observation_id=observation_id
)
self.dynamic_cadence.observation_group.observation_records.add(record)
self.dynamic_cadence.observation_group.save()
new_observations.append(record)
self.dynamic_cadence.observation_group.observation_records.add(record)
new_observations.append(record)

self.dynamic_cadence.observation_group.save()

for obsr in new_observations:
facility.update_observation_status(obsr.observation_id)
obsr.refresh_from_db()

return new_observations

def advance_window(self, observation_payload, start_keyword='start', end_keyword='end'):
def advance_window(self, observation_payload,
start_keyword='start', end_keyword='end', first_obs=None, facility=None):
cadence_frequency = self.dynamic_cadence.cadence_parameters.get('cadence_frequency')
if not cadence_frequency:
raise Exception(f'The {self.name} strategy requires a cadence_frequency cadence_parameter.')
advance_window_hours = cadence_frequency
new_start = parse(observation_payload[start_keyword]) + timedelta(hours=advance_window_hours)
new_end = parse(observation_payload[end_keyword]) + timedelta(hours=advance_window_hours)
if cadence_frequency is None:
raise Exception(
f'The {self.name} strategy requires a cadence_frequency cadence_parameter.'
)

window_min = getattr(settings, 'OBS_WINDOW_MINIMUM', 24)
window_length = min(cadence_frequency, window_min)

new_start = timezone.now()
new_end = new_start + timedelta(hours=window_length)

observation_payload[start_keyword] = new_start.isoformat()
observation_payload[end_keyword] = new_end.isoformat()
return observation_payload


class RetryFailedObservationsStrategy(BaseRetryFailedObservationsStrategy):
"""
Retry indefinitely until the observation succeeds.
"""
pass


class RetryUntilDeadlineStrategy(BaseRetryFailedObservationsStrategy):
"""
Retry in short windows until either the observation succeeds, or the
original cadence_frequency interval has elapsed.
"""

def retry_observation(self, first_obs, last_obs, facility):
deadline = self.get_deadline(first_obs, facility)
return timezone.now() < deadline

def advance_window(self, observation_payload,
start_keyword='start', end_keyword='end', first_obs=None, facility=None):
cadence_frequency = self.dynamic_cadence.cadence_parameters.get('cadence_frequency')
if cadence_frequency is None:
raise Exception(
f'The {self.name} strategy requires a cadence_frequency cadence_parameter.'
)

window_min = getattr(settings, 'OBS_WINDOW_MINIMUM', 24)
window_length = min(cadence_frequency, window_min)

deadline = self.get_deadline(first_obs, facility)
new_start = timezone.now()

if new_start >= deadline:
return None

new_end = min(new_start + timedelta(hours=window_length), deadline)

if new_end <= new_start:
return None

observation_payload[start_keyword] = new_start.isoformat()
observation_payload[end_keyword] = new_end.isoformat()
return observation_payload

def get_deadline(self, first_obs, facility):
cadence_frequency = self.dynamic_cadence.cadence_parameters.get('cadence_frequency')
if cadence_frequency is None:
raise Exception(
f'The {self.name} strategy requires a cadence_frequency cadence_parameter.'
)

start_keyword, _ = facility.get_start_end_keywords()
start_value = first_obs.parameters.get(start_keyword)

if not start_value:
raise Exception(
f'Could not determine original start time for '
f'ObservationRecord.id={first_obs.id}'
)

original_start = parse(start_value) if isinstance(start_value, str) else start_value
if timezone.is_naive(original_start):
original_start = timezone.make_aware(original_start)

return original_start + timedelta(hours=cadence_frequency)
Loading
Loading