Files
codex-controller-loop/src/controller/engine.rs
eric ebb6b488fe
Some checks failed
distribution-gate / distribution-gate (push) Failing after 1m56s
feat: 0.1.0
2026-04-04 18:41:34 +02:00

712 lines
26 KiB
Rust

use std::path::PathBuf;
use std::sync::mpsc::{Receiver, Sender, TryRecvError};
use anyhow::Result;
use crate::app::{AppEvent, ControlCommand};
use crate::controller::{executor, goal_checker, planner, verifier};
use crate::model::{
ControllerPhase, GoalStatus, PlannerResponse, SessionEntry, SessionSource, SessionStream,
StepStatus, TaskConfig,
};
use crate::prompt;
use crate::repo;
use crate::storage::toon;
/// Runs the controller loop for one task until a `Quit` command arrives or the
/// control channel disconnects.
///
/// Each pass re-reads the plan and state files named in `config`, handles at
/// most one pending `ControlCommand`, and, when the phase is `Executing`, takes
/// one plan step through `executor::implement`, `verifier::verify_step`,
/// `verifier::verify_cleanup` and `verifier::run_tests`. After every transition
/// the plan and/or state are written back through `toon` and a snapshot event
/// is sent on `event_tx`.
///
/// # Errors
///
/// Returns any error propagated with `?` from the storage, planner, executor,
/// verifier, goal-checker or recovery calls. Failures to send on `event_tx`
/// are deliberately ignored.
pub fn runtime_loop(
repo_root: PathBuf,
config: TaskConfig,
control_rx: Receiver<ControlCommand>,
event_tx: Sender<AppEvent>,
) -> Result<()> {
// Delegates to `toon::ensure_controller_files` before the first plan/state read.
toon::ensure_controller_files(&config)?;
// Announce which plan file this controller session was loaded from.
let _ = event_tx.send(AppEvent::Session(SessionEntry {
source: SessionSource::Controller,
stream: SessionStream::Status,
title: "Session".to_string(),
tag: Some(config.controller_id()),
body: format!("Controller task loaded from {}", config.plan_file.display()),
run_id: repo::next_run_id(),
}));
loop {
// Every iteration starts from what is on disk rather than from the
// previous iteration's in-memory plan/state.
let mut plan = toon::read_plan(&config.plan_file)?;
let mut state = toon::read_state(&config.state_file)?;
// Recovery rewrites plan and state when it returns true, so restart the
// iteration to pick up the freshly written files.
if recover_stale_execution_state(&config, &mut plan, &mut state, &event_tx)? {
continue;
}
let goal_md = toon::read_markdown(&config.goal_file)?;
let standards_md = toon::read_markdown(&config.standards_file)?;
refresh_usage_state(&mut state);
// A pending planning input found in the state file is processed before
// any control command is polled.
if matches!(state.phase, ControllerPhase::Planning)
&& state.planning_session.pending_input.is_some()
{
let pending_input = state
.planning_session
.pending_input
.as_deref()
.unwrap_or_default()
.to_string();
process_planning_submission(&repo_root, &config, &pending_input, &event_tx)?;
continue;
}
emit_snapshot(&event_tx, &goal_md, &standards_md, &plan, &state);
// Non-blocking poll: at most one control command is handled per iteration.
match control_rx.try_recv() {
Ok(ControlCommand::Quit) => break,
Ok(ControlCommand::Pause) => {
crate::controller::state::pause(&mut state);
toon::write_state(&config.state_file, &state)?;
emit_snapshot(&event_tx, &goal_md, &standards_md, &plan, &state);
continue;
}
Ok(ControlCommand::Resume) => {
crate::controller::state::resume(&mut state);
toon::write_state(&config.state_file, &state)?;
emit_snapshot(&event_tx, &goal_md, &standards_md, &plan, &state);
continue;
}
Ok(ControlCommand::Stop) => {
// A stop is recorded as a Blocked phase/goal with a stop reason.
state.phase = ControllerPhase::Blocked;
state.goal_status = GoalStatus::Blocked;
state.set_stop_reason("Stopped by user.");
// Falls back to the literal text if `phase_notice` yields nothing.
let reason = state
.phase_notice()
.unwrap_or_else(|| "Stopped by user.".to_string());
toon::write_state(&config.state_file, &state)?;
emit_snapshot(&event_tx, &goal_md, &standards_md, &plan, &state);
let _ = event_tx.send(AppEvent::Session(SessionEntry {
source: SessionSource::Warning,
stream: SessionStream::Status,
title: "Notice".to_string(),
tag: Some(config.controller_id()),
body: reason,
run_id: repo::next_run_id(),
}));
continue;
}
// The submitted text itself is not used here: in the Planning phase the
// pending input is read back from the persisted state file instead.
Ok(ControlCommand::Submit(_text)) => {
if matches!(state.phase, ControllerPhase::Planning) {
let persisted_state = toon::read_state(&config.state_file)?;
if let Some(pending_input) =
persisted_state.planning_session.pending_input.as_deref()
{
process_planning_submission(
&repo_root,
&config,
pending_input,
&event_tx,
)?;
}
} else {
// Outside planning, free-form input is answered with a usage hint.
let _ = event_tx.send(AppEvent::Session(SessionEntry {
source: SessionSource::Warning,
stream: SessionStream::Status,
title: "Execution".to_string(),
tag: Some(config.controller_id()),
body: "Execution is autonomous. Use /pause, /resume, /stop, /status, /diff, /tests, or /goal update."
.to_string(),
run_id: repo::next_run_id(),
}));
}
continue;
}
// The sending side is gone: leave the loop like a Quit.
Err(TryRecvError::Disconnected) => break,
// No command waiting: fall through to the phase handling below.
Err(TryRecvError::Empty) => {}
}
// Only the Executing phase proceeds past this point; every other phase
// idles for 100 ms and starts the next iteration.
match state.phase {
ControllerPhase::Planning
| ControllerPhase::Paused
| ControllerPhase::Blocked
| ControllerPhase::Done => {
std::thread::sleep(std::time::Duration::from_millis(100));
continue;
}
ControllerPhase::Executing => {}
}
// Goal reached: mark the state Done, record a "goal-complete" history
// event with the completion summary, persist, and announce it.
if goal_checker::is_done(&plan, &state)? {
state.phase = ControllerPhase::Done;
state.clear_stop_reason();
state.goal_status = GoalStatus::Done;
let completion_summary = build_completion_summary(&plan);
state.set_completion_summary(completion_summary.clone());
state.history.push(crate::model::HistoryEvent {
timestamp: repo::now_timestamp(),
kind: "goal-complete".to_string(),
detail: completion_summary.clone(),
});
toon::write_state(&config.state_file, &state)?;
emit_snapshot(&event_tx, &goal_md, &standards_md, &plan, &state);
let _ = event_tx.send(AppEvent::Session(SessionEntry {
source: SessionSource::Controller,
stream: SessionStream::Status,
title: "Goal".to_string(),
tag: Some(config.controller_id()),
body: completion_summary,
run_id: repo::next_run_id(),
}));
continue;
}
// Replan only when there is no resumable current step and either a replan
// was requested or the plan reports no actionable steps.
let resumable_step = resumable_step(&plan, &state);
if resumable_step.is_none() && (state.replan_required || plan.has_no_actionable_steps()) {
let _ = event_tx.send(AppEvent::Session(SessionEntry {
source: SessionSource::Planner,
stream: SessionStream::Status,
title: "Planner".to_string(),
tag: Some(config.controller_id()),
body: "Refining plan".to_string(),
run_id: repo::next_run_id(),
}));
plan =
planner::refine_without_user_input(&repo_root, &config, &plan, &state, &event_tx)?;
state.replan_required = false;
toon::write_plan(&config.plan_file, &plan)?;
toon::write_state(&config.state_file, &state)?;
emit_snapshot(&event_tx, &goal_md, &standards_md, &plan, &state);
continue;
}
// Prefer resuming the current step, otherwise ask the planner for the next
// one. With neither available the controller blocks itself.
let Some(step) = resumable_step.or_else(|| planner::next_step(&plan, &state)) else {
state.phase = ControllerPhase::Blocked;
state.goal_status = GoalStatus::Blocked;
state.set_stop_reason(
"No actionable step remained and autonomous replan produced nothing.",
);
let reason = state
.phase_notice()
.unwrap_or_else(|| "Controller is blocked.".to_string());
toon::write_state(&config.state_file, &state)?;
emit_snapshot(&event_tx, &goal_md, &standards_md, &plan, &state);
let _ = event_tx.send(AppEvent::Session(SessionEntry {
source: SessionSource::Warning,
stream: SessionStream::Status,
title: "Notice".to_string(),
tag: Some(config.controller_id()),
body: reason,
run_id: repo::next_run_id(),
}));
continue;
};
let _ = event_tx.send(AppEvent::Session(SessionEntry {
source: SessionSource::Executor,
stream: SessionStream::Status,
title: "Step".to_string(),
tag: Some(step.id.clone()),
body: format!("Executing {}", step.title),
run_id: repo::next_run_id(),
}));
// Reset per-step bookkeeping, mark the step active and persist both files
// before the executor is invoked.
state.clear_stop_reason();
state.clear_completion_summary();
state.replan_required = false;
state
.blocked_steps
.retain(|blocked_step| blocked_step != &step.id);
plan.mark_active(&step.id);
state.current_step_id = Some(step.id.clone());
state.iteration += 1;
refresh_usage_state(&mut state);
toon::write_plan(&config.plan_file, &plan)?;
toon::write_state(&config.state_file, &state)?;
emit_snapshot(&event_tx, &goal_md, &standards_md, &plan, &state);
let exec = executor::implement(&repo_root, &config, &state, &plan, &step, &event_tx)?;
// The executor asked for goal clarification: drop back to Planning. Only the
// state file is written here; the plan file keeps what was persisted above.
if goal_checker::needs_goal_clarification(&exec) {
state.phase = ControllerPhase::Planning;
state.set_stop_reason(format!(
"Execution requested goal clarification while processing {}.",
step.id
));
toon::write_state(&config.state_file, &state)?;
emit_snapshot(&event_tx, &goal_md, &standards_md, &plan, &state);
continue;
}
// Gate 1: step verification. A failure blocks the step, records the
// summary as a step note and flags that a replan is required.
let verification = verifier::verify_step(&repo_root, &exec, &event_tx)?;
if !verification.passed {
plan.mark_blocked(&step.id);
plan.append_step_note(&step.id, verification.summary.as_str());
state.last_verification = Some(verification);
state.blocked_steps.push(step.id.clone());
state.replan_required = true;
state.set_stop_reason(format!("Verification failed for {}.", step.id));
toon::write_plan(&config.plan_file, &plan)?;
toon::write_state(&config.state_file, &state)?;
emit_snapshot(&event_tx, &goal_md, &standards_md, &plan, &state);
continue;
}
// Gate 2: cleanup check. A failure returns the step to todo (not blocked).
let cleanup = verifier::verify_cleanup(&config, &step, &exec)?;
if !cleanup.passed {
plan.mark_todo(&step.id);
plan.append_step_note(&step.id, cleanup.summary.as_str());
state.last_cleanup_summary = Some(cleanup);
state.set_stop_reason(format!(
"Cleanup requirements were not satisfied for {}.",
step.id
));
toon::write_plan(&config.plan_file, &plan)?;
toon::write_state(&config.state_file, &state)?;
emit_snapshot(&event_tx, &goal_md, &standards_md, &plan, &state);
continue;
}
// Gate 3: test run. A failure also returns the step to todo.
let tests = verifier::run_tests(&repo_root, &exec, &event_tx)?;
if !tests.passed {
plan.mark_todo(&step.id);
plan.append_step_note(&step.id, tests.summary.as_str());
state.last_full_test_summary = Some(tests);
state.set_stop_reason(format!("Tests failed for {}.", step.id));
toon::write_plan(&config.plan_file, &plan)?;
toon::write_state(&config.state_file, &state)?;
emit_snapshot(&event_tx, &goal_md, &standards_md, &plan, &state);
continue;
}
// All gates passed: attach the completion note, mark the step done and
// record the three results on the state.
plan.append_step_note(&step.id, completion_note(&exec));
plan.mark_done(&step.id);
state.complete_step(&step, verification, cleanup, tests);
toon::write_plan(&config.plan_file, &plan)?;
toon::write_state(&config.state_file, &state)?;
emit_snapshot(&event_tx, &goal_md, &standards_md, &plan, &state);
}
Ok(())
}
/// Asks `crate::process` for a fresh usage snapshot of `state` and passes that
/// snapshot, together with the same state, to
/// `crate::process::persist_usage_snapshot`.
fn refresh_usage_state(state: &mut crate::model::ControllerState) {
    let usage = crate::process::refresh_usage_snapshot(state);
    crate::process::persist_usage_snapshot(state, &usage);
}
/// Advances the planning session with the latest user input.
///
/// The planner's response is handed back to the caller unchanged. When it
/// carries a question, that question is also published on `event_tx` as a
/// planner status entry titled "Question"; a failed send is ignored.
///
/// # Errors
///
/// Propagates any error returned by `crate::planning::session::advance`.
fn process_planning_submission(
    repo_root: &std::path::Path,
    config: &TaskConfig,
    latest_user_input: &str,
    event_tx: &Sender<AppEvent>,
) -> Result<PlannerResponse> {
    let response =
        crate::planning::session::advance(repo_root, config, latest_user_input, event_tx)?;

    if let Some(question) = &response.question {
        let entry = SessionEntry {
            source: SessionSource::Planner,
            stream: SessionStream::Status,
            title: "Question".to_string(),
            tag: Some(config.controller_id()),
            body: question.clone(),
            run_id: repo::next_run_id(),
        };
        // Best effort: a dropped receiver must not fail the planning step.
        let _ = event_tx.send(AppEvent::Session(entry));
    }

    Ok(response)
}
/// Publishes the given goal text, standards text, plan and state as a single
/// `AppEvent::Snapshot`.
///
/// All four values are copied into the event, so the caller keeps its own.
/// A send failure is ignored.
fn emit_snapshot(
    event_tx: &Sender<AppEvent>,
    goal_md: &str,
    standards_md: &str,
    plan: &crate::model::Plan,
    state: &crate::model::ControllerState,
) {
    let snapshot = AppEvent::Snapshot {
        goal_md: goal_md.to_owned(),
        standards_md: standards_md.to_owned(),
        plan: plan.clone(),
        state: state.clone(),
    };
    // Best effort: the result of the send is intentionally discarded.
    let _ = event_tx.send(snapshot);
}
/// Returns a copy of the plan step that `state.current_step_id` refers to,
/// provided that step is still `Todo`, `Active` or `Blocked`.
///
/// Yields `None` when no current step id is recorded, or when no step with
/// that id and one of those statuses exists in the plan.
fn resumable_step(
    plan: &crate::model::Plan,
    state: &crate::model::ControllerState,
) -> Option<crate::model::PlanStep> {
    let wanted_id = state.current_step_id.as_deref()?;

    for candidate in &plan.steps {
        let is_current = candidate.id == wanted_id;
        let is_open = matches!(
            candidate.status,
            StepStatus::Todo | StepStatus::Active | StepStatus::Blocked
        );
        if is_current && is_open {
            return Some(candidate.clone());
        }
    }

    None
}
/// Builds the note that is attached to a step once it has completed.
///
/// The note is assembled from the trimmed execution summary (passed through
/// `prompt::truncate_text` with 180) followed by an "Agent notes: …" section
/// listing every non-blank note (each truncated with 120, joined by "; ").
/// The two sections are joined with a space and the result is truncated with
/// 240. If both sections are empty the fixed text "Completed the step." is
/// returned.
fn completion_note(exec: &crate::model::ExecutionResponse) -> String {
    let headline = Some(exec.summary.trim())
        .filter(|text| !text.is_empty())
        .map(|text| prompt::truncate_text(text, 180));

    let mut agent_notes = Vec::new();
    for raw in &exec.notes {
        let note = raw.trim();
        if !note.is_empty() {
            agent_notes.push(prompt::truncate_text(note, 120));
        }
    }
    let notes_section = (!agent_notes.is_empty())
        .then(|| format!("Agent notes: {}", agent_notes.join("; ")));

    // Summary first, then notes; either section may be absent.
    let sections: Vec<String> = headline.into_iter().chain(notes_section).collect();
    if sections.is_empty() {
        return "Completed the step.".to_string();
    }
    prompt::truncate_text(&sections.join(" "), 240)
}
/// Summarises the done steps of `plan` in a single line of text.
///
/// Returns "Goal complete." when no step is done. Otherwise the first four
/// done steps are listed as `id: title` (title truncated with 80), each
/// followed by ` - notes` (notes truncated with 120) when the step has
/// non-blank notes. Any further done steps are folded into an
/// "... and N more completed step(s)" entry. Entries are joined with "; " and
/// the whole summary is truncated with 320.
fn build_completion_summary(plan: &crate::model::Plan) -> String {
    let done_steps: Vec<_> = plan
        .steps
        .iter()
        .filter(|step| step.status.is_done())
        .collect();
    let total = done_steps.len();
    if total == 0 {
        return "Goal complete.".to_string();
    }

    // Plural suffix shared by both counters below.
    let plural = |count: usize| if count == 1 { "" } else { "s" };

    let mut details = Vec::new();
    for step in done_steps.iter().take(4) {
        let mut item = format!(
            "{}: {}",
            step.id,
            prompt::truncate_text(&step.title, 80)
        );
        let has_notes = !step.notes.trim().is_empty();
        if has_notes {
            item.push_str(" - ");
            item.push_str(&prompt::truncate_text(&step.notes, 120));
        }
        details.push(item);
    }

    let omitted = total.saturating_sub(details.len());
    if omitted > 0 {
        details.push(format!(
            "... and {} more completed step{}",
            omitted,
            plural(omitted)
        ));
    }

    let summary = format!(
        "Completed {} step{}: {}",
        total,
        plural(total),
        details.join("; ")
    );
    prompt::truncate_text(&summary, 320)
}
/// Repairs plan/state combinations that cannot be resumed as they stand.
///
/// Two situations trigger a recovery:
/// * `state.current_step_id` is set but no plan step with that id is `Todo`,
///   `Active` or `Blocked` (a stale reference), or
/// * no current step is recorded while the plan still reports active steps.
///
/// On recovery every active step is returned to todo with an explanatory
/// note, the state is switched to `Executing` / `InProgress` with the stop
/// reason, replan flag and current step id cleared, the reason is appended to
/// `state.notes`, plan and state are written to disk, and a "Notice" entry is
/// sent on `event_tx` (send failures are ignored).
///
/// Returns `Ok(true)` when a recovery was performed and `Ok(false)` when
/// nothing had to change.
///
/// # Errors
///
/// Propagates failures from `toon::write_plan` and `toon::write_state`.
fn recover_stale_execution_state(
    config: &TaskConfig,
    plan: &mut crate::model::Plan,
    state: &mut crate::model::ControllerState,
    event_tx: &Sender<AppEvent>,
) -> Result<bool> {
    // `Some(id)` only when the recorded current step no longer matches an open step.
    let stale_step_id = state.current_step_id.clone().filter(|id| {
        let still_open = plan.steps.iter().any(|step| {
            step.id == *id
                && matches!(
                    step.status,
                    StepStatus::Todo | StepStatus::Active | StepStatus::Blocked
                )
        });
        !still_open
    });

    // A current step that is still open is a valid resume point.
    if stale_step_id.is_none() && state.current_step_id.is_some() {
        return Ok(false);
    }

    let orphaned_steps = plan.active_step_ids();
    if stale_step_id.is_none() && orphaned_steps.is_empty() {
        return Ok(false);
    }

    for step_id in &orphaned_steps {
        plan.mark_todo(step_id);
        plan.append_step_note(
            step_id,
            "Controller recovered this step from stale active state and returned it to todo.",
        );
    }

    state.phase = ControllerPhase::Executing;
    state.goal_status = GoalStatus::InProgress;
    state.clear_stop_reason();
    state.replan_required = false;
    state.current_step_id = None;

    let reason = match stale_step_id {
        Some(cleared_id) if !orphaned_steps.is_empty() => format!(
            "Recovered stale execution state for {}. Cleared current_step_id {}. Reset {} to todo.",
            config.controller_id(),
            cleared_id,
            orphaned_steps.join(", ")
        ),
        Some(cleared_id) => format!(
            "Recovered stale execution state for {}. Cleared current_step_id {}.",
            config.controller_id(),
            cleared_id
        ),
        None => format!(
            "Recovered stale active step state for {}. Reset {} to todo.",
            config.controller_id(),
            orphaned_steps.join(", ")
        ),
    };
    state.notes.push(reason.clone());

    toon::write_plan(&config.plan_file, plan)?;
    toon::write_state(&config.state_file, state)?;

    let notice = SessionEntry {
        source: SessionSource::Controller,
        stream: SessionStream::Status,
        title: "Notice".to_string(),
        tag: Some(config.controller_id()),
        body: reason,
        run_id: repo::next_run_id(),
    };
    let _ = event_tx.send(AppEvent::Session(notice));

    Ok(true)
}
#[cfg(test)]
mod tests {
    use std::sync::mpsc;

