forked from contextforge-org/cpex
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmanager.py
More file actions
1539 lines (1374 loc) · 66 KB
/
manager.py
File metadata and controls
1539 lines (1374 loc) · 66 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# -*- coding: utf-8 -*-
"""Location: ./cpex/framework/manager.py
Copyright 2025
SPDX-License-Identifier: Apache-2.0
Authors: Teryl Taylor, Mihai Criveti, Fred Araujo
Plugin manager.
Module that manages and calls plugins at hookpoints throughout the gateway.
This module provides the core plugin management functionality including:
- Plugin lifecycle management (initialization, execution, shutdown)
- Timeout protection for plugin execution
- Context management with automatic cleanup
- Priority-based plugin ordering
- Conditional plugin execution based on prompts/servers/tenants
Examples:
>>> # Initialize plugin manager with configuration
>>> manager = PluginManager("plugins/config.yaml")
>>> # await manager.initialize() # Called in async context
>>> # Create test payload and context
>>> from cpex.framework.models import GlobalContext
>>> from cpex.framework.hooks.prompts import PromptPrehookPayload
>>> payload = PromptPrehookPayload(prompt_id="123", name="test", args={"user": "input"})
>>> context = GlobalContext(request_id="123")
>>> # result, contexts = await manager.prompt_pre_fetch(payload, context) # Called in async context
"""
# Standard
import asyncio
import logging
import threading
from dataclasses import dataclass
from typing import Any, Literal, Optional, Union
# Third-Party
from pydantic import BaseModel, RootModel
# First-Party
from cpex.framework.base import HookRef, Plugin
from cpex.framework.constants import EXTERNAL_PLUGIN_TYPE
from cpex.framework.errors import PluginError, PluginViolationError, convert_exception_to_error
from cpex.framework.extensions.extensions import Extensions
from cpex.framework.extensions.tiers import filter_extensions
from cpex.framework.hooks.policies import DefaultHookPolicy, HookPayloadPolicy, apply_policy
from cpex.framework.loader.config import ConfigLoader
from cpex.framework.loader.plugin import PluginLoader
from cpex.framework.memory import _safe_deepcopy, copyonwrite, wrap_payload_for_isolation
from cpex.framework.models import (
Config,
GlobalContext,
OnError,
PluginContext,
PluginContextTable,
PluginErrorModel,
PluginMode,
PluginPayload,
PluginResult,
)
from cpex.framework.observability import ObservabilityProvider, current_trace_id
from cpex.framework.registry import PluginInstanceRegistry
from cpex.framework.settings import settings
from cpex.framework.utils import payload_matches
# The stdlib logger is used directly: routing through the service-layer logging
# helpers would create an import cycle (plugins -> services -> plugins).
logger = logging.getLogger(__name__)

# --- Tunables ---------------------------------------------------------------
DEFAULT_PLUGIN_TIMEOUT = 30  # seconds; default per-plugin execution limit
MAX_PAYLOAD_SIZE = 1_000_000  # 1MB
CONTEXT_CLEANUP_INTERVAL = 5 * 60  # seconds (5 minutes)
CONTEXT_MAX_AGE = 60 * 60  # seconds (1 hour)

# --- Hook names and internal metadata keys ----------------------------------
HTTP_AUTH_CHECK_PERMISSION_HOOK = "http_auth_check_permission"
# Metadata key under which the executor records the deciding plugin's name.
DECISION_PLUGIN_METADATA_KEY = "_decision_plugin"
# Keys plugins may not set themselves; they are dropped when plugin metadata is merged.
RESERVED_INTERNAL_METADATA_KEYS = frozenset((DECISION_PLUGIN_METADATA_KEY,))
@dataclass
class PhaseState:
    """Pipeline state as it stands at the end of one serial execution phase.

    ``_run_serial_phase`` returns this instead of a nested tuple so that
    callers can read each piece of state by name.

    Attributes:
        payload: Effective payload after the phase, or ``None`` while no
            plugin modification has been accepted.
        decision_plugin: Name of the plugin whose payload modification was
            accepted most recently, if any.
        extensions: Most recent extensions returned by a plugin, if any.
    """

    payload: Optional[PluginPayload] = None
    decision_plugin: Optional[str] = None
    extensions: Optional[Extensions] = None
class PluginTimeoutError(Exception):
    """Signals that a plugin did not finish within its execution time limit."""
class PayloadSizeError(ValueError):
    """Signals that a payload is larger than the maximum size permitted."""
