Subscriptions
A subscription is a long-lived event source that produces messages into the update loop. Timers, file watchers, child processes, WebSocket streams — anything whose lifecycle is “start when I declare it, stop when I stop declaring it” — is a subscription.
The runtime manages lifecycles automatically: you declare what should be running, and the runtime reconciles the live set against that declaration.
Files:
- Trait: crates/ftui-runtime/src/subscription.rs:33
- Built-in Every: crates/ftui-runtime/src/subscription.rs:443
- ProcessSubscription: crates/ftui-runtime/src/process_subscription.rs:69
The trait
pub type SubId = u64;
pub trait Subscription<M: Send + 'static>: Send {
    /// Unique identifier for deduplication.
    /// Subscriptions with the same ID are considered identical.
    fn id(&self) -> SubId;
    /// Run the subscription on a background thread.
    /// Send messages through `sender`; exit when `stop` is triggered.
    fn run(&self, sender: mpsc::Sender<M>, stop: StopSignal);
}

Two invariants do most of the work:
- id is stable across updates. If a subscription returns the same SubId on two consecutive calls to Model::subscriptions, the runtime keeps it running. If the ID changes, the old one is stopped and the new one is started.
- run blocks until the stop signal fires or the channel disconnects. Implementations check stop.wait_timeout(…) or stop.is_stopped() at safe points.
StopSignal
#[derive(Clone)]
pub struct StopSignal { /* wraps CancellationToken */ }
impl StopSignal {
    pub fn is_stopped(&self) -> bool;
    pub fn wait_timeout(&self, duration: Duration) -> bool; // true if stopped
    pub fn cancellation_token(&self) -> &CancellationToken;
}

StopSignal is backed by a CancellationToken (see
crates/ftui-runtime/src/cancellation.rs). The Structured and
Asupersync runtime lanes propagate the
same token to child tasks so cancellation is cooperative all the way
down.
wait_timeout is the preferred polling primitive — it sleeps on a
condition variable, which is cheaper than busy-waiting and also
responds immediately to a cancel.
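To make the condition-variable point concrete, here is a minimal sketch of a stop signal built on std alone. SketchStop and its stop method are hypothetical names used for illustration; the real StopSignal wraps a CancellationToken and may be implemented differently.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::time::Duration;

/// Hypothetical stand-in for a stop signal; not the ftui-runtime type.
#[derive(Clone)]
pub struct SketchStop {
    inner: Arc<(Mutex<bool>, Condvar)>,
}

impl SketchStop {
    pub fn new() -> Self {
        Self { inner: Arc::new((Mutex::new(false), Condvar::new())) }
    }

    /// Set the flag and wake every thread parked in `wait_timeout`.
    pub fn stop(&self) {
        let (flag, cvar) = &*self.inner;
        *flag.lock().unwrap() = true;
        cvar.notify_all();
    }

    pub fn is_stopped(&self) -> bool {
        *self.inner.0.lock().unwrap()
    }

    /// Sleep for up to `duration`; return true if the signal was stopped.
    /// The thread parks on the condition variable, so it uses no CPU while
    /// waiting and wakes as soon as `stop` is called.
    pub fn wait_timeout(&self, duration: Duration) -> bool {
        let (flag, cvar) = &*self.inner;
        let guard = flag.lock().unwrap();
        let (guard, _timed_out) = cvar
            .wait_timeout_while(guard, duration, |stopped| !*stopped)
            .unwrap();
        *guard
    }
}
```

With this shape, a worker that calls wait_timeout with a five-second duration still returns within moments of stop being called from another thread.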
Declarative reconciliation
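Each time Model::update returns, the runtime calls
Model::subscriptions, diffs the result by SubId against the
previous cycle, and:
- starts every subscription whose ID is new,
- stops and joins every subscription whose ID is no longer declared,
- leaves every subscription whose ID is unchanged running.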
Tracing for this cycle lives on ftui.runtime:
- subscription.start: a new subscription entered the active set.
- subscription.stop: an old subscription left and was joined.
- subscription.stop_all: the runtime is shutting down.
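The start and stop events correspond to the two halves of a set difference on SubId. The following is a minimal sketch of that diff, assuming the runtime tracks the active IDs in a set; diff_ids and Diff are hypothetical names and not part of ftui-runtime.

```rust
use std::collections::HashSet;

type SubId = u64;

/// Result of comparing two declaration cycles (hypothetical helper type).
#[derive(Debug, PartialEq)]
pub struct Diff {
    pub to_start: Vec<SubId>,
    pub to_stop: Vec<SubId>,
}

/// Compare the previously active IDs with the newly declared ones.
/// IDs present in both sets appear in neither list and keep running.
pub fn diff_ids(active: &HashSet<SubId>, declared: &[SubId]) -> Diff {
    // Collecting into a set also deduplicates repeated declarations.
    let declared_set: HashSet<SubId> = declared.iter().copied().collect();
    let mut to_start: Vec<SubId> = declared_set.difference(active).copied().collect();
    let mut to_stop: Vec<SubId> = active.difference(&declared_set).copied().collect();
    // Sort so the output order is deterministic.
    to_start.sort_unstable();
    to_stop.sort_unstable();
    Diff { to_start, to_stop }
}
```

Declaring the same IDs in a different order yields two empty lists, which is why re-declaring an unchanged subscription costs nothing.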
Counters are available via ftui_runtime::effect_system:
use ftui_runtime::effect_system::*;
subscription_starts_total();
subscription_stops_total();
subscription_panics_total();
reconcile_count();

Panics inside run are caught and counted but do not crash the
runtime; the subscription is removed and Model::on_error is called
with a synthetic error string.
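One way to contain a panic like this is to give each subscription its own thread and inspect the join result. The sketch below assumes that approach; run_contained and the PANICS counter are hypothetical, and the runtime's actual mechanism may differ.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::thread;

/// Hypothetical counter standing in for a panic metric.
static PANICS: AtomicU64 = AtomicU64::new(0);

/// Run `body` on its own thread and report a panic as an error string
/// instead of letting it unwind into the caller.
pub fn run_contained<F>(body: F) -> Result<(), String>
where
    F: FnOnce() + Send + 'static,
{
    match thread::spawn(body).join() {
        Ok(()) => Ok(()),
        Err(payload) => {
            PANICS.fetch_add(1, Ordering::Relaxed);
            // Panic payloads are usually a &str or a String.
            let msg = payload
                .downcast_ref::<&str>()
                .map(|s| s.to_string())
                .or_else(|| payload.downcast_ref::<String>().cloned())
                .unwrap_or_else(|| "unknown panic".to_string());
            Err(format!("subscription panicked: {msg}"))
        }
    }
}
```

The returned string plays the role of the synthetic error: the caller can forward it to an error hook and carry on with the remaining subscriptions.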
Built-in: Every
A fixed-interval ticker. The message factory is an Fn() -> M closure, so
it can capture per-cycle state.
pub struct Every<M: Send + 'static> { /* ... */ }
impl<M: Send + 'static> Every<M> {
    pub fn new(interval: Duration, make_msg: impl Fn() -> M + Send + Sync + 'static) -> Self;
    pub fn with_id(id: SubId, interval: Duration, make_msg: impl Fn() -> M + Send + Sync + 'static) -> Self;
}

The default id is derived from the interval in nanoseconds (XORed with
"TICK" as a magic value), so two Every instances with the same interval
dedupe. Use with_id when you need two distinct tickers with the same interval.
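The derivation can be sketched in a few lines. The exact constant, its width, and the truncation of the nanosecond count are assumptions made for illustration (the text only states that the interval is XORed with "TICK"), and default_tick_id is a hypothetical name.

```rust
use std::time::Duration;

type SubId = u64;

/// ASCII bytes of "TICK" packed big-endian into an integer.
/// Assumption: the real magic may use a different width or byte order.
const TICK_MAGIC: u64 = 0x5449_434B;

/// Hypothetical derivation of a default ticker ID from its interval.
/// Equal intervals always map to the same ID, which is what makes
/// two default-ID tickers with the same interval deduplicate.
pub fn default_tick_id(interval: Duration) -> SubId {
    (interval.as_nanos() as u64) ^ TICK_MAGIC
}
```

Under this scheme Duration::from_secs(1) and Duration::from_millis(1000) produce the same ID, because only the nanosecond count matters.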
use ftui_runtime::subscription::{Every, Subscription};
use std::time::Duration;
fn subscriptions(&self) -> Vec<Box<dyn Subscription<Msg>>> {
    vec![
        // Default ID, derived from the one-second interval.
        Box::new(Every::new(Duration::from_secs(1), || Msg::Tick)),
        // Same interval with an explicit ID, so it runs alongside the ticker above.
        Box::new(Every::with_id(0xBEEF, Duration::from_secs(1), || Msg::HeartBeat)),
    ]
}

Built-in: ProcessSubscription
Spawns a child process, captures its stdout/stderr line-by-line, and
emits ProcessEvent messages.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ProcessEvent {
    Stdout(String),
    Stderr(String),
    Exited(i32),
    Signaled(i32),
    Killed,
    Error(String),
}
pub struct ProcessSubscription<M: Send + 'static> { /* ... */ }
impl<M: Send + 'static> ProcessSubscription<M> {
    pub fn new(
        program: impl Into<String>,
        make_msg: impl Fn(ProcessEvent) -> M + Send + Sync + 'static,
    ) -> Self;
    pub fn arg(self, arg: impl Into<String>) -> Self;
    pub fn args(self, args: impl IntoIterator<Item = impl Into<String>>) -> Self;
    pub fn env(self, key: impl Into<String>, value: impl Into<String>) -> Self;
    pub fn timeout(self, duration: Duration) -> Self;
    pub fn with_id(self, id: SubId) -> Self;
}

The SubId is hashed from program + args + env + timeout, so
re-declaring the same command is a true no-op and changing any field
causes the old process to be killed and a new one to start.
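A content-derived ID of this kind can be sketched with the standard hasher. CommandSpec, its field layout, and command_id are hypothetical; the hash function ProcessSubscription actually uses is not stated here.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::time::Duration;

type SubId = u64;

/// Hypothetical command description; field names are illustrative.
#[derive(Hash)]
pub struct CommandSpec {
    pub program: String,
    pub args: Vec<String>,
    pub env: Vec<(String, String)>,
    pub timeout: Option<Duration>,
}

/// Hash every field that affects behaviour into a single ID, so that
/// equal specs map to equal IDs and any change yields a different one.
pub fn command_id(spec: &CommandSpec) -> SubId {
    let mut hasher = DefaultHasher::new();
    spec.hash(&mut hasher);
    hasher.finish()
}
```

Because args is hashed as an ordered list, swapping two arguments counts as a change, which matches the intuition that argument order alters what the child does.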
use ftui_runtime::process_subscription::{ProcessSubscription, ProcessEvent};
use ftui_runtime::subscription::Subscription;
use std::time::Duration;
enum Msg { Log(ProcessEvent), /* … */ }
fn subscriptions(&self) -> Vec<Box<dyn Subscription<Msg>>> {
    vec![Box::new(
        ProcessSubscription::new("tail", Msg::Log)
            .arg("-f")
            .arg("/var/log/syslog")
            .timeout(Duration::from_secs(60)),
    )]
}

When stop fires the child is sent SIGKILL (Unix) and reader threads
are joined with a bounded timeout; detached threads are logged rather
than leaked.
Writing a custom subscription
use ftui_runtime::subscription::{Subscription, SubId, StopSignal};
use std::sync::mpsc;
use std::time::{Duration, Instant};
struct Poll<M> {
    id: SubId,
    interval: Duration,
    make_msg: Box<dyn Fn() -> M + Send + Sync>,
}
impl<M: Send + 'static> Subscription<M> for Poll<M> {
    fn id(&self) -> SubId { self.id }
    fn run(&self, sender: mpsc::Sender<M>, stop: StopSignal) {
        // Give up after 30 seconds even if nobody asks us to stop.
        let deadline = Instant::now() + Duration::from_secs(30);
        loop {
            // Sleep for one interval; wake early and exit if stopped.
            if stop.wait_timeout(self.interval) { break; }
            if Instant::now() >= deadline { break; }
            // A failed send means the receiver was dropped.
            if sender.send((self.make_msg)()).is_err() { break; }
        }
    }
}

Three conventions:
- Wait with stop.wait_timeout instead of thread::sleep.
- Exit when sender.send returns Err: the receiver has been dropped, which means the runtime no longer wants your messages.
- Don’t panic. If you must propagate a fatal condition, send an error-shaped message and then return.
Pitfalls
Changing a subscription parameter changes its ID. If you want to
keep the same instance running while tuning (for example a debounce
interval), either pin the ID with with_id or keep the parameter
constant during a session.
Channel-closed is the fast exit path, not a hang. Your run
loop must observe sender.send(..).is_err() and return. Otherwise,
the subscription continues to consume CPU after the receiver was
dropped — which happens during reconciliation and shutdown.
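The channel-closed exit can be demonstrated with std::sync::mpsc alone. In this minimal sketch tick_until_closed is a hypothetical function, and thread::sleep stands in for the stop-signal wait only to keep the example free of runtime types; a real subscription would call stop.wait_timeout instead.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Send a counter until the receiver goes away, then return how many
/// messages were delivered successfully.
pub fn tick_until_closed(sender: mpsc::Sender<u32>, interval: Duration) -> u32 {
    let mut delivered = 0;
    loop {
        // Stand-in for `stop.wait_timeout(interval)` in a real subscription.
        thread::sleep(interval);
        // `send` fails only when the receiver has been dropped.
        if sender.send(delivered).is_err() {
            return delivered;
        }
        delivered += 1;
    }
}
```

Dropping the receiver is enough to end the loop: the next send returns Err and the thread exits without needing a separate stop request.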