use std::collections::BTreeMap; use sqlx::Row; use tokio::process::Child; use tracing::error; use tracing::warn; use crate::Result; use crate::database::db_handle::DbHandle; use crate::nix_util::nix_output_stream::NixAction; use crate::nix_util::nix_output_stream::NixOutputStream; use crate::nix_util::output_stream::OutputStream; use super::nix_output_stream::Field; use super::nix_output_stream::NixMessage; const ACTIVITY_TYPE_: i32 = 10; pub(crate) struct RunningBuild<'db> { db_handle: &'db DbHandle, activity_tree: BTreeMap, } impl<'db> RunningBuild<'db> { pub(crate) fn new(db_handle: &'db DbHandle) -> Result { Ok(RunningBuild { db_handle, activity_tree: BTreeMap::new(), }) } pub(crate) async fn run_to_completion( &mut self, mut child: Child, target_name: TN, ) -> Result<()> where TN: AsRef, { let build_id: i64 = sqlx::query( r#"INSERT INTO build (start_time, target) SELECT unixepoch('now'), ? RETURNING id"#, ) .bind(target_name.as_ref()) .fetch_one(&self.db_handle.conn) .await? .try_get("id")?; let output_stream = OutputStream::from_child(&mut child)?; let mut nix_output_stream: NixOutputStream = NixOutputStream::new(output_stream); let exit_status_handle = tokio::spawn(async move { let status = child .wait() .await .expect("nixos-rebuild encountered an error"); status }); while let Some(message) = nix_output_stream.next().await? { self.handle_message(message)?; } let exit_status = exit_status_handle.await?; println!("nixos-rebuild status was: {}", exit_status); let update: u64 = sqlx::query(r#"UPDATE build SET end_time=unixepoch('now'), status=? WHERE id=?"#) .bind( exit_status .code() .expect("Process should have an exit code."), ) .bind(build_id) .execute(&self.db_handle.conn) .await? .rows_affected(); assert!(update == 1); Ok(()) } pub(crate) fn handle_message(&mut self, message: NixMessage) -> Result<()> { let message = match message { NixMessage::ParseFailure(line) => { error!("FAIL PARSE: {line}"); return Ok(()); } NixMessage::Generic(_value, line) => { error!("GENERIC PARSE: {line}"); return Ok(()); } NixMessage::Action(nix_action) => nix_action, }; match &message { NixAction::Msg { level, msg, raw_msg, file, line, column, } => { // For now we can ignore the messages. } NixAction::Start { id, fields, level, parent, text, r#type, } => { let entry = self.activity_tree.entry(*id); let entry = match entry { std::collections::btree_map::Entry::Vacant(vacant_entry) => vacant_entry, std::collections::btree_map::Entry::Occupied(_occupied_entry) => { panic!("Started an already started activity: {id}.") } }; match r#type { 0 => { entry.insert(Activity::Unknown(ActivityUnknown { id: *id, parent: *parent, state: ActivityState::default(), level: *level, text: text.to_owned(), })); } 100 => { // TODO: Haven't seen any of these. warn!("Found CopyPath: {}", serde_json::to_string(&message)?); entry.insert(Activity::CopyPath(ActivityCopyPath { id: *id, parent: *parent, state: ActivityState::default(), })); } 101 => { match fields { Some(f) if f.len() > 1 => { warn!( "Found more than one field in ActivityFileTransfer: {}", serde_json::to_string(&message)? ); } _ => {} }; entry.insert(Activity::FileTransfer(ActivityFileTransfer { id: *id, parent: *parent, state: ActivityState::default(), level: *level, text: text.to_owned(), url: string_field(fields, 0).to_owned(), })); } 102 => { match fields { Some(f) if f.len() > 0 => { warn!( "Found fields in ActivityRealize: {}", serde_json::to_string(&message)? ); } _ => {} }; if text.len() > 0 { warn!( "Found Realize with text: {}", serde_json::to_string(&message)? ); } entry.insert(Activity::Realize(ActivityRealize { id: *id, parent: *parent, state: ActivityState::default(), level: *level, text: text.to_owned(), })); } 103 => { match fields { Some(f) if f.len() > 0 => { warn!( "Found fields in CopyPaths: {}", serde_json::to_string(&message)? ); } _ => {} }; if text.len() > 0 { warn!( "Found CopyPaths with text: {}", serde_json::to_string(&message)? ); } entry.insert(Activity::CopyPaths(ActivityCopyPaths { id: *id, parent: *parent, state: ActivityState::default(), level: *level, text: text.to_owned(), })); } 104 => { match fields { Some(f) if f.len() > 0 => { warn!( "Found fields in Builds: {}", serde_json::to_string(&message)? ); } _ => {} }; if text.len() > 0 { warn!( "Found Builds with text: {}", serde_json::to_string(&message)? ); } entry.insert(Activity::Builds(ActivityBuilds { id: *id, parent: *parent, state: ActivityState::default(), level: *level, text: text.to_owned(), })); } 105 => { // TODO: What are the other fields ["/nix/store/j54kvd8mlj8cl9ycvlkh5987fqvzl4p5-m4-1.4.20.tar.bz2.drv","",1,1] entry.insert(Activity::Build(ActivityBuild { id: *id, parent: *parent, state: ActivityState::default(), level: *level, text: text.to_owned(), path: string_field(fields, 0).to_owned(), })); } 106 => { // TODO: Haven't seen any of these. warn!("Found OptimizeStore: {}", serde_json::to_string(&message)?); entry.insert(Activity::OptimizeStore(ActivityOptimizeStore { id: *id, parent: *parent, state: ActivityState::default(), })); } 107 => { // TODO: Haven't seen any of these. warn!("Found VerifyPath: {}", serde_json::to_string(&message)?); entry.insert(Activity::VerifyPaths(ActivityVerifyPaths { id: *id, parent: *parent, state: ActivityState::default(), })); } 108 => { // TODO: Haven't seen any of these. warn!("Found Subtitute: {}", serde_json::to_string(&message)?); entry.insert(Activity::Substitute(ActivitySubstitute { id: *id, parent: *parent, state: ActivityState::default(), })); } 109 => { // TODO: Haven't seen any of these. warn!("Found QueryPathInfo: {}", serde_json::to_string(&message)?); entry.insert(Activity::QueryPathInfo(ActivityQueryPathInfo { id: *id, parent: *parent, state: ActivityState::default(), })); } 110 => { // TODO: Haven't seen any of these. warn!("Found PostBuildHook: {}", serde_json::to_string(&message)?); entry.insert(Activity::PostBuildHook(ActivityPostBuildHook { id: *id, parent: *parent, state: ActivityState::default(), })); } 111 => { // TODO: Haven't seen any of these. warn!("Found BuildWaiting: {}", serde_json::to_string(&message)?); entry.insert(Activity::BuildWaiting(ActivityBuildWaiting { id: *id, parent: *parent, state: ActivityState::default(), })); } 112 => { // TODO: Haven't seen any of these. warn!("Found FetchTree: {}", serde_json::to_string(&message)?); entry.insert(Activity::FetchTree(ActivityFetchTree { id: *id, parent: *parent, state: ActivityState::default(), })); } _ => { panic!( "Unhandled start activity: {}", serde_json::to_string(&message)? ); } }; self.print_current_status(); } NixAction::Stop { id } => { let entry = self.activity_tree.entry(*id); match entry { std::collections::btree_map::Entry::Vacant(_vacant_entry) => { panic!("Stopped an activity that is not in the tree: {id}"); } std::collections::btree_map::Entry::Occupied(mut occupied_entry) => { occupied_entry.get_mut().stop(); } }; self.print_current_status(); // println!("{}", serde_json::to_string(&message)?); } NixAction::Result { id, fields, r#type } => { match r#type { 100 => { // FileLinked // TODO: Haven't seen any of these. warn!("Found FileLinked: {}", serde_json::to_string(&message)?); } 101 => { // BuildLogLine // The first field is a string containing the log line } 102 => { // UntrustedPath // TODO: Haven't seen any of these. warn!("Found UntrustedPath: {}", serde_json::to_string(&message)?); } 103 => { // CorruptedPath // TODO: Haven't seen any of these. warn!("Found CorruptedPath: {}", serde_json::to_string(&message)?); } 104 => { // SetPhase // The first field is the phase name } 105 => { // Progress // Fields numerator, denominator, running?, failed? } 106 => { // SetExpected // Fields activity type?, expected? } 107 => { // PostBuildLogLine // TODO: Haven't seen any of these. warn!( "Found PostBuildLogLine: {}", serde_json::to_string(&message)? ); } 108 => { // FetchStatus // TODO: Haven't seen any of these. warn!("Found FetchStatus: {}", serde_json::to_string(&message)?); // println!("{}", serde_json::to_string(&message)?); } _ => { panic!("Unhandled result: {}", serde_json::to_string(&message)?); } }; } }; Ok(()) } fn print_current_status(&self) -> () { // let in_progress = self // .activity_tree // .iter() // .filter(|activity: &(&u64, &ActivityTreeEntry)| activity.1.is_active()); // let names: Vec<&str> = in_progress // .map(|activity| match activity.1 { // ActivityTreeEntry::Unknown { // id, // parent, // r#type, // state, // } => "unknown", // ActivityTreeEntry::System { // id, // parent, // r#type, // state, // } => "system", // ActivityTreeEntry::Download { // id, // parent, // r#type, // state, // url, // } => url, // ActivityTreeEntry::Build { // id, // parent, // r#type, // state, // path, // } => path, // }) // .collect(); // let name_list = names.join(", "); // println!("In progress: {name_list}"); } } enum ActivityState { Started { progress_numerator: u64, progress_denominator: u64, progress_running: u64, progress_failed: u64, }, Stopped, } impl Default for ActivityState { fn default() -> Self { ActivityState::Started { progress_numerator: 0, progress_denominator: 0, progress_running: 0, progress_failed: 0, } } } #[repr(u8)] enum Activity { Unknown(ActivityUnknown), CopyPath(ActivityCopyPath), FileTransfer(ActivityFileTransfer), Realize(ActivityRealize), CopyPaths(ActivityCopyPaths), Builds(ActivityBuilds), Build(ActivityBuild), OptimizeStore(ActivityOptimizeStore), VerifyPaths(ActivityVerifyPaths), Substitute(ActivitySubstitute), QueryPathInfo(ActivityQueryPathInfo), PostBuildHook(ActivityPostBuildHook), BuildWaiting(ActivityBuildWaiting), FetchTree(ActivityFetchTree), } impl Activity { fn get_type(&self) -> u8 { match self { Activity::Unknown(_) => 0, Activity::CopyPath(_) => 100, Activity::FileTransfer(_) => 101, Activity::Realize(_) => 102, Activity::CopyPaths(_) => 103, Activity::Builds(_) => 104, Activity::Build(_) => 105, Activity::OptimizeStore(_) => 106, Activity::VerifyPaths(_) => 107, Activity::Substitute(_) => 108, Activity::QueryPathInfo(_) => 109, Activity::PostBuildHook(_) => 110, Activity::BuildWaiting(_) => 111, Activity::FetchTree(_) => 112, } } fn stop(&mut self) -> () { match self { Activity::Unknown(activity_unknown) => { activity_unknown.state = ActivityState::Stopped; } Activity::CopyPath(activity_copy_path) => { activity_copy_path.state = ActivityState::Stopped; } Activity::FileTransfer(activity_file_transfer) => { activity_file_transfer.state = ActivityState::Stopped; } Activity::Realize(activity_realize) => { activity_realize.state = ActivityState::Stopped; } Activity::CopyPaths(activity_copy_paths) => { activity_copy_paths.state = ActivityState::Stopped; } Activity::Builds(activity_builds) => { activity_builds.state = ActivityState::Stopped; } Activity::Build(activity_build) => { activity_build.state = ActivityState::Stopped; } Activity::OptimizeStore(activity_optimize_store) => { activity_optimize_store.state = ActivityState::Stopped; } Activity::VerifyPaths(activity_verify_paths) => { activity_verify_paths.state = ActivityState::Stopped; } Activity::Substitute(activity_substitute) => { activity_substitute.state = ActivityState::Stopped; } Activity::QueryPathInfo(activity_query_path_info) => { activity_query_path_info.state = ActivityState::Stopped; } Activity::PostBuildHook(activity_post_build_hook) => { activity_post_build_hook.state = ActivityState::Stopped; } Activity::BuildWaiting(activity_build_waiting) => { activity_build_waiting.state = ActivityState::Stopped; } Activity::FetchTree(activity_fetch_tree) => { activity_fetch_tree.state = ActivityState::Stopped; } } } } struct ActivityUnknown { id: u64, parent: u64, state: ActivityState, level: u8, text: String, } struct ActivityCopyPath { id: u64, parent: u64, state: ActivityState, } struct ActivityFileTransfer { id: u64, parent: u64, state: ActivityState, level: u8, text: String, url: String, } struct ActivityRealize { id: u64, parent: u64, state: ActivityState, level: u8, text: String, } struct ActivityCopyPaths { id: u64, parent: u64, state: ActivityState, level: u8, text: String, } struct ActivityBuilds { id: u64, parent: u64, state: ActivityState, level: u8, text: String, } struct ActivityBuild { id: u64, parent: u64, state: ActivityState, level: u8, text: String, path: String, } struct ActivityOptimizeStore { id: u64, parent: u64, state: ActivityState, } struct ActivityVerifyPaths { id: u64, parent: u64, state: ActivityState, } struct ActivitySubstitute { id: u64, parent: u64, state: ActivityState, } struct ActivityQueryPathInfo { id: u64, parent: u64, state: ActivityState, } struct ActivityPostBuildHook { id: u64, parent: u64, state: ActivityState, } struct ActivityBuildWaiting { id: u64, parent: u64, state: ActivityState, } struct ActivityFetchTree { id: u64, parent: u64, state: ActivityState, } enum ActivityResult { FileLinked(ResultFileLinked), BuildLogLine(ResultBuildLogLine), UntrustedPath(ResultUntrustedPath), CorruptedPath(ResultCorruptedPath), SetPhase(ResultSetPhase), Progress(ResultProgress), SetExpected(ResultSetExpected), PostBuildLogLine(ResultPostBuildLogLine), FetchStatus(ResultFetchStatus), } impl ActivityResult { fn get_type(&self) -> u8 { match self { ActivityResult::FileLinked(_) => 100, ActivityResult::BuildLogLine(_) => 101, ActivityResult::UntrustedPath(_) => 102, ActivityResult::CorruptedPath(_) => 103, ActivityResult::SetPhase(_) => 104, ActivityResult::Progress(_) => 105, ActivityResult::SetExpected(_) => 106, ActivityResult::PostBuildLogLine(_) => 107, ActivityResult::FetchStatus(_) => 108, } } } struct ResultFileLinked {} struct ResultBuildLogLine {} struct ResultUntrustedPath {} struct ResultCorruptedPath {} struct ResultSetPhase {} struct ResultProgress {} struct ResultSetExpected {} struct ResultPostBuildLogLine {} struct ResultFetchStatus {} fn string_field(fields: &Option>, ind: usize) -> &String { match fields { Some(fields) => match &fields[ind] { Field::Number(_n) => panic!("Expected field {ind} to be text, but it is a number."), Field::Text(t) => t, }, None => panic!("Expected fields but no fields present."), } }