    use tempfile::tempdir;

    use super::*;
    use crate::model::{ControllerState, Plan, PlanStep, StepStatus};
    use crate::storage::toon;

    /// Task config whose four controller files live under
    /// `<base>/.agent/controllers/<name>`.
    fn config_under(base: &std::path::Path, name: &str) -> TaskConfig {
        let root = base.join(format!(".agent/controllers/{name}"));
        let mut config = TaskConfig::default_for(name);
        config.goal_file = root.join("goal.md");
        config.plan_file = root.join("plan.toon");
        config.state_file = root.join("state.toon");
        config.standards_file = root.join("standards.md");
        config
    }

    /// Plan step with the given id, title and status; everything else default.
    fn step(id: &str, title: &str, status: StepStatus) -> PlanStep {
        PlanStep {
            id: id.to_string(),
            title: title.to_string(),
            status,
            ..PlanStep::default()
        }
    }

    /// Version-1 plan with the goal summary "goal" and the given steps.
    fn plan_of(steps: Vec<PlanStep>) -> Plan {
        Plan {
            version: 1,
            goal_summary: "goal".to_string(),
            steps,
        }
    }

    /// Receives the next event and unwraps it as a session entry.
    fn next_notice(event_rx: &mpsc::Receiver<AppEvent>) -> SessionEntry {
        match event_rx.recv().expect("notice event") {
            AppEvent::Session(entry) => entry,
            other => panic!("unexpected event: {other:?}"),
        }
    }

