Skip to content
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 28 additions & 4 deletions .github/workflows/publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@
name: Publish packages

on:
push:
tags:
- 'v*-rc*'
- 'v*-test*'
- 'v*-alpha*'
- 'v*-beta*'
release:
types: [published]
workflow_dispatch:
Expand Down Expand Up @@ -157,9 +163,9 @@ jobs:
runs-on: ubuntu-latest

permissions:
id-token: write # Required for OIDC trusted publishing
actions: read # Required for actions/download-artifact
contents: read # Required for repository access
id-token: write # Required for OIDC trusted publishing
actions: read # Required for actions/download-artifact
contents: write # Required for gh release create/upload

environment:
name: publish
Expand All @@ -184,8 +190,26 @@ jobs:
name: artifacts-macOS
path: dist

- name: Create GitHub Release (test tag)
if: github.event_name == 'push'
env:
GH_TOKEN: ${{ github.token }}
run: |
gh release create "${{ github.ref_name }}" \
--prerelease \
--generate-notes \
--title "${{ github.ref_name }}" \
dist/*

- name: Upload assets to existing Release
if: github.event_name == 'release'
env:
GH_TOKEN: ${{ github.token }}
run: |
gh release upload "${{ github.event.release.tag_name }}" dist/* --clobber

- name: Publish to PyPI
if: ${{ github.event_name == 'release' || github.event.inputs.publish_pypi == 'true' }}
if: ${{ github.event_name == 'release' || (github.event_name == 'workflow_dispatch' && github.event.inputs.publish_pypi == 'true') }}
uses: pypa/gh-action-pypi-publish@release/v1

# - name: Publish Conda package
Expand Down
9 changes: 7 additions & 2 deletions singlestoredb/apps/_python_udfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@
if typing.TYPE_CHECKING:
from ._uvicorn_util import AwaitableUvicornServer

# Keep track of currently running server
# Keep track of currently running server and app
_running_server: 'typing.Optional[AwaitableUvicornServer]' = None
_running_app: typing.Optional[Application] = None

# Maximum number of UDFs allowed
MAX_UDFS_LIMIT = 10
Expand All @@ -21,7 +22,7 @@ async def run_udf_app(
log_level: str = 'error',
kill_existing_app_server: bool = True,
) -> UdfConnectionInfo:
global _running_server
global _running_server, _running_app
from ._uvicorn_util import AwaitableUvicornServer

try:
Expand All @@ -38,6 +39,9 @@ async def run_udf_app(
if _running_server is not None:
await _running_server.shutdown()
_running_server = None
if _running_app is not None:
_running_app.shutdown()
_running_app = None
Comment thread
cursor[bot] marked this conversation as resolved.
Outdated

# Kill if any other process is occupying the port
kill_process_by_port(app_config.listen_port)
Expand Down Expand Up @@ -72,6 +76,7 @@ async def run_udf_app(
if app_config.running_interactively:
app.register_functions(replace=True)

_running_app = app
_running_server = AwaitableUvicornServer(config)
asyncio.create_task(_running_server.serve())
await _running_server.wait_for_startup()
Expand Down
44 changes: 36 additions & 8 deletions singlestoredb/functions/ext/asgi.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
"""
import argparse
import asyncio
import concurrent.futures
import contextvars
import dataclasses
import datetime
Expand Down Expand Up @@ -1000,6 +1001,15 @@ def __init__(
self.log_level = log_level
self.disable_metrics = disable_metrics

# Dedicated event loop for async UDF execution, isolated from the server loop
self._udf_loop = asyncio.new_event_loop()
self._udf_thread = threading.Thread(
target=self._udf_loop.run_forever,
daemon=True,
name='async-udf-loop',
)
self._udf_thread.start()
Comment thread
cursor[bot] marked this conversation as resolved.
Outdated

# Configure logging
self._configure_logging()

Expand Down Expand Up @@ -1033,6 +1043,11 @@ def _configure_logging(self) -> None:
# Prevent propagation to avoid duplicate or differently formatted messages
self.logger.propagate = False

def shutdown(self) -> None:
"""Shut down the dedicated UDF event loop."""
self._udf_loop.call_soon_threadsafe(self._udf_loop.stop)
self._udf_thread.join(timeout=5)
Comment thread
cursor[bot] marked this conversation as resolved.
Outdated

def get_uvicorn_log_config(self) -> Dict[str, Any]:
"""
Create uvicorn log config that matches the Application's logging format.
Expand Down Expand Up @@ -1189,15 +1204,24 @@ async def __call__(
func_info['colspec'], b''.join(data),
)

func_task = asyncio.create_task(
func(cancel_event, call_timer, *inputs)
if func_info['is_async']
else to_thread(
Comment thread
cursor[bot] marked this conversation as resolved.
lambda: asyncio.run(
func(cancel_event, call_timer, *inputs),
func_task: 'asyncio.Task[Any]'
udf_future: 'Optional[concurrent.futures.Future[Any]]' = None
if func_info['is_async']:
udf_future = asyncio.run_coroutine_threadsafe(
func(cancel_event, call_timer, *inputs),
self._udf_loop,
)
func_task = asyncio.ensure_future(
asyncio.wrap_future(udf_future),
)
Comment thread
cursor[bot] marked this conversation as resolved.
Outdated
Comment thread
cursor[bot] marked this conversation as resolved.
Outdated
else:
func_task = asyncio.create_task(
to_thread(
lambda: asyncio.run(
func(cancel_event, call_timer, *inputs),
),
),
),
)
)
disconnect_task = asyncio.create_task(
asyncio.sleep(int(1e9))
if ignore_cancel else cancel_on_disconnect(receive),
Expand All @@ -1218,12 +1242,16 @@ async def __call__(
for task in done:
if task is disconnect_task:
cancel_event.set()
if udf_future is not None:
udf_future.cancel()
raise asyncio.CancelledError(
'Function call was cancelled by client disconnect',
)

elif task is timeout_task:
cancel_event.set()
if udf_future is not None:
udf_future.cancel()
raise asyncio.TimeoutError(
'Function call was cancelled due to timeout',
)
Expand Down
34 changes: 18 additions & 16 deletions singlestoredb/tests/test_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@
try:
import pandas as pd
has_pandas = True
_pd_str_dtype = str(pd.DataFrame({'a': ['x']}).dtypes['a'])
except ImportError:
has_pandas = False
_pd_str_dtype = 'object'


class TestConnection(unittest.TestCase):
Expand Down Expand Up @@ -1124,21 +1126,21 @@ def test_alltypes_pandas(self):
('timestamp', 'datetime64[us]'),
('timestamp_6', 'datetime64[us]'),
('year', 'float64'),
('char_100', 'object'),
('char_100', _pd_str_dtype),
('binary_100', 'object'),
('varchar_200', 'object'),
('varchar_200', _pd_str_dtype),
('varbinary_200', 'object'),
('longtext', 'object'),
('mediumtext', 'object'),
('text', 'object'),
('tinytext', 'object'),
('longtext', _pd_str_dtype),
('mediumtext', _pd_str_dtype),
('text', _pd_str_dtype),
('tinytext', _pd_str_dtype),
('longblob', 'object'),
('mediumblob', 'object'),
('blob', 'object'),
('tinyblob', 'object'),
('json', 'object'),
('enum', 'object'),
('set', 'object'),
('enum', _pd_str_dtype),
('set', _pd_str_dtype),
('bit', 'object'),
]

Expand Down Expand Up @@ -1266,21 +1268,21 @@ def test_alltypes_no_nulls_pandas(self):
('timestamp', 'datetime64[us]'),
('timestamp_6', 'datetime64[us]'),
('year', 'int16'),
('char_100', 'object'),
('char_100', _pd_str_dtype),
('binary_100', 'object'),
('varchar_200', 'object'),
('varchar_200', _pd_str_dtype),
('varbinary_200', 'object'),
('longtext', 'object'),
('mediumtext', 'object'),
('text', 'object'),
('tinytext', 'object'),
('longtext', _pd_str_dtype),
('mediumtext', _pd_str_dtype),
('text', _pd_str_dtype),
('tinytext', _pd_str_dtype),
('longblob', 'object'),
('mediumblob', 'object'),
('blob', 'object'),
('tinyblob', 'object'),
('json', 'object'),
('enum', 'object'),
('set', 'object'),
('enum', _pd_str_dtype),
('set', _pd_str_dtype),
('bit', 'object'),
]

Expand Down
Loading