Move the logic into the server.
Some checks failed
semver Build semver has succeeded
clippy Build clippy has failed
format Build format has succeeded
rust-test Build rust-test has succeeded
build Build build has succeeded

This commit is contained in:
Tom Alexander 2024-09-29 00:48:39 -04:00
parent 0548571b6b
commit 07797b9906
Signed by: talexander
GPG Key ID: D3A179C9A53C0EDE
2 changed files with 77 additions and 49 deletions

View File

@ -1,6 +1,4 @@
#![forbid(unsafe_code)]
use std::borrow::Borrow;
use std::sync::Arc;
use std::time::Duration;
use axum::http::StatusCode;
@ -9,8 +7,6 @@ use axum::routing::get;
use axum::routing::post;
use axum::Json;
use axum::Router;
use kube::api::PostParams;
use kube::Api;
use kube::Client;
use serde::Serialize;
use tokio::signal;
@ -19,17 +15,15 @@ use tower_http::trace::TraceLayer;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
use self::crd_pipeline_run::PipelineRun;
use self::discovery::discover_matching_push_triggers;
use self::discovery::discover_webhook_bridge_config;
use self::gitea_client::GiteaClient;
use self::hook_push::HookPush;
use self::hook_push::PipelineParamters;
use self::kubernetes::run_pipelines;
use self::remote_config::RemoteConfig;
use self::webhook::handle_push;
use self::webhook::hook;
use self::webhook::verify_signature;
use kube::CustomResourceExt;
mod crd_pipeline_run;
mod discovery;
@ -52,6 +46,41 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.with(tracing_subscriber::fmt::layer())
.init();
launch_server().await
}
async fn launch_server() -> Result<(), Box<dyn std::error::Error>> {
let kubernetes_client: Client = Client::try_default()
.await
.expect("Set KUBECONFIG to a valid kubernetes config.");
let gitea_api_root = std::env::var("WEBHOOK_BRIDGE_API_ROOT")?;
let gitea_api_token = std::env::var("WEBHOOK_BRIDGE_OAUTH_TOKEN")?;
let gitea = GiteaClient::new(gitea_api_root, gitea_api_token);
let app = Router::new()
.route("/hook", post(hook))
.layer(middleware::from_fn(verify_signature))
.route("/health", get(health))
.layer((
TraceLayer::new_for_http(),
// Add a timeout layer so graceful shutdown can't wait forever.
TimeoutLayer::new(Duration::from_secs(600)),
))
.with_state(AppState {
kubernetes_client,
gitea,
});
let listener = tokio::net::TcpListener::bind("0.0.0.0:9988").await?;
tracing::info!("listening on {}", listener.local_addr().unwrap());
axum::serve(listener, app)
.with_graceful_shutdown(shutdown_signal())
.await?;
Ok(())
}
async fn local_trigger() -> Result<(), Box<dyn std::error::Error>> {
let kubernetes_client: Client = Client::try_default()
.await
.expect("Set KUBECONFIG to a valid kubernetes config.");
@ -61,43 +90,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let gitea = GiteaClient::new(gitea_api_root, gitea_api_token);
let webhook_payload: HookPush = serde_json::from_str(EXAMPLE_WEBHOOK_PAYLOAD)?;
let repo_tree = gitea
.get_tree(
"talexander",
"webhook_bridge",
webhook_payload.get_pull_base_sha()?,
)
.await?;
let remote_config = discover_webhook_bridge_config(&gitea, &repo_tree).await?;
let pipelines = discover_matching_push_triggers(
&gitea,
&repo_tree,
&webhook_payload.ref_field,
&remote_config,
)
.await?;
run_pipelines(webhook_payload, pipelines, kubernetes_client).await?;
handle_push(gitea, kubernetes_client, webhook_payload).await?;
// let app = Router::new()
// .route("/hook", post(hook))
// .layer(middleware::from_fn(verify_signature))
// .route("/health", get(health))
// .layer((
// TraceLayer::new_for_http(),
// // Add a timeout layer so graceful shutdown can't wait forever.
// TimeoutLayer::new(Duration::from_secs(600)),
// ))
// .with_state(AppState {
// kubernetes_client,
// gitea,
// });
// let listener = tokio::net::TcpListener::bind("0.0.0.0:9988").await?;
// tracing::info!("listening on {}", listener.local_addr().unwrap());
// axum::serve(listener, app)
// .with_graceful_shutdown(shutdown_signal())
// .await?;
Ok(())
}

View File

@ -22,7 +22,12 @@ use serde::Serialize;
use sha2::Sha256;
use tracing::debug;
use crate::discovery::discover_matching_push_triggers;
use crate::discovery::discover_webhook_bridge_config;
use crate::gitea_client::GiteaClient;
use crate::hook_push::HookPush;
use crate::hook_push::PipelineParamters;
use crate::kubernetes::run_pipelines;
use crate::AppState;
type HmacSha256 = Hmac<Sha256>;
@ -34,13 +39,18 @@ pub(crate) async fn hook(
) -> (StatusCode, Json<HookResponse>) {
debug!("REQ: {:?}", payload);
match payload {
HookRequest::Push(_payload) => (
StatusCode::OK,
Json(HookResponse {
ok: true,
message: None,
}),
),
HookRequest::Push(webhook_payload) => {
handle_push(state.gitea, state.kubernetes_client, webhook_payload)
.await
.expect("Failed to handle push event.");
(
StatusCode::OK,
Json(HookResponse {
ok: true,
message: None,
}),
)
}
HookRequest::Unrecognized(payload) => (
StatusCode::BAD_REQUEST,
Json(HookResponse {
@ -153,3 +163,26 @@ fn hex_to_bytes(s: &str) -> Option<Vec<u8>> {
None
}
}
pub(crate) async fn handle_push(
gitea: GiteaClient,
kubernetes_client: kube::Client,
webhook_payload: HookPush,
) -> Result<(), Box<dyn std::error::Error>> {
let repo_owner = webhook_payload.get_repo_owner()?;
let repo_name = webhook_payload.get_repo_name()?;
let pull_base_sha = webhook_payload.get_pull_base_sha()?;
let repo_tree = gitea.get_tree(repo_owner, repo_name, pull_base_sha).await?;
let remote_config = discover_webhook_bridge_config(&gitea, &repo_tree).await?;
let pipelines = discover_matching_push_triggers(
&gitea,
&repo_tree,
&webhook_payload.ref_field,
&remote_config,
)
.await?;
run_pipelines(webhook_payload, pipelines, kubernetes_client).await?;
Ok(())
}