nixos/vector: Migrate from handleTest to runTest

This commit is contained in:
Jonathan Davies 2025-06-15 12:51:56 +00:00
parent 7d66df760c
commit 036a0530cc
No known key found for this signature in database
8 changed files with 641 additions and 657 deletions

View File

@ -1464,7 +1464,7 @@ in
vault-postgresql = runTest ./vault-postgresql.nix;
vaultwarden = discoverTests (import ./vaultwarden.nix);
vdirsyncer = runTest ./vdirsyncer.nix;
vector = handleTest ./vector { };
vector = import ./vector { inherit runTest; };
velocity = runTest ./velocity.nix;
vengi-tools = runTest ./vengi-tools.nix;
victoriametrics = handleTest ./victoriametrics { };

View File

@ -1,45 +1,43 @@
import ../make-test-python.nix (
{ lib, pkgs, ... }:
{ lib, pkgs, ... }:
{
name = "vector-api";
meta.maintainers = [ pkgs.lib.maintainers.happysalada ];
{
name = "vector-api";
meta.maintainers = [ pkgs.lib.maintainers.happysalada ];
nodes.machineapi =
{ config, pkgs, ... }:
{
services.vector = {
enable = true;
journaldAccess = false;
settings = {
api.enabled = true;
nodes.machineapi =
{ config, pkgs, ... }:
{
services.vector = {
enable = true;
journaldAccess = false;
settings = {
api.enabled = true;
sources = {
demo_logs = {
type = "demo_logs";
format = "json";
};
sources = {
demo_logs = {
type = "demo_logs";
format = "json";
};
};
sinks = {
file = {
type = "file";
inputs = [ "demo_logs" ];
path = "/var/lib/vector/logs.log";
encoding = {
codec = "json";
};
sinks = {
file = {
type = "file";
inputs = [ "demo_logs" ];
path = "/var/lib/vector/logs.log";
encoding = {
codec = "json";
};
};
};
};
};
};
testScript = ''
machineapi.wait_for_unit("vector")
machineapi.wait_for_open_port(8686)
machineapi.succeed("journalctl -o cat -u vector.service | grep 'API server running'")
machineapi.wait_until_succeeds("curl -sSf http://localhost:8686/health")
'';
}
)
testScript = ''
machineapi.wait_for_unit("vector")
machineapi.wait_for_open_port(8686)
machineapi.succeed("journalctl -o cat -u vector.service | grep 'API server running'")
machineapi.wait_until_succeeds("curl -sSf http://localhost:8686/health")
'';
}

View File

@ -1,14 +1,10 @@
{
system ? builtins.currentSystem,
config ? { },
pkgs ? import ../../.. { inherit system config; },
}:
{ runTest }:
{
file-sink = import ./file-sink.nix { inherit system pkgs; };
api = import ./api.nix { inherit system pkgs; };
dnstap = import ./dnstap.nix { inherit system pkgs; };
journald-clickhouse = import ./journald-clickhouse.nix { inherit system pkgs; };
nginx-clickhouse = import ./nginx-clickhouse.nix { inherit system pkgs; };
syslog-quickwit = import ./syslog-quickwit.nix { inherit system pkgs; };
file-sink = runTest ./file-sink.nix;
api = runTest ./api.nix;
dnstap = runTest ./dnstap.nix;
journald-clickhouse = runTest ./journald-clickhouse.nix;
nginx-clickhouse = runTest ./nginx-clickhouse.nix;
syslog-quickwit = runTest ./syslog-quickwit.nix;
}

View File

@ -1,133 +1,131 @@
import ../make-test-python.nix (
{ lib, pkgs, ... }:
{ lib, pkgs, ... }:
let
dnstapSocket = "/var/run/vector/dnstap.sock";
in
{
name = "vector-dnstap";
meta.maintainers = [ pkgs.lib.maintainers.happysalada ];
let
dnstapSocket = "/var/run/vector/dnstap.sock";
in
{
name = "vector-dnstap";
meta.maintainers = [ pkgs.lib.maintainers.happysalada ];
nodes = {
unbound =
{ config, pkgs, ... }:
{
networking.firewall.allowedUDPPorts = [ 53 ];
nodes = {
unbound =
{ config, pkgs, ... }:
{
networking.firewall.allowedUDPPorts = [ 53 ];
services.vector = {
enable = true;
settings = {
sources = {
dnstap = {
type = "dnstap";
multithreaded = true;
mode = "unix";
lowercase_hostnames = true;
socket_file_mode = 504;
socket_path = "${dnstapSocket}";
};
};
sinks = {
file = {
type = "file";
inputs = [ "dnstap" ];
path = "/var/lib/vector/logs.log";
encoding = {
codec = "json";
};
};
};
};
};
systemd.services.vector.serviceConfig = {
RuntimeDirectory = "vector";
RuntimeDirectoryMode = "0770";
};
services.unbound = {
enable = true;
enableRootTrustAnchor = false;
package = pkgs.unbound-full;
settings = {
server = {
interface = [
"0.0.0.0"
"::"
];
access-control = [
"192.168.0.0/24 allow"
"::/0 allow"
];
domain-insecure = "local";
private-domain = "local";
local-zone = "local. static";
local-data = [
''"test.local. 10800 IN A 192.168.123.5"''
];
};
services.vector = {
enable = true;
settings = {
sources = {
dnstap = {
dnstap-enable = "yes";
dnstap-socket-path = "${dnstapSocket}";
dnstap-send-identity = "yes";
dnstap-send-version = "yes";
dnstap-log-client-query-messages = "yes";
dnstap-log-client-response-messages = "yes";
type = "dnstap";
multithreaded = true;
mode = "unix";
lowercase_hostnames = true;
socket_file_mode = 504;
socket_path = "${dnstapSocket}";
};
};
sinks = {
file = {
type = "file";
inputs = [ "dnstap" ];
path = "/var/lib/vector/logs.log";
encoding = {
codec = "json";
};
};
};
};
};
systemd.services.unbound = {
after = [ "vector.service" ];
wants = [ "vector.service" ];
serviceConfig = {
# DNSTAP access
ReadWritePaths = [ "/var/run/vector" ];
SupplementaryGroups = [ "vector" ];
systemd.services.vector.serviceConfig = {
RuntimeDirectory = "vector";
RuntimeDirectoryMode = "0770";
};
services.unbound = {
enable = true;
enableRootTrustAnchor = false;
package = pkgs.unbound-full;
settings = {
server = {
interface = [
"0.0.0.0"
"::"
];
access-control = [
"192.168.0.0/24 allow"
"::/0 allow"
];
domain-insecure = "local";
private-domain = "local";
local-zone = "local. static";
local-data = [
''"test.local. 10800 IN A 192.168.123.5"''
];
};
dnstap = {
dnstap-enable = "yes";
dnstap-socket-path = "${dnstapSocket}";
dnstap-send-identity = "yes";
dnstap-send-version = "yes";
dnstap-log-client-query-messages = "yes";
dnstap-log-client-response-messages = "yes";
};
};
};
dnsclient =
{ config, pkgs, ... }:
{
environment.systemPackages = [ pkgs.dig ];
systemd.services.unbound = {
after = [ "vector.service" ];
wants = [ "vector.service" ];
serviceConfig = {
# DNSTAP access
ReadWritePaths = [ "/var/run/vector" ];
SupplementaryGroups = [ "vector" ];
};
};
};
};
testScript = ''
unbound.wait_for_unit("unbound")
unbound.wait_for_unit("vector")
dnsclient =
{ config, pkgs, ... }:
{
environment.systemPackages = [ pkgs.dig ];
};
};
unbound.wait_until_succeeds(
"journalctl -o cat -u vector.service | grep 'Socket permissions updated to 0o770'"
)
unbound.wait_until_succeeds(
"journalctl -o cat -u vector.service | grep 'component_type=dnstap' | grep 'Listening... path=\"${dnstapSocket}\"'"
)
testScript = ''
unbound.wait_for_unit("unbound")
unbound.wait_for_unit("vector")
unbound.wait_for_file("${dnstapSocket}")
unbound.succeed("test 770 -eq $(stat -c '%a' ${dnstapSocket})")
unbound.wait_until_succeeds(
"journalctl -o cat -u vector.service | grep 'Socket permissions updated to 0o770'"
)
unbound.wait_until_succeeds(
"journalctl -o cat -u vector.service | grep 'component_type=dnstap' | grep 'Listening... path=\"${dnstapSocket}\"'"
)
dnsclient.systemctl("start network-online.target")
dnsclient.wait_for_unit("network-online.target")
dnsclient.succeed(
"dig @unbound test.local"
)
unbound.wait_for_file("${dnstapSocket}")
unbound.succeed("test 770 -eq $(stat -c '%a' ${dnstapSocket})")
unbound.wait_for_file("/var/lib/vector/logs.log")
dnsclient.systemctl("start network-online.target")
dnsclient.wait_for_unit("network-online.target")
dnsclient.succeed(
"dig @unbound test.local"
)
unbound.wait_until_succeeds(
"grep ClientQuery /var/lib/vector/logs.log | grep '\"domainName\":\"test.local.\"' | grep '\"rcodeName\":\"NoError\"'"
)
unbound.wait_until_succeeds(
"grep ClientResponse /var/lib/vector/logs.log | grep '\"domainName\":\"test.local.\"' | grep '\"rData\":\"192.168.123.5\"'"
)
'';
}
)
unbound.wait_for_file("/var/lib/vector/logs.log")
unbound.wait_until_succeeds(
"grep ClientQuery /var/lib/vector/logs.log | grep '\"domainName\":\"test.local.\"' | grep '\"rcodeName\":\"NoError\"'"
)
unbound.wait_until_succeeds(
"grep ClientResponse /var/lib/vector/logs.log | grep '\"domainName\":\"test.local.\"' | grep '\"rData\":\"192.168.123.5\"'"
)
'';
}

View File

@ -1,58 +1,56 @@
import ../make-test-python.nix (
{ lib, pkgs, ... }:
{ lib, pkgs, ... }:
{
name = "vector-test1";
meta.maintainers = [ pkgs.lib.maintainers.happysalada ];
{
name = "vector-test1";
meta.maintainers = [ pkgs.lib.maintainers.happysalada ];
nodes.machine =
{ config, pkgs, ... }:
{
services.vector = {
enable = true;
journaldAccess = true;
settings = {
sources = {
journald.type = "journald";
nodes.machine =
{ config, pkgs, ... }:
{
services.vector = {
enable = true;
journaldAccess = true;
settings = {
sources = {
journald.type = "journald";
vector_metrics.type = "internal_metrics";
vector_metrics.type = "internal_metrics";
vector_logs.type = "internal_logs";
vector_logs.type = "internal_logs";
};
sinks = {
file = {
type = "file";
inputs = [
"journald"
"vector_logs"
];
path = "/var/lib/vector/logs.log";
encoding = {
codec = "json";
};
};
sinks = {
file = {
type = "file";
inputs = [
"journald"
"vector_logs"
];
path = "/var/lib/vector/logs.log";
encoding = {
codec = "json";
};
};
prometheus_exporter = {
type = "prometheus_exporter";
inputs = [ "vector_metrics" ];
address = "[::]:9598";
};
prometheus_exporter = {
type = "prometheus_exporter";
inputs = [ "vector_metrics" ];
address = "[::]:9598";
};
};
};
};
};
# ensure vector is forwarding the messages appropriately
testScript = ''
machine.wait_for_unit("vector.service")
machine.wait_for_open_port(9598)
machine.wait_until_succeeds("journalctl -o cat -u vector.service | grep 'version=\"${pkgs.vector.version}\"'")
machine.wait_until_succeeds("journalctl -o cat -u vector.service | grep 'API is disabled'")
machine.wait_until_succeeds("curl -sSf http://localhost:9598/metrics | grep vector_build_info")
machine.wait_until_succeeds("curl -sSf http://localhost:9598/metrics | grep vector_component_received_bytes_total | grep journald")
machine.wait_until_succeeds("curl -sSf http://localhost:9598/metrics | grep vector_utilization | grep prometheus_exporter")
machine.wait_for_file("/var/lib/vector/logs.log")
'';
}
)
# ensure vector is forwarding the messages appropriately
testScript = ''
machine.wait_for_unit("vector.service")
machine.wait_for_open_port(9598)
machine.wait_until_succeeds("journalctl -o cat -u vector.service | grep 'version=\"${pkgs.vector.version}\"'")
machine.wait_until_succeeds("journalctl -o cat -u vector.service | grep 'API is disabled'")
machine.wait_until_succeeds("curl -sSf http://localhost:9598/metrics | grep vector_build_info")
machine.wait_until_succeeds("curl -sSf http://localhost:9598/metrics | grep vector_component_received_bytes_total | grep journald")
machine.wait_until_succeeds("curl -sSf http://localhost:9598/metrics | grep vector_utilization | grep prometheus_exporter")
machine.wait_for_file("/var/lib/vector/logs.log")
'';
}

View File

@ -1,157 +1,155 @@
import ../make-test-python.nix (
{ lib, pkgs, ... }:
let
# Take the original journald message and create a new payload which only
# contains the relevant fields - these must match the database columns.
journalVrlRemapTransform = {
journald_remap = {
inputs = [ "journald" ];
type = "remap";
source = ''
m = {}
m.app = .SYSLOG_IDENTIFIER
m.host = .host
m.severity = to_int(.PRIORITY) ?? 0
m.level = to_syslog_level(m.severity) ?? ""
m.message = strip_ansi_escape_codes!(.message)
m.timestamp = .timestamp
m.uid = to_int(._UID) ?? 0
m.pid = to_int(._PID) ?? 0
. = [m]
'';
};
};
in
{
name = "vector-journald-clickhouse";
meta.maintainers = [ pkgs.lib.maintainers.happysalada ];
nodes = {
clickhouse =
{ config, pkgs, ... }:
{
virtualisation.diskSize = 5 * 1024;
virtualisation.memorySize = 4096;
networking.firewall.allowedTCPPorts = [ 6000 ];
services.vector = {
enable = true;
journaldAccess = true;
settings = {
sources = {
journald = {
type = "journald";
};
vector_source = {
type = "vector";
address = "[::]:6000";
};
};
transforms = journalVrlRemapTransform;
sinks = {
clickhouse = {
type = "clickhouse";
inputs = [
"journald_remap"
"vector_source"
];
endpoint = "http://localhost:8123";
database = "journald";
table = "logs";
date_time_best_effort = true;
};
};
};
};
services.clickhouse = {
enable = true;
};
};
vector =
{ config, pkgs, ... }:
{
services.vector = {
enable = true;
journaldAccess = true;
settings = {
sources = {
journald = {
type = "journald";
};
};
transforms = journalVrlRemapTransform;
sinks = {
vector_sink = {
type = "vector";
inputs = [ "journald_remap" ];
address = "clickhouse:6000";
};
};
};
};
};
};
testScript =
let
# work around quote/substitution complexity by Nix, Perl, bash and SQL.
databaseDDL = pkgs.writeText "database.sql" "CREATE DATABASE IF NOT EXISTS journald";
# https://clickhouse.com/blog/storing-log-data-in-clickhouse-fluent-bit-vector-open-telemetry
tableDDL = pkgs.writeText "table.sql" ''
CREATE TABLE IF NOT EXISTS journald.logs (
timestamp DateTime64(6),
app LowCardinality(String),
host LowCardinality(String),
level LowCardinality(String),
severity UInt8,
message String,
uid UInt16,
pid UInt32,
)
ENGINE = MergeTree()
ORDER BY (host, app, timestamp)
PARTITION BY toYYYYMM(timestamp)
'';
selectQuery = pkgs.writeText "select.sql" ''
SELECT COUNT(host) FROM journald.logs
WHERE message LIKE '%Vector has started%'
'';
in
''
clickhouse.wait_for_unit("clickhouse")
clickhouse.wait_for_open_port(6000)
clickhouse.wait_for_open_port(8123)
clickhouse.succeed(
"cat ${databaseDDL} | clickhouse-client"
)
clickhouse.succeed(
"cat ${tableDDL} | clickhouse-client"
)
for machine in clickhouse, vector:
machine.wait_for_unit("vector")
machine.wait_until_succeeds(
"journalctl -o cat -u vector.service | grep 'Vector has started'"
)
clickhouse.wait_until_succeeds(
"cat ${selectQuery} | clickhouse-client | grep 2"
)
{ lib, pkgs, ... }:
let
# Take the original journald message and create a new payload which only
# contains the relevant fields - these must match the database columns.
journalVrlRemapTransform = {
journald_remap = {
inputs = [ "journald" ];
type = "remap";
source = ''
m = {}
m.app = .SYSLOG_IDENTIFIER
m.host = .host
m.severity = to_int(.PRIORITY) ?? 0
m.level = to_syslog_level(m.severity) ?? ""
m.message = strip_ansi_escape_codes!(.message)
m.timestamp = .timestamp
m.uid = to_int(._UID) ?? 0
m.pid = to_int(._PID) ?? 0
. = [m]
'';
}
)
};
};
in
{
name = "vector-journald-clickhouse";
meta.maintainers = [ pkgs.lib.maintainers.happysalada ];
nodes = {
clickhouse =
{ config, pkgs, ... }:
{
virtualisation.diskSize = 5 * 1024;
virtualisation.memorySize = 4096;
networking.firewall.allowedTCPPorts = [ 6000 ];
services.vector = {
enable = true;
journaldAccess = true;
settings = {
sources = {
journald = {
type = "journald";
};
vector_source = {
type = "vector";
address = "[::]:6000";
};
};
transforms = journalVrlRemapTransform;
sinks = {
clickhouse = {
type = "clickhouse";
inputs = [
"journald_remap"
"vector_source"
];
endpoint = "http://localhost:8123";
database = "journald";
table = "logs";
date_time_best_effort = true;
};
};
};
};
services.clickhouse = {
enable = true;
};
};
vector =
{ config, pkgs, ... }:
{
services.vector = {
enable = true;
journaldAccess = true;
settings = {
sources = {
journald = {
type = "journald";
};
};
transforms = journalVrlRemapTransform;
sinks = {
vector_sink = {
type = "vector";
inputs = [ "journald_remap" ];
address = "clickhouse:6000";
};
};
};
};
};
};
testScript =
let
# work around quote/substitution complexity by Nix, Perl, bash and SQL.
databaseDDL = pkgs.writeText "database.sql" "CREATE DATABASE IF NOT EXISTS journald";
# https://clickhouse.com/blog/storing-log-data-in-clickhouse-fluent-bit-vector-open-telemetry
tableDDL = pkgs.writeText "table.sql" ''
CREATE TABLE IF NOT EXISTS journald.logs (
timestamp DateTime64(6),
app LowCardinality(String),
host LowCardinality(String),
level LowCardinality(String),
severity UInt8,
message String,
uid UInt16,
pid UInt32,
)
ENGINE = MergeTree()
ORDER BY (host, app, timestamp)
PARTITION BY toYYYYMM(timestamp)
'';
selectQuery = pkgs.writeText "select.sql" ''
SELECT COUNT(host) FROM journald.logs
WHERE message LIKE '%Vector has started%'
'';
in
''
clickhouse.wait_for_unit("clickhouse")
clickhouse.wait_for_open_port(6000)
clickhouse.wait_for_open_port(8123)
clickhouse.succeed(
"cat ${databaseDDL} | clickhouse-client"
)
clickhouse.succeed(
"cat ${tableDDL} | clickhouse-client"
)
for machine in clickhouse, vector:
machine.wait_for_unit("vector")
machine.wait_until_succeeds(
"journalctl -o cat -u vector.service | grep 'Vector has started'"
)
clickhouse.wait_until_succeeds(
"cat ${selectQuery} | clickhouse-client | grep 2"
)
'';
}

View File

@ -1,174 +1,172 @@
import ../make-test-python.nix (
{ lib, pkgs, ... }:
{ lib, pkgs, ... }:
{
name = "vector-nginx-clickhouse";
meta.maintainers = [ pkgs.lib.maintainers.happysalada ];
{
name = "vector-nginx-clickhouse";
meta.maintainers = [ pkgs.lib.maintainers.happysalada ];
nodes = {
clickhouse =
{ config, pkgs, ... }:
{
virtualisation.memorySize = 4096;
nodes = {
clickhouse =
{ config, pkgs, ... }:
{
virtualisation.memorySize = 4096;
# Clickhouse module can't listen on a non-loopback IP.
networking.firewall.allowedTCPPorts = [ 6000 ];
services.clickhouse.enable = true;
# Clickhouse module can't listen on a non-loopback IP.
networking.firewall.allowedTCPPorts = [ 6000 ];
services.clickhouse.enable = true;
# Exercise Vector sink->source for now.
services.vector = {
enable = true;
# Exercise Vector sink->source for now.
services.vector = {
enable = true;
settings = {
sources = {
vector_source = {
type = "vector";
address = "[::]:6000";
};
settings = {
sources = {
vector_source = {
type = "vector";
address = "[::]:6000";
};
};
sinks = {
clickhouse = {
type = "clickhouse";
inputs = [ "vector_source" ];
endpoint = "http://localhost:8123";
database = "nginxdb";
table = "access_logs";
skip_unknown_fields = true;
};
sinks = {
clickhouse = {
type = "clickhouse";
inputs = [ "vector_source" ];
endpoint = "http://localhost:8123";
database = "nginxdb";
table = "access_logs";
skip_unknown_fields = true;
};
};
};
};
};
nginx =
{ config, pkgs, ... }:
{
services.nginx = {
enable = true;
virtualHosts.localhost = { };
};
services.vector = {
enable = true;
settings = {
sources = {
nginx_logs = {
type = "file";
include = [ "/var/log/nginx/access.log" ];
read_from = "end";
};
};
sinks = {
vector_sink = {
type = "vector";
inputs = [ "nginx_logs" ];
address = "clickhouse:6000";
};
};
};
};
nginx =
{ config, pkgs, ... }:
{
services.nginx = {
enable = true;
virtualHosts.localhost = { };
};
services.vector = {
enable = true;
settings = {
sources = {
nginx_logs = {
type = "file";
include = [ "/var/log/nginx/access.log" ];
read_from = "end";
};
};
sinks = {
vector_sink = {
type = "vector";
inputs = [ "nginx_logs" ];
address = "clickhouse:6000";
};
};
};
};
systemd.services.vector.serviceConfig = {
SupplementaryGroups = [ "nginx" ];
};
systemd.services.vector.serviceConfig = {
SupplementaryGroups = [ "nginx" ];
};
};
};
};
testScript =
let
# work around quote/substitution complexity by Nix, Perl, bash and SQL.
databaseDDL = pkgs.writeText "database.sql" "CREATE DATABASE IF NOT EXISTS nginxdb";
testScript =
let
# work around quote/substitution complexity by Nix, Perl, bash and SQL.
databaseDDL = pkgs.writeText "database.sql" "CREATE DATABASE IF NOT EXISTS nginxdb";
tableDDL = pkgs.writeText "table.sql" ''
CREATE TABLE IF NOT EXISTS nginxdb.access_logs (
message String
)
ENGINE = MergeTree()
ORDER BY tuple()
'';
# Graciously taken from https://clickhouse.com/docs/en/integrations/vector
tableView = pkgs.writeText "table-view.sql" ''
CREATE MATERIALIZED VIEW nginxdb.access_logs_view
(
RemoteAddr String,
Client String,
RemoteUser String,
TimeLocal DateTime,
RequestMethod String,
Request String,
HttpVersion String,
Status Int32,
BytesSent Int64,
UserAgent String
)
ENGINE = MergeTree()
ORDER BY RemoteAddr
POPULATE AS
WITH
splitByWhitespace(message) as split,
splitByRegexp('\S \d+ "([^"]*)"', message) as referer
SELECT
split[1] AS RemoteAddr,
split[2] AS Client,
split[3] AS RemoteUser,
parseDateTimeBestEffort(replaceOne(trim(LEADING '[' FROM split[4]), ':', ' ')) AS TimeLocal,
trim(LEADING '"' FROM split[6]) AS RequestMethod,
split[7] AS Request,
trim(TRAILING '"' FROM split[8]) AS HttpVersion,
split[9] AS Status,
split[10] AS BytesSent,
trim(BOTH '"' from referer[2]) AS UserAgent
FROM
(SELECT message FROM nginxdb.access_logs)
'';
selectQuery = pkgs.writeText "select.sql" "SELECT * from nginxdb.access_logs_view";
in
''
clickhouse.wait_for_unit("clickhouse")
clickhouse.wait_for_open_port(8123)
clickhouse.wait_until_succeeds(
"journalctl -o cat -u clickhouse.service | grep 'Started ClickHouse server'"
)
clickhouse.wait_for_unit("vector")
clickhouse.wait_for_open_port(6000)
clickhouse.succeed(
"cat ${databaseDDL} | clickhouse-client"
)
clickhouse.succeed(
"cat ${tableDDL} | clickhouse-client"
)
clickhouse.succeed(
"cat ${tableView} | clickhouse-client"
)
nginx.wait_for_unit("nginx")
nginx.wait_for_open_port(80)
nginx.wait_for_unit("vector")
nginx.wait_until_succeeds(
"journalctl -o cat -u vector.service | grep 'Starting file server'"
)
nginx.succeed("curl http://localhost/")
nginx.succeed("curl http://localhost/")
nginx.wait_for_file("/var/log/nginx/access.log")
nginx.wait_until_succeeds(
"journalctl -o cat -u vector.service | grep 'Found new file to watch. file=/var/log/nginx/access.log'"
)
clickhouse.wait_until_succeeds(
"cat ${selectQuery} | clickhouse-client | grep 'curl'"
tableDDL = pkgs.writeText "table.sql" ''
CREATE TABLE IF NOT EXISTS nginxdb.access_logs (
message String
)
ENGINE = MergeTree()
ORDER BY tuple()
'';
}
)
# Graciously taken from https://clickhouse.com/docs/en/integrations/vector
tableView = pkgs.writeText "table-view.sql" ''
CREATE MATERIALIZED VIEW nginxdb.access_logs_view
(
RemoteAddr String,
Client String,
RemoteUser String,
TimeLocal DateTime,
RequestMethod String,
Request String,
HttpVersion String,
Status Int32,
BytesSent Int64,
UserAgent String
)
ENGINE = MergeTree()
ORDER BY RemoteAddr
POPULATE AS
WITH
splitByWhitespace(message) as split,
splitByRegexp('\S \d+ "([^"]*)"', message) as referer
SELECT
split[1] AS RemoteAddr,
split[2] AS Client,
split[3] AS RemoteUser,
parseDateTimeBestEffort(replaceOne(trim(LEADING '[' FROM split[4]), ':', ' ')) AS TimeLocal,
trim(LEADING '"' FROM split[6]) AS RequestMethod,
split[7] AS Request,
trim(TRAILING '"' FROM split[8]) AS HttpVersion,
split[9] AS Status,
split[10] AS BytesSent,
trim(BOTH '"' from referer[2]) AS UserAgent
FROM
(SELECT message FROM nginxdb.access_logs)
'';
selectQuery = pkgs.writeText "select.sql" "SELECT * from nginxdb.access_logs_view";
in
''
clickhouse.wait_for_unit("clickhouse")
clickhouse.wait_for_open_port(8123)
clickhouse.wait_until_succeeds(
"journalctl -o cat -u clickhouse.service | grep 'Started ClickHouse server'"
)
clickhouse.wait_for_unit("vector")
clickhouse.wait_for_open_port(6000)
clickhouse.succeed(
"cat ${databaseDDL} | clickhouse-client"
)
clickhouse.succeed(
"cat ${tableDDL} | clickhouse-client"
)
clickhouse.succeed(
"cat ${tableView} | clickhouse-client"
)
nginx.wait_for_unit("nginx")
nginx.wait_for_open_port(80)
nginx.wait_for_unit("vector")
nginx.wait_until_succeeds(
"journalctl -o cat -u vector.service | grep 'Starting file server'"
)
nginx.succeed("curl http://localhost/")
nginx.succeed("curl http://localhost/")
nginx.wait_for_file("/var/log/nginx/access.log")
nginx.wait_until_succeeds(
"journalctl -o cat -u vector.service | grep 'Found new file to watch. file=/var/log/nginx/access.log'"
)
clickhouse.wait_until_succeeds(
"cat ${selectQuery} | clickhouse-client | grep 'curl'"
)
'';
}

View File

@ -1,162 +1,160 @@
import ../make-test-python.nix (
{ lib, pkgs, ... }:
{ lib, pkgs, ... }:
# Based on https://quickwit.io/docs/log-management/send-logs/using-vector
# Based on https://quickwit.io/docs/log-management/send-logs/using-vector
{
name = "vector-syslog-quickwit";
meta.maintainers = [ pkgs.lib.maintainers.happysalada ];
{
name = "vector-syslog-quickwit";
meta.maintainers = [ pkgs.lib.maintainers.happysalada ];
nodes = {
quickwit =
{ config, pkgs, ... }:
{
environment.systemPackages = [ pkgs.jq ];
nodes = {
quickwit =
{ config, pkgs, ... }:
{
environment.systemPackages = [ pkgs.jq ];
networking.firewall.allowedTCPPorts = [ 7280 ];
networking.firewall.allowedTCPPorts = [ 7280 ];
services.quickwit = {
enable = true;
settings = {
listen_address = "::";
};
services.quickwit = {
enable = true;
settings = {
listen_address = "::";
};
};
};
syslog =
{ config, pkgs, ... }:
{
services.vector = {
enable = true;
syslog =
{ config, pkgs, ... }:
{
services.vector = {
enable = true;
settings = {
sources = {
generate_syslog = {
type = "demo_logs";
format = "syslog";
interval = 0.5;
};
settings = {
sources = {
generate_syslog = {
type = "demo_logs";
format = "syslog";
interval = 0.5;
};
};
transforms = {
remap_syslog = {
inputs = [ "generate_syslog" ];
type = "remap";
source = ''
structured = parse_syslog!(.message)
.timestamp_nanos = to_unix_timestamp!(structured.timestamp, unit: "nanoseconds")
.body = structured
.service_name = structured.appname
.resource_attributes.source_type = .source_type
.resource_attributes.host.hostname = structured.hostname
.resource_attributes.service.name = structured.appname
.attributes.syslog.procid = structured.procid
.attributes.syslog.facility = structured.facility
.attributes.syslog.version = structured.version
.severity_text = if includes(["emerg", "err", "crit", "alert"], structured.severity) {
"ERROR"
} else if structured.severity == "warning" {
"WARN"
} else if structured.severity == "debug" {
"DEBUG"
} else if includes(["info", "notice"], structured.severity) {
"INFO"
} else {
structured.severity
}
.scope_name = structured.msgid
del(.message)
del(.host)
del(.timestamp)
del(.service)
del(.source_type)
'';
};
transforms = {
remap_syslog = {
inputs = [ "generate_syslog" ];
type = "remap";
source = ''
structured = parse_syslog!(.message)
.timestamp_nanos = to_unix_timestamp!(structured.timestamp, unit: "nanoseconds")
.body = structured
.service_name = structured.appname
.resource_attributes.source_type = .source_type
.resource_attributes.host.hostname = structured.hostname
.resource_attributes.service.name = structured.appname
.attributes.syslog.procid = structured.procid
.attributes.syslog.facility = structured.facility
.attributes.syslog.version = structured.version
.severity_text = if includes(["emerg", "err", "crit", "alert"], structured.severity) {
"ERROR"
} else if structured.severity == "warning" {
"WARN"
} else if structured.severity == "debug" {
"DEBUG"
} else if includes(["info", "notice"], structured.severity) {
"INFO"
} else {
structured.severity
}
.scope_name = structured.msgid
del(.message)
del(.host)
del(.timestamp)
del(.service)
del(.source_type)
'';
};
};
sinks = {
#emit_syslog = {
# inputs = ["remap_syslog"];
# type = "console";
# encoding.codec = "json";
#};
quickwit_logs = {
type = "http";
method = "post";
inputs = [ "remap_syslog" ];
encoding.codec = "json";
framing.method = "newline_delimited";
uri = "http://quickwit:7280/api/v1/otel-logs-v0_7/ingest";
};
sinks = {
#emit_syslog = {
# inputs = ["remap_syslog"];
# type = "console";
# encoding.codec = "json";
#};
quickwit_logs = {
type = "http";
method = "post";
inputs = [ "remap_syslog" ];
encoding.codec = "json";
framing.method = "newline_delimited";
uri = "http://quickwit:7280/api/v1/otel-logs-v0_7/ingest";
};
};
};
};
};
};
};
testScript =
let
aggregationQuery = pkgs.writeText "aggregation-query.json" ''
{
"query": "*",
"max_hits": 0,
"aggs": {
"count_per_minute": {
"histogram": {
"field": "timestamp_nanos",
"interval": 60000000
},
"aggs": {
"severity_text_count": {
"terms": {
"field": "severity_text"
}
testScript =
let
aggregationQuery = pkgs.writeText "aggregation-query.json" ''
{
"query": "*",
"max_hits": 0,
"aggs": {
"count_per_minute": {
"histogram": {
"field": "timestamp_nanos",
"interval": 60000000
},
"aggs": {
"severity_text_count": {
"terms": {
"field": "severity_text"
}
}
}
}
}
'';
in
''
quickwit.wait_for_unit("quickwit")
quickwit.wait_for_open_port(7280)
quickwit.wait_for_open_port(7281)
quickwit.wait_until_succeeds(
"journalctl -o cat -u quickwit.service | grep 'transitioned to ready state'"
)
syslog.wait_for_unit("vector")
syslog.wait_until_succeeds(
"journalctl -o cat -u vector.service | grep 'Vector has started'"
)
quickwit.wait_until_succeeds(
"journalctl -o cat -u quickwit.service | grep 'publish-new-splits'"
)
# Wait for logs to be generated
# Test below aggregates by the minute
syslog.sleep(60 * 2)
quickwit.wait_until_succeeds(
"curl -sSf -XGET http://127.0.0.1:7280/api/v1/otel-logs-v0_7/search?query=severity_text:ERROR |"
+ " jq '.num_hits' | grep -v '0'"
)
quickwit.wait_until_succeeds(
"journalctl -o cat -u quickwit.service | grep 'SearchRequest'"
)
quickwit.wait_until_succeeds(
"curl -sSf -XPOST -H 'Content-Type: application/json' http://127.0.0.1:7280/api/v1/otel-logs-v0_7/search --data @${aggregationQuery} |"
+ " jq '.num_hits' | grep -v '0'"
)
quickwit.wait_until_succeeds(
"journalctl -o cat -u quickwit.service | grep 'count_per_minute'"
)
}
'';
}
)
in
''
quickwit.wait_for_unit("quickwit")
quickwit.wait_for_open_port(7280)
quickwit.wait_for_open_port(7281)
quickwit.wait_until_succeeds(
"journalctl -o cat -u quickwit.service | grep 'transitioned to ready state'"
)
syslog.wait_for_unit("vector")
syslog.wait_until_succeeds(
"journalctl -o cat -u vector.service | grep 'Vector has started'"
)
quickwit.wait_until_succeeds(
"journalctl -o cat -u quickwit.service | grep 'publish-new-splits'"
)
# Wait for logs to be generated
# Test below aggregates by the minute
syslog.sleep(60 * 2)
quickwit.wait_until_succeeds(
"curl -sSf -XGET http://127.0.0.1:7280/api/v1/otel-logs-v0_7/search?query=severity_text:ERROR |"
+ " jq '.num_hits' | grep -v '0'"
)
quickwit.wait_until_succeeds(
"journalctl -o cat -u quickwit.service | grep 'SearchRequest'"
)
quickwit.wait_until_succeeds(
"curl -sSf -XPOST -H 'Content-Type: application/json' http://127.0.0.1:7280/api/v1/otel-logs-v0_7/search --data @${aggregationQuery} |"
+ " jq '.num_hits' | grep -v '0'"
)
quickwit.wait_until_succeeds(
"journalctl -o cat -u quickwit.service | grep 'count_per_minute'"
)
'';
}