Effect queue
When the runtime is on the Structured or Asupersync lane,
background tasks (Cmd::Task) are enqueued to a dedicated worker
thread instead of being spawned on one thread per task. The queue
gives you three things that raw thread::spawn does not:
- A single place to measure: enqueued / processed / dropped / in-flight.
- Backpressure, so a runaway producer cannot exhaust the system.
- Priority scheduling (SRPT / Smith’s rule / aging) driven by the intelligence layer.
Files:
- Config: crates/ftui-runtime/src/program.rs:2031
- Telemetry: crates/ftui-runtime/src/effect_system.rs:112
- Scheduler: crates/ftui-runtime/src/queueing_scheduler.rs
Configuration
pub enum TaskExecutorBackend {
    #[default]
    Spawned, // one OS thread per task, legacy
    EffectQueue, // queueing scheduler
    #[cfg(feature = "asupersync-executor")]
    Asupersync, // blocking pool under Asupersync
}
pub struct EffectQueueConfig {
    pub enabled: bool,
    pub backend: TaskExecutorBackend,
    pub scheduler: SchedulerConfig, // Smith's rule by default
    pub max_queue_depth: usize, // 0 == unbounded
}
impl EffectQueueConfig {
    pub fn with_enabled(self, enabled: bool) -> Self;
    pub fn with_backend(self, backend: TaskExecutorBackend) -> Self;
    pub fn with_scheduler(self, scheduler: SchedulerConfig) -> Self;
    pub fn with_max_queue_depth(self, depth: usize) -> Self;
}
Select a backend explicitly, or leave it at the default and let the
runtime lane pick the right one via
RuntimeLane::task_executor_backend.
use ftui::prelude::*;
use ftui_runtime::{EffectQueueConfig, TaskExecutorBackend};
let cfg = ProgramConfig::fullscreen()
    .with_effect_queue(
        EffectQueueConfig::default()
            .with_backend(TaskExecutorBackend::EffectQueue)
            .with_max_queue_depth(256),
    );
Backpressure
When max_queue_depth > 0 and the live depth would exceed the cap,
the new task is dropped rather than blocking the caller. Dropped
tasks:
- Increment the effects_queue_dropped counter.
- Emit a tracing::warn! on ftui.effect with event = "effect_queue.drop" and the drop reason.
- Do not produce a message: update will never see the result of a dropped task.
Rationale: blocking Cmd::Task would stall update, freezing input
and rendering. Dropping keeps the UI responsive and forces the
operator to see the pressure via metrics.
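The drop-instead-of-block rule can be illustrated with a small standalone model. This is a minimal sketch using only the standard library, not the ftui implementation: BoundedQueue, try_enqueue, and the Enqueue enum are hypothetical names, and the real queue also logs the drop and schedules by priority.

```rust
use std::collections::VecDeque;

/// Outcome of offering one task to the queue (hypothetical type).
#[derive(Debug, PartialEq, Eq)]
pub enum Enqueue {
    Accepted,
    Dropped,
}

/// Toy bounded queue. `max_depth == 0` means unbounded, mirroring
/// the `max_queue_depth` convention described above.
pub struct BoundedQueue<T> {
    items: VecDeque<T>,
    max_depth: usize,
    pub enqueued: u64, // tasks accepted
    pub dropped: u64,  // tasks rejected by the cap
}

impl<T> BoundedQueue<T> {
    pub fn new(max_depth: usize) -> Self {
        Self { items: VecDeque::new(), max_depth, enqueued: 0, dropped: 0 }
    }

    /// Never blocks: a task that would push the depth past the cap
    /// is counted as dropped and discarded.
    pub fn try_enqueue(&mut self, task: T) -> Enqueue {
        if self.max_depth > 0 && self.items.len() >= self.max_depth {
            self.dropped += 1;
            return Enqueue::Dropped;
        }
        self.items.push_back(task);
        self.enqueued += 1;
        Enqueue::Accepted
    }

    /// Worker side: take the oldest task, if any.
    pub fn pop(&mut self) -> Option<T> {
        self.items.pop_front()
    }

    pub fn depth(&self) -> usize {
        self.items.len()
    }
}
```

With a cap of 2, the third try_enqueue in a row returns Enqueue::Dropped immediately, and the caller carries on without waiting for a free slot.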
Queue telemetry
Atomic counters and a single struct snapshot:
pub fn effects_queue_enqueued() -> u64;
pub fn effects_queue_processed() -> u64;
pub fn effects_queue_dropped() -> u64;
pub fn effects_queue_high_water() -> u64;
pub struct QueueTelemetry {
    pub enqueued: u64,
    pub processed: u64,
    pub dropped: u64,
    pub high_water: u64,
    pub in_flight: u64, // enqueued - processed - dropped
}
pub fn queue_telemetry() -> QueueTelemetry;
Counter semantics:
| Field | Meaning |
|---|---|
| enqueued | Total tasks accepted into the queue (monotonic). |
| processed | Total tasks completed successfully (monotonic). |
| dropped | Total tasks dropped by backpressure or post-shutdown. |
| high_water | Maximum queue depth observed; ratchet, only increases. |
| in_flight | Currently queued or executing. Derived live at snapshot time. |
These counters are process-global and zero at startup; the snapshot is cheap enough to poll once per frame without skewing timing.
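To make the ratchet and the derived field concrete, here is a minimal sketch of how such counters could be kept with standard-library atomics. It is an illustration under assumptions, not the code in effect_system.rs: Counters, Snapshot, and the record_* methods are hypothetical names, and in_flight follows the formula given in the struct comment above.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Hypothetical counter set; the real counters are process-global.
#[derive(Default)]
pub struct Counters {
    enqueued: AtomicU64,
    processed: AtomicU64,
    dropped: AtomicU64,
    high_water: AtomicU64,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Snapshot {
    pub enqueued: u64,
    pub processed: u64,
    pub dropped: u64,
    pub high_water: u64,
    pub in_flight: u64,
}

impl Counters {
    /// Record an accepted task; `depth_after` is the queue depth
    /// once the task has been added.
    pub fn record_enqueue(&self, depth_after: u64) {
        self.enqueued.fetch_add(1, Ordering::Relaxed);
        // fetch_max is the ratchet: the stored value never decreases.
        self.high_water.fetch_max(depth_after, Ordering::Relaxed);
    }

    pub fn record_processed(&self) {
        self.processed.fetch_add(1, Ordering::Relaxed);
    }

    pub fn record_dropped(&self) {
        self.dropped.fetch_add(1, Ordering::Relaxed);
    }

    /// in_flight is derived at snapshot time rather than stored.
    /// Saturating subtraction guards against a racy read going negative.
    pub fn snapshot(&self) -> Snapshot {
        let enqueued = self.enqueued.load(Ordering::Relaxed);
        let processed = self.processed.load(Ordering::Relaxed);
        let dropped = self.dropped.load(Ordering::Relaxed);
        Snapshot {
            enqueued,
            processed,
            dropped,
            high_water: self.high_water.load(Ordering::Relaxed),
            in_flight: enqueued.saturating_sub(processed).saturating_sub(dropped),
        }
    }
}
```

Because each field is loaded separately, a snapshot taken while workers are running is only approximately consistent, which is usually acceptable for a dashboard.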
A simple operator dashboard in a few lines:
use ftui::prelude::*;
use ftui_core::geometry::Rect;
use ftui_runtime::effect_system::queue_telemetry;
use ftui_widgets::Widget;
use ftui_widgets::paragraph::Paragraph;
fn draw_dashboard(frame: &mut Frame) {
    // Take one snapshot per frame so both lines show the same reading.
    let t = queue_telemetry();
    let line0 = format!("queue: {} (hw {})", t.in_flight, t.high_water);
    Paragraph::new(line0.as_ref())
        .render(Rect::new(0, 0, frame.width(), 1), frame);
    let line1 = format!(
        "enqueued {} processed {} dropped {}",
        t.enqueued, t.processed, t.dropped,
    );
    Paragraph::new(line1.as_ref())
        .render(Rect::new(0, 1, frame.width(), 1), frame);
}
Scheduling
The queue consults a SchedulerConfig to pick the next ready task.
The default policy is Smith’s rule
(smith_enabled: true, force_fifo: false, preemptive: false).
Smith’s rule runs tasks in descending order of
weight / expected_cost, so a task goes first when it is important,
cheap, or both.
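The ordering can be sketched in a few lines of standalone Rust. This is an illustration of Smith’s rule in general, not the code in queueing_scheduler.rs: the Task struct, smith_order, and weighted_completion are hypothetical names, and a cost of zero is assumed not to occur.

```rust
/// Hypothetical task record carrying the two scheduler hints.
#[derive(Debug, Clone, PartialEq)]
pub struct Task {
    pub name: &'static str,
    pub weight: f64,
    pub expected_cost_ms: f64,
}

/// The Smith ratio: importance per unit of expected work.
pub fn smith_ratio(t: &Task) -> f64 {
    t.weight / t.expected_cost_ms
}

/// Sort so the highest ratio runs first. The sort is stable, so
/// tasks with equal ratios keep their arrival order.
pub fn smith_order(mut tasks: Vec<Task>) -> Vec<Task> {
    tasks.sort_by(|a, b| smith_ratio(b).total_cmp(&smith_ratio(a)));
    tasks
}

/// Sum of weight * completion time when tasks run back to back in
/// the given order. Smith's rule minimizes this sum on one worker.
pub fn weighted_completion(tasks: &[Task]) -> f64 {
    let mut clock = 0.0;
    let mut total = 0.0;
    for t in tasks {
        clock += t.expected_cost_ms;
        total += t.weight * clock;
    }
    total
}
```

For three tasks with (weight, cost) of (1, 10), (2, 10), and (1, 2), arrival order gives a weighted completion sum of 72, while Smith order (the cheap task, then the heavy one, then the rest) gives 48.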
Other policies, all configurable:
| Policy | Use case |
|---|---|
| SRPT (Shortest Remaining Processing Time) | Reduces queue depth fastest; can starve long tasks. |
| FIFO (force_fifo: true) | Deterministic order for tests. |
| Aging (aging_factor > 0) | Bumps long-waiting tasks to prevent starvation. |
Smith’s-rule derivation and trade-offs live in intelligence/scheduling.
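The interaction between a cost-based policy and aging can be sketched as follows. The formula here is an assumption chosen for illustration (a linear bonus of aging_factor per millisecond waited, added to the Smith ratio); the actual formula used by the scheduler may differ, and Waiting, priority, and pick_next are hypothetical names.

```rust
/// Hypothetical view of a queued task at selection time.
pub struct Waiting {
    pub weight: f64,
    pub remaining_ms: f64,
    pub waited_ms: f64,
}

/// Assumed priority: Smith ratio plus a linear aging bonus.
/// With `aging_factor == 0.0` this is the plain Smith ratio.
pub fn priority(t: &Waiting, aging_factor: f64) -> f64 {
    t.weight / t.remaining_ms + aging_factor * t.waited_ms
}

/// Index of the task to run next, or None if nothing is queued.
/// On ties the earliest task in the slice wins.
pub fn pick_next(tasks: &[Waiting], aging_factor: f64) -> Option<usize> {
    let mut best: Option<(usize, f64)> = None;
    for (i, t) in tasks.iter().enumerate() {
        let p = priority(t, aging_factor);
        if best.map_or(true, |(_, bp)| p > bp) {
            best = Some((i, p));
        }
    }
    best.map(|(i, _)| i)
}
```

Without aging, a 1000 ms task always loses to a fresh 10 ms task of equal weight. With a factor of 0.001 and five seconds of waiting, the long task's bonus outweighs the short task's ratio and it runs next.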
You can still hint scheduler fields per task:
Cmd::task_weighted(/* weight */ 2.0, /* est_ms */ 10.0, move || Msg::Done)
Worked example: bounded queue with pressure alerts
use ftui::prelude::*;
use ftui_core::geometry::Rect;
use ftui_runtime::{EffectQueueConfig, TaskExecutorBackend};
use ftui_runtime::effect_system::queue_telemetry;
use ftui_widgets::Widget;
use ftui_widgets::paragraph::Paragraph;
fn configure() -> ProgramConfig {
    ProgramConfig::fullscreen()
        .with_effect_queue(
            EffectQueueConfig::default()
                .with_backend(TaskExecutorBackend::EffectQueue)
                .with_max_queue_depth(128),
        )
}
// In Model::view: show pressure in the UI.
fn render_pressure(frame: &mut Frame) {
    let t = queue_telemetry();
    // Thresholds are 75% / 50% / 25% of the 128-task cap set in configure().
    let pressure = if t.in_flight > 96 {
        "!!!"
    } else if t.in_flight > 64 {
        "!!"
    } else if t.in_flight > 32 {
        "!"
    } else {
        ""
    };
    let line = format!("pressure {}", pressure);
    Paragraph::new(line.as_ref())
        .render(Rect::new(0, 0, frame.width(), 1), frame);
}
// In a subscription: periodic report.
fn report() -> Cmd<Msg> {
    let t = queue_telemetry();
    Cmd::log(format!(
        "queue enqueued={} processed={} dropped={} in_flight={} hw={}",
        t.enqueued, t.processed, t.dropped, t.in_flight, t.high_water,
    ))
}
Pitfalls
Unbounded max_queue_depth hides producer runaways. A bug in a
subscription that emits a Cmd::task on every tick will silently
grow the queue forever. Set a cap large enough for legitimate
bursts but small enough that you’ll notice a leak within a minute.
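A back-of-the-envelope check for the "notice within a minute" guideline is to estimate how long a steady leak takes to fill the cap. The helper below is a hypothetical sketch, not part of ftui, and assumes constant production and drain rates.

```rust
/// Seconds until a queue with `cap` slots fills, given steady rates
/// in tasks per second. Returns None when the queue is unbounded
/// (`cap == 0`) or the workers keep up, so it never fills.
pub fn seconds_to_fill(cap: usize, produced_per_sec: f64, drained_per_sec: f64) -> Option<f64> {
    let net = produced_per_sec - drained_per_sec;
    if cap == 0 || net <= 0.0 {
        return None;
    }
    Some(cap as f64 / net)
}
```

For example, a subscription that emits 60 tasks per second against a worker that drains 56 leaves a net growth of 4 per second: a cap of 256 fills in 64 seconds, and a cap of 128 in 32 seconds, after which the drop counter starts climbing.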
Switching to Spawned for “simplicity” loses backpressure. The
Spawned backend has no queue to drop from — every task gets a
thread. A storm of tasks becomes a storm of threads, which is
worse, not better.
Counters are process-global. Don’t compare absolute values
across processes. Compare deltas or ratios (e.g.
dropped / enqueued).
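One way to compare on deltas is to keep the previous sample and compute the drop ratio over the interval. This is a minimal sketch with hypothetical names (Sample, drop_ratio); in practice the two fields would be copied from successive queue_telemetry() snapshots.

```rust
/// The two counters needed for a drop ratio, copied from a snapshot.
#[derive(Debug, Clone, Copy)]
pub struct Sample {
    pub enqueued: u64,
    pub dropped: u64,
}

/// dropped / enqueued over the interval between two samples.
/// Returns None when nothing was enqueued in the interval, so the
/// caller never divides by zero.
pub fn drop_ratio(prev: Sample, now: Sample) -> Option<f64> {
    let enqueued = now.enqueued.saturating_sub(prev.enqueued);
    let dropped = now.dropped.saturating_sub(prev.dropped);
    if enqueued == 0 {
        None
    } else {
        Some(dropped as f64 / enqueued as f64)
    }
}
```

Going from (enqueued 100, dropped 5) to (enqueued 300, dropped 55) gives 50 / 200 = 0.25 for the interval, a figure that stays comparable across processes with very different uptimes.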