class PluginExecutor:
"""Executes a list of plugins with timeout protection and error handling.
This class manages the execution of plugins in priority order, handling:
- Timeout protection for each plugin
- Context management between plugins
- Error isolation to prevent plugin failures from affecting the gateway
- Metadata aggregation from multiple plugins
Examples:
>>> executor = PluginExecutor()
>>> # In async context:
>>> # result, contexts = await executor.execute(
>>> # plugins=[plugin1, plugin2],
>>> # payload=payload,
>>> # global_context=context,
>>> # plugin_run=pre_prompt_fetch,
>>> # compare=pre_prompt_matches
>>> # )
"""
def __init__(
    self,
    config: Optional[Config] = None,
    timeout: int = DEFAULT_PLUGIN_TIMEOUT,
    observability: Optional[ObservabilityProvider] = None,
    hook_policies: Optional[dict[str, HookPayloadPolicy]] = None,
    default_hook_policy: Optional[Literal["allow", "deny"]] = None,
):
    """Set up the executor's configuration and runtime state.

    Args:
        config: The plugin manager configuration.
        timeout: Maximum execution time per plugin, in seconds.
        observability: Optional provider implementing the ObservabilityProvider protocol.
        hook_policies: Payload modification policies keyed by hook type.
            A falsy value yields an empty mapping.
        default_hook_policy: Fallback hook policy ("allow" or "deny") used when a
            hook type has no explicit policy. A falsy value defers to
            `settings.default_hook_policy`.
    """
    self.timeout = timeout
    self.config = config
    self.observability = observability
    self.hook_policies: dict[str, HookPayloadPolicy] = hook_policies if hook_policies else {}

    # An explicit argument wins over the globally configured default.
    fallback_policy = default_hook_policy or settings.default_hook_policy
    self.default_hook_policy = DefaultHookPolicy(fallback_policy)

    # Names of plugins switched off at runtime (on_error=DISABLE); they are
    # skipped when hook refs are grouped for execution.
    self._runtime_disabled: set[str] = set()
