-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathstr.py
More file actions
70 lines (58 loc) · 2.39 KB
/
str.py
File metadata and controls
70 lines (58 loc) · 2.39 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
from typing import Any
from rdflib.term import Node
from rdflib import Literal
from rdflib.namespace import XSD
from mcp import types
from web_algebra.operation import Operation
class Str(Operation):
    """
    Converts any RDF term to a string literal.

    Literals that are already string-compatible (``xsd:string``,
    language-tagged, or plain with neither datatype nor language) are returned
    unchanged; every other term is wrapped as an ``xsd:string`` literal built
    from ``str(term)``.
    """

    @classmethod
    def description(cls) -> str:
        """Return the one-line description of this operation."""
        return "Converts any RDF term to a string literal"

    @classmethod
    def inputSchema(cls) -> dict:
        """Return the JSON Schema of the arguments: one required ``input``."""
        return {
            "type": "object",
            "properties": {
                "input": {"description": "The input value to convert to string"},
            },
            "required": ["input"],
        }

    def execute(self, term: Node) -> Literal:
        """Pure function: RDFLib term → string literal.

        Args:
            term: The RDFLib term to convert.

        Returns:
            ``term`` itself when it is a ``Literal`` whose datatype is
            ``xsd:string``, that carries a language tag, or that has no
            datatype; otherwise a new ``Literal(str(term),
            datatype=XSD.string)``.
        """
        # NOTE(review): language-tagged literals keep their tag here, whereas
        # SPARQL's STR() returns a simple literal - confirm this is intended.
        if isinstance(term, Literal) and (
            term.datatype == XSD.string
            or term.language is not None
            or term.datatype is None
        ):
            # Already string-compatible, return as-is.
            return term

        # Convert any other term (typed literal or non-literal) to xsd:string.
        return Literal(str(term), datatype=XSD.string)

    def execute_json(
        self, arguments: dict, variable_stack: "list | None" = None
    ) -> Literal:
        """JSON execution: processes JSON args, returns RDFLib string literal.

        Args:
            arguments: Operation arguments; ``arguments["input"]`` is resolved
                through ``Operation.process_json``.
            variable_stack: Stack forwarded to ``Operation.process_json``.
                When omitted, a fresh empty list is used for this call.

        Returns:
            The result of ``execute`` applied to the resolved input.

        Raises:
            KeyError: If ``arguments`` has no ``"input"`` key.
            TypeError: If the resolved input is not an RDFLib ``Node``.
        """
        # A `[]` default would be a single list shared by every call that
        # omits the argument; create a new one per call instead.
        if variable_stack is None:
            variable_stack = []

        # Process the input argument through the JSON system.
        input_data = Operation.process_json(
            self.settings, arguments["input"], self.context, variable_stack
        )

        # Expect RDFLib term directly.
        if not isinstance(input_data, Node):
            raise TypeError(
                f"Str operation expects input to be RDFLib term, got {type(input_data)}"
            )

        return self.execute(input_data)

    def mcp_run(self, arguments: dict, context: Any = None) -> Any:
        """MCP execution: plain args → plain results.

        Args:
            arguments: Plain arguments; ``arguments["input"]`` is converted
                with ``Operation.plain_to_rdflib``.
            context: Not used by this method.

        Returns:
            A one-element list holding a ``types.TextContent`` whose text is
            ``str()`` of the converted literal.

        Raises:
            KeyError: If ``arguments`` has no ``"input"`` key.
        """
        # Convert plain input to an RDFLib term, then run the pure function.
        rdflib_term = Operation.plain_to_rdflib(arguments["input"])
        result = self.execute(rdflib_term)

        # Convert result to plain string for MCP.
        return [types.TextContent(type="text", text=str(result))]