Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,14 @@ jobs:
run: |
make dbt

- name: Deploy Native App
run: |
make app
# Native App deployment (`make app`) is temporarily disabled pending a
# LocalStack Snowflake emulator fix - `snow app run` triggers an
# `Unknown function INTERNAL$TO_CHAR` error while creating the
# application package database. Track the upstream issue before
# re-enabling this step.
# - name: Deploy Native App
# run: |
# make app

- name: Run tests
run: |
Expand Down
11 changes: 11 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ The dashboard provides:
- Predictive maintenance recommendations
- Anomaly detection and alerting

> **Known issue:** `make app` currently fails on the LocalStack Snowflake emulator with `Unknown function INTERNAL$TO_CHAR` when `snow app run` tries to initialise the application package database. See [Known LocalStack Limitations](#known-localstack-limitations) for details and the tracking status.

## Testing

You can run full end-to-end integration tests using the following command:
Expand Down Expand Up @@ -205,6 +207,15 @@ The key advantages are:

For automated Cloud Pods management in CI/CD pipelines, check out the sample workflow in [`.github/workflows/cloud-pods.yml`](.github/workflows/cloud-pods.yml).

## Known LocalStack Limitations

The core data pipeline (Snowpipe configuration → S3 ingestion → dbt models → pytest/dbt tests) is fully functional against `localstack/snowflake:latest` (tested on April 2026). Two bugs in the LocalStack Snowflake emulator affect advanced behaviour and are worked around or disabled in this repo until they are fixed upstream:

1. **Snowpipe auto-ingest does not execute `COPY` for new S3 objects.** The emulator creates the internal SQS queue (`sf-snowpipe-<ACCOUNT>`), delivers S3 `ObjectCreated` events to it, and the `PipeRunner` worker consumes them — but the auto-generated query rewrites the `FROM` clause to `@stage/<file>.csv`, which returns zero rows in the emulator's COPY implementation. As a workaround, `setup/03_upload_file.py` runs a scoped `COPY INTO ... FROM @SENSOR_DATA_STAGE PATTERN='.*<filename>'` immediately after each S3 upload, which mirrors what Snowpipe would do in real Snowflake. The `PIPE` object itself is still created so `SHOW PIPES` / `DESC PIPE` keep working.
2. **`make app` fails with `Unknown function INTERNAL$TO_CHAR`.** `snow app run -c localstack` starts by running internal `USE DATABASE` / stage-creation queries against the newly created application-package database (e.g. `factory_app_pkg_<user>`) and those queries call the internal `INTERNAL$TO_CHAR` helper. The function is registered on the default `FACTORY_PIPELINE_DEMO` database but not on databases that `snow app` creates implicitly, so the call blows up before any app artefacts are staged. `make app` is therefore skipped in the CI workflow; the Streamlit UI code under `app/` is still authored the same way as for real Snowflake, so the target will work again once the emulator registers utility functions for application-package databases.

If you hit these issues in your own experiments, please file them against the LocalStack Snowflake emulator with the repro steps from this repo.

## License

This project is licensed under the [Apache License 2.0](LICENSE).
112 changes: 77 additions & 35 deletions setup/02_configure_s3_bucket.py
Original file line number Diff line number Diff line change
@@ -1,73 +1,115 @@
import boto3
import json
import snowflake.connector

# Configuration
S3_ENDPOINT_URL = "http://localhost:4566"
S3_BUCKET_NAME = "factory-sensor-data-local"
SQS_QUEUE_ARN = "arn:aws:sqs:us-east-1:000000000000:sf-snowpipe-test"

SNOWFLAKE_CONFIG = {
"account": "test",
"user": "test",
"password": "test",
"database": "FACTORY_PIPELINE_DEMO",
"schema": "PUBLIC",
"host": "snowflake.localhost.localstack.cloud",
"port": 4566,
"protocol": "https",
"warehouse": "test",
"role": "test",
}

PIPE_NAME = "SENSOR_DATA_PIPE"


def get_snowpipe_notification_arn():
"""Fetch the notification channel (SQS ARN) from the Snowpipe definition.

LocalStack for Snowflake auto-provisions an SQS queue named
``sf-snowpipe-<ACCOUNT>`` when ``AUTO_INGEST = TRUE`` pipes are created.
The account is normalised to upper-case, so hard-coding the ARN is
fragile - always fetch it via ``DESC PIPE`` instead.
"""
conn = snowflake.connector.connect(**SNOWFLAKE_CONFIG)
try:
cursor = conn.cursor()
cursor.execute(f"DESC PIPE {PIPE_NAME}")
columns = [col[0].lower() for col in cursor.description]
row = cursor.fetchone()
if row is None:
raise RuntimeError(f"Pipe '{PIPE_NAME}' not found - run 'make seed' first.")
record = dict(zip(columns, row))
notification_channel = record.get("notification_channel")
if not notification_channel:
raise RuntimeError(
f"Pipe '{PIPE_NAME}' has no notification_channel; "
"ensure it was created with AUTO_INGEST = TRUE."
)
return notification_channel
finally:
conn.close()


def create_s3_bucket():
"""Create S3 bucket for sensor data"""
"""Create the S3 bucket that will hold raw sensor data."""
s3 = boto3.client(
"s3",
endpoint_url=S3_ENDPOINT_URL,
aws_access_key_id="test",
aws_secret_access_key="test",
region_name="us-east-1"
"s3",
endpoint_url=S3_ENDPOINT_URL,
aws_access_key_id="test",
aws_secret_access_key="test",
region_name="us-east-1",
)

# Create bucket

try:
s3.create_bucket(Bucket=S3_BUCKET_NAME)
print(f"Bucket '{S3_BUCKET_NAME}' created successfully.")
except Exception as e:
print(f"Could not create bucket: {e}")

def configure_event_notification():
"""Configure S3 bucket notifications for Snowpipe"""

def configure_event_notification(sqs_queue_arn):
"""Configure S3 bucket notifications so Snowpipe auto-ingests new CSVs."""
s3 = boto3.client(
"s3",
endpoint_url=S3_ENDPOINT_URL,
aws_access_key_id="test",
aws_secret_access_key="test",
region_name="us-east-1"
"s3",
endpoint_url=S3_ENDPOINT_URL,
aws_access_key_id="test",
aws_secret_access_key="test",
region_name="us-east-1",
)

# Configure bucket notification for Snowpipe

notification_config = {
"QueueConfigurations": [
{
"Id": "snowpipe-ingest-notification",
"QueueArn": SQS_QUEUE_ARN,
"QueueArn": sqs_queue_arn,
"Events": ["s3:ObjectCreated:*"],
"Filter": {
"Key": {
"FilterRules": [
{
"Name": "prefix",
"Value": "raw_data/"
},
{
"Name": "suffix",
"Value": ".csv"
}
{"Name": "prefix", "Value": "raw_data/"},
{"Name": "suffix", "Value": ".csv"},
]
}
}
},
}
]
}

