|
1 | 1 | # -*- coding: utf-8 -*- |
2 | 2 | import logging |
| 3 | +import re |
3 | 4 | from abc import ABC, abstractmethod |
4 | 5 | from dataclasses import dataclass |
5 | 6 | from typing import Any, Dict, Optional, Sequence, Union |
|
14 | 15 | from .sql.parser import SQLParser |
15 | 16 | from .sql.query_builder import QueryExecutionPlan |
16 | 17 | from .sql.update_builder import UpdateExecutionPlan |
| 18 | +from .sql.view_builder import ViewExecutionPlan |
17 | 19 |
|
18 | 20 | _logger = logging.getLogger(__name__) |
19 | 21 |
|
@@ -642,10 +644,124 @@ def execute( |
642 | 644 | return self._execute_execution_plan(self._execution_plan, connection, parameters) |
643 | 645 |
|
644 | 646 |
|
| 647 | +class ViewExecution(ExecutionStrategy): |
| 648 | + """Execution strategy for view statements (CREATE VIEW, DROP VIEW).""" |
| 649 | + |
| 650 | + _DDL_PATTERN = re.compile( |
| 651 | + r"^\s*(CREATE\s+VIEW|DROP\s+VIEW)\b", |
| 652 | + re.IGNORECASE, |
| 653 | + ) |
| 654 | + |
| 655 | + @property |
| 656 | + def execution_plan(self) -> ViewExecutionPlan: |
| 657 | + return self._execution_plan |
| 658 | + |
| 659 | + def supports(self, context: ExecutionContext) -> bool: |
| 660 | + return bool(self._DDL_PATTERN.match(context.query)) |
| 661 | + |
| 662 | + def _parse_sql(self, sql: str) -> ViewExecutionPlan: |
| 663 | + normalized = " ".join(sql.split()) |
| 664 | + |
| 665 | + # CREATE VIEW view_name ON collection_name AS 'pipeline_json' |
| 666 | + create_match = re.match( |
| 667 | + r"CREATE\s+VIEW\s+(\w+)\s+ON\s+(\w+)\s+AS\s+'(.*)'", |
| 668 | + normalized, |
| 669 | + re.IGNORECASE | re.DOTALL, |
| 670 | + ) |
| 671 | + if create_match: |
| 672 | + import json |
| 673 | + |
| 674 | + view_name = create_match.group(1) |
| 675 | + source_collection = create_match.group(2) |
| 676 | + pipeline_str = create_match.group(3) |
| 677 | + try: |
| 678 | + pipeline = json.loads(pipeline_str) |
| 679 | + except json.JSONDecodeError as e: |
| 680 | + raise SqlSyntaxError(f"Invalid pipeline JSON in CREATE VIEW: {e}") |
| 681 | + |
| 682 | + if not isinstance(pipeline, list): |
| 683 | + raise SqlSyntaxError("Pipeline must be a JSON array") |
| 684 | + |
| 685 | + return ViewExecutionPlan( |
| 686 | + collection=view_name, |
| 687 | + ddl_type="create_view", |
| 688 | + view_on=source_collection, |
| 689 | + pipeline=pipeline, |
| 690 | + ) |
| 691 | + |
| 692 | + # DROP VIEW view_name |
| 693 | + drop_match = re.match( |
| 694 | + r"DROP\s+VIEW\s+(\w+)\s*$", |
| 695 | + normalized, |
| 696 | + re.IGNORECASE, |
| 697 | + ) |
| 698 | + if drop_match: |
| 699 | + view_name = drop_match.group(1) |
| 700 | + return ViewExecutionPlan( |
| 701 | + collection=view_name, |
| 702 | + ddl_type="drop_view", |
| 703 | + ) |
| 704 | + |
| 705 | + raise SqlSyntaxError(f"Unsupported DDL statement: {sql}") |
| 706 | + |
| 707 | + def _execute_execution_plan( |
| 708 | + self, |
| 709 | + execution_plan: ViewExecutionPlan, |
| 710 | + connection: Any = None, |
| 711 | + parameters: Optional[Union[Sequence[Any], Dict[str, Any]]] = None, |
| 712 | + ) -> Optional[Dict[str, Any]]: |
| 713 | + try: |
| 714 | + if not connection: |
| 715 | + raise OperationalError("No connection provided") |
| 716 | + |
| 717 | + db = connection.database |
| 718 | + |
| 719 | + if execution_plan.ddl_type == "create_view": |
| 720 | + command = { |
| 721 | + "create": execution_plan.collection, |
| 722 | + "viewOn": execution_plan.view_on, |
| 723 | + "pipeline": execution_plan.pipeline, |
| 724 | + } |
| 725 | + _logger.debug(f"Executing MongoDB create view command: {command}") |
| 726 | + return _run_db_command(db, command, connection, "create view") |
| 727 | + |
| 728 | + elif execution_plan.ddl_type == "drop_view": |
| 729 | + # MongoDB drops views with the regular drop command |
| 730 | + command = {"drop": execution_plan.collection} |
| 731 | + _logger.debug(f"Executing MongoDB drop view command: {command}") |
| 732 | + return _run_db_command(db, command, connection, "drop view") |
| 733 | + |
| 734 | + else: |
| 735 | + raise ProgrammingError(f"Unknown DDL type: {execution_plan.ddl_type}") |
| 736 | + |
| 737 | + except PyMongoError as e: |
| 738 | + _logger.error(f"MongoDB DDL execution failed: {e}") |
| 739 | + raise DatabaseError(f"DDL execution failed: {e}") |
| 740 | + except (ProgrammingError, DatabaseError, OperationalError): |
| 741 | + raise |
| 742 | + except Exception as e: |
| 743 | + _logger.error(f"Unexpected error during DDL execution: {e}") |
| 744 | + raise OperationalError(f"DDL execution error: {e}") |
| 745 | + |
| 746 | + def execute( |
| 747 | + self, |
| 748 | + context: ExecutionContext, |
| 749 | + connection: Any, |
| 750 | + parameters: Optional[Union[Sequence[Any], Dict[str, Any]]] = None, |
| 751 | + ) -> Optional[Dict[str, Any]]: |
| 752 | + _logger.debug(f"Using DDL execution for query: {context.query[:100]}") |
| 753 | + self._execution_plan = self._parse_sql(context.query) |
| 754 | + |
| 755 | + if not self._execution_plan.validate(): |
| 756 | + raise SqlSyntaxError("Generated DDL plan is invalid") |
| 757 | + |
| 758 | + return self._execute_execution_plan(self._execution_plan, connection, parameters) |
| 759 | + |
| 760 | + |
645 | 761 | class ExecutionPlanFactory: |
646 | 762 | """Factory for creating appropriate execution strategy based on query context""" |
647 | 763 |
|
648 | | - _strategies = [StandardQueryExecution(), InsertExecution(), UpdateExecution(), DeleteExecution()] |
| 764 | + _strategies = [ViewExecution(), StandardQueryExecution(), InsertExecution(), UpdateExecution(), DeleteExecution()] |
649 | 765 |
|
650 | 766 | @classmethod |
651 | 767 | def get_strategy(cls, context: ExecutionContext) -> ExecutionStrategy: |
|
0 commit comments