Files
nix_builder/src/nix_util/running_build.rs
2026-03-31 22:46:51 -04:00

485 lines
20 KiB
Rust

use std::borrow::Cow;
use sqlx::Row;
use tokio::process::Child;
use tracing::error;
use tracing::info;
use crate::Result;
use crate::database::db_handle::DbHandle;
use crate::nix_util::nix_output_stream::ActivityStartMessage;
use crate::nix_util::nix_output_stream::NixAction;
use crate::nix_util::nix_output_stream::NixOutputStream;
use crate::nix_util::output_stream::OutputStream;
use crate::nix_util::transparent_iter::TransparentIter;
use super::activity::Activity;
use super::activity::ActivityBuild;
use super::activity::ActivityBuildWaiting;
use super::activity::ActivityBuilds;
use super::activity::ActivityCopyPath;
use super::activity::ActivityCopyPaths;
use super::activity::ActivityFetchTree;
use super::activity::ActivityFileTransfer;
use super::activity::ActivityOptimizeStore;
use super::activity::ActivityPostBuildHook;
use super::activity::ActivityQueryPathInfo;
use super::activity::ActivityRealize;
use super::activity::ActivityState;
use super::activity::ActivitySubstitute;
use super::activity::ActivityUnknown;
use super::activity::ActivityVerifyPaths;
use super::activity_tree::ActivityId;
use super::activity_tree::ActivityTree;
use super::activity_tree::ActivityTreeEntry;
use super::nix_output_stream::ActivityResultMessage;
use super::nix_output_stream::NixMessage;
/// State for one nix build: the database handle used to record the build and the
/// tree of activities reported by nix while it runs.
pub(crate) struct RunningBuild<'db> {
    /// Borrowed database handle; its `conn` is used for the `build` table queries.
    db_handle: &'db DbHandle,
    /// Activities registered from nix start messages, nested by their parent ids.
    activity_tree: ActivityTree,
}
impl<'db> RunningBuild<'db> {
pub(crate) fn new(db_handle: &'db DbHandle) -> Result<Self> {
Ok(RunningBuild {
db_handle,
activity_tree: ActivityTree::new(),
})
}
pub(crate) async fn run_to_completion<TN>(
&mut self,
mut child: Child,
target_name: TN,
) -> Result<()>
where
TN: AsRef<str>,
{
let build_id: i64 = sqlx::query(
r#"INSERT INTO build (start_time, target) SELECT unixepoch('now'), ? RETURNING id"#,
)
.bind(target_name.as_ref())
.fetch_one(&self.db_handle.conn)
.await?
.try_get("id")?;
let output_stream = OutputStream::from_child(&mut child)?;
let mut nix_output_stream: NixOutputStream<OutputStream> =
NixOutputStream::new(output_stream);
let exit_status_handle = tokio::spawn(async move {
let status = child
.wait()
.await
.expect("nixos-rebuild encountered an error");
status
});
while let Some(message) = nix_output_stream.next().await? {
self.handle_message(message)?;
}
let exit_status = exit_status_handle.await?;
println!("nix build status was: {}", exit_status);
let update: u64 =
sqlx::query(r#"UPDATE build SET end_time=unixepoch('now'), status=? WHERE id=?"#)
.bind(
exit_status
.code()
.expect("Process should have an exit code."),
)
.bind(build_id)
.execute(&self.db_handle.conn)
.await?
.rows_affected();
assert!(update == 1);
Ok(())
}
/// Applies a single message from the nix output stream to the activity tree.
///
/// * Lines that failed to parse, and generic lines, are logged and ignored.
/// * `Msg` actions are echoed to stderr only when they mention `nix log`.
/// * `Start` actions register a new activity in the tree.
/// * `Stop` actions mark the referenced activity as stopped.
/// * `Result` actions update phase, progress or expected counts; the remaining
///   result kinds are ignored.
///
/// The status tree is printed after every start, stop, phase, progress and
/// expected-count update.
///
/// # Errors
/// Propagates errors from `ActivityTree::add_activity` and
/// `ActivityTree::get_activity_id`.
pub(crate) fn handle_message(&mut self, message: NixMessage) -> Result<()> {
    let action = match message {
        NixMessage::Action(action) => action,
        NixMessage::ParseFailure(line) => {
            error!("FAIL PARSE: {line}");
            return Ok(());
        }
        NixMessage::Generic(_value, line) => {
            error!("GENERIC PARSE: {line}");
            return Ok(());
        }
    };

    match action {
        NixAction::Msg(msg) => {
            // Every other message is dropped without being shown.
            if msg.msg.contains("nix log") {
                eprintln!("{}", msg.msg);
            }
        }
        NixAction::Start(start) => {
            // Translate the start message into (id, parent id, activity) and
            // register it with a single call below.
            let (id, parent, activity) = match start {
                ActivityStartMessage::Unknown(unknown) => (
                    unknown.id,
                    unknown.parent,
                    Activity::Unknown(ActivityUnknown {
                        state: ActivityState::default(),
                        text: unknown.text,
                    }),
                ),
                ActivityStartMessage::CopyPath(copy_path) => (
                    copy_path.id,
                    copy_path.parent,
                    Activity::CopyPath(ActivityCopyPath {
                        state: ActivityState::default(),
                        missing_path: copy_path.missing_path,
                        source: copy_path.source,
                        destination: copy_path.destination,
                        done: 0,
                        expected: 0,
                    }),
                ),
                ActivityStartMessage::FileTransfer(file_transfer) => (
                    file_transfer.id,
                    file_transfer.parent,
                    Activity::FileTransfer(ActivityFileTransfer {
                        state: ActivityState::default(),
                        url: file_transfer.url,
                        done: 0,
                        expected: 0,
                    }),
                ),
                ActivityStartMessage::Realize(realize) => (
                    realize.id,
                    realize.parent,
                    Activity::Realize(ActivityRealize {
                        state: ActivityState::default(),
                    }),
                ),
                ActivityStartMessage::CopyPaths(copy_paths) => (
                    copy_paths.id,
                    copy_paths.parent,
                    Activity::CopyPaths(ActivityCopyPaths {
                        state: ActivityState::default(),
                        text: copy_paths.text,
                        done: 0,
                        expected: 0,
                        running: 0,
                        failed: 0,
                    }),
                ),
                ActivityStartMessage::Builds(builds) => (
                    builds.id,
                    builds.parent,
                    Activity::Builds(ActivityBuilds {
                        state: ActivityState::default(),
                        done: 0,
                        expected: 0,
                        running: 0,
                        failed: 0,
                    }),
                ),
                ActivityStartMessage::Build(build) => (
                    build.id,
                    build.parent,
                    Activity::Build(ActivityBuild {
                        state: ActivityState::default(),
                        drv_path: build.drv_path,
                        // An empty machine name is stored as `None`.
                        machine_name: Some(build.machine_name).filter(|name| !name.is_empty()),
                        phase: None,
                    }),
                ),
                ActivityStartMessage::OptimizeStore(optimize_store) => (
                    optimize_store.id,
                    optimize_store.parent,
                    Activity::OptimizeStore(ActivityOptimizeStore {
                        state: ActivityState::default(),
                    }),
                ),
                ActivityStartMessage::VerifyPaths(verify_paths) => (
                    verify_paths.id,
                    verify_paths.parent,
                    Activity::VerifyPaths(ActivityVerifyPaths {
                        state: ActivityState::default(),
                    }),
                ),
                ActivityStartMessage::Substitute(substitute) => (
                    substitute.id,
                    substitute.parent,
                    Activity::Substitute(ActivitySubstitute {
                        state: ActivityState::default(),
                    }),
                ),
                ActivityStartMessage::QueryPathInfo(query_path_info) => (
                    query_path_info.id,
                    query_path_info.parent,
                    Activity::QueryPathInfo(ActivityQueryPathInfo {
                        state: ActivityState::default(),
                    }),
                ),
                ActivityStartMessage::PostBuildHook(post_build_hook) => (
                    post_build_hook.id,
                    post_build_hook.parent,
                    Activity::PostBuildHook(ActivityPostBuildHook {
                        state: ActivityState::default(),
                    }),
                ),
                // NOTE(review): registered under parent id 0 instead of a parent
                // taken from the message - confirm this is intended.
                ActivityStartMessage::BuildWaiting(build_waiting) => (
                    build_waiting.id,
                    0,
                    Activity::BuildWaiting(ActivityBuildWaiting {
                        state: ActivityState::default(),
                        text: build_waiting.text,
                        drv_path: build_waiting.drv_path,
                        path_resolved: build_waiting.path_resolved,
                    }),
                ),
                ActivityStartMessage::FetchTree(fetch_tree) => (
                    fetch_tree.id,
                    fetch_tree.parent,
                    Activity::FetchTree(ActivityFetchTree {
                        state: ActivityState::default(),
                    }),
                ),
            };
            self.activity_tree.add_activity(id, parent, activity)?;
            self.print_current_status();
        }
        NixAction::Stop(stop) => {
            let activity_id = self.activity_tree.get_activity_id(stop.id)?;
            self.activity_tree
                .get_mut(&activity_id)
                .get_mut_activity()
                .stop();
            self.print_current_status();
        }
        NixAction::Result(result) => match result {
            ActivityResultMessage::SetPhase(set_phase) => {
                let activity_id = self.activity_tree.get_activity_id(set_phase.id)?;
                self.activity_tree
                    .get_mut(&activity_id)
                    .get_mut_activity()
                    .set_phase(Some(set_phase.phase));
                self.print_current_status();
            }
            ActivityResultMessage::Progress(progress) => {
                let activity_id = self.activity_tree.get_activity_id(progress.id)?;
                self.activity_tree
                    .get_mut(&activity_id)
                    .get_mut_activity()
                    .set_progress(
                        progress.done,
                        progress.expected,
                        progress.running,
                        progress.failed,
                    );
                self.print_current_status();
            }
            ActivityResultMessage::SetExpected(set_expected) => {
                let activity_id = self.activity_tree.get_activity_id(set_expected.id)?;
                self.activity_tree
                    .get_mut(&activity_id)
                    .get_mut_activity()
                    .set_expected(set_expected.expected);
                self.print_current_status();
            }
            // Ignored result kinds. `BuildLogLine` carries the output from the
            // actual build (as opposed to the output from nix).
            ActivityResultMessage::FileLinked(_)
            | ActivityResultMessage::BuildLogLine(_)
            | ActivityResultMessage::UntrustedPath(_)
            | ActivityResultMessage::CorruptedPath(_)
            | ActivityResultMessage::PostBuildLogLine(_)
            | ActivityResultMessage::FetchStatus(_) => {}
        },
    }
    Ok(())
}
/// Prints the currently active activities to stdout as a tree, one activity per
/// line, or a placeholder line when nothing is active.
///
/// # Panics
/// Panics if an activity has no display name.
fn print_current_status(&self) {
    let mut rendered = String::new();
    for entry in self.get_draw_order() {
        // One column per ancestor level: a bar where the flag is set, blank otherwise.
        let leading_bars: String = entry
            .depth
            .iter()
            .map(|draw_bar| if *draw_bar { "𜹈 " } else { " " })
            .collect();
        let branch = match entry.has_later_siblings {
            true => "𜸨",
            false => "𜸛",
        };
        let node = self.activity_tree.get(&entry.activity_id);
        let display_name = node
            .get_activity()
            .display_name()
            .expect("Currently we always return a display name.");
        // The separator is only emitted when there is progress text to separate.
        let (progress, progress_sep) = match node.get_activity().get_progress_text() {
            Some(text) => (text, " "),
            None => (Cow::Borrowed(""), ""),
        };
        rendered.push_str(&format!(
            "{leading_bars}{branch}𜸟 {progress}{progress_sep}{display_name}\n"
        ));
    }
    if !rendered.is_empty() {
        print!("\n{}\n", rendered);
    } else {
        println!("No active activities.");
    }
}
/// Computes the order in which active activities are drawn.
///
/// The tree is walked with an explicit stack that is popped from the end. An
/// entry is emitted only after all of its active descendants, and among
/// siblings the later ones are emitted first. Children of an inactive entry are
/// never visited. Returns an empty vector when the root has no active children.
fn get_draw_order(&self) -> Vec<DrawDagEntry> {
    let mut pending: Vec<DrawStackEntry> = self
        .get_children_in_order(self.activity_tree.get_root_id())
        .filter(|child| child.get_activity().is_active())
        .map(|child| {
            DrawStackEntry::HasNotVisitedChildren(DrawDagEntry {
                activity_id: child.get_activity_id().clone(),
                depth: Vec::new(),
                has_later_siblings: true,
            })
        })
        .collect();
    // Every entry starts out flagged as having later siblings; fix up the last one.
    if let Some(last_root) = pending.last_mut() {
        last_root.set_no_later_siblings();
    }

    let mut draw_order: Vec<DrawDagEntry> = Vec::new();
    while let Some(entry) = pending.pop() {
        match entry {
            DrawStackEntry::VisitedChildren(finished) => draw_order.push(finished),
            DrawStackEntry::HasNotVisitedChildren(node) => {
                // Children get the parent's depth flags plus one flag for the parent itself.
                let mut child_depth = node.depth.clone();
                child_depth.push(node.has_later_siblings);

                let mut child_entries: Vec<DrawStackEntry> = self
                    .get_children_in_order(node.activity_id.clone())
                    .filter(|child| child.get_activity().is_active())
                    .map(|child| {
                        DrawStackEntry::HasNotVisitedChildren(DrawDagEntry {
                            activity_id: child.get_activity_id().clone(),
                            depth: child_depth.clone(),
                            has_later_siblings: true,
                        })
                    })
                    .collect();
                if let Some(last_child) = child_entries.last_mut() {
                    last_child.set_no_later_siblings();
                }

                // Re-queue the node beneath its children so it is emitted after them.
                pending.push(DrawStackEntry::VisitedChildren(node));
                pending.extend(child_entries);
            }
        }
    }
    draw_order
}
/// Iterates over the entries below `parent_id`, following the order of the
/// parent's child ids.
///
/// Each child entry is expanded through `TransparentIter::new` and the results
/// are flattened into a single iterator.
fn get_children_in_order(
    &self,
    parent_id: ActivityId,
) -> impl Iterator<Item = &ActivityTreeEntry> {
    let tree = &self.activity_tree;
    let parent_entry = tree.get(&parent_id);
    parent_entry
        .get_child_ids()
        .iter()
        .flat_map(move |child_id| TransparentIter::new(tree, tree.get(child_id)))
}
}
/// Work item on the stack used while computing the draw order.
enum DrawStackEntry {
    /// The entry's children have not been pushed onto the stack yet.
    HasNotVisitedChildren(DrawDagEntry),
    /// The entry's children were already pushed; popping this emits the entry.
    VisitedChildren(DrawDagEntry),
}

/// One line of the rendered activity tree.
struct DrawDagEntry {
    /// Activity shown on this line.
    activity_id: ActivityId,
    /// One flag per ancestor level, recording that ancestor's `has_later_siblings`.
    depth: Vec<bool>,
    /// Whether further siblings follow this entry; selects the branch glyph.
    has_later_siblings: bool,
}

impl DrawStackEntry {
    /// Clears `has_later_siblings` on the wrapped entry, whichever variant holds it.
    fn set_no_later_siblings(&mut self) {
        match self {
            DrawStackEntry::HasNotVisitedChildren(entry)
            | DrawStackEntry::VisitedChildren(entry) => entry.has_later_siblings = false,
        }
    }
}