use std::borrow::Cow; use std::collections::BTreeMap; use sqlx::Row; use tokio::process::Child; use tracing::error; use crate::Result; use crate::database::db_handle::DbHandle; use crate::nix_util::nix_output_stream::ActivityStartMessage; use crate::nix_util::nix_output_stream::NixAction; use crate::nix_util::nix_output_stream::NixOutputStream; use crate::nix_util::output_stream::OutputStream; use super::nix_output_stream::ActivityResultMessage; use super::nix_output_stream::NixMessage; pub(crate) struct RunningBuild<'db> { db_handle: &'db DbHandle, activity_tree: BTreeMap, } impl<'db> RunningBuild<'db> { pub(crate) fn new(db_handle: &'db DbHandle) -> Result { Ok(RunningBuild { db_handle, activity_tree: BTreeMap::new(), }) } pub(crate) async fn run_to_completion( &mut self, mut child: Child, target_name: TN, ) -> Result<()> where TN: AsRef, { let build_id: i64 = sqlx::query( r#"INSERT INTO build (start_time, target) SELECT unixepoch('now'), ? RETURNING id"#, ) .bind(target_name.as_ref()) .fetch_one(&self.db_handle.conn) .await? .try_get("id")?; let output_stream = OutputStream::from_child(&mut child)?; let mut nix_output_stream: NixOutputStream = NixOutputStream::new(output_stream); let exit_status_handle = tokio::spawn(async move { let status = child .wait() .await .expect("nixos-rebuild encountered an error"); status }); while let Some(message) = nix_output_stream.next().await? { self.handle_message(message)?; } let exit_status = exit_status_handle.await?; println!("nix build status was: {}", exit_status); let update: u64 = sqlx::query(r#"UPDATE build SET end_time=unixepoch('now'), status=? WHERE id=?"#) .bind( exit_status .code() .expect("Process should have an exit code."), ) .bind(build_id) .execute(&self.db_handle.conn) .await? .rows_affected(); assert!(update == 1); Ok(()) } pub(crate) fn handle_message(&mut self, message: NixMessage) -> Result<()> { let message = match message { NixMessage::ParseFailure(line) => { error!("FAIL PARSE: {line}"); return Ok(()); } NixMessage::Generic(_value, line) => { error!("GENERIC PARSE: {line}"); return Ok(()); } NixMessage::Action(nix_action) => nix_action, }; match message { NixAction::Msg(msg_message) => { // For now we can ignore the messages. } NixAction::Start(activity_start_message) => { let id = activity_start_message.get_id(); let entry = self.activity_tree.entry(id); let entry = match entry { std::collections::btree_map::Entry::Vacant(vacant_entry) => vacant_entry, std::collections::btree_map::Entry::Occupied(_occupied_entry) => { panic!("Started an already started activity: {id}.") } }; match activity_start_message { ActivityStartMessage::Unknown(activity_start_unknown) => { entry.insert(Activity::Unknown(ActivityUnknown { id: activity_start_unknown.id, parent: activity_start_unknown.parent, state: ActivityState::default(), level: activity_start_unknown.level, text: activity_start_unknown.text, })); } ActivityStartMessage::CopyPath(activity_start_copy_path) => { entry.insert(Activity::CopyPath(ActivityCopyPath { id: activity_start_copy_path.id, parent: activity_start_copy_path.parent, state: ActivityState::default(), })); } ActivityStartMessage::FileTransfer(activity_start_file_transfer) => { entry.insert(Activity::FileTransfer(ActivityFileTransfer { id: activity_start_file_transfer.id, parent: activity_start_file_transfer.parent, state: ActivityState::default(), text: activity_start_file_transfer.text, url: activity_start_file_transfer.url, })); } ActivityStartMessage::Realize(activity_start_realize) => { entry.insert(Activity::Realize(ActivityRealize { id: activity_start_realize.id, parent: activity_start_realize.parent, state: ActivityState::default(), })); } ActivityStartMessage::CopyPaths(activity_start_copy_paths) => { entry.insert(Activity::CopyPaths(ActivityCopyPaths { id: activity_start_copy_paths.id, parent: activity_start_copy_paths.parent, state: ActivityState::default(), level: activity_start_copy_paths.level, text: activity_start_copy_paths.text, })); } ActivityStartMessage::Builds(activity_start_builds) => { entry.insert(Activity::Builds(ActivityBuilds { id: activity_start_builds.id, parent: activity_start_builds.parent, state: ActivityState::default(), })); } ActivityStartMessage::Build(activity_start_build) => { entry.insert(Activity::Build(ActivityBuild { id: activity_start_build.id, parent: activity_start_build.parent, state: ActivityState::default(), text: activity_start_build.text, drv_path: activity_start_build.drv_path, })); } ActivityStartMessage::OptimizeStore(activity_start_optimize_store) => { entry.insert(Activity::OptimizeStore(ActivityOptimizeStore { id: activity_start_optimize_store.id, parent: activity_start_optimize_store.parent, state: ActivityState::default(), })); } ActivityStartMessage::VerifyPaths(activity_start_verify_paths) => { entry.insert(Activity::VerifyPaths(ActivityVerifyPaths { id: activity_start_verify_paths.id, parent: activity_start_verify_paths.parent, state: ActivityState::default(), })); } ActivityStartMessage::Substitute(activity_start_substitute) => { entry.insert(Activity::Substitute(ActivitySubstitute { id: activity_start_substitute.id, parent: activity_start_substitute.parent, state: ActivityState::default(), })); } ActivityStartMessage::QueryPathInfo(activity_start_query_path_info) => { entry.insert(Activity::QueryPathInfo(ActivityQueryPathInfo { id: activity_start_query_path_info.id, parent: activity_start_query_path_info.parent, state: ActivityState::default(), })); } ActivityStartMessage::PostBuildHook(activity_start_post_build_hook) => { entry.insert(Activity::PostBuildHook(ActivityPostBuildHook { id: activity_start_post_build_hook.id, parent: activity_start_post_build_hook.parent, state: ActivityState::default(), })); } ActivityStartMessage::BuildWaiting(activity_start_build_waiting) => { entry.insert(Activity::BuildWaiting(ActivityBuildWaiting { id: activity_start_build_waiting.id, state: ActivityState::default(), })); } ActivityStartMessage::FetchTree(activity_start_fetch_tree) => { entry.insert(Activity::FetchTree(ActivityFetchTree { id: activity_start_fetch_tree.id, parent: activity_start_fetch_tree.parent, state: ActivityState::default(), })); } }; self.print_current_status(); } NixAction::Stop(stop_message) => { let entry = self.activity_tree.entry(stop_message.id); match entry { std::collections::btree_map::Entry::Vacant(_vacant_entry) => { panic!( "Stopped an activity that is not in the tree: {}", stop_message.id ); } std::collections::btree_map::Entry::Occupied(mut occupied_entry) => { occupied_entry.get_mut().stop(); } }; self.print_current_status(); // println!("{}", serde_json::to_string(&message)?); } NixAction::Result(activity_result_message) => { match activity_result_message { ActivityResultMessage::FileLinked(activity_result_file_linked) => { // FileLinked // TODO: Haven't seen any of these. // warn!("Found FileLinked: {}", serde_json::to_string(&message)?); } ActivityResultMessage::BuildLogLine(activity_result_build_log_line) => { // BuildLogLine // The first field is a string containing the log line } ActivityResultMessage::UntrustedPath(activity_result_untrusted_path) => { // UntrustedPath // TODO: Haven't seen any of these. // warn!("Found UntrustedPath: {}", serde_json::to_string(&message)?); } ActivityResultMessage::CorruptedPath(activity_result_corrupted_path) => { // CorruptedPath // TODO: Haven't seen any of these. // warn!("Found CorruptedPath: {}", serde_json::to_string(&message)?); } ActivityResultMessage::SetPhase(activity_result_set_phase) => { // SetPhase // The first field is the phase name } ActivityResultMessage::Progress(activity_result_progress) => { // Progress // Fields numerator, denominator, running?, failed? } ActivityResultMessage::SetExpected(activity_result_set_expected) => { // SetExpected // Fields activity type?, expected? } ActivityResultMessage::PostBuildLogLine( activity_result_post_build_log_line, ) => { // PostBuildLogLine // TODO: Haven't seen any of these. // warn!( // "Found PostBuildLogLine: {}", // serde_json::to_string(&message)? // ); } ActivityResultMessage::FetchStatus(activity_result_fetch_status) => { // FetchStatus // TODO: Haven't seen any of these. // warn!("Found FetchStatus: {}", serde_json::to_string(&message)?); // println!("{}", serde_json::to_string(&message)?); } }; } }; Ok(()) } fn print_current_status(&self) -> () { let in_progress = self .activity_tree .iter() .filter(|activity: &(&u64, &Activity)| activity.1.is_active()); let names: Vec> = in_progress .filter_map(|(id, activity)| activity.display_name()) .collect(); let name_list = names.join(", "); // TODO: Make a meaningful current status. // println!("In progress: {name_list}"); } } enum ActivityState { Started { progress_numerator: u64, progress_denominator: u64, progress_running: u64, progress_failed: u64, }, Stopped, } impl Default for ActivityState { fn default() -> Self { ActivityState::Started { progress_numerator: 0, progress_denominator: 0, progress_running: 0, progress_failed: 0, } } } #[repr(u8)] enum Activity { Unknown(ActivityUnknown), CopyPath(ActivityCopyPath), FileTransfer(ActivityFileTransfer), Realize(ActivityRealize), CopyPaths(ActivityCopyPaths), Builds(ActivityBuilds), Build(ActivityBuild), OptimizeStore(ActivityOptimizeStore), VerifyPaths(ActivityVerifyPaths), Substitute(ActivitySubstitute), QueryPathInfo(ActivityQueryPathInfo), PostBuildHook(ActivityPostBuildHook), BuildWaiting(ActivityBuildWaiting), FetchTree(ActivityFetchTree), } impl Activity { fn stop(&mut self) -> () { match self { Activity::Unknown(activity_unknown) => { activity_unknown.state = ActivityState::Stopped; } Activity::CopyPath(activity_copy_path) => { activity_copy_path.state = ActivityState::Stopped; } Activity::FileTransfer(activity_file_transfer) => { activity_file_transfer.state = ActivityState::Stopped; } Activity::Realize(activity_realize) => { activity_realize.state = ActivityState::Stopped; } Activity::CopyPaths(activity_copy_paths) => { activity_copy_paths.state = ActivityState::Stopped; } Activity::Builds(activity_builds) => { activity_builds.state = ActivityState::Stopped; } Activity::Build(activity_build) => { activity_build.state = ActivityState::Stopped; } Activity::OptimizeStore(activity_optimize_store) => { activity_optimize_store.state = ActivityState::Stopped; } Activity::VerifyPaths(activity_verify_paths) => { activity_verify_paths.state = ActivityState::Stopped; } Activity::Substitute(activity_substitute) => { activity_substitute.state = ActivityState::Stopped; } Activity::QueryPathInfo(activity_query_path_info) => { activity_query_path_info.state = ActivityState::Stopped; } Activity::PostBuildHook(activity_post_build_hook) => { activity_post_build_hook.state = ActivityState::Stopped; } Activity::BuildWaiting(activity_build_waiting) => { activity_build_waiting.state = ActivityState::Stopped; } Activity::FetchTree(activity_fetch_tree) => { activity_fetch_tree.state = ActivityState::Stopped; } } } fn is_active(&self) -> bool { match self { Activity::Unknown(activity_unknown) => { matches!(activity_unknown.state, ActivityState::Started { .. }) } Activity::CopyPath(activity_copy_path) => { matches!(activity_copy_path.state, ActivityState::Started { .. }) } Activity::FileTransfer(activity_file_transfer) => { matches!(activity_file_transfer.state, ActivityState::Started { .. }) } Activity::Realize(activity_realize) => { matches!(activity_realize.state, ActivityState::Started { .. }) } Activity::CopyPaths(activity_copy_paths) => { matches!(activity_copy_paths.state, ActivityState::Started { .. }) } Activity::Builds(activity_builds) => { matches!(activity_builds.state, ActivityState::Started { .. }) } Activity::Build(activity_build) => { matches!(activity_build.state, ActivityState::Started { .. }) } Activity::OptimizeStore(activity_optimize_store) => { matches!(activity_optimize_store.state, ActivityState::Started { .. }) } Activity::VerifyPaths(activity_verify_paths) => { matches!(activity_verify_paths.state, ActivityState::Started { .. }) } Activity::Substitute(activity_substitute) => { matches!(activity_substitute.state, ActivityState::Started { .. }) } Activity::QueryPathInfo(activity_query_path_info) => { matches!( activity_query_path_info.state, ActivityState::Started { .. } ) } Activity::PostBuildHook(activity_post_build_hook) => { matches!( activity_post_build_hook.state, ActivityState::Started { .. } ) } Activity::BuildWaiting(activity_build_waiting) => { matches!(activity_build_waiting.state, ActivityState::Started { .. }) } Activity::FetchTree(activity_fetch_tree) => { matches!(activity_fetch_tree.state, ActivityState::Started { .. }) } } } fn display_name(&self) -> Option> { match self { Activity::Unknown(activity_unknown) => { // TODO Some(Cow::Borrowed("Unknown")) } Activity::CopyPath(activity_copy_path) => { // TODO Some(Cow::Borrowed("CopyPath")) } Activity::FileTransfer(activity_file_transfer) => { // TODO Some(Cow::Borrowed("FileTransfer")) } Activity::Realize(activity_realize) => { // TODO Some(Cow::Borrowed("Realize")) } Activity::CopyPaths(activity_copy_paths) => { // TODO Some(Cow::Borrowed("CopyPaths")) } Activity::Builds(activity_builds) => { // TODO Some(Cow::Borrowed("Builds")) } Activity::Build(activity_build) => { // TODO Some(Cow::Borrowed("Build")) } Activity::OptimizeStore(activity_optimize_store) => { // TODO Some(Cow::Borrowed("OptimizeStore")) } Activity::VerifyPaths(activity_verify_paths) => { // TODO Some(Cow::Borrowed("VerifyPaths")) } Activity::Substitute(activity_substitute) => { // TODO Some(Cow::Borrowed("Substitute")) } Activity::QueryPathInfo(activity_query_path_info) => { // TODO Some(Cow::Borrowed("QueryPathInfo")) } Activity::PostBuildHook(activity_post_build_hook) => { // TODO Some(Cow::Borrowed("PostBuildHook")) } Activity::BuildWaiting(activity_build_waiting) => { // TODO Some(Cow::Borrowed("BuildWaiting")) } Activity::FetchTree(activity_fetch_tree) => { // TODO Some(Cow::Borrowed("FetchTree")) } } } } struct ActivityUnknown { id: u64, parent: u64, state: ActivityState, level: u8, text: String, } struct ActivityCopyPath { id: u64, parent: u64, state: ActivityState, } struct ActivityFileTransfer { id: u64, parent: u64, state: ActivityState, text: String, url: String, } struct ActivityRealize { id: u64, parent: u64, state: ActivityState, } struct ActivityCopyPaths { id: u64, parent: u64, state: ActivityState, level: u8, text: String, } struct ActivityBuilds { id: u64, parent: u64, state: ActivityState, } struct ActivityBuild { id: u64, parent: u64, state: ActivityState, text: String, drv_path: String, } struct ActivityOptimizeStore { id: u64, parent: u64, state: ActivityState, } struct ActivityVerifyPaths { id: u64, parent: u64, state: ActivityState, } struct ActivitySubstitute { id: u64, parent: u64, state: ActivityState, } struct ActivityQueryPathInfo { id: u64, parent: u64, state: ActivityState, } struct ActivityPostBuildHook { id: u64, parent: u64, state: ActivityState, } struct ActivityBuildWaiting { id: u64, state: ActivityState, } struct ActivityFetchTree { id: u64, parent: u64, state: ActivityState, }