-
-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Expand file tree
/
Copy pathn8n_integration_example.py
More file actions
337 lines (264 loc) · 11.4 KB
/
n8n_integration_example.py
File metadata and controls
337 lines (264 loc) · 11.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
"""Complete n8n Integration Example for PraisonAI
This example demonstrates the complete bidirectional n8n ↔ PraisonAI integration:
1. PraisonAI → n8n: Agents executing n8n workflows (using PraisonAI-Tools)
2. n8n → PraisonAI: n8n workflows invoking PraisonAI agents (using API endpoint)
Prerequisites:
1. Install dependencies:
pip install "praisonai-tools[n8n]" fastapi uvicorn
2. Set up n8n instance:
docker run -it --rm --name n8n -p 5678:5678 -v n8n_data:/home/node/.n8n docker.n8n.io/n8nio/n8n
3. Environment variables:
export N8N_URL="http://localhost:5678"
export N8N_API_KEY="your-api-key" # optional for local testing
Usage:
python examples/python/n8n_integration_example.py
"""
import os
import asyncio
import uvicorn
from fastapi import FastAPI
from praisonaiagents import Agent
def setup_praisonai_agents():
    """Build the demo agents that are equipped with the n8n tools.

    Returns:
        A dict with the keys ``"automation"`` and ``"notification"`` mapped to
        their ``Agent`` instances, or an empty dict when an ``ImportError`` is
        raised while importing the n8n tools or constructing the agents.
    """
    print("๐ Setting up PraisonAI agents with n8n capabilities...")

    model_name = "gpt-4o-mini"

    # The prompt text is kept flush-left so the string handed to the agent
    # carries exactly the characters written here and no extra indentation.
    automation_prompt = """
You are an automation specialist with access to n8n's 400+ integrations.
You can help users automate workflows across platforms like:
- Communication: Slack, Discord, Telegram, Gmail, Teams
- Productivity: Notion, Google Sheets, Airtable, Trello
- Databases: PostgreSQL, MongoDB, MySQL, Redis
- APIs: REST, GraphQL, webhooks
When asked to perform automation tasks:
1. First list available n8n workflows if needed
2. Execute the appropriate workflow with the provided data
3. Explain what the workflow accomplished
Always be helpful and provide clear explanations.
"""
    notification_prompt = """
You specialize in sending notifications and messages across platforms.
You have access to n8n workflows for various messaging platforms.
When asked to send notifications:
1. Determine the best platform for the message
2. Use appropriate n8n workflow for that platform
3. Include relevant context and formatting
Be efficient and reliable with notifications.
"""

    try:
        # Imported here rather than at module level so that a missing optional
        # dependency is reported with an install hint instead of a traceback.
        from praisonai_tools.n8n import n8n_workflow, n8n_list_workflows

        registry = {}
        # General-purpose agent: may both list and execute workflows.
        registry["automation"] = Agent(
            name="automation-agent",
            instructions=automation_prompt,
            tools=[n8n_workflow, n8n_list_workflows],
            llm=model_name,
        )
        # Messaging-focused agent: only gets the workflow execution tool.
        registry["notification"] = Agent(
            name="notification-agent",
            instructions=notification_prompt,
            tools=[n8n_workflow],
            llm=model_name,
        )
        return registry
    except ImportError as e:
        print(f"โ Error importing n8n tools: {e}")
        print("๐ก Install with: pip install 'praisonai-tools[n8n]'")
        return {}
def setup_api_server_with_agents(agents):
    """Create the FastAPI application through which n8n can invoke agents.

    Args:
        agents: Mapping of agent id to agent object. Every entry is passed to
            ``register_agent`` and its id is listed by the ``/`` endpoint.

    Returns:
        The FastAPI app with the agent-invoke router included and a ``/``
        endpoint that describes the integration.
    """
    print("๐ Setting up API server for n8n โ PraisonAI integration...")
    from praisonai.api.agent_invoke import register_agent, router

    app = FastAPI(
        title="PraisonAI n8n Integration Server",
        description="API server for n8n to invoke PraisonAI agents",
        version="1.0.0",
    )
    app.include_router(router)

    # Make each agent reachable through the router's invoke endpoints.
    for agent_id, instance in agents.items():
        register_agent(agent_id, instance)
        print(f"๐ Registered agent: {agent_id}")

    # Static parts of the landing-page payload; only the agent list is
    # computed per request.
    endpoint_paths = {
        "invoke_agent": "/api/v1/agents/{agent_id}/invoke",
        "list_agents": "/api/v1/agents",
        "agent_info": "/api/v1/agents/{agent_id}",
    }
    n8n_usage = {
        "description": "Use HTTP Request node in n8n to invoke agents",
        "example_url": "http://localhost:8000/api/v1/agents/automation/invoke",
        "example_body": {
            "message": "Send a Slack message to #general saying 'Hello from n8n!'",
            "session_id": "n8n-workflow-123",
        },
    }

    @app.get("/")
    async def root():
        """Root endpoint with integration information."""
        return {
            "message": "PraisonAI n8n Integration Server",
            "available_agents": list(agents),
            "endpoints": endpoint_paths,
            "n8n_integration": n8n_usage,
        }

    return app
