Background jobs + event streaming for Ruby, backed by NATS. No Redis. No DB polling. Disk-backed and horizontally scalable: no message is ever silently dropped.

```ruby
# Define a job
class SendEmailJob
  include Cosmo::Job
  options stream: :default, retry: 3, dead: true

  def perform(user_id, template)
    EmailService.send(user_id, template)
  end
end

# Enqueue it
SendEmailJob.perform_async(123, "welcome")
SendEmailJob.perform_in(1.day, 123, "followup")
```

```ruby
# Process a continuous event stream
class ClicksProcessor
  include Cosmo::Stream
  options stream: :clickstream, batch_size: 100,
          consumer: { subjects: ["events.clicks.>"] }

  def process_one
    Analytics.track(message.data)
    message.ack
  end
end

ClicksProcessor.publish({ user_id: 123, page: "/home" }, subject: "events.clicks.homepage")
```

```bash
bundle exec cosmo -C config/cosmo.yml -c 20 jobs     # Run jobs
bundle exec cosmo -C config/cosmo.yml -c 20 streams  # Run streams
```

- Why NATS?
- Features
- Installation
- Quick Start
- Core Concepts
- Advanced Usage
- CLI Reference
- Deployment
- Monitoring
- Examples

Most Ruby job queues use Redis or Postgres, tools that were never designed for durable, high-throughput messaging.
NATS is a single ~20MB binary with a ~10MB memory footprint, yet it delivers disk-backed persistent streams, Pub/Sub, a KV store, and true horizontal clustering at millions of messages per second.

| | Redis/DB-backed | NATS |
|---|---|---|
| Persistence | Memory-only / DB bloat | Disk-backed, TB-scale |
| Scaling | Vertical only | True horizontal clustering |
| Background jobs | Yes | Yes |
| Event streams | No | Yes |
| Backpressure | No, queues grow unbounded | Yes |
| Multi-DC | Complex setup | Native geo-distribution |

One NATS server replaces your message broker, job queue, and KV store, with lower operational overhead.

- Familiar API: `perform_async`, `perform_in`, `perform_at`
- Priority queues: critical, high, default, low with weighted round-robin
- Scheduled jobs: execute at a specific time or after a delay
- Automatic retries: exponential backoff, configurable attempts
- Dead letter queue: capture permanently failed jobs
- Job uniqueness: prevent duplicate execution
- Real-time event streams: process continuous data feeds
- Batch processing: handle multiple messages in one go
- Message replay: reprocess from any point in time
- Consumer groups: load-balanced across workers
- Custom serialization: JSON, MessagePack, Protobuf
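
Priority queues are described above as weighted round-robin across the critical, high, default, and low streams. Cosmo's scheduler code isn't shown in this README, so the following is only a sketch of the general technique (smooth weighted round-robin, the variant popularized by nginx), using the `priority` values from the full `config/cosmo.yml` example as weights. `WeightedRoundRobin` is a hypothetical class, not part of Cosmo.

```ruby
# Hypothetical illustration of smooth weighted round-robin; NOT Cosmo's internal code.
class WeightedRoundRobin
  def initialize(weights)
    @weights = weights                        # stream name => weight (e.g. `priority`)
    @total   = weights.values.sum
    @current = weights.transform_values { 0 } # running score per stream
  end

  # Returns the stream that should be polled next.
  def next_stream
    # Every stream earns its weight on each step...
    @weights.each { |name, weight| @current[name] += weight }
    # ...the highest running score wins...
    name, _score = @current.max_by { |_name, score| score }
    # ...and pays back the total, so heavy streams win often but not exclusively.
    @current[name] -= @total
    name
  end
end

rr = WeightedRoundRobin.new({ critical: 50, high: 30, default: 15, low: 5 })
picks = Array.new(100) { rr.next_stream }
p picks.tally
```

Over any 100 consecutive picks (the sum of the weights), each stream is chosen exactly as often as its weight, and picks are interleaved rather than bunched, so `low` still gets polled regularly.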

```ruby
# Gemfile
gem "cosmonats"
```

Requirements: Ruby ≥ 3.1, NATS Server (install guide)

Spin up NATS instantly with Docker (`-js` enables JetStream, `-m 8222` enables the HTTP monitoring port):

```bash
docker run -p 4222:4222 -p 8222:8222 nats:alpine -js -m 8222
```

Mount the monitoring UI in your Rack app:

```ruby
require "cosmo/web"

# Rails (config/routes.rb, inside Rails.application.routes.draw)
mount Cosmo::Web => "/cosmo"

# Any Rack app (config.ru)
map "/cosmo" do
  run Cosmo::Web
end
```

```yaml
# config/cosmo.yml
concurrency: 5
max_retries: 3
consumers:
  jobs:
    default:
      ack_policy: explicit
      max_deliver: 10
      max_ack_pending: 10
      ack_wait: 15
      subject: jobs.%{name}.>
setup:
  jobs:
    default:
      storage: file
      retention: workqueue
      subjects: ["jobs.%{name}.>"]
      allow_direct: true
```

```bash
# Create the streams in NATS
bundle exec cosmo -S
```

```ruby
# Define a job
class SendEmailJob
  include Cosmo::Job
  options stream: :default, retry: 3, dead: true

  def perform(user_id, email_type)
    UserMailer.send(email_type, user_id).deliver_now
  end
end

# Enqueue it
SendEmailJob.perform_async(42, "welcome")
```

```bash
# Run the workers
bundle exec cosmo -C config/cosmo.yml -c 10 -r ./app/jobs jobs
```

```ruby
class ReportJob
  include Cosmo::Job

  options(
    stream: :critical, # Stream name
    retry: 5,          # Retry attempts
    dead: true         # Send to dead letter queue on final failure
  )

  def perform(report_id)
    logger.info "Processing report #{report_id}"
    Report.find(report_id).generate!
  rescue StandardError => e
    logger.error "Failed: #{e.message}"
    raise # Triggers retry with exponential backoff
  end
end

ReportJob.perform_async(42)                              # Enqueue now
ReportJob.perform_in(30.minutes, 42)                     # Delayed
ReportJob.perform_at(Time.parse("2026-01-25 10:00"), 42) # Scheduled
ReportJob.perform_sync(42)                               # Inline, no NATS (great for tests)
```

```ruby
class ClicksProcessor
  include Cosmo::Stream

  options(
    stream: :clickstream,
    batch_size: 100,
    start_position: :last, # :first, :last, :new, or a timestamp
    consumer: {
      ack_policy: "explicit",
      max_deliver: 3,
      max_ack_pending: 100,
      subjects: ["events.clicks.>"]
    }
  )

  # Process one message at a time
  def process_one
    Analytics.track_click(message.data)
    message.ack
  end

  # OR process a batch
  def process(messages)
    Analytics.bulk_track(messages.map(&:data))
    messages.each(&:ack)
  end
end

# Publishing
ClicksProcessor.publish({ user_id: 123, page: "/home" }, subject: "events.clicks.homepage")

# Acknowledgment strategies (inside process_one / process)
message.ack                        # Success
message.nack(delay: 5_000_000_000) # Retry in 5 seconds (delay is in nanoseconds)
message.term                       # Permanent failure, no retry
```

Full config/cosmo.yml example:

```yaml
timeout: 25                   # Shutdown timeout in seconds
concurrency: &concurrency 1   # Number of worker threads
max_retries: &max_retries 3   # Default max retries

stream_config: &stream_config
  storage: file               # storage type (file or memory)
  retention: workqueue        # retention policy (limits, interest, workqueue)
  duplicate_window: 120       # duplicate message detection window, in seconds
  discard: old                # when the stream is full, discard the oldest messages (old or new)
  allow_direct: true          # enable direct get access to the stream, required for the web UI
  subjects:
    - jobs.%{name}.>          # subject pattern for the stream; %{name} is replaced with the stream name

consumer_config: &consumer_config
  ack_policy: explicit        # ack policy (explicit, none, all); explicit: each message must be acknowledged
  max_deliver: 10             # max deliveries before a message is considered failed
  max_ack_pending: 20         # max messages awaiting an ack for this consumer
  ack_wait: 60                # seconds to wait for an ack before redelivering the message
  subject: jobs.%{name}.>     # subject pattern for the consumer; %{name} is replaced with the stream name

consumers:
  jobs:
    critical:
      <<: *consumer_config
      priority: 50
    high:
      <<: *consumer_config
      priority: 30
    default:
      <<: *consumer_config
      priority: 15
    low:
      <<: *consumer_config
      priority: 5
    scheduled:
      <<: *consumer_config
      max_deliver: 1
      max_ack_pending: 100
      ack_wait: 10

setup:
  jobs:
    critical:
      <<: *stream_config
      description: Very critical priority jobs
    high:
      <<: *stream_config
      description: Higher priority jobs
    default:
      <<: *stream_config
      description: Default priority jobs
    low:
      <<: *stream_config
      description: Lower priority jobs
    scheduled:
      <<: *stream_config
      description: Scheduled jobs
    dead:
      <<: *stream_config
      retention: limits
      max_msgs: 10000
      max_age: 604800         # 7d
      description: Broken jobs (DLQ)

development:
  verbose: false
  concurrency: *concurrency

staging:
  verbose: true
  concurrency: 3

production:
  concurrency: 3
```

Programmatic:

```ruby
Cosmo::Config.set(:concurrency, 20)
Cosmo::Config.set(:setup, :streams, :custom, { storage: "file", subjects: ["custom.>"] })
```

Environment variables:

```bash
export NATS_URL=nats://localhost:4222
export COSMO_JOBS_FETCH_TIMEOUT=0.1
export COSMO_STREAMS_FETCH_TIMEOUT=0.1
```

Priority Queues:

```ruby
class UrgentJob
  include Cosmo::Job
  options stream: :critical # priority: 50 in config, so it is polled most frequently
end
```

Custom Serializers:

```ruby
require "msgpack" # from the msgpack gem

module MessagePackSerializer
  def self.serialize(data) = MessagePack.pack(data)
  def self.deserialize(payload) = MessagePack.unpack(payload)
end

class FastStream
  include Cosmo::Stream
  options publisher: { serializer: MessagePackSerializer }
end
```

Error Handling:

```ruby
class ResilientJob
  include Cosmo::Job
  options retry: 5, dead: true

  def perform(data)
    process_data(data)
  rescue RetryableError => e
    logger.warn "Retryable: #{e.message}"
    raise # Will retry with exponential backoff
  rescue FatalError => e
    logger.error "Fatal: #{e.message}"
    # Don't raise: the job won't be retried and won't go to the DLQ
  end
end
```

Testing:

```ruby
# Synchronous: no NATS needed
SendEmailJob.perform_sync(123, "test")

# Async: publishes to NATS (server must be running) and returns a job ID
jid = SendEmailJob.perform_async(123, "welcome")
assert_kind_of String, jid
```

```bash
cosmo -C config/cosmo.yml --setup                   # Create streams in NATS (idempotent)
cosmo -C config/cosmo.yml -c 20 -r ./app/jobs jobs  # Jobs only
cosmo -C config/cosmo.yml -c 20 streams             # Streams only
cosmo -C config/cosmo.yml -c 20                     # Both
```

| Flag | Description | Example |
|---|---|---|
| `-C, --config PATH` | Config file path | `-C config/cosmo.yml` |
| `-c, --concurrency INT` | Worker threads | `-c 20` |
| `-r, --require PATH` | Auto-require directory | `-r ./app/jobs` |
| `-t, --timeout NUM` | Shutdown timeout (sec) | `-t 60` |
| `-S, --setup` | Set up streams and exit | `--setup` |

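
The `--setup` flag creates the streams declared under `setup:` in `config/cosmo.yml`, and each subject there contains a `%{name}` placeholder that the config comments say is replaced with the stream name. Ruby's `Kernel#format` uses the same `%{name}` syntax, so, assuming Cosmo expands the placeholder the same way (its expansion code isn't shown in this README), the subjects for the example streams would come out like this:

```ruby
# Assumption: "%{name}" in cosmo.yml is expanded like Ruby's named format references.
SUBJECT_PATTERN = "jobs.%{name}.>"

# Stream names taken from the `setup: jobs:` section of the full config example.
streams = %i[critical high default low scheduled dead]
subjects = streams.to_h { |stream| [stream, format(SUBJECT_PATTERN, name: stream)] }

subjects.each { |stream, subject| puts "#{stream}: #{subject}" }
```

Each stream then owns every subject under its own `jobs.<stream>.` prefix, so the streams never overlap.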
NATS Cluster config:

```text
# nats-server.conf
port: 4222
http_port: 8222   # HTTP monitoring endpoint

jetstream {
  store_dir: /var/lib/nats
  max_file: 10G
}

cluster {
  name: cosmo-cluster
  listen: 0.0.0.0:6222
  routes: [nats://nats-2:6222, nats://nats-3:6222]
}
```

Docker Compose:

```yaml
services:
  nats:
    image: nats:latest
    command: -js -c /etc/nats/nats-server.conf
    volumes:
      - ./nats.conf:/etc/nats/nats-server.conf
      - nats-data:/var/lib/nats

  worker:
    build: .
    depends_on:
      - nats
    environment:
      NATS_URL: nats://nats:4222
    command: bundle exec cosmo -C config/cosmo.yml -c 20 jobs
    deploy:
      replicas: 3

volumes:
  nats-data:
```

Systemd Service:

```ini
# /etc/systemd/system/cosmo.service
[Unit]
Description=Cosmo Background Processor
After=network.target

[Service]
Type=simple
User=deploy
WorkingDirectory=/var/www/myapp
Environment=RAILS_ENV=production
Environment=NATS_URL=nats://localhost:4222
ExecStart=/usr/local/bin/bundle exec cosmo -C config/cosmo.yml -c 20 jobs
Restart=always
RestartSec=10
StandardOutput=syslog
StandardError=syslog
SyslogIdentifier=cosmo

[Install]
WantedBy=multi-user.target
```

```bash
sudo systemctl enable cosmo && sudo systemctl start cosmo
```

Structured logs:

```text
2026-01-23T10:15:30.123Z INFO pid=12345 tid=abc jid=def: start
2026-01-23T10:15:32.456Z INFO pid=12345 tid=abc jid=def elapsed=2.333: done
```
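
The log lines above follow a `timestamp LEVEL key=value ...: message` shape. A minimal sketch for pulling them apart (for example, to collect `elapsed` timings), assuming that shape holds for every line; `parse_cosmo_log` is a hypothetical helper, not part of Cosmo:

```ruby
require "time"

# Assumed shape: "<ISO-8601 time> <LEVEL> key=value key=value ...: <message>"
LOG_PATTERN = /\A(?<time>\S+) (?<level>[A-Z]+) (?<fields>[^:]*): (?<message>.*)\z/

# Hypothetical helper: returns a Hash, or nil when the line doesn't match.
def parse_cosmo_log(line)
  match = LOG_PATTERN.match(line.chomp)
  return nil unless match

  {
    time: Time.iso8601(match[:time]),
    level: match[:level],
    message: match[:message],
    fields: match[:fields].scan(/(\w+)=(\S+)/).to_h # string keys and values, e.g. "pid" => "12345"
  }
end

lines = [
  "2026-01-23T10:15:30.123Z INFO pid=12345 tid=abc jid=def: start",
  "2026-01-23T10:15:32.456Z INFO pid=12345 tid=abc jid=def elapsed=2.333: done"
]

entries = lines.filter_map { |line| parse_cosmo_log(line) }
finished = entries.select { |entry| entry[:message] == "done" }
puts finished.sum { |entry| entry[:fields]["elapsed"].to_f } # total seconds spent in finished jobs
```

Only `done` lines carry `elapsed`, so filtering on the message first avoids treating `start` lines as zero-length jobs.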
Stream Metrics:

```ruby
client = Cosmo::Client.instance
info = client.stream_info("default")
info.state.messages       # Total messages
info.state.bytes          # Total bytes
info.state.consumer_count # Number of consumers
```

Prometheus: the NATS server serves monitoring data over HTTP on port 8222 (endpoints such as `/varz` and `/jsz`), not in Prometheus format. Scrape it with the NATS Prometheus exporter to get JetStream metrics such as:

- `jetstream_server_store_msgs`: messages in stream
- `jetstream_consumer_delivered_msgs`: delivered messages
- `jetstream_consumer_ack_pending`: pending acknowledgments

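
The stream consumers in this README subscribe with NATS subject wildcards (`events.clicks.>`, `events.*.>`, `images.uploaded.>`): `*` matches exactly one dot-separated token, and `>`, allowed only as the last token, matches one or more trailing tokens. A small sketch of that rule, handy for checking which subjects a consumer pattern will receive; `subject_matches?` is a hypothetical helper, since NATS performs this matching server-side:

```ruby
# Hypothetical helper mirroring NATS subject-wildcard rules:
#   "*" matches exactly one token; ">" (last token only) matches one or more tokens.
def subject_matches?(pattern, subject)
  pattern_tokens = pattern.split(".")
  subject_tokens = subject.split(".")

  pattern_tokens.each_with_index do |token, i|
    return subject_tokens.length > i if token == ">" # needs at least one remaining token
    return false if i >= subject_tokens.length       # subject ran out of tokens
    next if token == "*"                             # any single token is fine
    return false unless token == subject_tokens[i]   # literal tokens must match exactly
  end

  pattern_tokens.length == subject_tokens.length     # no leftover subject tokens
end

puts subject_matches?("events.clicks.>", "events.clicks.homepage")
puts subject_matches?("events.*.>", "events.views.product.42")
puts subject_matches?("events.*.>", "events.clicks")
puts subject_matches?("images.uploaded.>", "images.processed.optimized")
```

The last call shows why `ImageProcessor` republishing to `images.processed.optimized` does not loop back into its own `images.uploaded.>` consumer.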
Email queue with scheduling:

```ruby
class EmailJob
  include Cosmo::Job
  options stream: :default, retry: 3

  def perform(user_id, template)
    user = User.find(user_id)
    EmailService.send(user.email, template)
  end
end

EmailJob.perform_async(123, "welcome")
EmailJob.perform_in(1.day, 123, "followup")
```

Image Processing Pipeline:

```ruby
class ImageProcessor
  include Cosmo::Stream

  options(
    stream: :images,
    consumer: { subjects: ["images.uploaded.>"] }
  )

  def process_one
    processed = ImageService.process(message.data["url"])
    publish(processed, subject: "images.processed.optimized")
    message.ack
  rescue => e
    logger.error "Processing failed: #{e.message}"
    message.nack(delay: 30_000_000_000) # Retry in 30 seconds (nanoseconds)
  end
end

ImageProcessor.publish({ url: "https://example.com/image.jpg" }, subject: "images.uploaded.user")
```

Real-Time Analytics:

```ruby
class AnalyticsAggregator
  include Cosmo::Stream
  options batch_size: 1000, consumer: { subjects: ["events.*.>"] }

  def process(messages)
    aggregates = messages.map(&:data).group_by { |e| e["type"] }.transform_values(&:count)
    Analytics.bulk_insert(aggregates)
    messages.each(&:ack)
  end
end
```

Made with ❤️ for Ruby
Blast off, Cosmonats!