Compare commits

...

10 Commits

Author SHA1 Message Date
Tom Alexander
81f73ac7a9
Add support for a very basic randomize. 2022-02-21 23:58:10 -05:00
Tom Alexander
24c4d95bb5
Use canonicalized path to avoid replaying file when using relative paths. 2022-02-21 23:39:41 -05:00
Tom Alexander
36ff41debe
Cleanup 2022-02-21 23:32:21 -05:00
Tom Alexander
9100f08f5c
Remove wal and shm files. 2022-02-21 23:20:56 -05:00
Tom Alexander
18d4ded010
Fully working for in-order watching. 2022-02-21 22:42:00 -05:00
Tom Alexander
9f51c953fe
Inserting watched entries but not reading them correctly. 2022-02-21 22:12:42 -05:00
Tom Alexander
2310889e80
Inserting profiles. 2022-02-21 21:49:56 -05:00
Tom Alexander
466af9f1b3
Stop iterating if the reason a video stopped is anything except eof. 2022-02-21 19:33:39 -05:00
Tom Alexander
6eeee7ef04
Properly playing the files in the list instead of hard-coded paths. 2022-02-21 18:50:28 -05:00
Tom Alexander
860f5759c1
Sending the end file event out to listeners. 2022-02-21 18:30:03 -05:00
6 changed files with 135 additions and 96 deletions

View File

@ -3,7 +3,9 @@ name = "record_watch"
version = "0.1.0" version = "0.1.0"
edition = "2021" edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [[bin]]
name = "rw"
path = "src/main.rs"
[dependencies] [dependencies]
walkdir = "2.3.2" walkdir = "2.3.2"
@ -13,3 +15,5 @@ dirs = "4.0.0"
serde_json = "1.0.79" serde_json = "1.0.79"
serde = { version = "1.0.136", features= ["derive"] } serde = { version = "1.0.136", features= ["derive"] }
bytes = "1.1.0" bytes = "1.1.0"
rand = "0.8.5"

View File

