diff --git a/nodes/src/nodes/prompt_template/IGlobal.py b/nodes/src/nodes/prompt_template/IGlobal.py
new file mode 100644
index 000000000..0ac0a2501
--- /dev/null
+++ b/nodes/src/nodes/prompt_template/IGlobal.py
@@ -0,0 +1,44 @@
+# =============================================================================
+# MIT License
+# Copyright (c) 2026 Aparavi Software AG
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+# SOFTWARE.
+# =============================================================================
+
+from rocketlib import IGlobalBase, OPEN_MODE
+from ai.common.config import Config
+
+
+class IGlobal(IGlobalBase):
+ config = None
+
+ def beginGlobal(self):
+ if self.IEndpoint.endpoint.openMode == OPEN_MODE.CONFIG:
+ pass
+ else:
+ import os
+ from depends import depends # type: ignore
+
+ requirements = os.path.dirname(os.path.realpath(__file__)) + '/requirements.txt'
+ depends(requirements)
+
+ self.config = Config.getNodeConfig(self.glb.logicalType, self.glb.connConfig)
+
+ def endGlobal(self):
+ self.config = None
diff --git a/nodes/src/nodes/prompt_template/IInstance.py b/nodes/src/nodes/prompt_template/IInstance.py
new file mode 100644
index 000000000..05ba01276
--- /dev/null
+++ b/nodes/src/nodes/prompt_template/IInstance.py
@@ -0,0 +1,85 @@
+# =============================================================================
+# MIT License
+# Copyright (c) 2026 Aparavi Software AG
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+# SOFTWARE.
+# =============================================================================
+
+from rocketlib import IInstanceBase
+from .IGlobal import IGlobal
+from .template_engine import render
+from ai.common.schema import Question
+from rocketlib import debug, Entry
+
+
+class IInstance(IInstanceBase):
+ IGlobal: IGlobal
+
+ def __init__(self):
+ """Initialize the prompt template node instance state."""
+ super().__init__()
+ self.collected_text: list[str] = []
+ self.question = Question()
+
+ def open(self, entry: Entry):
+ pass
+
+ def _get_template_context(self) -> dict:
+ """Build the context dictionary for template rendering."""
+ config = self.IGlobal.config
+ if config is None:
+ config = {}
+ raw_variables = config.get('variables', {})
+ variables = dict(raw_variables) if isinstance(raw_variables, dict) else {}
+
+ # Add collected text as 'input' if available
+ if self.collected_text:
+ variables.setdefault('input', '\n'.join(self.collected_text))
+
+ return variables
+
+ def writeQuestions(self, question: Question):
+ for q in question.questions:
+ self.question.addQuestion(q.text)
+
+ def writeText(self, text: str):
+ self.collected_text.append(text)
+
+ def closing(self):
+ try:
+ config = self.IGlobal.config or {}
+ template = config.get('template', '{{input}}')
+ context = self._get_template_context()
+
+ # Render questions with full context now that all text has arrived
+ if self.question.questions:
+ rendered_question = Question()
+ for q in self.question.questions:
+ context['question'] = q.text
+ rendered = render(template, context)
+ rendered_question.addQuestion(rendered)
+ self.instance.writeQuestions(rendered_question)
+ # If we only have text input, render and output as text
+ elif self.collected_text:
+ rendered = render(template, context)
+ self.instance.writeText(rendered)
+
+ except Exception as e:
+ debug(f'Error in prompt_template node: {e}')
+ raise
diff --git a/nodes/src/nodes/prompt_template/__init__.py b/nodes/src/nodes/prompt_template/__init__.py
new file mode 100644
index 000000000..1fff03125
--- /dev/null
+++ b/nodes/src/nodes/prompt_template/__init__.py
@@ -0,0 +1,27 @@
+# =============================================================================
+# MIT License
+# Copyright (c) 2026 Aparavi Software AG
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+# SOFTWARE.
+# =============================================================================
+
+from .IGlobal import IGlobal
+from .IInstance import IInstance
+
+__all__ = ['IGlobal', 'IInstance']
diff --git a/nodes/src/nodes/prompt_template/requirements.txt b/nodes/src/nodes/prompt_template/requirements.txt
new file mode 100644
index 000000000..e709a9ba1
--- /dev/null
+++ b/nodes/src/nodes/prompt_template/requirements.txt
@@ -0,0 +1 @@
+# No external dependencies — stdlib only
diff --git a/nodes/src/nodes/prompt_template/services.json b/nodes/src/nodes/prompt_template/services.json
new file mode 100644
index 000000000..6c9df0137
--- /dev/null
+++ b/nodes/src/nodes/prompt_template/services.json
@@ -0,0 +1,195 @@
+{
+ //
+ // Required:
+ // The displayable name of this node
+ //
+ "title": "Prompt Template",
+ //
+ // Required:
+ // The protocol is the endpoint protocol
+ //
+ "protocol": "prompt-template://",
+ //
+ // Required:
+ // Class type of the node - what it does
+ //
+ "classType": [
+ "text"
+ ],
+ //
+ // Required:
+ // Capabilities are flags that change the behavior of the underlying
+ // engine
+ //
+ "capabilities": ["experimental"],
+ //
+ // Optional:
+ // Register is either filter, endpoint or ignored if not specified. If the
+ // type is specified, a factory is registered of that given type
+ //
+ "register": "filter",
+ //
+ // Optional:
+ // The node is the actual pyhsical node to instantiate - if
+ // not specified, the protocol will be used
+ //
+ "node": "python",
+ //
+ // Optional:
+ // The path is the executable/script code - it is node dependent
+ // and is optional for most node
+ //
+ "path": "nodes.prompt_template",
+ //
+ // Required:
+ // The prefix map when added/removed when convertting URLs <=> paths
+ //
+ "prefix": "prompt_template",
+ //
+ // Optional:
+ // Description to of this driver
+ //
+ "description": [
+ "A transformation component that renders prompt templates with dynamic ",
+ "variable substitution. Supports {{variable}} placeholders resolved from ",
+ "pipeline context, built-in helpers ({{now}}, {{uuid}}, {{random}}), ",
+ "conditional sections ({% if var %}...{% endif %}), loop support ",
+ "({% for item in list %}...{% endfor %}), and escape sequences for ",
+ "literal braces. Implemented with stdlib only — no Jinja2 dependency."
+ ],
+ //
+ // Optional:
+ // The icon is the icon to display in the UI for this node
+ //
+ "icon": "prompt.svg",
+ //
+ // Optional:
+ // The tile is the displayable name of this node
+ //
+ "tile": [],
+ //
+ // Optional:
+ // As a pipe component, define what this pipe component takes
+ // and what it produces
+ //
+ "lanes": {
+ "text": ["text"],
+ "questions": ["questions"]
+ },
+ "input": [
+ {
+ "lane": "text",
+ "description": "Text input - used as context variables for template rendering",
+ "output": [
+ {
+ "lane": "text",
+ "description": "Rendered template output as text"
+ }
+ ]
+ },
+ {
+ "lane": "questions",
+ "description": "Questions input - question text is rendered through the template",
+ "output": [
+ {
+ "lane": "questions",
+ "description": "Questions with template-rendered text"
+ }
+ ]
+ }
+ ],
+ //
+ // Optional:
+ // Profile section are configuration options used by the driver
+ // itself
+ //
+ "preconfig": {
+ "default": "default",
+ "profiles": {
+ "default": {
+ "template": "{{input}}",
+ "variables": {}
+ }
+ }
+ },
+ //
+ // Test configuration for automated node testing
+ //
+ "test": {
+ "profiles": ["default"],
+ "cases": [
+ {
+ "questions": {
+ "questions": [{"text": "Hello world"}]
+ },
+ "expect": {
+ "questions": {
+ "notEmpty": true
+ }
+ }
+ },
+ {
+ "config": {
+ "template": "Summarize the following: {{input}}",
+ "variables": {"input": "The quick brown fox jumps over the lazy dog."}
+ },
+ "questions": {
+ "questions": [{"text": "test prompt"}]
+ },
+ "expect": {
+ "questions": {
+ "notEmpty": true,
+ "contains": "Summarize the following:"
+ }
+ }
+ },
+ {
+ "config": {
+ "template": "Q={{question}} | I={{input}}",
+ "variables": {"input": "ctx"}
+ },
+ "questions": {
+ "questions": [{"text": "what is this?"}]
+ },
+ "expect": {
+ "questions": {
+ "contains": "Q=what is this?"
+ }
+ }
+ }
+ ]
+ },
+ //
+ // Optional:
+ // Local fields defintions - these define fields only for the
+ // current service. You may specify them here, or directly
+ // in the shape
+ //
+ "fields": {
+ "template": {
+ "type": "string",
+ "title": "Template",
+ "description": "Prompt template with {{variable}} placeholders, conditionals ({% if %}), and loops ({% for %})"
+ },
+ "variables": {
+ "type": "object",
+ "title": "Variables",
+ "description": "Key-value pairs of variables to substitute into the template"
+ }
+ },
+ //
+ // Required:
+ // Defines the fields (shape) of the service. Either source or target
+ // map be specified, or both, but at least one is required
+ //
+ "shape": [
+ {
+ "section": "Pipe",
+ "title": "Template",
+ "properties": [
+ "template",
+ "variables"
+ ]
+ }
+ ]
+}
diff --git a/nodes/src/nodes/prompt_template/template_engine.py b/nodes/src/nodes/prompt_template/template_engine.py
new file mode 100644
index 000000000..b99ad58fc
--- /dev/null
+++ b/nodes/src/nodes/prompt_template/template_engine.py
@@ -0,0 +1,311 @@
+# =============================================================================
+# MIT License
+# Copyright (c) 2026 Aparavi Software AG
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+# SOFTWARE.
+# =============================================================================
+
+"""
+Lightweight template engine with Jinja2-style syntax, implemented using only
+the Python standard library.
+
+Supported features:
+ - Variable substitution: {{variable}}
+ - Built-in helpers: {{now}}, {{uuid}}, {{random}}
+ - Conditionals: {% if var %}...{% elif var2 %}...{% else %}...{% endif %}
+ - Loops: {% for item in list %}...{% endfor %}
+ - Escape sequences: \\{{ and \\}} for literal braces
+"""
+
+import re
+import uuid as _uuid
+from datetime import datetime, timezone
+from random import random as _random
+from typing import Any
+
+
+# Sentinel used to distinguish "key missing" from "key is None"
+_MISSING = object()
+
+# ---------------------------------------------------------------------------
+# Regex patterns
+# ---------------------------------------------------------------------------
+_ESCAPE_OPEN = r'\\\{\{'
+_ESCAPE_CLOSE = r'\\\}\}'
+_PLACEHOLDER_OPEN = '\x00LBRACE\x00'
+_PLACEHOLDER_CLOSE = '\x00RBRACE\x00'
+
+
+# ---------------------------------------------------------------------------
+# Built-in helpers
+# ---------------------------------------------------------------------------
+_BUILTINS = {
+ 'now': lambda: datetime.now(timezone.utc).isoformat(),
+ 'uuid': lambda: str(_uuid.uuid4()),
+ 'random': lambda: str(_random()),
+}
+
+
+# ---------------------------------------------------------------------------
+# Context value resolution
+# ---------------------------------------------------------------------------
+def _resolve(name: str, context: dict) -> Any:
+ """Resolve a dotted name against *context*, falling back to built-ins.
+
+ Resolution order: context takes priority over built-in helpers.
+ """
+ name = name.strip()
+
+ # 1. Try context first — user-supplied values always win
+ parts = name.split('.')
+ value = context
+ for part in parts:
+ if isinstance(value, dict):
+ value = value.get(part, _MISSING)
+ else:
+ value = getattr(value, part, _MISSING)
+ if value is _MISSING:
+ break
+
+ if value is not _MISSING:
+ return value
+
+ # 2. Fall back to built-in helpers only when context lookup failed
+ if name in _BUILTINS:
+ return _BUILTINS[name]()
+
+ return ''
+
+
+# ---------------------------------------------------------------------------
+# Block-level processing (if / for)
+# ---------------------------------------------------------------------------
+
+
+def _find_matching_end(tokens: list[tuple[str, str]], start: int, open_tag: str, close_tag: str) -> int:
+ """Return the index of the matching close_tag, handling nesting."""
+ depth = 1
+ i = start
+ while i < len(tokens):
+ kind, text = tokens[i]
+ if kind == 'block':
+ tag = text.split()[0] if text.strip() else ''
+ if tag == open_tag:
+ depth += 1
+ elif tag == close_tag:
+ depth -= 1
+ if depth == 0:
+ return i
+ i += 1
+ return -1
+
+
+def _tokenize(template: str) -> list[tuple[str, str]]:
+ """Split template into a list of (kind, content) tokens.
+
+ Kinds: "text", "var", "block".
+ """
+ tokens: list[tuple[str, str]] = []
+ pos = 0
+ combined = re.compile(r'(\{\{.+?\}\}|\{%[-\s]*.*?[-\s]*%\})', re.DOTALL)
+ for m in combined.finditer(template):
+ if m.start() > pos:
+ tokens.append(('text', template[pos : m.start()]))
+ raw = m.group(0)
+ if raw.startswith('{{'):
+ tokens.append(('var', raw[2:-2]))
+ else:
+ tokens.append(('block', raw[2:-2].strip()))
+ pos = m.end()
+ if pos < len(template):
+ tokens.append(('text', template[pos:]))
+ return tokens
+
+
+def _eval_tokens(tokens: list[tuple[str, str]], context: dict) -> str:
+ """Walk a token list and recursively evaluate blocks."""
+ parts: list[str] = []
+ i = 0
+ while i < len(tokens):
+ kind, content = tokens[i]
+
+ if kind == 'text':
+ parts.append(content)
+ i += 1
+
+ elif kind == 'var':
+ parts.append(str(_resolve(content, context)))
+ i += 1
+
+ elif kind == 'block':
+ tag_parts = content.split(None, 1)
+ tag = tag_parts[0] if tag_parts else ''
+
+ if tag == 'if':
+ i = _handle_if(tokens, i, context, parts)
+ elif tag == 'for':
+ i = _handle_for(tokens, i, context, parts)
+ else:
+ parts.append('{%' + content + '%}')
+ i += 1
+ else:
+ i += 1
+
+ return ''.join(parts)
+
+
+def _at_top_level(tokens: list[tuple[str, str]], start: int, pos: int) -> bool:
+ """Check if *pos* is at the top nesting level between *start* and *pos*."""
+ depth_if = 0
+ depth_for = 0
+ for i in range(start, pos):
+ kind, text = tokens[i]
+ if kind == 'block':
+ tag = text.split()[0] if text.strip() else ''
+ if tag == 'if':
+ depth_if += 1
+ elif tag == 'endif':
+ depth_if -= 1
+ elif tag == 'for':
+ depth_for += 1
+ elif tag == 'endfor':
+ depth_for -= 1
+ return depth_if == 0 and depth_for == 0
+
+
+def _handle_if(
+ tokens: list[tuple[str, str]],
+ start: int,
+ context: dict,
+ parts: list[str],
+) -> int:
+ """Process an {% if %}...{% elif %}...{% else %}...{% endif %} block."""
+ end = _find_matching_end(tokens, start + 1, 'if', 'endif')
+ if end == -1:
+ parts.append('{%' + tokens[start][1] + '%}')
+ return start + 1
+
+ branches: list[tuple[str | None, list[tuple[str, str]]]] = []
+ tag_content = tokens[start][1]
+ cond_text = tag_content.split(None, 1)[1] if len(tag_content.split(None, 1)) > 1 else ''
+ body_tokens: list[tuple[str, str]] = []
+ i = start + 1
+ while i < end:
+ kind, text = tokens[i]
+ if kind == 'block':
+ tag = text.split()[0] if text.strip() else ''
+ if tag in ('elif', 'else') and _at_top_level(tokens, start + 1, i):
+ branches.append((cond_text, body_tokens))
+ body_tokens = []
+ if tag == 'elif':
+ cond_text = text.split(None, 1)[1] if len(text.split(None, 1)) > 1 else ''
+ else:
+ cond_text = None # else branch
+ i += 1
+ continue
+ body_tokens.append(tokens[i])
+ i += 1
+ branches.append((cond_text, body_tokens))
+
+ for cond, body in branches:
+ if cond is None or _truthy(_resolve(cond, context)):
+ parts.append(_eval_tokens(body, context))
+ break
+
+ return end + 1
+
+
+def _handle_for(
+ tokens: list[tuple[str, str]],
+ start: int,
+ context: dict,
+ parts: list[str],
+) -> int:
+ """Process a {% for item in list %}...{% endfor %} block."""
+ end = _find_matching_end(tokens, start + 1, 'for', 'endfor')
+ if end == -1:
+ parts.append('{%' + tokens[start][1] + '%}')
+ return start + 1
+
+ content = tokens[start][1]
+ m = re.match(r'for\s+(\w+)\s+in\s+(.+)', content)
+ if not m:
+ parts.append('{%' + content + '%}')
+ return end + 1
+
+ loop_var = m.group(1)
+ iterable_name = m.group(2).strip()
+ iterable = _resolve(iterable_name, context)
+
+ if not hasattr(iterable, '__iter__') or isinstance(iterable, str):
+ return end + 1
+
+ body_tokens = tokens[start + 1 : end]
+ for item in iterable:
+ child_context = {**context, loop_var: item}
+ parts.append(_eval_tokens(body_tokens, child_context))
+
+ return end + 1
+
+
+def _truthy(value) -> bool:
+ """Determine whether a template value is truthy."""
+ if value is _MISSING or value is None:
+ return False
+ if isinstance(value, str):
+ return value != ''
+ return bool(value)
+
+
+# ---------------------------------------------------------------------------
+# Public API
+# ---------------------------------------------------------------------------
+
+
+def render(template: str, context: dict | None = None) -> str:
+ """Render *template* with the given *context* dictionary.
+
+ Parameters
+ ----------
+ template:
+ The template string containing ``{{var}}``, ``{% if %}``, ``{% for %}``
+ constructs.
+ context:
+ A dictionary of variables available for substitution.
+
+ Returns
+ -------
+ The rendered string.
+ """
+ if context is None:
+ context = {}
+
+ # 1. Protect escaped braces
+ text = re.sub(_ESCAPE_OPEN, _PLACEHOLDER_OPEN, template)
+ text = re.sub(_ESCAPE_CLOSE, _PLACEHOLDER_CLOSE, text)
+
+ # 2. Tokenize and evaluate
+ tokens = _tokenize(text)
+ result = _eval_tokens(tokens, context)
+
+ # 3. Restore escaped braces
+ result = result.replace(_PLACEHOLDER_OPEN, '{{')
+ result = result.replace(_PLACEHOLDER_CLOSE, '}}')
+
+ return result
diff --git a/nodes/test/prompt_template/test_engine.py b/nodes/test/prompt_template/test_engine.py
new file mode 100644
index 000000000..b1b1f3064
--- /dev/null
+++ b/nodes/test/prompt_template/test_engine.py
@@ -0,0 +1,387 @@
+# =============================================================================
+# MIT License
+# Copyright (c) 2026 Aparavi Software AG
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+# SOFTWARE.
+# =============================================================================
+
+"""
+Unit tests for the custom prompt_template engine.
+
+These tests exercise ``nodes/src/nodes/prompt_template/template_engine.py`` in
+isolation — no server, no dependencies beyond the Python standard library and
+pytest. The engine is loaded directly via ``importlib`` so that importing the
+surrounding ``nodes`` package (which pulls in ``depends``) is avoided.
+
+Coverage targets:
+ - variable substitution (flat + dotted paths)
+ - missing variables / graceful fallback
+ - built-in helpers (``now``, ``uuid``, ``random``)
+ - context overriding built-ins (regression guard)
+ - conditionals (``if`` / ``elif`` / ``else`` / nested)
+ - empty and malformed ``{% if %}`` conditions
+ - loops (flat + nested)
+ - unclosed / malformed block tags
+ - escape sequences for literal braces
+ - large inputs / deeply nested blocks (ReDoS + recursion safety)
+ - HTML pass-through (no implicit escaping)
+"""
+
+import importlib.util
+import sys
+import uuid as _uuid
+from pathlib import Path
+
+
+# ---------------------------------------------------------------------------
+# Load template_engine.py directly without importing the ``nodes`` package.
+#
+# ``nodes/__init__.py`` pulls in the ``depends`` module which is only available
+# inside the built server environment. Loading the engine file directly keeps
+# these tests pure and runnable anywhere Python is available.
+# ---------------------------------------------------------------------------
+_ENGINE_PATH = Path(__file__).resolve().parent.parent / 'src' / 'nodes' / 'prompt_template' / 'template_engine.py'
+
+_spec = importlib.util.spec_from_file_location('prompt_template_engine', _ENGINE_PATH)
+assert _spec is not None and _spec.loader is not None, 'failed to locate template_engine.py'
+template_engine = importlib.util.module_from_spec(_spec)
+sys.modules['prompt_template_engine'] = template_engine
+_spec.loader.exec_module(template_engine)
+
+render = template_engine.render
+
+
+# ===========================================================================
+# 1. Simple variable substitution
+# ===========================================================================
+class TestVariableSubstitution:
+ def test_flat_variable(self):
+ assert render('Hello {{name}}', {'name': 'world'}) == 'Hello world'
+
+ def test_multiple_variables(self):
+ result = render('{{greeting}}, {{name}}!', {'greeting': 'Hi', 'name': 'Ada'})
+ assert result == 'Hi, Ada!'
+
+ def test_whitespace_inside_braces_is_stripped(self):
+ assert render('Hello {{ name }}', {'name': 'world'}) == 'Hello world'
+
+ def test_dotted_path(self):
+ ctx = {'user': {'email': 'ada@example.com'}}
+ assert render('{{user.email}}', ctx) == 'ada@example.com'
+
+ def test_deep_dotted_path(self):
+ ctx = {'a': {'b': {'c': {'d': 'leaf'}}}}
+ assert render('{{a.b.c.d}}', ctx) == 'leaf'
+
+ def test_dotted_path_with_attribute_object(self):
+ class Obj:
+ pass
+
+ o = Obj()
+ o.name = 'attribute-value'
+ assert render('{{obj.name}}', {'obj': o}) == 'attribute-value'
+
+ def test_non_string_value_is_stringified(self):
+ assert render('count={{n}}', {'n': 42}) == 'count=42'
+
+ def test_empty_template(self):
+ assert render('', {'name': 'world'}) == ''
+
+ def test_none_context(self):
+ # Should not crash when context is None.
+ assert render('hello', None) == 'hello'
+
+ def test_template_without_placeholders_passthrough(self):
+ assert render('no placeholders here', {}) == 'no placeholders here'
+
+
+# ===========================================================================
+# 2. Missing variables / graceful handling
+# ===========================================================================
+class TestMissingVariables:
+ def test_missing_flat_variable_renders_empty(self):
+ assert render('Hello {{name}}', {}) == 'Hello '
+
+ def test_missing_dotted_segment_renders_empty(self):
+ assert render('{{user.email}}', {'user': {}}) == ''
+
+ def test_missing_dotted_root_renders_empty(self):
+ assert render('{{user.email}}', {}) == ''
+
+ def test_missing_variable_does_not_raise(self):
+ # Graceful handling is a documented contract.
+ render('{{a}} {{b}} {{c.d}}', {'a': 'x'}) # must not raise
+
+
+# ===========================================================================
+# 3. Built-in helpers
+# ===========================================================================
+class TestBuiltins:
+ def test_now_is_iso_format_utc(self):
+ out = render('{{now}}', {})
+ # ISO8601 with timezone — has a 'T' separator and a '+00:00' suffix.
+ assert 'T' in out
+ assert out.endswith('+00:00')
+
+ def test_uuid_is_valid_uuid4(self):
+ out = render('{{uuid}}', {})
+ parsed = _uuid.UUID(out)
+ assert parsed.version == 4
+
+ def test_random_is_float_between_zero_and_one(self):
+ out = render('{{random}}', {})
+ value = float(out)
+ assert 0.0 <= value < 1.0
+
+ def test_builtins_work_inside_larger_template(self):
+ out = render('id={{uuid}}', {})
+ assert out.startswith('id=')
+ # The suffix must still be a valid UUID4
+ _uuid.UUID(out[len('id=') :])
+
+
+# ===========================================================================
+# 4. Context overrides built-ins (regression test for the recent fix)
+# ===========================================================================
+class TestContextOverridesBuiltins:
+ def test_context_value_wins_over_now(self):
+ assert render('{{now}}', {'now': 'FIXED'}) == 'FIXED'
+
+ def test_context_value_wins_over_uuid(self):
+ assert render('{{uuid}}', {'uuid': 'static-id'}) == 'static-id'
+
+ def test_context_value_wins_over_random(self):
+ assert render('{{random}}', {'random': '0.5'}) == '0.5'
+
+ def test_builtin_still_used_when_not_in_context(self):
+ # Sanity: when `now` is absent from context, the builtin fires.
+ out = render('{{now}}', {'other': 'value'})
+ assert 'T' in out # builtin ran
+
+
+# ===========================================================================
+# 5. Conditionals
+# ===========================================================================
+class TestConditionals:
+ def test_if_true_branch(self):
+ assert render('{% if flag %}yes{% endif %}', {'flag': True}) == 'yes'
+
+ def test_if_false_branch(self):
+ assert render('{% if flag %}yes{% endif %}', {'flag': False}) == ''
+
+ def test_if_else_true(self):
+ tpl = '{% if flag %}yes{% else %}no{% endif %}'
+ assert render(tpl, {'flag': True}) == 'yes'
+
+ def test_if_else_false(self):
+ tpl = '{% if flag %}yes{% else %}no{% endif %}'
+ assert render(tpl, {'flag': False}) == 'no'
+
+ def test_elif_matches_second(self):
+ tpl = '{% if a %}A{% elif b %}B{% else %}C{% endif %}'
+ assert render(tpl, {'a': False, 'b': True}) == 'B'
+
+ def test_elif_falls_through_to_else(self):
+ tpl = '{% if a %}A{% elif b %}B{% else %}C{% endif %}'
+ assert render(tpl, {'a': False, 'b': False}) == 'C'
+
+ def test_empty_string_is_falsy(self):
+ assert render('{% if s %}yes{% endif %}', {'s': ''}) == ''
+
+ def test_none_is_falsy(self):
+ assert render('{% if s %}yes{% endif %}', {'s': None}) == ''
+
+ def test_missing_variable_is_falsy(self):
+ # Missing vars resolve to '' which is falsy, so the branch is skipped.
+ assert render('{% if absent %}yes{% endif %}', {}) == ''
+
+ def test_empty_if_condition_is_falsy(self):
+ # `{% if %}` has no condition name -> resolves to empty string -> falsy.
+ # Must not raise; must render the else branch.
+ tpl = '{% if %}T{% else %}F{% endif %}'
+ assert render(tpl, {}) == 'F'
+
+ def test_nested_if_inner_branch(self):
+ tpl = '{% if a %}A-{% if b %}B{% else %}NB{% endif %}{% endif %}'
+ assert render(tpl, {'a': True, 'b': True}) == 'A-B'
+ assert render(tpl, {'a': True, 'b': False}) == 'A-NB'
+ assert render(tpl, {'a': False, 'b': True}) == ''
+
+ def test_dotted_condition(self):
+ tpl = '{% if user.active %}on{% else %}off{% endif %}'
+ assert render(tpl, {'user': {'active': True}}) == 'on'
+ assert render(tpl, {'user': {'active': False}}) == 'off'
+
+
+# ===========================================================================
+# 6. Loops
+# ===========================================================================
+class TestLoops:
+ def test_basic_for_loop(self):
+ tpl = '{% for item in items %}{{item}},{% endfor %}'
+ assert render(tpl, {'items': ['a', 'b', 'c']}) == 'a,b,c,'
+
+ def test_for_loop_empty_iterable(self):
+ tpl = '[{% for item in items %}{{item}}{% endfor %}]'
+ assert render(tpl, {'items': []}) == '[]'
+
+ def test_for_loop_over_dicts_with_dotted_access(self):
+ tpl = '{% for u in users %}{{u.name}}:{% endfor %}'
+ ctx = {'users': [{'name': 'ada'}, {'name': 'bob'}]}
+ assert render(tpl, ctx) == 'ada:bob:'
+
+ def test_for_loop_with_missing_iterable_is_noop(self):
+ tpl = 'start[{% for item in items %}{{item}}{% endfor %}]end'
+ assert render(tpl, {}) == 'start[]end'
+
+ def test_for_loop_over_string_is_noop(self):
+ # _handle_for explicitly skips strings so we don't iterate characters.
+ tpl = '{% for c in s %}{{c}}{% endfor %}'
+ assert render(tpl, {'s': 'hello'}) == ''
+
+ def test_nested_for_loops(self):
+ tpl = '{% for row in rows %}{% for cell in row %}{{cell}},{% endfor %}|{% endfor %}'
+ ctx = {'rows': [[1, 2], [3, 4]]}
+ assert render(tpl, ctx) == '1,2,|3,4,|'
+
+ def test_loop_variable_shadowing(self):
+ # Loop var should be visible only inside the loop body.
+ tpl = '{% for x in items %}{{x}},{% endfor %}outer:{{x}}'
+ assert render(tpl, {'items': ['a', 'b'], 'x': 'OUT'}) == 'a,b,outer:OUT'
+
+
+# ===========================================================================
+# 7. Malformed / unclosed tags
+# ===========================================================================
+class TestMalformedTags:
+ def test_unclosed_if_emits_raw_tag(self):
+ # When there's no matching endif, the engine falls back to rendering
+ # the raw tag text and continuing — graceful rather than crashing.
+ out = render('{% if flag %}body', {'flag': True})
+ # The raw tag is preserved; the body is still processed as text.
+ assert 'if flag' in out
+ assert 'body' in out
+
+ def test_unclosed_for_emits_raw_tag(self):
+ out = render('{% for x in items %}body', {'items': [1, 2]})
+ assert 'for x in items' in out
+ assert 'body' in out
+
+ def test_malformed_for_tag_is_noop(self):
+ # `{% for foo %}` has no `in ...` clause — regex match fails, the raw
+ # tag is emitted and the body between tags is dropped. The engine
+ # must not raise.
+ out = render('before{% for foo %}X{% endfor %}after', {})
+ assert out.startswith('before')
+ assert out.endswith('after')
+ assert 'X' not in out # body is dropped
+
+ def test_unknown_block_tag_is_passed_through(self):
+ tpl = 'x{% weird tag %}y'
+ out = render(tpl, {})
+ assert 'weird tag' in out
+ assert out.startswith('x')
+ assert out.endswith('y')
+
+
+# ===========================================================================
+# 8. Escape sequences
+# ===========================================================================
+class TestEscapes:
+ def test_escaped_open_brace(self):
+ # \{{ should render as literal {{
+ assert render(r'\{{name}}', {'name': 'ignored'}) == '{{name}}'
+
+ def test_escaped_close_brace(self):
+ # \}} should render as literal }}
+ assert render(r'{{name}}\}}', {'name': 'X'}) == 'X}}'
+
+ def test_escaped_pair_does_not_substitute(self):
+ assert render(r'\{{x\}}', {'x': 'should-not-appear'}) == '{{x}}'
+
+
+# ===========================================================================
+# 9. HTML pass-through (no implicit escaping)
+# ===========================================================================
+class TestHtmlPassthrough:
+ def test_html_substituted_unchanged(self):
+ # The engine does NOT auto-escape — confirming existing behavior so
+ # that any future change is a conscious one. Callers are responsible
+ # for sanitizing untrusted values.
+ tpl = 'Result: {{payload}}'
+ ctx = {'payload': ''}
+ assert render(tpl, ctx) == 'Result: '
+
+ def test_html_entity_in_template_is_literal(self):
+ assert render('&', {}) == '&'
+
+
+# ===========================================================================
+# 10. Safety: large / deeply-nested input (ReDoS + recursion)
+# ===========================================================================
+class TestSafety:
+ def test_large_literal_input_runs_quickly(self):
+ # ~100KB with one placeholder: verifies the tokenizer regex handles
+ # large literal runs correctly and does not catastrophically backtrack
+ # (ReDoS guard). A regex blow-up would manifest as a multi-minute hang
+ # that pytest would terminate — no wall-clock threshold is needed here,
+ # and asserting one only introduces CI flakiness on slow/busy runners.
+ template = 'x' * 50_000 + '{{v}}' + 'y' * 50_000
+ out = render(template, {'v': 'MID'})
+ assert out == ('x' * 50_000 + 'MID' + 'y' * 50_000)
+
+ def test_many_placeholders(self):
+ tpl = ''.join(f'{{{{v{i}}}}}' for i in range(500))
+ ctx = {f'v{i}': str(i) for i in range(500)}
+ out = render(tpl, ctx)
+ assert out == ''.join(str(i) for i in range(500))
+
+ def test_nested_if_blocks_do_not_blow_stack(self):
+ depth = 50
+ tpl = '{% if flag %}' * depth + 'X' + '{% endif %}' * depth
+ out = render(tpl, {'flag': True})
+ assert out == 'X'
+
+ def test_nested_for_blocks_do_not_blow_stack(self):
+ # 10 nested loops of 1 item each — exercises the recursive
+ # _eval_tokens path without exploding combinatorially.
+ depth = 10
+ header = ''.join(f'{{% for x{i} in items %}}' for i in range(depth))
+ footer = '{% endfor %}' * depth
+ tpl = header + 'Y' + footer
+ out = render(tpl, {'items': [1]})
+ assert out == 'Y'
+
+
+# ===========================================================================
+# 11. Public API surface
+# ===========================================================================
+class TestPublicAPI:
+ def test_render_returns_string(self):
+ assert isinstance(render('hello', {}), str)
+
+ def test_render_accepts_none_context(self):
+ assert render('hello {{x}}', None) == 'hello '
+
+ def test_regression_for_question_substitution(self):
+ # Mirrors the real IInstance.closing() usage where `question` is
+ # injected into the per-render context alongside collected text.
+ tpl = 'Q={{question}} | I={{input}}'
+ ctx = {'question': 'what is this?', 'input': 'ctx'}
+ assert render(tpl, ctx) == 'Q=what is this? | I=ctx'