-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy path2.main_fun.py
More file actions
196 lines (167 loc) · 9.26 KB
/
2.main_fun.py
File metadata and controls
196 lines (167 loc) · 9.26 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
## pip install google-genai==0.3.0
import asyncio
import json
import os
import websockets
from google import genai
import base64
# Load API key from environment
os.environ['GOOGLE_API_KEY'] = 'YOUR API KEY' # 자신의 키로 교체하세요!
MODEL = "gemini-2.0-flash-exp" # use your model ID
client = genai.Client(
http_options={
'api_version': 'v1alpha',
}
)
# 조명값을 조절하는 함수(피지컬 컴퓨팅 장치로 보내는 것도 연계하면 좋을듯)
def set_light_values(brightness, color_temp):
return {
"brightness": brightness,
"colorTemperature": color_temp,
}
# 도구로 활용(구성 함수 호출)
# https://ai.google.dev/gemini-api/docs/function-calling?hl=ko
tool_set_light_values = {
"function_declarations": [
{
"name": "set_light_values",
"description": "Set the brightness and color temperature of a room light.",
"parameters": {
"type": "OBJECT",
"properties": {
"brightness": {
"type": "NUMBER",
"description": "Light level from 0 to 100. Zero is off and 100 is full brightness"
},
"color_temp": {
"type": "STRING",
"description": "Color temperature of the light fixture, which can be `daylight`, `cool` or `warm`."
}
},
"required": ["brightness", "color_temp"]
}
}
]
}
# 웹 소켓 핸들러
async def gemini_session_handler(client_websocket: websockets.WebSocketServerProtocol):
"""Handles the interaction with Gemini API within a websocket session.
Args:
client_websocket: The websocket connection to the client.
"""
try:
config_message = await client_websocket.recv()
config_data = json.loads(config_message)
config = config_data.get("setup", {})
config["tools"] = [tool_set_light_values] # 제미나이한테 할수 있는 도구 추가
# 세션 연결
async with client.aio.live.connect(model=MODEL, config=config) as session:
print("Connected to Gemini API")
# 클라이언트로부터 사진, 음성을 받아서 gemini로 전달
async def send_to_gemini():
"""Sends messages from the client websocket to the Gemini API."""
try:
async for message in client_websocket:
try:
data = json.loads(message)
if "realtime_input" in data:
for chunk in data["realtime_input"]["media_chunks"]:
if chunk["mime_type"] == "audio/pcm":
await session.send({"mime_type": "audio/pcm", "data": chunk["data"]})
elif chunk["mime_type"] == "image/jpeg":
await session.send({"mime_type": "image/jpeg", "data": chunk["data"]})
except Exception as e:
print(f"Error sending to Gemini: {e}")
print("Client connection closed (send)")
except Exception as e:
print(f"Error sending to Gemini: {e}")
finally:
print("send_to_gemini closed")
async def receive_from_gemini():
"""Receives responses from the Gemini API and forwards them to the client, looping until turn is complete."""
try:
while True:
try:
print("receiving from gemini")
async for response in session.receive():
# first_response = True
# print(f"response: {response}")
if response.server_content is None:
if response.tool_call is not None:
# handle the tool call
print(f"Tool call received: {response.tool_call}")
function_calls = response.tool_call.function_calls
function_responses = []
for function_call in function_calls:
name = function_call.name
args = function_call.args
# Extract the numeric part from Gemini's function call ID
call_id = function_call.id
# set light values가 들어올 경우 밝기와 컬러맵 인자를 전달
if name == "set_light_values":
try:
result = set_light_values(int(args["brightness"]),
args["color_temp"])
function_responses.append(
{
"name": name,
# "response": {"result": "The light is broken."},
"response": {"result": result},
"id": call_id
}
)
await client_websocket.send(
json.dumps({"text": json.dumps(function_responses)}))
print("Function executed")
except Exception as e:
print(f"Error executing function: {e}")
continue
# Send function response back to Gemini
print(f"function_responses: {function_responses}")
await session.send(function_responses)
continue
# print(f'Unhandled server message! - {response}')
# continue
model_turn = response.server_content.model_turn
if model_turn:
for part in model_turn.parts:
# print(f"part: {part}")
if hasattr(part, 'text') and part.text is not None:
# print(f"text: {part.text}")
await client_websocket.send(json.dumps({"text": part.text}))
elif hasattr(part, 'inline_data') and part.inline_data is not None:
# if first_response:
# print("audio mime_type:", part.inline_data.mime_type)
# first_response = False
base64_audio = base64.b64encode(part.inline_data.data).decode('utf-8')
await client_websocket.send(json.dumps({
"audio": base64_audio,
}))
print("audio received")
if response.server_content.turn_complete:
print('\n<Turn complete>')
except websockets.exceptions.ConnectionClosedOK:
print("Client connection closed normally (receive)")
break # Exit the loop if the connection is closed
except Exception as e:
print(f"Error receiving from Gemini: {e}")
break # exit the lo
except Exception as e:
print(f"Error receiving from Gemini: {e}")
finally:
print("Gemini connection closed (receive)")
# Start send loop
send_task = asyncio.create_task(send_to_gemini())
# Launch receive loop as a background task
receive_task = asyncio.create_task(receive_from_gemini())
await asyncio.gather(send_task, receive_task)
except Exception as e:
print(f"Error in Gemini session: {e}")
finally:
print("Gemini session closed.")
async def main() -> None:
async with websockets.serve(gemini_session_handler, "localhost", 9082):
print("Running websocket server localhost:9082...")
await asyncio.Future() # Keep the server running indefinitely
if __name__ == "__main__":
asyncio.run(main())