async def execute(
    self,
    hook_refs: list[HookRef],
    payload: PluginPayload,
    global_context: GlobalContext,
    hook_type: str,
    local_contexts: Optional[PluginContextTable] = None,
    violations_as_exceptions: bool = False,
    extensions: Optional[Extensions] = None,
) -> tuple[PluginResult, PluginContextTable | None]:
    """Execute plugins in priority order with timeout protection.

    Hook refs are grouped by plugin mode and run in phases, in this order:

    1. SEQUENTIAL - serial, chained; may modify the payload and halt the pipeline.
    2. TRANSFORM - serial, chained; may modify the payload, blocking is suppressed.
    3. AUDIT - serial; payload modifications are discarded, blocking is suppressed.
    4. CONCURRENT - parallel tasks; payload modifications are discarded, the first
       blocking result halts the pipeline and cancels the remaining tasks.
    5. FIRE_AND_FORGET - scheduled as background tasks (also on every halt path).

    Args:
        hook_refs: List of hook references to execute.
        payload: The payload to be processed by plugins.
        global_context: Shared context for all plugins containing request metadata.
        hook_type: The hook type identifier (e.g., "tool_pre_invoke").
        local_contexts: Optional existing contexts from previous hook executions.
        violations_as_exceptions: Raise violations as exceptions rather than as returns.
        extensions: Optional extensions to filter and pass to plugins that accept them.

    Returns:
        A tuple containing:
        - PluginResult with processing status, modified payload, and metadata
        - The local contexts recorded for each plugin, or None when *hook_refs* is empty

    Raises:
        PayloadSizeError: If the payload exceeds MAX_PAYLOAD_SIZE.
        PluginError: If there is an error inside a plugin.
        PluginViolationError: If a violation occurs and violations_as_exceptions is set.

    Examples:
        >>> # Execute plugins with timeout protection
        >>> from cpex.framework.hooks.prompts import PromptHookType
        >>> executor = PluginExecutor(timeout=30)
        >>> # Assuming hook_refs is a list of HookRef objects for the hook.
        >>> # In async context:
        >>> # result, contexts = await executor.execute(
        >>> #     hook_refs=hook_refs,
        >>> #     payload=PromptPrehookPayload(prompt_id="123", name="test", args={}),
        >>> #     global_context=GlobalContext(request_id="123"),
        >>> #     hook_type=PromptHookType.PROMPT_PRE_FETCH,
        >>> # )
    """
    if not hook_refs:
        return (PluginResult(modified_payload=None), None)

    # Validate payload size
    self._validate_payload_size(payload)

    # Look up the policy for this hook type (may be None)
    policy = self.hook_policies.get(hook_type)

    res_local_contexts = {}
    combined_metadata: dict[str, Any] = {}
    current_payload: PluginPayload | None = None
    current_extensions: Extensions | None = None
    decision_plugin_name: Optional[str] = None

    sequential_refs, transform_refs, audit_refs, concurrent_refs, fire_and_forget_refs = self._group_by_mode(
        hook_refs, payload, hook_type, global_context
    )

    # Independent semaphores prevent one mode from starving the other
    pool = int(settings.execution_pool) if settings.execution_pool else None
    fire_and_forget_semaphore = asyncio.Semaphore(pool) if pool else None
    concurrent_semaphore = asyncio.Semaphore(pool) if pool else None

    # SEQUENTIAL: sequential, chained execution — can halt pipeline
    halt_result, phase = await self._run_serial_phase(
        hook_refs=sequential_refs,
        mode_label="SEQUENTIAL",
        payload=payload,
        policy=policy,
        hook_type=hook_type,
        global_context=global_context,
        local_contexts=local_contexts,
        res_local_contexts=res_local_contexts,
        violations_as_exceptions=violations_as_exceptions,
        combined_metadata=combined_metadata,
        current_payload=current_payload,
        decision_plugin_name=decision_plugin_name,
        apply_modifications=True,
        allow_blocking=True,
        current_extensions=current_extensions,
        fire_and_forget_refs=fire_and_forget_refs,
        fire_and_forget_semaphore=fire_and_forget_semaphore,
        extensions=extensions,
    )
    current_payload = phase.payload
    decision_plugin_name = phase.decision_plugin
    current_extensions = phase.extensions
    if halt_result is not None:
        return halt_result

    # TRANSFORM: serial, chained execution — can modify payloads but cannot halt pipeline
    _, phase = await self._run_serial_phase(
        hook_refs=transform_refs,
        mode_label="TRANSFORM",
        payload=payload,
        policy=policy,
        hook_type=hook_type,
        global_context=global_context,
        local_contexts=local_contexts,
        res_local_contexts=res_local_contexts,
        violations_as_exceptions=violations_as_exceptions,
        combined_metadata=combined_metadata,
        current_payload=current_payload,
        decision_plugin_name=decision_plugin_name,
        apply_modifications=True,
        allow_blocking=False,
        current_extensions=current_extensions,
        extensions=extensions,
    )
    current_payload = phase.payload
    decision_plugin_name = phase.decision_plugin
    current_extensions = phase.extensions

    # AUDIT: serial execution — observe-only (no modifications, no blocking).
    # The returned phase state is deliberately not read back, so nothing an
    # AUDIT plugin returns reaches the final result.
    await self._run_serial_phase(
        hook_refs=audit_refs,
        mode_label="AUDIT",
        payload=payload,
        policy=policy,
        hook_type=hook_type,
        global_context=global_context,
        local_contexts=local_contexts,
        res_local_contexts=res_local_contexts,
        violations_as_exceptions=violations_as_exceptions,
        combined_metadata=combined_metadata,
        current_payload=current_payload,
        decision_plugin_name=decision_plugin_name,
        apply_modifications=False,
        allow_blocking=False,
        current_extensions=current_extensions,
        extensions=extensions,
    )

    # CONCURRENT: parallel execution with fail-fast on first blocking result
    if concurrent_refs:
        concurrent_ctx_list: list[tuple[HookRef, PluginContext, PluginPayload]] = []
        concurrent_tasks: list[asyncio.Task] = []
        effective_payload = current_payload if current_payload is not None else payload
        for ref in concurrent_refs:
            plugin_input = self._isolate_payload(effective_payload, policy)
            local_context = self._prepare_plugin_context(ref, global_context, local_contexts, res_local_contexts)
            # The index ties each completed task back to its hook ref.
            idx = len(concurrent_ctx_list)
            concurrent_ctx_list.append((ref, local_context, effective_payload))
            coro = self.execute_plugin(
                ref,
                plugin_input,
                local_context,
                violations_as_exceptions,
                global_context,
                combined_metadata,
                extensions=extensions,
            )
            if concurrent_semaphore:
                coro = self._with_semaphore(concurrent_semaphore, coro)
            concurrent_tasks.append(asyncio.create_task(self._tagged(coro, idx)))

        try:
            for completed_coro in asyncio.as_completed(concurrent_tasks):
                result, idx = await completed_coro
                ref, _, _ = concurrent_ctx_list[idx]
                if result.modified_payload is not None:
                    logger.debug(
                        "CONCURRENT plugin %s returned modified_payload on hook %s; "
                        "discarding (concurrent plugins cannot modify payloads)",
                        ref.plugin_ref.name,
                        hook_type,
                    )
                if not result.continue_processing:
                    pending = sum(1 for t in concurrent_tasks if not t.done())
                    violation_detail = (
                        f": [{result.violation.code}] {result.violation.reason}" if result.violation else ""
                    )
                    logger.warning(
                        "Pipeline halted by CONCURRENT plugin %s on hook %s%s; cancelling %d pending task(s)",
                        ref.plugin_ref.name,
                        hook_type,
                        violation_detail,
                        pending,
                    )
                    for task in concurrent_tasks:
                        if not task.done():
                            task.cancel()
                    # Wait for the cancellations to settle and collect any stray exceptions.
                    await asyncio.gather(*concurrent_tasks, return_exceptions=True)
                    return self._build_halt_result(
                        current_payload,
                        result.violation,
                        combined_metadata,
                        fire_and_forget_refs,
                        payload,
                        global_context,
                        res_local_contexts,
                        fire_and_forget_semaphore,
                        hook_type,
                        decision_plugin_name,
                        extensions=extensions,
                    )
        except (Exception, asyncio.CancelledError):
            # A plugin raised, or this call was cancelled: do not leave the
            # sibling tasks running unobserved. Mirror the halt path by
            # cancelling and draining them, then re-raise the original error.
            for task in concurrent_tasks:
                if not task.done():
                    task.cancel()
            await asyncio.gather(*concurrent_tasks, return_exceptions=True)
            raise

    # FIRE_AND_FORGET: background tasks, scheduled last.
    # NOTE(review): the original `payload` is passed here, not `current_payload`;
    # confirm whether background plugins are meant to see accepted modifications.
    bg_tasks = self._fire_and_forget_tasks(
        fire_and_forget_refs,
        payload,
        global_context,
        res_local_contexts,
        fire_and_forget_semaphore,
        extensions=extensions,
    )

    # Only the permission-check hook reports which plugin made the decision.
    if hook_type == HTTP_AUTH_CHECK_PERMISSION_HOOK and decision_plugin_name:
        combined_metadata[DECISION_PLUGIN_METADATA_KEY] = decision_plugin_name

    return (
        PluginResult(
            continue_processing=True,
            modified_payload=current_payload,
            modified_extensions=current_extensions,
            violation=None,
            metadata=combined_metadata,
            background_tasks=bg_tasks,
        ),
        res_local_contexts,
    )
