From 7f966fe6ed2f0124d4d07d90bb7f54d5e6a1d571 Mon Sep 17 00:00:00 2001 From: Joey Dreijer Date: Tue, 23 Jun 2026 23:32:16 +0200 Subject: [PATCH 01/10] Add defer wrapper --- src/openhound/core/app.py | 13 ++++++++++++- src/openhound/core/app.pyi | 1 + src/openhound/core/resources.py | 18 +++++++++++++++--- 3 files changed, 28 insertions(+), 4 deletions(-) diff --git a/src/openhound/core/app.py b/src/openhound/core/app.py index 971aecf..a592a96 100644 --- a/src/openhound/core/app.py +++ b/src/openhound/core/app.py @@ -21,7 +21,7 @@ from openhound.core.models.extension import Extension from openhound.core.preproc import PreProcContext, PreProcessor from openhound.core.progress import Progress -from openhound.core.resources import safe_resource_wrapper +from openhound.core.resources import safe_defer_wrapper, safe_resource_wrapper logger = logging.getLogger(__name__) @@ -292,6 +292,17 @@ def wrapper( return decorator + def defer(self): + """Decorator to register a DLT defer with added exception handling.""" + + def decorator(func: Callable) -> DltResource: + safe_func = safe_defer_wrapper(func) + decorated = dlt.defer(safe_func) + return decorated # type: ignore + + logger.debug(f"Registering defer for {self.name}") + return decorator + def transformer( self, *dlt_args, diff --git a/src/openhound/core/app.pyi b/src/openhound/core/app.pyi index 9f61cd5..ace80d0 100644 --- a/src/openhound/core/app.pyi +++ b/src/openhound/core/app.pyi @@ -99,6 +99,7 @@ class OpenHound: parallelized: bool = False, _impl_cls: type[DltSource] = DltSource, ) -> Any: ... + def defer(self): ... def resource( self, data: Optional[Any] = None, diff --git a/src/openhound/core/resources.py b/src/openhound/core/resources.py index 344f7c5..06efe0e 100644 --- a/src/openhound/core/resources.py +++ b/src/openhound/core/resources.py @@ -1,13 +1,25 @@ -import logging - - import functools import inspect +import logging from typing import Callable logger = logging.getLogger(__name__) +def safe_defer_wrapper(func: Callable) -> Callable: + + @functools.wraps(func) + def sync_wrapper(*args, **kwargs): + try: + gen = func(*args, **kwargs) + return gen + except Exception as e: + logger.error(f"Error executing DLT defer: {e}") + return + + return sync_wrapper + + def safe_resource_wrapper(func: Callable, resource_name: str) -> Callable: """Wrap a DLT resource to catch and log exceptions without stopping the entire pipeline. Can either be sync or async generator function. From f0a17e5614595a8a2d4fd59befb1f8e1445e385a Mon Sep 17 00:00:00 2001 From: Joey Dreijer Date: Wed, 24 Jun 2026 15:35:27 +0200 Subject: [PATCH 02/10] Add safe_defer_wrapper to resource/transformers that return a defer function --- src/openhound/core/resources.py | 31 ++++++++++++++++++++----------- 1 file changed, 20 insertions(+), 11 deletions(-) diff --git a/src/openhound/core/resources.py b/src/openhound/core/resources.py index 06efe0e..4b25bb4 100644 --- a/src/openhound/core/resources.py +++ b/src/openhound/core/resources.py @@ -6,16 +6,18 @@ logger = logging.getLogger(__name__) -def safe_defer_wrapper(func: Callable) -> Callable: +def safe_defer_wrapper(func: Callable, resource_name: str | None = None) -> Callable: @functools.wraps(func) def sync_wrapper(*args, **kwargs): try: - gen = func(*args, **kwargs) - return gen + return func(*args, **kwargs) except Exception as e: - logger.error(f"Error executing DLT defer: {e}") - return + logger.error( + f"Error executing DLT defer: {e}", + extra={"resource": resource_name, "phase": "defer_execution"}, + ) + return [] return sync_wrapper @@ -48,11 +50,12 @@ def sync_wrapper(*args, **kwargs): return if inspect.isgenerator(gen): - # Note: Don't use while item: := next(gen, None) because this will stop the full iterator - # if the resource yields any empty value while True: try: item = next(gen) + if callable(item): + item = safe_defer_wrapper(item, resource_name) + yield item except StopIteration: break @@ -65,8 +68,10 @@ def sync_wrapper(*args, **kwargs): }, ) continue - else: + if callable(gen): + gen = safe_defer_wrapper(gen, resource_name) + yield gen @functools.wraps(func) @@ -86,11 +91,12 @@ async def async_wrapper(*args, **kwargs): return if inspect.isasyncgen(gen): - # Note: Don't use while item: := next(gen, None) because this will stop the full iterator - # if the resource yields any empty value while True: try: item = await gen.__anext__() + if callable(item): + item = safe_defer_wrapper(item, resource_name) + yield item except StopAsyncIteration: break @@ -102,11 +108,14 @@ async def async_wrapper(*args, **kwargs): "phase": "resource_iteration", }, ) - continue + else: try: result = await gen + if callable(result): + result = safe_defer_wrapper(result, resource_name) + yield result except Exception as e: logger.error( From 9acde82264d2f87b20945c27dcee889fad1c5d3a Mon Sep 17 00:00:00 2001 From: Joey Dreijer Date: Wed, 24 Jun 2026 15:38:41 +0200 Subject: [PATCH 03/10] Updated defer decorator to mimic dlt behaviour --- src/openhound/core/app.py | 11 +++-------- src/openhound/core/app.pyi | 2 +- 2 files changed, 4 insertions(+), 9 deletions(-) diff --git a/src/openhound/core/app.py b/src/openhound/core/app.py index a592a96..526a49c 100644 --- a/src/openhound/core/app.py +++ b/src/openhound/core/app.py @@ -292,16 +292,11 @@ def wrapper( return decorator - def defer(self): + def defer(self, func: Callable) -> Callable: """Decorator to register a DLT defer with added exception handling.""" - - def decorator(func: Callable) -> DltResource: - safe_func = safe_defer_wrapper(func) - decorated = dlt.defer(safe_func) - return decorated # type: ignore - logger.debug(f"Registering defer for {self.name}") - return decorator + safe_func = safe_defer_wrapper(func) + return dlt.defer(safe_func) def transformer( self, diff --git a/src/openhound/core/app.pyi b/src/openhound/core/app.pyi index ace80d0..1720a40 100644 --- a/src/openhound/core/app.pyi +++ b/src/openhound/core/app.pyi @@ -99,7 +99,7 @@ class OpenHound: parallelized: bool = False, _impl_cls: type[DltSource] = DltSource, ) -> Any: ... - def defer(self): ... + def defer(self, func: Callable[..., Any]) -> Callable[..., Callable[[], Any]]: ... def resource( self, data: Optional[Any] = None, From 9e7b981e97919659ed0994aec5bf3626013c063a Mon Sep 17 00:00:00 2001 From: Joey Dreijer Date: Wed, 24 Jun 2026 15:49:22 +0200 Subject: [PATCH 04/10] Remove double wrapping, assume collectors need to use `@app.defer` and not `@dlt.defer` --- src/openhound/core/resources.py | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/src/openhound/core/resources.py b/src/openhound/core/resources.py index 4b25bb4..7bbc825 100644 --- a/src/openhound/core/resources.py +++ b/src/openhound/core/resources.py @@ -53,9 +53,6 @@ def sync_wrapper(*args, **kwargs): while True: try: item = next(gen) - if callable(item): - item = safe_defer_wrapper(item, resource_name) - yield item except StopIteration: break @@ -69,9 +66,6 @@ def sync_wrapper(*args, **kwargs): ) continue else: - if callable(gen): - gen = safe_defer_wrapper(gen, resource_name) - yield gen @functools.wraps(func) @@ -94,9 +88,6 @@ async def async_wrapper(*args, **kwargs): while True: try: item = await gen.__anext__() - if callable(item): - item = safe_defer_wrapper(item, resource_name) - yield item except StopAsyncIteration: break @@ -113,9 +104,6 @@ async def async_wrapper(*args, **kwargs): else: try: result = await gen - if callable(result): - result = safe_defer_wrapper(result, resource_name) - yield result except Exception as e: logger.error( From 3c15fbdcd7e596eabfb95e559ec056fbc40fc38b Mon Sep 17 00:00:00 2001 From: Joey Dreijer Date: Wed, 24 Jun 2026 15:58:33 +0200 Subject: [PATCH 05/10] Remove unused resource_name input (since these are not always available) --- src/openhound/core/resources.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/src/openhound/core/resources.py b/src/openhound/core/resources.py index 7bbc825..900eb11 100644 --- a/src/openhound/core/resources.py +++ b/src/openhound/core/resources.py @@ -6,7 +6,15 @@ logger = logging.getLogger(__name__) -def safe_defer_wrapper(func: Callable, resource_name: str | None = None) -> Callable: +def safe_defer_wrapper(func: Callable) -> Callable: + """Wrap a DLT defer to catch and log exceptions without stopping the entire pipeline. + + Args: + func: The defer function + + Returns: + Wrapped function that catches exceptions and continues (if possible of course) + """ @functools.wraps(func) def sync_wrapper(*args, **kwargs): @@ -15,7 +23,7 @@ def sync_wrapper(*args, **kwargs): except Exception as e: logger.error( f"Error executing DLT defer: {e}", - extra={"resource": resource_name, "phase": "defer_execution"}, + extra={"phase": "defer_execution"}, ) return [] From 911cb40cd0cde8f839118f0f8fd40fdff840a647 Mon Sep 17 00:00:00 2001 From: Joey Dreijer Date: Wed, 24 Jun 2026 16:12:10 +0200 Subject: [PATCH 06/10] Added test to see if the pipeline still continues with resource, transformer and defer failures --- tests/test_safe_dlt_wrappers.py | 96 +++++++++++++++++++++++++++++++++ 1 file changed, 96 insertions(+) create mode 100644 tests/test_safe_dlt_wrappers.py diff --git a/tests/test_safe_dlt_wrappers.py b/tests/test_safe_dlt_wrappers.py new file mode 100644 index 0000000..c1ee057 --- /dev/null +++ b/tests/test_safe_dlt_wrappers.py @@ -0,0 +1,96 @@ +import logging + +from pydantic import BaseModel + +from openhound.core.app import OpenHound +from openhound.core.collect import Collector +from openhound.core.progress import Progress + + +class ExampleResource(BaseModel): + id: int + source: str + + +class Computer(BaseModel): + id: int + hostname: str + + +class User(BaseModel): + id: int + email: str + + +class UserDetails(BaseModel): + id: int + email: str + office: str + + +def test_dlt_wrapper_pipeline_continues( + caplog, + monkeypatch, + tmp_path, +): + monkeypatch.setenv("DLT_DATA_DIR", str(tmp_path / ".dlt")) + monkeypatch.setattr( + "openhound.core.collect.logger_override.set_handler", lambda name: None + ) + caplog.set_level(logging.ERROR, logger="openhound.core.resources") + + app = OpenHound("safe_wrapper_test", "TEST") + + @app.resource(name="computers", columns=Computer) + def computers(): + yield {"id": 1, "hostname": "DESKTOP-12345"} + yield {"id": 2, "hostname": "DESKTOP-54321"} + raise RuntimeError("resource failed after valid rows") + + @app.transformer(name="users", columns=User) + def users(computer): + if computer["id"] == 1: + yield {"id": 10, "email": "someuser@example.org"} + raise RuntimeError("transformer failed after valid row") + + yield {"id": 20, "email": "someuser2@example.org"} + + @app.transformer(name="user_details", columns=UserDetails) + def user_details(user): + + @app.defer + def deferred_child(user_input): + if user_input["id"] == 1: + raise RuntimeError("defer failed for parent") + + return {"id": 20, "email": "someuser2@example.org", "office": "Amsterdam"} + + yield deferred_child(user) + + @app.source(name="safe_wrapper_test", max_table_nesting=0) + def source(): + computers_resource = computers() + return ( + computers_resource, + computers_resource | users(), + computers_resource | user_details(), + ) + + collector = Collector( + name="safe_wrapper_test", + output_path=tmp_path / "output", + progress=Progress.log, + ) + + load_info = collector.run(source()) + + assert load_info is not None + + messages = [record.getMessage() for record in caplog.records] + phases = {getattr(record, "phase", None) for record in caplog.records} + + assert any("resource failed after valid rows" in message for message in messages) + assert any("transformer failed after valid row" in message for message in messages) + assert any("defer failed for parent" in message for message in messages) + assert "resource_iteration" in phases + assert "defer_execution" in phases From 3597c3c906edb75e974d8c997ebae72f91d71ab1 Mon Sep 17 00:00:00 2001 From: Joey Dreijer Date: Wed, 24 Jun 2026 16:14:34 +0200 Subject: [PATCH 07/10] Remove unused ExampleResource model --- tests/test_safe_dlt_wrappers.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/tests/test_safe_dlt_wrappers.py b/tests/test_safe_dlt_wrappers.py index c1ee057..519e3d4 100644 --- a/tests/test_safe_dlt_wrappers.py +++ b/tests/test_safe_dlt_wrappers.py @@ -7,11 +7,6 @@ from openhound.core.progress import Progress -class ExampleResource(BaseModel): - id: int - source: str - - class Computer(BaseModel): id: int hostname: str From cf9938ffe42a64f818ce1399423efb37ce74c629 Mon Sep 17 00:00:00 2001 From: Joey Dreijer Date: Wed, 24 Jun 2026 16:14:53 +0200 Subject: [PATCH 08/10] Remove unused ExampleResource model --- tests/test_safe_dlt_wrappers.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/tests/test_safe_dlt_wrappers.py b/tests/test_safe_dlt_wrappers.py index 519e3d4..f401666 100644 --- a/tests/test_safe_dlt_wrappers.py +++ b/tests/test_safe_dlt_wrappers.py @@ -17,9 +17,7 @@ class User(BaseModel): email: str -class UserDetails(BaseModel): - id: int - email: str +class UserDetails(User): office: str From 4aad5c4a50a96dba54c151db060aa4f3f83a2c60 Mon Sep 17 00:00:00 2001 From: Joey Dreijer Date: Wed, 24 Jun 2026 18:49:48 +0200 Subject: [PATCH 09/10] Added new version to action outputs --- .github/workflows/release-on-merge.yml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/.github/workflows/release-on-merge.yml b/.github/workflows/release-on-merge.yml index 6ed33d0..e0d1135 100644 --- a/.github/workflows/release-on-merge.yml +++ b/.github/workflows/release-on-merge.yml @@ -27,6 +27,8 @@ jobs: needs: test-core if: github.event.pull_request.merged == true runs-on: ubuntu-latest + outputs: + new_version: ${{ steps.versioning.outputs.new_version }} steps: - name: Checkout @@ -85,7 +87,7 @@ jobs: publish: needs: tag-release - if: github.event.pull_request.merged == true + if: github.event.pull_request.merged == true && needs.tag-release.outputs.new_version != '' uses: ./.github/workflows/build-and-publish.yml with: tag: ${{ needs.tag-release.outputs.new_version }} From 95bd8fde4e27f689a52d51e9b5d6841d3016bff1 Mon Sep 17 00:00:00 2001 From: Joey Dreijer Date: Wed, 24 Jun 2026 19:01:26 +0200 Subject: [PATCH 10/10] Bump github collector version --- pyproject.toml | 4 ++-- uv.lock | 10 +++++----- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 5f2af9e..49c76f8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -24,14 +24,14 @@ dependencies = [ [project.optional-dependencies] all = [ "openhound-jamf==0.2.2", - "openhound-github==0.3.3", + "openhound-github==0.3.4", "openhound-okta==0.1.4", ] jamf = [ "openhound-jamf==0.2.2", ] github = [ - "openhound-github==0.3.3" + "openhound-github==0.3.4" ] okta = [ diff --git a/uv.lock b/uv.lock index dea3e53..d93825c 100644 --- a/uv.lock +++ b/uv.lock @@ -1262,8 +1262,8 @@ requires-dist = [ { name = "griffe-fieldz", specifier = ">=0.5.0" }, { name = "jinja2", specifier = ">=3.1.6" }, { name = "mkdocstrings", extras = ["python"], specifier = ">=1.0.0" }, - { name = "openhound-github", marker = "extra == 'all'", specifier = "==0.3.3" }, - { name = "openhound-github", marker = "extra == 'github'", specifier = "==0.3.3" }, + { name = "openhound-github", marker = "extra == 'all'", specifier = "==0.3.4" }, + { name = "openhound-github", marker = "extra == 'github'", specifier = "==0.3.4" }, { name = "openhound-jamf", marker = "extra == 'all'", specifier = "==0.2.2" }, { name = "openhound-jamf", marker = "extra == 'jamf'", specifier = "==0.2.2" }, { name = "openhound-okta", marker = "extra == 'all'", specifier = "==0.1.4" }, @@ -1308,15 +1308,15 @@ wheels = [ [[package]] name = "openhound-github" -version = "0.3.3" +version = "0.3.4" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "joserfc" }, { name = "requests" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/32/cf/8cd378f9ffa493c3941d438439b6ac226b64bcad55819443600f2cdb7519/openhound_github-0.3.3.tar.gz", hash = "sha256:3e71608fbf600caca4a1df8ad456e319f431e9187ff0a5791c5b3e00d1de1110", size = 3567129, upload-time = "2026-06-22T18:50:22.734Z" } +sdist = { url = "https://files.pythonhosted.org/packages/c0/f0/512b1de552de9820ec64f27ee90f6e5c63b957a25fa7271b41b4d23cab63/openhound_github-0.3.4.tar.gz", hash = "sha256:5fefd8340bf6af221fd4373f316be91f21eccf282f252b13352fcb82e561bfef", size = 3569189, upload-time = "2026-06-24T15:58:44.491Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/c8/fa/eb1e18bc73df2ca14865e0c5d579d711a4c4ee8a64004fb2320e4a4b1045/openhound_github-0.3.3-py3-none-any.whl", hash = "sha256:dd19765a533a93b3f27f64316d0bf5626fe2dae5d05a4d3fa499b1f3d2ab9bf9", size = 120098, upload-time = "2026-06-22T18:50:21.308Z" }, + { url = "https://files.pythonhosted.org/packages/26/fa/602f2d018b50ab3ad12a26c81c2d8c2b248c63e23ed754dd492a1206aa02/openhound_github-0.3.4-py3-none-any.whl", hash = "sha256:27626ccb48d158f99f12c3bef141e58ef3fb06690e0a811df14d21c69a7f8e23", size = 120686, upload-time = "2026-06-24T15:58:43.248Z" }, ] [[package]]