from backend.app.camera import CameraClient, CameraConfig
# Kamera konfigürasyonu
config = CameraConfig(
camera_id="cam1",
name="Front Camera",
transport_type="tcp", # TCP transport
protocol_type="binary", # Binary protocol
host="192.168.1.100",
port=4000
)
# Client oluştur
client = CameraClient(config)
# Async context manager ile kullan
async with client:
# Brightness ayarla
response = await client.execute_command("set_brightness", {
"brightness": 5,
"region": "C1",
"camera_type": "daylight"
})
if response.success:
print("✅ Brightness ayarlandı!")config = CameraConfig(
camera_id="ethernet-cam",
name="Ethernet Camera",
transport_type="tcp",
protocol_type="binary",
host="192.168.1.100",
port=4000,
timeout=5.0,
retry_count=3
)
client = CameraClient(config)
await client.connect()
# Komut gönder
response = await client.execute_command("set_digital_zoom", {
"zoom_level": "2x",
"region": "C1",
"camera_type": "daylight"
})
await client.disconnect()config = CameraConfig(
camera_id="thermal-cam",
name="Thermal Camera",
transport_type="udp", # UDP - connectionless
protocol_type="binary",
host="192.168.1.101",
port=4001
)
async with CameraClient(config) as client:
# Thermal polarity değiştir
await client.execute_command("set_polarity", {
"polarity": "white_hot",
"region": "C1"
})config = CameraConfig(
camera_id="serial-cam",
name="Serial Camera",
transport_type="serial", # Serial port
protocol_type="binary",
device="/dev/ttyUSB0",
baudrate=9600,
extra={
"parity": "N", # None, Even, Odd
"stopbits": 1,
"bytesize": 8
}
)
async with CameraClient(config) as client:
await client.execute_command("set_brightness", {
"brightness": 3,
"region": "C1",
"camera_type": "daylight"
})config = CameraConfig(
camera_id="embedded-cam",
name="Embedded UART Camera",
transport_type="uart", # UART (hardware-level serial)
protocol_type="binary",
device="/dev/ttyS0",
baudrate=115200
)
async with CameraClient(config) as client:
await client.execute_command("set_framerate", {
"framerate": 30,
"region": "C1",
"camera_type": "daylight"
})config = CameraConfig(
camera_id="sony-ptz",
name="Sony PTZ Camera",
transport_type="visca", # VISCA protocol over serial
protocol_type="visca",
device="/dev/ttyUSB0",
baudrate=9600,
camera_address=1 # VISCA address (1-7)
)
async with CameraClient(config) as client:
# Zoom in
await client.execute_command("zoom_in", {"speed": 3})
# Pan left
await client.execute_command("pan_left", {"speed": 12})
# Home position
await client.execute_command("pan_tilt_home", {})from backend.app.camera import get_camera_manager
# Manager instance al (singleton)
manager = get_camera_manager()
# cameras.yaml'dan kameraları yükle
manager.load_cameras_from_config()
# Tüm kamera ID'lerini listele
camera_ids = manager.get_camera_ids()
print(f"Loaded {len(camera_ids)} cameras: {camera_ids}")# Kamera client'ı al
client = manager.get_client("cam-front-day")
if client:
async with client:
response = await client.execute_command("set_brightness", {
"brightness": 5
})
print(f"Response: {response.success}")
else:
print("Camera not found!")# Tüm kameralara bağlan
await manager.connect_all()
# Komutları çalıştır
for cam_id in manager.get_camera_ids():
client = manager.get_client(cam_id)
if client:
await client.execute_command("set_brightness", {"brightness": 5})
# Tüm bağlantıları kapat
await manager.disconnect_all()from backend.app.transport import create_tcp_transport
# TCP transport oluştur
transport = create_tcp_transport("192.168.1.100", 4000)
async with transport:
# Raw bytes gönder
command = b'\x00\x00\x00\x10\x00\x00\x00\x02\x05\x01\x01'
await transport.send(command)
# Cevap al
response = await transport.receive()
print(f"Response: {response.hex()}")from backend.app.transport import create_visca_transport
transport = create_visca_transport(
device="/dev/ttyUSB0",
baudrate=9600,
camera_address=1
)
async with transport:
# VISCA zoom in
response = await transport.zoom_in(speed=3)
print(f"Zoom response: {response}")
# Pan/tilt
await transport.pan_tilt(
pan_speed=12,
tilt_speed=12,
pan_dir=1, # 1=left, 2=right, 3=stop
tilt_dir=3 # 1=up, 2=down, 3=stop
)from backend.app.protocol import BinaryProtocol, ProtocolCommand
protocol = BinaryProtocol()
# Build the command
cmd = ProtocolCommand(
    name="set_brightness",
    params={
        "brightness": 5,
        "region": "C1",
        "camera_type": "daylight",
    },
)
# Encode it
encoded = protocol.encode_command(cmd)
print(f"Encoded: {encoded.hex()}")
# Decode a reply
response_bytes = b'\x00\x00\x00\x11\x00\x00\x00\x02\x00\x00\x00\x00\x00\x00\x00\x01\xFF'
response = protocol.decode_response(response_bytes)
if response.success:
    print("Command successful!")

