I think the read loop is done.

This commit is contained in:
Tom Alexander 2022-02-20 20:30:56 -05:00
parent 8082e5d000
commit 132d3cc59c
Signed by: talexander
GPG Key ID: D3A179C9A53C0EDE

View File

@ -1,14 +1,20 @@
use super::command::Command; use super::command::Command;
use super::mpv_framed::MpvFramed;
use bytes::BytesMut; use bytes::BytesMut;
use std::borrow::BorrowMut;
use std::collections::hash_map::Entry;
use std::collections::HashMap; use std::collections::HashMap;
use std::io::Read; use std::io::Read;
use std::os::unix::prelude::OsStrExt; use std::os::unix::prelude::OsStrExt;
use std::sync::{Arc, Mutex};
use std::{collections::BTreeMap, path::Path}; use std::{collections::BTreeMap, path::Path};
use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::unix::OwnedWriteHalf; use tokio::net::unix::OwnedWriteHalf;
use tokio::net::UnixStream; use tokio::net::UnixStream;
use tokio::sync::oneshot; use tokio::sync::oneshot;
type CommandDb = Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>;
pub struct MpvCtl { pub struct MpvCtl {
socket: OwnedWriteHalf, socket: OwnedWriteHalf,
next_request_id: u64, next_request_id: u64,
@ -19,20 +25,50 @@ impl MpvCtl {
socket_path: P, socket_path: P,
) -> Result<Self, Box<dyn std::error::Error>> { ) -> Result<Self, Box<dyn std::error::Error>> {
let socket = UnixStream::connect(socket_path).await?; let socket = UnixStream::connect(socket_path).await?;
let (mut socket_read, socket_write) = socket.into_split(); let (socket_read, socket_write) = socket.into_split();
let mut framed_read = MpvFramed::new(socket_read);
let db: CommandDb = Arc::new(Mutex::new(HashMap::new()));
tokio::spawn(async move { tokio::spawn(async move {
let mut buf = vec![0; 1024];
loop { loop {
match socket_read.read(&mut buf).await { match framed_read.read_frame().await {
// Remote has closed Ok(Some(frame)) => {
Ok(0) => return, // get the request id and push the result into the channel
Ok(n) => { let reqid = {
// TODO actually used the read value let obj = match &frame {
serde_json::Value::Object(obj) => obj,
_ => {
return Err(
"Got back a json value that wasn't an object.".to_string()
);
}
};
match obj.get("request_id") {
Some(serde_json::Value::Number(reqid)) if reqid.is_u64() => {
reqid.as_u64().unwrap()
}
_ => {
return Err("Unrecognized request_id".to_string());
}
}
};
{
let mut db_handle = db.lock().unwrap();
if let Entry::Occupied(o) = (*db_handle).entry(reqid) {
o.remove()
.send(frame)
.map_err(|e| "Failed to send frame".to_string())?;
} else {
return Err("No entry found for request id".to_string());
}
}
} }
Err(_) => { Ok(None) => {
// Ruh roh return Ok(());
return; }
Err(e) => {
return Err("Ran into a problem".to_string());
} }
} }
} }