    #[test]
    fn recovers_stale_active_step_without_current_step() {
        let temp = tempdir().expect("tempdir");
        let config = config_under(temp.path(), "stale-active");
        let mut plan = plan_of(vec![PlanStep {
            attempts: 1,
            ..step("s1", "Scope", StepStatus::Active)
        }]);
        let mut state = ControllerState {
            phase: ControllerPhase::Blocked,
            goal_status: GoalStatus::Blocked,
            ..ControllerState::default()
        };
        state
            .set_stop_reason("No actionable step remained and autonomous replan produced nothing.");
        toon::ensure_controller_files(&config).expect("ensure files");
        let (event_tx, event_rx) = mpsc::channel();

        let recovered = recover_stale_execution_state(&config, &mut plan, &mut state, &event_tx)
            .expect("recover");

        assert!(recovered);
        let recovered_step = &plan.steps[0];
        assert!(matches!(recovered_step.status, StepStatus::Todo));
        assert!(recovered_step.notes.contains("stale active state"));
        assert!(matches!(state.phase, ControllerPhase::Executing));
        assert!(matches!(state.goal_status, GoalStatus::InProgress));
        assert!(state.stop_reason.is_none());
        let notice = next_notice(&event_rx);
        assert!(notice.body.contains("Reset s1 to todo"));
    }

