Skip to content

Latest commit

 

History

History
617 lines (466 loc) · 13.2 KB

File metadata and controls

617 lines (466 loc) · 13.2 KB

📖 Kullanım Örnekleri

Modüler Transport & Protocol Sistemi Kullanım Kılavuzu


🎯 Hızlı Başlangıç

1. Basit TCP Kamera

from backend.app.camera import CameraClient, CameraConfig

# Kamera konfigürasyonu
config = CameraConfig(
    camera_id="cam1",
    name="Front Camera",
    transport_type="tcp",      # TCP transport
    protocol_type="binary",    # Binary protocol
    host="192.168.1.100",
    port=4000
)

# Client oluştur
client = CameraClient(config)

# Async context manager ile kullan
async with client:
    # Brightness ayarla
    response = await client.execute_command("set_brightness", {
        "brightness": 5,
        "region": "C1",
        "camera_type": "daylight"
    })
    
    if response.success:
        print("✅ Brightness ayarlandı!")

🔌 Transport Örnekleri

TCP Transport (Ethernet Kameralar)

config = CameraConfig(
    camera_id="ethernet-cam",
    name="Ethernet Camera",
    transport_type="tcp",
    protocol_type="binary",
    host="192.168.1.100",
    port=4000,
    timeout=5.0,
    retry_count=3
)

client = CameraClient(config)
await client.connect()

# Komut gönder
response = await client.execute_command("set_digital_zoom", {
    "zoom_level": "2x",
    "region": "C1",
    "camera_type": "daylight"
})

await client.disconnect()

UDP Transport (Düşük Gecikme)

config = CameraConfig(
    camera_id="thermal-cam",
    name="Thermal Camera",
    transport_type="udp",      # UDP - connectionless
    protocol_type="binary",
    host="192.168.1.101",
    port=4001
)

async with CameraClient(config) as client:
    # Thermal polarity değiştir
    await client.execute_command("set_polarity", {
        "polarity": "white_hot",
        "region": "C1"
    })

Serial Transport (RS-232/RS-485)

config = CameraConfig(
    camera_id="serial-cam",
    name="Serial Camera",
    transport_type="serial",   # Serial port
    protocol_type="binary",
    device="/dev/ttyUSB0",
    baudrate=9600,
    extra={
        "parity": "N",         # None, Even, Odd
        "stopbits": 1,
        "bytesize": 8
    }
)

async with CameraClient(config) as client:
    await client.execute_command("set_brightness", {
        "brightness": 3,
        "region": "C1",
        "camera_type": "daylight"
    })

UART Transport (Gömülü Sistemler)

config = CameraConfig(
    camera_id="embedded-cam",
    name="Embedded UART Camera",
    transport_type="uart",     # UART (hardware-level serial)
    protocol_type="binary",
    device="/dev/ttyS0",
    baudrate=115200
)

async with CameraClient(config) as client:
    await client.execute_command("set_framerate", {
        "framerate": 30,
        "region": "C1",
        "camera_type": "daylight"
    })

VISCA Transport (Sony Kameralar)

config = CameraConfig(
    camera_id="sony-ptz",
    name="Sony PTZ Camera",
    transport_type="visca",    # VISCA protocol over serial
    protocol_type="visca",
    device="/dev/ttyUSB0",
    baudrate=9600,
    camera_address=1           # VISCA address (1-7)
)

async with CameraClient(config) as client:
    # Zoom in
    await client.execute_command("zoom_in", {"speed": 3})
    
    # Pan left
    await client.execute_command("pan_left", {"speed": 12})
    
    # Home position
    await client.execute_command("pan_tilt_home", {})

🎮 Camera Manager Kullanımı

Tüm Kameraları Yükle

from backend.app.camera import get_camera_manager

# Manager instance al (singleton)
manager = get_camera_manager()

# cameras.yaml'dan kameraları yükle
manager.load_cameras_from_config()

# Tüm kamera ID'lerini listele
camera_ids = manager.get_camera_ids()
print(f"Loaded {len(camera_ids)} cameras: {camera_ids}")

Belirli Kamera ile Çalış

# Kamera client'ı al
client = manager.get_client("cam-front-day")

if client:
    async with client:
        response = await client.execute_command("set_brightness", {
            "brightness": 5
        })
        print(f"Response: {response.success}")
else:
    print("Camera not found!")

Tüm Kameralara Bağlan

# Tüm kameralara bağlan
await manager.connect_all()

