diff --git a/.github/instructions/models.instructions.md b/.github/instructions/models.instructions.md
index 49c05273..a2a3cf8f 100644
--- a/.github/instructions/models.instructions.md
+++ b/.github/instructions/models.instructions.md
@@ -95,4 +95,85 @@ import asyncio
async with process:
await process.run()
-```
\ No newline at end of file
+```
+
+## Event-driven models
+
+You can help users to implement event-driven models using Plugboard's event system. Components can emit and handle events to communicate with each other.
+
+Examples of where you might want to use events include:
+* A component that monitors a data stream and emits an event when a threshold is crossed;
+* A component that listens for events and triggers actions in response, e.g. sending an alert;
+* A trading algorithm that uses events to signal buy/sell decisions.
+
+Events must be defined by inheriting from the `plugboard.events.Event` class. Each event class should define the data it carries using a Pydantic `BaseModel`. For example:
+
+```python
+from pydantic import BaseModel
+from plugboard.events import Event
+
+class MyEventData(BaseModel):
+ some_value: int
+ another_value: str
+
+class MyEvent(Event):
+ data: MyEventData
+```
+
+Components can emit events using the `self.io.queue_event()` method or by returning them from an event handler. Event handlers are defined using methods decorated with `@EventClass.handler`. For example:
+
+```python
+from plugboard.component import Component, IOController as IO
+
+class MyEventPublisher(Component):
+ io = IO(inputs=["some_input"], output_events=[MyEvent])
+
+ async def step(self) -> None:
+ # Emit an event
+ event_data = MyEventData(some_value=42, another_value=f"received {self.some_input}")
+ self.io.queue_event(MyEvent(source=self.name, data=event_data))
+
+class MyEventSubscriber(Component):
+ io = IO(input_events=[MyEvent], output_events=[MyEvent])
+
+ @MyEvent.handler
+ async def handle_my_event(self, event: MyEvent) -> MyEvent:
+ # Handle the event
+ print(f"Received event: {event.data}")
+ output_event_data = MyEventData(some_value=event.data.some_value + 1, another_value="handled")
+ return MyEvent(source=self.name, data=output_event_data)
+```
+
+To assemble a process with event-driven components, you can use the same approach as for non-event-driven components. You will need to create connectors for event-driven components using `plugboard.events.event_connector_builder.EventConnectorBuilder`. For example:
+
+```python
+from plugboard.connector import AsyncioConnector, ConnectorBuilder
+from plugboard.events.event_connector_builder import EventConnectorBuilder
+from plugboard.process import LocalProcess
+
+# Define components....
+component_1 = ...
+component_2 = ...
+
+# Define connectors for non-event components as before
+connect = lambda in_, out_: AsyncioConnector(spec=ConnectorSpec(source=in_, target=out_))
+connectors = [
+ connect("component_1.output", "component_2.input"),
+ ...
+]
+
+connector_builder = ConnectorBuilder(connector_cls=AsyncioConnector)
+event_connector_builder = EventConnectorBuilder(connector_builder=connector_builder)
+event_connectors = list(event_connector_builder.build(components).values())
+
+process = LocalProcess(
+ components=[
+ component_1, component_2, ...
+ ],
+ connectors=connectors + event_connectors,
+)
+```
+
+## Exporting models
+
+If the user wants to export their model you use in the CLI, you can do this by calling `process.dump("path/to/file.yaml")`.
diff --git a/docs/examples/tutorials/event-driven-models.md b/docs/examples/tutorials/event-driven-models.md
index 4047a74c..b0b99843 100644
--- a/docs/examples/tutorials/event-driven-models.md
+++ b/docs/examples/tutorials/event-driven-models.md
@@ -1 +1,78 @@
-Tutorial coming soon.
+---
+tags:
+ - events
+---
+So far everything we have built in Plugboard has been a **discrete-time model**. This means that the whole model advances step-wise, i.e. `step` gets called on each [`Component`][plugboard.component.Component], calculating all of their outputs before advancing the simulation on.
+
+In this tutorial we're going to introduce an **event-driven model**, where data can be passed around between components based on triggers that you can define. Event-based models can be useful in a variety of scenarios, for example when modelling parts moving around a production line, or to trigger expensive computation only when certain conditions are met in the model.
+
+## Event-based model
+
+Here's the model that we're going to build. Given a stream of random numbers, we'll trigger `HighEvent` whenever the value is above `0.8` and `LowEvent` whenever the value is below `0.2`. This allows us to funnel data into different parts of the model: in this case we'll just save the latest high/low values to a file at each step. In the diagram the _dotted lines_ represent the flow of event data: `FindHighLowValues` will publish events, while `CollectHigh` and `CollectLow` will subscribe to receive high and low events respectively.
+
+```mermaid
+flowchart LR
+ collect-high@{ shape: rounded, label: CollectHigh
**collect-high** } --> save-high@{ shape: rounded, label: FileWriter
**save-high** }
+ collect-low@{ shape: rounded, label: CollectLow
**collect-low** } --> save-low@{ shape: rounded, label: FileWriter
**save-low** }
+ random-generator@{ shape: rounded, label: Random
**random-generator** } --> find-high-low@{ shape: rounded, label: FindHighLowValues
**find-high-low** }
+ low_event@{ shape: hex, label: LowEvent } -.-> collect-low@{ shape: rounded, label: CollectLow
**collect-low** }
+ high_event@{ shape: hex, label: HighEvent } -.-> collect-high@{ shape: rounded, label: CollectHigh
**collect-high** }
+ find-high-low@{ shape: rounded, label: FindHighLowValues
**find-high-low** } -.-> high_event@{ shape: hex, label: HighEvent }
+ find-high-low@{ shape: rounded, label: FindHighLowValues
**find-high-low** } -.-> low_event@{ shape: hex, label: LowEvent }
+```
+
+## Defining events
+
+First we need to define the events that are going to get used in the model. Each event needs a name, in this case `"high_event"` and `"low_event"` and a `data` type associated with it. Use a [Pydantic](https://docs.pydantic.dev/latest/) model to define the format of this `data` field.
+
+```python
+--8<-- "examples/tutorials/005_events/hello_events.py:events"
+```
+
+## Building components to create and consume events
+
+So far all of our process models have run step-by-step until completion. When a model contains event-driven components, we need a way to tell them to stop at the end of the simulation, otherwise they will stay running and listening for events forever.
+
+In this example, our `Random` component will drive the process by generating input random values. When it has completed `iters` iterations, we call `self.io.close()` to stop the model, causing other components in the model to shutdown.
+
+```python
+--8<-- "examples/tutorials/005_events/hello_events.py:source-component"
+```
+
+Next, we will define `FindHighLowValues` to identify high and low values in the stream of random numbers and publish `HighEvent` and `LowEvent` respectively.
+
+```python
+--8<-- "examples/tutorials/005_events/hello_events.py:event-publisher"
+```
+
+1. See how we use the [`IOController`][plugboard.component.IOController] to declare that this [`Component`][plugboard.component.Component] will publish events.
+2. Use `self.io.queue_event` to send an event from a [`Component`][plugboard.component.Component]. Here we are senging the `HighEvent` or `LowEvent` depending on the input value.
+
+Finally, we need components to subscribe to these events and process them. Use the `Event.handler` decorator to identify the method on each [`Component`][plugboard.component.Component] that will do this processing.
+
+```python
+--8<-- "examples/tutorials/005_events/hello_events.py:event-consumers"
+```
+
+1. Specify the events that this [`Component`][plugboard.component.Component] will subscribe to.
+2. Use this decorator to indicate that we handle `HighEvent` here...
+3. ...and we handle `LowEvent` here.
+
+!!! note
+ In a real model you could define whatever logic you need inside your event handler, e.g. create a file, publish another event, etc. Here we just store the event on an attribute so that its value can be output via the `step()` method.
+
+## Putting it all together
+
+Now we can create a [`Process`][plugboard.process.Process] from all these components. The outputs from `CollectLow` and `CollectHigh` are connected to separate [`FileWriter`][plugboard.library.FileWriter] components so that we'll get a CSV file containing the latest high and low values at each step of the simulation.
+
+!!! info
+ We need a few extra lines of code to create connectors for the event-based parts of the model. If you define your process in YAML this will be done automatically for you, but if you are defining the process in code then you will need to use the [`EventConnectorBuilder`][plugboard.events.EventConnectorBuilder] to do this.
+
+```python hl_lines="15-17"
+--8<-- "examples/tutorials/005_events/hello_events.py:main"
+```
+
+1. These connectors are for the normal, non-event driven parts of the model and connect [`Component`][plugboard.component.Component]` inputs and outputs.
+2. These lines will set up connectors for the events in the model.
+
+Take a look at the `high.csv` and `low.csv` files: the first few rows will usually be empty, and then as soon as high or low values are identified they will start to appear in the CSVs. As usual, you can run this model from the CLI using `plugboard process run model.yaml`.
\ No newline at end of file
diff --git a/examples/demos/finance/.meta.yml b/examples/demos/finance/.meta.yml
new file mode 100644
index 00000000..f53b3e9a
--- /dev/null
+++ b/examples/demos/finance/.meta.yml
@@ -0,0 +1,2 @@
+tags:
+ - finance
\ No newline at end of file
diff --git a/examples/demos/finance/001_momentum_signal/.gitignore b/examples/demos/finance/001_momentum_signal/.gitignore
new file mode 100644
index 00000000..16f2dc5f
--- /dev/null
+++ b/examples/demos/finance/001_momentum_signal/.gitignore
@@ -0,0 +1 @@
+*.csv
\ No newline at end of file
diff --git a/examples/demos/finance/001_momentum_signal/.meta.yml b/examples/demos/finance/001_momentum_signal/.meta.yml
new file mode 100644
index 00000000..9e58b17e
--- /dev/null
+++ b/examples/demos/finance/001_momentum_signal/.meta.yml
@@ -0,0 +1,2 @@
+tags:
+ - events
\ No newline at end of file
diff --git a/examples/demos/finance/001_momentum_signal/momentum-signal.ipynb b/examples/demos/finance/001_momentum_signal/momentum-signal.ipynb
new file mode 100644
index 00000000..db33182d
--- /dev/null
+++ b/examples/demos/finance/001_momentum_signal/momentum-signal.ipynb
@@ -0,0 +1,616 @@
+{
+ "cells": [
+ {
+ "cell_type": "markdown",
+ "id": "0",
+ "metadata": {},
+ "source": [
+ "# Momentum trading\n",
+ "\n",
+ "[](https://codespaces.new/plugboard-dev/plugboard)\n",
+ "\n",
+ "This notebook implements a simple momentum strategy on the S&P 500 using Plugboard’s event-driven components:\n",
+ "\n",
+ "- Data source: streams S&P 500 prices from Yahoo! Finance;\n",
+ "- Indicators: three pairs of exponential moving averages (fast/medium/slow);\n",
+ "- Signals: compare EMAs to create buy/sell signals;\n",
+ "- Events: combine three signals into a TradeEvent (weak/strong buy/sell);\n",
+ "- Sink: write trades to a CSV file for inspection.\n",
+ "\n",
+ "You can run the process, then visualize trades on a price chart."
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "1",
+ "metadata": {},
+ "source": [
+ "Here's a diagram to illustrate the whole process:\n",
+ "\n",
+ ""
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "2",
+ "metadata": {},
+ "source": [
+ "### Imports and helpers\n",
+ "\n",
+ "We import Plugboard’s core building blocks and define a small helper to create connectors:\n",
+ "\n",
+ "- Components expose named inputs/outputs and can emit/receive events.\n",
+ "- Connectors move values between component fields.\n",
+ "- Event connectors route declared events between publishers and subscribers automatically."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "3",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "import pandas as pd\n",
+ "import datetime as dt\n",
+ "import typing as _t\n",
+ "\n",
+ "from plugboard.connector import AsyncioConnector, ConnectorBuilder\n",
+ "from plugboard.events import EventConnectorBuilder\n",
+ "from plugboard.process import LocalProcess\n",
+ "from plugboard.schemas import ConnectorSpec\n",
+ "\n",
+ "import pandas as pd\n",
+ "import plotly.graph_objects as go\n",
+ "\n",
+ "import yfinance as yf\n",
+ "from pydantic import BaseModel\n",
+ "from plugboard.events import Event\n",
+ "\n",
+ "from plugboard.component import Component, IOController as IO\n",
+ "from plugboard.schemas import ComponentArgsDict\n",
+ "\n",
+ "# Helper to create field connectors\n",
+ "connect = lambda src, tgt: AsyncioConnector(spec=ConnectorSpec(source=src, target=tgt))"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "4",
+ "metadata": {},
+ "source": [
+ "### Components: data and indicators\n",
+ "\n",
+ "- YahooPriceLoader streams price and timestamp row-by-row from Yahoo Finance for ^GSPC.\n",
+ "- EMA consumes `price` and emits an exponentially weighted moving average as `ema`.\n",
+ "\n",
+ "Components declare their I/O via `IOController`, giving Plugboard enough metadata to wire processes."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "5",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "def _ensure_dt(val: _t.Any) -> dt.datetime:\n",
+ " if isinstance(val, dt.datetime):\n",
+ " return val\n",
+ " if isinstance(val, dt.date):\n",
+ " return dt.datetime.combine(val, dt.time())\n",
+ " return pd.to_datetime(val).to_pydatetime()\n",
+ "\n",
+ "\n",
+ "class YahooPriceLoader(Component):\n",
+ " \"\"\"Loads historical prices for a symbol from Yahoo Finance and streams them row by row.\n",
+ "\n",
+ " Outputs per step:\n",
+ " price: float - adjusted close price (or close if adj not present)\n",
+ " timestamp: datetime\n",
+ " \"\"\"\n",
+ "\n",
+ " io = IO(outputs=[\"price\", \"timestamp\"]) # stream out prices\n",
+ "\n",
+ " def __init__(\n",
+ " self,\n",
+ " symbol: str = \"^GSPC\",\n",
+ " period: str | None = None,\n",
+ " start: str | dt.date | None = None,\n",
+ " end: str | dt.date | None = None,\n",
+ " interval: str = \"1d\",\n",
+ " limit: int | None = None,\n",
+ " **kwargs: _t.Unpack[ComponentArgsDict],\n",
+ " ) -> None:\n",
+ " super().__init__(**kwargs)\n",
+ " self.symbol = symbol\n",
+ " self.period = period\n",
+ " self.start = start\n",
+ " self.end = end\n",
+ " self.interval = interval\n",
+ " self.limit = limit\n",
+ " self._data: pd.DataFrame | None = None\n",
+ " self._iter = 0\n",
+ "\n",
+ " async def _ensure_data(self) -> None:\n",
+ " if self._data is not None:\n",
+ " return\n",
+ " if yf is None: # pragma: no cover - runtime safeguard\n",
+ " raise RuntimeError(\"yfinance not installed. Please 'pip install yfinance'.\")\n",
+ " df = yf.download(\n",
+ " self.symbol,\n",
+ " period=self.period,\n",
+ " start=self.start,\n",
+ " end=self.end,\n",
+ " interval=self.interval,\n",
+ " progress=False,\n",
+ " )\n",
+ " if df.empty:\n",
+ " raise RuntimeError(f\"No data returned for symbol {self.symbol}\")\n",
+ " # Prefer Adj Close if exists\n",
+ " if \"Adj Close\" in df.columns:\n",
+ " df.rename(columns={\"Adj Close\": \"AdjClose\"}, inplace=True)\n",
+ " price_col = \"AdjClose\"\n",
+ " elif \"Close\" in df.columns:\n",
+ " price_col = \"Close\"\n",
+ " else:\n",
+ " price_col = df.columns[0]\n",
+ " df = df[[price_col]].rename(columns={price_col: \"price\"})\n",
+ " df.index.name = \"timestamp\"\n",
+ " df.reset_index(inplace=True)\n",
+ " if self.limit is not None:\n",
+ " df = df.head(self.limit)\n",
+ " # Remove column multi-index if present\n",
+ " self._data = df.droplevel(1, axis=\"columns\")\n",
+ "\n",
+ " @property\n",
+ " def df(self) -> pd.DataFrame:\n",
+ " \"\"\"The full DataFrame of loaded price data.\"\"\"\n",
+ " if self._data is None:\n",
+ " raise RuntimeError(\"Data not yet loaded. Call step() first.\")\n",
+ " return self._data\n",
+ "\n",
+ " async def step(self) -> None: # noqa: D401\n",
+ " await self._ensure_data()\n",
+ " if self._iter >= len(self._data):\n",
+ " await self.io.close()\n",
+ " return\n",
+ " row = self._data.iloc[self._iter]\n",
+ " self.price = float(row[\"price\"])\n",
+ " ts = row[\"timestamp\"]\n",
+ " self.timestamp = _ensure_dt(ts)\n",
+ " self._iter += 1"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "6",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "class EMA(Component):\n",
+ " \"\"\"Computes an exponential moving average of an input price stream.\n",
+ "\n",
+ " Inputs:\n",
+ " price: float\n",
+ " Outputs:\n",
+ " ema: float\n",
+ " \"\"\"\n",
+ "\n",
+ " io = IO(inputs=[\"price\"], outputs=[\"ema\"])\n",
+ "\n",
+ " def __init__(\n",
+ " self,\n",
+ " alpha: float | None = None,\n",
+ " span: int | None = None,\n",
+ " **kwargs: _t.Unpack[ComponentArgsDict],\n",
+ " ) -> None:\n",
+ " super().__init__(**kwargs)\n",
+ " if alpha is None and span is None:\n",
+ " raise ValueError(\"Provide either alpha or span\")\n",
+ " if alpha is not None and not (0 < alpha <= 1):\n",
+ " raise ValueError(\"alpha must be (0,1]\")\n",
+ " self.alpha = alpha if alpha is not None else 2 / (span + 1)\n",
+ " self._ema: float | None = None\n",
+ "\n",
+ " async def step(self) -> None:\n",
+ " price = float(self.price)\n",
+ " if self._ema is None:\n",
+ " self._ema = price\n",
+ " else:\n",
+ " self._ema = self.alpha * price + (1 - self.alpha) * self._ema\n",
+ " self.ema = self._ema"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "7",
+ "metadata": {},
+ "source": [
+ "### Components: signals and events\n",
+ "\n",
+ "- CrossoverSignal reads two EMAs (`fast`, `slow`) and emits a directional `signal`.\n",
+ "- TradeSignalAggregator takes three signals plus the current `price` and `timestamp`, and emits a `TradeEvent` with direction/size/strength.\n",
+ "- TradeEventFileWriter subscribes to `TradeEvent` and appends a CSV row per event."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "8",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "class CrossoverSignal(Component):\n",
+ " \"\"\"Generates buy/sell/hold signal from two moving averages.\n",
+ "\n",
+ " Inputs:\n",
+ " fast: float\n",
+ " slow: float\n",
+ " Outputs:\n",
+ " signal: int (1=buy, -1=sell)\n",
+ " \"\"\"\n",
+ "\n",
+ " io = IO(inputs=[\"fast\", \"slow\"], outputs=[\"signal\"])\n",
+ "\n",
+ " def __init__(self, **kwargs: _t.Unpack[ComponentArgsDict]) -> None:\n",
+ " super().__init__(**kwargs)\n",
+ "\n",
+ " async def step(self) -> None:\n",
+ " fast = float(self.fast)\n",
+ " slow = float(self.slow)\n",
+ " self.signal = 1 if fast >= slow else -1"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "9",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "class TradeData(BaseModel):\n",
+ " \"\"\"Data for a trade decision.\n",
+ "\n",
+ " side: \"buy\" or \"sell\"\n",
+ " size: number of units\n",
+ " price: execution price\n",
+ " timestamp: event time\n",
+ " strength: \"strong\" | \"weak\"\n",
+ " count_buy: number of buy signals\n",
+ " count_sell: number of sell signals\n",
+ " \"\"\"\n",
+ "\n",
+ " side: _t.Literal[\"buy\", \"sell\"]\n",
+ " size: int\n",
+ " price: float\n",
+ " timestamp: dt.datetime\n",
+ " strength: _t.Literal[\"strong\", \"weak\"]\n",
+ " count_buy: int\n",
+ " count_sell: int\n",
+ "\n",
+ "\n",
+ "class TradeEvent(Event):\n",
+ " \"\"\"Event emitted when strategy decides to trade.\"\"\"\n",
+ "\n",
+ " type: _t.ClassVar[str] = \"trade_event\"\n",
+ " data: TradeData\n",
+ "\n",
+ "\n",
+ "class TradeSignalAggregator(Component):\n",
+ " \"\"\"Aggregates three crossover signals into trade events.\n",
+ "\n",
+ " Inputs:\n",
+ " s1, s2, s3 (int signals: 1 buy, -1 sell, 0 hold)\n",
+ " price (float)\n",
+ " timestamp (datetime)\n",
+ " Output events:\n",
+ " TradeEvent\n",
+ " Logic:\n",
+ " strong buy = 3 buys -> size 2\n",
+ " weak buy = 2 buys 1 sell -> size 1\n",
+ " strong sell = 3 sells -> size 2\n",
+ " weak sell = 2 sells 1 buy -> size 1\n",
+ " \"\"\"\n",
+ "\n",
+ " io = IO(\n",
+ " inputs=[\"s1\", \"s2\", \"s3\", \"price\", \"timestamp\"],\n",
+ " output_events=[TradeEvent],\n",
+ " )\n",
+ "\n",
+ " def __init__(self, **kwargs: _t.Unpack[ComponentArgsDict]) -> None:\n",
+ " super().__init__(**kwargs)\n",
+ " self._previous_signal: int | None = None\n",
+ "\n",
+ " async def step(self) -> None:\n",
+ " signals = [int(self.s1), int(self.s2), int(self.s3)]\n",
+ " count_buy = sum(1 for s in signals if s == 1)\n",
+ " count_sell = sum(1 for s in signals if s == -1)\n",
+ " net_signal = count_buy - count_sell\n",
+ " if net_signal >= 2:\n",
+ " decision, strength, size = \"buy\", \"strong\", 2\n",
+ " elif net_signal == 1:\n",
+ " decision, strength, size = \"buy\", \"weak\", 1\n",
+ " elif net_signal <= -2:\n",
+ " decision, strength, size = \"sell\", \"strong\", 2\n",
+ " elif net_signal == -1:\n",
+ " decision, strength, size = \"sell\", \"weak\", 1\n",
+ "\n",
+ " if net_signal != self._previous_signal:\n",
+ " self._logger.info(\n",
+ " f\"Emitting trade decision\", decision=decision, size=size, strength=strength\n",
+ " )\n",
+ " trade = TradeEvent(\n",
+ " source=self.name,\n",
+ " data=TradeData(\n",
+ " side=decision,\n",
+ " size=size,\n",
+ " price=float(self.price),\n",
+ " timestamp=_ensure_dt(self.timestamp),\n",
+ " strength=strength,\n",
+ " count_buy=count_buy,\n",
+ " count_sell=count_sell,\n",
+ " ),\n",
+ " )\n",
+ " self.io.queue_event(trade)\n",
+ " self._previous_signal = net_signal"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "10",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "class TradeEventFileWriter(Component):\n",
+ " \"\"\"Consumes trade events and writes them to a CSV file (append mode).\"\"\"\n",
+ "\n",
+ " io = IO(input_events=[TradeEvent])\n",
+ "\n",
+ " def __init__(self, path: str = \"trades.csv\", **kwargs: _t.Unpack[ComponentArgsDict]) -> None:\n",
+ " super().__init__(**kwargs)\n",
+ " self.path = path\n",
+ " # Write header\n",
+ " with open(self.path, \"w\", encoding=\"utf-8\") as f:\n",
+ " f.write(\"timestamp,side,size,price,strength,count_buy,count_sell,source\\n\")\n",
+ "\n",
+ " @TradeEvent.handler\n",
+ " async def handle_trade(self, event: TradeEvent) -> None: # noqa: D401\n",
+ " d = event.data\n",
+ " with open(self.path, \"a\", encoding=\"utf-8\") as f:\n",
+ " f.write(\n",
+ " f\"{d.timestamp.isoformat()},{d.side},{d.size},{d.price:.4f},{d.strength},{d.count_buy},{d.count_sell},{event.source}\\n\"\n",
+ " )"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "11",
+ "metadata": {},
+ "source": [
+ "### Wire the process\n",
+ "\n",
+ "Here we:\n",
+ "\n",
+ "- Instantiate the source, indicator, signal, aggregator, and writer components.\n",
+ "- Connect fields with `AsyncioConnector` (price→EMAs, EMAs→signals, signals→aggregator).\n",
+ "- Build event connectors so `TradeEvent` flows from the aggregator to the file writer.\n",
+ "- Create a `LocalProcess` to run everything in-process."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "12",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "# Build components\n",
+ "price_loader = YahooPriceLoader(name=\"loader\", period=\"10y\", interval=\"1d\")\n",
+ "\n",
+ "# Three EMAs with different speeds (adjust spans as desired)\n",
+ "ema_fast_1 = EMA(name=\"ema-fast-1\", span=8)\n",
+ "ema_fast_2 = EMA(name=\"ema-fast-2\", span=15)\n",
+ "ema_medium_1 = EMA(name=\"ema-medium-1\", span=30)\n",
+ "ema_medium_2 = EMA(name=\"ema-medium-2\", span=50)\n",
+ "ema_slow_1 = EMA(name=\"ema-slow-1\", span=80)\n",
+ "ema_slow_2 = EMA(name=\"ema-slow-2\", span=150)\n",
+ "\n",
+ "# Signals from different pairings\n",
+ "sig_fast = CrossoverSignal(name=\"sig-fast\")\n",
+ "sig_medium = CrossoverSignal(name=\"sig-medium\")\n",
+ "sig_slow = CrossoverSignal(name=\"sig-slow\")\n",
+ "\n",
+ "# Aggregator producing trade events\n",
+ "aggregator = TradeSignalAggregator(name=\"trade-aggregator\")\n",
+ "trade_writer = TradeEventFileWriter(name=\"trade-writer\", path=\"trades.csv\")\n",
+ "\n",
+ "components = [\n",
+ " price_loader,\n",
+ " ema_fast_1,\n",
+ " ema_fast_2,\n",
+ " ema_medium_1,\n",
+ " ema_medium_2,\n",
+ " ema_slow_1,\n",
+ " ema_slow_2,\n",
+ " sig_fast,\n",
+ " sig_medium,\n",
+ " sig_slow,\n",
+ " aggregator,\n",
+ " trade_writer,\n",
+ "]\n",
+ "\n",
+ "# Field connectors\n",
+ "connectors = [\n",
+ " # Price to EMAs\n",
+ " connect(\"loader.price\", \"ema-fast-1.price\"),\n",
+ " connect(\"loader.price\", \"ema-fast-2.price\"),\n",
+ " connect(\"loader.price\", \"ema-medium-1.price\"),\n",
+ " connect(\"loader.price\", \"ema-medium-2.price\"),\n",
+ " connect(\"loader.price\", \"ema-slow-1.price\"),\n",
+ " connect(\"loader.price\", \"ema-slow-2.price\"),\n",
+ " # Convert the three pairs of EMAs into signals\n",
+ " connect(\"ema-fast-1.ema\", \"sig-fast.fast\"),\n",
+ " connect(\"ema-fast-2.ema\", \"sig-fast.slow\"),\n",
+ " connect(\"ema-medium-1.ema\", \"sig-medium.fast\"),\n",
+ " connect(\"ema-medium-2.ema\", \"sig-medium.slow\"),\n",
+ " connect(\"ema-slow-1.ema\", \"sig-slow.fast\"),\n",
+ " connect(\"ema-slow-2.ema\", \"sig-slow.slow\"),\n",
+ " # Signals + price + timestamp into aggregator\n",
+ " connect(\"sig-fast.signal\", \"trade-aggregator.s1\"),\n",
+ " connect(\"sig-medium.signal\", \"trade-aggregator.s2\"),\n",
+ " connect(\"sig-slow.signal\", \"trade-aggregator.s3\"),\n",
+ " connect(\"loader.price\", \"trade-aggregator.price\"),\n",
+ " connect(\"loader.timestamp\", \"trade-aggregator.timestamp\"),\n",
+ "]\n",
+ "\n",
+ "# Event connectors\n",
+ "builder = ConnectorBuilder(connector_cls=AsyncioConnector)\n",
+ "event_builder = EventConnectorBuilder(connector_builder=builder)\n",
+ "event_connectors = list(event_builder.build(components).values())\n",
+ "\n",
+ "process = LocalProcess(components=components, connectors=connectors + event_connectors)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "13",
+ "metadata": {},
+ "source": [
+ "### Run the process\n",
+ "\n",
+ "Running the process iterates over the price history, updates indicators, produces signals, emits trade events, and writes them to `trades.csv`."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "14",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "async with process:\n",
+ " await process.run()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "15",
+ "metadata": {},
+ "source": [
+ "### Visualize trades from CSV\n",
+ "\n",
+ "After the run, `trades.csv` contains one row per `TradeEvent`. We overlay buy/sell markers on the price series to see where the strategy acted."
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "16",
+ "metadata": {},
+ "source": []
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "17",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "df_prices = price_loader.df\n",
+ "df_trades = pd.read_csv(\"trades.csv\", parse_dates=[\"timestamp\"])\n",
+ "\n",
+ "traces = [\n",
+ " go.Scatter(x=df_prices[\"timestamp\"], y=df_prices[\"price\"], mode=\"lines\", name=\"Price\"),\n",
+ " go.Scatter(\n",
+ " x=df_trades[df_trades[\"side\"] == \"buy\"][\"timestamp\"],\n",
+ " y=df_trades[df_trades[\"side\"] == \"buy\"][\"price\"],\n",
+ " mode=\"markers\",\n",
+ " name=\"Buy\",\n",
+ " marker=dict(\n",
+ " color=\"green\",\n",
+ " symbol=\"triangle-up\",\n",
+ " size=df_trades[df_trades[\"side\"] == \"buy\"][\"strength\"].map(\n",
+ " lambda x: 18 if x == \"strong\" else 12\n",
+ " ),\n",
+ " ),\n",
+ " ),\n",
+ " go.Scatter(\n",
+ " x=df_trades[df_trades[\"side\"] == \"sell\"][\"timestamp\"],\n",
+ " y=df_trades[df_trades[\"side\"] == \"sell\"][\"price\"],\n",
+ " mode=\"markers\",\n",
+ " name=\"Sell\",\n",
+ " marker=dict(\n",
+ " color=\"red\",\n",
+ " symbol=\"triangle-down\",\n",
+ " size=df_trades[df_trades[\"side\"] == \"sell\"][\"strength\"].map(\n",
+ " lambda x: 18 if x == \"strong\" else 12\n",
+ " ),\n",
+ " ),\n",
+ " ),\n",
+ "]\n",
+ "fig = go.Figure(data=traces)\n",
+ "fig"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "18",
+ "metadata": {},
+ "source": [
+ "### Visualize the process diagram\n",
+ "\n",
+ "We can render a Mermaid diagram of the Plugboard process, showing components, fields, and event flows. This helps debug and document the model wiring."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "19",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "# Visualise the process\n",
+ "from plugboard.diagram import MermaidDiagram\n",
+ "\n",
+ "diagram_md = MermaidDiagram.from_process(process)\n",
+ "diagram_md.url"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "20",
+ "metadata": {},
+ "source": [
+ "### Next steps\n",
+ "\n",
+ "Potential enhancements to this example could include:\n",
+ "\n",
+ "- Adding a component to track PnL from the trades;\n",
+ "- Using `plugboard.tune` to choose the moving averages to optimise PnL."
+ ]
+ }
+ ],
+ "metadata": {
+ "kernelspec": {
+ "display_name": ".venv",
+ "language": "python",
+ "name": "python3"
+ },
+ "language_info": {
+ "codemirror_mode": {
+ "name": "ipython",
+ "version": 3
+ },
+ "file_extension": ".py",
+ "mimetype": "text/x-python",
+ "name": "python",
+ "nbconvert_exporter": "python",
+ "pygments_lexer": "ipython3",
+ "version": "3.12.8"
+ }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 5
+}
diff --git a/examples/tutorials/005_events/.gitignore b/examples/tutorials/005_events/.gitignore
new file mode 100644
index 00000000..16f2dc5f
--- /dev/null
+++ b/examples/tutorials/005_events/.gitignore
@@ -0,0 +1 @@
+*.csv
\ No newline at end of file
diff --git a/examples/tutorials/005_events/hello_events.py b/examples/tutorials/005_events/hello_events.py
new file mode 100644
index 00000000..c2a03f2f
--- /dev/null
+++ b/examples/tutorials/005_events/hello_events.py
@@ -0,0 +1,157 @@
+"""Event-based model example."""
+
+# fmt: off
+import asyncio
+import random
+import typing as _t
+
+from pydantic import BaseModel
+
+from plugboard.component import Component, IOController
+from plugboard.connector import AsyncioConnector, ConnectorBuilder
+from plugboard.events import Event, EventConnectorBuilder, StopEvent
+from plugboard.library import FileWriter
+from plugboard.process import LocalProcess
+from plugboard.schemas import ConnectorSpec, ComponentArgsDict
+
+
+# --8<-- [start:events]
+class ExtremeValue(BaseModel):
+ """Data for event_A."""
+
+ value: float
+ extreme_type: _t.Literal["high", "low"]
+
+
+class HighEvent(Event):
+ """High value event type."""
+
+ type: _t.ClassVar[str] = "high_event"
+ data: ExtremeValue
+
+
+class LowEvent(Event):
+ """Low value event type."""
+
+ type: _t.ClassVar[str] = "low_event"
+ data: ExtremeValue
+# --8<-- [end:events]
+
+
+# --8<-- [start:source-component]
+class Random(Component):
+ """Generates random numbers."""
+
+ io = IOController(outputs=["value"])
+
+ def __init__(self, iters: int = 50, **kwargs: _t.Unpack[ComponentArgsDict]) -> None:
+ super().__init__(**kwargs)
+ self.max_iters = iters
+ self.completed_iters = 0
+
+ async def step(self) -> None:
+ self.completed_iters += 1
+ self.value = random.random()
+ if self.completed_iters >= self.max_iters:
+ await self.io.close()
+# --8<-- [end:source-component]
+
+
+# --8<-- [start:event-publisher]
+class FindHighLowValues(Component):
+ """Publishes an event on high or low values."""
+
+ io = IOController(inputs=["value"], output_events=[LowEvent, HighEvent]) # (1)!
+
+ def __init__(
+ self,
+ low_limit: float = 0.2,
+ high_limit: float = 0.8,
+ **kwargs: _t.Unpack[ComponentArgsDict],
+ ) -> None:
+ super().__init__(**kwargs)
+ self.low_limit = low_limit
+ self.high_limit = high_limit
+
+ async def step(self) -> None:
+ if self.value >= self.high_limit:
+ self.io.queue_event( # (2)!
+ HighEvent(
+ source=self.name, data=ExtremeValue(value=self.value, extreme_type="high")
+ )
+ )
+ if self.value <= self.low_limit:
+ self.io.queue_event(
+ LowEvent(source=self.name, data=ExtremeValue(value=self.value, extreme_type="low"))
+ )
+# --8<-- [end:event-publisher]
+
+
+# --8<-- [start:event-consumers]
+class CollectHigh(Component):
+ """Collects values from high events."""
+
+ io = IOController(input_events=[HighEvent], outputs=["value"]) # (1)!
+
+ def __init__(self, **kwargs: _t.Unpack[ComponentArgsDict]) -> None:
+ super().__init__(**kwargs)
+ self.latest_event: _t.Optional[ExtremeValue] = None
+
+ async def step(self) -> None:
+ self.value = self.latest_event.value if self.latest_event else None
+
+ @HighEvent.handler # (2)!
+ async def handle_event(self, event: HighEvent) -> None:
+ self.latest_event = event.data
+
+
+class CollectLow(Component):
+ """Collects values from low events."""
+
+ io = IOController(input_events=[LowEvent], outputs=["value"])
+
+ def __init__(self, **kwargs: _t.Unpack[ComponentArgsDict]) -> None:
+ super().__init__(**kwargs)
+ self.latest_event: _t.Optional[ExtremeValue] = None
+
+ async def step(self) -> None:
+ self.value = self.latest_event.value if self.latest_event else None
+
+ @LowEvent.handler # (3)!
+ async def handle_event(self, event: LowEvent) -> None:
+ self.latest_event = event.data
+# --8<-- [end:event-consumers]
+
+
+async def main() -> None:
+ # --8<-- [start:main]
+ components = [
+ Random(name="random-generator"),
+ FindHighLowValues(name="find-high-low", low_limit=0.2, high_limit=0.8),
+ CollectHigh(name="collect-high"),
+ CollectLow(name="collect-low"),
+ FileWriter(name="save-high", path="high.csv", field_names=["value"]),
+ FileWriter(name="save-low", path="low.csv", field_names=["value"]),
+ ]
+ connect = lambda in_, out_: AsyncioConnector(spec=ConnectorSpec(source=in_, target=out_))
+ connectors = [ # (1)!
+ connect("random-generator.value", "find-high-low.value"),
+ connect("collect-high.value", "save-high.value"),
+ connect("collect-low.value", "save-low.value"),
+ ]
+ connector_builder = ConnectorBuilder(connector_cls=AsyncioConnector) # (2)!
+ event_connector_builder = EventConnectorBuilder(connector_builder=connector_builder)
+ event_connectors = list(event_connector_builder.build(components).values())
+
+ process = LocalProcess(
+ components=components,
+ connectors=connectors + event_connectors,
+ )
+
+ async with process:
+ await process.run()
+ # --8<-- [end:main]
+
+
+if __name__ == "__main__":
+ asyncio.run(main())
diff --git a/examples/tutorials/005_events/model.yaml b/examples/tutorials/005_events/model.yaml
new file mode 100644
index 00000000..c3f68cd2
--- /dev/null
+++ b/examples/tutorials/005_events/model.yaml
@@ -0,0 +1,38 @@
+plugboard:
+ process:
+ args:
+ components:
+ - type: hello_events.Random
+ args:
+ name: "random-generator"
+ iters: 50
+ - type: hello_events.FindHighLowValues
+ args:
+ name: "find-high-low"
+ low_limit: 0.2
+ high_limit: 0.8
+ - type: hello_events.CollectLow
+ args:
+ name: "collect-low"
+ - type: hello_events.CollectHigh
+ args:
+ name: "collect-high"
+ - type: plugboard.library.file_io.FileWriter
+ args:
+ name: "save-low"
+ path: "low.csv"
+ field_names:
+ - value
+ - type: plugboard.library.file_io.FileWriter
+ args:
+ name: "save-high"
+ path: "high.csv"
+ field_names:
+ - value
+ connectors:
+ - source: "random-generator.value"
+ target: "find-high-low.value"
+ - source: "collect-low.value"
+ target: "save-low.value"
+ - source: "collect-high.value"
+ target: "save-high.value"
diff --git a/mkdocs.yaml b/mkdocs.yaml
index 5a25aa30..f754c2dd 100644
--- a/mkdocs.yaml
+++ b/mkdocs.yaml
@@ -119,11 +119,14 @@ nav:
- Demos:
- Fundamentals:
- Simple model: examples/demos/fundamentals/001_simple_model/simple-model.ipynb
+ - Production line: examples/demos/fundamentals/002_production_line_optimisation/production-line.ipynb
- LLMs:
- Data filtering: examples/demos/llm/001_data_filter/llm-filtering.ipynb
- Websocket streaming: examples/demos/llm/002_bluesky_websocket/bluesky-websocket.ipynb
- Physics-based models:
- Hot water tank: examples/demos/physics-models/001-hot-water-tank/hot-water-tank.ipynb
+ - Finance:
+ - Momentum trading signal: examples/demos/finance/001_momentum_signal/momentum-signal.ipynb
- API Reference:
- component: api/component/component.md
- connector: api/connector/connector.md