Skip to content

Commit 4a68ace

Browse files
committed
persist error functionality
Signed-off-by: adarsh0728 <gooneriitk@gmail.com>
1 parent 157a90d commit 4a68ace

4 files changed

Lines changed: 107 additions & 2 deletions

File tree

pynumaflow/_constants.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,11 @@
77
SIDE_INPUT_DIR_PATH = "/var/numaflow/side-inputs"
88
ENV_UD_CONTAINER_TYPE = "NUMAFLOW_UD_CONTAINER_TYPE"
99

10-
# Get container type from env var, default to unknown-container
10+
# Constants for persisting critical errors: base directory, in-progress file name, and fallback error code
11+
RUNTIME_APPLICATION_ERRORS_PATH = "/var/numaflow/runtime/application-errors"
12+
CURRENT_FILE = "current-udf.json"
13+
INTERNAL_ERROR = "Internal error"
1114
CONTAINER_TYPE = os.getenv(ENV_UD_CONTAINER_TYPE, "unknown-container")
12-
# UDF exception error string with container type
1315
ERR_UDF_EXCEPTION_STRING = f"UDF_EXECUTION_ERROR({CONTAINER_TYPE})"
1416

1517
# Socket configs

pynumaflow/errors/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
from pynumaflow.errors.errors import persist_critical_error
2+
3+
__all__ = ["persist_critical_error"]

pynumaflow/errors/_dtypes.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
class _RuntimeErrorEntry:
    """Plain record describing one runtime error that is about to be persisted.

    Stores the five values handed to the constructor as attributes of the
    same name and exposes them as a dictionary through ``to_dict``.
    """

    def __init__(self, container: str, timestamp: int, code: str, message: str, details: str):
        self.container, self.timestamp = container, timestamp
        self.code, self.message, self.details = code, message, details

    def to_dict(self) -> dict:
        """Return the entry as a dict keyed by attribute name."""
        # Order matters for the serialized output, so the names are listed explicitly.
        field_names = ("container", "timestamp", "code", "message", "details")
        return {name: getattr(self, name) for name in field_names}

pynumaflow/errors/errors.py

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
import os
2+
import json
3+
import threading
4+
import time
5+
from pathlib import Path
6+
from pynumaflow._constants import (
7+
CONTAINER_TYPE,
8+
RUNTIME_APPLICATION_ERRORS_PATH,
9+
CURRENT_FILE,
10+
INTERNAL_ERROR,
11+
)
12+
from pynumaflow.errors._dtypes import _RuntimeErrorEntry
13+
14+
15+
class _PersistErrorOnce:
    """One-shot gate: lets a single call through and rejects every later one."""

    def __init__(self):
        self.done = False
        self.lock = threading.Lock()

    def execute(self, func, *args, **kwargs):
        """Call ``func(*args, **kwargs)`` on the first invocation only.

        The flag is flipped before ``func`` runs, so a failing ``func`` still
        uses up the single allowed execution. ``func`` runs while the lock is held.

        Returns:
            Whatever ``func`` returns.

        Raises:
            RuntimeError: if ``execute`` has already been called on this instance.
        """
        with self.lock:
            # Read the previous state and mark the gate as used in one step.
            already_ran, self.done = self.done, True
            if already_ran:
                raise RuntimeError("Persist critical error function has already been executed.")
            return func(*args, **kwargs)


# Module-wide gate shared by every caller of persist_critical_error.
_persist_error_once = _PersistErrorOnce()
31+
32+
33+
def persist_critical_error(
    error_code: str, error_message: str, error_details: str
) -> RuntimeError | None:
    """
    Persist a critical error to a file under RUNTIME_APPLICATION_ERRORS_PATH.

    The write is routed through the module-level once-only gate, so only the
    first call attempts to write anything.

    Returns:
        None when the call completed, or when it failed with a
        non-RuntimeError exception (in which case a message is printed).
        The RuntimeError instance itself, returned rather than raised, when a
        RuntimeError occurred during the call, which includes the "already
        executed" error from the gate.
    """
    outcome = None
    try:
        _persist_error_once.execute(
            _persist_critical_error_to_file,
            error_code,
            error_message,
            error_details,
            RUNTIME_APPLICATION_ERRORS_PATH,
        )
    except RuntimeError as rejected:
        # Hand the error back to the caller instead of propagating it.
        outcome = rejected
    except Exception as failure:
        print(f"Error in persisting critical error: {failure}")
    return outcome
54+
55+
56+
def _persist_critical_error_to_file(
    error_code: str, error_message: str, error_details: str, dir_path: str
):
    """Write one error entry as JSON into ``<dir_path>/<CONTAINER_TYPE>/``.

    The entry is first written to CURRENT_FILE and then renamed to
    ``<unix-seconds>-udf.json``. An empty or None ``error_code`` is replaced
    by INTERNAL_ERROR. Exceptions from the filesystem or JSON calls are not
    caught here.
    """
    # Create the base directory first, then the per-container directory beneath it.
    target_dir = os.path.join(dir_path, CONTAINER_TYPE)
    for directory in (dir_path, target_dir):
        os.makedirs(directory, mode=0o777, exist_ok=True)

    recorded_at = int(time.time())
    entry = _RuntimeErrorEntry(
        container=CONTAINER_TYPE,
        timestamp=recorded_at,
        code=error_code if error_code else INTERNAL_ERROR,
        message=error_message,
        details=error_details,
    )

    # Write under the in-progress name, then rename to the timestamped name.
    staging_path = os.path.join(target_dir, CURRENT_FILE)
    with open(staging_path, "w") as handle:
        json.dump(entry.to_dict(), handle)

    os.rename(staging_path, os.path.join(target_dir, f"{recorded_at}-udf.json"))

0 commit comments

Comments
 (0)