Compare commits
No commits in common. "81f73ac7a9ed5c295aaf84f517d447864c08b7df" and "0dd5cf8fe4ff893cdbfcfddd3c6b4be9d2dd8a81" have entirely different histories.
81f73ac7a9
...
0dd5cf8fe4
@ -3,9 +3,7 @@ name = "record_watch"
|
|||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
edition = "2021"
|
edition = "2021"
|
||||||
|
|
||||||
[[bin]]
|
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||||
name = "rw"
|
|
||||||
path = "src/main.rs"
|
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
walkdir = "2.3.2"
|
walkdir = "2.3.2"
|
||||||
@ -15,5 +13,3 @@ dirs = "4.0.0"
|
|||||||
serde_json = "1.0.79"
|
serde_json = "1.0.79"
|
||||||
serde = { version = "1.0.136", features= ["derive"] }
|
serde = { version = "1.0.136", features= ["derive"] }
|
||||||
bytes = "1.1.0"
|
bytes = "1.1.0"
|
||||||
rand = "0.8.5"
|
|
||||||
|
|
||||||
|
@ -1,11 +1,11 @@
|
|||||||
CREATE TABLE profile (
|
CREATE TABLE profile (
|
||||||
id INTEGER PRIMARY KEY,
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||||
name TEXT NOT NULL,
|
name TEXT NOT NULL,
|
||||||
UNIQUE(name)
|
UNIQUE(name)
|
||||||
);
|
);
|
||||||
|
|
||||||
CREATE TABLE watched (
|
CREATE TABLE watched (
|
||||||
id INTEGER PRIMARY KEY,
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||||
profile INTEGER NOT NULL,
|
profile INTEGER NOT NULL,
|
||||||
path TEXT NOT NULL,
|
path TEXT NOT NULL,
|
||||||
watched_at DATE NOT NULL,
|
watched_at DATE NOT NULL,
|
||||||
|
92
src/main.rs
92
src/main.rs
@ -1,30 +1,22 @@
|
|||||||
use sqlx::sqlite::SqliteConnectOptions;
|
use sqlx::sqlite::SqliteConnectOptions;
|
||||||
use sqlx::Connection;
|
use sqlx::sqlite::SqlitePool;
|
||||||
use sqlx::Row;
|
use std::env;
|
||||||
use std::{env, str::FromStr};
|
use std::path::Path;
|
||||||
|
use std::process::Stdio;
|
||||||
|
use std::str::FromStr;
|
||||||
|
use tokio::io::{AsyncBufReadExt, BufReader};
|
||||||
use tokio::process::Command;
|
use tokio::process::Command;
|
||||||
use walkdir::DirEntry;
|
use walkdir::DirEntry;
|
||||||
use walkdir::WalkDir;
|
use walkdir::WalkDir;
|
||||||
mod mpvctl;
|
mod mpvctl;
|
||||||
use crate::mpvctl::MpvCtl;
|
use crate::mpvctl::MpvCtl;
|
||||||
use rand::seq::SliceRandom;
|
|
||||||
use rand::thread_rng;
|
|
||||||
use tokio::time::{sleep, Duration};
|
use tokio::time::{sleep, Duration};
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||||
let mut args = env::args().peekable();
|
let mut args = env::args();
|
||||||
let _program = args.next().expect("argv[0] should be this program?");
|
let _program = args.next().expect("argv[0] should be this program?");
|
||||||
let randomize: bool = match args.peek() {
|
|
||||||
Some(flag) if flag == "-r" => {
|
|
||||||
// Consume the flag
|
|
||||||
args.next().expect("We already know this arg exists");
|
|
||||||
true
|
|
||||||
}
|
|
||||||
_ => false,
|
|
||||||
};
|
|
||||||
let profile = args.next().expect("Must provide a profile");
|
let profile = args.next().expect("Must provide a profile");
|
||||||
|
|
||||||
let directories: Vec<String> = args.collect();
|
let directories: Vec<String> = args.collect();
|
||||||
|
|
||||||
let mut files: Vec<DirEntry> = Vec::new();
|
let mut files: Vec<DirEntry> = Vec::new();
|
||||||
@ -44,24 +36,14 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
|||||||
.unwrap();
|
.unwrap();
|
||||||
let url = format!("sqlite://{}", db_path.as_path().display());
|
let url = format!("sqlite://{}", db_path.as_path().display());
|
||||||
|
|
||||||
let mut dbconn = sqlx::SqliteConnection::connect_with(
|
let pool =
|
||||||
&SqliteConnectOptions::from_str(&url)?
|
SqlitePool::connect_with(SqliteConnectOptions::from_str(&url)?.create_if_missing(true))
|
||||||
.create_if_missing(true)
|
.await?;
|
||||||
// Use the normal journal mode to avoid those pesky shm and wal files
|
sqlx::migrate!("./migrations").run(&pool).await?;
|
||||||
.journal_mode(sqlx::sqlite::SqliteJournalMode::Delete),
|
|
||||||
)
|
|
||||||
.await?;
|
|
||||||
sqlx::migrate!("./migrations").run(&mut dbconn).await?;
|
|
||||||
|
|
||||||
let profiles_created: u64 = sqlx::query(r#"INSERT OR IGNORE INTO profile (name) VALUES (?)"#)
|
for f in files {
|
||||||
.bind(&profile)
|
println!("Launching {}", f.path().display());
|
||||||
.execute(&mut dbconn)
|
|
||||||
.await?
|
|
||||||
.rows_affected();
|
|
||||||
if profiles_created != 0 {
|
|
||||||
println!("Created a new profile");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
launch_mpv().await?;
|
launch_mpv().await?;
|
||||||
|
|
||||||
// TODO: Figure out a better way to wait for the socket to exist and be connectable
|
// TODO: Figure out a better way to wait for the socket to exist and be connectable
|
||||||
@ -69,46 +51,20 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
|||||||
|
|
||||||
let mut mpvctl = MpvCtl::connect("/tmp/recordwatchsocket").await?;
|
let mut mpvctl = MpvCtl::connect("/tmp/recordwatchsocket").await?;
|
||||||
let mut end_file_listener = mpvctl.listen(&["end-file"])?;
|
let mut end_file_listener = mpvctl.listen(&["end-file"])?;
|
||||||
if randomize {
|
let client_name = mpvctl.get_client_name().await?;
|
||||||
files.shuffle(&mut thread_rng());
|
let _ = mpvctl
|
||||||
}
|
.play_video("/home/talexander/Downloads/test.mp4")
|
||||||
for f in files {
|
.await?;
|
||||||
let canonicalized_path = f.path().canonicalize()?;
|
|
||||||
let already_watched_count: i64 = sqlx::query(r#"SELECT count(*) FROM watched WHERE path = ? AND profile = (SELECT id FROM PROFILE WHERE name = ?)"#)
|
|
||||||
.bind(canonicalized_path.as_os_str().to_str())
|
|
||||||
.bind(&profile)
|
|
||||||
.fetch_one(&mut dbconn).await?.try_get("count(*)")?;
|
|
||||||
|
|
||||||
println!("Already watched count: {}", already_watched_count);
|
while let Some(evt) = end_file_listener.recv().await {
|
||||||
if already_watched_count > 0 {
|
println!("end file event {}", evt);
|
||||||
println!("Skipping: Already watched {}", canonicalized_path.display());
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
println!("Launching {}", canonicalized_path.display());
|
|
||||||
mpvctl.play_video(&canonicalized_path).await?;
|
|
||||||
if let Some(evt) = end_file_listener.recv().await {
|
|
||||||
println!("end file event {}", evt);
|
|
||||||
if let serde_json::Value::Object(obj) = evt {
|
|
||||||
let reason = obj.get("reason");
|
|
||||||
if let Some(serde_json::Value::String(reason_body)) = reason {
|
|
||||||
if reason_body == "eof" {
|
|
||||||
// Watched the video until the end
|
|
||||||
let insert: u64 = sqlx::query(r#"INSERT INTO watched (profile, path, watched_at) SELECT (SELECT id FROM PROFILE WHERE name = ?) profile, ? path, DATETIME('now') watched_at"#)
|
|
||||||
.bind(&profile)
|
|
||||||
.bind(canonicalized_path.as_os_str().to_str())
|
|
||||||
.execute(&mut dbconn).await?.rows_affected();
|
|
||||||
println!("Insert: {:?}", insert);
|
|
||||||
assert!(insert == 1);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
dbconn.close().await?;
|
sleep(Duration::from_secs(50)).await;
|
||||||
|
let client_name = mpvctl.get_client_name().await?;
|
||||||
|
|
||||||
|
println!("done {}", client_name);
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1,5 +1,5 @@
|
|||||||
use serde::ser::SerializeMap;
|
use serde::ser::SerializeMap;
|
||||||
use serde::{Serialize, Serializer};
|
use serde::{Deserialize, Serialize, Serializer};
|
||||||
|
|
||||||
pub struct Command {
|
pub struct Command {
|
||||||
command: Vec<String>,
|
command: Vec<String>,
|
||||||
|
@ -1,5 +1,5 @@
|
|||||||
use bytes::{Buf, BytesMut};
|
use bytes::{Buf, BytesMut};
|
||||||
use std::io::Cursor;
|
use std::io::{self, Cursor};
|
||||||
use tokio::{io::AsyncReadExt, net::unix::OwnedReadHalf};
|
use tokio::{io::AsyncReadExt, net::unix::OwnedReadHalf};
|
||||||
|
|
||||||
pub struct MpvFramed {
|
pub struct MpvFramed {
|
||||||
|
@ -1,10 +1,14 @@
|
|||||||
use super::command::Command;
|
use super::command::Command;
|
||||||
use super::mpv_framed::MpvFramed;
|
use super::mpv_framed::MpvFramed;
|
||||||
|
use bytes::BytesMut;
|
||||||
|
use std::borrow::BorrowMut;
|
||||||
use std::collections::hash_map::Entry;
|
use std::collections::hash_map::Entry;
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::path::Path;
|
use std::io::Read;
|
||||||
|
use std::os::unix::prelude::OsStrExt;
|
||||||
use std::sync::{Arc, Mutex};
|
use std::sync::{Arc, Mutex};
|
||||||
use tokio::io::AsyncWriteExt;
|
use std::{collections::BTreeMap, path::Path};
|
||||||
|
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||||
use tokio::net::unix::OwnedWriteHalf;
|
use tokio::net::unix::OwnedWriteHalf;
|
||||||
use tokio::net::UnixStream;
|
use tokio::net::UnixStream;
|
||||||
use tokio::sync::mpsc;
|
use tokio::sync::mpsc;
|
||||||
@ -84,70 +88,75 @@ impl MpvCtl {
|
|||||||
db: &CommandDb,
|
db: &CommandDb,
|
||||||
event_listeners: &EventDb,
|
event_listeners: &EventDb,
|
||||||
) -> Result<(), Box<dyn std::error::Error>> {
|
) -> Result<(), Box<dyn std::error::Error>> {
|
||||||
let current_frame = framed_read.read_frame().await.map_err(|e| e.to_string())?;
|
match framed_read.read_frame().await {
|
||||||
if let Some(frame) = current_frame {
|
Ok(Some(frame)) => {
|
||||||
println!("Read {}", frame);
|
println!("Read {}", frame);
|
||||||
// check if its an event
|
// check if its an event
|
||||||
let is_event = match &frame {
|
let is_event = match &frame {
|
||||||
serde_json::Value::Object(obj) => obj.contains_key("event"),
|
serde_json::Value::Object(obj) => obj.contains_key("event"),
|
||||||
_ => false,
|
_ => false,
|
||||||
};
|
|
||||||
if is_event {
|
|
||||||
let obj = match &frame {
|
|
||||||
serde_json::Value::Object(obj) => obj,
|
|
||||||
_ => {
|
|
||||||
return Err("Got back a json value that wasn't an object.".into());
|
|
||||||
}
|
|
||||||
};
|
};
|
||||||
match obj.get("event") {
|
if is_event {
|
||||||
Some(serde_json::Value::String(s)) => {
|
let obj = match &frame {
|
||||||
println!("Notifying listeners for event {}", s);
|
serde_json::Value::Object(obj) => obj,
|
||||||
let listeners = {
|
_ => {
|
||||||
let db_handle = event_listeners.lock().unwrap();
|
return Err("Got back a json value that wasn't an object.".into());
|
||||||
match (*db_handle).get(s) {
|
}
|
||||||
None => Vec::new(),
|
};
|
||||||
Some(listeners) => listeners.to_owned(),
|
match obj.get("event") {
|
||||||
|
Some(orig @ serde_json::Value::String(s)) => {
|
||||||
|
println!("Notifying listeners for event {}", s);
|
||||||
|
let listeners = {
|
||||||
|
let db_handle = event_listeners.lock().unwrap();
|
||||||
|
match (*db_handle).get(s) {
|
||||||
|
None => Vec::new(),
|
||||||
|
Some(listeners) => listeners.to_owned(),
|
||||||
|
}
|
||||||
|
};
|
||||||
|
for listener in listeners {
|
||||||
|
listener.send(frame.clone()).await;
|
||||||
}
|
}
|
||||||
};
|
}
|
||||||
for listener in listeners {
|
_ => return Err("Event with no string value".into()),
|
||||||
listener.send(frame.clone()).await?;
|
};
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
// get the request id and push the result into the channel
|
||||||
|
let reqid = {
|
||||||
|
let obj = match &frame {
|
||||||
|
serde_json::Value::Object(obj) => obj,
|
||||||
|
_ => {
|
||||||
|
return Err("Got back a json value that wasn't an object.".into());
|
||||||
|
}
|
||||||
|
};
|
||||||
|
match obj.get("request_id") {
|
||||||
|
Some(serde_json::Value::Number(reqid)) if reqid.is_u64() => {
|
||||||
|
reqid.as_u64().unwrap()
|
||||||
|
}
|
||||||
|
_ => {
|
||||||
|
return Err("Unrecognized request_id".into());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
_ => return Err("Event with no string value".into()),
|
|
||||||
};
|
};
|
||||||
|
|
||||||
|
{
|
||||||
|
let mut db_handle = db.lock().unwrap();
|
||||||
|
if let Entry::Occupied(o) = (*db_handle).entry(reqid) {
|
||||||
|
o.remove()
|
||||||
|
.send(frame)
|
||||||
|
.map_err(|e| "Failed to send frame".to_string())?;
|
||||||
|
} else {
|
||||||
|
return Err("No entry found for request id".into());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(None) => {
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
// get the request id and push the result into the channel
|
Err(e) => {
|
||||||
let reqid = {
|
return Err("Ran into a problem".into());
|
||||||
let obj = match &frame {
|
|
||||||
serde_json::Value::Object(obj) => obj,
|
|
||||||
_ => {
|
|
||||||
return Err("Got back a json value that wasn't an object.".into());
|
|
||||||
}
|
|
||||||
};
|
|
||||||
match obj.get("request_id") {
|
|
||||||
Some(serde_json::Value::Number(reqid)) if reqid.is_u64() => {
|
|
||||||
reqid.as_u64().unwrap()
|
|
||||||
}
|
|
||||||
_ => {
|
|
||||||
return Err("Unrecognized request_id".into());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
{
|
|
||||||
let mut db_handle = db.lock().unwrap();
|
|
||||||
if let Entry::Occupied(o) = (*db_handle).entry(reqid) {
|
|
||||||
o.remove()
|
|
||||||
.send(frame)
|
|
||||||
.map_err(|e| "Failed to send frame".to_string())?;
|
|
||||||
} else {
|
|
||||||
return Err("No entry found for request id".into());
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
} else if let None = current_frame {
|
}
|
||||||
return Ok(());
|
|
||||||
};
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user