|
15 | 15 | import locale |
16 | 16 | from collections.abc import Callable, MutableMapping |
17 | 17 | from urllib.parse import urlparse |
18 | | -from typing import cast, Any, ClassVar, Optional, Union |
| 18 | +from typing import Any, ClassVar, cast |
19 | 19 |
|
20 | 20 | import elementpath.aliases as ta |
21 | 21 |
|
22 | | -from elementpath.helpers import upper_camel_case, is_ncname, ordinal |
23 | | -from elementpath.exceptions import ElementPathTypeError, \ |
24 | | - ElementPathValueError, MissingContextError, xpath_error |
| 22 | +from elementpath.helpers import upper_camel_case, is_ncname |
| 23 | +from elementpath.exceptions import ElementPathTypeError, ElementPathValueError, xpath_error |
25 | 24 | from elementpath.namespaces import XML_NAMESPACE, XSD_NAMESPACE, XPATH_FUNCTIONS_NAMESPACE, \ |
26 | 25 | XQT_ERRORS_NAMESPACE, XSD_NOTATION, XSD_ANY_ATOMIC_TYPE |
27 | 26 | from elementpath.namespaces import get_prefixed_name |
28 | 27 | from elementpath.datatypes import QName, builtin_atomic_types |
29 | 28 | from elementpath.collations import UNICODE_COLLATION_BASE_URI, UNICODE_CODEPOINT_COLLATION |
30 | | -from elementpath.xpath_tokens import XPathToken, ProxyToken, XPathFunction, XPathConstructor |
31 | | -from elementpath.xpath_context import XPathContext, XPathSchemaContext |
| 29 | +from elementpath.xpath_tokens import XPathToken, ProxyToken, XPathFunction, \ |
| 30 | + XPathConstructor, SchemaConstructor, ExternalFunction |
32 | 31 | from elementpath.sequence_types import is_sequence_type, match_sequence_type |
33 | 32 | from elementpath.schema_proxy import AbstractSchemaProxy |
34 | 33 | from elementpath.xpath1 import XPath1Parser |
@@ -94,8 +93,7 @@ class XPath2Parser(XPath1Parser): |
94 | 93 | 'schema-element', 'text', 'typeswitch', |
95 | 94 | } |
96 | 95 |
|
97 | | - function_signatures: dict[tuple[QName, int], str] \ |
98 | | - = XPath1Parser.function_signatures.copy() |
| 96 | + function_signatures: dict[tuple[QName, int], str] = XPath1Parser.function_signatures.copy() |
99 | 97 | namespaces: dict[str, str] |
100 | 98 | token: XPathToken |
101 | 99 | next_token: XPathToken |
@@ -244,8 +242,8 @@ def advance(self, *symbols: str, message: str | None = None) -> XPathToken: |
244 | 242 |
|
245 | 243 | @classmethod |
246 | 244 | def constructor(cls, symbol: str, bp: int = 90, nargs: ta.NargsType = 1, |
247 | | - sequence_types: Union[tuple[()], tuple[str, ...], list[str]] = (), |
248 | | - label: Union[str, tuple[str, ...]] = 'constructor function') \ |
| 245 | + sequence_types: tuple[()] | tuple[str, ...] | list[str] = (), |
| 246 | + label: str | tuple[str, ...] = 'constructor function') \ |
249 | 247 | -> Callable[[Callable[..., Any]], Callable[..., Any]]: |
250 | 248 | """ |
251 | 249 | Statically creates a constructor token class, that is registered in the globals |
@@ -276,62 +274,61 @@ def bind(func: Callable[..., Any]) -> Callable[..., Any]: |
276 | 274 | return func |
277 | 275 | return bind |
278 | 276 |
|
279 | | - def schema_constructor(self, atomic_type_name: str, bp: int = 90) \ |
280 | | - -> type[XPathFunction]: |
281 | | - """Dynamically registers a token class for a schema atomic type constructor function.""" |
282 | | - if atomic_type_name in (XSD_ANY_ATOMIC_TYPE, XSD_NOTATION): |
283 | | - raise xpath_error('XPST0080') |
| 277 | + def dynamic_register(self, symbol: str, class_name: str, **kwargs: Any) \ |
| 278 | + -> type[ta.XPathTokenType]: |
| 279 | + """ |
| 280 | + Register/update a token class in the symbol table of a parser instance. |
284 | 281 |
|
285 | | - def nud_(self_: XPathFunction) -> XPathFunction: |
286 | | - self_.parser.advance('(') |
287 | | - self_[0:] = self_.parser.expression(5), |
288 | | - self_.parser.advance(')') |
| 282 | + :param symbol: The identifier symbol for the new class. |
| 283 | + :param class_name: The name of the new class. |
| 284 | + :param kwargs: Optional attributes/methods for the token class. |
| 285 | + :return: A token class. |
| 286 | + """ |
| 287 | + kwargs['symbol'] = symbol |
| 288 | + if 'lookup_name' not in kwargs: |
| 289 | + lookup_name = kwargs['lookup_name'] = symbol |
| 290 | + else: |
| 291 | + lookup_name = kwargs['lookup_name'] |
289 | 292 |
|
290 | | - try: |
291 | | - self_.evaluate() # for static context evaluation |
292 | | - except MissingContextError: |
293 | | - pass |
294 | | - return self_ |
| 293 | + if 'label' not in kwargs: |
| 294 | + kwargs['label'] = 'symbol' |
295 | 295 |
|
296 | | - def evaluate_(self_: XPathFunction, context: XPathContext | None = None) \ |
297 | | - -> ta.OneAtomicOrEmpty: |
298 | | - arg = self_.get_argument(context) |
299 | | - if arg is None or self_.parser.schema is None: |
300 | | - return [] |
| 296 | + token_class_bases = kwargs.get('bases', (self.token_base_class,)) |
| 297 | + kwargs.update({ |
| 298 | + '__module__': self.__module__, |
| 299 | + '__qualname__': class_name, |
| 300 | + '__return__': None |
| 301 | + }) |
| 302 | + token_class = cast( |
| 303 | + type[ta.XPathTokenType], |
| 304 | + cast(object, ABCMeta(class_name, token_class_bases, kwargs)) |
| 305 | + ) |
301 | 306 |
|
302 | | - value = self_.string_value(arg) |
303 | | - try: |
304 | | - return self_.parser.schema.cast_as(value, atomic_type_name) |
305 | | - except (TypeError, ValueError) as err: |
306 | | - if isinstance(context, XPathSchemaContext): |
307 | | - return [] |
308 | | - raise self_.error('FORG0001', err) |
| 307 | + if self.symbol_table is self.__class__.symbol_table: |
| 308 | + self.symbol_table = copy.copy(self.symbol_table) |
| 309 | + self.symbol_table[lookup_name] = token_class |
| 310 | + self.tokenizer = None |
| 311 | + return token_class |
| 312 | + |
| 313 | + def schema_constructor(self, atomic_type_name: str, bp: int = 90) -> type[XPathFunction]: |
| 314 | + """ |
| 315 | + Dynamically register a token class for a schema atomic type constructor function. |
| 316 | + """ |
| 317 | + if atomic_type_name in (XSD_ANY_ATOMIC_TYPE, XSD_NOTATION): |
| 318 | + raise xpath_error('XPST0080') |
309 | 319 |
|
310 | 320 | symbol = get_prefixed_name(atomic_type_name, self.namespaces) |
311 | | - token_class_name = "_%sConstructorFunction" % symbol.replace(':', '_') |
| 321 | + class_name = "_%sConstructorFunction" % symbol.replace(':', '_') |
312 | 322 | kwargs = { |
313 | | - 'symbol': symbol, |
| 323 | + 'bases': (SchemaConstructor,), |
| 324 | + 'name': atomic_type_name, |
314 | 325 | 'nargs': 1, |
315 | 326 | 'label': 'constructor function', |
316 | 327 | 'pattern': r'\b%s(?=\s*\(|\s*\(\:.*\:\)\()' % symbol, |
317 | 328 | 'lbp': bp, |
318 | 329 | 'rbp': bp, |
319 | | - 'nud': nud_, |
320 | | - 'evaluate': evaluate_, |
321 | | - '__module__': self.__module__, |
322 | | - '__qualname__': token_class_name, |
323 | | - '__return__': None |
324 | 330 | } |
325 | | - token_class = cast( |
326 | | - type[XPathFunction], ABCMeta(token_class_name, (XPathFunction,), kwargs) |
327 | | - ) |
328 | | - |
329 | | - if self.symbol_table is self.__class__.symbol_table: |
330 | | - self.symbol_table = copy.copy(self.symbol_table) |
331 | | - self.symbol_table[symbol] = token_class |
332 | | - self.tokenizer = None |
333 | | - |
334 | | - return token_class |
| 331 | + return cast(type[XPathFunction], self.dynamic_register(symbol, class_name, **kwargs)) |
335 | 332 |
|
336 | 333 | def external_function(self, |
337 | 334 | callback: Callable[..., Any], |
@@ -367,117 +364,79 @@ def external_function(self, |
367 | 364 | namespace = XPATH_FUNCTIONS_NAMESPACE |
368 | 365 | qname = QName(XPATH_FUNCTIONS_NAMESPACE, f'fn:{symbol}') |
369 | 366 |
|
370 | | - class_name = f'{upper_camel_case(qname.qname)}ExternalFunction' |
371 | | - lookup_name = qname.expanded_name |
372 | | - |
373 | | - if self.symbol_table is self.__class__.symbol_table: |
374 | | - self.symbol_table = copy.copy(self.symbol_table) |
375 | | - |
376 | | - if lookup_name in self.symbol_table: |
377 | | - msg = f'function {qname.qname!r} is already registered' |
378 | | - raise ElementPathValueError(msg) |
379 | | - elif symbol not in self.symbol_table or \ |
380 | | - not issubclass(self.symbol_table[symbol], ProxyToken): |
381 | | - |
382 | | - if symbol in self.symbol_table: |
383 | | - token_cls = self.symbol_table[symbol] |
384 | | - if not issubclass(token_cls, XPathFunction) \ |
385 | | - or token_cls.label == 'kind test': |
386 | | - msg = f'{symbol!r} name collides with {token_cls!r}' |
387 | | - raise ElementPathValueError(msg) |
388 | | - if namespace == token_cls.namespace: |
389 | | - msg = f'function {qname.qname!r} is already registered' |
390 | | - raise ElementPathValueError(msg) |
391 | | - |
392 | | - # Move the token class before register the proxy token |
393 | | - self.symbol_table[f'{{{token_cls.namespace}}}{symbol}'] = token_cls |
394 | | - |
395 | | - token_class_name = f'{upper_camel_case(qname.local_name)}FunctionProxy' |
396 | | - kwargs = { |
397 | | - 'class_name': token_class_name, |
398 | | - 'symbol': symbol, |
399 | | - 'label': 'function', |
400 | | - 'lbp': bp, |
401 | | - 'rbp': bp, |
402 | | - '__module__': self.__module__, |
403 | | - '__qualname__': token_class_name, |
404 | | - '__return__': None |
405 | | - } |
406 | | - self.symbol_table[symbol] = cast( |
407 | | - type[ProxyToken], ABCMeta(class_name, (ProxyToken,), kwargs) |
408 | | - ) |
409 | | - |
410 | | - def evaluate_external_function(self_: XPathFunction, |
411 | | - context: Optional[XPathContext] = None) -> Any: |
412 | | - args = [] |
413 | | - for k in range(len(self_)): |
414 | | - try: |
415 | | - if sequence_types[k][-1] in '+*': |
416 | | - arg = self_[k].evaluate(context) |
417 | | - else: |
418 | | - arg = self_.get_argument(context, index=k) |
419 | | - except IndexError: |
420 | | - arg = self_.get_argument(context, index=k) |
421 | | - args.append(arg) |
422 | | - |
423 | | - if sequence_types: |
424 | | - for k, (arg, st) in enumerate(zip(args, sequence_types), start=1): |
425 | | - if not match_sequence_type(arg, st, self): |
426 | | - msg_ = f"{ordinal(k)} argument does not match sequence type {st!r}" |
427 | | - raise xpath_error('XPDY0050', msg_) |
428 | | - |
429 | | - result = callback(*args) |
430 | | - if not match_sequence_type(result, sequence_types[-1], self): |
431 | | - msg_ = f"Result does not match sequence type {sequence_types[-1]!r}" |
432 | | - raise xpath_error('XPDY0050', msg_) |
433 | | - return result |
434 | | - |
435 | | - return callback(*args) |
436 | | - |
437 | | - kwargs = { |
438 | | - 'class_name': class_name, |
439 | | - 'symbol': symbol, |
440 | | - 'namespace': namespace, |
441 | | - 'label': 'external function', |
442 | | - 'nargs': nargs, |
443 | | - 'lbp': bp, |
444 | | - 'rbp': bp, |
445 | | - 'evaluate': evaluate_external_function, |
446 | | - '__module__': self.__module__, |
447 | | - '__qualname__': class_name, |
448 | | - '__return__': None |
449 | | - } |
| 367 | + function_signatures: dict[tuple[QName, int], str] = {} |
450 | 368 | if sequence_types: |
451 | | - # Register function signature(s) |
452 | | - kwargs['sequence_types'] = sequence_types |
453 | | - if self.function_signatures is self.__class__.function_signatures: |
454 | | - self.function_signatures = dict(self.__class__.function_signatures) |
455 | | - |
456 | 369 | if nargs is None: |
457 | | - pass # pragma: no cover |
| 370 | + assert len(sequence_types) == 1 |
| 371 | + function_signatures[(qname, 0)] = f'function() as {sequence_types[0]}' |
458 | 372 | elif isinstance(nargs, int): |
459 | 373 | assert len(sequence_types) == nargs + 1 |
460 | | - self.function_signatures[(qname, nargs)] = 'function({}) as {}'.format( |
| 374 | + function_signatures[(qname, nargs)] = 'function({}) as {}'.format( |
461 | 375 | ', '.join(sequence_types[:-1]), sequence_types[-1] |
462 | 376 | ) |
463 | 377 | elif nargs[1] is None: |
464 | 378 | assert len(sequence_types) == nargs[0] + 1 |
465 | | - self.function_signatures[(qname, nargs[0])] = 'function({}, ...) as {}'.format( |
| 379 | + function_signatures[(qname, nargs[0])] = 'function({}, ...) as {}'.format( |
466 | 380 | ', '.join(sequence_types[:-1]), sequence_types[-1] |
467 | 381 | ) |
468 | 382 | else: |
469 | 383 | assert len(sequence_types) == nargs[1] + 1 |
470 | 384 | for arity in range(nargs[0], nargs[1] + 1): |
471 | | - self.function_signatures[(qname, arity)] = 'function({}) as {}'.format( |
| 385 | + function_signatures[(qname, arity)] = 'function({}) as {}'.format( |
472 | 386 | ', '.join(sequence_types[:arity]), sequence_types[-1] |
473 | 387 | ) |
474 | 388 |
|
475 | | - token_class = cast( |
476 | | - type[XPathFunction], ABCMeta(class_name, (XPathFunction,), kwargs) |
477 | | - ) |
478 | | - self.symbol_table[lookup_name] = token_class |
479 | | - self.tokenizer = None |
480 | | - return token_class |
| 389 | + if qname.expanded_name in self.symbol_table: |
| 390 | + msg = f'function {qname.qname!r} is already registered' |
| 391 | + raise ElementPathValueError(msg) |
| 392 | + elif symbol not in self.symbol_table: |
| 393 | + lookup_name = symbol |
| 394 | + else: |
| 395 | + lookup_name = qname.expanded_name |
| 396 | + token_class = self.symbol_table[symbol] |
| 397 | + if issubclass(token_class, ProxyToken): |
| 398 | + pass |
| 399 | + elif namespace == token_class.namespace: |
| 400 | + msg = f'function {qname.qname!r} is already registered' |
| 401 | + raise ElementPathValueError(msg) |
| 402 | + elif not issubclass(token_class, XPathFunction) \ |
| 403 | + or token_class.label == 'kind test': |
| 404 | + msg = f'{symbol!r} name collides with {token_class!r}' |
| 405 | + raise ElementPathValueError(msg) |
| 406 | + else: |
| 407 | + # Register a new proxy token, moving the already present token class to |
| 408 | + # qualified name. |
| 409 | + self.symbol_table[f'{{{token_class.namespace}}}{symbol}'] = token_class |
| 410 | + class_name = f'{upper_camel_case(qname.local_name)}FunctionProxy' |
| 411 | + kwargs = { |
| 412 | + 'bases': (ProxyToken,), |
| 413 | + 'label': 'function', |
| 414 | + 'lbp': bp, |
| 415 | + 'rbp': bp, |
| 416 | + } |
| 417 | + self.dynamic_register(symbol, class_name, **kwargs) |
| 418 | + |
| 419 | + class_name = f'{upper_camel_case(qname.qname)}ExternalFunction' |
| 420 | + kwargs = { |
| 421 | + 'lookup_name': lookup_name, |
| 422 | + 'bases': (ExternalFunction,), |
| 423 | + 'namespace': namespace, |
| 424 | + 'label': 'external function', |
| 425 | + 'nargs': nargs, |
| 426 | + 'lbp': bp, |
| 427 | + 'rbp': bp, |
| 428 | + 'sequence_types': sequence_types, |
| 429 | + 'callback': staticmethod(callback), |
| 430 | + } |
| 431 | + token_class = self.dynamic_register(symbol, class_name, **kwargs) |
| 432 | + |
| 433 | + if function_signatures: |
| 434 | + # Register function signatures |
| 435 | + if self.function_signatures is self.__class__.function_signatures: |
| 436 | + self.function_signatures = dict(self.__class__.function_signatures) |
| 437 | + self.function_signatures.update(function_signatures) |
| 438 | + |
| 439 | + return cast(type[XPathFunction], token_class) |
481 | 440 |
|
482 | 441 | def is_schema_bound(self) -> bool: |
483 | 442 | return self.schema is not None and 'symbol_table' in self.__dict__ |
|
0 commit comments