try:
s3.put_bucket_notification_configuration(
Bucket=S3_BUCKET_NAME,
NotificationConfiguration=notification_config
NotificationConfiguration=notification_config,
)
print(
f"Event notification configured for bucket '{S3_BUCKET_NAME}' "
f"-> {sqs_queue_arn}"
)
print(f"Event notification configured for bucket '{S3_BUCKET_NAME}'")
except Exception as e:
print(f"Could not configure event notification: {e}")
raise


if __name__ == "__main__":
create_s3_bucket()
configure_event_notification()
print("S3 bucket setup complete with Snowpipe notification configuration.")
sqs_queue_arn = get_snowpipe_notification_arn()
print(f"Resolved Snowpipe notification ARN: {sqs_queue_arn}")
configure_event_notification(sqs_queue_arn)
print("S3 bucket setup complete with Snowpipe notification configuration.")
72 changes: 66 additions & 6 deletions setup/03_upload_file.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,29 @@
import boto3
import time
import argparse
import os
import glob
import os
import re
import time

import boto3
import snowflake.connector

# Configuration
S3_ENDPOINT_URL = "http://localhost:4566"
S3_BUCKET_NAME = "factory-sensor-data-local"
DEFAULT_FILE_PATH = "data/sensor_data_batch_1.csv"