async def demo_praisonai_to_n8n(agents):
    """Show a PraisonAI agent driving n8n workflows.

    Looks up the ``"automation"`` entry in ``agents`` and sends it two fixed
    prompts through its ``start`` method, printing each reply. Any exception
    raised while prompting is printed instead of propagated.

    Args:
        agents: Mapping of agent id to agent object; may be empty.
    """
    print("\n๐ Demo: PraisonAI โ n8n Integration")
    print("=" * 50)

    if not agents:
        print("โ No agents available for demo")
        return

    automation = agents.get("automation")
    if not automation:
        print("โ Automation agent not available")
        return

    # (banner, prompt) pairs, executed in order.
    steps = (
        (
            "๐ Testing agent with n8n workflow listing...",
            "Can you list the available n8n workflows?",
        ),
        (
            "\n๐ Testing workflow execution...",
            "I need to send a notification that our deployment was successful. "
            "Can you help me send this via Slack to the #deployments channel?",
        ),
    )

    try:
        for banner, prompt in steps:
            print(banner)
            response = automation.start(prompt)
            print(f"๐ค Agent: {response}")
    except Exception as e:
        print(f"โ Demo error: {e}")
async def demo_n8n_to_praisonai():
    """Simulate an n8n HTTP Request node invoking the "notification" agent.

    POSTs a sample payload to the agent-invoke endpoint on localhost:8000 and
    prints the outcome. Every failure (``httpx`` not installed, server not
    reachable, unexpected response) is reported on stdout rather than raised.
    """
    print("\n๐ Demo: n8n โ PraisonAI Integration")
    print("=" * 50)

    # Import outside the request's try block: if the import failed inside it,
    # evaluating ``except httpx.ConnectError`` would itself raise
    # UnboundLocalError because ``httpx`` was never bound.
    try:
        import httpx
    except ImportError:
        print("httpx is required for this demo. Install with: pip install httpx")
        return

    try:
        # Simulate n8n HTTP Request node call
        print("๐ก Simulating n8n HTTP Request node call...")
        async with httpx.AsyncClient() as client:
            response = await client.post(
                "http://localhost:8000/api/v1/agents/notification/invoke",
                json={
                    "message": "A new user has signed up: john@example.com. Please send a welcome email.",
                    "session_id": "n8n-user-signup-workflow",
                },
                timeout=30.0,
            )
            if response.status_code == 200:
                data = response.json()
                print(f"✅ Success: {data['result']}")
                print(f"๐ Session: {data['session_id']}")
            else:
                print(f"โ Error: {response.status_code} - {response.text}")
    except httpx.ConnectError:
        print("โ Connection error: Make sure the API server is running")
        print("๐ก Start server in another terminal: python examples/python/n8n_integration_example.py --server")
    except Exception as e:
        # Demo boundary: report anything else (e.g. a response body without
        # the expected keys) instead of crashing the script.
        print(f"โ Demo error: {e}")
def create_n8n_workflow_examples():
    """Print sample n8n node configurations for calling PraisonAI agents.

    Despite the name, nothing is created: the function only writes a
    reference snippet to stdout and returns ``None``.
    """
    print("\n๐ n8n Workflow Configuration Examples")
    print("=" * 50)
    # The session_id fallback uses n8n's built-in ``$workflow`` variable; a
    # bare ``workflow.id`` is not defined inside n8n expressions.
    print("""
๐น HTTP Request Node (n8n โ PraisonAI):
Method: POST
URL: http://localhost:8000/api/v1/agents/automation/invoke
Body:
{
"message": "{{ $json.user_message }}",
"session_id": "{{ $json.session_id || $workflow.id }}"
}
๐น Webhook Trigger for PraisonAI response:
URL: http://localhost:8000/webhook/praisonai-response
Method: POST
๐น Slack notification after agent response:
Channel: {{ $json.channel || '#general' }}
Message: Agent response: {{ $json.agent_result }}
""")
async def run_interactive_demo():
    """Chat with the "automation" agent from the terminal.

    Reads lines from stdin and prints the agent's reply to each one until the
    user types ``quit``/``exit``/``q``, presses Ctrl+C, or stdin reaches end
    of file. Errors raised while handling a message are printed and the chat
    continues.
    """
    print("\n๐ฎ Interactive Demo")
    print("=" * 20)

    # Set up agents
    agents = setup_praisonai_agents()
    if not agents:
        print("โ Cannot run interactive demo without agents")
        return

    automation_agent = agents.get("automation")
    if not automation_agent:
        print("โ Automation agent not available")
        return

    print("๐ฌ Chat with the automation agent (type 'quit' to exit)")
    print("๐ก Try: 'List available n8n workflows'")
    print("๐ก Try: 'Send a Slack message to #general'")

    while True:
        try:
            user_input = input("\n๐ค You: ").strip()
            if user_input.lower() in ('quit', 'exit', 'q'):
                break
            if not user_input:
                continue
            print("๐ค Agent: ", end="", flush=True)
            response = automation_agent.start(user_input)
            print(response)
        except (KeyboardInterrupt, EOFError):
            # EOFError must be handled here: once stdin is exhausted input()
            # raises it on every call, so letting it fall through to the
            # generic handler below would loop forever.
            print("\n๐ Goodbye!")
            break
        except Exception as e:
            # Keep the chat alive after a failed turn.
            print(f"\nโ Error: {e}")
