Skip to content
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
baf302b
Start event-driven example
toby-coleman Mar 16, 2025
b29dd82
None
toby-coleman Mar 16, 2025
bdd4675
WIP diagram for event-driven example
toby-coleman Mar 16, 2025
a03c94a
Prepare snippets on code
toby-coleman Mar 16, 2025
d5c218f
Fix typo in docstring
toby-coleman Mar 16, 2025
c43dba9
Writeup event-driven tutorial
toby-coleman Mar 21, 2025
be278a6
Fix broken links
toby-coleman Mar 21, 2025
a51f9a6
Update for implemented feature
toby-coleman Mar 21, 2025
c9a2036
Update links
toby-coleman Mar 21, 2025
258bd33
Add YAML version
toby-coleman Mar 21, 2025
cb19233
Add info on logging to tutorial
toby-coleman Mar 21, 2025
956148d
Add tags
toby-coleman Mar 21, 2025
6814620
Add tags
toby-coleman Mar 21, 2025
0928c57
Add tags to demos
toby-coleman Mar 21, 2025
34f9b04
Add tag index
toby-coleman Mar 21, 2025
57cb065
Add car wash demo
toby-coleman Mar 22, 2025
400a22a
Update with text descriptions
toby-coleman Mar 22, 2025
3942f86
README fixes
toby-coleman Mar 22, 2025
9244758
Minor updates
toby-coleman Mar 22, 2025
7fb6e23
Fix logger for types not supported by msgspec
toby-coleman Mar 22, 2025
78cac07
Remove logging conversions
toby-coleman Mar 22, 2025
f75bf3a
Install ipywidgets to remove warnings in notebooks
toby-coleman Mar 22, 2025
21059df
Merge remote-tracking branch 'origin/main' into docs/event-based-example
toby-coleman Apr 1, 2025
b7692b0
Update diagram
toby-coleman Apr 1, 2025
11d3e02
Merge remote-tracking branch 'origin/main' into docs/event-based-example
toby-coleman Apr 10, 2025
dd8b96a
Merge remote-tracking branch 'origin/main' into docs/event-based-example
toby-coleman Sep 15, 2025
971a22c
Rename folder
toby-coleman Sep 15, 2025
642086a
Fixup event tutorial
toby-coleman Sep 16, 2025
9ffa0a0
Fixup carwash examples
toby-coleman Sep 16, 2025
729806e
Update LLM prompt
toby-coleman Sep 17, 2025
ad955e5
Unused import
toby-coleman Sep 17, 2025
65f85d4
Delete car wash example
toby-coleman Sep 23, 2025
80fcc85
Update instructions
toby-coleman Sep 23, 2025
8becf58
Swap out for momentum signal example
toby-coleman Oct 1, 2025
1c67def
Update text
toby-coleman Oct 1, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ Some examples of what you can build with Plugboard include:
- A **command line interface** for executing models;
- Built to handle the **data intensive simulation** requirements of industrial process applications;
- Modern implementation with **Python 3.12 and above** based around **asyncio** with complete type annotation coverage;
- Built-in integrations for **loading/saving data** from cloud storage and SQL databases.
- Built-in integrations for **loading/saving data** from cloud storage and SQL databases;
- **Detailed logging** of component inputs, outputs and state for monitoring and process mining or surrogate modelling use-cases.

## 🔌 Installation

Expand Down Expand Up @@ -161,21 +162,20 @@ plugboard process run my-model.yaml

## 📖 Documentation

