diff --git a/src/mpvctl/mpvctl.rs b/src/mpvctl/mpvctl.rs index 84bce53..8e6c2c7 100644 --- a/src/mpvctl/mpvctl.rs +++ b/src/mpvctl/mpvctl.rs @@ -1,14 +1,20 @@ use super::command::Command; +use super::mpv_framed::MpvFramed; use bytes::BytesMut; +use std::borrow::BorrowMut; +use std::collections::hash_map::Entry; use std::collections::HashMap; use std::io::Read; use std::os::unix::prelude::OsStrExt; +use std::sync::{Arc, Mutex}; use std::{collections::BTreeMap, path::Path}; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net::unix::OwnedWriteHalf; use tokio::net::UnixStream; use tokio::sync::oneshot; +type CommandDb = Arc>>>; + pub struct MpvCtl { socket: OwnedWriteHalf, next_request_id: u64, @@ -19,20 +25,50 @@ impl MpvCtl { socket_path: P, ) -> Result> { let socket = UnixStream::connect(socket_path).await?; - let (mut socket_read, socket_write) = socket.into_split(); + let (socket_read, socket_write) = socket.into_split(); + let mut framed_read = MpvFramed::new(socket_read); + let db: CommandDb = Arc::new(Mutex::new(HashMap::new())); tokio::spawn(async move { - let mut buf = vec![0; 1024]; loop { - match socket_read.read(&mut buf).await { - // Remote has closed - Ok(0) => return, - Ok(n) => { - // TODO actually used the read value + match framed_read.read_frame().await { + Ok(Some(frame)) => { + // get the request id and push the result into the channel + let reqid = { + let obj = match &frame { + serde_json::Value::Object(obj) => obj, + _ => { + return Err( + "Got back a json value that wasn't an object.".to_string() + ); + } + }; + match obj.get("request_id") { + Some(serde_json::Value::Number(reqid)) if reqid.is_u64() => { + reqid.as_u64().unwrap() + } + _ => { + return Err("Unrecognized request_id".to_string()); + } + } + }; + + { + let mut db_handle = db.lock().unwrap(); + if let Entry::Occupied(o) = (*db_handle).entry(reqid) { + o.remove() + .send(frame) + .map_err(|e| "Failed to send frame".to_string())?; + } else { + return Err("No entry found for request id".to_string()); + } + } } - Err(_) => { - // Ruh roh - return; + Ok(None) => { + return Ok(()); + } + Err(e) => { + return Err("Ran into a problem".to_string()); } } }