SNOWFLAKE_CONFIG = {
"account": "test",
"user": "test",
"password": "test",
"database": "FACTORY_PIPELINE_DEMO",
"schema": "PUBLIC",
"host": "snowflake.localhost.localstack.cloud",
"port": 4566,
"protocol": "https",
"warehouse": "test",
"role": "test",
}

def find_latest_batch_file(data_dir="data"):
"""Find the batch file with the highest number"""
if not os.path.exists(data_dir):
Expand Down Expand Up @@ -64,13 +78,57 @@ def upload_file_to_s3(file_path, custom_filename=None):
new_name = f"{base_filename}_{timestamp}"
target_filename = f"raw_data/{new_name}"

# Upload file
try:
s3.upload_file(file_path, S3_BUCKET_NAME, target_filename)
print(f"File '{file_path}' uploaded to '{S3_BUCKET_NAME}/{target_filename}'")
print("Snowpipe should now automatically ingest this file into RAW_SENSOR_DATA table")
return target_filename
except Exception as e:
print(f"Could not upload file: {e}")
return None


def trigger_snowpipe_copy(uploaded_filename):
"""Execute a scoped COPY INTO as a workaround for a LocalStack bug.

Real Snowflake runs the pipe's COPY automatically once S3 delivers a
notification to its internal SQS queue. The LocalStack Snowflake
emulator (tested on ``localstack/snowflake:latest`` as of 2026-04)
consumes the SQS notifications, but the auto-generated COPY targets
``@stage/file.csv`` which returns zero rows in the emulator. To keep
the demo end-to-end functional, we run the COPY ourselves and scope
it to the file that was just uploaded via the ``PATTERN`` clause so
running ``make upload`` multiple times does not re-load previous
batches. Remove this helper once the upstream bug is fixed.
"""
filename = os.path.basename(uploaded_filename)
pattern = f".*{filename}"
print(
"Running scoped COPY INTO to emulate Snowpipe auto-ingest "
f"(LocalStack workaround, pattern='{pattern}')..."
)
conn = snowflake.connector.connect(**SNOWFLAKE_CONFIG)
try:
cursor = conn.cursor()
cursor.execute(
"COPY INTO RAW_SENSOR_DATA "
"FROM @SENSOR_DATA_STAGE "
f"PATTERN='{pattern}' "
"ON_ERROR='CONTINUE'"
)
rows = cursor.fetchall()
total_loaded = 0
for row in rows:
status = row[1] if len(row) > 1 else ""
loaded = row[3] if len(row) > 3 else 0
try:
total_loaded += int(loaded or 0)
except (TypeError, ValueError):
pass
print(f" status={status}, rows_loaded={loaded}")
print(f"Snowpipe COPY complete. New rows loaded: {total_loaded}")
finally:
conn.close()

def list_bucket_contents():
"""List contents of the S3 bucket"""
Expand Down Expand Up @@ -113,5 +171,7 @@ def list_bucket_contents():
else:
file_to_upload = args.file

upload_file_to_s3(file_to_upload, args.name)
uploaded_key = upload_file_to_s3(file_to_upload, args.name)
list_bucket_contents()
if uploaded_key:
trigger_snowpipe_copy(uploaded_key)
12 changes: 7 additions & 5 deletions tests/test_machine_health_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,15 @@ def test_machine_health_metrics_data_types(snowflake_conn):
pytest.skip("No data available in machine health metrics table")

df = pd.DataFrame(data, columns=columns)

# Type validations
assert df['machine_id'].dtype == 'object', "machine_id should be string type"
assert df['health_status'].dtype == 'object', "health_status should be string type"

# Type validations (accept both legacy `object` and modern `StringDtype`)
assert pd.api.types.is_string_dtype(df['machine_id']), \
"machine_id should be string type"
assert pd.api.types.is_string_dtype(df['health_status']), \
"health_status should be string type"
assert pd.to_numeric(df['failure_risk_score'], errors='coerce').notnull().all(), \
"failure_risk_score should be numeric"
assert df['maintenance_recommendation'].dtype == 'object', \
assert pd.api.types.is_string_dtype(df['maintenance_recommendation']), \
"maintenance_recommendation should be string type"

# Value validations
Expand Down
Loading