Profesyonel yazılım geliştirme standartlarına göre tasarlanmış, genişletilebilir mimari.
✅ Single Responsibility - Her sınıf tek bir sorumluluğa sahip
✅ Open/Closed - Yeni transport/protocol eklemek için mevcut kodu değiştirmiyoruz
✅ Liskov Substitution - Tüm transport'lar birbirinin yerine kullanılabilir
✅ Interface Segregation - Küçük, odaklanmış interface'ler
✅ Dependency Inversion - Soyut interface'lere bağımlılık
- Strategy Pattern - Transport seçimi runtime'da
- Factory Pattern - Transport oluşturma merkezi
- Bridge Pattern - Transport ve Protocol ayrımı
- Facade Pattern - CameraClient basit interface
- Singleton Pattern - CameraManager tek instance
┌─────────────────────────────────────────────────────────────┐
│ Application Layer │
│ (FastAPI endpoints, business logic) │
└────────────────────────┬────────────────────────────────────┘
│
┌────────────────────────▼────────────────────────────────────┐
│ Camera Client (Facade) │
│ • CameraClient - High-level interface │
│ • CameraManager - Multi-camera management │
└────────┬──────────────────────────────┬─────────────────────┘
│ │
┌────────▼──────────┐ ┌─────────▼──────────┐
│ Protocol Layer │ │ Transport Layer │
│ │ │ │
│ • BinaryProtocol │ │ • TCPTransport │
│ • VISCAProtocol │ │ • UDPTransport │
│ • ONVIFProtocol │ │ • SerialTransport │
│ │ │ • UARTTransport │
│ (What to send) │ │ • VISCATransport │
│ │ │ │
│ │ │ (How to send) │
└───────────────────┘ └─────────────────────┘
class CameraTransport(ABC):
async def connect() -> bool
async def disconnect() -> bool
async def send(data: bytes) -> int
async def receive(size: int) -> bytes
async def send_command(command: bytes) -> bytes# Reliable, connection-oriented
# Use case: Ethernet cameras, stable networks
config = TransportConfig(
transport_type=TransportType.TCP,
host="192.168.1.100",
port=4000
)
transport = TCPTransport(config)# Connectionless, fast but unreliable
# Use case: Low-latency, packet loss acceptable
config = TransportConfig(
transport_type=TransportType.UDP,
host="192.168.1.100",
port=4000
)
transport = UDPTransport(config)# Serial port communication
# Use case: Direct serial connection
config = TransportConfig(
transport_type=TransportType.SERIAL,
device="/dev/ttyUSB0",
baudrate=9600,
extra={'parity': 'N', 'stopbits': 1}
)
transport = SerialTransport(config)# Alias for SerialTransport
# Use case: Embedded systems, UART pins
config = TransportConfig(
transport_type=TransportType.UART,
device="/dev/ttyS0",
baudrate=115200
)
transport = UARTTransport(config)# Sony VISCA protocol over serial
# Use case: Sony professional cameras
config = TransportConfig(
transport_type=TransportType.VISCA,
device="/dev/ttyUSB0",
baudrate=9600
)
transport = VISCATransport(config, camera_address=1)class CameraProtocol(ABC):
def encode_command(command: ProtocolCommand) -> bytes
def decode_response(data: bytes) -> ProtocolResponse
def get_available_commands() -> list[str]# Custom binary protocol
# Frame: [length(4)] [msg_id(4)] [payload]
protocol = BinaryProtocol()
cmd = ProtocolCommand(name="set_brightness", params={"brightness": 5})
data = protocol.encode_command(cmd)# Sony VISCA protocol
# Frame: [0x8X] [category] [command] [data] [0xFF]
protocol = VISCAProtocol(camera_address=1)
cmd = ProtocolCommand(name="zoom_in", params={"speed": 3})
data = protocol.encode_command(cmd)# TCP + Binary Protocol
config = CameraConfig(
camera_id="cam1",
name="Front Camera",
transport_type="tcp",
protocol_type="binary",
host="192.168.1.100",
port=4000
)
client = CameraClient(config)
# Use with context manager
async with client:
response = await client.execute_command("set_brightness", {
"brightness": 5,
"region": "C1"
})
print(response.success)config = CameraConfig(
camera_id="cam2",
name="PTZ Camera",
transport_type="visca",
protocol_type="visca",
device="/dev/ttyUSB0",
baudrate=9600,
camera_address=1
)
client = CameraClient(config)
async with client:
await client.execute_command("zoom_in", {"speed": 3})
await client.execute_command("pan_left", {"speed": 12})from backend.app.camera import get_camera_manager
# Get manager instance
manager = get_camera_manager()
# Load cameras from YAML
manager.load_cameras_from_config()
# Connect to all cameras
await manager.connect_all()
# Get specific camera
client = manager.get_client("cam-front-day")
if client:
await client.execute_command("set_brightness", {"brightness": 5})
# Disconnect all
await manager.disconnect_all()# 1. Transport sınıfı oluştur
class WebSocketTransport(NetworkTransport):
async def connect(self) -> bool:
# WebSocket connection logic
pass
async def send(self, data: bytes) -> int:
# Send over WebSocket
pass
async def receive(self, size: int) -> bytes:
# Receive from WebSocket
pass
# 2. Factory'ye kaydet
TransportFactory.register_transport(
TransportType.WEBSOCKET,
WebSocketTransport
)
# 3. Kullan!
config = TransportConfig(
transport_type=TransportType.WEBSOCKET,
host="ws://camera.local",
port=8080
)
transport = TransportFactory.create(config)# 1. Protocol sınıfı oluştur
class ONVIFProtocol(CameraProtocol):
def encode_command(self, command: ProtocolCommand) -> bytes:
# ONVIF SOAP XML encoding
pass
def decode_response(self, data: bytes) -> ProtocolResponse:
# Parse ONVIF response
pass
def get_available_commands(self) -> list[str]:
return ['get_device_info', 'get_profiles', 'ptz_move', ...]
# 2. CameraClient'ta kullan
config = CameraConfig(
camera_id="cam3",
name="ONVIF Camera",
transport_type="tcp",
protocol_type="onvif", # Yeni protocol
host="192.168.1.102",
port=80
)cameras:
- id: "cam-ethernet"
name: "Ethernet Camera"
type: "daylight"
enabled: true
ptz:
protocol: "tcp" # Transport
host: "192.168.1.100"
port: 4000
commands:
- id: "brightness"
command: "set_brightness" # Protocol command
params: {brightness: 3}cameras:
- id: "cam-visca"
name: "Sony VISCA Camera"
type: "daylight"
enabled: true
ptz:
protocol: "visca" # Transport + Protocol
device: "/dev/ttyUSB0"
baudrate: 9600
camera_address: 1
commands:
- id: "zoom_in"
command: "zoom_in"
params: {speed: 3}cameras:
- id: "cam-uart"
name: "Embedded UART Camera"
type: "thermal"
enabled: true
ptz:
protocol: "uart"
device: "/dev/ttyS0"
baudrate: 115200
commands:
- id: "brightness"
command: "set_brightness"
params: {brightness: 5}import asyncio
from backend.app.transport import create_tcp_transport
async def test_tcp():
transport = create_tcp_transport("192.168.1.100", 4000)
async with transport:
# Send command
await transport.send(b'\x00\x00\x00\x10\x00\x00\x00\x02\x05\x01\x01')
# Receive response
response = await transport.receive()
print(f"Response: {response.hex()}")
asyncio.run(test_tcp())from backend.app.transport import create_visca_transport
async def test_visca():
transport = create_visca_transport("/dev/ttyUSB0", baudrate=9600, camera_address=1)
async with transport:
# Zoom in
response = await transport.zoom_in(speed=3)
print(f"Zoom in: {response}")
# Pan/tilt home
response = await transport.pan_tilt_home()
print(f"Home: {response}")
asyncio.run(test_visca())from backend.app.camera import CameraClient, CameraConfig
async def test_client():
config = CameraConfig(
camera_id="test",
name="Test Camera",
transport_type="tcp",
protocol_type="binary",
host="192.168.1.100",
port=4000
)
client = CameraClient(config)
async with client:
response = await client.execute_command("set_brightness", {
"brightness": 5,
"region": "C1",
"camera_type": "daylight"
})
if response.success:
print("✅ Command successful!")
else:
print(f"❌ Command failed: {response.error_message}")
asyncio.run(test_client())import logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger('backend.app.transport')
logger.setLevel(logging.DEBUG)def hexdump(data: bytes, prefix: str = ""):
"""Pretty print hex data"""
hex_str = ' '.join(f'{b:02X}' for b in data)
print(f"{prefix}{hex_str}")
# Usage
hexdump(command_bytes, "TX: ")
hexdump(response_bytes, "RX: ")| Transport | Protocol | Use Case | Reliability | Speed | Complexity |
|---|---|---|---|---|---|
| TCP | Binary | Ethernet camera | High | Medium | Low |
| UDP | Binary | Low-latency | Low | High | Low |
| Serial | Binary | Direct connection | High | Low | Medium |
| UART | Binary | Embedded system | High | Low | Medium |
| VISCA | VISCA | Sony camera | High | Low | High |
Bu mimari sayesinde:
✅ Yeni transport eklemek kolay - Sadece CameraTransport'tan türet
✅ Yeni protocol eklemek kolay - Sadece CameraProtocol'dan türet
✅ Test edilebilir - Her katman bağımsız test edilebilir
✅ Bakımı kolay - SOLID prensipleri sayesinde
✅ Genişletilebilir - Factory pattern ile yeni tipler eklenebilir
✅ PetaLinux uyumlu - Minimal dependency, async I/O
backend/app/transport/base.py- Abstract basebackend/app/transport/tcp.py- TCP implementationbackend/app/transport/udp.py- UDP implementationbackend/app/transport/serial.py- Serial/UART implementationbackend/app/transport/visca.py- VISCA implementationbackend/app/transport/factory.py- Factory pattern
backend/app/protocol/base.py- Abstract basebackend/app/protocol/binary.py- Binary protocolbackend/app/protocol/visca_protocol.py- VISCA protocol
backend/app/camera/client.py- Camera client facadebackend/app/camera/manager.py- Multi-camera manager
cameras.yaml- Camera definitionsconfig.yaml- System configuration
Profesyonel, modüler, genişletilebilir! 🎉