|
3 | 3 | import itertools |
4 | 4 | import logging |
5 | 5 | import sys |
| 6 | +import time |
6 | 7 | import warnings |
7 | 8 | from typing import ( |
8 | 9 | Any, |
@@ -70,66 +71,73 @@ def base_handler( |
70 | 71 | ) -> Callable[[Any, Any], List[Any]]: |
71 | 72 | @functools.wraps(func) |
72 | 73 | def wrapper(aws_event: Any, aws_context: Any) -> List[Any]: |
73 | | - with LoggingContext( |
74 | | - aws_request_id=aws_context.aws_request_id, |
75 | | - asset_id=None, |
76 | | - app_connection_id=None, |
77 | | - handler=logging.StreamHandler(stream=sys.stdout), |
78 | | - user_handler=handler, |
79 | | - logger=CORVA_LOGGER, |
80 | | - ) as logging_ctx: |
81 | | - # Verify either current call from app_decorator or not |
82 | | - # for instance from partial rerun merge |
83 | | - ( |
84 | | - raw_custom_event_type, |
85 | | - custom_handler, |
86 | | - ) = _get_custom_event_type_by_raw_aws_event(aws_event) |
87 | | - is_direct_app_call: bool = not custom_handler |
88 | | - data_transformation_type = raw_custom_event_type or raw_event_type |
89 | | - if merge_events: |
90 | | - aws_event = _merge_events(aws_event, data_transformation_type) |
91 | | - |
92 | | - if ( |
93 | | - is_direct_app_call |
94 | | - and data_transformation_type not in GENERIC_APP_EVENT_TYPES |
95 | | - ): |
96 | | - CORVA_LOGGER.warning( |
97 | | - f"Handler for {data_transformation_type.__name__!r} " |
98 | | - f"event not found. Skipping..." |
99 | | - ) |
100 | | - return [] |
101 | | - |
102 | | - if is_direct_app_call: |
103 | | - # Means current app call is not RawPartialRerunMergeEvent or similar |
104 | | - validate_app_type_context(aws_event, raw_event_type) |
| 74 | + start_time = time.time() |
| 75 | + try: |
| 76 | + with LoggingContext( |
| 77 | + aws_request_id=aws_context.aws_request_id, |
| 78 | + asset_id=None, |
| 79 | + app_connection_id=None, |
| 80 | + handler=logging.StreamHandler(stream=sys.stdout), |
| 81 | + user_handler=handler, |
| 82 | + logger=CORVA_LOGGER, |
| 83 | + ) as logging_ctx: |
| 84 | + # Verify either current call from app_decorator or not |
| 85 | + # for instance from partial rerun merge |
| 86 | + ( |
| 87 | + raw_custom_event_type, |
| 88 | + custom_handler, |
| 89 | + ) = _get_custom_event_type_by_raw_aws_event(aws_event) |
| 90 | + is_direct_app_call: bool = not custom_handler |
| 91 | + data_transformation_type = raw_custom_event_type or raw_event_type |
| 92 | + if merge_events: |
| 93 | + aws_event = _merge_events(aws_event, data_transformation_type) |
| 94 | + |
| 95 | + if ( |
| 96 | + is_direct_app_call |
| 97 | + and data_transformation_type not in GENERIC_APP_EVENT_TYPES |
| 98 | + ): |
| 99 | + CORVA_LOGGER.warning( |
| 100 | + f"Handler for {data_transformation_type.__name__!r} " |
| 101 | + f"event not found. Skipping..." |
| 102 | + ) |
| 103 | + return [] |
105 | 104 |
|
106 | | - try: |
107 | | - context = CorvaContext.from_aws( |
108 | | - aws_event=aws_event, aws_context=aws_context |
109 | | - ) |
| 105 | + if is_direct_app_call: |
| 106 | + # Means current app call is not RawPartialRerunMergeEvent or similar |
| 107 | + validate_app_type_context(aws_event, raw_event_type) |
110 | 108 |
|
111 | | - redis_client = redis.Redis.from_url( |
112 | | - url=SETTINGS.CACHE_URL, decode_responses=True, max_connections=1 |
113 | | - ) |
114 | | - raw_events = data_transformation_type.from_raw_event(event=aws_event) |
115 | | - specific_callable = custom_handler or func |
116 | | - |
117 | | - results = [ |
118 | | - specific_callable( |
119 | | - raw_event, |
120 | | - context.api_key, |
121 | | - context.aws_request_id, |
122 | | - logging_ctx, |
123 | | - redis_client, |
| 109 | + try: |
| 110 | + context = CorvaContext.from_aws( |
| 111 | + aws_event=aws_event, aws_context=aws_context |
124 | 112 | ) |
125 | | - for raw_event in raw_events |
126 | | - ] |
127 | | - |
128 | | - return results |
129 | 113 |
|
130 | | - except Exception: |
131 | | - CORVA_LOGGER.exception("The app failed to execute.") |
132 | | - raise |
| 114 | + redis_client = redis.Redis.from_url( |
| 115 | + url=SETTINGS.CACHE_URL, decode_responses=True, max_connections=1 |
| 116 | + ) |
| 117 | + raw_events = data_transformation_type.from_raw_event(event=aws_event) |
| 118 | + specific_callable = custom_handler or func |
| 119 | + |
| 120 | + results = [ |
| 121 | + specific_callable( |
| 122 | + raw_event, |
| 123 | + context.api_key, |
| 124 | + context.aws_request_id, |
| 125 | + logging_ctx, |
| 126 | + redis_client, |
| 127 | + ) |
| 128 | + for raw_event in raw_events |
| 129 | + ] |
| 130 | + |
| 131 | + return results |
| 132 | + |
| 133 | + except Exception: |
| 134 | + CORVA_LOGGER.exception("The app failed to execute.") |
| 135 | + raise |
| 136 | + finally: |
| 137 | + CORVA_LOGGER.info( |
| 138 | + "Total data processing time: %d ms", |
| 139 | + int((time.time() - start_time) * 1000), |
| 140 | + ) |
133 | 141 |
|
134 | 142 | return wrapper |
135 | 143 |
|
|
0 commit comments