diff --git a/amiadapters/adapters/base.py b/amiadapters/adapters/base.py index d6a4e62..18db28d 100644 --- a/amiadapters/adapters/base.py +++ b/amiadapters/adapters/base.py @@ -215,6 +215,15 @@ def load_transformed(self, run_id: str): with self._base_adapter_metrics.load_transformed_timer(): meters = self.output_controller.read_transformed_meters(run_id) reads = self.output_controller.read_transformed_meter_reads(run_id) + if not meters or not reads: + raise Exception( + f"Extract for {self.org_id} (run_id={run_id}) produced " + f"{len(meters)} meters and {len(reads)} reads. " + f"If this is a backfill DAG that has reached the vendor's data floor, " + f"decommission it with: " + f"`python cli.py config remove-backfill {self.org_id} --profile `. " + f"Otherwise this likely indicates a vendor outage or authentication failure." + ) for sink in self.storage_sinks: sink.store_transformed(run_id, meters, reads) diff --git a/amiadapters/outputs/local.py b/amiadapters/outputs/local.py index c8ed2e6..ba6e314 100644 --- a/amiadapters/outputs/local.py +++ b/amiadapters/outputs/local.py @@ -65,7 +65,12 @@ def read_transformed_meters(self, run_id: str) -> List[GeneralMeter]: logger.info(f"Reading meters from {path}") with open(path, "r") as f: text = f.read() - meters = [GeneralMeter(**json.loads(d)) for d in text.strip().split("\n")] + if not text.strip(): + meters = [] + else: + meters = [ + GeneralMeter(**json.loads(d)) for d in text.strip().split("\n") + ] logger.info(f"Read {len(meters)} meters from {path}") return meters @@ -84,9 +89,12 @@ def read_transformed_meter_reads(self, run_id: str) -> List[GeneralMeterRead]: logger.info(f"Reading meter reads from {path}") with open(path, "r") as f: text = f.read() - reads = [ - GeneralMeterRead(**json.loads(d)) for d in text.strip().split("\n") - ] + if not text.strip(): + reads = [] + else: + reads = [ + GeneralMeterRead(**json.loads(d)) for d in text.strip().split("\n") + ] logger.info(f"Read {len(reads)} meter reads from {path}") return reads diff --git a/amiadapters/outputs/s3.py b/amiadapters/outputs/s3.py index aefb0b4..b973a70 100644 --- a/amiadapters/outputs/s3.py +++ b/amiadapters/outputs/s3.py @@ -89,6 +89,8 @@ def read_transformed_meters(self, run_id: str) -> List[GeneralMeter]: key = self._s3_key(run_id, self.TRANSFORM, "meters.json.gz") logger.info(f"Downloading meters from s3://{self.bucket_name}/{key}") text = self._download_string_from_s3(key) + if not text.strip(): + return [] return [GeneralMeter(**json.loads(line)) for line in text.strip().split("\n")] def write_transformed_meter_reads(self, run_id: str, reads: List[GeneralMeterRead]): @@ -101,6 +103,8 @@ def read_transformed_meter_reads(self, run_id: str) -> List[GeneralMeterRead]: key = self._s3_key(run_id, self.TRANSFORM, "reads.json.gz") logger.info(f"Downloading reads from s3://{self.bucket_name}/{key}") text = self._download_string_from_s3(key) + if not text.strip(): + return [] return [ GeneralMeterRead(**json.loads(line)) for line in text.strip().split("\n") ] diff --git a/test/amiadapters/outputs/test_local.py b/test/amiadapters/outputs/test_local.py index 89bab91..3a67978 100644 --- a/test/amiadapters/outputs/test_local.py +++ b/test/amiadapters/outputs/test_local.py @@ -146,3 +146,13 @@ def test_write_and_read_transformed_meter_reads(self): self.assertEqual(reads_out[0].device_id, "1") self.assertEqual(reads_out[0].register_value, 123.4) self.assertEqual(reads_out[1].device_id, "2") + + def test_read_transformed_meters__empty_file_returns_empty_list(self): + self.controller.write_transformed_meters("run123", []) + result = self.controller.read_transformed_meters("run123") + self.assertEqual([], result) + + def test_read_transformed_meter_reads__empty_file_returns_empty_list(self): + self.controller.write_transformed_meter_reads("run123", []) + result = self.controller.read_transformed_meter_reads("run123") + self.assertEqual([], result) diff --git a/test/amiadapters/outputs/test_s3.py b/test/amiadapters/outputs/test_s3.py index 8caab34..906493c 100644 --- a/test/amiadapters/outputs/test_s3.py +++ b/test/amiadapters/outputs/test_s3.py @@ -168,6 +168,20 @@ def test_write_and_read_transformed_meter_reads(self): self.assertEqual(result[1].device_id, "2") self.assertEqual(result[1].register_value, 227.6) + def test_read_transformed_meters__empty_file_returns_empty_list(self): + self.mock_s3.get_object.return_value = { + "Body": MagicMock(read=lambda: self._gzip("")) + } + result = self.controller.read_transformed_meters("runid") + self.assertEqual([], result) + + def test_read_transformed_meter_reads__empty_file_returns_empty_list(self): + self.mock_s3.get_object.return_value = { + "Body": MagicMock(read=lambda: self._gzip("")) + } + result = self.controller.read_transformed_meter_reads("runid") + self.assertEqual([], result) + def _gzip(self, content: str) -> bytes: buf = io.BytesIO() with gzip.GzipFile(fileobj=buf, mode="wb") as gz: diff --git a/test/amiadapters/test_base.py b/test/amiadapters/test_base.py index 6136539..9288074 100644 --- a/test/amiadapters/test_base.py +++ b/test/amiadapters/test_base.py @@ -279,3 +279,68 @@ def test_calculate_extract_range__backfill_with_snowflake_sink_that_gives_no_old self.calculator.calculate_extract_range( None, None, backfill_params=backfill_params ) + + +class TestLoadTransformed(BaseTestCase): + + def setUp(self): + self.adapter = Beacon360Adapter( + api_user="user", + api_password="pass", + use_cache=False, + pipeline_configuration=self.TEST_PIPELINE_CONFIGURATION, + org_id="test-org", + org_timezone=pytz.timezone("Europe/Rome"), + configured_task_output_controller=self.TEST_TASK_OUTPUT_CONTROLLER_CONFIGURATION, + configured_metrics=self.TEST_METRICS_CONFIGURATION, + configured_sinks=[], + ) + self.adapter.output_controller = MagicMock() + self.sink = MagicMock() + self.adapter.storage_sinks = [self.sink] + + def test_load_transformed__happy_path_calls_sink(self): + self.adapter.output_controller.read_transformed_meters.return_value = [ + MagicMock() + ] + self.adapter.output_controller.read_transformed_meter_reads.return_value = [ + MagicMock() + ] + + self.adapter.load_transformed("run-1") + + self.sink.store_transformed.assert_called_once() + + def test_load_transformed__raises_when_meters_empty(self): + self.adapter.output_controller.read_transformed_meters.return_value = [] + self.adapter.output_controller.read_transformed_meter_reads.return_value = [ + MagicMock() + ] + + with self.assertRaises(Exception) as cm: + self.adapter.load_transformed("run-1") + + self.assertIn("remove-backfill", str(cm.exception)) + self.sink.store_transformed.assert_not_called() + + def test_load_transformed__raises_when_reads_empty(self): + self.adapter.output_controller.read_transformed_meters.return_value = [ + MagicMock() + ] + self.adapter.output_controller.read_transformed_meter_reads.return_value = [] + + with self.assertRaises(Exception) as cm: + self.adapter.load_transformed("run-1") + + self.assertIn("remove-backfill", str(cm.exception)) + self.sink.store_transformed.assert_not_called() + + def test_load_transformed__raises_when_both_empty(self): + self.adapter.output_controller.read_transformed_meters.return_value = [] + self.adapter.output_controller.read_transformed_meter_reads.return_value = [] + + with self.assertRaises(Exception) as cm: + self.adapter.load_transformed("run-1") + + self.assertIn("remove-backfill", str(cm.exception)) + self.sink.store_transformed.assert_not_called()