-
Notifications
You must be signed in to change notification settings - Fork 3.4k
Expand file tree
/
Copy pathagent.py
More file actions
142 lines (120 loc) · 5.06 KB
/
agent.py
File metadata and controls
142 lines (120 loc) · 5.06 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
from typing import AsyncGenerator
from google.adk.agents import LiveRequestQueue
from google.adk.agents.llm_agent import Agent
from google.adk.tools.function_tool import FunctionTool
from google.genai import Client
from google.genai import types as genai_types
async def monitor_stock_price(stock_symbol: str) -> AsyncGenerator[str, None]:
"""This function will monitor the price for the given stock_symbol in a continuous, streaming and asynchronously way."""
print(f"Start monitor stock price for {stock_symbol}!")
# Let's mock stock price change.
await asyncio.sleep(4)
price_alert1 = f"the price for {stock_symbol} is 300"
yield price_alert1
print(price_alert1)
await asyncio.sleep(4)
price_alert1 = f"the price for {stock_symbol} is 400"
yield price_alert1
print(price_alert1)
await asyncio.sleep(20)
price_alert1 = f"the price for {stock_symbol} is 900"
yield price_alert1
print(price_alert1)
await asyncio.sleep(20)
price_alert1 = f"the price for {stock_symbol} is 500"
yield price_alert1
print(price_alert1)
# for video streaming, `input_stream: LiveRequestQueue` is required and reserved key parameter for ADK to pass the video streams in.
async def monitor_video_stream(
input_stream: LiveRequestQueue,
) -> AsyncGenerator[str, None]:
"""Monitor how many people are in the video streams."""
print("start monitor_video_stream!")
client = Client(vertexai=False)
prompt_text = (
"Count the number of people in this image. Just respond with a numeric"
" number."
)
last_count = None
while True:
last_valid_req = None
print("Start monitoring loop")
# use this loop to pull the latest images and discard the old ones
while input_stream._queue.qsize() != 0:
live_req = await input_stream.get()
if live_req.blob is not None and live_req.blob.mime_type == "image/jpeg":
last_valid_req = live_req
# If we found a valid image, process it
if last_valid_req is not None:
print("Processing the most recent frame from the queue")
# Create an image part using the blob's data and mime type
image_part = genai_types.Part.from_bytes(
data=last_valid_req.blob.data, mime_type=last_valid_req.blob.mime_type
)
contents = genai_types.Content(
role="user",
parts=[image_part, genai_types.Part.from_text(text=prompt_text)],
)
# Call the model to generate content based on the provided image and prompt
response = client.models.generate_content(
model="gemini-2.0-flash-exp",
contents=contents,
config=genai_types.GenerateContentConfig(
system_instruction=(
"You are a helpful video analysis assistant. You can count"
" the number of people in this image or video. Just respond"
" with a numeric number."
)
),
)
if not last_count:
last_count = response.candidates[0].content.parts[0].text
elif last_count != response.candidates[0].content.parts[0].text:
last_count = response.candidates[0].content.parts[0].text
yield response
print("response:", response)
# Wait before checking for new images
await asyncio.sleep(0.5)
# Use this exact function to help ADK stop your streaming tools when requested.
# for example, if we want to stop `monitor_stock_price`, then the agent will
# invoke this function with stop_streaming(function_name=monitor_stock_price).
def stop_streaming(function_name: str):
"""Stop the streaming
Args:
function_name: The name of the streaming function to stop.
"""
pass
root_agent = Agent(
# find supported models here: https://google.github.io/adk-docs/get-started/streaming/quickstart-streaming/
model="gemini-2.0-flash-live-preview-04-09", # for Vertex project
# model="gemini-live-2.5-flash-preview", # for AI studio key
name="video_streaming_agent",
instruction="""
You are a monitoring agent. You can do video monitoring and stock price monitoring
using the provided tools/functions.
When users want to monitor a video stream,
You can use monitor_video_stream function to do that. When monitor_video_stream
returns the alert, you should tell the users.
When users want to monitor a stock price, you can use monitor_stock_price.
Don't ask too many questions. Don't be too talkative.
""",
tools=[
monitor_video_stream,
monitor_stock_price,
FunctionTool(stop_streaming),
],
)