# Example: VISCA protocol.
from backend.app.protocol import VISCAProtocol, ProtocolCommand

protocol = VISCAProtocol(camera_address=1)
# Zoom-in command
cmd = ProtocolCommand(
    name="zoom_in",
    params={"speed": 3},
)
encoded = protocol.encode_command(cmd)
print(f"VISCA command: {encoded.hex()}")
# Output: 81 04 07 23 ff
# NOTE(review): bytes.hex() prints the digits without separators, and the
# standard VISCA variable zoom-tele packet for address 1 at speed 3 is
# 81 01 04 07 23 FF - confirm the expected bytes against VISCAProtocol.

# Example: building transports through the factory.
# This import must be on its own line: when it was glued to the end of the
# "Output" comment above, it was part of the comment and never executed.
from backend.app.transport import TransportFactory, TransportConfig, TransportType

# Create a transport from a config object
config = TransportConfig(
    transport_type=TransportType.TCP,
    host="192.168.1.100",
    port=4000,
)
transport = TransportFactory.create(config)

# Example: create a transport from a plain dict.
config_dict = {
    "transport_type": "tcp",
    "host": "192.168.1.100",
    "port": 4000,
    "timeout": 5.0,
}
transport = TransportFactory.create_from_dict(config_dict)

# Example: turn on debug logging.
import logging

# Root logger
logging.basicConfig(level=logging.DEBUG)
# Transport logger
transport_logger = logging.getLogger('backend.app.transport')
transport_logger.setLevel(logging.DEBUG)
# Protocol logger
protocol_logger = logging.getLogger('backend.app.protocol')
protocol_logger.setLevel(logging.DEBUG)
# From now on, all transport/protocol operations are logged


def hexdump(data: bytes, prefix: str = "") -> None:
    """Print *data* on one line as hex bytes followed by an ASCII column.

    Each byte is shown as two upper-case hex digits, separated by spaces.
    The column between the ``|`` bars shows bytes 32-126 as their ASCII
    character and every other byte as ``.``.

    Args:
        data: Raw bytes to display.
        prefix: Text printed in front of the hex digits (e.g. ``"TX: "``).
    """
    hex_str = ' '.join(f'{b:02X}' for b in data)
    ascii_str = ''.join(chr(b) if 32 <= b < 127 else '.' for b in data)
    print(f"{prefix}{hex_str} |{ascii_str}|")
# Usage
command = b'\x00\x00\x00\x10\x00\x00\x00\x02\x05\x01\x01'
hexdump(command, "TX: ")
# Output: TX: 00 00 00 10 00 00 00 02 05 01 01 |...........|
# (one dot per byte: all 11 bytes are outside the printable ASCII range)

# Example: a simple end-to-end test script.
# `import asyncio` must be on its own line: when it was glued to the end of
# the "Output" comment above, it was part of the comment and never executed.
import asyncio
from backend.app.camera import CameraClient, CameraConfig
async def test_camera():
    """Smoke-test a TCP camera: connect, set the brightness, then disconnect.

    Raises:
        AssertionError: If the command's response does not report success.
    """
    camera_config = CameraConfig(
        camera_id="test",
        name="Test Camera",
        transport_type="tcp",
        protocol_type="binary",
        host="192.168.1.100",
        port=4000,
    )
    camera = CameraClient(camera_config)
    brightness_params = {
        "brightness": 5,
        "region": "C1",
        "camera_type": "daylight",
    }
    try:
        await camera.connect()
        print("✅ Connected")
        result = await camera.execute_command("set_brightness", brightness_params)
        assert result.success, "Command failed"
        print("✅ Test passed")
    finally:
        # Runs whether or not connecting or the command raised
        await camera.disconnect()
# Run it
asyncio.run(test_camera())

# Example: a mock transport for tests.
from backend.app.transport.base import CameraTransport, TransportConfig
class MockTransport(CameraTransport):
    """Mock transport for testing.

    No real I/O happens: ``send`` only prints the payload and ``receive``
    always returns the same canned byte string.
    """

    async def connect(self) -> bool:
        """Mark the transport as connected and return True."""
        self.is_connected = True
        return True

    async def disconnect(self) -> bool:
        """Mark the transport as disconnected and return True."""
        self.is_connected = False
        return True

    async def send(self, data: bytes) -> int:
        """Print *data* as hex and return its length in bytes."""
        print(f"Mock send: {data.hex()}")
        return len(data)

    async def receive(self, size: int = 1024) -> bytes:
        """Return the canned mock response.

        ``size`` is accepted but ignored. It has a default so that a bare
        ``await transport.receive()`` works instead of raising TypeError.
        """
        # Return mock response
        return b'\x00\x00\x00\x11\x00\x00\x00\x02\x00\x00\x00\x00\x00\x00\x00\x01\xFF'
