Skip to content
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions tests/connection/connection.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from yaqd_core import Base
from yaqd_core import IsDaemon


class ConnectionTest(Base):
class ConnectionTest(IsDaemon):
_kind = "connection-test"

def echo(self, s: str):
Expand Down
1 change: 1 addition & 0 deletions yaqd-core/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/).
## [Unreleased]

### Fixed
- handle cleanup when a client connection is broken
- type hints for IsSensor attributes are appropriate for _n_-dimensional data

## [2023.11.0]
Expand Down
129 changes: 71 additions & 58 deletions yaqd-core/yaqd_core/_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,20 @@ def __init__(self, daemon, *args, **kwargs):

def connection_lost(self, exc):
peername = self.transport.get_extra_info("peername")
self.logger.info(f"Connection lost from {peername} to {self._daemon.name}")
self.logger.info(f"Connection lost from {peername}")
Comment thread
ddkohler marked this conversation as resolved.
Outdated
self.task.cancel()
self._daemon._connection_lost(peername)

def connection_made(self, transport):
"""Process an incomming connection."""
peername = transport.get_extra_info("peername")
self.logger.info(f"Connection made from {peername} to {self._daemon.name}")
self.logger.info(f"Connection made from {peername}")
self.transport = transport
self.unpacker = avrorpc.Unpacker(self._avro_protocol)
self._daemon._connection_made(peername)
self.task = asyncio.get_event_loop().create_task(self.process_requests())
self.task = self._daemon._loop.create_task(self.process_requests())
self._daemon._tasks.append(self.task)
self.task.add_done_callback(self._daemon._tasks.remove)

def data_received(self, data):
"""Process an incomming request."""
Expand All @@ -38,61 +40,72 @@ def data_received(self, data):
self.unpacker.feed(data)

async def process_requests(self):
async for hs, meta, name, params in self.unpacker:
if hs is not None:
out = bytes(hs)
out = struct.pack(">L", len(out)) + out
self.transport.write(out)
if hs.match == "NONE":
name = ""
try:
async for hs, meta, name, params in self.unpacker:
if hs is not None:
out = bytes(hs)
out = struct.pack(">L", len(out)) + out
self.transport.write(out)
if hs.match == "NONE":
name = ""

out_meta = io.BytesIO()
fastavro.schemaless_writer(
out_meta, {"type": "map", "values": "bytes"}, meta
)
length = out_meta.tell()
self.transport.write(struct.pack(">L", length) + out_meta.getvalue())
self.logger.debug(f"Wrote meta, {meta}, {out_meta.getvalue()}")
try:
response_out = io.BytesIO()
response = None
response_schema = "null"
if name:
fun = getattr(self._daemon, name)
if params is None:
params = []
response = fun(*params)
response_schema = fastavro.parse_schema(
self._avro_protocol["messages"][name].get("response", "null"),
expand=True,
named_schemas=self._named_types,
out_meta = io.BytesIO()
fastavro.schemaless_writer(
out_meta, {"type": "map", "values": "bytes"}, meta
)
length = out_meta.tell()
self.transport.write(struct.pack(">L", length) + out_meta.getvalue())
self.logger.debug(f"Wrote meta, {meta}, {out_meta.getvalue()}")
try:
response_out = io.BytesIO()
response = None
response_schema = "null"
if name:
fun = getattr(self._daemon, name)
if params is None:
params = []
response = fun(*params)
response_schema = fastavro.parse_schema(
self._avro_protocol["messages"][name].get(
"response", "null"
),
expand=True,
named_schemas=self._named_types,
)
# Needed twice for nested types... Probably can be fixed upstream
response_schema = fastavro.parse_schema(
response_schema,
expand=True,
named_schemas=self._named_types,
)
fastavro.schemaless_writer(response_out, response_schema, response)
except Exception as e:
self.logger.error(f"Caught exception: {type(e)} in message {name}")
self.logger.debug(traceback.format_exc())
self.transport.write(struct.pack(">L", 1) + b"\1")
error_out = io.BytesIO()
fastavro.schemaless_writer(error_out, ["string"], repr(e))
length = error_out.tell()
self.transport.write(
struct.pack(">L", length) + error_out.getvalue()
)
# Needed twice for nested types... Probably can be fixed upstream
response_schema = fastavro.parse_schema(
response_schema,
expand=True,
named_schemas=self._named_types,
else:
self.transport.write(struct.pack(">L", 1) + b"\0")
self.logger.debug(f"Wrote non-error flag")
length = response_out.tell()
self.transport.write(
struct.pack(">L", length) + response_out.getvalue()
)
fastavro.schemaless_writer(response_out, response_schema, response)
except Exception as e:
self.logger.error(f"Caught exception: {type(e)} in message {name}")
self.logger.debug(traceback.format_exc())
self.transport.write(struct.pack(">L", 1) + b"\1")
error_out = io.BytesIO()
fastavro.schemaless_writer(error_out, ["string"], repr(e))
length = error_out.tell()
self.transport.write(struct.pack(">L", length) + error_out.getvalue())
else:
self.transport.write(struct.pack(">L", 1) + b"\0")
self.logger.debug(f"Wrote non-error flag")
length = response_out.tell()
self.transport.write(
struct.pack(">L", length) + response_out.getvalue()
)
self.logger.debug(
f"Wrote response {response}, {response_out.getvalue()}"
)
self.transport.write(struct.pack(">L", 0))
if name == "shutdown":
self.logger.debug("Closing transport")
self.transport.close()
self.logger.debug(
f"Wrote response {response}, {response_out.getvalue()}"
)
self.transport.write(struct.pack(">L", 0))
if name == "shutdown":
self.logger.debug("Closing transport")
self.transport.close()
except asyncio.CancelledError as e:
self.logger.debug("task cancellation caught")
await self.unpacker.__aexit__(None, None, None)
self.transport.close()
self.logger.debug(f"file closed? {self.unpacker._file.closed}")
raise e
18 changes: 12 additions & 6 deletions yaqd-core/yaqd_core/avrorpc/unpacker.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,19 @@ async def __anext__(self):
except (ValueError, struct.error):
await self.new_data.wait()

async def __aexit__(self, exc_type, exc_val, exc_tb):
await asyncio.sleep(0)
self._file.close()
self.buf.close()

def feed(self, data: bytes):
# Must support random access, if it does not, must be fed externally (e.g. TCP)
pos = self._file.tell()
self._file.seek(0, 2)
self._file.write(data)
self._file.seek(pos)
self.new_data.set()
if not self._file.closed:
# Must support random access, if it does not, must be fed externally (e.g. TCP)
pos = self._file.tell()
self._file.seek(0, 2)
self._file.write(data)
self._file.seek(pos)
self.new_data.set()

async def _read_object(self, schema):
schema = fastavro.parse_schema(
Expand Down