Skip to content

dwerner/procstream

Folders and files

Name
Last commit message
Last commit date

Latest commit

 

History

8 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

procstream

Runtime-agnostic async process execution with streaming output and log parsing.

Features

  • Async process spawning with streaming stdout/stderr
  • Sync blocking API for simple use cases
  • Process lifecycle management (terminate, kill, signals)
  • Event parsing: Derive macro to transform log lines into typed events

Usage

Basic Process Execution

use procstream::sync::{execute, SyncCommand};

let result = execute(SyncCommand::new("echo").arg("hello"))?;
assert!(result.success);
assert_eq!(result.stdout.trim(), "hello");

Async Streaming

use procstream::{Command, Executor, Target, ManagedProcess};
use futures::StreamExt;

let executor = Executor::local("my-service");
let cmd = Command::builder("my-server").build();
let target = Target::ManagedProcess(ManagedProcess::new());

let (mut events, handle) = executor.launch(&target, cmd).await?;

while let Some(event) = events.next().await {
    println!("{:?}", event);  // ProcessEvent { Stdout, Stderr, Started, Exited }
}

Event Parsing

Transform process output into typed domain events using the #[derive(EventParser)] macro.

Enable it by turning on the `macros` feature in your Cargo.toml:

[dependencies]
procstream = { version = "0.1", features = ["macros"] }

Pattern Syntax

| Syntax | Type | Description |
|---|---|---|
| `{name}` | `String` | Capture until next literal or end of line |
| `{name:u64}` | `u64` | Parse unsigned integer |
| `{name:i64}` | `i64` | Parse signed integer |
| `{name:f64}` | `f64` | Parse floating point |
| `{name:}` | `String` | Capture rest of line |
| `{name:rest}` | `String` | Capture rest of line (explicit) |
| `{name:json}` | `T: Deserialize` | Parse JSON into field type |

Example

use procstream::EventParser;

#[derive(Debug, EventParser)]
enum ServerEvent {
    #[pattern("server: listening on {socket}")]
    #[ready]  // marks this variant as the readiness indicator
    Listening { socket: String },

    #[pattern("session {id} created for {learner}")]
    SessionCreated { id: String, learner: String },

    #[pattern("routed {bytes:u64} bytes from {from} to {to}")]
    MessageRouted { from: String, to: String, bytes: u64 },

    #[pattern("ERROR: {message}")]
    #[pattern("error: {message}")]  // multiple patterns for same variant
    Error { message: String },

    #[pattern("debug: {rest:}")]  // capture rest of line
    Debug { rest: String },
}

fn main() {
    // Parse log lines into typed events
    let line = "session abc123 created for alice";
    if let Some(event) = ServerEvent::parse(line) {
        println!("{:?}", event);
        // ServerEvent::SessionCreated { id: "abc123", learner: "alice" }
    }

    // Check readiness
    if ServerEvent::is_ready("server: listening on /tmp/server.sock") {
        println!("Server is ready!");
    }
}

JSON Parsing

Deserialize JSON directly into struct fields:

use procstream::EventParser;
use serde::Deserialize;

#[derive(Debug, Deserialize, PartialEq)]
struct Config {
    port: u16,
    host: String,
}

#[derive(Debug, EventParser)]
enum ConfigEvent {
    #[pattern("config loaded: {config:json}")]
    Loaded { config: Config },

    #[pattern("raw: {data:json}")]
    Raw { data: serde_json::Value },
}

fn main() {
    let line = r#"config loaded: {"port":8080,"host":"localhost"}"#;
    let event = ConfigEvent::parse(line);
    // Some(ConfigEvent::Loaded { config: Config { port: 8080, host: "localhost" } })
}

Generated API

The derive macro generates the following API (signatures only; bodies omitted):

impl ServerEvent {
    /// Parse a log line into an event
    pub fn parse(line: &str) -> Option<Self>;

    /// Check if a line indicates readiness (variants marked with #[ready])
    pub fn is_ready(line: &str) -> bool;
}

impl procstream::EventParser for ServerEvent {
    fn parse(line: &str) -> Option<Self>;
    fn parse_event(event: &ProcessEvent) -> Option<Self>;
    fn is_ready(line: &str) -> bool;
    fn is_ready_event(event: &ProcessEvent) -> bool;
}

License

MIT

About

async-process-based streaming for subprocesses

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

 
 
 

Contributors

Languages