# Komutları çalıştır
for cam_id in manager.get_camera_ids():
    client = manager.get_client(cam_id)
    if client:
        await client.execute_command("set_brightness", {"brightness": 5})

# Tüm bağlantıları kapat
await manager.disconnect_all()

🔧 Düşük Seviye Transport Kullanımı

Direkt Transport Kullanımı

from backend.app.transport import create_tcp_transport

# TCP transport oluştur
transport = create_tcp_transport("192.168.1.100", 4000)

async with transport:
    # Raw bytes gönder
    command = b'\x00\x00\x00\x10\x00\x00\x00\x02\x05\x01\x01'
    await transport.send(command)
    
    # Cevap al
    response = await transport.receive()
    print(f"Response: {response.hex()}")

VISCA Transport Direkt Kullanımı

from backend.app.transport import create_visca_transport

transport = create_visca_transport(
    device="/dev/ttyUSB0",
    baudrate=9600,
    camera_address=1
)

async with transport:
    # VISCA zoom in
    response = await transport.zoom_in(speed=3)
    print(f"Zoom response: {response}")
    
    # Pan/tilt
    await transport.pan_tilt(
        pan_speed=12,
        tilt_speed=12,
        pan_dir=1,    # 1=left, 2=right, 3=stop
        tilt_dir=3    # 1=up, 2=down, 3=stop
    )

📡 Protocol Kullanımı

Binary Protocol

from backend.app.protocol import BinaryProtocol, ProtocolCommand

protocol = BinaryProtocol()

# Komut oluştur
cmd = ProtocolCommand(
    name="set_brightness",
    params={
        "brightness": 5,
        "region": "C1",
        "camera_type": "daylight"
    }
)

# Encode et
encoded = protocol.encode_command(cmd)
print(f"Encoded: {encoded.hex()}")

# Cevap decode et
response_bytes = b'\x00\x00\x00\x11\x00\x00\x00\x02\x00\x00\x00\x00\x00\x00\x00\x01\xFF'
response = protocol.decode_response(response_bytes)

if response.success:
    print("Command successful!")

VISCA Protocol

from backend.app.protocol import VISCAProtocol, ProtocolCommand

protocol = VISCAProtocol(camera_address=1)

# Zoom in komutu
cmd = ProtocolCommand(
    name="zoom_in",
    params={"speed": 3}
)

encoded = protocol.encode_command(cmd)
print(f"VISCA command: {encoded.hex()}")
# Output: 81 04 07 23 ff

🏭 Factory Pattern Kullanımı

Transport Factory

from backend.app.transport import TransportFactory, TransportConfig, TransportType

# Config'den transport oluştur
config = TransportConfig(
    transport_type=TransportType.TCP,
    host="192.168.1.100",
    port=4000
)

transport = TransportFactory.create(config)

Dictionary'den Transport Oluştur

config_dict = {
    "transport_type": "tcp",
    "host": "192.168.1.100",
    "port": 4000,
    "timeout": 5.0
}

transport = TransportFactory.create_from_dict(config_dict)

🔍 Debugging

Loglama Aktif Et

import logging

# Root logger
logging.basicConfig(level=logging.DEBUG)

# Transport logger
transport_logger = logging.getLogger('backend.app.transport')
transport_logger.setLevel(logging.DEBUG)

# Protocol logger
protocol_logger = logging.getLogger('backend.app.protocol')
protocol_logger.setLevel(logging.DEBUG)

# Şimdi tüm transport/protocol işlemleri loglanacak

Hex Dump Utility

def hexdump(data: bytes, prefix: str = ""):
    """Pretty print hex data"""
    hex_str = ' '.join(f'{b:02X}' for b in data)
    ascii_str = ''.join(chr(b) if 32 <= b < 127 else '.' for b in data)
    print(f"{prefix}{hex_str}  |{ascii_str}|")

# Kullanım
command = b'\x00\x00\x00\x10\x00\x00\x00\x02\x05\x01\x01'
hexdump(command, "TX: ")
# Output: TX: 00 00 00 10 00 00 00 02 05 01 01  |.........|

🧪 Test Örnekleri

Basit Test

import asyncio
from backend.app.camera import CameraClient, CameraConfig

async def test_camera():
    config = CameraConfig(
        camera_id="test",
        name="Test Camera",
        transport_type="tcp",
        protocol_type="binary",
        host="192.168.1.100",
        port=4000
    )
    
    client = CameraClient(config)
    
    try:
        await client.connect()
        print("✅ Connected")
        
        response = await client.execute_command("set_brightness", {
            "brightness": 5,
            "region": "C1",
            "camera_type": "daylight"
        })
        
        assert response.success, "Command failed"
        print("✅ Test passed")
        
    finally:
        await client.disconnect()