# Test ile kullan
config = TransportConfig(transport_type=TransportType.TCP)
transport = MockTransport(config)
async with transport:
await transport.send(b'\x00\x00\x00\x10')
response = await transport.receive()
print(f"Mock response: {response.hex()}")from backend.app.transport.base import NetworkTransport
class WebSocketTransport(NetworkTransport):
    """WebSocket transport implementation.

    Reads ``self.host`` and ``self.port`` (presumably provided by
    NetworkTransport - confirm) and keeps the open socket in ``self.ws``.
    """

    async def connect(self) -> bool:
        """Open ``ws://{host}:{port}`` and mark the transport as connected."""
        # Local import: `websockets` is only imported when connect() runs
        import websockets

        self.ws = await websockets.connect(f"ws://{self.host}:{self.port}")
        self.is_connected = True
        return True

    async def disconnect(self) -> bool:
        """Close the socket if one was opened and mark as disconnected."""
        # connect() may never have run (or may have failed before assigning
        # self.ws); disconnecting must not raise AttributeError in that case.
        ws = getattr(self, "ws", None)
        if ws is not None:
            await ws.close()
        self.is_connected = False
        return True

    async def send(self, data: bytes) -> int:
        """Send *data* as one message and return its length in bytes."""
        await self.ws.send(data)
        return len(data)

    async def receive(self, size: int = 1024) -> bytes:
        """Receive one message and return it as bytes.

        ``size`` is accepted but ignored. It has a default so that a bare
        ``await transport.receive()`` works. A text message is encoded to
        bytes with ``str.encode()``.
        """
        data = await self.ws.recv()
        return data if isinstance(data, bytes) else data.encode()
# Register it with the factory
from backend.app.transport import TransportFactory, TransportType

TransportFactory.register_transport(
    TransportType.WEBSOCKET,
    WebSocketTransport,
)

# Use it
config = TransportConfig(
    transport_type=TransportType.WEBSOCKET,
    host="camera.local",
    port=8080,
)
transport = TransportFactory.create(config)

# Example: adding a custom JSON protocol.
from backend.app.protocol.base import CameraProtocol, ProtocolCommand, ProtocolResponse
class JSONProtocol(CameraProtocol):
    """JSON-based protocol."""

    def encode_command(self, command: ProtocolCommand):
        """Serialize *command* to JSON bytes with ``command`` and ``params`` keys."""
        import json

        payload = {"command": command.name, "params": command.params}
        return json.dumps(payload).encode()

    def decode_response(self, data: bytes):
        """Parse JSON bytes into a ProtocolResponse.

        ``success`` defaults to False and ``data`` to ``{}`` when the keys
        are missing. Any exception raised while decoding or parsing yields a
        failed response that carries the exception text in ``error_message``.
        """
        import json

        try:
            parsed = json.loads(data.decode())
            return ProtocolResponse(
                success=parsed.get("success", False),
                data=parsed.get("data", {}),
                raw_bytes=data,
            )
        except Exception as exc:
            return ProtocolResponse(
                success=False,
                error_message=str(exc),
                raw_bytes=data,
            )

    def get_available_commands(self):
        """Return the fixed list of command names this protocol offers."""
        return ["get_status", "set_config", "reboot"]
# Kullan
protocol = JSONProtocol(ProtocolType.HTTP)# Kameraları önceden bağla
manager = get_camera_manager()
manager.load_cameras_from_config()
await manager.connect_all()
# Artık hızlı komut gönderebilirsin
for _ in range(100):
client = manager.get_client("cam1")
await client.execute_command("set_brightness", {"brightness": 5})# Hızlı timeout (düşük gecikme ağlar)
config = CameraConfig(
...,
timeout=1.0,
retry_count=1
)
# Yavaş timeout (güvenilir olmayan ağlar)
config = CameraConfig(
...,
timeout=10.0,
retry_count=5
)from fastapi import FastAPI, HTTPException
from backend.app.camera import get_camera_manager
app = FastAPI()
@app.on_event("startup")
async def startup():
    """Load the configured cameras and connect to all of them at app startup."""
    camera_manager = get_camera_manager()
    camera_manager.load_cameras_from_config()
    await camera_manager.connect_all()
@app.post("/api/camera/{camera_id}/command")
async def execute_command(camera_id: str, command: str, params: dict):
    """Run *command* with *params* on the camera registered as *camera_id*.

    Returns:
        ``{"success": True, "data": response.data}`` when the command succeeds.

    Raises:
        HTTPException: 404 if the manager has no client for *camera_id*;
            500 with the response's ``error_message`` if the command's
            response does not report success.
    """
    manager = get_camera_manager()
    client = manager.get_client(camera_id)
    if not client:
        raise HTTPException(404, "Camera not found")
    response = await client.execute_command(command, params)
    if not response.success:
        raise HTTPException(500, response.error_message)
    return {"success": True, "data": response.data}


# For more examples, see ARCHITECTURE.md! 🎉