def _group_by_mode(
    self,
    hook_refs: list[HookRef],
    payload: PluginPayload,
    hook_type: str,
    global_context: GlobalContext,
) -> tuple[list[HookRef], list[HookRef], list[HookRef], list[HookRef], list[HookRef]]:
    """Partition hook references into one priority-sorted list per plugin mode.

    Refs are dropped when the plugin's mode is DISABLED, when its name is in the
    runtime-disabled set, or when it declares conditions that the payload does
    not match. A ref whose mode matches none of the five buckets is dropped too.

    Args:
        hook_refs: All hook references to evaluate.
        payload: The current payload (used for condition matching).
        hook_type: The hook type identifier.
        global_context: Shared context for condition evaluation.

    Returns:
        A tuple of (sequential_refs, transform_refs, audit_refs, concurrent_refs,
        fire_and_forget_refs), each sorted by priority.
    """
    sequential: list[HookRef] = []
    transform: list[HookRef] = []
    audit: list[HookRef] = []
    concurrent: list[HookRef] = []
    fire_and_forget: list[HookRef] = []

    # (mode, bucket) pairs, checked in order with == just like an if/elif chain.
    buckets = (
        (PluginMode.SEQUENTIAL, sequential),
        (PluginMode.TRANSFORM, transform),
        (PluginMode.AUDIT, audit),
        (PluginMode.CONCURRENT, concurrent),
        (PluginMode.FIRE_AND_FORGET, fire_and_forget),
    )

    for ref in hook_refs:
        plugin = ref.plugin_ref

        if plugin.mode == PluginMode.DISABLED:
            logger.debug("Skipping plugin %s — statically disabled", plugin.name)
            continue

        if plugin.name in self._runtime_disabled:
            logger.debug("Skipping plugin %s — runtime-disabled after previous error", plugin.name)
            continue

        conditions = plugin.conditions
        if conditions and not payload_matches(payload, hook_type, conditions, global_context):
            logger.debug("Skipping plugin %s - conditions not met", plugin.name)
            continue

        for mode, bucket in buckets:
            if plugin.mode == mode:
                bucket.append(ref)
                break

    def by_priority(ref: HookRef) -> Any:
        return ref.plugin_ref.priority

    for _, bucket in buckets:
        bucket.sort(key=by_priority)

    return sequential, transform, audit, concurrent, fire_and_forget
async def _run_serial_phase(
    self,
    hook_refs: list[HookRef],
    mode_label: str,
    payload: PluginPayload,
    policy: Any,
    hook_type: str,
    global_context: GlobalContext,
    local_contexts: Optional[PluginContextTable],
    res_local_contexts: dict,
    violations_as_exceptions: bool,
    combined_metadata: dict[str, Any],
    current_payload: Optional[PluginPayload],
    decision_plugin_name: Optional[str],
    apply_modifications: bool,
    allow_blocking: bool,
    current_extensions: Optional[Extensions] = None,
    fire_and_forget_refs: Optional[list[HookRef]] = None,
    fire_and_forget_semaphore: Optional[asyncio.Semaphore] = None,
    extensions: Optional[Extensions] = None,
) -> tuple[
    Optional[tuple[PluginResult, PluginContextTable | None]],
    PhaseState,
]:
    """Execute one serial phase (SEQUENTIAL, TRANSFORM, or AUDIT) plugin by plugin.

    Each plugin receives the current effective payload (the latest accepted
    modification, or the original payload if there is none yet).

    Args:
        hook_refs: Hook references to execute, in the order given.
        mode_label: Human-readable mode name for log messages.
        payload: The original (unmodified) payload.
        policy: Hook payload policy for field filtering.
        hook_type: The hook type identifier.
        global_context: Shared context for all plugins.
        local_contexts: Existing contexts from previous hook executions.
        res_local_contexts: Accumulator for local contexts produced in this execution.
        violations_as_exceptions: Whether to raise violations as exceptions.
        combined_metadata: Accumulator for plugin metadata.
        current_payload: The current effective payload (may be None).
        decision_plugin_name: Name of the plugin that last modified the payload.
        apply_modifications: Whether to apply payload modifications from plugins.
        allow_blocking: Whether plugins can halt the pipeline.
        current_extensions: Extensions accumulated so far; replaced whenever a
            plugin returns ``modified_extensions`` (last writer wins).
        fire_and_forget_refs: Fire-and-forget refs to schedule on halt (only used when allow_blocking=True).
        fire_and_forget_semaphore: Semaphore for fire-and-forget tasks (only used when allow_blocking=True).
        extensions: Extensions forwarded to every plugin execution.

    Returns:
        A tuple of (halt_result, phase_state). halt_result is None if pipeline continues.
    """
    for hook_ref in hook_refs:
        plugin_context = self._prepare_plugin_context(hook_ref, global_context, local_contexts, res_local_contexts)
        baseline = payload if current_payload is None else current_payload
        isolated_input = self._isolate_payload(baseline, policy)

        outcome = await self.execute_plugin(
            hook_ref,
            isolated_input,
            plugin_context,
            violations_as_exceptions,
            global_context,
            combined_metadata,
            extensions=extensions,
        )

        if outcome.modified_payload is not None:
            if not apply_modifications:
                logger.debug(
                    "%s plugin %s returned modified_payload on hook %s; discarding (%s is observe-only)",
                    mode_label,
                    hook_ref.plugin_ref.name,
                    hook_type,
                    mode_label.lower(),
                )
            else:
                # Diff against what the plugin saw, merge into the canonical payload.
                current_payload, decision_plugin_name = self._apply_payload_modification(
                    hook_ref,
                    outcome,
                    isolated_input,
                    policy,
                    hook_type,
                    current_payload,
                    decision_plugin_name,
                    apply_to=baseline,
                )

        # Extensions are taken over independently of apply_modifications (last writer wins).
        if outcome.modified_extensions is not None:
            current_extensions = outcome.modified_extensions

        if outcome.continue_processing:
            continue

        violation_detail = f": [{outcome.violation.code}] {outcome.violation.reason}" if outcome.violation else ""

        if not allow_blocking:
            logger.warning(
                "%s plugin %s returned continue_processing=False on hook %s%s; "
                "pipeline continues (blocking suppressed)",
                mode_label,
                hook_ref.plugin_ref.name,
                hook_type,
                violation_detail,
            )
            continue

        logger.warning(
            "Pipeline halted by %s plugin %s on hook %s%s; scheduling fire-and-forget tasks",
            mode_label,
            hook_ref.plugin_ref.name,
            hook_type,
            violation_detail,
        )
        state_at_halt = PhaseState(
            payload=current_payload, decision_plugin=decision_plugin_name, extensions=current_extensions
        )
        halt_result = self._build_halt_result(
            current_payload,
            outcome.violation,
            combined_metadata,
            fire_and_forget_refs or [],
            payload,
            global_context,
            res_local_contexts,
            fire_and_forget_semaphore,
            hook_type,
            decision_plugin_name,
            extensions=extensions,
        )
        return halt_result, state_at_halt

    final_state = PhaseState(
        payload=current_payload, decision_plugin=decision_plugin_name, extensions=current_extensions
    )
    return None, final_state