# Çalıştır
asyncio.run(test_camera())

Mock Transport Test

from backend.app.transport.base import CameraTransport, TransportConfig

class MockTransport(CameraTransport):
    """Mock transport for testing"""
    
    async def connect(self):
        self.is_connected = True
        return True
    
    async def disconnect(self):
        self.is_connected = False
        return True
    
    async def send(self, data: bytes):
        print(f"Mock send: {data.hex()}")
        return len(data)
    
    async def receive(self, size: int):
        # Return mock response
        return b'\x00\x00\x00\x11\x00\x00\x00\x02\x00\x00\x00\x00\x00\x00\x00\x01\xFF'

# Test ile kullan
config = TransportConfig(transport_type=TransportType.TCP)
transport = MockTransport(config)

async with transport:
    await transport.send(b'\x00\x00\x00\x10')
    response = await transport.receive()
    print(f"Mock response: {response.hex()}")

🎨 Özel Transport/Protocol Ekleme

Özel Transport Oluştur

from backend.app.transport.base import NetworkTransport

class WebSocketTransport(NetworkTransport):
    """WebSocket transport implementation"""
    
    async def connect(self):
        # WebSocket connection logic
        import websockets
        self.ws = await websockets.connect(f"ws://{self.host}:{self.port}")
        self.is_connected = True
        return True
    
    async def disconnect(self):
        await self.ws.close()
        self.is_connected = False
        return True
    
    async def send(self, data: bytes):
        await self.ws.send(data)
        return len(data)
    
    async def receive(self, size: int):
        data = await self.ws.recv()
        return data if isinstance(data, bytes) else data.encode()

# Factory'ye kaydet
from backend.app.transport import TransportFactory, TransportType

TransportFactory.register_transport(
    TransportType.WEBSOCKET,
    WebSocketTransport
)

# Kullan
config = TransportConfig(
    transport_type=TransportType.WEBSOCKET,
    host="camera.local",
    port=8080
)
transport = TransportFactory.create(config)

Özel Protocol Oluştur

from backend.app.protocol.base import CameraProtocol, ProtocolCommand, ProtocolResponse

class JSONProtocol(CameraProtocol):
    """JSON-based protocol"""
    
    def encode_command(self, command: ProtocolCommand):
        import json
        data = {
            "command": command.name,
            "params": command.params
        }
        return json.dumps(data).encode()
    
    def decode_response(self, data: bytes):
        import json
        try:
            response = json.loads(data.decode())
            return ProtocolResponse(
                success=response.get("success", False),
                data=response.get("data", {}),
                raw_bytes=data
            )
        except Exception as e:
            return ProtocolResponse(
                success=False,
                error_message=str(e),
                raw_bytes=data
            )
    
    def get_available_commands(self):
        return ["get_status", "set_config", "reboot"]

# Kullan
protocol = JSONProtocol(ProtocolType.HTTP)

📊 Performans İpuçları

Connection Pooling

# Kameraları önceden bağla
manager = get_camera_manager()
manager.load_cameras_from_config()
await manager.connect_all()

# Artık hızlı komut gönderebilirsin
for _ in range(100):
    client = manager.get_client("cam1")
    await client.execute_command("set_brightness", {"brightness": 5})

Timeout Ayarları

# Hızlı timeout (düşük gecikme ağlar)
config = CameraConfig(
    ...,
    timeout=1.0,
    retry_count=1
)

# Yavaş timeout (güvenilir olmayan ağlar)
config = CameraConfig(
    ...,
    timeout=10.0,
    retry_count=5
)

🚀 Production Kullanımı

FastAPI Entegrasyonu

from fastapi import FastAPI, HTTPException
from backend.app.camera import get_camera_manager

app = FastAPI()

@app.on_event("startup")
async def startup():
    manager = get_camera_manager()
    manager.load_cameras_from_config()
    await manager.connect_all()

@app.post("/api/camera/{camera_id}/command")
async def execute_command(camera_id: str, command: str, params: dict):
    manager = get_camera_manager()
    client = manager.get_client(camera_id)
    
    if not client:
        raise HTTPException(404, "Camera not found")
    
    response = await client.execute_command(command, params)
    
    if not response.success:
        raise HTTPException(500, response.error_message)
    
    return {"success": True, "data": response.data}

Daha fazla örnek için ARCHITECTURE.md dosyasına bakın! 🎉