use sqlx::Row; use tokio::process::Child; use tracing::error; use crate::Result; use crate::database::db_handle::DbHandle; use crate::nix_util::nix_output_stream::ActivityStartMessage; use crate::nix_util::nix_output_stream::NixAction; use crate::nix_util::nix_output_stream::NixOutputStream; use crate::nix_util::output_stream::OutputStream; use crate::nix_util::transparent_iter::TransparentIter; use super::activity::Activity; use super::activity::ActivityBuild; use super::activity::ActivityBuildWaiting; use super::activity::ActivityBuilds; use super::activity::ActivityCopyPath; use super::activity::ActivityCopyPaths; use super::activity::ActivityFetchTree; use super::activity::ActivityFileTransfer; use super::activity::ActivityOptimizeStore; use super::activity::ActivityPostBuildHook; use super::activity::ActivityQueryPathInfo; use super::activity::ActivityRealize; use super::activity::ActivityState; use super::activity::ActivitySubstitute; use super::activity::ActivityUnknown; use super::activity::ActivityVerifyPaths; use super::activity_tree::ActivityId; use super::activity_tree::ActivityTree; use super::activity_tree::ActivityTreeEntry; use super::nix_output_stream::ActivityResultMessage; use super::nix_output_stream::NixMessage; pub(crate) struct RunningBuild<'db> { db_handle: &'db DbHandle, activity_tree: ActivityTree, } impl<'db> RunningBuild<'db> { pub(crate) fn new(db_handle: &'db DbHandle) -> Result { Ok(RunningBuild { db_handle, activity_tree: ActivityTree::new(), }) } pub(crate) async fn run_to_completion( &mut self, mut child: Child, target_name: TN, ) -> Result<()> where TN: AsRef, { let build_id: i64 = sqlx::query( r#"INSERT INTO build (start_time, target) SELECT unixepoch('now'), ? RETURNING id"#, ) .bind(target_name.as_ref()) .fetch_one(&self.db_handle.conn) .await? .try_get("id")?; let output_stream = OutputStream::from_child(&mut child)?; let mut nix_output_stream: NixOutputStream = NixOutputStream::new(output_stream); let exit_status_handle = tokio::spawn(async move { let status = child .wait() .await .expect("nixos-rebuild encountered an error"); status }); while let Some(message) = nix_output_stream.next().await? { self.handle_message(message)?; } let exit_status = exit_status_handle.await?; println!("nix build status was: {}", exit_status); let update: u64 = sqlx::query(r#"UPDATE build SET end_time=unixepoch('now'), status=? WHERE id=?"#) .bind( exit_status .code() .expect("Process should have an exit code."), ) .bind(build_id) .execute(&self.db_handle.conn) .await? .rows_affected(); assert!(update == 1); Ok(()) } pub(crate) fn handle_message(&mut self, message: NixMessage) -> Result<()> { let message = match message { NixMessage::ParseFailure(line) => { error!("FAIL PARSE: {line}"); return Ok(()); } NixMessage::Generic(_value, line) => { error!("GENERIC PARSE: {line}"); return Ok(()); } NixMessage::Action(nix_action) => nix_action, }; match message { NixAction::Msg(msg_message) => { // For now we can ignore the messages. } NixAction::Start(activity_start_message) => { match activity_start_message { ActivityStartMessage::Unknown(activity_start_unknown) => { self.activity_tree.add_activity( activity_start_unknown.id, activity_start_unknown.parent, Activity::Unknown(ActivityUnknown { state: ActivityState::default(), text: activity_start_unknown.text, }), )?; } ActivityStartMessage::CopyPath(activity_start_copy_path) => { self.activity_tree.add_activity( activity_start_copy_path.id, activity_start_copy_path.parent, Activity::CopyPath(ActivityCopyPath { state: ActivityState::default(), missing_path: activity_start_copy_path.missing_path, source: activity_start_copy_path.source, destination: activity_start_copy_path.destination, done: 0, expected: 0, }), )?; } ActivityStartMessage::FileTransfer(activity_start_file_transfer) => { self.activity_tree.add_activity( activity_start_file_transfer.id, activity_start_file_transfer.parent, Activity::FileTransfer(ActivityFileTransfer { state: ActivityState::default(), url: activity_start_file_transfer.url, done: 0, expected: 0, }), )?; } ActivityStartMessage::Realize(activity_start_realize) => { self.activity_tree.add_activity( activity_start_realize.id, activity_start_realize.parent, Activity::Realize(ActivityRealize { state: ActivityState::default(), }), )?; } ActivityStartMessage::CopyPaths(activity_start_copy_paths) => { self.activity_tree.add_activity( activity_start_copy_paths.id, activity_start_copy_paths.parent, Activity::CopyPaths(ActivityCopyPaths { state: ActivityState::default(), text: activity_start_copy_paths.text, done: 0, expected: 0, running: 0, failed: 0, }), )?; } ActivityStartMessage::Builds(activity_start_builds) => { self.activity_tree.add_activity( activity_start_builds.id, activity_start_builds.parent, Activity::Builds(ActivityBuilds { state: ActivityState::default(), done: 0, expected: 0, running: 0, failed: 0, }), )?; } ActivityStartMessage::Build(activity_start_build) => { self.activity_tree.add_activity( activity_start_build.id, activity_start_build.parent, Activity::Build(ActivityBuild { state: ActivityState::default(), drv_path: activity_start_build.drv_path, machine_name: if activity_start_build.machine_name.len() > 0 { Some(activity_start_build.machine_name) } else { None }, phase: None, }), )?; } ActivityStartMessage::OptimizeStore(activity_start_optimize_store) => { self.activity_tree.add_activity( activity_start_optimize_store.id, activity_start_optimize_store.parent, Activity::OptimizeStore(ActivityOptimizeStore { state: ActivityState::default(), }), )?; } ActivityStartMessage::VerifyPaths(activity_start_verify_paths) => { self.activity_tree.add_activity( activity_start_verify_paths.id, activity_start_verify_paths.parent, Activity::VerifyPaths(ActivityVerifyPaths { state: ActivityState::default(), }), )?; } ActivityStartMessage::Substitute(activity_start_substitute) => { self.activity_tree.add_activity( activity_start_substitute.id, activity_start_substitute.parent, Activity::Substitute(ActivitySubstitute { state: ActivityState::default(), }), )?; } ActivityStartMessage::QueryPathInfo(activity_start_query_path_info) => { self.activity_tree.add_activity( activity_start_query_path_info.id, activity_start_query_path_info.parent, Activity::QueryPathInfo(ActivityQueryPathInfo { state: ActivityState::default(), }), )?; } ActivityStartMessage::PostBuildHook(activity_start_post_build_hook) => { self.activity_tree.add_activity( activity_start_post_build_hook.id, activity_start_post_build_hook.parent, Activity::PostBuildHook(ActivityPostBuildHook { state: ActivityState::default(), }), )?; } ActivityStartMessage::BuildWaiting(activity_start_build_waiting) => { self.activity_tree.add_activity( activity_start_build_waiting.id, 0, Activity::BuildWaiting(ActivityBuildWaiting { state: ActivityState::default(), text: activity_start_build_waiting.text, drv_path: activity_start_build_waiting.drv_path, path_resolved: activity_start_build_waiting.path_resolved, }), )?; } ActivityStartMessage::FetchTree(activity_start_fetch_tree) => { self.activity_tree.add_activity( activity_start_fetch_tree.id, activity_start_fetch_tree.parent, Activity::FetchTree(ActivityFetchTree { state: ActivityState::default(), }), )?; } }; // self.print_current_status(); } NixAction::Stop(stop_message) => { let activity = self .activity_tree .get_activity_id(stop_message.id) .map(|activity_id| self.activity_tree.get_mut(&activity_id))?; activity.get_mut_activity().stop(); // self.print_current_status(); // println!("{}", serde_json::to_string(&message)?); } NixAction::Result(activity_result_message) => { match activity_result_message { ActivityResultMessage::FileLinked(_activity_result_file_linked) => {} ActivityResultMessage::BuildLogLine(_activity_result_build_log_line) => { // These are the output from the actual build (as opposed to the output from nix). } ActivityResultMessage::UntrustedPath(_activity_result_untrusted_path) => {} ActivityResultMessage::CorruptedPath(_activity_result_corrupted_path) => {} ActivityResultMessage::SetPhase(activity_result_set_phase) => { let activity_id = self .activity_tree .get_activity_id(activity_result_set_phase.id)?; let activity = self.activity_tree.get_mut(&activity_id); activity .get_mut_activity() .set_phase(Some(activity_result_set_phase.phase)); } ActivityResultMessage::Progress(activity_result_progress) => { let activity_id = self .activity_tree .get_activity_id(activity_result_progress.id)?; let activity = self.activity_tree.get_mut(&activity_id); activity.get_mut_activity().set_progress( activity_result_progress.done, activity_result_progress.expected, activity_result_progress.running, activity_result_progress.failed, ); } ActivityResultMessage::SetExpected(activity_result_set_expected) => { let activity_id = self .activity_tree .get_activity_id(activity_result_set_expected.id)?; let activity = self.activity_tree.get_mut(&activity_id); activity .get_mut_activity() .set_expected(activity_result_set_expected.expected); } ActivityResultMessage::PostBuildLogLine( _activity_result_post_build_log_line, ) => {} ActivityResultMessage::FetchStatus(_activity_result_fetch_status) => {} }; } }; Ok(()) } fn print_current_status(&self) -> () { let mut tree = String::new(); let draw_order = self.get_draw_order(); for dag_entry in draw_order { let leading_bars = { let mut leading_bars = String::with_capacity(3 * dag_entry.depth.len()); for leading_bar in dag_entry .depth .iter() .map(|depth| if *depth { "𜹈 " } else { " " }) { leading_bars.push_str(leading_bar); } leading_bars }; let branch = if dag_entry.has_later_siblings { "𜸨" } else { "𜸛" }; let activity = self.activity_tree.get(&dag_entry.activity_id); let display_name = activity .get_activity() .display_name() .expect("Currently we always return a display name."); let activity_id = activity.get_activity_id(); let parent_id = activity.get_parent_id(); tree += &format!("{leading_bars}{branch}𜸟 {display_name} {parent_id} {activity_id}\n"); } if tree.is_empty() { println!("No active activities."); } else { print!("\n{}\n", tree); } } fn get_draw_order(&self) -> Vec { let mut draw_order: Vec = Vec::new(); let mut stack: Vec = self .get_children_in_order(self.activity_tree.get_root_id()) .filter_map(|child| { if child.get_activity().is_active() { Some(DrawStackEntry::HasNotVisitedChildren(DrawDagEntry { activity_id: child.get_activity_id().clone(), depth: Vec::new(), has_later_siblings: true, })) } else { None } }) .collect(); if stack.len() == 0 { return draw_order; } stack .last_mut() .expect("If-statement ensured this is Some()") .set_no_later_siblings(); while !stack.is_empty() { let current_entry = stack.pop().expect("While-loop ensured this is Some."); match current_entry { DrawStackEntry::HasNotVisitedChildren(draw_dag_entry) => { let current_id = draw_dag_entry.activity_id.clone(); let mut current_depth = draw_dag_entry.depth.clone(); current_depth.push(draw_dag_entry.has_later_siblings); stack.push(DrawStackEntry::VisitedChildren(draw_dag_entry)); let children: Vec = self .get_children_in_order(current_id) .filter_map(|child| { if child.get_activity().is_active() { Some(child.get_activity_id().clone()) } else { None } }) .collect(); let has_children = !children.is_empty(); stack.extend(children.into_iter().map(|child_id| { DrawStackEntry::HasNotVisitedChildren(DrawDagEntry { activity_id: child_id, depth: current_depth.clone(), has_later_siblings: true, }) })); if has_children { stack .last_mut() .expect("If-statement ensured this is Some()") .set_no_later_siblings(); } } DrawStackEntry::VisitedChildren(draw_dag_entry) => { draw_order.push(draw_dag_entry); } }; } draw_order } fn get_children_in_order( &self, parent_id: ActivityId, ) -> impl Iterator { let parent = self.activity_tree.get(&parent_id); parent .get_child_ids() .iter() .map(|child_id| self.activity_tree.get(child_id)) .flat_map(|child| TransparentIter::new(&self.activity_tree, child)) } } enum DrawStackEntry { HasNotVisitedChildren(DrawDagEntry), VisitedChildren(DrawDagEntry), } struct DrawDagEntry { activity_id: ActivityId, depth: Vec, has_later_siblings: bool, } impl DrawStackEntry { fn set_no_later_siblings(&mut self) -> () { match self { DrawStackEntry::HasNotVisitedChildren(draw_dag_entry) => { draw_dag_entry.has_later_siblings = false } DrawStackEntry::VisitedChildren(draw_dag_entry) => { draw_dag_entry.has_later_siblings = false } }; } }