-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathoperation.py
More file actions
250 lines (220 loc) · 9.53 KB
/
operation.py
File metadata and controls
250 lines (220 loc) · 9.53 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
from abc import ABC, abstractmethod
import logging
from typing import Type, Dict, Optional, Any, List, ClassVar, Union
from pydantic import BaseModel, Field, ConfigDict
from pydantic_settings import BaseSettings
from rdflib.term import Node
from rdflib import URIRef, Literal, BNode, Graph
from rdflib.namespace import XSD
from rdflib.query import Result
class Operation(ABC, BaseModel):
    """
    Abstract base class for all operations with dual execution paths:
    1. execute() - Pure RDFLib function
    2. execute_json() - JSON argument processing

    Operations can optionally implement MCPTool interface for MCP client access.

    The class also acts as a name -> class registry (see ``register``/``get``)
    and bundles the helpers used to evaluate ``@op`` JSON structures, manage
    a stack of variable scopes, and convert between JSON / plain Python values
    and RDFLib terms.
    """

    # Name -> operation class; a single dict shared by every subclass.
    registry: ClassVar[Dict[str, Type["Operation"]]] = {}

    # Settings object handed to each operation; excluded from model
    # serialization via ``exclude=True``.
    settings: BaseSettings = Field(exclude=True)
    # Free-form execution context supplied by the caller of process_json().
    context: Any = {}

    # Accept fields beyond the ones declared above.
    model_config = ConfigDict(extra="allow")

    @classmethod
    def name(cls) -> str:
        """Return the registry key of the operation: the class name."""
        return cls.__name__

    @classmethod
    @abstractmethod
    def description(cls) -> str:
        """Return a human-readable description of the operation."""
        pass

    @classmethod
    @abstractmethod
    def inputSchema(cls) -> dict:
        """Return a dict describing the arguments the operation accepts."""
        pass

    @abstractmethod
    def execute(self, *args) -> Union[Node, Result, Graph]:
        """Pure function: RDFLib terms → RDFLib terms/Results/Graphs"""
        pass

    @abstractmethod
    def execute_json(
        self, arguments: dict, variable_stack: Optional[list] = None
    ) -> Union[Node, Result, Graph]:
        """JSON execution: processes JSON args, returns RDFLib objects

        Args:
            arguments: The value of the ``args`` key of an ``@op`` structure.
            variable_stack: List of scope dicts (innermost last).
                ``process_json`` always passes a list; ``None`` stands for
                "no scopes yet" and replaces the former shared ``[]`` default.
        """
        pass

    @classmethod
    def register(cls, operation_cls: Type["Operation"]) -> None:
        """Add ``operation_cls`` to the registry under ``operation_cls.name()``.

        Raises:
            ValueError: If ``operation_cls`` is not a subclass of ``cls``.
        """
        if not issubclass(operation_cls, cls):
            raise ValueError(
                f"Cannot register {operation_cls}: Must be a subclass of Operation."
            )
        op_name = operation_cls.name()
        cls.registry[op_name] = operation_cls
        logging.info("Registered operation: %s", op_name)

    @classmethod
    def list_operations(cls) -> List[Type["Operation"]]:
        """Return all registered operation classes as a new list."""
        return list(cls.registry.values())

    @classmethod
    def get(cls, name: str) -> Optional[Type["Operation"]]:
        """Return the operation class registered as ``name``, or ``None``."""
        return cls.registry.get(name)

    @classmethod
    def process_json(
        cls,
        settings: BaseSettings,
        json_data: Any,
        context: Optional[dict] = None,
        variable_stack: Optional[list] = None,
    ) -> Any:
        """Class method for processing JSON with @op structures

        * A dict containing ``"@op"`` is executed: the named operation is
          instantiated and its ``execute_json`` result is returned as-is.
        * Any other dict is rebuilt with every value processed recursively.
        * A list is processed item by item; all items share one copy of the
          incoming variable stack.
        * Any other value is converted with ``json_to_rdflib``.

        Args:
            settings: Settings passed to every instantiated operation.
            json_data: The JSON-like structure to evaluate.
            context: Context passed to every instantiated operation; a fresh
                dict is created per top-level call when omitted.
            variable_stack: List of scope dicts; a fresh list is created per
                top-level call when omitted.

        Raises:
            ValueError: If an ``"@op"`` names an unregistered operation.
        """
        # Fresh containers per call: a literal {} / [] default would be one
        # shared object that operations could mutate across unrelated calls.
        # Use ``is None`` (not truthiness) so an explicitly passed empty
        # list/dict keeps its identity and stays shared with the caller.
        if context is None:
            context = {}
        if variable_stack is None:
            variable_stack = []

        if isinstance(json_data, dict):
            if "@op" in json_data:
                op_name = json_data["@op"]
                op_args = json_data.get("args", {})
                operation_cls = cls.get(op_name)
                if operation_cls is None:
                    raise ValueError(f"Unknown operation: {op_name}")
                operation = operation_cls(settings=settings, context=context)
                # Return RDFLib objects as-is for operation chaining
                return operation.execute_json(op_args, variable_stack)
            # Recurse into each value — allows nested @op inside JSON-LD
            # and SPARQL bindings
            return {
                key: cls.process_json(settings, value, context, variable_stack)
                for key, value in json_data.items()
            }

        if isinstance(json_data, list):
            # For sequential operations, share one stack between the items so
            # variables accumulate; copying keeps new scopes pushed here out
            # of the caller's list.
            current_stack = variable_stack.copy()
            return [
                cls.process_json(settings, item, context, current_stack)
                for item in json_data
            ]

        # Convert plain values to RDFLib terms
        return cls.json_to_rdflib(json_data)

    @staticmethod
    def _serialize_for_json_context(obj) -> Any:
        """Convert RDFLib objects to appropriate format for JSON consumption

        URIRef/Literal/BNode become ``str``; an object exposing a callable
        ``to_json`` (presumably a query result) is replaced by that method's
        return value; everything else, including ``Graph``, is returned
        unchanged.
        """
        if isinstance(obj, (URIRef, Literal, BNode)):
            return str(obj)  # Convert RDFLib terms to strings for JSON-LD
        if hasattr(obj, "to_json") and callable(obj.to_json):
            return obj.to_json()
        # Graphs (and anything else) are kept as-is for now; the original
        # note says graphs are serialized later by HTTP operations.
        return obj

    # Variable stack management methods

    def push_variable_scope(self, variable_stack: list) -> None:
        """Create a new variable scope (like entering a new XSLT template)."""
        variable_stack.append({})

    def pop_variable_scope(self, variable_stack: list) -> None:
        """Exit the current variable scope (like leaving an XSLT template).

        Does nothing when the stack is already empty.
        """
        if variable_stack:
            variable_stack.pop()

    def set_variable(self, name: str, value: Any, variable_stack: list) -> None:
        """Set a variable in the current scope.

        A first scope is created when the stack is empty.
        """
        if not variable_stack:
            variable_stack.append({})
        variable_stack[-1][name] = value

    def get_variable(self, name: str, variable_stack: list) -> Any:
        """Get a variable value, searching from innermost to outermost scope.

        Raises:
            ValueError: If no scope defines ``name``.
        """
        for scope in reversed(variable_stack):
            if name in scope:
                return scope[name]
        raise ValueError(f"Variable '{name}' not found")

    # Conversion helpers between different formats

    @staticmethod
    def json_to_rdflib(data) -> Node:
        """Convert JSON/binding objects to RDFLib terms

        * A dict with ``"type"`` and ``"value"`` keys is read as a SPARQL
          binding object (``uri`` / ``literal`` / ``bnode``).
        * URIRef/Literal/BNode instances are returned unchanged.
        * Objects whose class is named ``JSONResult`` or ``Result`` are
          returned unchanged.
        * Everything else is converted by ``plain_to_rdflib``.

        Raises:
            ValueError: If a binding object has an unknown ``type``.
        """
        if isinstance(data, dict) and "type" in data and "value" in data:
            # SPARQL binding object - values may have been processed to
            # RDFLib terms already, hence the str() conversions.
            type_str = str(data["type"])
            value_str = str(data["value"])
            if type_str == "uri":
                return URIRef(value_str)
            if type_str == "literal":
                datatype = data.get("datatype")
                lang = data.get("xml:lang")
                if datatype:
                    datatype = URIRef(str(datatype))
                if lang:
                    lang = str(lang)
                return Literal(value_str, datatype=datatype, lang=lang)
            if type_str == "bnode":
                return BNode(value_str)
            raise ValueError(f"Unknown binding type: {type_str}")

        # Must precede the plain-value conversion: RDFLib terms are also
        # ``str`` instances and would otherwise be re-wrapped as literals.
        if isinstance(data, (URIRef, Literal, BNode)):
            return data

        # Result objects pass through unchanged. Matched by class name, as in
        # the original code, so result classes that do not inherit from
        # rdflib's Result are recognized too.
        if type(data).__name__ in ("JSONResult", "Result"):
            return data

        return Operation.plain_to_rdflib(data)

    @staticmethod
    def plain_to_rdflib(value: Any) -> Node:
        """Convert plain Python values to RDFLib terms for MCP interface

        ``str`` -> xsd:string, ``bool`` -> xsd:boolean, ``int`` -> xsd:integer,
        ``float`` -> xsd:double; any other value is stringified into an
        xsd:string literal.
        """
        if isinstance(value, str):
            # Plain string → always convert to string literal
            return Literal(value, datatype=XSD.string)
        # bool is a subclass of int, so it has to be tested first; otherwise
        # True/False would be tagged xsd:integer.
        if isinstance(value, bool):
            return Literal(value, datatype=XSD.boolean)
        if isinstance(value, int):
            return Literal(value, datatype=XSD.integer)
        if isinstance(value, float):
            return Literal(value, datatype=XSD.double)
        # Default: convert to string literal
        return Literal(str(value), datatype=XSD.string)

    @staticmethod
    def to_string_literal(term: Node) -> Literal:
        """Convert Literal terms to string-compatible literals, following SPARQL semantics

        Literals typed xsd:string, language-tagged literals, and literals
        without a datatype are returned unchanged.

        Raises:
            TypeError: For literals of any other datatype and for non-Literal
                terms (URIRef, BNode, ...), which need an explicit cast.
        """
        if not isinstance(term, Literal):
            # URIRef, BNode, etc. require explicit casting
            raise TypeError(
                f"Cannot implicitly convert {type(term).__name__} to string. Use Str() operation for explicit casting."
            )
        if (
            term.datatype == XSD.string
            or term.language is not None  # rdf:langString
            or term.datatype is None  # plain literal
        ):
            return term
        # Other literal datatypes need explicit conversion
        raise TypeError(
            f"Cannot implicitly convert {term.datatype} to string. Use Str() operation for explicit casting."
        )

    @staticmethod
    def rdflib_to_plain(term: Node) -> Any:
        """Convert RDFLib terms to plain Python values for MCP interface

        Literals typed xsd:integer become ``int``, xsd:double/xsd:float
        become ``float``, xsd:boolean uses ``Literal.toPython()``; every
        other literal and every non-literal term is returned as ``str``.
        """
        if isinstance(term, Literal):
            # Convert based on datatype
            if term.datatype == XSD.integer:
                return int(term)
            if term.datatype == XSD.double or term.datatype == XSD.float:
                return float(term)
            if term.datatype == XSD.boolean:
                return term.toPython()
        return str(term)