Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
"description": "Feed Sync Dispatcher for Transitland",
"entry_point": "feed_sync_dispatcher_transitland",
"timeout": 3600,
"memory": "1Gi",
"trigger_http": true,
"include_folders": ["helpers"],
"include_api_folders": ["database_gen", "database", "common"],
Expand All @@ -17,5 +16,5 @@
"max_instance_count": 1,
"min_instance_count": 0,
"available_cpu": 1,
"available_memory": "512Mi"
"available_memory": "1Gi"
}
4 changes: 2 additions & 2 deletions functions-python/feed_sync_dispatcher_transitland/src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
from shared.helpers.feed_sync.feed_sync_common import FeedSyncProcessor, FeedSyncPayload
from shared.helpers.feed_sync.feed_sync_dispatcher import feed_sync_dispatcher
from shared.helpers.feed_sync.models import TransitFeedSyncPayload
from shared.helpers.logger import Logger
from shared.helpers.logger import init_logger
from shared.helpers.pub_sub import get_pubsub_client, get_execution_id
from typing import Tuple, List
from collections import defaultdict
Expand All @@ -47,6 +47,7 @@

# session instance to reuse connections
session = requests.Session()
init_logger()


def process_feed_urls(feed: dict, urls_in_db: List[str]) -> Tuple[List[str], List[str]]:
Expand Down Expand Up @@ -388,7 +389,6 @@ def feed_sync_dispatcher_transitland(request):
"""
HTTP Function entry point queries the transitland API and publishes events to a Pub/Sub topic to be processed.
"""
Logger.init_logger()
publisher = get_pubsub_client()
topic_path = publisher.topic_path(PROJECT_ID, PUBSUB_TOPIC_NAME)
transit_land_feed_sync_processor = TransitFeedSyncProcessor()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,14 @@ def check_url_status(url: str) -> bool:
"""Check if a URL is reachable."""
try:
response = requests.head(url, timeout=10)
return response.status_code < 400 or response.status_code == 403
result = response.status_code < 400 or response.status_code == 403
if not result:
logging.error(
"Url [%s] replied with status code: [%s]", url, response.status_code
)
return result
except requests.RequestException:
logging.warning(f"Failed to reach URL: {url}")
logging.warning("Failed to reach URL: %s", url)
return False


Expand Down Expand Up @@ -92,10 +97,10 @@ def create_new_feed(session: Session, stable_id: str, payload: FeedPayload) -> F
)
if location:
new_feed.locations = [location]
logging.debug(f"Added location for feed {new_feed.id}")
logging.debug("Added location for feed %s", new_feed.id)

# Persist the new feed
session.add(new_feed)
session.flush()
logging.info(f"Created new feed with ID: {new_feed.id}")
logging.info("Created new feed with ID: %s", new_feed.id)
return new_feed
40 changes: 25 additions & 15 deletions functions-python/feed_sync_process_transitland/src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from sqlalchemy.orm import Session

from shared.database.database import with_db_session
from shared.helpers.logger import Logger
from shared.helpers.logger import init_logger
from shared.database_gen.sqlacodegen_models import Feed
from shared.helpers.feed_sync.models import TransitFeedSyncPayload as FeedPayload
from feed_processor_utils import check_url_status, create_new_feed
Expand All @@ -35,6 +35,8 @@
DATASET_BATCH_TOPIC = os.getenv("DATASET_BATCH_TOPIC_NAME")
FEEDS_DATABASE_URL = os.getenv("FEEDS_DATABASE_URL")

init_logger()


class FeedProcessor:
def __init__(self, db_session: Session):
Expand All @@ -46,10 +48,12 @@ def process_feed(self, payload: FeedPayload) -> None:
"""Process a feed based on its database state."""
try:
logging.info(
f"Processing feed: external_id={payload.external_id}, feed_id={payload.feed_id}"
"Processing feed: external_id=%s, feed_id=%s",
payload.external_id,
payload.feed_id,
)
if not check_url_status(payload.feed_url):
logging.error(f"Feed URL not reachable: {payload.feed_url}. Skipping.")
logging.error("Feed URL not reachable: %s. Skipping.", payload.feed_url)
return

self.feed_stable_id = f"{payload.source}-{payload.stable_id}".lower()
Expand All @@ -70,9 +74,9 @@ def process_feed(self, payload: FeedPayload) -> None:
def _process_new_feed_or_skip(self, payload: FeedPayload) -> Optional[Feed]:
"""Process a new feed or skip if the URL already exists."""
if self._check_feed_url_exists(payload.feed_url):
logging.error(f"Feed URL already exists: {payload.feed_url}. Skipping.")
logging.error("Feed URL already exists: %s. Skipping.", payload.feed_url)
return
logging.info(f"Creating new feed for external_id: {payload.external_id}")
logging.info("Creating new feed for external_id: %s", payload.external_id)
return create_new_feed(self.session, self.feed_stable_id, payload)