def run_server(port=8000):
    """Start the API server that lets n8n invoke the demo agents.

    Builds the agents and the FastAPI app, prints the connection details and
    then blocks inside ``uvicorn.run`` until the server stops.

    NOTE: ``uvicorn.run`` drives its own event loop, so call this from
    synchronous code rather than from inside a running coroutine.

    Args:
        port: TCP port to listen on.
    """
    print(f"\n๐ Starting PraisonAI n8n Integration Server on port {port}...")

    # Set up agents
    agents = setup_praisonai_agents()

    # Create app
    app = setup_api_server_with_agents(agents)

    # This message was previously a string literal broken across two lines,
    # which is a syntax error; it is a single literal again.
    print(f"✅ Server ready at http://localhost:{port}")
    print(f"๐ Available agents: {list(agents.keys())}")
    print("\n๐ n8n HTTP Request Node Configuration:")
    print(f" URL: http://localhost:{port}/api/v1/agents/{{agent_id}}/invoke")
    print(" Method: POST")
    print(' Body: {"message": "Your message here", "session_id": "optional"}')

    # Run server. NOTE(review): 0.0.0.0 listens on every interface,
    # presumably so an n8n container can reach the host; confirm this is
    # acceptable outside local testing.
    uvicorn.run(app, host="0.0.0.0", port=port)
async def _serve_on_running_loop(port):
    """Serve the agent API on the event loop that is already running."""
    agents = setup_praisonai_agents()
    app = setup_api_server_with_agents(agents)
    print(f"Server starting at http://localhost:{port}")
    print(f"Available agents: {list(agents)}")
    config = uvicorn.Config(app, host="0.0.0.0", port=port)
    await uvicorn.Server(config).serve()


async def main():
    """Parse the command line and run the selected demo.

    Flags (first match wins, in this order):
        --server       serve the agent API for n8n on ``--port``
        --interactive  chat with the automation agent
        --test-api     call a running server the way an n8n node would
        --demo         run the PraisonAI -> n8n demo and print n8n examples
    Without a flag, an overview is printed followed by a quick demo.
    """
    import argparse

    parser = argparse.ArgumentParser(description="PraisonAI n8n Integration Demo")
    parser.add_argument("--server", action="store_true", help="Run API server")
    parser.add_argument("--demo", action="store_true", help="Run demos")
    parser.add_argument("--interactive", action="store_true", help="Run interactive demo")
    # The --demo output tells users to run --test-api, so the flag has to exist.
    parser.add_argument("--test-api", action="store_true", help="Call a running API server like n8n would")
    parser.add_argument("--port", type=int, default=8000, help="Server port")
    args = parser.parse_args()

    if args.server:
        # uvicorn.run() starts its own event loop and fails with RuntimeError
        # when called from this coroutine, so the server is awaited on the
        # loop that is already running.
        await _serve_on_running_loop(args.port)
    elif args.interactive:
        await run_interactive_demo()
    elif args.test_api:
        # The demo targets the default port 8000; --port is not applied here.
        await demo_n8n_to_praisonai()
    elif args.demo:
        # Set up agents for demos
        agents = setup_praisonai_agents()
        # Run demos
        await demo_praisonai_to_n8n(agents)
        create_n8n_workflow_examples()
        print("\n๐ก To test n8n โ PraisonAI, start the server:")
        print(" python examples/python/n8n_integration_example.py --server")
        print(" Then run: python examples/python/n8n_integration_example.py --test-api")
    else:
        # Default: show info and run basic demo
        print("๐ PraisonAI โ n8n Bidirectional Integration")
        print("=" * 50)
        print("\n๐ Integration Components:")
        print("1. ๐ง PraisonAI-Tools: n8n workflow execution tools")
        print("2. ๐ API Endpoint: Agent invoke endpoint for n8n")
        print("3. ๐ค Agents: PraisonAI agents with n8n capabilities")
        print("\n๐ Available Commands:")
        print(" --demo Run integration demos")
        print(" --server Start API server for n8n")
        print(" --interactive Run interactive agent chat")
        print(" --test-api Call a running API server like n8n would")
        # Quick demo
        agents = setup_praisonai_agents()
        if agents:
            await demo_praisonai_to_n8n(agents)
        create_n8n_workflow_examples()


if __name__ == "__main__":
    asyncio.run(main())