def _apply_payload_modification(
    self,
    hook_ref: HookRef,
    result: PluginResult,
    effective_payload: PluginPayload,
    policy: Any,
    hook_type: str,
    current_payload: Optional[PluginPayload],
    decision_plugin_name: Optional[str],
    *,
    apply_to: Optional[PluginPayload] = None,
) -> tuple[Optional[PluginPayload], Optional[str]]:
    """Decide whether a plugin's payload modification is accepted under the hook policy.

    Args:
        hook_ref: Hook reference of the plugin that produced *result*.
        result: Plugin result carrying ``modified_payload``.
        effective_payload: The baseline payload the plugin received (may be
            an isolated copy). Used for diffing to detect changes.
        policy: Explicit policy for this hook type; falsy when none is defined.
        hook_type: The hook type identifier (used in log messages).
        current_payload: Payload to keep when the modification is rejected.
        decision_plugin_name: Decision plugin to keep when the modification is rejected.
        apply_to: The canonical pipeline payload handed to ``apply_policy`` as
            the merge target for accepted changes.

    Returns:
        Updated (current_payload, decision_plugin_name) tuple; the inputs are
        returned unchanged when the modification is not accepted.
    """
    proposed = result.modified_payload
    unchanged = (current_payload, decision_plugin_name)

    if not policy:
        if self.default_hook_policy == DefaultHookPolicy.ALLOW:
            # No explicit policy + default=allow -- accept all modifications
            return proposed, hook_ref.plugin_ref.name
        # No explicit policy + default=deny -- reject all modifications
        logger.warning(
            "Plugin %s attempted payload modification on hook %s but no policy is defined and default is deny",
            hook_ref.plugin_ref.name,
            hook_type,
        )
        return unchanged

    same_type_model = isinstance(proposed, type(effective_payload)) and isinstance(effective_payload, BaseModel)
    if same_type_model:
        # Same-type BaseModel payload: let the policy filter field by field.
        filtered = apply_policy(effective_payload, proposed, policy, apply_to=apply_to)
        if filtered is None:
            return unchanged
        return filtered, hook_ref.plugin_ref.name

    # Cross-type payload: only PluginPayload subtypes or dicts are accepted.
    if isinstance(proposed, (PluginPayload, dict)):
        logger.debug(
            "Plugin %s returned cross-type payload (%s -> %s) on hook %s; accepting without field filtering",
            hook_ref.plugin_ref.name,
            type(effective_payload).__name__,
            type(proposed).__name__,
            hook_type,
        )
        return proposed, hook_ref.plugin_ref.name

    logger.warning(
        "Plugin %s returned unexpected type %s on hook %s; ignoring modification",
        hook_ref.plugin_ref.name,
        type(proposed).__name__,
        hook_type,
    )
    return unchanged
def _prepare_plugin_context(
    self,
    hook_ref: HookRef,
    global_context: GlobalContext,
    local_contexts: Optional[PluginContextTable],
    res_local_contexts: dict,
) -> PluginContext:
    """Build a per-plugin GlobalContext copy and attach it to the plugin's context.

    The context key is the request id concatenated with the plugin uuid. If
    *local_contexts* already holds a context under that key it is reused (its
    ``global_context`` is replaced with the fresh copy); otherwise a new
    PluginContext is created. The chosen context is also recorded in
    *res_local_contexts* as a side effect.
    """
    key = global_context.request_id + hook_ref.plugin_ref.uuid

    # Empty state/metadata become fresh dicts; non-empty ones go through copyonwrite().
    state_view = copyonwrite(global_context.state) if global_context.state else {}
    metadata_view = copyonwrite(global_context.metadata) if global_context.metadata else {}
    plugin_gc = GlobalContext(
        request_id=global_context.request_id,
        user=global_context.user,
        tenant_id=global_context.tenant_id,
        server_id=global_context.server_id,
        state=state_view,
        metadata=metadata_view,
    )

    if local_contexts and key in local_contexts:
        plugin_context = local_contexts[key]
        plugin_context.global_context = plugin_gc
    else:
        plugin_context = PluginContext(global_context=plugin_gc)

    res_local_contexts[key] = plugin_context
    return plugin_context
