forked from MarioDeFelipe/sap-bdc-mcp-server
-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathtest_provision_share.py
More file actions
174 lines (153 loc) · 5.89 KB
/
test_provision_share.py
File metadata and controls
174 lines (153 loc) · 5.89 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
"""Test script to provision a complete share using the orchestration tool."""
import logging
import json
from datetime import datetime
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
def main():
"""Test the provision_share orchestration tool."""
print("=" * 70)
print("SAP BDC - Test Share Provisioning (End-to-End)")
print("=" * 70)
print()
# Initialize the client manager
logger.info("Step 1: Initializing BDC client manager...")
try:
from sap_bdc_mcp.server import client_manager
client_manager.initialize()
logger.info("Client manager initialized successfully")
print()
except Exception as e:
logger.error(f"Failed to initialize: {e}")
return
# Example: Provision a new share end-to-end
share_name = f"test_provisioned_share_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
logger.info("Test Configuration:")
print(f"Share name: {share_name}")
print(f"Tables: main.default.iris_data")
print()
# This simulates what the provision_share MCP tool does
logger.info("Starting end-to-end provisioning workflow...")
print()
try:
from databricks.sdk import WorkspaceClient
import os
w = client_manager.workspace_client
bdc_client = client_manager.client
recipient_name = client_manager.recipient_name
steps_completed = []
# Step 1: Create Databricks share
logger.info("Step 1/4: Creating Databricks share...")
try:
w.shares.create(
name=share_name,
comment="Test share created via provision_share tool"
)
steps_completed.append(f"✅ Created Databricks share '{share_name}'")
print(" ✅ Share created")
except Exception as e:
if "already exists" in str(e).lower():
steps_completed.append(f"ℹ️ Share '{share_name}' already exists")
print(" ℹ️ Share already exists, continuing...")
else:
raise
# Step 2: Add tables to share
logger.info("Step 2/4: Adding tables to share...")
tables = ["main.default.iris_data"]
for table in tables:
try:
w.shares.update(name=share_name, updates=[{
"action": "ADD",
"data_object": {"name": table, "data_object_type": "TABLE"}
}])
steps_completed.append(f" ✅ Added table: {table}")
print(f" ✅ Added table: {table}")
except Exception as e:
if "already exists" in str(e).lower():
steps_completed.append(f" ℹ️ Table {table} already in share")
print(f" ℹ️ Table {table} already in share")
else:
raise
# Step 3: Grant to recipient
logger.info("Step 3/4: Granting share to recipient...")
warehouse_id = os.getenv("DATABRICKS_WAREHOUSE_ID")
grant_sql = f"GRANT SELECT ON SHARE {share_name} TO RECIPIENT `{recipient_name}`"
try:
w.statement_execution.execute_statement(
warehouse_id=warehouse_id,
statement=grant_sql,
wait_timeout="30s"
)
steps_completed.append(f"✅ Granted to recipient '{recipient_name}'")
print(f" ✅ Granted to recipient: {recipient_name}")
except Exception as e:
if "already granted" in str(e).lower():
steps_completed.append(f"ℹ️ Already granted to recipient")
print(" ℹ️ Already granted to recipient")
else:
raise
# Step 4: Register with SAP BDC
logger.info("Step 4/4: Registering with SAP BDC...")
ord_metadata = {
"title": "Test Provisioned Share",
"shortDescription": "Share created via provision_share tool",
"description": "This share was created using the end-to-end provisioning workflow",
"version": "1.0.0",
"releaseStatus": "active",
"tags": ["test", "provisioned", "automated"]
}
request_body = {
"ord": ord_metadata,
"objects": []
}
result = bdc_client.create_or_update_share(
share_name=share_name,
body=request_body
)
steps_completed.append(f"✅ Registered with SAP BDC")
print(" ✅ Registered with SAP BDC")
# Display results
print()
print("=" * 70)
print("✅ PROVISIONING COMPLETED SUCCESSFULLY!")
print("=" * 70)
print()
print("Steps Completed:")
for step in steps_completed:
print(f" {step}")
print()
print("SAP BDC Registration Result:")
print("-" * 70)
if hasattr(result, '__dict__'):
result_dict = {k: v for k, v in result.__dict__.items() if not k.startswith('_')}
print(json.dumps(result_dict, indent=2, default=str))
else:
print(str(result))
print()
print("=" * 70)
print(f"Share '{share_name}' is now ready for use!")
print("=" * 70)
print()
print("This demonstrates the complete workflow that the 'provision_share'")
print("MCP tool automates in a single call.")
except Exception as e:
logger.error(f"Provisioning failed: {e}")
print()
print("=" * 70)
print("❌ PROVISIONING FAILED")
print("=" * 70)
print()
print("Error details:")
import traceback
traceback.print_exc()
print()
if steps_completed:
print("Steps completed before failure:")
for step in steps_completed:
print(f" {step}")
if __name__ == "__main__":
main()