webhook_bridge/src/webhook.rs
Tom Alexander 81bebf7e17
Some checks failed
semver Build semver has succeeded
format Build format has succeeded
rust-test Build rust-test has failed
build Build build has failed
clippy Build clippy has failed
Test: instantiate new clients for every request.
Trying to figure out why I am getting the below error occasionally in gitea:

Delivery: Post "https://webhookbridge.fizz.buzz/hook": context deadline exceeded (Client.Timeout exceeded while awaiting headers)
2024-09-29 21:50:28 -04:00

222 lines
6.8 KiB
Rust

use std::borrow::Borrow;
use std::collections::HashSet;
use std::future::Future;
use axum::async_trait;
use axum::body::Body;
use axum::body::Bytes;
use axum::extract::FromRequest;
use axum::extract::Request;
use axum::extract::State;
use axum::http::HeaderMap;
use axum::http::StatusCode;
use axum::middleware::Next;
use axum::response::IntoResponse;
use axum::response::Response;
use axum::Json;
use axum::RequestExt;
use base64::engine::general_purpose;
use base64::Engine as _;
use hmac::Hmac;
use hmac::Mac;
use http_body_util::BodyExt;
use serde::Serialize;
use sha2::Sha256;
use tracing::debug;
use crate::app_state::AppState;
use crate::discovery::discover_matching_push_triggers;
use crate::discovery::discover_webhook_bridge_config;
use crate::gitea_client::GiteaClient;
use crate::hook_push::HookPush;
use crate::hook_push::PipelineParamters;
use crate::kubernetes::run_pipelines;
type HmacSha256 = Hmac<Sha256>;
pub(crate) async fn hook(
_headers: HeaderMap,
State(state): State<AppState>,
payload: HookRequest,
) -> (StatusCode, Json<HookResponse>) {
debug!("REQ: {:?}", payload);
match payload {
HookRequest::Push(webhook_payload) => {
let kubernetes_client: kube::Client = kube::Client::try_default()
.await
.expect("Set KUBECONFIG to a valid kubernetes config.");
let gitea_api_root = std::env::var("WEBHOOK_BRIDGE_API_ROOT")?;
let gitea_api_token = std::env::var("WEBHOOK_BRIDGE_OAUTH_TOKEN")?;
let gitea = GiteaClient::new(gitea_api_root, gitea_api_token);
let push_result = handle_push(
gitea,
kubernetes_client,
state.allowed_repos.borrow(),
webhook_payload,
)
.await;
match push_result {
Ok(_) => (
StatusCode::OK,
Json(HookResponse {
ok: true,
message: None,
}),
),
Err(_) => (
// StatusCode::INTERNAL_SERVER_ERROR,
StatusCode::OK,
Json(HookResponse {
ok: false,
message: Some("Failed to handle push event.".to_string()),
}),
),
}
}
HookRequest::Unrecognized(payload) => (
// StatusCode::BAD_REQUEST,
StatusCode::OK,
Json(HookResponse {
ok: false,
message: Some(format!("unrecognized event type: {payload}")),
}),
),
}
}
#[derive(Debug)]
pub(crate) enum HookRequest {
Push(HookPush),
Unrecognized(String),
}
#[async_trait]
impl<S> FromRequest<S> for HookRequest
where
S: Send + Sync,
{
type Rejection = Response;
async fn from_request(req: Request, _state: &S) -> Result<Self, Self::Rejection> {
let event_type = req
.headers()
.get("X-Gitea-Event-Type")
.ok_or(StatusCode::UNSUPPORTED_MEDIA_TYPE.into_response())?;
let event_type = event_type
.to_str()
.map_err(|_| StatusCode::UNSUPPORTED_MEDIA_TYPE.into_response())?;
match event_type {
"push" => {
let Json(payload): Json<HookPush> =
req.extract().await.map_err(IntoResponse::into_response)?;
Ok(HookRequest::Push(payload))
}
_ => Ok(HookRequest::Unrecognized(event_type.to_owned())),
}
}
}
#[derive(Debug, Serialize)]
pub(crate) struct HookResponse {
ok: bool,
message: Option<String>,
}
pub(crate) async fn verify_signature(
request: Request,
next: Next,
) -> Result<impl IntoResponse, Response> {
let signature = request
.headers()
.get("X-Gitea-Signature")
.ok_or(StatusCode::BAD_REQUEST.into_response())?;
let signature = signature
.to_str()
.map_err(|_| StatusCode::BAD_REQUEST.into_response())?;
let signature = hex_to_bytes(signature).ok_or(StatusCode::BAD_REQUEST.into_response())?;
let secret = std::env::var("WEBHOOK_BRIDGE_HMAC_SECRET")
.map_err(|err| (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()).into_response())?;
let request =
inspect_request_body(request, move |body| check_hash(body, secret, signature)).await?;
Ok(next.run(request).await)
}
async fn inspect_request_body<F, Fut>(request: Request, inspector: F) -> Result<Request, Response>
where
F: FnOnce(Bytes) -> Fut,
Fut: Future<Output = Result<Bytes, Response>>,
{
let (parts, body) = request.into_parts();
let bytes = body
.collect()
.await
.map_err(|err| (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()).into_response())?
.to_bytes();
let bytes = inspector(bytes).await?;
Ok(Request::from_parts(parts, Body::from(bytes)))
}
async fn check_hash(body: Bytes, secret: String, signature: Vec<u8>) -> Result<Bytes, Response> {
tracing::debug!("Checking signature {:02x?}", signature.as_slice());
// tracing::info!("Using secret {:?}", secret);
tracing::debug!("and body {}", general_purpose::STANDARD.encode(&body));
let mut mac = HmacSha256::new_from_slice(secret.as_bytes())
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response())?;
mac.update(&body);
mac.verify_slice(&signature)
.map_err(|e| (StatusCode::UNAUTHORIZED, e.to_string()).into_response())?;
Ok(body)
}
fn hex_to_bytes(s: &str) -> Option<Vec<u8>> {
if s.len() % 2 == 0 {
(0..s.len())
.step_by(2)
.map(|i| {
s.get(i..i + 2)
.and_then(|sub| u8::from_str_radix(sub, 16).ok())
})
.collect()
} else {
None
}
}
pub(crate) async fn handle_push(
gitea: GiteaClient,
kubernetes_client: kube::Client,
allowed_repos: &HashSet<String>,
webhook_payload: HookPush,
) -> Result<(), Box<dyn std::error::Error>> {
let repo_owner = webhook_payload.get_repo_owner()?;
let repo_name = webhook_payload.get_repo_name()?;
let pull_base_sha = webhook_payload.get_pull_base_sha()?;
if !allowed_repos.contains(&webhook_payload.repository.full_name) {
tracing::info!(
"{} is not an allowed repository.",
webhook_payload.repository.full_name
);
return Ok(());
}
let repo_tree = gitea.get_tree(repo_owner, repo_name, pull_base_sha).await?;
let remote_config = discover_webhook_bridge_config(&gitea, &repo_tree).await?;
let pipelines = discover_matching_push_triggers(
&gitea,
&repo_tree,
&webhook_payload.ref_field,
&remote_config,
)
.await?;
run_pipelines(webhook_payload, pipelines, kubernetes_client).await?;
Ok(())
}