Parse the output from nix.

This commit is contained in:
Tom Alexander
2026-02-16 21:17:25 -05:00
parent c4c811a099
commit d65c54d997
5 changed files with 227 additions and 92 deletions

48
flake.lock generated Normal file
View File

@@ -0,0 +1,48 @@
{
"nodes": {
"nixpkgs": {
"locked": {
"lastModified": 1771008912,
"narHash": "sha256-gf2AmWVTs8lEq7z/3ZAsgnZDhWIckkb+ZnAo5RzSxJg=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "a82ccc39b39b621151d6732718e3e250109076fa",
"type": "github"
},
"original": {
"owner": "NixOS",
"ref": "nixos-unstable",
"repo": "nixpkgs",
"type": "github"
}
},
"root": {
"inputs": {
"nixpkgs": "nixpkgs",
"rust-overlay": "rust-overlay"
}
},
"rust-overlay": {
"inputs": {
"nixpkgs": [
"nixpkgs"
]
},
"locked": {
"lastModified": 1771211437,
"narHash": "sha256-lcNK438i4DGtyA+bPXXyVLHVmJjYpVKmpux9WASa3ro=",
"owner": "oxalica",
"repo": "rust-overlay",
"rev": "c62195b3d6e1bb11e0c2fb2a494117d3b55d410f",
"type": "github"
},
"original": {
"owner": "oxalica",
"repo": "rust-overlay",
"type": "github"
}
}
},
"root": "root",
"version": 7
}

View File

@@ -1,3 +1,5 @@
mod high_level;
mod nix_output_stream;
mod output_stream;
mod running_build;
pub(crate) use high_level::*;

View File

@@ -0,0 +1,81 @@
use serde::Deserialize;
use serde::Serialize;
use serde_json::Value;
use super::output_stream::OutputStream;
use crate::Result;
pub(crate) struct NixOutputStream {
inner: OutputStream,
}
impl NixOutputStream {
pub(crate) fn new(inner: OutputStream) -> Self {
NixOutputStream { inner }
}
pub(crate) async fn next(&mut self) -> Result<Option<NixMessage>> {
let line = self.inner.next_line().await?;
let line = match line {
crate::nix_util::output_stream::OutputLine::Stdout(l)
| crate::nix_util::output_stream::OutputLine::Stderr(l) => l,
crate::nix_util::output_stream::OutputLine::Done => {
return Ok(None);
}
};
if !line.starts_with("@nix ") {
println!("FAIL PARSE: {line}");
return Ok(Some(NixMessage::ParseFailure(line)));
}
let payload = &line[4..];
if let Ok(action) = serde_json::from_str(&payload) {
return Ok(Some(NixMessage::Action(action)));
}
if let Ok(parsed) = serde_json::from_str(&payload) {
println!("GENERIC PARSE: {line}");
return Ok(Some(NixMessage::Generic(parsed)));
}
println!("FAIL PARSE: {line}");
Ok(Some(NixMessage::ParseFailure(line)))
}
}
#[derive(Debug)]
pub(crate) enum NixMessage {
ParseFailure(String),
Generic(Value),
Action(NixAction),
}
#[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "action", rename_all = "lowercase")]
pub(crate) enum NixAction {
Msg {
level: u8,
msg: String,
},
Start {
id: u64,
level: u8,
parent: u8,
text: String,
r#type: u8,
},
Stop {
id: u64,
},
Result {
id: u64,
fields: Vec<Field>,
r#type: u8,
},
}
#[derive(Debug, Serialize, Deserialize)]
#[serde(untagged)]
pub(crate) enum Field {
Number(u16),
Text(String),
}

View File

