Tom Alexander f3b00c46ea
Some checks failed
semver Build semver has succeeded
format Build format has succeeded
clippy Build clippy has succeeded
rust-test Build rust-test has failed
build Build build has succeeded
Test: Remove the clients from AppState entirely.
2024-09-30 00:38:32 -04:00

137 lines
3.9 KiB
Rust

#![forbid(unsafe_code)]
use std::collections::HashSet;
use std::sync::Arc;
use std::time::Duration;
use axum::http::StatusCode;
use axum::middleware;
use axum::routing::get;
use axum::routing::post;
use axum::Json;
use axum::Router;
use kube::Client;
use serde::Serialize;
use tokio::signal;
use tower_http::timeout::TimeoutLayer;
use tower_http::trace::TraceLayer;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
use self::app_state::AppState;
use self::gitea_client::GiteaClient;
use self::hook_push::HookPush;
use self::webhook::handle_push;
use self::webhook::hook;
use self::webhook::verify_signature;
mod app_state;
mod crd_pipeline_run;
mod discovery;
mod gitea_client;
mod hook_push;
mod kubernetes;
mod remote_config;
mod webhook;
/// Installs the process-wide `tracing` subscriber.
///
/// The filter directives are taken from the `EnvFilter` default environment
/// variable (`RUST_LOG`); when that is unset or cannot be parsed, a built-in
/// set of directives is used instead. Output goes through the default `fmt`
/// layer.
///
/// The function is `async` only for the benefit of callers; it never awaits.
///
/// # Errors
///
/// Returns an error if a global default subscriber has already been
/// installed, instead of panicking.
pub async fn init_tracing() -> Result<(), Box<dyn std::error::Error>> {
    let env_filter = tracing_subscriber::EnvFilter::try_from_default_env().unwrap_or_else(|_| {
        "webhookbridge=info,webhook_bridge=info,local_trigger=info,tower_http=debug,axum::rejection=trace"
            .into()
    });

    tracing_subscriber::registry()
        .with(env_filter)
        .with(tracing_subscriber::fmt::layer())
        // `try_init` reports a double-initialisation as an `Err` where
        // `init` would panic.
        .try_init()?;

    Ok(())
}
/// Runs the webhook HTTP server until a shutdown signal is received.
///
/// The repository whitelist is read from the comma-separated
/// `WEBHOOK_BRIDGE_REPO_WHITELIST` environment variable (empty entries are
/// dropped) and shared with the handlers through [`AppState`]. The server
/// listens on `0.0.0.0:9988` and exposes:
///
/// * `POST /hook` — guarded by the `verify_signature` middleware.
/// * `GET /health` — not guarded by the signature check.
///
/// # Errors
///
/// Returns an error if `WEBHOOK_BRIDGE_REPO_WHITELIST` is missing or not
/// valid unicode, if the listener cannot be bound or its local address
/// cannot be read, or if serving fails.
pub async fn launch_server() -> Result<(), Box<dyn std::error::Error>> {
    let allowed_repos = std::env::var("WEBHOOK_BRIDGE_REPO_WHITELIST")?;
    let allowed_repos: HashSet<_> = allowed_repos
        .split(',')
        .filter(|s| !s.is_empty())
        .map(str::to_owned)
        .collect();
    tracing::debug!("Using repo whitelist: {:?}", allowed_repos);

    // Layer order matters: a `.layer(..)` call only wraps the routes that were
    // registered before it. The signature check therefore covers `/hook`
    // alone, while tracing and the timeout cover both routes.
    let app = Router::new()
        .route("/hook", post(hook))
        .layer(middleware::from_fn(verify_signature))
        .route("/health", get(health))
        .layer((
            TraceLayer::new_for_http(),
            // Add a timeout layer so graceful shutdown can't wait forever.
            TimeoutLayer::new(Duration::from_secs(600)),
        ))
        .with_state(AppState {
            allowed_repos: Arc::new(allowed_repos),
        });

    let listener = tokio::net::TcpListener::bind("0.0.0.0:9988").await?;
    // Propagate a failure to read the bound address instead of panicking.
    tracing::info!("listening on {}", listener.local_addr()?);
    axum::serve(listener, app)
        .with_graceful_shutdown(shutdown_signal())
        .await?;
    Ok(())
}
/// Processes a single push-webhook payload without running the HTTP server.
///
/// `payload` is the JSON text of a push hook; it is deserialized into a
/// [`HookPush`] and handed to `handle_push` together with a freshly built
/// Kubernetes client and Gitea client.
///
/// Configuration comes from the environment:
///
/// * `WEBHOOK_BRIDGE_API_ROOT` and `WEBHOOK_BRIDGE_OAUTH_TOKEN` — required,
///   used to construct the [`GiteaClient`].
/// * `WEBHOOK_BRIDGE_REPO_WHITELIST` — optional comma-separated list; when it
///   is missing or unreadable the whitelist is empty.
///
/// # Errors
///
/// Returns an error if the Kubernetes client cannot be created from the
/// default configuration, if a required environment variable is missing, if
/// `payload` is not valid JSON for [`HookPush`], or if `handle_push` fails.
pub async fn local_trigger(payload: &str) -> Result<(), Box<dyn std::error::Error>> {
    // A bad or missing kubeconfig is an operator mistake, not a bug: report it
    // as an error (keeping the hint) rather than panicking.
    let kubernetes_client: Client = Client::try_default()
        .await
        .map_err(|err| format!("Set KUBECONFIG to a valid kubernetes config: {err}"))?;

    let gitea_api_root = std::env::var("WEBHOOK_BRIDGE_API_ROOT")?;
    let gitea_api_token = std::env::var("WEBHOOK_BRIDGE_OAUTH_TOKEN")?;
    let gitea = GiteaClient::new(gitea_api_root, gitea_api_token);

    // Unlike the server, a local run tolerates an absent whitelist variable.
    let allowed_repos = std::env::var("WEBHOOK_BRIDGE_REPO_WHITELIST").unwrap_or_default();
    let allowed_repos: HashSet<_> = allowed_repos
        .split(',')
        .filter(|s| !s.is_empty())
        .map(str::to_owned)
        .collect();
    tracing::debug!("Using repo whitelist: {:?}", allowed_repos);

    let webhook_payload: HookPush = serde_json::from_str(payload)?;
    handle_push(gitea, kubernetes_client, &allowed_repos, webhook_payload).await?;
    Ok(())
}
/// Completes as soon as the process receives a request to stop.
///
/// Waits for Ctrl+C on every platform; on unix it additionally waits for the
/// terminate signal. Whichever arrives first finishes the future.
///
/// # Panics
///
/// Panics if either signal handler cannot be installed.
async fn shutdown_signal() {
    // Finishes when Ctrl+C is delivered.
    let interrupt = async {
        let outcome = signal::ctrl_c().await;
        outcome.expect("failed to install Ctrl+C handler");
    };

    // Finishes when the terminate signal is delivered (unix only).
    #[cfg(unix)]
    let sigterm = async {
        let kind = signal::unix::SignalKind::terminate();
        let mut stream = signal::unix::signal(kind).expect("failed to install signal handler");
        stream.recv().await;
    };

    // Elsewhere there is no terminate signal, so this arm never completes.
    #[cfg(not(unix))]
    let sigterm = std::future::pending::<()>();

    tokio::select! {
        _ = interrupt => {},
        _ = sigterm => {},
    }
}
/// Health-check handler: always answers `200 OK` with an `ok: true` JSON body.
async fn health() -> (StatusCode, Json<HealthResponse>) {
    let body = HealthResponse { ok: true };
    (StatusCode::OK, Json(body))
}
/// Response body produced by the `health` handler; serialized through serde.
#[derive(Serialize)]
struct HealthResponse {
/// Health flag; the `health` handler always sets it to `true`.
ok: bool,
}