1- import json
21from typing import Annotated , Any , Literal
32
43from fastmcp import Context
54from fastmcp .server .server import ToolResult
6- from mcp .types import ToolAnnotations , TextContent , ImageContent
5+ from mcp .types import ToolAnnotations
76
87from services .registry import mcp_for_unity_tool
98from services .tools import get_unity_instance_from_context
10- from services .tools .utils import coerce_int , coerce_bool , normalize_vector3
9+ from services .tools .utils import build_screenshot_params , extract_screenshot_images
1110from transport .unity_transport import send_with_unity_instance
1211from transport .legacy .unity_connection import async_send_command_with_retry
1312
3231ALL_ACTIONS = SETUP_ACTIONS + CREATION_ACTIONS + CONFIGURATION_ACTIONS + EXTENSION_ACTIONS + CONTROL_ACTIONS + CAPTURE_ACTIONS
3332
3433
35- def _extract_images (response : dict [str , Any ], action : str ) -> ToolResult | None :
36- """If the Unity response contains inline base64 images, return a ToolResult
37- with TextContent + ImageContent blocks. Returns None for normal text-only responses."""
38- if not isinstance (response , dict ) or not response .get ("success" ):
39- return None
40-
41- data = response .get ("data" )
42- if not isinstance (data , dict ):
43- return None
44-
45- # Batch images (surround/orbit mode) — multiple screenshots in one response
46- screenshots = data .get ("screenshots" )
47- if screenshots and isinstance (screenshots , list ):
48- blocks : list [TextContent | ImageContent ] = []
49- summary_screenshots = []
50- for s in screenshots :
51- summary_screenshots .append ({k : v for k , v in s .items () if k != "imageBase64" })
52- text_result = {
53- "success" : True ,
54- "message" : response .get ("message" , "" ),
55- "data" : {
56- "sceneCenter" : data .get ("sceneCenter" ),
57- "sceneRadius" : data .get ("sceneRadius" ),
58- "screenshots" : summary_screenshots ,
59- },
60- }
61- blocks .append (TextContent (type = "text" , text = json .dumps (text_result )))
62- for s in screenshots :
63- b64 = s .get ("imageBase64" )
64- if b64 :
65- blocks .append (TextContent (type = "text" , text = f"[Angle: { s .get ('angle' , '?' )} ]" ))
66- blocks .append (ImageContent (type = "image" , data = b64 , mimeType = "image/png" ))
67- return ToolResult (content = blocks )
68-
69- # Single image (include_image or positioned capture) or contact sheet
70- image_b64 = data .get ("imageBase64" )
71- if not image_b64 :
72- return None
73- text_data = {k : v for k , v in data .items () if k != "imageBase64" }
74- text_result = {"success" : True , "message" : response .get ("message" , "" ), "data" : text_data }
75- return ToolResult (
76- content = [
77- TextContent (type = "text" , text = json .dumps (text_result )),
78- ImageContent (type = "image" , data = image_b64 , mimeType = "image/png" ),
79- ],
80- )
81-
82-
8334@mcp_for_unity_tool (
8435 group = "core" ,
8536 description = (
@@ -196,63 +147,24 @@ async def manage_camera(
196147
197148 # Screenshot params — only relevant for screenshot/screenshot_multiview actions
198149 if action_normalized in CAPTURE_ACTIONS :
199- if screenshot_file_name :
200- params_dict ["fileName" ] = screenshot_file_name
201- coerced_super_size = coerce_int (screenshot_super_size , default = None )
202- if coerced_super_size is not None :
203- params_dict ["superSize" ] = coerced_super_size
204- if camera :
205- params_dict ["camera" ] = camera
206- coerced_include_image = coerce_bool (include_image , default = None )
207- if coerced_include_image is not None :
208- params_dict ["includeImage" ] = coerced_include_image
209- coerced_max_resolution = coerce_int (max_resolution , default = None )
210- if coerced_max_resolution is not None :
211- if coerced_max_resolution <= 0 :
212- return {"success" : False , "message" : "max_resolution must be a positive integer." }
213- params_dict ["maxResolution" ] = coerced_max_resolution
214- if batch :
215- params_dict ["batch" ] = batch
216- if look_at is not None :
217- params_dict ["lookAt" ] = look_at
218-
219- # Orbit params
220- coerced_orbit_angles = coerce_int (orbit_angles , default = None )
221- if coerced_orbit_angles is not None :
222- params_dict ["orbitAngles" ] = coerced_orbit_angles
223- if orbit_elevations is not None :
224- if isinstance (orbit_elevations , str ):
225- try :
226- orbit_elevations = json .loads (orbit_elevations )
227- except (ValueError , TypeError ):
228- return {"success" : False , "message" : "orbit_elevations must be a JSON array of floats." }
229- if not isinstance (orbit_elevations , list ) or not all (
230- isinstance (v , (int , float )) for v in orbit_elevations
231- ):
232- return {"success" : False , "message" : "orbit_elevations must be a list of numbers." }
233- params_dict ["orbitElevations" ] = orbit_elevations
234- if orbit_distance is not None :
235- try :
236- params_dict ["orbitDistance" ] = float (orbit_distance )
237- except (ValueError , TypeError ):
238- return {"success" : False , "message" : "orbit_distance must be a number." }
239- if orbit_fov is not None :
240- try :
241- params_dict ["orbitFov" ] = float (orbit_fov )
242- except (ValueError , TypeError ):
243- return {"success" : False , "message" : "orbit_fov must be a number." }
244- if view_position is not None :
245- vec , err = normalize_vector3 (view_position , "view_position" )
246- if err :
247- return {"success" : False , "message" : err }
248- params_dict ["viewPosition" ] = vec
249- if view_rotation is not None :
250- vec , err = normalize_vector3 (view_rotation , "view_rotation" )
251- if err :
252- return {"success" : False , "message" : err }
253- params_dict ["viewRotation" ] = vec
254-
255- params_dict = {k : v for k , v in params_dict .items () if v is not None }
150+ err = build_screenshot_params (
151+ params_dict ,
152+ screenshot_file_name = screenshot_file_name ,
153+ screenshot_super_size = screenshot_super_size ,
154+ camera = camera ,
155+ include_image = include_image ,
156+ max_resolution = max_resolution ,
157+ batch = batch ,
158+ look_at = look_at ,
159+ orbit_angles = orbit_angles ,
160+ orbit_elevations = orbit_elevations ,
161+ orbit_distance = orbit_distance ,
162+ orbit_fov = orbit_fov ,
163+ view_position = view_position ,
164+ view_rotation = view_rotation ,
165+ )
166+ if err is not None :
167+ return err
256168
257169 result = await send_with_unity_instance (
258170 async_send_command_with_retry ,
@@ -266,7 +178,7 @@ async def manage_camera(
266178
267179 # For capture actions, check for inline images to return as ImageContent
268180 if action_normalized in CAPTURE_ACTIONS :
269- image_result = _extract_images (result , "screenshot" )
181+ image_result = extract_screenshot_images (result )
270182 if image_result is not None :
271183 return image_result
272184
0 commit comments