You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

117 lines
3.9 KiB
Rust

use crate::json_util::get_json_string;
use std::collections::HashSet;
use tokio::sync::mpsc;
use super::github_endpoint_watcher::GithubEndpointWatcher;
/// Handle for watching GitHub event endpoints and reading the events they produce.
///
/// The struct borrows the credentials it was created with and owns both ends of
/// a channel: spawned watcher tasks receive clones of the sending half, while
/// the receiving half is drained through `get_event`.
pub struct GithubCtl<'a> {
    /// Account name; interpolated into user-scoped endpoint URLs and handed to
    /// every endpoint watcher.
    username: &'a str,
    /// Credential handed to every endpoint watcher alongside `username`.
    token: &'a str,
    /// Sending half of the event channel; cloned once per spawned watcher task.
    event_writer: mpsc::Sender<serde_json::Value>,
    /// Receiving half of the event channel.
    event_reader: mpsc::Receiver<serde_json::Value>,
}
impl<'a> GithubCtl<'a> {
pub fn new(username: &'a str, token: &'a str) -> Result<Self, Box<dyn std::error::Error>> {
let (tx, rx) = mpsc::channel::<serde_json::Value>(100);
Ok(GithubCtl {
username,
token,
event_writer: tx,
event_reader: rx,
})
}
pub async fn get_event(&mut self) -> Result<serde_json::Value, Box<dyn std::error::Error>> {
if let Some(val) = self.event_reader.recv().await {
return Ok(val);
} else {
return Err("No more events, perhaps all writers hung up?".into());
}
}
pub async fn watch_repo<O, R>(
&mut self,
owner: O,
repo: R,
) -> Result<(), Box<dyn std::error::Error>>
where
O: AsRef<str>,
R: AsRef<str>,
{
let url = format!(
"https://api.github.com/repos/{owner}/{repo}/events",
owner = owner.as_ref().trim(),
repo = repo.as_ref().trim()
);
return Ok(self.watch_list(url).await);
}
pub async fn watch_org<O>(&mut self, org: O) -> Result<(), Box<dyn std::error::Error>>
where
O: AsRef<str>,
{
let url = format!(
"https://api.github.com/users/{username}/events/orgs/{org}",
org = org.as_ref().trim(),
username = self.username.trim()
);
return Ok(self.watch_list(url).await);
}
pub async fn watch_self(&mut self) -> Result<(), Box<dyn std::error::Error>> {
let url = format!(
"https://api.github.com/users/{username}/received_events",
username = self.username.trim()
);
return Ok(self.watch_list(url).await);
}
async fn watch_list<U>(&mut self, url: U)
where
U: Into<String>,
{
let url = url.into();
let username = self.username.to_string();
let token = self.token.to_string();
let event_stream = self.event_writer.clone();
tokio::spawn(async move {
let mut already_seen_event_ids = HashSet::new();
let mut endpoint_watcher = GithubEndpointWatcher::new(username, token, url)
.expect("Failed to create endpoint watcher.");
let mut should_notify: bool = false; // Skip the first page of results because theses events already occurred in the past
loop {
let api_result = match endpoint_watcher.get_results().await {
Ok(result) => result,
Err(e) => {
error!("Failed to get results. {}", e);
return;
}
};
if let Some(serde_json::Value::Array(events)) = api_result {
for event in events {
let event_id = event
.get("id")
.map(get_json_string)
.expect("Ran into an event without a id.")
.to_owned();
if should_notify && already_seen_event_ids.insert(event_id) {
if let Err(_) = event_stream.send(event).await {
error!("Receiver dropped.");
return;
}
}
}
} else if let None = api_result {
info!("No new results available.");
} else {
error!("Unsupported JSON type.");
}
should_notify = true;
}
});
}
}