def _isolate_payload(
    self,
    effective_payload: PluginPayload,
    policy: Any,
) -> PluginPayload:
    """Hand back either the payload itself or an isolated copy for the plugin to work on.

    Isolation applies when an explicit policy exists, when the default hook
    policy is DENY, or when the payload is a RootModel. BaseModel payloads are
    passed through ``wrap_payload_for_isolation``; any other payload through
    ``_safe_deepcopy``. In every other case the original object is returned as-is.
    """
    if not (
        policy or self.default_hook_policy == DefaultHookPolicy.DENY or isinstance(effective_payload, RootModel)
    ):
        return effective_payload

    isolate = wrap_payload_for_isolation if isinstance(effective_payload, BaseModel) else _safe_deepcopy
    return isolate(effective_payload)
def _build_halt_result(
    self,
    current_payload: Optional[PluginPayload],
    violation: Any,
    combined_metadata: dict[str, Any],
    fire_and_forget_refs: list[HookRef],
    payload: PluginPayload,
    global_context: GlobalContext,
    res_local_contexts: dict,
    fire_and_forget_semaphore: Optional[asyncio.Semaphore],
    hook_type: str,
    decision_plugin_name: Optional[str],
    extensions: Optional[Extensions] = None,
) -> tuple[PluginResult, dict]:
    """Assemble the result returned when a plugin halts the pipeline.

    Fire-and-forget plugins are scheduled first (they receive the original
    *payload*). For the permission-check hook the deciding plugin's name is
    written into *combined_metadata*. The returned PluginResult has
    ``continue_processing=False`` and is paired with *res_local_contexts*.
    """
    background = self._fire_and_forget_tasks(
        fire_and_forget_refs,
        payload,
        global_context,
        res_local_contexts,
        fire_and_forget_semaphore,
        extensions=extensions,
    )

    is_permission_hook = hook_type == HTTP_AUTH_CHECK_PERMISSION_HOOK
    if is_permission_hook and decision_plugin_name:
        combined_metadata[DECISION_PLUGIN_METADATA_KEY] = decision_plugin_name

    halted = PluginResult(
        continue_processing=False,
        modified_payload=current_payload,
        violation=violation,
        metadata=combined_metadata,
        background_tasks=background,
    )
    return halted, res_local_contexts
@staticmethod
async def _with_semaphore(semaphore: asyncio.Semaphore, coro: Any) -> Any:
    """Await *coro* while holding *semaphore*, bounding concurrent CONCURRENT tasks.

    Args:
        semaphore: Semaphore limiting how many wrapped awaitables run at once.
        coro: The awaitable to run once a slot is available.

    Returns:
        Whatever *coro* returns.

    Raises:
        Any exception raised by *coro*, or by waiting for the semaphore
        (typically ``asyncio.CancelledError`` when the task is cancelled).
    """
    try:
        await semaphore.acquire()
    except BaseException:
        # Interrupted before *coro* ever started (e.g. the task was cancelled
        # while queued). Close it so Python does not emit a
        # "coroutine ... was never awaited" RuntimeWarning.
        close = getattr(coro, "close", None)
        if callable(close):
            close()
        raise
    try:
        return await coro
    finally:
        semaphore.release()
@staticmethod
async def _tagged(coro: Any, tag: Any) -> tuple[Any, Any]:
    """Resolve *coro* and return ``(result, tag)`` so an as_completed result can be traced to its origin."""
    return (await coro), tag
def _fire_and_forget_tasks(
    self,
    fire_and_forget_refs: list[HookRef],
    payload: PluginPayload,
    global_context: GlobalContext,
    res_local_contexts: dict,
    semaphore: Optional[asyncio.Semaphore],
    extensions: Optional[Extensions] = None,
) -> list[asyncio.Task]:
    """Start a background task for every FIRE_AND_FORGET plugin not yet scheduled.

    Used both on early-exit (halt) paths and on normal completion. A plugin is
    skipped when *res_local_contexts* already has an entry under its context
    key. Every scheduled plugin gets its own isolated payload snapshot, taken
    at call time, and a brand-new PluginContext, which is recorded in
    *res_local_contexts*.

    Returns:
        The asyncio.Task handles created by this call.
    """
    scheduled: list[asyncio.Task] = []

    for ref in fire_and_forget_refs:
        context_key = global_context.request_id + ref.plugin_ref.uuid
        if context_key in res_local_contexts:
            # An entry already exists for this plugin; do not schedule it twice.
            continue

        if isinstance(payload, BaseModel):
            snapshot = wrap_payload_for_isolation(payload)
        else:
            snapshot = _safe_deepcopy(payload)

        state_view = copyonwrite(global_context.state) if global_context.state else {}
        metadata_view = copyonwrite(global_context.metadata) if global_context.metadata else {}
        plugin_gc = GlobalContext(
            request_id=global_context.request_id,
            user=global_context.user,
            tenant_id=global_context.tenant_id,
            server_id=global_context.server_id,
            state=state_view,
            metadata=metadata_view,
        )
        plugin_context = PluginContext(global_context=plugin_gc)
        res_local_contexts[context_key] = plugin_context

        background = self._run_fire_and_forget_task(ref, snapshot, plugin_context, semaphore, extensions=extensions)
        scheduled.append(asyncio.create_task(background))

    return scheduled