@ -1,11 +1,11 @@
CREATE TABLE profile ( CREATE TABLE profile (
id INTEGER PRIMARY KEY AUTOINCREMENT, id INTEGER PRIMARY KEY,
name TEXT NOT NULL, name TEXT NOT NULL,
UNIQUE(name) UNIQUE(name)
); );
CREATE TABLE watched ( CREATE TABLE watched (
id INTEGER PRIMARY KEY AUTOINCREMENT, id INTEGER PRIMARY KEY,
profile INTEGER NOT NULL, profile INTEGER NOT NULL,
path TEXT NOT NULL, path TEXT NOT NULL,
watched_at DATE NOT NULL, watched_at DATE NOT NULL,

View File

@ -1,22 +1,30 @@
use sqlx::sqlite::SqliteConnectOptions; use sqlx::sqlite::SqliteConnectOptions;
use sqlx::sqlite::SqlitePool; use sqlx::Connection;
use std::env; use sqlx::Row;
use std::path::Path; use std::{env, str::FromStr};
use std::process::Stdio;
use std::str::FromStr;
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::process::Command; use tokio::process::Command;
use walkdir::DirEntry; use walkdir::DirEntry;
use walkdir::WalkDir; use walkdir::WalkDir;
mod mpvctl; mod mpvctl;
use crate::mpvctl::MpvCtl; use crate::mpvctl::MpvCtl;
use rand::seq::SliceRandom;
use rand::thread_rng;
use tokio::time::{sleep, Duration}; use tokio::time::{sleep, Duration};
#[tokio::main] #[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> { async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut args = env::args(); let mut args = env::args().peekable();
let _program = args.next().expect("argv[0] should be this program?"); let _program = args.next().expect("argv[0] should be this program?");
let randomize: bool = match args.peek() {
Some(flag) if flag == "-r" => {
// Consume the flag
args.next().expect("We already know this arg exists");
true
}
_ => false,
};
let profile = args.next().expect("Must provide a profile"); let profile = args.next().expect("Must provide a profile");
let directories: Vec<String> = args.collect(); let directories: Vec<String> = args.collect();
let mut files: Vec<DirEntry> = Vec::new(); let mut files: Vec<DirEntry> = Vec::new();
@ -36,14 +44,24 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.unwrap(); .unwrap();
let url = format!("sqlite://{}", db_path.as_path().display()); let url = format!("sqlite://{}", db_path.as_path().display());
let pool = let mut dbconn = sqlx::SqliteConnection::connect_with(
SqlitePool::connect_with(SqliteConnectOptions::from_str(&url)?.create_if_missing(true)) &SqliteConnectOptions::from_str(&url)?
.create_if_missing(true)
// Use the normal journal mode to avoid those pesky shm and wal files
.journal_mode(sqlx::sqlite::SqliteJournalMode::Delete),
)
.await?; .await?;
sqlx::migrate!("./migrations").run(&pool).await?; sqlx::migrate!("./migrations").run(&mut dbconn).await?;
for f in files { let profiles_created: u64 = sqlx::query(r#"INSERT OR IGNORE INTO profile (name) VALUES (?)"#)
println!("Launching {}", f.path().display()); .bind(&profile)
.execute(&mut dbconn)
.await?
.rows_affected();
if profiles_created != 0 {
println!("Created a new profile");
} }
launch_mpv().await?; launch_mpv().await?;
// TODO: Figure out a better way to wait for the socket to exist and be connectable // TODO: Figure out a better way to wait for the socket to exist and be connectable
@ -51,20 +69,46 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut mpvctl = MpvCtl::connect("/tmp/recordwatchsocket").await?; let mut mpvctl = MpvCtl::connect("/tmp/recordwatchsocket").await?;
let mut end_file_listener = mpvctl.listen(&["end-file"])?; let mut end_file_listener = mpvctl.listen(&["end-file"])?;
let client_name = mpvctl.get_client_name().await?; if randomize {
let _ = mpvctl files.shuffle(&mut thread_rng());
.play_video("/home/talexander/Downloads/test.mp4") }
.await?; for f in files {
let canonicalized_path = f.path().canonicalize()?;
let already_watched_count: i64 = sqlx::query(r#"SELECT count(*) FROM watched WHERE path = ? AND profile = (SELECT id FROM PROFILE WHERE name = ?)"#)
.bind(canonicalized_path.as_os_str().to_str())
.bind(&profile)
.fetch_one(&mut dbconn).await?.try_get("count(*)")?;
while let Some(evt) = end_file_listener.recv().await { println!("Already watched count: {}", already_watched_count);
println!("end file event {}", evt); if already_watched_count > 0 {
println!("Skipping: Already watched {}", canonicalized_path.display());
continue;
} }
sleep(Duration::from_secs(50)).await; println!("Launching {}", canonicalized_path.display());
let client_name = mpvctl.get_client_name().await?; mpvctl.play_video(&canonicalized_path).await?;
if let Some(evt) = end_file_listener.recv().await {
println!("done {}", client_name); println!("end file event {}", evt);
if let serde_json::Value::Object(obj) = evt {
let reason = obj.get("reason");
if let Some(serde_json::Value::String(reason_body)) = reason {
if reason_body == "eof" {
// Watched the video until the end
let insert: u64 = sqlx::query(r#"INSERT INTO watched (profile, path, watched_at) SELECT (SELECT id FROM PROFILE WHERE name = ?) profile, ? path, DATETIME('now') watched_at"#)
.bind(&profile)
.bind(canonicalized_path.as_os_str().to_str())
.execute(&mut dbconn).await?.rows_affected();
println!("Insert: {:?}", insert);
assert!(insert == 1);
continue;
}
}
}
}
break;
}
dbconn.close().await?;
Ok(()) Ok(())
} }

View File

@ -1,5 +1,5 @@
use serde::ser::SerializeMap; use serde::ser::SerializeMap;
use serde::{Deserialize, Serialize, Serializer}; use serde::{Serialize, Serializer};
pub struct Command { pub struct Command {
command: Vec<String>, command: Vec<String>,

View File

@ -1,5 +1,5 @@
use bytes::{Buf, BytesMut}; use bytes::{Buf, BytesMut};
use std::io::{self, Cursor}; use std::io::Cursor;
use tokio::{io::AsyncReadExt, net::unix::OwnedReadHalf}; use tokio::{io::AsyncReadExt, net::unix::OwnedReadHalf};
pub struct MpvFramed { pub struct MpvFramed {

View File

@ -1,14 +1,10 @@
use super::command::Command; use super::command::Command;
use super::mpv_framed::MpvFramed; use super::mpv_framed::MpvFramed;
use bytes::BytesMut;
use std::borrow::BorrowMut;
use std::collections::hash_map::Entry; use std::collections::hash_map::Entry;
use std::collections::HashMap; use std::collections::HashMap;
use std::io::Read; use std::path::Path;
use std::os::unix::prelude::OsStrExt;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::{collections::BTreeMap, path::Path}; use tokio::io::AsyncWriteExt;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::unix::OwnedWriteHalf; use tokio::net::unix::OwnedWriteHalf;
use tokio::net::UnixStream; use tokio::net::UnixStream;
use tokio::sync::mpsc; use tokio::sync::mpsc;
@ -88,8 +84,8 @@ impl MpvCtl {
db: &CommandDb, db: &CommandDb,
event_listeners: &EventDb, event_listeners: &EventDb,
) -> Result<(), Box<dyn std::error::Error>> { ) -> Result<(), Box<dyn std::error::Error>> {
match framed_read.read_frame().await { let current_frame = framed_read.read_frame().await.map_err(|e| e.to_string())?;
Ok(Some(frame)) => { if let Some(frame) = current_frame {
println!("Read {}", frame); println!("Read {}", frame);
// check if its an event // check if its an event
let is_event = match &frame { let is_event = match &frame {
@ -104,7 +100,7 @@ impl MpvCtl {
} }
}; };
match obj.get("event") { match obj.get("event") {
Some(orig @ serde_json::Value::String(s)) => { Some(serde_json::Value::String(s)) => {
println!("Notifying listeners for event {}", s); println!("Notifying listeners for event {}", s);
let listeners = { let listeners = {
let db_handle = event_listeners.lock().unwrap(); let db_handle = event_listeners.lock().unwrap();
@ -114,7 +110,7 @@ impl MpvCtl {
} }
}; };
for listener in listeners { for listener in listeners {
listener.send(frame.clone()).await; listener.send(frame.clone()).await?;
} }
} }
_ => return Err("Event with no string value".into()), _ => return Err("Event with no string value".into()),
@ -149,14 +145,9 @@ impl MpvCtl {
return Err("No entry found for request id".into()); return Err("No entry found for request id".into());
} }
} }
} } else if let None = current_frame {
Ok(None) => {
return Ok(()); return Ok(());
} };
Err(e) => {
return Err("Ran into a problem".into());
}
}
Ok(()) Ok(())
} }