    #[test]
    fn recovers_stale_current_step_reference() {
        let temp = tempdir().expect("tempdir");
        let config = config_under(temp.path(), "stale-current");
        let mut plan = plan_of(vec![step("s1", "Scope", StepStatus::Done)]);
        let mut state = ControllerState {
            phase: ControllerPhase::Blocked,
            goal_status: GoalStatus::Blocked,
            current_step_id: Some("s1".to_string()),
            ..ControllerState::default()
        };
        toon::ensure_controller_files(&config).expect("ensure files");
        let (event_tx, event_rx) = mpsc::channel();

        let recovered = recover_stale_execution_state(&config, &mut plan, &mut state, &event_tx)
            .expect("recover");

        assert!(recovered);
        assert!(state.current_step_id.is_none());
        assert!(matches!(state.phase, ControllerPhase::Executing));
        assert!(matches!(state.goal_status, GoalStatus::InProgress));
        assert!(state.stop_reason.is_none());
        let notice = next_notice(&event_rx);
        assert!(notice.body.contains("Cleared current_step_id s1"));
    }

    #[test]
    fn resumable_step_prefers_current_blocked_or_active_step() {
        let plan = plan_of(vec![
            step("s1", "Scope", StepStatus::Blocked),
            step("s2", "Other", StepStatus::Done),
        ]);
        let state = ControllerState {
            phase: ControllerPhase::Blocked,
            goal_status: GoalStatus::Blocked,
            current_step_id: Some("s1".to_string()),
            replan_required: true,
            ..ControllerState::default()
        };

        let resumed = resumable_step(&plan, &state).expect("expected resumable step");

        assert_eq!(resumed.id, "s1");
    }

