Effect queue

When the runtime is running on the Structured or Asupersync lane, background tasks (Cmd::Task) are enqueued to a dedicated worker thread instead of spawned one per task. The queue gives us three things you don’t get with raw thread::spawn:

  1. A single place to measure — enqueued / processed / dropped / in-flight.
  2. Backpressure, so a runaway producer cannot exhaust the system.
  3. Priority scheduling (SRPT / Smith’s rule / aging) driven by the intelligence layer.

Files:

  • Config: crates/ftui-runtime/src/program.rs:2031
  • Telemetry: crates/ftui-runtime/src/effect_system.rs:112
  • Scheduler: crates/ftui-runtime/src/queueing_scheduler.rs

Configuration

crates/ftui-runtime/src/program.rs
```rust
pub enum TaskExecutorBackend {
    #[default]
    Spawned,     // one OS thread per task, legacy
    EffectQueue, // queueing scheduler
    #[cfg(feature = "asupersync-executor")]
    Asupersync,  // blocking pool under Asupersync
}

pub struct EffectQueueConfig {
    pub enabled: bool,
    pub backend: TaskExecutorBackend,
    pub scheduler: SchedulerConfig, // Smith's rule by default
    pub max_queue_depth: usize,     // 0 == unbounded
}

impl EffectQueueConfig {
    pub fn with_enabled(self, enabled: bool) -> Self;
    pub fn with_backend(self, backend: TaskExecutorBackend) -> Self;
    pub fn with_scheduler(self, scheduler: SchedulerConfig) -> Self;
    pub fn with_max_queue_depth(self, depth: usize) -> Self;
}
```

Select a backend explicitly, or leave it at default and let the runtime lane pick the right one via RuntimeLane::task_executor_backend.

```rust
use ftui::prelude::*;
use ftui_runtime::TaskExecutorBackend;

let cfg = ProgramConfig::fullscreen()
    .with_effect_queue(
        EffectQueueConfig::default()
            .with_backend(TaskExecutorBackend::EffectQueue)
            .with_max_queue_depth(256),
    );
```

Backpressure

When max_queue_depth > 0 and the live depth would exceed the cap, the new task is dropped rather than blocking the caller. Dropped tasks:

  1. Increment the effects_queue_dropped counter.
  2. Emit a tracing::warn! on ftui.effect with event = "effect_queue.drop" and the drop reason.
  3. Do not produce a message — update will never see the result of a dropped task.

Rationale: blocking Cmd::Task would stall update, freezing input and rendering. Dropping keeps the UI responsive and surfaces the pressure to the operator through the metrics.
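The drop-on-overflow policy can be modeled as a bounded enqueue. The sketch below is a simplified, self-contained illustration, not the runtime's actual implementation; the type names and the counter are hypothetical stand-ins for the real EffectQueue and its effects_queue_dropped counter:

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicU64, Ordering};

// Stand-in for the runtime's global drop counter.
static DROPPED: AtomicU64 = AtomicU64::new(0);

/// Simplified model of a bounded effect queue: when the cap would be
/// exceeded, the task is dropped and the drop counter is incremented.
/// `max_depth == 0` means unbounded, mirroring EffectQueueConfig.
struct BoundedQueue<T> {
    items: VecDeque<T>,
    max_depth: usize,
}

impl<T> BoundedQueue<T> {
    fn new(max_depth: usize) -> Self {
        Self { items: VecDeque::new(), max_depth }
    }

    /// Returns true if the task was accepted, false if it was dropped.
    /// The caller is never blocked, so `update` stays responsive.
    fn try_enqueue(&mut self, task: T) -> bool {
        if self.max_depth > 0 && self.items.len() >= self.max_depth {
            DROPPED.fetch_add(1, Ordering::Relaxed);
            return false;
        }
        self.items.push_back(task);
        true
    }
}

fn main() {
    let mut q = BoundedQueue::new(2);
    assert!(q.try_enqueue("a"));
    assert!(q.try_enqueue("b"));
    assert!(!q.try_enqueue("c")); // over cap: dropped, not blocked
    assert_eq!(DROPPED.load(Ordering::Relaxed), 1);
}
```

The key design point survives the simplification: the producer gets an immediate answer and the task either enters the queue or is counted as dropped, never both.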

Queue telemetry

Atomic counters and a single struct snapshot:

crates/ftui-runtime/src/effect_system.rs
```rust
pub fn effects_queue_enqueued() -> u64;
pub fn effects_queue_processed() -> u64;
pub fn effects_queue_dropped() -> u64;
pub fn effects_queue_high_water() -> u64;

pub struct QueueTelemetry {
    pub enqueued: u64,
    pub processed: u64,
    pub dropped: u64,
    pub high_water: u64,
    pub in_flight: u64, // enqueued - processed - dropped
}

pub fn queue_telemetry() -> QueueTelemetry;
```

Counter semantics:

| Field | Meaning |
| --- | --- |
| enqueued | Total tasks accepted into the queue (monotonic). |
| processed | Total tasks completed successfully (monotonic). |
| dropped | Total tasks dropped by backpressure or post-shutdown. |
| high_water | Maximum queue depth observed; a ratchet — only increases. |
| in_flight | Currently queued or executing. Derived live at snapshot time. |

These counters are process-global and zero at startup; the snapshot is cheap enough to poll once per frame without skewing timing.

A simple operator dashboard:

dashboard.rs
```rust
use ftui_core::geometry::Rect;
use ftui_runtime::effect_system::queue_telemetry;
use ftui_widgets::Widget;
use ftui_widgets::paragraph::Paragraph;

fn draw_dashboard(frame: &mut Frame) {
    let t = queue_telemetry();
    let line0 = format!("queue: {} (hw {})", t.in_flight, t.high_water);
    Paragraph::new(line0.as_ref())
        .render(Rect::new(0, 0, frame.width(), 1), frame);
    let line1 = format!(
        "enqueued {} processed {} dropped {}",
        t.enqueued, t.processed, t.dropped,
    );
    Paragraph::new(line1.as_ref())
        .render(Rect::new(0, 1, frame.width(), 1), frame);
}
```

Scheduling

The queue consults a SchedulerConfig to pick the next ready task. The default policy is Smith’s rule (smith_enabled: true, force_fifo: false, preemptive: false), which orders tasks by weight / expected_cost, balancing importance against cost.

Other policies, all configurable:

| Policy | Use case |
| --- | --- |
| SRPT (Shortest Remaining Processing Time) | Reduces queue depth fastest; can starve long tasks. |
| FIFO (force_fifo: true) | Deterministic order for tests. |
| Aging (aging_factor > 0) | Bumps long-waiting tasks to prevent starvation. |

Smith’s-rule derivation and trade-offs live in intelligence/scheduling.
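The interaction between Smith’s rule and aging can be illustrated with a self-contained sketch. This is not the runtime’s scheduler: the Task struct, its field names, and the priority function below are illustrative assumptions, chosen only to show how weight / expected_cost ordering behaves and how an aging term rescues long-waiting tasks:

```rust
/// Illustrative task descriptor; the real scheduler's fields live in
/// queueing_scheduler.rs and may differ.
#[derive(Debug)]
struct Task {
    name: &'static str,
    weight: f64,      // importance
    expected_ms: f64, // estimated cost
    waited_ms: f64,   // time already spent queued
}

/// Smith's rule index: weight / expected cost (higher runs first),
/// plus an aging term that bumps long-waiting tasks.
fn priority(t: &Task, aging_factor: f64) -> f64 {
    t.weight / t.expected_ms + aging_factor * t.waited_ms
}

/// Pick the ready task with the highest priority.
fn next_task<'a>(tasks: &'a [Task], aging_factor: f64) -> Option<&'a Task> {
    tasks.iter().max_by(|a, b| {
        priority(a, aging_factor).total_cmp(&priority(b, aging_factor))
    })
}

fn main() {
    let tasks = [
        Task { name: "cheap-important",  weight: 2.0, expected_ms: 10.0,  waited_ms: 0.0 },
        Task { name: "costly-important", weight: 2.0, expected_ms: 100.0, waited_ms: 0.0 },
        Task { name: "stale",            weight: 0.5, expected_ms: 50.0,  waited_ms: 900.0 },
    ];
    // Without aging, the cheap important task wins (2.0 / 10.0 = 0.2).
    assert_eq!(next_task(&tasks, 0.0).unwrap().name, "cheap-important");
    // With aging, the long-waiting task is bumped ahead of everything.
    assert_eq!(next_task(&tasks, 0.01).unwrap().name, "stale");
}
```

Setting aging_factor to zero recovers pure Smith’s rule; a large factor degenerates toward FIFO, since wait time dominates the index.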

You can still hint scheduler fields per task:

```rust
Cmd::task_weighted(/* weight */ 2.0, /* est_ms */ 10.0, move || Msg::Done)
```

Worked example: bounded queue with pressure alerts

examples/bounded_queue.rs
```rust
use ftui::prelude::*;
use ftui_core::geometry::Rect;
use ftui_runtime::{EffectQueueConfig, TaskExecutorBackend};
use ftui_runtime::effect_system::queue_telemetry;
use ftui_widgets::Widget;
use ftui_widgets::paragraph::Paragraph;
use std::time::Duration;

fn configure() -> ProgramConfig {
    ProgramConfig::fullscreen()
        .with_effect_queue(
            EffectQueueConfig::default()
                .with_backend(TaskExecutorBackend::EffectQueue)
                .with_max_queue_depth(128),
        )
}

// In Model::update: show pressure in the UI.
fn render_pressure(frame: &mut Frame) {
    let t = queue_telemetry();
    let pressure = if t.in_flight > 96 {
        "!!!"
    } else if t.in_flight > 64 {
        "!!"
    } else if t.in_flight > 32 {
        "!"
    } else {
        ""
    };
    let line = format!("pressure {}", pressure);
    Paragraph::new(line.as_ref())
        .render(Rect::new(0, 0, frame.width(), 1), frame);
}

// In a subscription: periodic report.
fn report() -> Cmd<Msg> {
    let t = queue_telemetry();
    Cmd::log(format!(
        "queue enqueued={} processed={} dropped={} in_flight={} hw={}",
        t.enqueued, t.processed, t.dropped, t.in_flight, t.high_water,
    ))
}
```

Pitfalls

Unbounded max_queue_depth hides producer runaways. A bug in a subscription that emits a Cmd::task on every tick will silently grow the queue forever. Set a cap large enough for legitimate bursts but small enough that you’ll notice a leak within a minute.

Switching to Spawned for “simplicity” loses backpressure. The Spawned backend has no queue to drop from — every task gets a thread. A storm of tasks becomes a storm of threads, which is worse, not better.

Counters are process-global. Don’t compare absolute values across processes. Compare deltas or ratios (e.g. dropped / enqueued).
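Delta-based comparison can be sketched as follows. The Snapshot struct is a local stand-in so the example is self-contained; in real code you would capture two QueueTelemetry values from queue_telemetry() at the interval boundaries:

```rust
/// Local stand-in for the fields of QueueTelemetry used here; in real
/// code, take two queue_telemetry() snapshots instead.
#[derive(Clone, Copy)]
struct Snapshot {
    enqueued: u64,
    dropped: u64,
}

/// Drop ratio over the interval between two snapshots. Comparing deltas
/// avoids being skewed by the process-global, since-startup totals.
fn drop_ratio(prev: Snapshot, cur: Snapshot) -> f64 {
    let enq = cur.enqueued.saturating_sub(prev.enqueued);
    let drp = cur.dropped.saturating_sub(prev.dropped);
    if enq == 0 { 0.0 } else { drp as f64 / enq as f64 }
}

fn main() {
    let prev = Snapshot { enqueued: 10_000, dropped: 50 };
    let cur = Snapshot { enqueued: 10_400, dropped: 70 };
    // 20 drops out of 400 new enqueues in this interval: 5%.
    assert!((drop_ratio(prev, cur) - 0.05).abs() < 1e-9);
}
```

A rising interval ratio signals mounting pressure even when the absolute counters look unremarkable.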

Cross-references