async def _run_fire_and_forget_task(
    self,
    hook_ref: HookRef,
    payload: PluginPayload,
    local_context: PluginContext,
    semaphore: Optional[asyncio.Semaphore],
    extensions: Optional[Extensions] = None,
) -> Optional[PluginErrorModel]:
    """Execute a plugin as a fire-and-forget background task.

    Errors are logged (with traceback) but never propagated — background tasks
    cannot halt the pipeline. If on_error=DISABLE, the plugin is added to the
    runtime-disabled set.

    Args:
        hook_ref: Hook reference of the plugin to run.
        payload: Payload handed to the plugin.
        local_context: Plugin context for this execution.
        semaphore: Optional semaphore held for the duration of the execution.
        extensions: Optional extensions forwarded to the plugin execution.

    Returns:
        None on success, or a PluginErrorModel if the plugin raised.
    """
    try:
        if semaphore:
            async with semaphore:
                await self._execute_with_timeout(hook_ref, payload, local_context, extensions=extensions)
        else:
            await self._execute_with_timeout(hook_ref, payload, local_context, extensions=extensions)
        return None
    except Exception as exc:
        # Nobody is required to await a background task, so this log entry may
        # be the only trace of the failure: attach the exception and traceback.
        logger.error(
            "Plugin %s failed in fire-and-forget mode (ignored)",
            hook_ref.plugin_ref.name,
            exc_info=True,
        )
        if hook_ref.plugin_ref.on_error == OnError.DISABLE:
            self._runtime_disabled.add(hook_ref.plugin_ref.name)
        # FAIL and IGNORE both just log for FIRE_AND_FORGET mode (background can't halt pipeline)
        return PluginErrorModel(message=repr(exc), plugin_name=hook_ref.plugin_ref.name)
