From d65c54d99730d29dcf57257bad379a64ff247c57 Mon Sep 17 00:00:00 2001 From: Tom Alexander Date: Mon, 16 Feb 2026 21:17:25 -0500 Subject: [PATCH] Parse the output from nix. --- flake.lock | 48 ++++++++++++++ src/nix_util/mod.rs | 2 + src/nix_util/nix_output_stream.rs | 81 +++++++++++++++++++++++ src/nix_util/output_stream.rs | 85 ++++++++++++++++++++++++ src/nix_util/running_build.rs | 103 ++++-------------------------- 5 files changed, 227 insertions(+), 92 deletions(-) create mode 100644 flake.lock create mode 100644 src/nix_util/nix_output_stream.rs create mode 100644 src/nix_util/output_stream.rs diff --git a/flake.lock b/flake.lock new file mode 100644 index 0000000..edbd142 --- /dev/null +++ b/flake.lock @@ -0,0 +1,48 @@ +{ + "nodes": { + "nixpkgs": { + "locked": { + "lastModified": 1771008912, + "narHash": "sha256-gf2AmWVTs8lEq7z/3ZAsgnZDhWIckkb+ZnAo5RzSxJg=", + "owner": "NixOS", + "repo": "nixpkgs", + "rev": "a82ccc39b39b621151d6732718e3e250109076fa", + "type": "github" + }, + "original": { + "owner": "NixOS", + "ref": "nixos-unstable", + "repo": "nixpkgs", + "type": "github" + } + }, + "root": { + "inputs": { + "nixpkgs": "nixpkgs", + "rust-overlay": "rust-overlay" + } + }, + "rust-overlay": { + "inputs": { + "nixpkgs": [ + "nixpkgs" + ] + }, + "locked": { + "lastModified": 1771211437, + "narHash": "sha256-lcNK438i4DGtyA+bPXXyVLHVmJjYpVKmpux9WASa3ro=", + "owner": "oxalica", + "repo": "rust-overlay", + "rev": "c62195b3d6e1bb11e0c2fb2a494117d3b55d410f", + "type": "github" + }, + "original": { + "owner": "oxalica", + "repo": "rust-overlay", + "type": "github" + } + } + }, + "root": "root", + "version": 7 +} diff --git a/src/nix_util/mod.rs b/src/nix_util/mod.rs index 8143d18..898230f 100644 --- a/src/nix_util/mod.rs +++ b/src/nix_util/mod.rs @@ -1,3 +1,5 @@ mod high_level; +mod nix_output_stream; +mod output_stream; mod running_build; pub(crate) use high_level::*; diff --git a/src/nix_util/nix_output_stream.rs b/src/nix_util/nix_output_stream.rs new file mode 100644 index 0000000..77c20ac --- /dev/null +++ b/src/nix_util/nix_output_stream.rs @@ -0,0 +1,81 @@ +use serde::Deserialize; +use serde::Serialize; +use serde_json::Value; + +use super::output_stream::OutputStream; + +use crate::Result; + +pub(crate) struct NixOutputStream { + inner: OutputStream, +} + +impl NixOutputStream { + pub(crate) fn new(inner: OutputStream) -> Self { + NixOutputStream { inner } + } + + pub(crate) async fn next(&mut self) -> Result> { + let line = self.inner.next_line().await?; + let line = match line { + crate::nix_util::output_stream::OutputLine::Stdout(l) + | crate::nix_util::output_stream::OutputLine::Stderr(l) => l, + crate::nix_util::output_stream::OutputLine::Done => { + return Ok(None); + } + }; + if !line.starts_with("@nix ") { + println!("FAIL PARSE: {line}"); + return Ok(Some(NixMessage::ParseFailure(line))); + } + let payload = &line[4..]; + + if let Ok(action) = serde_json::from_str(&payload) { + return Ok(Some(NixMessage::Action(action))); + } + if let Ok(parsed) = serde_json::from_str(&payload) { + println!("GENERIC PARSE: {line}"); + return Ok(Some(NixMessage::Generic(parsed))); + } + println!("FAIL PARSE: {line}"); + Ok(Some(NixMessage::ParseFailure(line))) + } +} + +#[derive(Debug)] +pub(crate) enum NixMessage { + ParseFailure(String), + Generic(Value), + Action(NixAction), +} + +#[derive(Debug, Serialize, Deserialize)] +#[serde(tag = "action", rename_all = "lowercase")] +pub(crate) enum NixAction { + Msg { + level: u8, + msg: String, + }, + Start { + id: u64, + level: u8, + parent: u8, + text: String, + r#type: u8, + }, + Stop { + id: u64, + }, + Result { + id: u64, + fields: Vec, + r#type: u8, + }, +} + +#[derive(Debug, Serialize, Deserialize)] +#[serde(untagged)] +pub(crate) enum Field { + Number(u16), + Text(String), +} diff --git a/src/nix_util/output_stream.rs b/src/nix_util/output_stream.rs new file mode 100644 index 0000000..f7817e7 --- /dev/null +++ b/src/nix_util/output_stream.rs @@ -0,0 +1,85 @@ +use tokio::io::AsyncBufReadExt; +use tokio::io::BufReader; +use tokio::io::Lines; +use tokio::process::Child; +use tokio::process::ChildStderr; +use tokio::process::ChildStdout; + +use crate::Result; + +pub(crate) struct OutputStream { + stdout: Option>>, + stderr: Option>>, +} + +impl OutputStream { + pub(crate) fn from_child(child: &mut Child) -> Result { + let stdout = child + .stdout + .take() + .expect("child did not have a handle to stdout"); + let stderr = child + .stderr + .take() + .expect("child did not have a handle to stderr"); + let stdout = BufReader::new(stdout).lines(); + let stderr = BufReader::new(stderr).lines(); + Ok(OutputStream { + stdout: Some(stdout), + stderr: Some(stderr), + }) + } + + pub(crate) async fn next_line(&mut self) -> Result { + loop { + match (&mut self.stdout, &mut self.stderr) { + (None, None) => { + return Ok(OutputLine::Done); + } + (None, Some(err)) => { + if let Some(line) = err.next_line().await? { + return Ok(OutputLine::Stderr(line)); + } else { + return Ok(OutputLine::Done); + } + } + (Some(out), None) => { + if let Some(line) = out.next_line().await? { + return Ok(OutputLine::Stdout(line)); + } else { + return Ok(OutputLine::Done); + } + } + (Some(out), Some(err)) => { + tokio::select! { + Ok(line) = out.next_line() => match line { + Some(line) => { + return Ok(OutputLine::Stdout(line)); + }, + None => { + self.stdout.take(); + }, + }, + Ok(line) = err.next_line() => match line { + Some(line) => { + return Ok(OutputLine::Stderr(line)); + }, + None => { + self.stderr.take(); + }, + }, + else => { + return Ok(OutputLine::Done); + }, + }; + } + }; + } + } +} + +pub(crate) enum OutputLine { + Stdout(String), + Stderr(String), + Done, +} diff --git a/src/nix_util/running_build.rs b/src/nix_util/running_build.rs index 14fed46..6124bcc 100644 --- a/src/nix_util/running_build.rs +++ b/src/nix_util/running_build.rs @@ -1,13 +1,12 @@ use sqlx::Row; -use tokio::io::AsyncBufReadExt; -use tokio::io::BufReader; -use tokio::io::Lines; use tokio::process::Child; -use tokio::process::ChildStderr; -use tokio::process::ChildStdout; use crate::Result; use crate::database::db_handle::DbHandle; +use crate::nix_util::nix_output_stream::NixOutputStream; +use crate::nix_util::output_stream::OutputStream; + +use super::nix_output_stream::NixMessage; pub(crate) struct RunningBuild<'db> { db_handle: &'db DbHandle, @@ -34,7 +33,8 @@ impl<'db> RunningBuild<'db> { .await? .try_get("id")?; - let mut output_stream = OutputStream::from_child(&mut child)?; + let output_stream = OutputStream::from_child(&mut child)?; + let mut nix_output_stream = NixOutputStream::new(output_stream); let exit_status_handle = tokio::spawn(async move { let status = child @@ -44,14 +44,9 @@ impl<'db> RunningBuild<'db> { status }); - loop { - let next_line = output_stream.next_line().await?; - match next_line { - OutputLine::Stdout(line) | OutputLine::Stderr(line) => { - self.handle_line(line)?; - } - OutputLine::Done => break, - }; + while let Some(message) = nix_output_stream.next().await? { + // foo + self.handle_message(message)?; } let exit_status = exit_status_handle.await?; @@ -73,84 +68,8 @@ impl<'db> RunningBuild<'db> { Ok(()) } - fn handle_line(&mut self, line: String) -> Result<()> { + fn handle_message(&mut self, message: NixMessage) -> Result<()> { + // println!("OUT: {:?}", message); Ok(()) } } - -struct OutputStream { - stdout: Option>>, - stderr: Option>>, -} - -impl OutputStream { - pub(crate) fn from_child(child: &mut Child) -> Result { - let stdout = child - .stdout - .take() - .expect("child did not have a handle to stdout"); - let stderr = child - .stderr - .take() - .expect("child did not have a handle to stderr"); - let stdout = BufReader::new(stdout).lines(); - let stderr = BufReader::new(stderr).lines(); - Ok(OutputStream { - stdout: Some(stdout), - stderr: Some(stderr), - }) - } - - pub(crate) async fn next_line(&mut self) -> Result { - loop { - match (&mut self.stdout, &mut self.stderr) { - (None, None) => { - return Ok(OutputLine::Done); - } - (None, Some(err)) => { - if let Some(line) = err.next_line().await? { - return Ok(OutputLine::Stderr(line)); - } else { - return Ok(OutputLine::Done); - } - } - (Some(out), None) => { - if let Some(line) = out.next_line().await? { - return Ok(OutputLine::Stdout(line)); - } else { - return Ok(OutputLine::Done); - } - } - (Some(out), Some(err)) => { - tokio::select! { - Ok(line) = out.next_line() => match line { - Some(line) => { - return Ok(OutputLine::Stdout(line)); - }, - None => { - self.stdout.take(); - }, - }, - Ok(line) = err.next_line() => match line { - Some(line) => { - return Ok(OutputLine::Stderr(line)); - }, - None => { - self.stderr.take(); - }, - }, - else => { - return Ok(OutputLine::Done); - }, - }; - } - }; - } - } -} - -enum OutputLine { - Stdout(String), - Stderr(String), - Done, -}