diff --git a/functions-python/feed_sync_dispatcher_transitland/function_config.json b/functions-python/feed_sync_dispatcher_transitland/function_config.json index 28fff8001..70606e46a 100644 --- a/functions-python/feed_sync_dispatcher_transitland/function_config.json +++ b/functions-python/feed_sync_dispatcher_transitland/function_config.json @@ -3,7 +3,6 @@ "description": "Feed Sync Dispatcher for Transitland", "entry_point": "feed_sync_dispatcher_transitland", "timeout": 3600, - "memory": "1Gi", "trigger_http": true, "include_folders": ["helpers"], "include_api_folders": ["database_gen", "database", "common"], @@ -17,5 +16,5 @@ "max_instance_count": 1, "min_instance_count": 0, "available_cpu": 1, - "available_memory": "512Mi" + "available_memory": "1Gi" } diff --git a/functions-python/feed_sync_dispatcher_transitland/src/main.py b/functions-python/feed_sync_dispatcher_transitland/src/main.py index 9920eaaac..cae657834 100644 --- a/functions-python/feed_sync_dispatcher_transitland/src/main.py +++ b/functions-python/feed_sync_dispatcher_transitland/src/main.py @@ -32,7 +32,7 @@ from shared.helpers.feed_sync.feed_sync_common import FeedSyncProcessor, FeedSyncPayload from shared.helpers.feed_sync.feed_sync_dispatcher import feed_sync_dispatcher from shared.helpers.feed_sync.models import TransitFeedSyncPayload -from shared.helpers.logger import Logger +from shared.helpers.logger import init_logger from shared.helpers.pub_sub import get_pubsub_client, get_execution_id from typing import Tuple, List from collections import defaultdict @@ -47,6 +47,7 @@ # session instance to reuse connections session = requests.Session() +init_logger() def process_feed_urls(feed: dict, urls_in_db: List[str]) -> Tuple[List[str], List[str]]: @@ -388,7 +389,6 @@ def feed_sync_dispatcher_transitland(request): """ HTTP Function entry point queries the transitland API and publishes events to a Pub/Sub topic to be processed. """ - Logger.init_logger() publisher = get_pubsub_client() topic_path = publisher.topic_path(PROJECT_ID, PUBSUB_TOPIC_NAME) transit_land_feed_sync_processor = TransitFeedSyncProcessor() diff --git a/functions-python/feed_sync_process_transitland/src/feed_processor_utils.py b/functions-python/feed_sync_process_transitland/src/feed_processor_utils.py index bf2b308a7..e9f8814e3 100644 --- a/functions-python/feed_sync_process_transitland/src/feed_processor_utils.py +++ b/functions-python/feed_sync_process_transitland/src/feed_processor_utils.py @@ -19,9 +19,14 @@ def check_url_status(url: str) -> bool: """Check if a URL is reachable.""" try: response = requests.head(url, timeout=10) - return response.status_code < 400 or response.status_code == 403 + result = response.status_code < 400 or response.status_code == 403 + if not result: + logging.error( + "Url [%s] replied with status code: [%s]", url, response.status_code + ) + return result except requests.RequestException: - logging.warning(f"Failed to reach URL: {url}") + logging.warning("Failed to reach URL: %s", url) return False @@ -92,10 +97,10 @@ def create_new_feed(session: Session, stable_id: str, payload: FeedPayload) -> F ) if location: new_feed.locations = [location] - logging.debug(f"Added location for feed {new_feed.id}") + logging.debug("Added location for feed %s", new_feed.id) # Persist the new feed session.add(new_feed) session.flush() - logging.info(f"Created new feed with ID: {new_feed.id}") + logging.info("Created new feed with ID: %s", new_feed.id) return new_feed diff --git a/functions-python/feed_sync_process_transitland/src/main.py b/functions-python/feed_sync_process_transitland/src/main.py index 81499c11d..5c74f68a0 100644 --- a/functions-python/feed_sync_process_transitland/src/main.py +++ b/functions-python/feed_sync_process_transitland/src/main.py @@ -25,7 +25,7 @@ from sqlalchemy.orm import Session from shared.database.database import with_db_session -from shared.helpers.logger import Logger +from shared.helpers.logger import init_logger from shared.database_gen.sqlacodegen_models import Feed from shared.helpers.feed_sync.models import TransitFeedSyncPayload as FeedPayload from feed_processor_utils import check_url_status, create_new_feed @@ -35,6 +35,8 @@ DATASET_BATCH_TOPIC = os.getenv("DATASET_BATCH_TOPIC_NAME") FEEDS_DATABASE_URL = os.getenv("FEEDS_DATABASE_URL") +init_logger() + class FeedProcessor: def __init__(self, db_session: Session): @@ -46,10 +48,12 @@ def process_feed(self, payload: FeedPayload) -> None: """Process a feed based on its database state.""" try: logging.info( - f"Processing feed: external_id={payload.external_id}, feed_id={payload.feed_id}" + "Processing feed: external_id=%s, feed_id=%s", + payload.external_id, + payload.feed_id, ) if not check_url_status(payload.feed_url): - logging.error(f"Feed URL not reachable: {payload.feed_url}. Skipping.") + logging.error("Feed URL not reachable: %s. Skipping.", payload.feed_url) return self.feed_stable_id = f"{payload.source}-{payload.stable_id}".lower() @@ -70,9 +74,9 @@ def process_feed(self, payload: FeedPayload) -> None: def _process_new_feed_or_skip(self, payload: FeedPayload) -> Optional[Feed]: """Process a new feed or skip if the URL already exists.""" if self._check_feed_url_exists(payload.feed_url): - logging.error(f"Feed URL already exists: {payload.feed_url}. Skipping.") + logging.error("Feed URL already exists: %s. Skipping.", payload.feed_url) return - logging.info(f"Creating new feed for external_id: {payload.external_id}") + logging.info("Creating new feed for external_id: %s", payload.external_id) return create_new_feed(self.session, self.feed_stable_id, payload) def _process_existing_feed_refs( @@ -83,7 +87,9 @@ def _process_existing_feed_refs( f for f in current_feeds if f.producer_url == payload.feed_url ] if matching_feeds: - logging.info(f"Feed with URL already exists: {payload.feed_url}. Skipping.") + logging.info( + "Feed with URL already exists: %s. Skipping.", payload.feed_url + ) return stable_id_matches = [ @@ -92,12 +98,12 @@ def _process_existing_feed_refs( reference_count = len(stable_id_matches) active_match = [f for f in stable_id_matches if f.status == "active"] if reference_count > 0: - logging.info(f"Updating feed for stable_id: {self.feed_stable_id}") + logging.info("Updating feed for stable_id: %s", self.feed_stable_id) self.feed_stable_id = f"{self.feed_stable_id}_{reference_count}".lower() new_feed = self._deprecate_old_feed(payload, active_match[0].id) else: logging.info( - f"No matching stable_id. Creating new feed for {payload.external_id}." + "No matching stable_id. Creating new feed for %s.", payload.external_id ) new_feed = create_new_feed(self.session, self.feed_stable_id, payload) return new_feed @@ -125,7 +131,7 @@ def _deprecate_old_feed( old_feed = self.session.get(Feed, old_feed_id) if old_feed: old_feed.status = "deprecated" - logging.info(f"Deprecated old feed: {old_feed.id}") + logging.info("Deprecated old feed: %s", old_feed.id) return create_new_feed(self.session, self.feed_stable_id, payload) def _publish_to_batch_topic_if_needed( @@ -164,13 +170,13 @@ def _publish_to_topic(self, feed: Feed, payload: FeedPayload) -> None: ) future.add_done_callback( lambda _: logging.info( - f"Published feed {feed.stable_id} to dataset batch topic" + "Published feed %s to dataset batch topic", feed.stable_id ) ) future.result() - logging.info(f"Message published for feed {feed.stable_id}") + logging.info("Message published for feed %s", feed.stable_id) except Exception as e: - logging.error(f"Error publishing to dataset batch topic: {str(e)}") + logging.error("Error publishing to dataset batch topic: %s", str(e)) raise def _rollback_transaction(self, message: str) -> None: @@ -181,13 +187,17 @@ def _rollback_transaction(self, message: str) -> None: @with_db_session @functions_framework.cloud_event -def process_feed_event(cloud_event, db_session: Session) -> None: +def process_feed_event(cloud_event, db_session: Session) -> str: """Cloud Function entry point for feed processing.""" - Logger.init_logger() try: message_data = base64.b64decode(cloud_event.data["message"]["data"]).decode() payload = FeedPayload(**json.loads(message_data)) processor = FeedProcessor(db_session) processor.process_feed(payload) + result = "Feed processing completed successfully." + logging.info(result) + return result except Exception as e: - logging.error(f"Error processing feed event: {str(e)}") + result = f"Error processing feed event: {str(e)}" + logging.error(result) + return result diff --git a/functions-python/feed_sync_process_transitland/tests/test_feed_sync_process.py b/functions-python/feed_sync_process_transitland/tests/test_feed_sync_process.py index eaa726bb6..c2fc06b17 100644 --- a/functions-python/feed_sync_process_transitland/tests/test_feed_sync_process.py +++ b/functions-python/feed_sync_process_transitland/tests/test_feed_sync_process.py @@ -1,6 +1,5 @@ import base64 import json -import logging from unittest import mock from unittest.mock import patch, Mock, MagicMock @@ -41,42 +40,6 @@ def mock_db_session(): return Mock() -class MockLogger: - """Mock logger for testing""" - - @staticmethod - def init_logger(): - return MagicMock() - - def __init__(self, name): - self.name = name - self._logger = logging.getLogger(name) - - def get_logger(self): - mock_logger = MagicMock() - # Add all required logging methods - mock_logger.info = MagicMock() - mock_logger.error = MagicMock() - mock_logger.warning = MagicMock() - mock_logger.debug = MagicMock() - mock_logger.addFilter = MagicMock() - return mock_logger - - -@pytest.fixture(autouse=True) -def mock_logging(): - """Mock both local and GCP logging.""" - with patch("main.logging") as mock_log, patch("main.Logger", MockLogger): - for logger in [mock_log]: - logger.info = MagicMock() - logger.error = MagicMock() - logger.warning = MagicMock() - logger.debug = MagicMock() - logger.addFilter = MagicMock() - - yield mock_log - - @pytest.fixture def feed_payload(): """Fixture for feed payload.""" @@ -150,7 +113,7 @@ def _create_payload_dict(feed_payload: FeedPayload) -> dict: "payload_type": feed_payload.payload_type, } - def test_get_current_feed_info(self, processor, feed_payload, mock_logging): + def test_get_current_feed_info(self, processor, feed_payload): """Test retrieving current feed information.""" # Mock database query processor.session.query.return_value.filter.return_value.all.return_value = [ @@ -179,7 +142,7 @@ def test_get_current_feed_info(self, processor, feed_payload, mock_logging): ) assert len(feeds) == 0 - def test_check_feed_url_exists_comprehensive(self, processor, mock_logging): + def test_check_feed_url_exists_comprehensive(self, processor): """Test comprehensive feed URL existence checks.""" test_url = "https://example.com/feed" @@ -191,7 +154,7 @@ def test_check_feed_url_exists_comprehensive(self, processor, mock_logging): result = processor._check_feed_url_exists(test_url) assert result is True - def test_database_error_handling(self, processor, feed_payload, mock_logging): + def test_database_error_handling(self, processor, feed_payload): """Test database error handling in different scenarios.""" # Test case 1: General database error during feed processing @@ -201,9 +164,7 @@ def test_database_error_handling(self, processor, feed_payload, mock_logging): processor._rollback_transaction.assert_called_once() - def test_publish_to_batch_topic_comprehensive( - self, processor, feed_payload, mock_logging - ): + def test_publish_to_batch_topic_comprehensive(self, processor, feed_payload): """Test publishing to batch topic including success, error, and message format validation.""" # Test case 1: Successful publish with message format validation @@ -227,7 +188,7 @@ def test_publish_to_batch_topic_comprehensive( assert "feed_stable_id" in json.loads(message_arg["data"]) assert "tld-feed1" == json.loads(message_arg["data"])["feed_stable_id"] - def test_process_feed_event_validation(self, mock_logging): + def test_process_feed_event_validation(self): """Test feed event processing with various invalid payloads.""" # Test case 1: Empty payload @@ -255,7 +216,7 @@ def test_process_feed_event_validation(self, mock_logging): process_feed_event(cloud_event) def test_process_feed_event_pubsub_error( - self, processor, feed_payload, mock_logging, mock_db_session + self, processor, feed_payload, mock_db_session ): """Test feed event processing handles missing credentials error.""" # Create cloud event with valid payload @@ -274,7 +235,7 @@ def test_process_feed_event_pubsub_error( process_feed_event(cloud_event, db_session=mock_session) - def test_process_feed_event_malformed_cloud_event(self, mock_logging): + def test_process_feed_event_malformed_cloud_event(self): """Test feed event processing with malformed cloud event.""" # Test case 1: Missing message data cloud_event = Mock() @@ -287,7 +248,7 @@ def test_process_feed_event_malformed_cloud_event(self, mock_logging): process_feed_event(cloud_event) - def test_process_feed_event_invalid_json(self, mock_logging): + def test_process_feed_event_invalid_json(self): """Test handling of invalid JSON in cloud event""" # Create invalid base64 encoded JSON invalid_json = base64.b64encode(b'{"invalid": "json"').decode() @@ -296,14 +257,14 @@ def test_process_feed_event_invalid_json(self, mock_logging): cloud_event.data = {"message": {"data": invalid_json}} # Process the event - process_feed_event(cloud_event) + result = process_feed_event(cloud_event) # Verify error handling - mock_logging.error.assert_called() + assert result.startswith("Error processing feed event") @patch("main.create_new_feed") def test_process_new_feed_or_skip( - self, create_new_feed_mock, processor, feed_payload, mock_logging + self, create_new_feed_mock, processor, feed_payload ): """Test processing new feed or skipping existing feed.""" processor._check_feed_url_exists = MagicMock() @@ -313,9 +274,7 @@ def test_process_new_feed_or_skip( create_new_feed_mock.assert_called_once() @patch("main.create_new_feed") - def test_process_new_feed_skip( - self, create_new_feed_mock, processor, feed_payload, mock_logging - ): + def test_process_new_feed_skip(self, create_new_feed_mock, processor, feed_payload): """Test processing new feed or skipping existing feed.""" processor._check_feed_url_exists = MagicMock() # Test case 2: Existing feed @@ -325,7 +284,7 @@ def test_process_new_feed_skip( @patch("main.create_new_feed") def test_process_existing_feed_refs( - self, create_new_feed_mock, processor, feed_payload, mock_logging + self, create_new_feed_mock, processor, feed_payload ): """Test processing existing feed references.""" # 1. Existing feed with same url