async def execute_plugin(
    self,
    hook_ref: HookRef,
    payload: PluginPayload,
    local_context: PluginContext,
    violations_as_exceptions: bool,
    global_context: Optional[GlobalContext] = None,
    combined_metadata: Optional[dict[str, Any]] = None,
    extensions: Optional[Extensions] = None,
) -> PluginResult:
    """Execute a single plugin with timeout protection and apply its mode/on_error policy.

    Args:
        hook_ref: Hooking structure that contains the plugin and hook.
        payload: The payload to be processed by the plugin.
        local_context: Local context passed to the plugin.
        violations_as_exceptions: Raise violations as exceptions rather than as returns.
        global_context: Shared context; when given, state and metadata from
            ``local_context.global_context`` are merged into it for
            SEQUENTIAL, TRANSFORM and CONCURRENT plugins.
        combined_metadata: Accumulator dict updated in place with the plugin's
            result metadata (reserved internal keys excluded).
        extensions: Optional extensions to filter and pass to plugins that accept them.

    Returns:
        A single PluginResult:
        - the plugin's own result when it does not stop processing (or when
          its mode is not handled by the branches below);
        - a result with ``continue_processing=False`` when a CONCURRENT or
          SEQUENTIAL plugin blocks and ``violations_as_exceptions`` is false;
        - a result with ``continue_processing=True`` and no violation when an
          AUDIT or TRANSFORM plugin returns ``continue_processing=False``;
        - ``PluginResult(continue_processing=True)`` when the plugin timed out
          or failed and its ``on_error`` is not ``OnError.FAIL``.

    Raises:
        PluginError: If the plugin times out or fails and ``on_error`` is ``OnError.FAIL``.
        PluginViolationError: If a CONCURRENT or SEQUENTIAL plugin blocks and
            ``violations_as_exceptions`` is set.

    NOTE(review): earlier documentation also listed PayloadSizeError; from this
    method it can only propagate unchanged if it derives from PluginViolationError
    or PluginError — confirm against ``_execute_with_timeout``.
    """
    plugin_ref = hook_ref.plugin_ref
    try:
        # Execute plugin with timeout protection
        result = await self._execute_with_timeout(hook_ref, payload, local_context, extensions=extensions)

        # Merge global state for modes that participate in the pipeline chain.
        # AUDIT and FIRE_AND_FORGET operate on isolated snapshots and should not
        # mutate shared state.
        if (
            local_context.global_context
            and global_context
            and plugin_ref.mode
            in (
                PluginMode.SEQUENTIAL,
                PluginMode.TRANSFORM,
                PluginMode.CONCURRENT,
            )
        ):
            global_context.state.update(local_context.global_context.state)
            global_context.metadata.update(local_context.global_context.metadata)

        # Aggregate metadata from all plugins
        if result.metadata and combined_metadata is not None:
            combined_metadata.update(
                {k: v for k, v in result.metadata.items() if k not in RESERVED_INTERNAL_METADATA_KEYS}
            )

        # Set plugin name in violation if present
        if result.violation:
            result.violation.plugin_name = plugin_ref.plugin.name

        # Handle plugin blocking the request
        if not result.continue_processing:
            if plugin_ref.mode in (PluginMode.CONCURRENT, PluginMode.SEQUENTIAL):
                mode = plugin_ref.mode.value
                if result.violation:
                    logger.warning(
                        "Plugin %s blocked request in %s mode — violation [%s] %s: %s",
                        plugin_ref.plugin.name,
                        mode,
                        result.violation.code,
                        result.violation.reason,
                        result.violation.description,
                    )
                else:
                    logger.warning(
                        "Plugin %s blocked request in %s mode (no violation details)",
                        plugin_ref.plugin.name,
                        mode,
                    )
                if violations_as_exceptions:
                    if result.violation:
                        plugin_name = result.violation.plugin_name
                        violation_reason = result.violation.reason
                        violation_desc = result.violation.description
                        violation_code = result.violation.code
                        raise PluginViolationError(
                            f"{hook_ref.name} blocked by plugin {plugin_name}: {violation_code} - {violation_reason} ({violation_desc})",
                            violation=result.violation,
                        )
                    raise PluginViolationError(f"{hook_ref.name} blocked by plugin")
                return PluginResult(
                    continue_processing=False,
                    modified_payload=None,
                    violation=result.violation,
                    metadata=combined_metadata,
                )
            if plugin_ref.mode in (PluginMode.AUDIT, PluginMode.TRANSFORM):
                mode_label = plugin_ref.mode.value
                if result.violation:
                    logger.warning(
                        "Plugin %s (%s) raised violation — pipeline continues: [%s] %s — %s",
                        plugin_ref.plugin.name,
                        mode_label,
                        result.violation.code,
                        result.violation.reason,
                        result.violation.description,
                    )
                else:
                    logger.warning(
                        "Plugin %s (%s) returned continue_processing=False without a violation "
                        "— pipeline continues",
                        plugin_ref.plugin.name,
                        mode_label,
                    )
                # Violations are logged but not propagated; AUDIT and TRANSFORM
                # plugins cannot halt the pipeline. TRANSFORM may still carry a
                # modified_payload (applied by the caller); AUDIT never does.
                forwarded_payload = result.modified_payload if plugin_ref.mode == PluginMode.TRANSFORM else None
                return PluginResult(
                    continue_processing=True,
                    modified_payload=forwarded_payload,
                    violation=None,
                    metadata=combined_metadata,
                )
        return result
    except asyncio.TimeoutError as exc:
        on_error = plugin_ref.on_error
        # %s (not %d) so a fractional timeout is reported as configured rather
        # than truncated, matching the PluginError message below.
        logger.error("Plugin %s timed out after %ss", plugin_ref.name, self.timeout)
        if on_error == OnError.FAIL:
            raise PluginError(
                error=PluginErrorModel(
                    message=f"Plugin {plugin_ref.name} exceeded {self.timeout}s timeout",
                    plugin_name=plugin_ref.name,
                )
            ) from exc
        if on_error == OnError.DISABLE:
            self._runtime_disabled.add(plugin_ref.name)
    except PluginViolationError:
        # Violations raised above must reach the caller untouched; this handler
        # sits before the PluginError/Exception handlers so they never see it.
        raise
    except PluginError as pe:
        on_error = plugin_ref.on_error
        logger.error("Plugin %s failed with error: %s", plugin_ref.name, pe)
        if on_error == OnError.FAIL:
            raise
        if on_error == OnError.DISABLE:
            self._runtime_disabled.add(plugin_ref.name)
    except Exception as e:
        on_error = plugin_ref.on_error
        logger.error("Plugin %s failed with error: %s", plugin_ref.name, e)
        if on_error == OnError.FAIL:
            raise PluginError(error=convert_exception_to_error(e, plugin_ref.name)) from e
        if on_error == OnError.DISABLE:
            self._runtime_disabled.add(plugin_ref.name)
    # Return a result indicating processing should continue despite the error
    return PluginResult(continue_processing=True)
async def _execute_with_timeout(
self,
hook_ref: HookRef,
payload: PluginPayload,
context: PluginContext,
extensions: Optional[Extensions] = None,
) -> PluginResult:
"""Execute a plugin with timeout protection.
Args:
hook_ref: Reference to the hook and plugin to execute.
payload: Payload to process.
context: Plugin execution context.
extensions: Optional extensions to filter and pass if the plugin accepts them.
Returns:
Result from plugin execution.
Raises:
asyncio.TimeoutError: If plugin exceeds timeout.
asyncio.CancelledError: If plugin execution is cancelled.
Exception: Re-raised from plugin hook execution failures.
"""
# Start observability span if tracing is active
trace_id = current_trace_id.get()
span_id = None
if trace_id and self.observability:
try:
span_id = self.observability.start_span(
trace_id=trace_id,
name=f"plugin.execute.{hook_ref.plugin_ref.name}",
kind="internal",
resource_type="plugin",
resource_name=hook_ref.plugin_ref.name,
attributes={
"plugin.name": hook_ref.plugin_ref.name,
"plugin.uuid": hook_ref.plugin_ref.uuid,
"plugin.mode": (
hook_ref.plugin_ref.mode.value
if hasattr(hook_ref.plugin_ref.mode, "value")
else str(hook_ref.plugin_ref.mode)
),
"plugin.priority": hook_ref.plugin_ref.priority,
"plugin.timeout": self.timeout,
},
)
except Exception as e:
logger.debug("Plugin observability start_span failed: %s", e)
# Execute plugin