@@ -0,0 +1,85 @@
use tokio::io::AsyncBufReadExt;
use tokio::io::BufReader;
use tokio::io::Lines;
use tokio::process::Child;
use tokio::process::ChildStderr;
use tokio::process::ChildStdout;
use crate::Result;
pub(crate) struct OutputStream {
stdout: Option<Lines<BufReader<ChildStdout>>>,
stderr: Option<Lines<BufReader<ChildStderr>>>,
}
impl OutputStream {
pub(crate) fn from_child(child: &mut Child) -> Result<Self> {
let stdout = child
.stdout
.take()
.expect("child did not have a handle to stdout");
let stderr = child
.stderr
.take()
.expect("child did not have a handle to stderr");
let stdout = BufReader::new(stdout).lines();
let stderr = BufReader::new(stderr).lines();
Ok(OutputStream {
stdout: Some(stdout),
stderr: Some(stderr),
})
}
pub(crate) async fn next_line(&mut self) -> Result<OutputLine> {
loop {
match (&mut self.stdout, &mut self.stderr) {
(None, None) => {
return Ok(OutputLine::Done);
}
(None, Some(err)) => {
if let Some(line) = err.next_line().await? {
return Ok(OutputLine::Stderr(line));
} else {
return Ok(OutputLine::Done);
}
}
(Some(out), None) => {
if let Some(line) = out.next_line().await? {
return Ok(OutputLine::Stdout(line));
} else {
return Ok(OutputLine::Done);
}
}
(Some(out), Some(err)) => {
tokio::select! {
Ok(line) = out.next_line() => match line {
Some(line) => {
return Ok(OutputLine::Stdout(line));
},
None => {
self.stdout.take();
},
},
Ok(line) = err.next_line() => match line {
Some(line) => {
return Ok(OutputLine::Stderr(line));
},
None => {
self.stderr.take();
},
},
else => {
return Ok(OutputLine::Done);
},
};
}
};
}
}
}
pub(crate) enum OutputLine {
Stdout(String),
Stderr(String),
Done,
}

View File

@@ -1,13 +1,12 @@
use sqlx::Row;
use tokio::io::AsyncBufReadExt;
use tokio::io::BufReader;
use tokio::io::Lines;
use tokio::process::Child;
use tokio::process::ChildStderr;
use tokio::process::ChildStdout;
use crate::Result;
use crate::database::db_handle::DbHandle;
use crate::nix_util::nix_output_stream::NixOutputStream;
use crate::nix_util::output_stream::OutputStream;
use super::nix_output_stream::NixMessage;
pub(crate) struct RunningBuild<'db> {
db_handle: &'db DbHandle,
@@ -34,7 +33,8 @@ impl<'db> RunningBuild<'db> {
.await?
.try_get("id")?;
let mut output_stream = OutputStream::from_child(&mut child)?;
let output_stream = OutputStream::from_child(&mut child)?;
let mut nix_output_stream = NixOutputStream::new(output_stream);
let exit_status_handle = tokio::spawn(async move {
let status = child
@@ -44,14 +44,9 @@ impl<'db> RunningBuild<'db> {
status
});
loop {
let next_line = output_stream.next_line().await?;
match next_line {
OutputLine::Stdout(line) | OutputLine::Stderr(line) => {
self.handle_line(line)?;
}
OutputLine::Done => break,
};
while let Some(message) = nix_output_stream.next().await? {
// foo
self.handle_message(message)?;
}
let exit_status = exit_status_handle.await?;
@@ -73,84 +68,8 @@ impl<'db> RunningBuild<'db> {
Ok(())
}
fn handle_line(&mut self, line: String) -> Result<()> {
fn handle_message(&mut self, message: NixMessage) -> Result<()> {
// println!("OUT: {:?}", message);
Ok(())
}
}
struct OutputStream {
stdout: Option<Lines<BufReader<ChildStdout>>>,
stderr: Option<Lines<BufReader<ChildStderr>>>,
}
impl OutputStream {
pub(crate) fn from_child(child: &mut Child) -> Result<Self> {
let stdout = child
.stdout
.take()
.expect("child did not have a handle to stdout");
let stderr = child
.stderr
.take()
.expect("child did not have a handle to stderr");
let stdout = BufReader::new(stdout).lines();
let stderr = BufReader::new(stderr).lines();
Ok(OutputStream {
stdout: Some(stdout),
stderr: Some(stderr),
})
}
pub(crate) async fn next_line(&mut self) -> Result<OutputLine> {
loop {
match (&mut self.stdout, &mut self.stderr) {
(None, None) => {
return Ok(OutputLine::Done);
}
(None, Some(err)) => {
if let Some(line) = err.next_line().await? {
return Ok(OutputLine::Stderr(line));
} else {
return Ok(OutputLine::Done);
}
}
(Some(out), None) => {
if let Some(line) = out.next_line().await? {
return Ok(OutputLine::Stdout(line));
} else {
return Ok(OutputLine::Done);
}
}
(Some(out), Some(err)) => {
tokio::select! {
Ok(line) = out.next_line() => match line {
Some(line) => {
return Ok(OutputLine::Stdout(line));
},
None => {
self.stdout.take();
},
},
Ok(line) = err.next_line() => match line {
Some(line) => {
return Ok(OutputLine::Stderr(line));
},
None => {
self.stderr.take();
},
},
else => {
return Ok(OutputLine::Done);
},
};
}
};
}
}
}
enum OutputLine {
Stdout(String),
Stderr(String),
Done,
}