    #[test]
    fn emit_snapshot_clones_current_plan_and_state() {
        let (event_tx, event_rx) = mpsc::channel();
        let plan = plan_of(vec![step("s1", "Scope", StepStatus::Active)]);
        let state = ControllerState {
            phase: ControllerPhase::Executing,
            current_step_id: Some("s1".to_string()),
            ..ControllerState::default()
        };

        emit_snapshot(&event_tx, "goal body", "standards body", &plan, &state);

        match event_rx.recv().expect("snapshot event") {
            AppEvent::Snapshot {
                goal_md,
                standards_md,
                plan,
                state,
            } => {
                assert_eq!(goal_md, "goal body");
                assert_eq!(standards_md, "standards body");
                assert!(matches!(plan.steps[0].status, StepStatus::Active));
                assert_eq!(state.current_step_id.as_deref(), Some("s1"));
            }
            other => panic!("unexpected event: {other:?}"),
        }
    }

    #[test]
    fn completion_note_uses_execution_summary_and_notes() {
        let exec = crate::model::ExecutionResponse {
            summary: "Implemented the board note flow".to_string(),
            notes: vec![
                "Kept the change localized to completion handling".to_string(),
                "Verified the board still renders done steps".to_string(),
            ],
            ..crate::model::ExecutionResponse::default()
        };

        let note = completion_note(&exec);

        assert!(note.contains("Implemented the board note flow"));
        assert!(note.contains("Agent notes:"));
        assert!(note.contains("Kept the change localized"));
    }

    #[test]
    fn build_completion_summary_lists_done_steps() {
        let plan = plan_of(vec![
            PlanStep {
                notes: "Finished the first change.".to_string(),
                ..step("s1", "First", StepStatus::Done)
            },
            PlanStep {
                notes: "Finished the second change.".to_string(),
                ..step("s2", "Second", StepStatus::Done)
            },
        ]);

        let summary = build_completion_summary(&plan);

        assert!(summary.contains("Completed 2 steps"));
        assert!(summary.contains("s1: First"));
        assert!(summary.contains("s2: Second"));
    }
}