diff --git a/nixos/tests/vector/default.nix b/nixos/tests/vector/default.nix
index 7ec435bd848c..764689d5cb03 100644
--- a/nixos/tests/vector/default.nix
+++ b/nixos/tests/vector/default.nix
@@ -8,6 +8,7 @@
   file-sink = import ./file-sink.nix { inherit system pkgs; };
   api = import ./api.nix { inherit system pkgs; };
   dnstap = import ./dnstap.nix { inherit system pkgs; };
+  journald-clickhouse = import ./journald-clickhouse.nix { inherit system pkgs; };
   nginx-clickhouse = import ./nginx-clickhouse.nix { inherit system pkgs; };
   syslog-quickwit = import ./syslog-quickwit.nix { inherit system pkgs; };
 }
diff --git a/nixos/tests/vector/journald-clickhouse.nix b/nixos/tests/vector/journald-clickhouse.nix
new file mode 100644
index 000000000000..6979ed5a9c87
--- /dev/null
+++ b/nixos/tests/vector/journald-clickhouse.nix
@@ -0,0 +1,157 @@
+import ../make-test-python.nix (
+  { lib, pkgs, ... }:
+  let
+    # Take the original journald message and create a new payload which only
+    # contains the relevant fields - these must match the database columns.
+    journalVrlRemapTransform = {
+      journald_remap = {
+        inputs = [ "journald" ];
+        type = "remap";
+        source = ''
+          m = {}
+          m.app = .SYSLOG_IDENTIFIER
+          m.host = .host
+          m.severity = to_int(.PRIORITY) ?? 0
+          m.level = to_syslog_level(m.severity) ?? ""
+          m.message = strip_ansi_escape_codes!(.message)
+          m.timestamp = .timestamp
+          m.uid = to_int(._UID) ?? 0
+          m.pid = to_int(._PID) ?? 0
+          . = [m]
+        '';
+      };
+    };
+  in
+  {
+    name = "vector-journald-clickhouse";
+    meta.maintainers = [ pkgs.lib.maintainers.happysalada ];
+
+    nodes = {
+      clickhouse =
+        { config, pkgs, ... }:
+        {
+          virtualisation.diskSize = 5 * 1024;
+          virtualisation.memorySize = 4096;
+
+          networking.firewall.allowedTCPPorts = [ 6000 ];
+
+          services.vector = {
+            enable = true;
+            journaldAccess = true;
+
+            settings = {
+              sources = {
+                journald = {
+                  type = "journald";
+                };
+
+                vector_source = {
+                  type = "vector";
+                  address = "[::]:6000";
+                };
+              };
+
+              transforms = journalVrlRemapTransform;
+
+              sinks = {
+                clickhouse = {
+                  type = "clickhouse";
+                  inputs = [
+                    "journald_remap"
+                    "vector_source"
+                  ];
+                  endpoint = "http://localhost:8123";
+                  database = "journald";
+                  table = "logs";
+                  date_time_best_effort = true;
+                };
+              };
+            };
+
+          };
+
+          services.clickhouse = {
+            enable = true;
+          };
+        };
+
+      vector =
+        { config, pkgs, ... }:
+        {
+          services.vector = {
+            enable = true;
+            journaldAccess = true;
+
+            settings = {
+              sources = {
+                journald = {
+                  type = "journald";
+                };
+              };
+
+              transforms = journalVrlRemapTransform;
+
+              sinks = {
+                vector_sink = {
+                  type = "vector";
+                  inputs = [ "journald_remap" ];
+                  address = "clickhouse:6000";
+                };
+              };
+            };
+          };
+        };
+    };
+
+    testScript =
+      let
+        # work around quote/substitution complexity by Nix, Perl, bash and SQL.
+        databaseDDL = pkgs.writeText "database.sql" "CREATE DATABASE IF NOT EXISTS journald";
+
+        # https://clickhouse.com/blog/storing-log-data-in-clickhouse-fluent-bit-vector-open-telemetry
+        tableDDL = pkgs.writeText "table.sql" ''
+          CREATE TABLE IF NOT EXISTS journald.logs (
+            timestamp DateTime64(6),
+            app LowCardinality(String),
+            host LowCardinality(String),
+            level LowCardinality(String),
+            severity UInt8,
+            message String,
+            uid UInt16,
+            pid UInt32,
+          )
+          ENGINE = MergeTree()
+          ORDER BY (host, app, timestamp)
+          PARTITION BY toYYYYMM(timestamp)
+        '';
+
+        selectQuery = pkgs.writeText "select.sql" ''
+          SELECT COUNT(host) FROM journald.logs
+          WHERE message LIKE '%Vector has started%'
+        '';
+      in
+      ''
+        clickhouse.wait_for_unit("clickhouse")
+        clickhouse.wait_for_open_port(6000)
+        clickhouse.wait_for_open_port(8123)
+
+        clickhouse.succeed(
+          "cat ${databaseDDL} | clickhouse-client"
+        )
+
+        clickhouse.succeed(
+          "cat ${tableDDL} | clickhouse-client"
+        )
+
+        for machine in clickhouse, vector:
+            machine.wait_for_unit("vector")
+            machine.wait_until_succeeds(
+                "journalctl -o cat -u vector.service | grep 'Vector has started'"
+            )
+
+        clickhouse.wait_until_succeeds(
+          "cat ${selectQuery} | clickhouse-client | grep 2"
+        )
+      '';
+  }
+)
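One way to exercise just this scenario from a nixpkgs checkout, assuming the test is exposed through the usual all-tests.nix wiring like the other entries in nixos/tests/vector/default.nix (the attribute path below is an assumption, not shown in this patch), would be:

    nix-build -A nixosTests.vector.journald-clickhouse

The test passes once both VMs log "Vector has started" and the SELECT against journald.logs counts two matching rows, one forwarded locally on the clickhouse node and one forwarded over the vector-to-vector connection from the vector node.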