def _process_existing_feed_refs(
Expand All @@ -83,7 +87,9 @@ def _process_existing_feed_refs(
f for f in current_feeds if f.producer_url == payload.feed_url
]
if matching_feeds:
logging.info(f"Feed with URL already exists: {payload.feed_url}. Skipping.")
logging.info(
"Feed with URL already exists: %s. Skipping.", payload.feed_url
)
return

stable_id_matches = [
Expand All @@ -92,12 +98,12 @@ def _process_existing_feed_refs(
reference_count = len(stable_id_matches)
active_match = [f for f in stable_id_matches if f.status == "active"]
if reference_count > 0:
logging.info(f"Updating feed for stable_id: {self.feed_stable_id}")
logging.info("Updating feed for stable_id: %s", self.feed_stable_id)
self.feed_stable_id = f"{self.feed_stable_id}_{reference_count}".lower()
new_feed = self._deprecate_old_feed(payload, active_match[0].id)
else:
logging.info(
f"No matching stable_id. Creating new feed for {payload.external_id}."
"No matching stable_id. Creating new feed for %s.", payload.external_id
)
new_feed = create_new_feed(self.session, self.feed_stable_id, payload)
return new_feed
Expand Down Expand Up @@ -125,7 +131,7 @@ def _deprecate_old_feed(
old_feed = self.session.get(Feed, old_feed_id)
if old_feed:
old_feed.status = "deprecated"
logging.info(f"Deprecated old feed: {old_feed.id}")
logging.info("Deprecated old feed: %s", old_feed.id)
return create_new_feed(self.session, self.feed_stable_id, payload)

def _publish_to_batch_topic_if_needed(
Expand Down Expand Up @@ -164,13 +170,13 @@ def _publish_to_topic(self, feed: Feed, payload: FeedPayload) -> None:
)
future.add_done_callback(
lambda _: logging.info(
f"Published feed {feed.stable_id} to dataset batch topic"
"Published feed %s to dataset batch topic", feed.stable_id
)
)
future.result()
logging.info(f"Message published for feed {feed.stable_id}")
logging.info("Message published for feed %s", feed.stable_id)
except Exception as e:
logging.error(f"Error publishing to dataset batch topic: {str(e)}")
logging.error("Error publishing to dataset batch topic: %s", str(e))
raise

def _rollback_transaction(self, message: str) -> None:
Expand All @@ -181,13 +187,17 @@ def _rollback_transaction(self, message: str) -> None:

@with_db_session
@functions_framework.cloud_event
def process_feed_event(cloud_event, db_session: Session) -> None:
def process_feed_event(cloud_event, db_session: Session) -> str:
"""Cloud Function entry point for feed processing."""
Logger.init_logger()
try:
message_data = base64.b64decode(cloud_event.data["message"]["data"]).decode()
payload = FeedPayload(**json.loads(message_data))
processor = FeedProcessor(db_session)
processor.process_feed(payload)
result = "Feed processing completed successfully."
logging.info(result)
return result
except Exception as e:
logging.error(f"Error processing feed event: {str(e)}")
result = f"Error processing feed event: {str(e)}"
logging.error(result)
return result
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import base64
import json
import logging
from unittest import mock
from unittest.mock import patch, Mock, MagicMock

Expand Down Expand Up @@ -41,42 +40,6 @@ def mock_db_session():
return Mock()


class MockLogger:
"""Mock logger for testing"""

@staticmethod
def init_logger():
return MagicMock()

def __init__(self, name):
self.name = name
self._logger = logging.getLogger(name)

def get_logger(self):
mock_logger = MagicMock()
# Add all required logging methods
mock_logger.info = MagicMock()
mock_logger.error = MagicMock()
mock_logger.warning = MagicMock()
mock_logger.debug = MagicMock()
mock_logger.addFilter = MagicMock()
return mock_logger


@pytest.fixture(autouse=True)
def mock_logging():
"""Mock both local and GCP logging."""
with patch("main.logging") as mock_log, patch("main.Logger", MockLogger):
for logger in [mock_log]:
logger.info = MagicMock()
logger.error = MagicMock()
logger.warning = MagicMock()
logger.debug = MagicMock()
logger.addFilter = MagicMock()

yield mock_log


@pytest.fixture
def feed_payload():
"""Fixture for feed payload."""
Expand Down Expand Up @@ -150,7 +113,7 @@ def _create_payload_dict(feed_payload: FeedPayload) -> dict:
"payload_type": feed_payload.payload_type,
}

def test_get_current_feed_info(self, processor, feed_payload, mock_logging):
def test_get_current_feed_info(self, processor, feed_payload):
"""Test retrieving current feed information."""
# Mock database query
processor.session.query.return_value.filter.return_value.all.return_value = [
Expand Down Expand Up @@ -179,7 +142,7 @@ def test_get_current_feed_info(self, processor, feed_payload, mock_logging):
)
assert len(feeds) == 0

def test_check_feed_url_exists_comprehensive(self, processor, mock_logging):
def test_check_feed_url_exists_comprehensive(self, processor):
"""Test comprehensive feed URL existence checks."""
test_url = "https://example.com/feed"

Expand All @@ -191,7 +154,7 @@ def test_check_feed_url_exists_comprehensive(self, processor, mock_logging):
result = processor._check_feed_url_exists(test_url)
assert result is True

def test_database_error_handling(self, processor, feed_payload, mock_logging):
def test_database_error_handling(self, processor, feed_payload):
"""Test database error handling in different scenarios."""

# Test case 1: General database error during feed processing
Expand All @@ -201,9 +164,7 @@ def test_database_error_handling(self, processor, feed_payload, mock_logging):

processor._rollback_transaction.assert_called_once()

def test_publish_to_batch_topic_comprehensive(
self, processor, feed_payload, mock_logging
):
def test_publish_to_batch_topic_comprehensive(self, processor, feed_payload):
"""Test publishing to batch topic including success, error, and message format validation."""

# Test case 1: Successful publish with message format validation
Expand All @@ -227,7 +188,7 @@ def test_publish_to_batch_topic_comprehensive(
assert "feed_stable_id" in json.loads(message_arg["data"])
assert "tld-feed1" == json.loads(message_arg["data"])["feed_stable_id"]

def test_process_feed_event_validation(self, mock_logging):
def test_process_feed_event_validation(self):
"""Test feed event processing with various invalid payloads."""

# Test case 1: Empty payload
Expand Down Expand Up @@ -255,7 +216,7 @@ def test_process_feed_event_validation(self, mock_logging):
process_feed_event(cloud_event)

def test_process_feed_event_pubsub_error(
self, processor, feed_payload, mock_logging, mock_db_session
self, processor, feed_payload, mock_db_session
):
"""Test feed event processing handles missing credentials error."""
# Create cloud event with valid payload
Expand All @@ -274,7 +235,7 @@ def test_process_feed_event_pubsub_error(

process_feed_event(cloud_event, db_session=mock_session)

def test_process_feed_event_malformed_cloud_event(self, mock_logging):
def test_process_feed_event_malformed_cloud_event(self):
"""Test feed event processing with malformed cloud event."""
# Test case 1: Missing message data
cloud_event = Mock()
Expand All @@ -287,7 +248,7 @@ def test_process_feed_event_malformed_cloud_event(self, mock_logging):

process_feed_event(cloud_event)

def test_process_feed_event_invalid_json(self, mock_logging):
def test_process_feed_event_invalid_json(self):
"""Test handling of invalid JSON in cloud event"""
# Create invalid base64 encoded JSON
invalid_json = base64.b64encode(b'{"invalid": "json"').decode()
Expand All @@ -296,14 +257,14 @@ def test_process_feed_event_invalid_json(self, mock_logging):
cloud_event.data = {"message": {"data": invalid_json}}

# Process the event
process_feed_event(cloud_event)
result = process_feed_event(cloud_event)

# Verify error handling
mock_logging.error.assert_called()
assert result.startswith("Error processing feed event")

@patch("main.create_new_feed")
def test_process_new_feed_or_skip(
self, create_new_feed_mock, processor, feed_payload, mock_logging
self, create_new_feed_mock, processor, feed_payload
):
"""Test processing new feed or skipping existing feed."""
processor._check_feed_url_exists = MagicMock()
Expand All @@ -313,9 +274,7 @@ def test_process_new_feed_or_skip(
create_new_feed_mock.assert_called_once()

@patch("main.create_new_feed")
def test_process_new_feed_skip(
self, create_new_feed_mock, processor, feed_payload, mock_logging
):
def test_process_new_feed_skip(self, create_new_feed_mock, processor, feed_payload):
"""Test processing new feed or skipping existing feed."""
processor._check_feed_url_exists = MagicMock()
# Test case 2: Existing feed
Expand All @@ -325,7 +284,7 @@ def test_process_new_feed_skip(

@patch("main.create_new_feed")
def test_process_existing_feed_refs(
self, create_new_feed_mock, processor, feed_payload, mock_logging
self, create_new_feed_mock, processor, feed_payload
):
"""Test processing existing feed references."""
# 1. Existing feed with same url
Expand Down
Loading