Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 10 additions & 29 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,25 @@ jobs:
- black
- ruff
- mypy
- stubtest
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v6
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: "3.11"
python-version: "3.x"
- uses: actions-rs/toolchain@v1
with:
toolchain: stable
components: clippy
override: true
- name: Install uv
uses: astral-sh/setup-uv@v7
- name: Run lint check
uses: pre-commit/action@v3.0.0
with:
extra_args: -a ${{ matrix.cmd }}
extra_args: -a -v ${{ matrix.cmd }}
fmt:
runs-on: ubuntu-latest
steps:
Expand Down Expand Up @@ -52,33 +60,6 @@ jobs:
with:
token: ${{secrets.GITHUB_TOKEN}}
deny: warnings
stubtest:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v6
- uses: actions-rs/toolchain@v1
with:
toolchain: stable
components: clippy
override: true
- uses: actions/setup-python@v6
with:
python-version: 3.x
- name: Install uv
uses: astral-sh/setup-uv@v7
- id: setup-venv
name: Setup virtualenv
run: python -m venv .venv
- name: Build lib
uses: PyO3/maturin-action@v1
with:
command: dev --uv
sccache: true
- name: Run stubtest
run: |
set -e
source .venv/bin/activate
stubtest --ignore-disjoint-bases natsrpy
pytest:
runs-on: ubuntu-latest
steps:
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
/target
delete-me-*

# Byte-compiled / optimized / DLL files
__pycache__/
Expand Down
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ repos:
name: python mypy
always_run: true
pass_filenames: false
args: ["python"]
args: ["python", "examples"]
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.15.7
hooks:
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ async-nats = "0.46"
bytes = "1.11.1"
futures-util = "0.3.32"
log = "0.4.29"
pyo3 = { version = "0.28", features = ["experimental-inspect"] }
pyo3 = { version = "0.28", features = ["abi3", "experimental-inspect"] }
pyo3-async-runtimes = { version = "0.28", features = ["tokio-runtime"] }
pyo3-log = "0.13.3"
serde = { version = "1.0.228", features = ["derive"] }
Expand Down
36 changes: 36 additions & 0 deletions examples/callback_subscriptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import asyncio

from natsrpy import Message, Nats


async def main() -> None:
"""Main function to run the example."""
nats = Nats(["nats://localhost:4222"])
await nats.startup()

async def callback(message: Message) -> None:
print(f"[FROM_CALLBACK] {message.payload!r}") # noqa: T201

# For callback subscriptions you can use detatch method.
#
# This method does the same as __enter__, however since
# it's a callback-based subscription, context managers
# are ususally not needed.
#
# But please save the reference somewhere, since python garbage
# collector might collect your detatched subscription and
# stop receiving any new messages.
cb_sub = await nats.subscribe("cb-subj", callback=callback).detatch()
await cb_sub.unsubscribe(limit=1)

nats.publish("cb-subj", "message for callback")

# Waiting for subscriber to read all the messages.
await cb_sub.wait()

# Don't forget to call shutdown.
await nats.shutdown()


if __name__ == "__main__":
asyncio.run(main())
4 changes: 2 additions & 2 deletions examples/consumers.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,13 @@ async def main() -> None:
# We use messages() to get async iterator which we
# use to get messages for push_consumer.
async for push_message in await push_consumer.messages():
print(f"[FROM_PUSH] {push_message.payload}") # noqa: T201
print(f"[FROM_PUSH] {push_message.payload!r}") # noqa: T201
await push_message.ack()
break

# Pull consumers have to request batches of messages.
for pull_message in await pull_consumer.fetch(max_messages=10):
print(f"[FROM_PULL] {pull_message.payload}") # noqa: T201
print(f"[FROM_PULL] {pull_message.payload!r}") # noqa: T201
await pull_message.ack()

# Cleanup
Expand Down
2 changes: 0 additions & 2 deletions examples/kv.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@ async def main() -> None:

await kv.delete("test-key")

# Alternatively you can
# use await watcher.next()
async for event in watcher:
print("[EVENT]", event) # noqa: T201
break
Expand Down
20 changes: 9 additions & 11 deletions examples/request_reply.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,24 +12,22 @@ async def main() -> None:
# Here we create responder, that will be
# answering to our requests.
async def responder(message: Message) -> None:
print(f"[REQUEST]: {message.payload}, headers={message.headers}") # noqa: T201
print(f"[REQUEST]: {message.payload!r}, headers={message.headers}") # noqa: T201
if message.reply:
await nats.publish(
message.reply,
f"reply to {message.payload}",
f"reply to {message.payload!r}",
headers=message.headers,
)

# Start responder using callback-based subsciption.
sub = await nats.subscribe(subj, callback=responder)
# Send 3 concurrent requests.
responses = await asyncio.gather(
nats.request(subj, "request1"),
nats.request(subj, "request2", headers={"header": "value"}),
nats.request(subj, "request3", inbox="test-inbox"),
)
# Disconnect resonder.
await sub.drain()
async with nats.subscribe(subj, callback=responder):
# Send 3 concurrent requests.
responses = await asyncio.gather(
nats.request(subj, "request1"),
nats.request(subj, "request2", headers={"header": "value"}),
nats.request(subj, "request3", inbox="test-inbox"),
)

# Iterate over replies.
for resp in responses:
Expand Down
29 changes: 14 additions & 15 deletions examples/simple_publish.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,20 @@ async def main() -> None:
# Here we initiate subscription.
# We do it before sending messages,
# in order to catch them once we will start reading.
subscription = await nats.subscribe("hello")

