diff --git a/Dockerfile b/Dockerfile index 8bd0ff0..c83b547 100644 --- a/Dockerfile +++ b/Dockerfile @@ -111,7 +111,7 @@ python -c "from ortools.linear_solver import pywraplp as p; p.Solver.CreateSolve python -c "from ortools.linear_solver import pywraplp as p; p.Solver.CreateSolver('GLPK')" # assumption: if it's trying to read the licence, it's probably linked properly -strace -e openat python -c "from ortools.linear_solver import pywraplp as p; p.Solver.CreateSolver('GUROBI')" 2>&1 | grep -q gurobi.lic +# strace -e openat python -c "from ortools.linear_solver import pywraplp as p; p.Solver.CreateSolver('GUROBI')" 2>&1 | grep -q gurobi.lic EOT USER app diff --git a/adaptive_scheduler/configdb_connections.py b/adaptive_scheduler/configdb_connections.py index fb1ea0f..7680130 100644 --- a/adaptive_scheduler/configdb_connections.py +++ b/adaptive_scheduler/configdb_connections.py @@ -22,7 +22,7 @@ class ConfigDBInterface(SendMetricMixin): """ def __init__(self, configdb_url, telescope_classes, telescopes_file='data/telescopes.json', - active_instruments_file='data/active_instruments.json'): + active_instruments_file='data/active_instruments.json', overrides=None): self.configdb_url = configdb_url if not self.configdb_url.endswith('/'): self.configdb_url += '/' @@ -31,11 +31,28 @@ def __init__(self, configdb_url, telescope_classes, telescopes_file='data/telesc self.active_instruments_file = active_instruments_file self.active_instruments = None self.telescope_info = None + self.overrides = overrides self.update_configdb_structures() def update_configdb_structures(self): self.update_telescope_info() + self.apply_overrides_to_telescopes() self.update_active_instruments() + self.apply_overrides_to_instruments() + + def apply_overrides_to_telescopes(self): + if self.overrides: + for telescope in self.overrides.get('telescopes', {}).keys(): + if telescope in self.telescope_info and 'status' in self.overrides['telescopes'][telescope]: + 
self.telescope_info[telescope]['status'] = self.overrides['telescopes'][telescope]['status'] + + def apply_overrides_to_instruments(self): + if self.overrides and self.overrides.get('instruments', {}): + for instrument in self.active_instruments: + if instrument['code'] in self.overrides['instruments']: + instrument['state'] = self.overrides['instruments'][instrument['code']].get('state', instrument['state']) + if instrument['instrument_type']['code'] in self.overrides['instruments']: + instrument['state'] = self.overrides['instruments'][instrument['instrument_type']['code']].get('state', instrument['state']) def update_active_instruments(self): try: diff --git a/adaptive_scheduler/scheduler.py b/adaptive_scheduler/scheduler.py index f80180c..9533b23 100644 --- a/adaptive_scheduler/scheduler.py +++ b/adaptive_scheduler/scheduler.py @@ -394,7 +394,7 @@ def run_scheduler(self, scheduler_input, estimated_scheduler_end, semester_detai print_compound_reservations(compound_reservations) # Prepare scheduler result - scheduler_result = SchedulerResult() + scheduler_result = SchedulerResult(input_reservations=compound_reservations) scheduler_result.schedule = {} scheduler_result.resource_schedules_to_cancel = list(available_resources) @@ -569,7 +569,7 @@ class SchedulerResult(object): '''Aggregates together output of a scheduler run ''' - def __init__(self, schedule=None, resource_schedules_to_cancel=None): + def __init__(self, schedule=None, resource_schedules_to_cancel=None, input_reservations=None): ''' schedule - Expected to be a dict mapping resource to scheduled reservations resource_schedules_to_cancel - List of resources to cancel schedules on - this is the list of all available @@ -577,6 +577,7 @@ def __init__(self, schedule=None, resource_schedules_to_cancel=None): removed from the list. 
''' self.schedule = schedule if schedule else {} + self.input_reservations = input_reservations if input_reservations else [] self.resource_schedules_to_cancel = resource_schedules_to_cancel if resource_schedules_to_cancel else [] def count_reservations(self): @@ -626,11 +627,13 @@ def __init__(self, sched_params, scheduler, network_interface, network_model, in self.sched_params = sched_params self.warm_starts_setting = sched_params.warm_starts self.scheduler = scheduler + self.normal_scheduler_input = None + self.rr_scheduler_input = None + self.normal_scheduler_result = None + self.rr_scheduler_result = None self.network_interface = network_interface self.network_model = network_model self.input_factory = input_factory - self.normal_scheduled_requests_by_rg = {} - self.rr_scheduled_requests_by_rg = {} self.log = logging.getLogger(__name__) # List of strings to be printed in final scheduling summary self.summary_events = [] @@ -828,6 +831,10 @@ def _can_apply_scheduler_result(self, scheduler_result, apply_deadline): self.log.warn("Empty scheduler result. 
Schedule will not be saved.") return False + if self.sched_params.simulate_now: + # Don't care about deadlines if you are simulating a time in the past + return True + estimated_apply_timedelta = self.avg_save_time_per_reservation_timedelta * scheduler_result.count_reservations() estimated_apply_completion = datetime.utcnow() + estimated_apply_timedelta self.log.info( @@ -973,7 +980,7 @@ def create_rr_schedule(self, scheduler_input): rr_scheduler_result = self.call_scheduler(scheduler_input, deadline) try: - self.rr_scheduled_requests_by_rg = rr_scheduler_result.get_scheduled_requests_by_request_group_id() + self.rr_scheduler_result = rr_scheduler_result self.apply_rr_result(rr_scheduler_result, scheduler_input, deadline) rr_scheduling_end = datetime.utcnow() rr_scheduling_timedelta = rr_scheduling_end - rr_scheduling_start @@ -1019,7 +1026,7 @@ def create_normal_schedule(self, scheduler_input): resources_to_clear = list(self.network_model.keys()) try: before_apply = datetime.utcnow() - self.normal_scheduled_requests_by_rg = scheduler_result.get_scheduled_requests_by_request_group_id() + self.normal_scheduler_result = scheduler_result n_submitted = self.apply_normal_result(scheduler_result, scheduler_input, resources_to_clear, deadline) @@ -1066,18 +1073,18 @@ def scheduling_cycle(self, schedule_type, network_state_timestamp, rr_schedule_r set_schedule_type(schedule_type) result = None if schedule_type == NORMAL_OBSERVATION_TYPE: - scheduler_input = self.input_factory.create_normal_scheduling_input( + self.normal_scheduler_input = self.input_factory.create_normal_scheduling_input( self.estimated_normal_run_timedelta.total_seconds(), - scheduled_requests_by_rg=self.normal_scheduled_requests_by_rg, + scheduled_requests_by_rg=self.normal_scheduler_result.get_scheduled_requests_by_request_group_id() if self.normal_scheduler_result else {}, rr_schedule=rr_schedule_result.schedule, network_state_timestamp=network_state_timestamp) - result = 
self.create_normal_schedule(scheduler_input) + result = self.create_normal_schedule(self.normal_scheduler_input) elif schedule_type == RR_OBSERVATION_TYPE: - scheduler_input = self.input_factory.create_rr_scheduling_input( + self.rr_scheduler_input = self.input_factory.create_rr_scheduling_input( self.estimated_rr_run_timedelta.total_seconds(), - scheduled_requests_by_rg=self.rr_scheduled_requests_by_rg, + scheduled_requests_by_rg=self.rr_scheduler_result.get_scheduled_requests_by_request_group_id() if self.rr_scheduler_result else {}, network_state_timestamp=network_state_timestamp) - result = self.create_rr_schedule(scheduler_input) + result = self.create_rr_schedule(self.rr_scheduler_input) return result diff --git a/adaptive_scheduler/scheduler_input.py b/adaptive_scheduler/scheduler_input.py index d0a3b54..38383b7 100644 --- a/adaptive_scheduler/scheduler_input.py +++ b/adaptive_scheduler/scheduler_input.py @@ -49,7 +49,8 @@ def __init__(self, ignore_ipp=to_bool(os.getenv('IGNORE_IPP_VALUES', 'False')), avg_reservation_save_time_seconds=float(os.getenv('INITIAL_PER_RESERVATION_SAVE_TIME', 0.05)), normal_runtime_seconds=float(os.getenv('INITIAL_NORMAL_RUNTIME', 360.0)), - rr_runtime_seconds=float(os.getenv('INITIAL_RAPID_RESPONSE_RUNTIME', 120.0))): + rr_runtime_seconds=float(os.getenv('INITIAL_RAPID_RESPONSE_RUNTIME', 120.0)), + simulation_opensearch_index=os.getenv('SIMULATION_OPENSEARCH_INDEX', '')): self.dry_run = dry_run self.no_weather = no_weather self.no_singles = no_singles @@ -92,6 +93,7 @@ def __init__(self, self.opensearch_excluded_observatories = opensearch_excluded_observatories.split(',') else: self.opensearch_excluded_observatories = [] + self.simulation_opensearch_index = simulation_opensearch_index class SchedulingInputFactory(object): diff --git a/adaptive_scheduler/simulation/README.md b/adaptive_scheduler/simulation/README.md new file mode 100644 index 0000000..98ef60f --- /dev/null +++ b/adaptive_scheduler/simulation/README.md @@ -0,0 
+1,63 @@ +# Adaptive Scheduler Simulator Orchestrator + +The orchestrator allows for running the adaptive scheduler in a simulated environment in order to facilitate testing. +It allows the user to dump input request data to the [Observation Portal](https://github.com/observatorycontrolsystem/observation-portal) +which is then passed to the scheduler. The orchestrator runs the scheduler and passes off the scheduler result to a +metric calculation file, which calculates metrics to send to an OpenSearch database. Work is still being done to enable the +orchestrator to step through a time range and run the scheduler repeatedly on different points of the input data. + +## Overview of Metrics +The available metrics center around priority distributions, utilization, and miscellaneous data including airmass data +and slew distance. Certain metrics sent to OpenSearch are pre-binned by priority level. To get the best understanding of +the data structures, inspect the raw JSON in OpenSearch directly. + +## Prerequisites +* Python 3.9 +* A running [Configuration Database](https://github.com/observatorycontrolsystem/configdb) with instruments +* A running [Observation Portal](https://github.com/observatorycontrolsystem/observation-portal) with requests +* A running OpenSearch with index created to store scheduler simulation results + +## Environment Variables +Consult the adaptive scheduler README for general environment variables related to the scheduler. Additional environment +variables specific to the orchestrator are as follows: +| Variable | Description | Default | +|----------------------------------|---------------------------------------------------------------------------------------------|-------------------------| +| `SIMULATION_RUN_ID` | The run ID of the scheduler. 
This will be saved as `simulation_id` in OpenSearch | `1` | +| `SIMULATION_START_TIME` | The simulation start time, which allows the orchestrator to step through a time range (WIP) | `2022-06-23` | +| `SIMULATION_END_TIME` | The end time of the time range. This should match the start time if only a single run is desired. | `2022-06-23` | +| `SIMULATION_TIME_STEP_MINUTES` | The time step in minutes for the time range (WIP) | `60` | +| `SIMULATION_AIRMASS_COEFFICIENT` | The airmass optimization weighting value | `0.1` | +| `SIMULATION_OPENSEARCH_INDEX` | The OpenSearch index where metrics will be saved to | `scheduler-simulations` | +| `OPENSEARCH_URL` | OpenSearch endpoint (needed for the plotting interface) | _`Empty_string`_ | +| | | | + +## How to Run +When running in a Docker container, the entry point can be modified to point to the orchestrator instead of the scheduler, +e.g. `sh -c "sleep 20s; simulation-orchestrator"`. The twenty-second wait time is to ensure all the relevant services (configdb, redis, etc.) are +spun up and available. Otherwise, run the orchestrator locally on a machine with `poetry run simulation-orchestrator`. + +## Simulation Process +The general workflow for running a scheduler simulation is as follows: +1. Make changes to the adaptive scheduler. If running with Docker, build the image using the suggested build command in the adaptive scheduler README. +2. If necessary, adjust the `metrics.py` file to conform with the tests you are running, such as adjusting binning for priority values. +3. Modify environment variables accordingly, making sure to set and verify the run ID. +4. Run the orchestrator. + +## Plotting +A plotting interface is included with the simulator to facilitate data visualization. The interface features OpenSearch searching by +either OpenSearch ID or `simulation_id`, the ability to save plots in various formats, and zsh-style TAB autocompletion.
+Note that the environment variable `OPENSEARCH_URL` must be set on whatever machine you are running the plots from. +To use the plotting interface, run `python -m adaptive_scheduler.simulation.plots` +(`-h` to show the available command line arguments). + +## Creating Your Own Plots +The plotting framework provides a `Plot` class defined in `plotutils.py` to help initialize plots and get data from OpenSearch. +`Plot` is initialized with a user-defined plotting function to generate the plot, the plot title, and either a single string or a list +of strings. It searches the `_id` or `simulation_id` field in OpenSearch for the strings and plots the data. To write your own plotting +functions, follow the example functions in `plotfuncs.py`. Plotting functions should take in either a list of datasets +or a single dataset (to match the initialization in `plots.py`). The plot title should be passed into the plotting function as well. +This title is used to generate the descriptions for the command-line interface of the plotting framework. + +The plot creation process is as simple as: +1. Creating a function (e.g. `plot_my_plot`) in `plotfuncs.py` +2. Adding the plot to the list of plots in `plots.py`, e.g. `Plot(plotfuncs.plot_my_plot, 'My Plot Title', 'some-data-id')` diff --git a/adaptive_scheduler/simulation/__init__.py b/adaptive_scheduler/simulation/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/adaptive_scheduler/simulation/metrics.py b/adaptive_scheduler/simulation/metrics.py new file mode 100644 index 0000000..81c11a8 --- /dev/null +++ b/adaptive_scheduler/simulation/metrics.py @@ -0,0 +1,441 @@ +""" +Metric calculation functions for the scheduler simulator. 
+""" +import copy +import logging +import pickle +from datetime import datetime, timedelta +from collections import defaultdict + +import numpy +import requests +import rise_set +from requests.exceptions import RequestException, Timeout +from rise_set import astrometry + +from adaptive_scheduler.observation_portal_connections import ObservationPortalConnectionError +from adaptive_scheduler.utils import time_in_capped_intervals, normalised_epoch_to_datetime, datetime_to_epoch +from adaptive_scheduler.models import redis_instance + +log = logging.getLogger('adaptive_scheduler') + +DTFORMAT = '%Y-%m-%dT%H:%M' + + +def percent_of(x, y): + """Returns x/y as a percentage.""" + try: + return x / y * 100. + except ZeroDivisionError as e: + if y == 0: + return 0 + else: + raise e + + +def percent_diff(x, y): + """Returns the percent difference between x and y as a float.""" + if x == y == 0: + return 0 + mean = (abs(x) + abs(y)) / 2 + return abs(x - y) / mean * 100. + + +def scalefunc(p, newmax, newmin, oldmax, oldmin): + """Remaps a range of values to another range of values.""" + return (p-oldmin)*(newmax-newmin)/(oldmax-oldmin) + newmin + + +def generate_bin_names(bin_size, bin_range): + """Creates labels for the bins.""" + start, end = bin_range + bin_names = [] + bin_start = numpy.arange(start, end+1, bin_size) + for start_num in bin_start: + if numpy.issubdtype(bin_start.dtype, numpy.integer): + end_num = start_num + bin_size - 1 + end_num = end_num if end_num < end else end + else: + end_num = start_num + bin_size + end_num = end_num if end_num < end else float(end) + if end_num == start_num: + bin_name = str(start_num) + else: + bin_name = f'{start_num}-{end_num}' + bin_names.append(bin_name) + return bin_names + + +def bin_data(bin_by, data=[], bin_size=1, bin_range=None, fill=[], aggregator=len): + """Bins data to create a histogram. For float values, each bin is half-open, i.e. 
defined on + the interval [a, b) for every bin except for the last bin, which is defined on the interval [a, b]. + The naming convention is different for integers, which use closed intervals [a, b] since they are discrete. + For example, for the label '1-3', this means the values 1, 2, and 3, whereas for the label '1.0-3.0' + this means the values on the interval [1.0, 3.0). Bins are uniformly spaced. + + Args: + bin_by (list): A list of data to bin by. Can be float or int. + data (list): Additional data points associated with the data to bin by. It is best for the length + of this array to match the length of bin_by. The aggregation function is applied to the data + after binning, on a per-bin basis. + bin_size: The width of the bins. + bin_range: A tuple of numbers to override the bin ranges. Otherwise, use the min/max of the data. + fill: The data value(s) to fill with if the bin is empty. An iterable may be passed, in which case it is + cast to a list. If None is passed, then empty bins are removed. The aggregator will be applied + to fill values as well. Default is empty list to work with the default aggregator len(). + aggregator (func): The aggregation function to apply over the list of data. Must be callable on an array. + If None is passed, then the raw values are stored in a list. + + Returns: + data_dict: The binned data. Each key is a label corresponding to either a list of values or a single number, + depending on the type of aggregation function used. + + Examples: + Simple frequency count: + >>> bin_data([1, 2, 3, 2, 6]) + {'1': 1, '2': 2, '3': 1, '4': 0, '5': 0, '6': 1} + + Frequency count without empty bins ('3' is removed): + >>> bin_data([4, 4, 5, 6, 7, 2], fill=None) + {'2': 1, '4': 2, '5': 1, '6': 1, '7': 1} + + Bin two lists of data, e.g.
highest test score by age group: + >>> ages = [12, 13, 11, 14, 15, 12, 13, 10, 10, 13] + >>> scores = [76, 84, 92, 56, 91, 87, 72, 95, 89, 77] + >>> bin_data(ages, scores, bin_size=2, bin_range=(10, 15), aggregator=max) + {'10-11': 95, '12-13': 87, '14-15': 91} + + Get the raw list of items in each bin: + >>> bin_data([4, 4, 5, 6, 9], [4, 4, 5, 6, 9], aggregator=None) + {'4': [4, 4], '5': [5], '6': [6], '9': [9]} + """ + bin_range = (min(bin_by), max(bin_by)) if bin_range is None else bin_range + bin_dict = {bin_name: [] for bin_name in generate_bin_names(bin_size, bin_range)} + + for i, value in enumerate(bin_by): + if value < bin_range[0] or value > bin_range[1]+1: + continue + index = int((value - bin_range[0]) / bin_size) + keyname = list(bin_dict)[index] + if data: + bin_dict[keyname].append(data[i]) + else: + bin_dict[keyname].append(value) + if fill is not None: + try: + fill = list(fill) + except TypeError: + fill = [fill] + bin_dict = {key: vals if vals else fill for key, vals in bin_dict.items()} + else: + bin_dict = {key: vals for key, vals in bin_dict.items() if vals} + bin_dict = {key: aggregator(vals) if aggregator else vals for key, vals in bin_dict.items()} + + return bin_dict + + +class MetricCalculator(): + """A class encapsulating the metric calculating functions for the scheduler simulator. + + Args: + normal_scheduler_result (SchedulerResult): The normal schedule output of the scheduler. + The attribute of interest is SchedulerResult.schedule, which is a dictionary formatted + as follows: + {scheduled_resource, [reservations]} + rr_scheduler_result (SchedulerResult): The rapid-response schedule output of the scheduler. + scheduler (Scheduler): The instance of the scheduler used by the simulator. + scheduler_runner (SchedulerRunner): The instance of the scheduler runner used by the simulator. 
+ """ + def __init__(self, normal_scheduler_result, rr_scheduler_result, scheduler, scheduler_runner): + self.scheduler = scheduler + self.scheduler_runner = scheduler_runner + self.observation_portal_interface = self.scheduler_runner.network_interface.observation_portal_interface + self.simulation_start = self.scheduler_runner.sched_params.simulate_now + self.horizon_days = self.scheduler_runner.sched_params.horizon_days + + self.normal_scheduler_result = normal_scheduler_result + self.normal_schedule = self.normal_scheduler_result.schedule + self.normal_input_reservations = self.normal_scheduler_result.input_reservations + self.combined_schedule = defaultdict(dict) + self.combined_input_reservations = [] + if rr_scheduler_result: + self.rr_scheduler_result = rr_scheduler_result + self.rr_schedule = self.rr_scheduler_result.schedule + self.rr_input_reservations = self.rr_scheduler_result.input_reservations + self._combine_normal_rr_schedules() + self._combine_resources_scheduled() + self._combine_normal_rr_input_reservations() + else: + self.combined_schedule = self.normal_schedule + self.combined_resources_scheduled = self.normal_scheduler_result.resources_scheduled() + self.combined_resources_scheduled = [site for site in self.normal_schedule.keys() if self.normal_schedule[site]] + for comp_res in self.normal_input_reservations: + self.combined_input_reservations.extend(comp_res.reservation_list) + + self.request_groups = self.scheduler_runner.normal_scheduler_input.request_groups + if self.scheduler_runner.rr_scheduler_input: + self.request_groups.extend(self.scheduler_runner.rr_scheduler_input.request_groups) + + self.airmass_data_by_request_id = defaultdict(dict) + + def _combine_resources_scheduled(self): + normal_resources = self.normal_scheduler_result.resources_scheduled() + rr_resources = self.rr_scheduler_result.resources_scheduled() + normal_resources = [site for site in self.normal_schedule.keys() if self.normal_schedule[site]] + rr_resources = [site 
for site in self.rr_schedule.keys() if self.rr_schedule[site]] + self.combined_resources_scheduled = list(set(normal_resources + rr_resources)) + + def _combine_normal_rr_schedules(self): + self.combined_schedule = defaultdict(list) + for resource, reservations in self.rr_schedule.items(): + self.combined_schedule[resource].extend(reservations) + for resource, reservations in self.normal_schedule.items(): + reservations = [res for res in reservations if res not in self.combined_schedule[resource]] + self.combined_schedule[resource].extend(reservations) + + def _combine_normal_rr_input_reservations(self): + for comp_res in self.normal_input_reservations: + self.combined_input_reservations.extend(comp_res.reservation_list) + for comp_res in self.rr_input_reservations: + reservations = [res for res in comp_res.reservation_list if res not in self.combined_input_reservations] + self.combined_input_reservations.extend(reservations) + + def count_scheduled(self): + scheduled_reservations = [] + for reservations in self.combined_schedule.values(): + scheduled_reservations.extend(reservations) + return len(scheduled_reservations), len(self.combined_input_reservations) + + def percent_reservations_scheduled(self): + scheduled, total = self.count_scheduled() + return percent_of(scheduled, total) + + def total_scheduled_eff_priority(self): + effective_priorities = [] + for reservations in self.combined_schedule.values(): + effective_priorities.extend([res.priority for res in reservations]) + return sum(effective_priorities), effective_priorities + + def get_duration_data(self): + """Returns scheduled and unscheduled durations.""" + sched_durations = [] + unsched_durations = [] + for res in self.combined_input_reservations: + sched_durations.append(res.duration) if res.scheduled else unsched_durations.append(res.duration) + return sched_durations, unsched_durations + + def get_priority_data(self): + """Returns scheduled and unscheduled priority values. 
Accesses them in the same order as durations so + they can be cross-matched. Scaling changes the priorities to a different range of numbers.""" + sched_rg_ids = [] + unsched_rg_ids = [] + for res in self.combined_input_reservations: + sched_rg_ids.append(res.request_group_id) if res.scheduled else unsched_rg_ids.append(res.request_group_id) + priorities_by_rg_id = {rg.id: rg.proposal.tac_priority for rg in self.request_groups} + sched_priorities = [priorities_by_rg_id[rg_id] for rg_id in sched_rg_ids] + unsched_priorities = [priorities_by_rg_id[rg_id] for rg_id in unsched_rg_ids] + return sched_priorities, unsched_priorities + + def get_window_duration_data(self): + sched_window_durations = [] + for res in self.combined_input_reservations: + if res.scheduled: + windows = res.request.windows + # format the data to a list, each element is a list corresponding to a resource + windows_list = list(windows.windows_for_resource.values()) + window_durations = [] + for loc in windows_list: + window_durations.extend([(w.end-w.start).total_seconds() for w in loc]) + sched_window_durations.append(max(window_durations)) + return sched_window_durations + + def total_available_seconds(self): + """Aggregates the total available time, calculated from dark intervals. + + Returns: + total_available_time (float): The dark intervals capped by the horizon. 
+ """ + total_available_time = 0 + start_time = self.scheduler.estimated_scheduler_end + end_time = start_time + timedelta(days=self.horizon_days) + for resource in self.combined_resources_scheduled: + if resource in self.scheduler.visibility_cache: + dark_intervals = self.scheduler.visibility_cache[resource].dark_intervals + available_time = time_in_capped_intervals(dark_intervals, start_time, end_time) + total_available_time += available_time + return total_available_time + + def percent_time_utilization(self): + scheduled_durations, _ = self.get_duration_data() + return percent_of(sum(scheduled_durations), self.total_available_seconds()) + + def _get_airmass_data_for_request(self, request_id): + """Pulls airmass data from the Observation Portal, cache it in redis. + + Args: + request_id (str): The request id. + + Returns: + airmass_data (dict): The airmass data returned from the API or the cache. + """ + airmass_url = f'{self.observation_portal_interface.obs_portal_url}/api/requests/{request_id}/airmass/' + cached_airmass_data = redis_instance.get(f'airmass_data_{request_id}') + if cached_airmass_data: + self.airmass_data_by_request_id[request_id] = pickle.loads(cached_airmass_data) + return pickle.loads(cached_airmass_data) + try: + response = requests.get(airmass_url, headers=self.observation_portal_interface.headers, timeout=180) + response.raise_for_status() + airmass_data_for_request = response.json()['airmass_data'] + self.airmass_data_by_request_id[request_id] = airmass_data_for_request + redis_instance.set(f'airmass_data_{request_id}', pickle.dumps(airmass_data_for_request)) + return airmass_data_for_request + except (RequestException, ValueError, Timeout) as e: + raise ObservationPortalConnectionError("get_airmass_data failed: {}".format(repr(e))) + + def _get_minmax_airmass(self, airmass_data, midpoint_duration): + """Finds the minimum and maximum midpoint airmass across all sites.""" + max_airmass = 0 + min_airmass = 1000 + for site in 
airmass_data.values(): + _, airmasses = site.values() + airmasses = numpy.array(airmasses) + min_airmass = min(min(airmasses), min_airmass) + max_airmass = max(max(airmasses), max_airmass) + return min_airmass, max_airmass + + def _get_midpoint_airmasses_by_site(self, airmass_data, midpoint_time): + """"Gets the midpoint airmasses by site for a request. This is done by finding the time + closest matching the calculated midpoint of the observation in the observe portal airmass data. + + Args: + airmass_data (dict): The airmass data we want to use to calculate midpoint of. + start_time (datetime.datetime): The start time of the scheduled observation. + end_time (datetime.datetime): The end time of the scheduled observation. + + Returns: + midpoint_airmasses (str: float): A dictionary with observation sites as keys and corresponding + midpoint airmasses as values. + """ + midpoint_airmasses = {} + for site, details in airmass_data.items(): + details = list(details.values()) + times, airmasses = details + airmasses = numpy.array(airmasses) + times = numpy.array([datetime.strptime(time, DTFORMAT) for time in times]) + midpoint_airmasses[site] = airmasses[numpy.argmin(numpy.abs(times-midpoint_time))] + return midpoint_airmasses + + def airmass_metrics(self, schedule=None): + """Generate the airmass metrics of all scheduled reservations for a single schedule. + + Args: + schedule (scheduler, optional): the schedule we calculate our metrics on. Uses the schedule stored in + the MetricCalculator instance if nothing is passed. + + Returns: + airmass_metrics (dict): Variety of airmass metrics including raw data, average midpoint airmass, average + ideal airmass and 95% confidence interval for midpoint airmass. 
+ """ + schedule = self.combined_schedule if schedule is None else schedule + semester_start = self.scheduler_runner.semester_details['start'] + + midpoint_airmasses = [] + min_airmasses = [] + max_airmasses = [] + for reservations in schedule.values(): + for reservation in reservations: + airmass_data = self._get_airmass_data_for_request(reservation.request.id) + start_time = normalised_epoch_to_datetime(reservation.scheduled_start, + datetime_to_epoch(semester_start)) + midpoint_duration = timedelta(seconds=reservation.duration/2) + midpoint_time = start_time + midpoint_duration + midpoint_airmasses_by_site = self._get_midpoint_airmasses_by_site(airmass_data, midpoint_time) + site = reservation.scheduled_resource[-3:] + midpoint_airmasses.append(midpoint_airmasses_by_site[site]) + min_airmass, max_airmass = self._get_minmax_airmass(airmass_data, midpoint_duration) + min_airmasses.append(min_airmass) + max_airmasses.append(max_airmass) + airmass_metrics = {'raw_airmass_data': [{'midpoint_airmasses': midpoint_airmasses}, + {'min_poss_airmasses': min_airmasses}, + {'max_poss_airmasses': max_airmasses}], + 'avg_midpoint_airmass': sum(midpoint_airmasses)/len(midpoint_airmasses), + 'avg_min_poss_airmass': sum(min_airmasses)/len(min_airmasses), + } + return airmass_metrics + + def binned_tac_priority_metrics(self): + """Bins metrics based on TAC priority. 
Priority bins should be changed to match the data.""" + bin_size = 5 + bin_range = (10, 30) + collect_upper_range = True + + sched_durations, unsched_durations = self.get_duration_data() + all_durations = sched_durations + unsched_durations + + sched_priorities, unsched_priorities = self.get_priority_data() + all_priorities = sched_priorities + unsched_priorities + sched_histogram = bin_data(sched_priorities, bin_size=bin_size, bin_range=bin_range, fill=None) + bin_sched_durations = bin_data(sched_priorities, sched_durations, + bin_size, bin_range, fill=None, aggregator=sum) + combined_histogram = bin_data(all_priorities, bin_size=bin_size, bin_range=bin_range, fill=None) + bin_all_durations = bin_data(all_priorities, all_durations, + bin_size, bin_range, fill=None, aggregator=sum) + + # collects things above maximum bin range into one large bin + # e.g. bin 10-19, 20-29, 30, 31&up + if collect_upper_range: + max_prio = max(all_priorities) + lower = bin_range[-1] + 1 # assumes discrete priority values + upper_sched_histogram = bin_data(sched_priorities, bin_size=max_prio, bin_range=(lower, max_prio), fill=None) + upper_sched_durations = bin_data(sched_priorities, sched_durations, bin_size=max_prio, + bin_range=(lower, max_prio), fill=None, aggregator=sum) + upper_combined_histogram = bin_data(all_priorities, bin_size=max_prio, bin_range=(lower, max_prio), fill=None) + upper_all_durations = bin_data(all_priorities, all_durations, bin_size=max_prio, + bin_range=(lower, max_prio), fill=None, aggregator=sum) + sched_histogram = sched_histogram | upper_sched_histogram + bin_sched_durations = bin_sched_durations | upper_sched_durations + combined_histogram = combined_histogram | upper_combined_histogram + bin_all_durations = bin_all_durations | upper_all_durations + + bin_percent_count = {bin_: percent_of(sched_histogram[bin_], combined_histogram[bin_]) + for bin_ in sched_histogram} + bin_percent_time = {bin_: percent_of(bin_sched_durations[bin_], 
bin_all_durations[bin_]) + for bin_ in bin_sched_durations} + + output_dict = { + 'sched_histogram': sched_histogram, + 'sched_durations': bin_sched_durations, + 'full_histogram': combined_histogram, + 'all_durations': bin_all_durations, + 'percent_count': bin_percent_count, + 'percent_time': bin_percent_time + } + return output_dict + + def avg_slew_distance(self): + semester_start = self.scheduler_runner.semester_details['start'] + slew_distances = [] + schedule_copy = copy.deepcopy(self.combined_schedule) + for reservations in schedule_copy.values(): + apparent_radecs = [] + reservations.sort(key=lambda r: r.scheduled_start) + for res in reservations: + res_startdt = normalised_epoch_to_datetime(res.scheduled_start, datetime_to_epoch(semester_start)) + tdb = astrometry.date_to_tdb(res_startdt) + for c in res.request.configurations: + try: + apparent_radecs.append(astrometry.mean_to_apparent(c.target.in_rise_set_format(), tdb)) + except rise_set.exceptions.IncompleteTargetError: + # set a conservative estimate + ra = dec = rise_set.angle.Angle(degrees=0) + apparent_radecs.append((ra, dec)) + for i, radec in enumerate(apparent_radecs): + try: + next_radec = apparent_radecs[i+1] + ang_dist = astrometry.angular_distance_between(*radec, *next_radec) + slew_distances.append(ang_dist.in_degrees()) + except IndexError: + break + + return numpy.mean(slew_distances) diff --git a/adaptive_scheduler/simulation/orchestrator.py b/adaptive_scheduler/simulation/orchestrator.py new file mode 100644 index 0000000..f811cb8 --- /dev/null +++ b/adaptive_scheduler/simulation/orchestrator.py @@ -0,0 +1,206 @@ +''' +Entrypoint to run the scheduler in simulation mode. + +This orchestrator will setup the simulation from environment variables specified, +run the scheduling loop one or more times, and record metrics about a single run or +the full simulation within an OpenSearch index. The orchestrator will also handle +advancing time and input when simulating over a period of time. 
def setup_input(current_time):
    """Prepare the simulation input sources for the starting time (placeholder).

    At the moment this only logs the requested time; no external state is changed.

    Args:
        current_time (datetime): The simulated time of the first scheduling run.
    """
    # This will eventually call endpoints in configdb and the observation portal to set up the input state of
    # those sources based on the current timestamp of the scheduling run. For configdb, this involves playing
    # the records backwards until the time is reached. For the observation portal, it involves pulling over all
    # requests created and PENDING at a certain point in time for the semester, which should be doable by
    # looking at the created and modified timestamps and state.
    log.info(f"Placeholder for setting up input for time {current_time.isoformat()}")


def increment_input(current_time, time_step):
    """Advance the simulation input sources to the next time step (placeholder).

    At the moment this only logs the step and the new time; no external state is changed.

    Args:
        current_time (datetime): The simulated time after the step has been applied.
        time_step (float): The size of the step; the caller in this module passes minutes.
    """
    # This will eventually call endpoints in configdb and the observation portal to increment their state
    # forward by the time step specified. Incrementing time forward is slightly different than the initial
    # setup of a starting time. This will be called as you step forward in time to make sure these data
    # sources contain the right input data. For configdb, this involves moving the records back forwards a
    # bit. For the observation portal, it involves pulling down newer requests as well as cleaning up the
    # state of old ones between time steps (completing/expiring as appropriate). This also means that we
    # should complete and fail the right percentages of observations that should have ended within the last
    # time_step, and set ones that are in progress to ATTEMPTED state.
    log.info(f"Placeholder for incrementing input by {time_step} to time {current_time.isoformat()}")


@timeit
def send_to_opensearch(os_url, os_index, metrics):
    """Post one metrics document to an OpenSearch index (best effort).

    The document is posted to ``<os_index>/_doc/<simulation_id>_<record_time>``, resolved
    against ``os_url`` with ``urljoin``. A failure is logged as a warning and is not
    re-raised; success is logged only when the request actually succeeded.

    Args:
        os_url (str): Base URL of the OpenSearch service. Nothing is sent when it is empty.
        os_index (str): Name of the target index. Nothing is sent when it is empty.
        metrics (dict): The metrics to store; must contain the keys 'simulation_id'
            and 'record_time', which form the document name.
    """
    if not (os_url and os_index):
        log.warning("Not configured to save metrics in opensearch. Please set OPENSEARCH_URL and SIMULATION_OPENSEARCH_INDEX.")
        return

    doc_name = f"{metrics['simulation_id']}_{metrics['record_time']}"
    try:
        requests.post(
            urljoin(os_url, f'{os_index}/_doc/{doc_name}'), json=metrics
        ).raise_for_status()
    except Exception as ex:
        # Saving metrics is deliberately best effort: report the problem and carry on.
        log.warning(f"Failed to save metrics to Opensearch at {os_url} in index {os_index}: {repr(ex)}")
    else:
        log.info(f"Successfully saved metrics for {metrics['simulation_id']}")
def main(argv=None):
    """Run the scheduler simulation from START_TIME to END_TIME in TIME_STEP-minute steps.

    The scheduler, its interfaces and its input factory are built once from the
    environment-driven ``SchedulerParameters``. For every simulated time step a
    ``SchedulerRunner`` performs a single run and the resulting schedules are passed to
    ``record_metrics``. Simulated time is then advanced and ``increment_input`` is called.

    Args:
        argv: Command line arguments. Currently unused; the simulation is configured
            through environment variables.

    Raises:
        ValueError: If TIME_STEP is not positive, because simulated time would never
            advance past END_TIME.
    """
    if TIME_STEP <= 0:
        raise ValueError(f"SIMULATION_TIME_STEP_MINUTES must be positive, got {TIME_STEP}")

    # Get all scheduler params from environment variables
    sched_params = SchedulerParameters()

    # Set up and configure an application scope logger
    setup_logging()

    log.info(f"Starting Scheduler Simulator with id {RUN_ID} and time range {START_TIME.isoformat()} to {END_TIME.isoformat()}")

    # All this setup is the same as the normal scheduling run - things will be setup based on the
    # scheduler environment variables set.
    event_bus = get_eventbus()
    schedule_interface = ObservationScheduleInterface(host=sched_params.observation_portal_url)
    observation_portal_interface = ObservationPortalInterface(sched_params.observation_portal_url)

    # An optional overrides file is loaded and handed to the configdb interface.
    overrides_path = '/app/data/simulation_overrides.json'
    overrides = None
    if os.path.exists(overrides_path):
        with open(overrides_path, 'r') as fp:
            overrides = json.load(fp)
    configdb_interface = ConfigDBInterface(configdb_url=sched_params.configdb_url,
                                           telescope_classes=sched_params.telescope_classes,
                                           overrides=overrides)
    network_state_interface = Network(configdb_interface, sched_params)
    network_interface = NetworkInterface(schedule_interface, observation_portal_interface, network_state_interface,
                                         configdb_interface)
    kernel_class = FullScheduler_ortoolkit
    network_model = configdb_interface.get_telescope_info()

    scheduler = LCOGTNetworkScheduler(kernel_class, sched_params, event_bus, network_model)
    input_provider = SchedulingInputProvider(sched_params, network_interface, network_model, is_rr_input=True)
    input_factory = SchedulingInputFactory(input_provider)

    # Set the scheduler to run once each time it is invoked.
    sched_params.run_once = True

    # Basic orchestrator loop here: setup input, run scheduler, record metrics, step forward time, repeat
    current_time = START_TIME
    step = timedelta(minutes=TIME_STEP)
    # Setup the input from configdb and observation portal using the current time
    setup_input(current_time)
    while current_time <= END_TIME:
        log.info(f"Simulating with current time {current_time.isoformat()}")
        sched_params.simulate_now = f"{current_time.isoformat()}Z"

        # Scheduler run is invoked in the normal way, but it will just run a single time
        scheduler_runner = SchedulerRunner(sched_params, scheduler, network_interface, network_model, input_factory)
        scheduler_runner.run()
        # Output scheduled requests are available within the runner after it completes a run
        # These are used to seed a warm start solution for the next run in the normal scheduler,
        # but can be used to generate metrics here
        sched_params.metric_effective_horizon = 5  # days

        record_metrics(
            scheduler_runner.normal_scheduler_result,
            scheduler_runner.rr_scheduler_result,
            scheduler_runner.scheduler,
            scheduler_runner,
        )

        current_time += step
        increment_input(current_time, TIME_STEP)

    log.info(f"Finished running simulation {RUN_ID}, exiting")


if __name__ == '__main__':
    main(sys.argv[1:])
+""" +import matplotlib +import numpy as np +import matplotlib.pyplot as pyplot +import matplotlib.style + +import adaptive_scheduler.simulation.plotutils as plotutils +from adaptive_scheduler.simulation.metrics import bin_data +import adaptive_scheduler.simulation.metrics as metrics + +# change default parameters for matplotlib here +matplotlib.style.use('tableau-colorblind10') +matplotlib.rcParams['figure.figsize'] = (20, 10) +matplotlib.rcParams['figure.titlesize'] = 20 +matplotlib.rcParams['axes.titlesize'] = 14 +matplotlib.rcParams['axes.labelsize'] = 12 +matplotlib.rcParams['xtick.labelsize'] = 12 +matplotlib.rcParams['ytick.labelsize'] = 12 +matplotlib.rcParams['figure.subplot.wspace'] = 0.2 # horizontal spacing for subplots +matplotlib.rcParams['figure.subplot.hspace'] = 0.2 # vertical spacing for subplots +matplotlib.rcParams['figure.subplot.top'] = 0.9 # spacing between plot and title + + +def plot_airmass_difference_histogram(airmass_datasets, plot_title, normalize=False): + """Plots the difference of airmass from ideal. If normalize is turned on, then it scores + the airmasses with 0 being the worst (closest to bad airmass) and 1 being the best. + + Args: + airmass_data [dict]: Should be a list of datasets, each dataset corresponding + to a different airmass weighting coefficient. Assumes the first dataset passed + is the control dataset (airmass optimization turned off). + plot_title (str): The title of the plot. + normalize (bool): Determines if the airmass score is normalized. + + Returns: + fig (matplotlib.pyplot.Figure): The output figure object. 
+ """ + fig, ax = pyplot.subplots() + fig.suptitle(plot_title) + + numbins = 10 + data = [] + labels = ['optimize by earliest'] + for dataset in airmass_datasets: + airmass_data = dataset['airmass_metrics']['raw_airmass_data'] + airmass_coeff = dataset['airmass_weighting_coefficient'] + mp = np.array(airmass_data[0]['midpoint_airmasses']) + a_min = np.array(airmass_data[1]['min_poss_airmasses']) + a_max = np.array(airmass_data[2]['max_poss_airmasses']) + if normalize: + normed = 1 - (mp-a_min)/(a_max-a_min) + data.append(normed[np.where((normed != 0) & (normed != 1))]) + else: + data.append(mp-a_min) + # the first dataset is the control dataset + if dataset is not airmass_datasets[0]: + labels.append(airmass_coeff) + ax.hist(data, bins=numbins, label=labels) + + if normalize: + ax.set_xlabel('Airmass Score (0 is worst, 1 is ideal)') + else: + ax.set_xlabel('Difference from Ideal Airmass (0 is ideal)') + ax.set_ylabel('Number of Scheduled Requests') + ax.legend(title='Airmass Coefficient') + return fig + + +def plot_pct_scheduled_airmass_binned_priority(airmass_datasets, plot_title): + """Plots a barplot of the percentage of requests scheduled for different airmass coefficients + binned into priority levels. + + Args: + airmass_data [dict]: Should be a list of datasets, each dataset corresponding + to a different airmass weighting coefficient. Assumes the first dataset passed + is the control dataset (airmass optimization turned off). + plot_title (str): The title of the plot. + + Returns: + fig (matplotlib.pyplot.Figure): The output figure object. 
+ """ + fig, ax = pyplot.subplots() + fig.suptitle(plot_title) + + bardata = [] + labels = ['optimize by earliest'] + # get the bin names from the first dataset, the bins should be consistent across datasets + binnames = airmass_datasets[0]['percent_sched_by_priority'][0].keys() + for dataset in airmass_datasets: + priority_data = dataset['percent_sched_by_priority'][0] + airmass_coeff = dataset['airmass_weighting_coefficient'] + bardata.append(list(priority_data.values())) + # the first dataset is the control dataset + if dataset is not airmass_datasets[0]: + labels.append(airmass_coeff) + plotutils.plot_multi_barplot(ax, bardata, labels, binnames) + + ax.set_xlabel('Priority') + ax.set_ylabel('Percent of Requests Scheduled') + ax.set_ylim(0, 100) + ax.legend(title='Airmass Coefficient') + return fig + + +def plot_percent_sched_requests_bin_by_priority(eff_pri_datasets, plot_title): + """Plots a set of barplots. A barplot of percentage of request time scheduled for different priorities + on the left side and a barplot of percentage of request numbers scheduled for different priorities on the right side. + + Args: + eff_pri_datasets [dict]: a list of datasets, each dataset corresponding + to a different effective priority calculation. + plot_title (str): The title of the plot. + + Returns: + fig (matplotlib.pyplot.Figure): The output figure object. 
def plot_sched_priority_duration_dotplot(eff_pri_datasets, plot_title):
    """Plots jittered dot plots showing how request durations and priorities are distributed
    for different effective priority calculations.

    The left panel shows scheduled requests and the right panel unscheduled requests, with
    one marker and colour per dataset. Durations are divided by 60 for the minutes axis, and
    the priorities of the two "w-scaling" simulations are un-scaled with (p + 35) / 4.5.
    The datasets passed in are not modified.

    Args:
        eff_pri_datasets [dict]: a list of datasets, each dataset corresponding
            to a different effective priority calculation.
        plot_title (str): The title of the plot.

    Returns:
        fig (matplotlib.pyplot.Figure): The output figure object.
    """
    scaled_ids = ('airmass-0.1-w-duration-w-scaling', 'airmass-0.1-no-duration-w-scaling')
    markers = ['o', ',', 'v', '^', '<', '>']
    colors = ['r', 'b', 'c', 'm', 'y', 'k']
    labels = ['with duration', 'no duration', 'with duration scaled 100', 'no duration scaled 100']

    def rand_jitter(values):
        # Spread coincident points with gaussian noise of 1% of the data range.
        values = np.asarray(values, dtype=float)
        if values.size == 0:
            return values
        stdev = .01 * (values.max() - values.min())
        return values + np.random.randn(values.size) * stdev

    fig, (ax1, ax2) = pyplot.subplots(1, 2, figsize=(28, 12))
    fig.suptitle(plot_title)
    fig.subplots_adjust(wspace=0.2, hspace=0.2, top=0.9)

    panels = (
        (ax1, 'raw_scheduled_priorities', 'raw_scheduled_durations', 'Scheduled Reservations distribution'),
        (ax2, 'raw_unscheduled_priorities', 'raw_unscheduled_durations', 'Unscheduled Reservations distribution'),
    )
    for ax, priority_key, duration_key, title in panels:
        for i, data in enumerate(eff_pri_datasets):
            # Work on local arrays so the caller's dataset keeps its raw values.
            priorities = np.asarray(data[priority_key], dtype=float)
            if data['simulation_id'] in scaled_ids:
                # un-scale the priorities
                priorities = (priorities + 35) / 4.5
            durations = np.asarray(data[duration_key], dtype=float) / 60
            # Fall back to the simulation id when there are more datasets than preset labels.
            label = labels[i] if i < len(labels) else str(data['simulation_id'])
            ax.scatter(rand_jitter(priorities), rand_jitter(durations),
                       marker=markers[i % len(markers)], c=colors[i % len(colors)],
                       s=10, label=label, alpha=0.3)
        ax.set_ylim(top=100)
        ax.set_xlabel('Priority')
        ax.set_ylabel('Request Duration (minutes)')
        ax.set_title(title)
        ax.legend(title='Effective Priority Algorithms')
    return fig
plot_heat_map_priority_duration(eff_pri_datasets, plot_title): + """Plots four heat maps showing how the scheduled request durations and priorities are distributed for + each effecitve priority calculations. + + Args: + eff_pri_datasets [dict]: a list of datasets, each dataset corresponding + to a different effective priority calculation. + plot_title (str): The title of the plot. + + Returns: + fig (matplotlib.pyplot.Figure): The output figure object. + """ + fig, axs = pyplot.subplots(2, 2, figsize=(13, 12)) + fig.suptitle(plot_title) + fig.subplots_adjust(wspace=0.01, hspace=0.01, top=0.9) + ax_list = [axs[0, 0], axs[0, 1], axs[1, 0], axs[1, 1]] + labels = ['with duration', 'no duration', 'with duration scaled 100', 'no duration scaled 100'] + for i, data in enumerate(eff_pri_datasets): + id = data['simulation_id'] + if id in ['airmass-0.1-w-duration-w-scaling', 'airmass-0.1-no-duration-w-scaling']: + data['raw_scheduled_priorities'] = [(p+35)/4.5 for p in data['raw_scheduled_priorities']] + data['raw_unscheduled_priorities'] = [(p+35)/4.5 for p in data['raw_unscheduled_priorities']] + sched_priorities = data['raw_scheduled_priorities'] + sched_durations = data['raw_scheduled_durations'] + unsched_priorities = data['raw_unscheduled_priorities'] + unsched_durations = data['raw_unscheduled_durations'] + level_1_bins = bin_data(sched_priorities, sched_durations, bin_size=4, bin_range=(10, 30), aggregator=None) + # set the duration bins (in seconds) here + level_2_bins = { + bin_key: bin_data(bin_values, bin_size=300, bin_range=(0, 1499)) | bin_data(bin_values, bin_size=3000, bin_range=(1500, 4000)) + for bin_key, bin_values in level_1_bins.items() + } + level_1_bins_unsched = bin_data(unsched_priorities, unsched_durations, bin_size=4, bin_range=(10, 30), aggregator=None) + level_2_bins_unsched = { + bin_key: bin_data(bin_values, bin_size=300, bin_range=(0, 1499)) | bin_data(bin_values, bin_size=3000, bin_range=(1500, 4000)) + for bin_key, bin_values in 
level_1_bins_unsched.items() + } + heat_map_elements = [] + heat_map_elements_unsched = [] + for values in level_2_bins.values(): + heat_map_elements.append(list(values.values())) + for values in level_2_bins_unsched.values(): + heat_map_elements_unsched.append(list(values.values())) + priority_bins = list(level_2_bins.keys()) + duration_bins = ['0-5', '5-10', '10-15', '15-20', '20-25', '25&up'] + heat_map_elements = np.array(heat_map_elements) + heat_map_elements_unsched = np.array(heat_map_elements_unsched) + axis = ax_list[i] + cmap = pyplot.get_cmap('coolwarm') + cmap2 = pyplot.get_cmap('gray') + axis.imshow(heat_map_elements, cmap=cmap) + axis.set_ylabel('Priority') + axis.set_xlabel('Duration (minutes)') + axis.set_xticks(np.arange(len(duration_bins)), labels=duration_bins) + axis.set_yticks(np.arange(len(priority_bins)), labels=priority_bins) + pyplot.setp(axis.get_xticklabels(), rotation=45, ha="right", + rotation_mode="anchor") + for j in range(len(priority_bins)): + for k in range(len(duration_bins)): + value = heat_map_elements[j, k] + axis.text(k, j, f'{heat_map_elements[j, k]}|{ heat_map_elements_unsched[j, k]}', + ha="center", va="center", fontsize='large', fontweight='semibold', color=cmap2(0.001/value)) + percent_time_utilization = data['percent_time_utilization'] + axis.set_title(f'{labels[i]} ({percent_time_utilization:.1f}% time utilized)', fontweight='semibold') + fig.tight_layout() + return fig + + +def plot_pct_time_scheduled_airmass_binned_priority(airmass_datasets, plot_title): + """Plots the percentage of requested time scheduled for different airmass coefficients + binned into priority levels. + + Args: + airmass_data [dict]: Should be a list of datasets, each dataset corresponding + to a different airmass weighting coefficient. Assumes the first dataset passed + is the control dataset (airmass optimization turned off). + plot_title (str): The title of the plot. + + Returns: + fig (matplotlib.pyplot.Figure): The output figure object. 
+ """ + fig, ax = pyplot.subplots() + fig.suptitle(plot_title) + + bardata = [] + labels = ['optimize by earliest'] + # get the bin names from the first dataset, the bins should be consistent across datasets + binnames = airmass_datasets[0]['percent_sched_by_priority'][0].keys() + for dataset in airmass_datasets: + priority_data = dataset['percent_duration_by_priority'][0] + airmass_coeff = dataset['airmass_weighting_coefficient'] + bardata.append(list(priority_data.values())) + # the first dataset is the control dataset + if dataset is not airmass_datasets[0]: + labels.append(airmass_coeff) + plotutils.plot_multi_barplot(ax, bardata, labels, binnames) + + ax.set_xlabel('Priority') + ax.set_ylabel('Percent of Requested Time Scheduled') + ax.set_ylim(0, 100) + ax.legend(title='Airmass Coefficient') + return fig + + +def plot_pct_scheduled_airmass_lineplot(airmass_datasets, plot_title): + """Plots a line chart with percent of requests scheduled on the y-axis and airmass + coefficient on the x-axis. The priority bins are highlighted in different colors. + + Args: + airmass_data [dict]: Should be a list of datasets, each dataset corresponding + to a different airmass weighting coefficient. Assumes the first dataset passed + is the control dataset (airmass optimization turned off). + plot_title (str): The title of the plot. + + Returns: + fig (matplotlib.pyplot.Figure): The output figure object. 
def plot_pct_time_scheduled_airmass_lineplot(airmass_datasets, plot_title):
    """Plots a line chart with percent of requested time scheduled on the y-axis and airmass
    coefficient on the x-axis. The priority bins are highlighted in different colors, and an
    extra 'all' line shows the percentage over all bins together.

    Args:
        airmass_datasets [dict]: Should be a list of datasets, each dataset corresponding
            to a different airmass weighting coefficient. Assumes the first dataset passed
            is the control dataset (airmass optimization turned off); it only supplies the
            bin names and is not plotted.
        plot_title (str): The title of the plot.

    Returns:
        fig (matplotlib.pyplot.Figure): The output figure object.
    """
    fig, ax = pyplot.subplots()
    fig.suptitle(plot_title)

    prio_names = list(airmass_datasets[0]['percent_duration_by_priority'][0].keys())
    prio_names.append('all')
    airmass_coeffs = []
    pct_scheduled = []
    # exclude the control dataset
    for dataset in airmass_datasets[1:]:
        sched_by_priority = np.array(list(dataset['scheduled_seconds_by_priority'][0].values()))
        total_by_priority = np.array(list(dataset['total_seconds_by_priority'][0].values()))
        airmass_coeffs.append(dataset['airmass_weighting_coefficient'])
        pct_by_priority = sched_by_priority/total_by_priority * 100
        pct_cumulative = np.sum(sched_by_priority)/np.sum(total_by_priority) * 100
        pct_scheduled.append(np.append(pct_by_priority, pct_cumulative))
    data_by_airmass = np.array(pct_scheduled).transpose()

    # Manual fix for overlapping annotations, THIS IS SPECIFIC TO A CERTAIN DATASET:
    # some of the points at the 7th plotted airmass coefficient (index 6) overlap.
    # With fewer coefficients there is nothing to special-case.
    # If this happens a lot, look into offsetting text automatically.
    overlap_coeff = airmass_coeffs[6] if len(airmass_coeffs) > 6 else None

    for i, data in enumerate(data_by_airmass):
        ax.plot(airmass_coeffs, data, marker='.', ms=8, label=prio_names[i])
        for j, k in zip(airmass_coeffs, data):
            annotation = f'{k:.2f}%'
            at_overlap = overlap_coeff is not None and j == overlap_coeff
            if i == 3 and at_overlap:
                ax.annotate(annotation, xy=(j, k), xytext=(-15, 6), c='#595959', textcoords='offset points')
            elif i == 1 and at_overlap:
                ax.annotate(annotation, xy=(j, k), xytext=(-15, -13), c='#FF800E', textcoords='offset points')
            else:
                ax.annotate(annotation, xy=(j, k), xytext=(-15, -13), textcoords='offset points')

    ax.set_xlabel('Airmass Coefficient')
    ax.set_ylabel('Percent of Requested Time Scheduled')
    ax.set_ylim(0, 100)
    ax.legend(title='Priority')
    return fig
def plot_midpoint_airmass_histograms(airmass_datasets, plot_title):
    """Plots a distribution of midpoint airmasses for each different airmass coefficient,
    one histogram per dataset.

    The histograms are laid out three per row. The grid has at least three rows and grows
    by whole rows when more than nine datasets are passed.

    Args:
        airmass_datasets [dict]: Should be a list of datasets, each dataset corresponding
            to a different airmass weighting coefficient. Assumes the first dataset passed
            is the control dataset (airmass optimization turned off).
        plot_title (str): The title of the plot.

    Returns:
        fig (matplotlib.pyplot.Figure): The output figure object.
    """
    fig = pyplot.figure(figsize=(16, 16))
    fig.suptitle(plot_title)
    fig.subplots_adjust(wspace=0.3, hspace=0.3, top=0.92)

    ncols = 3
    # Ceiling division; keeps the original 3x3 layout for up to nine datasets.
    nrows = max(3, -(-len(airmass_datasets) // ncols))
    for i, dataset in enumerate(airmass_datasets):
        ax = fig.add_subplot(nrows, ncols, i+1)
        midpoint_airmasses = dataset['airmass_metrics']['raw_airmass_data'][0]['midpoint_airmasses']
        airmass_coeff = dataset['airmass_weighting_coefficient']
        ax.hist(midpoint_airmasses, bins=50)
        # the first dataset is the control dataset
        if i == 0:
            ax.set_title('Optimize Earliest')
        else:
            ax.set_title(f'Airmass Coefficient: {airmass_coeff}')
        ax.set_xlabel('Midpoint Airmass')
        ax.set_ylabel('Count')
        ax.set_xlim(1.0, 2.0)
        ax.set_ylim(0, 120)
    return fig
def plot_subplots_input_duration(data, plot_title):
    """Plots histograms of the input request durations in minutes for different priorities.

    Each subplot is a stacked histogram of scheduled and unscheduled request durations for
    one priority bin. Subplots are filled row by row in the order the bins are returned by
    ``metrics.bin_data``.

    Args:
        data (dict): The data for this metric. Expects one dataset.
        plot_title (str): The title of the plot.

    Returns:
        fig (matplotlib.pyplot.Figure): The output Figure object.
    """
    fig, axs = pyplot.subplots(3, 2, figsize=(22, 15))
    fig.suptitle(plot_title)

    # Durations are divided by 60 for the minutes axis.
    sched_durations = [d/60 for d in data['raw_scheduled_durations']]
    unsched_durations = [d/60 for d in data['raw_unscheduled_durations']]
    sched_priorities = data['raw_scheduled_priorities']
    unsched_priorities = data['raw_unscheduled_priorities']

    def bin_by_priority(priorities, durations):
        # Bins of width 5 over 10-30, plus one wide bin for everything from 31 to 3000.
        return (metrics.bin_data(priorities, durations, bin_size=5, bin_range=(10, 30), aggregator=None)
                | metrics.bin_data(priorities, durations, bin_size=3000, bin_range=(31, 3000), aggregator=None))

    sched_bins = bin_by_priority(sched_priorities, sched_durations)
    unsched_bins = bin_by_priority(unsched_priorities, unsched_durations)

    totals_by_priorities = list(data['total_req_by_priority'][0].values())
    # NOTE(review): the last label says 2000 while the bin above runs to 3000 - confirm which is intended.
    labels = ['10-14', '15-19', '20-24', '25-29', '30', '31-2000']
    bars = ['Scheduled', 'Unscheduled']
    histogram_bins = np.arange(0, 100, 2)

    # axs.flat walks the 3x2 grid row by row; scheduled and unscheduled bins are paired by position.
    panels = zip(axs.flat, labels, totals_by_priorities, sched_bins.values(), unsched_bins.values())
    for axis, label, total, sched_values, unsched_values in panels:
        axis.hist([sched_values, unsched_values], bins=histogram_bins,
                  stacked=True, label=bars)
        axis.set_xlabel('Duration (Minutes)')
        axis.set_ylabel('Input reservation counts')
        axis.set_ylim(0, 300)
        axis.set_title(f'{label} Priority ({total} requests)')
        axis.legend()
    return fig
+1,78 @@ +""" +The interface for producing plots. To create plots, add plots to the list of plots, +modifying the parameters to Plot as necessary. +""" +import adaptive_scheduler.simulation.plotfuncs as plotfuncs +import adaptive_scheduler.simulation.plotutils as plotutils +from adaptive_scheduler.simulation.plotutils import Plot + +airmass_experiment_ids = [ + 'no-airmass-w-duration-no-scaling', + 'airmass-0.01-w-duration-no-scaling', + 'airmass-0.05-w-duration-no-scaling', + 'airmass-0.1-w-duration-no-scaling', + 'airmass-1.0-w-duration-no-scaling', + 'airmass-10-w-duration-no-scaling', + 'airmass-100-w-duration-no-scaling', + 'airmass-1000-w-duration-no-scaling', + 'airmass-1000000-w-duration-no-scaling', +] + +effective_priority_experiment_ids = [ + 'airmass-0.1-w-duration-no-scaling', + 'airmass-0.1-no-duration-no-scaling', + 'airmass-0.1-w-duration-w-scaling', + 'airmass-0.1-no-duration-w-scaling', +] + + +plots = [ + # General Use + Plot(plotfuncs.plot_subplots_input_duration, + '1m Network Scheduled/Unscheduled Requests Length Distribution', + 'test-new-dataset'), + Plot(plotfuncs.plot_duration_by_window_duration_scatter, + '1m Network Scatterplot of Duration and Window Duration', + 'window-duration'), + + # Airmass Experiment + Plot(plotfuncs.plot_airmass_difference_histogram, + '1m Network Airmass Difference Distribution for Scheduled Requests', + airmass_experiment_ids), + Plot(plotfuncs.plot_pct_scheduled_airmass_binned_priority, + '1m Network Airmass Experiment Percent of Requests Scheduled per Priority Class', + airmass_experiment_ids), + Plot(plotfuncs.plot_pct_scheduled_airmass_lineplot, + '1m Network Airmass Experiment Percent of Requests Scheduled per Priority Class', + airmass_experiment_ids), + Plot(plotfuncs.plot_pct_time_scheduled_airmass_binned_priority, + '1m Network Airmass Experiment Percent of Requested Time Scheduled per Priority Class', + airmass_experiment_ids), + Plot(plotfuncs.plot_pct_time_scheduled_airmass_lineplot, + '1m Network 
Airmass Experiment Percent of Requested Time Scheduled per Priority Class', + airmass_experiment_ids), + Plot(plotfuncs.plot_pct_time_scheduled_out_of_available, + '1m Network Airmass Experiment Percent of Requested Time Scheduled out of Available Time', + airmass_experiment_ids), + Plot(plotfuncs.plot_midpoint_airmass_histograms, + '1m Network Airmass Experiment Midpoint Airmass Distributions', + airmass_experiment_ids), + + # Effective Priority Experiment + Plot(plotfuncs.plot_eff_priority_duration_scatter, + '1m Network Scatterplot of Effective Priority and Duration', + ['airmass-0.1-w-duration-no-scaling', 'airmass-0.1-w-duration-w-scaling']), + Plot(plotfuncs.plot_percent_sched_requests_bin_by_priority, + '1m Network Scheduler Metrics Binned by Priority', + effective_priority_experiment_ids), + Plot(plotfuncs.plot_sched_priority_duration_dotplot, + '1m Distribution of Priority and Duration With Airmass Optimization', + effective_priority_experiment_ids), + Plot(plotfuncs.plot_heat_map_priority_duration, + '1m Network Requests Heatmap With Airmass Optimization (sched|unsched)', + effective_priority_experiment_ids), +] + + +if __name__ == '__main__': + plotutils.run_user_interface(plots) diff --git a/adaptive_scheduler/simulation/plotutils.py b/adaptive_scheduler/simulation/plotutils.py new file mode 100644 index 0000000..53bc2d3 --- /dev/null +++ b/adaptive_scheduler/simulation/plotutils.py @@ -0,0 +1,211 @@ +""" +Plotting utility functions +""" +import os +import sys +import argparse +import readline +import logging +from copy import deepcopy +from datetime import datetime + +import numpy +import matplotlib.pyplot as pyplot +import opensearchpy +from opensearchpy import OpenSearch + +DEFAULT_DIR = 'adaptive_scheduler/simulation/plot_output' + +OPENSEARCH_URL = os.getenv('OPENSEARCH_URL', '') +OPENSEARCH_INDEX = os.getenv('SIMULATION_OPENSEARCH_INDEX', 'scheduler-simulations') +try: + opensearch_client = OpenSearch(OPENSEARCH_URL) +except TypeError: + 
class AutoCompleter(object):
    """Completer for ``readline`` TAB completion over a fixed list of options.

    Matching is by substring: an option is offered when the typed text occurs
    anywhere inside it, not only at the start.
    """

    def __init__(self, options):
        """Handles TAB autocomplete in the command line.

        Args:
            options [str]: A list of possible autocomplete options.
        """
        self.options = sorted(options)
        # Initialised here so complete() is safe even if the first call has state != 0.
        self.matches = []

    def complete(self, text, state):
        """Returns the ``state``-th match for ``text``, or None when there are no more.

        The match list is rebuilt only when ``state`` is 0; later states index into
        the list computed by that call.

        Args:
            text (str): The text typed so far. An empty string matches every option.
            state (int): The index of the match to return.

        Returns:
            The matching option (str), or None if ``state`` is past the last match.
        """
        if state == 0:
            if text:
                self.matches = [s for s in self.options if s and text in s]
            else:
                self.matches = self.options[:]

        try:
            return self.matches[state]
        except IndexError:
            return None
class Plot:
    """Binds a plotting function to the simulation data it needs and to export settings."""

    def __init__(self, plotfunc, description, sim_ids, **kwargs):
        """A wrapper class for plotting. The user specifies the plotting function to use
        and the simulation ID(s) or search keywords. The data is passed to the plotting
        function as a list of datasets, each set corresponding to an OpenSearch index.
        The plotting function is responsible for accessing the right data keys. Data is cached
        within the same run but not between runs.

        Args:
            plotfunc (func): The plotting function to use.
            description (str): The description of the plot. Will be used as the plot title in matplotlib.
            sim_ids: The simulation IDs to look for on OpenSearch. Can be a list, a tuple or a single string.
            kwargs: Optional arguments to pass to the plotting function.
        """
        self.plotfunc = plotfunc
        self.description = description
        # expects plotting functions to be called 'plot_some_plot_name'; only the leading
        # 'plot_' is dropped so that name becomes 'some_plot_name', not 'some_name'
        prefix = 'plot_'
        func_name = plotfunc.__name__
        self.name = func_name[len(prefix):] if func_name.startswith(prefix) else func_name
        # copied so a list shared between several Plot objects is never aliased
        self.sim_ids = list(sim_ids) if isinstance(sim_ids, (list, tuple)) else [sim_ids]
        self.kwargs = kwargs
        # export settings; run_user_interface overwrites these from the command line.
        # None means "use DEFAULT_DIR", resolved in save().
        self.export_dir = None
        self.export_format = 'jpg'
        # both are populated by generate()
        self.data = None
        self.fig = None

    def generate(self):
        """Fetches (or reuses cached) data for every simulation ID and builds the figure.

        A single dataset is passed to the plotting function as-is; several datasets are
        passed as a list. The plotting function receives a deep copy of the data.
        """
        datasets = []
        for sim_id in self.sim_ids:
            # data_cache is only mutated, never rebound, so no `global` statement is needed
            if sim_id not in data_cache:
                data_cache[sim_id] = get_opensearch_data(sim_id)
            datasets.append(data_cache[sim_id])
        self.data = datasets[0] if len(datasets) == 1 else datasets

        self.fig = self.plotfunc(deepcopy(self.data), self.description, **self.kwargs)

    def save(self):
        """Exports the figure as '<name>_<UTC timestamp>.<format>' in the export directory."""
        timestamp = datetime.utcnow().isoformat(timespec='seconds')
        savename = f'{self.name}_{timestamp}'
        export_dir = DEFAULT_DIR if self.export_dir is None else self.export_dir
        export_to_image(savename, self.fig, export_dir, self.export_format)
def get_opensearch_data(query):
    """Gets a specific OpenSearch id and returns the source data. Tries to match the exact ID first,
    then moves on to a keyword search (wildcards allowed) if the first search fails. Returns the most
    recent record for the keyword search.

    Args:
        query (str): The document ID to look for, or a wildcard pattern matched against
            'simulation_id.keyword'.

    Returns:
        source_data (dict): A dictionary of the data returned from OpenSearch.

    Raises:
        opensearchpy.exceptions.NotFoundError: If neither the exact ID nor the wildcard
            search returns a document.
    """
    try:
        response = opensearch_client.get(OPENSEARCH_INDEX, query)
    except opensearchpy.exceptions.NotFoundError:
        # no document with that exact ID; fall through to the wildcard search
        pass
    else:
        return response['_source']

    # kept in its own name so `query` still holds the caller's string for the error message
    search_body = {
        'query': {
            'wildcard': {'simulation_id.keyword': query}
        },
        'sort': [
            {'record_time': {'order': 'desc'}}
        ]
    }
    response = opensearch_client.search(search_body, OPENSEARCH_INDEX)
    hits = response['hits']['hits']
    if not hits:
        # give up. The status code comes first because opensearch-py's TransportError
        # reads (status_code, error, ...) from its args when it is printed.
        raise opensearchpy.exceptions.NotFoundError(404, f'No data found for {query}')
    # results are sorted by record_time descending, so the first hit is the most recent
    return hits[0]['_source']
"2022-07-06T01:41", + "2022-07-06T01:51", + "2022-07-06T02:01", + "2022-07-06T02:11", + "2022-07-06T02:21", + "2022-07-06T02:31", + "2022-07-06T02:41", + "2022-07-06T02:51", + "2022-07-06T03:01", + "2022-07-06T03:11", + "2022-07-06T03:21" + ], + "airmasses": [ + 1, + 2, + 3, + 4, + 5, + 6, + 7, + 8, + 9, + 10, + 11, + 12, + 13, + 14, + 15, + 16, + 17, + 18, + 19, + 20 + ] + }, + "egg": { + "times": [ + "2022-07-06T00:31", + "2022-07-06T00:41", + "2022-07-06T00:51", + "2022-07-06T01:01", + "2022-07-06T01:11", + "2022-07-06T01:21", + "2022-07-06T01:31", + "2022-07-06T01:41" + ], + "airmasses": [ + 2, + 3, + 4, + 5, + 6, + 7, + 8, + 9 + ] + } +} + diff --git a/tests/airmass_data_2.json b/tests/airmass_data_2.json new file mode 100644 index 0000000..7751ef5 --- /dev/null +++ b/tests/airmass_data_2.json @@ -0,0 +1,25 @@ +{ + "egg": { + "times": [ + "2022-07-06T00:31", + "2022-07-06T00:41", + "2022-07-06T00:51", + "2022-07-06T01:01", + "2022-07-06T01:11", + "2022-07-06T01:21", + "2022-07-06T01:31", + "2022-07-06T01:41" + ], + "airmasses": [ + 1, + 2, + 3, + 4, + 5, + 6, + 7, + 8 + ] + } +} + diff --git a/tests/test_simulator_metrics.py b/tests/test_simulator_metrics.py new file mode 100644 index 0000000..028f45b --- /dev/null +++ b/tests/test_simulator_metrics.py @@ -0,0 +1,200 @@ +from adaptive_scheduler.simulation.metrics import (MetricCalculator, + bin_data) + +import os +import json +import calendar +from datetime import datetime, timedelta +from mock import Mock +from rise_set import astrometry +from rise_set.sky_coordinates import RightAscension, Declination +from rise_set.angle import Angle +import numpy as np + + +class TestMetrics(): + + def setup(self): + self.start = datetime.strptime("2022-07-06T00:30", '%Y-%m-%dT%H:%M') + self.end = self.start + timedelta(minutes=90) + self.scheduler_run_time = datetime.utcnow() + scheduler_result_attrs = {'resources_scheduled.return_value': ['bpl', 'coj']} + res1 = Mock(duration=10, scheduled=True) + res2 = 
Mock(duration=20, scheduled=True) + res3 = Mock(duration=30, scheduled=True) + res4 = Mock(scheduled=False) + res5 = Mock(scheduled=False) + fake_schedule = {'bpl': [res1, res2], 'coj': [res3]} + fake_comp_res = Mock(reservation_list=[res1, res2, res3, res4, res5]) + self.mock_scheduler_result = Mock(input_reservations=[fake_comp_res], **scheduler_result_attrs) + self.mock_scheduler = Mock(estimated_scheduler_end=self.scheduler_run_time) + self.mock_scheduler_runner = Mock(semester_details={'start': self.start}) + self.mock_scheduler_runner.sched_params.horizon_days = 5 + self.mock_scheduler_runner.sched_params.simulate_now = self.scheduler_run_time + + self.mock_scheduler_result.schedule = fake_schedule + + self.mock_scheduler.visibility_cache = {'bpl': Mock(), 'coj': Mock()} + self.mock_scheduler.visibility_cache['bpl'].dark_intervals = [ + (self.scheduler_run_time-timedelta(days=5), self.scheduler_run_time-timedelta(days=4)), + (self.scheduler_run_time, self.scheduler_run_time+timedelta(days=1)), + (self.scheduler_run_time+timedelta(days=2), self.scheduler_run_time+timedelta(days=3)), + ] + self.mock_scheduler.visibility_cache['coj'].dark_intervals = [ + (self.scheduler_run_time, self.scheduler_run_time+timedelta(days=2))] + + self.metrics = MetricCalculator(self.mock_scheduler_result, + self.mock_scheduler_result, + self.mock_scheduler, + self.mock_scheduler_runner) + + def test_combining_schedules(self): + scheduler_result_attrs = {'resources_scheduled.return_value': ['bpl', 'coj', 'ogg']} + fake_schedule1 = {'bpl': ['hi', 'there'], 'coj': ['person']} + fake_schedule2 = {'ogg': ['lco', 'rocks'], 'coj': ['woohoo!']} + fake_input = [Mock(reservation_list=['foo', 'bar'])] + mock_normal_scheduler_result = Mock(schedule=fake_schedule1, **scheduler_result_attrs) + mock_normal_scheduler_result.input_reservations = fake_input + mock_rr_scheduler_result = Mock(schedule=fake_schedule2, **scheduler_result_attrs) + mock_rr_scheduler_result.input_reservations = fake_input 
+ + only_normal = MetricCalculator(mock_normal_scheduler_result, None, + self.mock_scheduler, self.mock_scheduler_runner) + both_schedules = MetricCalculator(mock_normal_scheduler_result, mock_rr_scheduler_result, + self.mock_scheduler, self.mock_scheduler_runner) + same_schedule = MetricCalculator(mock_normal_scheduler_result, mock_normal_scheduler_result, + self.mock_scheduler, self.mock_scheduler_runner) + + assert only_normal.combined_schedule == fake_schedule1 + assert both_schedules.combined_schedule == {'bpl': ['hi', 'there'], + 'coj': ['woohoo!', 'person'], + 'ogg': ['lco', 'rocks']} + assert same_schedule.combined_schedule == fake_schedule1 + + def test_percent_scheduled(self): + assert self.metrics.percent_reservations_scheduled() == 60. + + scheduled_reservation = Mock(scheduled=True) + unscheduled_reservation = Mock(scheduled=False) + mock_schedule = {'bpl': [scheduled_reservation], 'coj': [scheduled_reservation, scheduled_reservation]} + mock_scheduler_input = [unscheduled_reservation, scheduled_reservation, scheduled_reservation, scheduled_reservation] + metrics2 = MetricCalculator(self.mock_scheduler_result, + None, + self.mock_scheduler, + self.mock_scheduler_runner) + metrics2.combined_schedule = mock_schedule + metrics2.combined_input_reservations = mock_scheduler_input + + assert metrics2.percent_reservations_scheduled() == 75. 
+ + def test_total_time_aggregators(self): + seconds_in_day = 86400 + + assert sum(self.metrics.get_duration_data()[0]) == 60 + assert self.metrics.total_available_seconds() == 4*seconds_in_day + + def test_bin_data(self): + bin_by = [1, 3, 4, 2, 6, 5, 3, 2, 3, 4, 7, 9, 3, 8, 6, 4] + bin_data_ = [1, 2, 3, 4, 5, 6, 7, 8, 9, 8, 7, 6, 5, 4, 3, 2] + bin_by_float = [0.5, 2.1, 2.8, 6.9, 1.8] + bin_range = (1, 9) + + allparams = {'1-3': 7, '4-6': 6, '7-9': 3} + defaults = {'1': 1, '2': 2, '3': 4, '4': 3, '5': 1, '6': 2, '7': 1, '8': 1, '9': 1} + unevenbins = {'1-2': 3, '3-4': 7, '5-6': 3, '7-8': 2, '9': 1} + floatbinsize = {'0.0-2.5': 3, '2.5-5.0': 7, '5.0-7.5': 4, '7.5-9.0': 2} + floats = {'0.5-1.5': 1, '1.5-2.5': 2, '2.5-3.5': 1, '6.5-6.9': 1} + capped_floats = {'0': 1, '1': 1, '2': 2} + sumdata = {'1-3': 36, '4-6': 27, '7-9': 17} + mindata = {'1-3': 1, '4-6': 2, '7-9': 4} + + assert bin_data(bin_by, bin_size=3, bin_range=bin_range, fill=None) == allparams + assert bin_data(bin_by, fill=None) == defaults + assert bin_data(bin_by, bin_size=2, fill=None) == unevenbins + assert bin_data(bin_by, bin_size=2.5, bin_range=(0, 9), fill=None) == floatbinsize + assert bin_data(bin_by_float, fill=None) == floats + assert bin_data(bin_by_float, bin_range=(0, 4), fill=None) == capped_floats + assert bin_data(bin_by, bin_data_, bin_size=3, fill=None, aggregator=sum) == sumdata + assert bin_data(bin_by, bin_data_, bin_size=3, aggregator=min) == mindata + + def test_airmass_functions(self): + dir_path = os.path.dirname(os.path.realpath(__file__)) + data_path_1 = os.path.join(dir_path, 'airmass_data.json') + data_path_2 = os.path.join(dir_path, 'airmass_data_2.json') + with open(data_path_1) as f: + airmass_data_1 = json.load(f) + with open(data_path_2) as f: + airmass_data_2 = json.load(f) + self.metrics._get_airmass_data_for_request = Mock(side_effect=[airmass_data_1, airmass_data_2]) + request_1 = Mock(id=1) + mock_reservation_1 = Mock(scheduled_start=0, 
scheduled_resource='1m0a.doma.tfn', + request=request_1, duration=2400) + request_2 = Mock(id=2) + mock_reservation_2 = Mock(scheduled_start=0, scheduled_resource='1m0a.doma.egg', + request=request_2, duration=2400) + scheduled_reservations = [mock_reservation_1, mock_reservation_2] + schedule = {'reservations': scheduled_reservations} + midpoint_time = self.start + timedelta(seconds=mock_reservation_1.duration/2) + midpoint_duration = timedelta(seconds=mock_reservation_1.duration/2) + + assert self.metrics._get_midpoint_airmasses_by_site(airmass_data_1, midpoint_time) == {'tfn': 5, 'egg': 4} + assert self.metrics._get_minmax_airmass(airmass_data_1, midpoint_duration) == (1, 20) + + airmass_metrics = self.metrics.airmass_metrics(schedule) + midpoint_airmasses = [5, 3] + assert type(airmass_metrics) is dict + assert airmass_metrics['avg_midpoint_airmass'] == 4 + assert airmass_metrics['avg_min_poss_airmass'] == 1 + assert airmass_metrics['raw_airmass_data'][0]['midpoint_airmasses'] == midpoint_airmasses + + def test_avg_slew_distance(self): + conf1 = Mock() + conf1.target.in_rise_set_format = Mock(return_value={'ra': Angle(degrees=35), 'dec': Angle(degrees=0)}) + conf2 = Mock() + conf2.target.in_rise_set_format = Mock(return_value={'ra': Angle(degrees=35), 'dec': Angle(degrees=15)}) + conf3 = Mock() + conf3.target.in_rise_set_format = Mock(return_value={'ra': Angle(degrees=10), 'dec': Angle(degrees=15)}) + conf4 = Mock() + conf4.target.in_rise_set_format = Mock(return_value={'ra': Angle(degrees=60), 'dec': Angle(degrees=10)}) + conf5 = Mock() + conf5.target.in_rise_set_format = Mock(return_value={'ra': Angle(degrees=80), 'dec': Angle(degrees=10)}) + conf6 = Mock() + conf6.target.in_rise_set_format = Mock(return_value={'ra': Angle(degrees=80), 'dec': Angle(degrees=-10)}) + + res1 = Mock(scheduled_start=10) + res2 = Mock(scheduled_start=20) + res3 = Mock(scheduled_start=10) + res4 = Mock(scheduled_start=20) + res5 = Mock(scheduled_start=30) + 
res1.request.configurations = [conf1, conf2] + res2.request.configurations = [conf3] + res3.request.configurations = [conf4] + res4.request.configurations = [conf5, conf5, conf5] + res5.request.configurations = [conf6] + fake_schedule1 = {'bpl': [res1, res2], 'coj': [res5, res4, res3]} + + d = timedelta(seconds=10) + radec1 = astrometry.mean_to_apparent({'ra': Angle(degrees=35), 'dec': Angle(degrees=0)}, + astrometry.date_to_tdb(self.scheduler_run_time+d)) + radec2 = astrometry.mean_to_apparent({'ra': Angle(degrees=35), 'dec': Angle(degrees=15)}, + astrometry.date_to_tdb(self.scheduler_run_time+d)) + radec3 = astrometry.mean_to_apparent({'ra': Angle(degrees=10), 'dec': Angle(degrees=15)}, + astrometry.date_to_tdb(self.scheduler_run_time+2*d)) + radec4 = astrometry.mean_to_apparent({'ra': Angle(degrees=60), 'dec': Angle(degrees=10)}, + astrometry.date_to_tdb(self.scheduler_run_time+3*d)) + radec5 = astrometry.mean_to_apparent({'ra': Angle(degrees=80), 'dec': Angle(degrees=10)}, + astrometry.date_to_tdb(self.scheduler_run_time+4*d)) + radec6 = astrometry.mean_to_apparent({'ra': Angle(degrees=80), 'dec': Angle(degrees=-10)}, + astrometry.date_to_tdb(self.scheduler_run_time+5*d)) + slewdists = [astrometry.angular_distance_between(*radec1, *radec2), + astrometry.angular_distance_between(*radec2, *radec3), + astrometry.angular_distance_between(*radec4, *radec5), + astrometry.angular_distance_between(*radec5, *radec5), + astrometry.angular_distance_between(*radec5, *radec5), + astrometry.angular_distance_between(*radec5, *radec6)] + slewdists = [a.in_degrees() for a in slewdists] + metrics = MetricCalculator(self.mock_scheduler_result, None, self.mock_scheduler, self.mock_scheduler_runner) + metrics.combined_schedule = fake_schedule1 + metrics.scheduler_runner.semester_details['start'] = self.scheduler_run_time + + assert np.isclose(metrics.avg_slew_distance(), np.mean(slewdists))