Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions amiadapters/adapters/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,15 @@ def load_transformed(self, run_id: str):
with self._base_adapter_metrics.load_transformed_timer():
meters = self.output_controller.read_transformed_meters(run_id)
reads = self.output_controller.read_transformed_meter_reads(run_id)
if not meters or not reads:
raise Exception(
f"Extract for {self.org_id} (run_id={run_id}) produced "
f"{len(meters)} meters and {len(reads)} reads. "
f"If this is a backfill DAG that has reached the vendor's data floor, "
f"decommission it with: "
f"`python cli.py config remove-backfill {self.org_id} <start_date> <end_date> --profile <profile>`. "
f"Otherwise this likely indicates a vendor outage or authentication failure."
)
for sink in self.storage_sinks:
sink.store_transformed(run_id, meters, reads)

Expand Down
16 changes: 12 additions & 4 deletions amiadapters/outputs/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,12 @@ def read_transformed_meters(self, run_id: str) -> List[GeneralMeter]:
logger.info(f"Reading meters from {path}")
with open(path, "r") as f:
text = f.read()
meters = [GeneralMeter(**json.loads(d)) for d in text.strip().split("\n")]
if not text.strip():
meters = []
else:
meters = [
GeneralMeter(**json.loads(d)) for d in text.strip().split("\n")
]
logger.info(f"Read {len(meters)} meters from {path}")
return meters

Expand All @@ -84,9 +89,12 @@ def read_transformed_meter_reads(self, run_id: str) -> List[GeneralMeterRead]:
logger.info(f"Reading meter reads from {path}")
with open(path, "r") as f:
text = f.read()
reads = [
GeneralMeterRead(**json.loads(d)) for d in text.strip().split("\n")
]
if not text.strip():
reads = []
else:
reads = [
GeneralMeterRead(**json.loads(d)) for d in text.strip().split("\n")
]
logger.info(f"Read {len(reads)} meter reads from {path}")
return reads

Expand Down
4 changes: 4 additions & 0 deletions amiadapters/outputs/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ def read_transformed_meters(self, run_id: str) -> List[GeneralMeter]:
key = self._s3_key(run_id, self.TRANSFORM, "meters.json.gz")
logger.info(f"Downloading meters from s3://{self.bucket_name}/{key}")
text = self._download_string_from_s3(key)
if not text.strip():
return []
return [GeneralMeter(**json.loads(line)) for line in text.strip().split("\n")]

def write_transformed_meter_reads(self, run_id: str, reads: List[GeneralMeterRead]):
Expand All @@ -101,6 +103,8 @@ def read_transformed_meter_reads(self, run_id: str) -> List[GeneralMeterRead]:
key = self._s3_key(run_id, self.TRANSFORM, "reads.json.gz")
logger.info(f"Downloading reads from s3://{self.bucket_name}/{key}")
text = self._download_string_from_s3(key)
if not text.strip():
return []
return [
GeneralMeterRead(**json.loads(line)) for line in text.strip().split("\n")
]
Expand Down
10 changes: 10 additions & 0 deletions test/amiadapters/outputs/test_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,3 +146,13 @@ def test_write_and_read_transformed_meter_reads(self):
self.assertEqual(reads_out[0].device_id, "1")
self.assertEqual(reads_out[0].register_value, 123.4)
self.assertEqual(reads_out[1].device_id, "2")

def test_read_transformed_meters__empty_file_returns_empty_list(self):
self.controller.write_transformed_meters("run123", [])
result = self.controller.read_transformed_meters("run123")
self.assertEqual([], result)

def test_read_transformed_meter_reads__empty_file_returns_empty_list(self):
self.controller.write_transformed_meter_reads("run123", [])
result = self.controller.read_transformed_meter_reads("run123")
self.assertEqual([], result)
14 changes: 14 additions & 0 deletions test/amiadapters/outputs/test_s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,20 @@ def test_write_and_read_transformed_meter_reads(self):
self.assertEqual(result[1].device_id, "2")
self.assertEqual(result[1].register_value, 227.6)

def test_read_transformed_meters__empty_file_returns_empty_list(self):
self.mock_s3.get_object.return_value = {
"Body": MagicMock(read=lambda: self._gzip(""))
}
result = self.controller.read_transformed_meters("runid")
self.assertEqual([], result)

def test_read_transformed_meter_reads__empty_file_returns_empty_list(self):
self.mock_s3.get_object.return_value = {
"Body": MagicMock(read=lambda: self._gzip(""))
}
result = self.controller.read_transformed_meter_reads("runid")
self.assertEqual([], result)

def _gzip(self, content: str) -> bytes:
buf = io.BytesIO()
with gzip.GzipFile(fileobj=buf, mode="wb") as gz:
Expand Down
65 changes: 65 additions & 0 deletions test/amiadapters/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -279,3 +279,68 @@ def test_calculate_extract_range__backfill_with_snowflake_sink_that_gives_no_old
self.calculator.calculate_extract_range(
None, None, backfill_params=backfill_params
)


class TestLoadTransformed(BaseTestCase):

def setUp(self):
self.adapter = Beacon360Adapter(
api_user="user",
api_password="pass",
use_cache=False,
pipeline_configuration=self.TEST_PIPELINE_CONFIGURATION,
org_id="test-org",
org_timezone=pytz.timezone("Europe/Rome"),
configured_task_output_controller=self.TEST_TASK_OUTPUT_CONTROLLER_CONFIGURATION,
configured_metrics=self.TEST_METRICS_CONFIGURATION,
configured_sinks=[],
)
self.adapter.output_controller = MagicMock()
self.sink = MagicMock()
self.adapter.storage_sinks = [self.sink]

def test_load_transformed__happy_path_calls_sink(self):
self.adapter.output_controller.read_transformed_meters.return_value = [
MagicMock()
]
self.adapter.output_controller.read_transformed_meter_reads.return_value = [
MagicMock()
]

self.adapter.load_transformed("run-1")

self.sink.store_transformed.assert_called_once()

def test_load_transformed__raises_when_meters_empty(self):
self.adapter.output_controller.read_transformed_meters.return_value = []
self.adapter.output_controller.read_transformed_meter_reads.return_value = [
MagicMock()
]

with self.assertRaises(Exception) as cm:
self.adapter.load_transformed("run-1")

self.assertIn("remove-backfill", str(cm.exception))
self.sink.store_transformed.assert_not_called()

def test_load_transformed__raises_when_reads_empty(self):
self.adapter.output_controller.read_transformed_meters.return_value = [
MagicMock()
]
self.adapter.output_controller.read_transformed_meter_reads.return_value = []

with self.assertRaises(Exception) as cm:
self.adapter.load_transformed("run-1")

self.assertIn("remove-backfill", str(cm.exception))
self.sink.store_transformed.assert_not_called()

def test_load_transformed__raises_when_both_empty(self):
self.adapter.output_controller.read_transformed_meters.return_value = []
self.adapter.output_controller.read_transformed_meter_reads.return_value = []

with self.assertRaises(Exception) as cm:
self.adapter.load_transformed("run-1")

self.assertIn("remove-backfill", str(cm.exception))
self.sink.store_transformed.assert_not_called()
Loading