# Publish accepts str | bytes | bytearray | memoryview
await nats.publish("hello", "str world")
await nats.publish("hello", b"bytes world")
await nats.publish("hello", bytearray(b"bytearray world"))
await nats.publish("hello", "headers", headers={"one": "two"})
await nats.publish("hello", "headers", headers={"one": ["two", "three"]})

# Calling this method will unsubscribe us,
# after `n` delivered messages.
# or immediately if `n` is not provided.
subscription.unsubscribe(limit=5)
async for message in subscription:
print(message) # noqa: T201
async with nats.subscribe("hello") as subscription:
# Publish accepts str | bytes | bytearray | memoryview
await nats.publish("hello", "str world")
await nats.publish("hello", b"bytes world")
await nats.publish("hello", bytearray(b"bytearray world"))
await nats.publish("hello", "headers", headers={"one": "two"})
await nats.publish("hello", "headers", headers={"one": ["two", "three"]})

# Calling this method will unsubscribe us,
# after `n` delivered messages.
# or immediately if `n` is not provided.
subscription.unsubscribe(limit=5)
async for message in subscription:
print(message) # noqa: T201

# Don't forget to call shutdown.
await nats.shutdown()
Expand Down
65 changes: 31 additions & 34 deletions examples/subscriptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,42 +11,39 @@ async def main() -> None:
cb_lock = asyncio.Event()

async def callback(message: Message) -> None:
print(f"[FROM_CALLBACK] {message.payload}") # noqa: T201
print(f"[FROM_CALLBACK] {message.payload!r}") # noqa: T201
cb_lock.set()

# When subscribing you can set callback.
# In that case CallbackSubscription is returned.
# This type of subscription cannot be iterated.
cb_sub = await nats.subscribe("cb-subj", callback=callback)

# When callback is not set, you get a subscription
# that should be used along with `async for`
# loop, or alternatively you can call
# `await iter_sub.next()` to get a single message.
iter_sub = await nats.subscribe("iter-subj")

# Subscriptions with queue argument create
# subscription with a queue group to distribute
# messages along all subscribers.
queue_sub = await nats.subscribe("queue-subj", queue="example-queue")

await nats.publish("cb-subj", "message for callback")
await nats.publish("iter-subj", "message for iterator")
await nats.publish("queue-subj", "message for queue sub")

# We can unsubscribe after a particular amount of messages.
await iter_sub.unsubscribe(limit=1)
await cb_sub.unsubscribe(limit=1)
await queue_sub.unsubscribe(limit=1)

async for message in iter_sub:
print(f"[FROM_ITERATOR] {message.payload}") # noqa: T201

async for message in queue_sub:
print(f"[FROM_QUEUED] {message.payload}") # noqa: T201

# Making sure that the message in callback is received.
await cb_lock.wait()
async with (
# When subscribing you can set callback.
# In that case CallbackSubscription is returned.
# This type of subscription cannot be iterated.
nats.subscribe("cb-subj", callback=callback) as cb_sub,
# When callback is not set, you get a subscription
# that should be used along with `async for`
nats.subscribe("iter-subj") as iter_sub,
# Subscriptions with queue argument create
# subscription with a queue group to distribute
# messages along all subscribers.
nats.subscribe("queue-subj", queue="example-queue") as queue_sub,
):
await nats.publish("cb-subj", "message for callback")
await nats.publish("iter-subj", "message for iterator")
await nats.publish("queue-subj", "message for queue sub")

# We can unsubscribe after a particular amount of messages.
await iter_sub.unsubscribe(limit=1)
await cb_sub.unsubscribe(limit=1)
await queue_sub.unsubscribe(limit=1)

async for message in iter_sub:
print(f"[FROM_ITERATOR] {message.payload!r}") # noqa: T201

async for message in queue_sub:
print(f"[FROM_QUEUED] {message.payload!r}") # noqa: T201

# Making sure that the message in callback is received.
await cb_lock.wait()

# Don't forget to call shutdown.
await nats.shutdown()
Expand Down
23 changes: 20 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,16 @@ dynamic = ["version"]
name = "Pavel Kirilin"
email = "s3riussan@gmail.com"

[project.entry-points.opentelemetry_instrumentor]
natsrpy = "natsrpy.instrumentation:NatsrpyInstrumentor"

[project.optional-dependencies]
opentelemetry = [
"opentelemetry-api (>=1.38.0,<2.0.0)",
"opentelemetry-instrumentation (>=0.59b0,<1)",
"opentelemetry-semantic-conventions (>=0.59b0,<1)",
]

[dependency-groups]
dev = [
"anyio>=4,<5",
Expand All @@ -38,9 +48,6 @@ dev = [
requires = ["maturin>=1.12,<2.0"]
build-backend = "maturin"

[tool.uv]
package = false

[tool.maturin]
bindings = "pyo3"
features = ["pyo3/extension-module"]
Expand All @@ -55,11 +62,18 @@ packages = ["natsrpy"]
pretty = true
implicit_reexport = true
allow_untyped_decorators = true
namespace_packages = true
warn_return_any = false

[tool.pytest]
anyio_mode = "auto"

[tool.coverage.run]
omit = [
"python/tests/**/*",
"python/natsrpy/instrumentation/**/*",
]

[tool.ruff]
target-version = "py310"
exclude = [".venv/"]
Expand Down Expand Up @@ -128,3 +142,6 @@ ignore-decorators = ["typing.overload"]

[tool.ruff.lint.pylint]
allow-magic-value-types = ["int", "str", "float"]

[tool.uv]
package = false
Loading
Loading