For more information including a detailed API reference and step-by-step usage examples, refer to the [documentation site](https://docs.plugboard.dev). We recommend diving into the [tutorials](https://docs.plugboard.dev/examples/tutorials/hello-world/) for a step-by-step to getting started.
For more information including a detailed API reference and step-by-step usage examples, refer to the [documentation site](https://docs.plugboard.dev). We recommend diving into the [tutorials](https://docs.plugboard.dev/latest/examples/tutorials/hello-world/) for a step-by-step to getting started.

## 🐾 Roadmap

Plugboard is under active development, with new features in the works:

- Detailed logging of component inputs, outputs and state for monitoring and process mining or surrogate modelling use-cases.
- Support for strongly typed data messages and validation based on pydantic.
- Support for different parallelisation patterns such as: single-threaded with coroutines, single-host multi process, or distributed with Ray in Kubernetes.
- Data exchange between components with popular messaging technologies like RabbitMQ and Google Pub/Sub.
- Support for different message exchange patterns such as: one-to-one, one-to-many, many-to-one etc via a broker; or peer-to-peer with http requests.

## 👋 Contributions

Contributions are welcomed and warmly received! For bug fixes and smaller feature requests feel free to open an issue on this repo. For any larger changes please get in touch with us to discuss first. More information for developers can be found in [the contributing section](https://docs.plugboard.dev/contributing/) of the docs.
Contributions are welcomed and warmly received! For bug fixes and smaller feature requests feel free to open an issue on this repo. For any larger changes please get in touch with us to discuss first. More information for developers can be found in [the contributing section](https://docs.plugboard.dev/latest/contributing/) of the docs.

## ⚖️ Licence

Expand Down
2 changes: 2 additions & 0 deletions docs/examples/tutorials/.meta.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
tags:
- tutorial
81 changes: 80 additions & 1 deletion docs/examples/tutorials/event-driven-models.md
Original file line number Diff line number Diff line change
@@ -1 +1,80 @@
Tutorial coming soon.
---
tags:
- events
---
So far everything we have built in Plugboard has been a **discrete-time model**. This means that the whole model advances step-wise, i.e. `step` gets called on each [`Component`][plugboard.component.Component], calculating all of their outputs before advancing the simulation on.

In this tutorial we're going to introduce an **event-driven model**, where data can be passed around between components based on triggers that you can define. Event-based models can be useful in a variety of scenarios, for example when modelling parts moving around a production line, or passengers arriving at a transport hub.

## Event-based model

Here's the model that we're going to build. Given a stream of random numbers, we'll trigger `HighEvent` whenever the value is above `0.8` and `LowEvent` whenever the value is below `0.2`. This allows us to funnel data into different parts of the model: in this case we'll just save the latest high/low values to a file at each step. In the diagram the _dotted lines_ represent the flow of event data: `FindHighLowValues` will publish events, while `CollectHigh` and `CollectLow` will subscribe to receive high and low events respectively.

```mermaid
graph LR;
Random(random-generator)-->FindHighLowValues(find-high-low);
FindHighLowValues(find-high-low)-.->HighEvent{{high-event}};
FindHighLowValues(find-high-low)-.->LowEvent{{low-event}};
HighEvent{{high-event}}-.->CollectHigh(collect-high);
LowEvent{{low-event}}-.->CollectLow(collect-low);
CollectHigh(collect-high)-->SaveHigh(save-high);
CollectLow(collect-low)-->SaveLow(save-low);
```

## Defining events

First we need to define the events that are going to get used in the model. Each event needs a name, in this case `"high_event"` and `"low_event"` and a `data` type associated with it. Use a [Pydantic](https://docs.pydantic.dev/latest/) model to define the format of this `data` field.

```python
--8<-- "examples/tutorials/005_events/hello_events.py:events"
```

## Building components to create and consume events

So far all of our process models have run step-by-step until completion. When a model contains event-driven components, we need a way to tell them to stop at the end of the simulation, otherwise they will stay running and listening for events forever.

In this example, our `Random` component will drive the process by generating input random values. When it has completed `iters` iterations, we can use it to stop the model by sending a [`StopEvent`][plugboard.events.StopEvent], causing other event-driven components in the model to shutdown.

```python
--8<-- "examples/tutorials/005_events/hello_events.py:source-component"
```

1. Use `self.io.queue_event` to send an event from a [`Component`][plugboard.component.Component]. Here we are sending [`StopEvent`][plugboard.events.StopEvent] to stop the process once all of the random values have been generated.

Next, we will define `FindHighLowValues` to identify high and low values in the stream of random numbers and publish `HighEvent` and `LowEvent` respectively.

```python
--8<-- "examples/tutorials/005_events/hello_events.py:event-publisher"
```

1. See how we use the [`IOController`][plugboard.component.IOController] to declare that this [`Component`][plugboard.component.Component] will publish events.
2. Call `self.io.queue_event` to send an event of the correct type.

Finally, we need components to subscribe to these events and process them. Use the `Event.handler` decorator to identify the method on each [`Component`][plugboard.component.Component] that will do this processing.

```python
--8<-- "examples/tutorials/005_events/hello_events.py:event-consumers"
```

1. Specify the events that this [`Component`][plugboard.component.Component] will subscribe to.
2. Use this decorator to indicate that we handle `HighEvent` here...
3. ...and we handle `LowEvent` here.

!!! note
In a real model you could define whatever logic you need inside your event handler, e.g. create a file, publish another event, etc. Here we just store the event on an attribute so that its value can be output on the next call to `step()`.

## Putting it all together

Now we can create a [`Process`][plugboard.process.Process] from all these components. The outputs from `CollectLow` and `CollectHigh` are connected to separate [`FileWriter`][plugboard.library.FileWriter] components so that we'll get a CSV file containing the latest high and low values at each step of the simulation.

!!! info
We need a few extra lines of code to create connectors for the event-based parts of the model. If you define your process in YAML this will be done automatically for you, but if you are defining the process in code then you will need to use the [`EventConnectorBuilder`][plugboard.events.EventConnectorBuilder] to do this.

```python hl_lines="15-17"
--8<-- "examples/tutorials/005_events/hello_events.py:main"
```

1. These connectors are for the normal, non-event driven parts of the model and connect [`Component`][plugboard.component.Component]` inputs and outputs.
2. These lines will set up connectors for the events in the model.

Take a look at the `high.csv` and `low.csv` files: the first few rows will usually be empty, and then as soon as high or low values are identified they will start to appear in the CSVs. As usual, you can run this model from the CLI using `plugboard process run model.yaml`.
9 changes: 9 additions & 0 deletions docs/examples/tutorials/more-components.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
---
tags:
- logging
- llm
- io
---
Plugboard's [`Component`][plugboard.component.Component] objects can run anything you can code in Python. This includes:

* Using your own or third-party Python packages;
Expand Down Expand Up @@ -54,6 +60,9 @@ We can now define a component to query a weather API and get temperature and win
--8<-- "examples/tutorials/003_more_components/hello_llm.py:weather"
```

!!! info
See how we used `self._logger` to record log messages. All Plugboard [`Component`][plugboard.component.Component] objects have a [structlog](https://www.structlog.org/) logger on the `_logger` attribute. See [configuration](../../../usage/configuration/) for more information on configuring the logging.

### Putting it all together

As usual, we can link all our components together in a [`LocalProcess`][plugboard.process.Process] and run them as follows:
Expand Down
4 changes: 4 additions & 0 deletions docs/examples/tutorials/running-in-parallel.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
---
tags:
- ray
---
Up until now we have running all our models in a single computational process. This is perfectly sufficient for simple models, or when your components can make use of [Python's asyncio](https://docs.python.org/3/library/asyncio.html) to avoid blocking.

As your models get larger and more computationally intensive you may benefit from running parts of the model in parallel. Plugboard integrates with the [Ray](https://docs.ray.io/) framework, allowing you to split your computation across multiple CPU cores, or even across nodes in a [Ray cluster](https://docs.ray.io/en/latest/cluster/getting-started.html).
Expand Down
4 changes: 2 additions & 2 deletions docs/usage/key-concepts.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ When implementing your own components, you will need to:
* Specify its inputs and ouputs using an [`IOController`][plugboard.component.IOController];
* Define a `step()` method the executes the main logic of your component for a single step; and
* Optionally define an `init()` method to do any required preparatory steps before the model in run.
* In the case of event based models, define custom `Event` subclasses and corresponding event handler methods decorated with `Event.handler`.
* In the case of event based models, define custom [`Event`][plugboard.events.Event] subclasses and corresponding event handler methods decorated with `Event.handler`.

### Connectors

Expand All @@ -42,7 +42,7 @@ graph LR;
A(Load data)-->D(Record output);
```

For models with explicitly declared input and output fields, connectors for each input-output pair must be defined explicitly using one of the `Connector` implementations. Connectors required for any events used in the model will be created for you automatically.
For models with explicitly declared input and output fields, connectors for each input-output pair must be defined explicitly using one of the [`Connector`][plugboard.connector.Connector] implementations. Connectors required for any events used in the model will be created for you automatically.

### Processes

Expand Down
5 changes: 5 additions & 0 deletions docs/usage/topics.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Topic index

To find information on a specific topic, you can look for pages under one of the tags below.

<!-- material/tags -->
2 changes: 2 additions & 0 deletions examples/demos/llm/.meta.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
tags:
- llm
3 changes: 3 additions & 0 deletions examples/demos/llm/002_bluesky_websocket/.meta.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
tags:
- io
- streaming
2 changes: 2 additions & 0 deletions examples/demos/physics-models/.meta.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
tags:
- physics-models
10 changes: 7 additions & 3 deletions examples/tutorials/003_more_components/hello_llm.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
"""Simple hello world example."""
"""Simple LLM example."""

# fmt: off
import asyncio
Expand Down Expand Up @@ -36,8 +36,12 @@ async def step(self) -> None:
)
try:
response.raise_for_status()
except httpx.HTTPStatusError as e:
print(f"Error querying weather API: {e}")
except httpx.HTTPStatusError:
self._logger.error(
"Error querying weather API",
code=response.status_code,
message=response.text,
)
return
data = response.json()
self.temperature = data["current"]["temperature_2m"]
Expand Down
1 change: 1 addition & 0 deletions examples/tutorials/005_events/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
*.csv
157 changes: 157 additions & 0 deletions examples/tutorials/005_events/hello_events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
"""Event-based model example."""

# fmt: off
import asyncio
import random
import typing as _t

from pydantic import BaseModel

from plugboard.component import Component, IOController
from plugboard.connector import AsyncioConnector, ConnectorBuilder
from plugboard.events import Event, EventConnectorBuilder, StopEvent
from plugboard.library import FileWriter
from plugboard.process import LocalProcess
from plugboard.schemas import ConnectorSpec, ComponentArgsDict


# --8<-- [start:events]
class ExtremeValue(BaseModel):
"""Data for event_A."""

value: float
extreme_type: _t.Literal["high", "low"]


class HighEvent(Event):
"""High value event type."""

type: _t.ClassVar[str] = "high_event"
data: ExtremeValue


class LowEvent(Event):
"""Low value event type."""

type: _t.ClassVar[str] = "low_event"
data: ExtremeValue
# --8<-- [end:events]


# --8<-- [start:source-component]
class Random(Component):
"""Generates random numbers."""

io = IOController(outputs=["value"])

def __init__(self, iters: int = 50, **kwargs: _t.Unpack[ComponentArgsDict]) -> None:
super().__init__(**kwargs)
self.max_iters = iters
self.completed_iters = 0

async def step(self) -> None:
self.completed_iters += 1
self.value = random.random()
if self.completed_iters >= self.max_iters:
self.io.queue_event(StopEvent(source=self.name, data={})) # (1)!
# --8<-- [end:source-component]


# --8<-- [start:event-publisher]
class FindHighLowValues(Component):
"""Publishes an event on high or low values."""

io = IOController(inputs=["value"], output_events=[LowEvent, HighEvent]) # (1)!

def __init__(
self,
low_limit: float = 0.2,
high_limit: float = 0.8,
**kwargs: _t.Unpack[ComponentArgsDict],
) -> None:
super().__init__(**kwargs)
self.low_limit = low_limit
self.high_limit = high_limit

async def step(self) -> None:
if self.value >= self.high_limit:
self.io.queue_event( # (2)!
HighEvent(
source=self.name, data=ExtremeValue(value=self.value, extreme_type="high")
)
)
if self.value <= self.low_limit:
self.io.queue_event(
LowEvent(source=self.name, data=ExtremeValue(value=self.value, extreme_type="low"))
)
# --8<-- [end:event-publisher]


# --8<-- [start:event-consumers]
class CollectHigh(Component):
"""Collects values from high events."""

io = IOController(input_events=[HighEvent], outputs=["value"]) # (1)!

def __init__(self, **kwargs: _t.Unpack[ComponentArgsDict]) -> None:
super().__init__(**kwargs)
self.latest_event: _t.Optional[ExtremeValue] = None

async def step(self) -> None:
self.value = self.latest_event.value if self.latest_event else None

@HighEvent.handler # (2)!
async def handle_event(self, event: HighEvent) -> None:
self.latest_event = event.data


class CollectLow(Component):
"""Collects values from low events."""

io = IOController(input_events=[LowEvent], outputs=["value"])

def __init__(self, **kwargs: _t.Unpack[ComponentArgsDict]) -> None:
super().__init__(**kwargs)
self.latest_event: _t.Optional[ExtremeValue] = None

async def step(self) -> None:
self.value = self.latest_event.value if self.latest_event else None

@LowEvent.handler # (3)!
async def handle_event(self, event: LowEvent) -> None:
self.latest_event = event.data
# --8<-- [end:event-consumers]


async def main() -> None:
# --8<-- [start:main]
components = [
Random(name="random-generator"),
FindHighLowValues(name="find-high-low", low_limit=0.2, high_limit=0.8),
CollectHigh(name="collect-high"),
CollectLow(name="collect-low"),
FileWriter(name="save-high", path="high.csv", field_names=["value"]),
FileWriter(name="save-low", path="low.csv", field_names=["value"]),
]
connect = lambda in_, out_: AsyncioConnector(spec=ConnectorSpec(source=in_, target=out_))
connectors = [ # (1)!
connect("random-generator.value", "find-high-low.value"),
connect("collect-high.value", "save-high.value"),
connect("collect-low.value", "save-low.value"),
]
connector_builder = ConnectorBuilder(connector_cls=AsyncioConnector) # (2)!
event_connector_builder = EventConnectorBuilder(connector_builder=connector_builder)
event_connectors = list(event_connector_builder.build(components).values())

process = LocalProcess(
components=components,
connectors=connectors + event_connectors,
)

async with process:
await process.run()
# --8<-- [end:main]


if __name__ == "__main__":
asyncio.run(main())
Loading