Throw away results from the first poll since those are historical.

This commit is contained in:
Tom Alexander 2022-05-12 10:11:19 -04:00
parent c8b9e73c75
commit 1be697b8bf
Signed by: talexander
GPG Key ID: D3A179C9A53C0EDE
2 changed files with 54 additions and 35 deletions

View File

@ -1,4 +1,6 @@
use reqwest::header::HeaderValue; use reqwest::header::HeaderValue;
use reqwest::RequestBuilder;
use reqwest::Response;
use std::time::{SystemTime, UNIX_EPOCH}; use std::time::{SystemTime, UNIX_EPOCH};
use tokio::time::{sleep, Duration}; use tokio::time::{sleep, Duration};
@ -36,9 +38,22 @@ impl GithubEndpointWatcher {
}) })
} }
pub async fn get_results( pub async fn skip_results_before_now(&mut self) -> Result<(), Box<dyn std::error::Error>> {
&mut self, // This will incur an API call to get the headers for ETAG and
) -> Result<Option<serde_json::Value>, Box<dyn std::error::Error>> { // last modified at but it will throw away the results.
let request = self.build_request()?;
self.sleep_until_next_poll().await?;
self.sleep_until_ratelimit().await?;
let request_started_at = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs();
info!("Hitting url {}", self.url);
let response = request.send().await?;
self.update_headers_from_response(&response, request_started_at)?;
response.error_for_status()?;
Ok(())
}
fn build_request(&mut self) -> Result<RequestBuilder, Box<dyn std::error::Error>> {
let mut request = self let mut request = self
.http_client .http_client
.get(&self.url) .get(&self.url)
@ -58,14 +73,16 @@ impl GithubEndpointWatcher {
); );
request = request.header(reqwest::header::IF_MODIFIED_SINCE, last_modified_at); request = request.header(reqwest::header::IF_MODIFIED_SINCE, last_modified_at);
} }
self.sleep_until_next_poll().await?; Ok(request)
self.sleep_until_ratelimit().await?; }
let request_started_at = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs();
info!("Hitting url {}", self.url);
let response = request.send().await?;
fn update_headers_from_response(
&mut self,
response: &Response,
request_started_at: u64,
) -> Result<(), Box<dyn std::error::Error>> {
let headers = response.headers(); let headers = response.headers();
let etag = headers.get(reqwest::header::ETAG).map(|x| x.to_owned()); self.etag = headers.get(reqwest::header::ETAG).map(|x| x.to_owned());
let poll_interval = number_header(headers.get("x-poll-interval"))?; let poll_interval = number_header(headers.get("x-poll-interval"))?;
if let Some(interval) = poll_interval { if let Some(interval) = poll_interval {
self.previous_poll_interval = Some(interval); self.previous_poll_interval = Some(interval);
@ -75,13 +92,10 @@ impl GithubEndpointWatcher {
self.ratelimit_reset = number_header(headers.get("x-ratelimit-reset"))?; self.ratelimit_reset = number_header(headers.get("x-ratelimit-reset"))?;
// let ratelimit_used = headers.get("x-ratelimit-used").map(|x| x.to_owned()); // let ratelimit_used = headers.get("x-ratelimit-used").map(|x| x.to_owned());
// let ratelimit_resource = headers.get("x-ratelimit-resource").map(|x| x.to_owned()); // let ratelimit_resource = headers.get("x-ratelimit-resource").map(|x| x.to_owned());
let last_modified_at = headers self.last_modified_at = headers
.get(reqwest::header::LAST_MODIFIED) .get(reqwest::header::LAST_MODIFIED)
.map(|x| x.to_owned()); .map(|x| x.to_owned());
if let reqwest::StatusCode::NOT_MODIFIED = response.status() {
self.etag = etag;
self.last_modified_at = last_modified_at;
let poll_interval_parsed: u64 = poll_interval.unwrap_or_else(|| { let poll_interval_parsed: u64 = poll_interval.unwrap_or_else(|| {
let fallback_interval = self.previous_poll_interval.unwrap_or(300); let fallback_interval = self.previous_poll_interval.unwrap_or(300);
info!( info!(
@ -91,22 +105,26 @@ impl GithubEndpointWatcher {
fallback_interval fallback_interval
}); });
self.next_poll_allowed = request_started_at + poll_interval_parsed; self.next_poll_allowed = request_started_at + poll_interval_parsed;
return Ok(None); Ok(())
} else { }
pub async fn get_results(
&mut self,
) -> Result<Option<serde_json::Value>, Box<dyn std::error::Error>> {
let request = self.build_request()?;
self.sleep_until_next_poll().await?;
self.sleep_until_ratelimit().await?;
let request_started_at = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs();
info!("Hitting url {}", self.url);
let response = request.send().await?;
self.update_headers_from_response(&response, request_started_at)?;
let response_status = response.status();
let body = response.json::<serde_json::Value>().await?; let body = response.json::<serde_json::Value>().await?;
self.etag = etag; if let reqwest::StatusCode::NOT_MODIFIED = response_status {
self.last_modified_at = last_modified_at; return Ok(None);
let poll_interval_parsed: u64 = poll_interval.unwrap_or_else(|| { } else {
let fallback_interval = self.previous_poll_interval.unwrap_or(300);
info!(
"No poll interval returned, defaulting to {} second polling.",
fallback_interval
);
fallback_interval
});
self.next_poll_allowed = request_started_at + poll_interval_parsed;
return Ok(Some(body)); return Ok(Some(body));
} }
} }

View File

@ -77,6 +77,7 @@ impl<'a> GithubCtl<'a> {
tokio::spawn(async move { tokio::spawn(async move {
let mut endpoint_watcher = GithubEndpointWatcher::new(username, token, url) let mut endpoint_watcher = GithubEndpointWatcher::new(username, token, url)
.expect("Failed to create endpoint watcher."); .expect("Failed to create endpoint watcher.");
endpoint_watcher.skip_results_before_now().await.expect("Failed to fetch initial historical results.");
loop { loop {
let api_result = match endpoint_watcher.get_results().await { let api_result = match endpoint_watcher.get_results().await {
Ok(result) => result, Ok(result) => result,