# infra_snippets/docker/scripts/graph_docker_memory.py
#
# 168 lines
# 5.3 KiB
# Python
# Raw Normal View History

#!/usr/bin/env python
from __future__ import annotations
import json
import logging
import re
import subprocess
from dataclasses import dataclass
# Stray timestamp left over from the web view (not code): 2025-02-01 14:34:02 -05:00
from datetime import datetime
from time import sleep
from typing import Collection, Final, NewType, Tuple
# Distinct string types so a type checker can tell container IDs and container
# names apart; both are created from plain `str` values.
ContainerId = NewType("ContainerId", str)
ContainerName = NewType("ContainerName", str)
# Number of seconds main() sleeps between two consecutive samples while recording.
SAMPLE_INTERVAL_SECONDS: Final[int] = 2
@dataclass
class Sample:
    """Statistics of every container observed at one point in time."""

    # Moment at which the reading was taken.
    instant: datetime
    # Per-container statistics, keyed by container ID.
    stats: dict[ContainerId, Stats]
@dataclass
class Stats:
    """Resource usage recorded for a single container."""

    # Memory in use by the container, in bytes.
    memory_usage_bytes: int
def main() -> None:
    """Record docker memory usage and print a gnuplot script charting it.

    Polls once per second until at least one container is reported, then
    samples every ``SAMPLE_INTERVAL_SECONDS`` seconds until no container is
    reported any more, and finally writes the plot to stdout.
    """
    logging.basicConfig(level=logging.INFO)
    samples: list[Sample] = []
    labels: dict[ContainerId, ContainerName] = {}

    # First wait for any docker container to exist.
    sample, labels_in_sample = take_sample()
    if not labels_in_sample:
        logging.info("Waiting for a docker container to exist to start recording.")
    while not labels_in_sample:
        sleep(1)
        sample, labels_in_sample = take_sample()

    # And then record memory until no containers exist. The sample that ended
    # the wait is recorded as the first data point rather than discarded, so a
    # container that exits before the next poll still shows up in the plot.
    while labels_in_sample:
        samples.append(sample)
        labels.update(labels_in_sample)
        sleep(SAMPLE_INTERVAL_SECONDS)
        sample, labels_in_sample = take_sample()

    write_plot(
        samples,
        labels,
        # Uncomment to draw a red horizontal line at 32 GiB, the memory limit
        # for cloud run:
        # horizontal_lines=[(32 * 1024**3, "red", "Cloud Run Max Memory")],
    )
def write_plot(
    samples: Collection[Sample],
    labels: dict[ContainerId, ContainerName],
    *,
    horizontal_lines: Collection[Tuple[int, str, str | None]] = (),
) -> None:
    """Print a gnuplot script charting memory usage per container to stdout.

    Each container in ``labels`` that appears in at least one sample becomes
    one line of the chart. The x value of every point is the number of whole
    seconds since that container was first seen, so every line starts at zero.

    Args:
        samples: Recorded samples, in any order.
        labels: Display name for each container to plot. Containers that
            never appear in ``samples`` are left out instead of raising.
        horizontal_lines: ``(y_value, color, label)`` triples. Each draws a
            horizontal line at ``y_value`` on the y axis and, when ``label``
            is not ``None``, a text label next to it.
    """
    print(
        """set terminal svg background '#FFFFFF'
set title 'Docker Memory Usage'
set xdata time
set timefmt '%s'
set format x '%tH:%tM:%tS'
# Please note this is in SI units (base 10), not IEC (base 2). So, for example, this would show a Gigabyte, not a Gibibyte.
set format y '%.0s%cB'
set datafile separator "|"
"""
    )

    for y_value, color, label in horizontal_lines:
        print(
            f'''set arrow from graph 0, first {y_value} to graph 1, first {y_value} nohead linewidth 2 linecolor rgb "{color}"'''
        )
        if label is not None:
            print(f"""set label "{label}" at graph 0, first {y_value} offset 1,-0.5""")

    # Include the horizontal lines in the range
    if horizontal_lines:
        print(f"""set yrange [*:{max(y for y, _, _ in horizontal_lines)}<*]""")

    # Sort once up front; every container's series is emitted chronologically.
    ordered_samples = sorted(samples, key=lambda s: s.instant)

    # (epoch_seconds, memory_bytes) points per labelled container.
    series: dict[ContainerId, list[tuple[int, int]]] = {}
    for sample in ordered_samples:
        epoch_seconds = int(sample.instant.timestamp())
        for container_id, container_stats in sample.stats.items():
            if container_id in labels:
                series.setdefault(container_id, []).append(
                    (epoch_seconds, container_stats.memory_usage_bytes)
                )

    # Only containers with data get a line; the plot definitions and the
    # inline data blocks below must be emitted in the same order.
    plotted_ids = sorted(container_id for container_id in labels if container_id in series)

    line_definitions = ", ".join(
        f""""-" using 1:2 title '{gnuplot_escape(labels[container_id])}' with lines"""
        for container_id in plotted_ids
    )
    print("plot", line_definitions)

    for container_id in plotted_ids:
        points = series[container_id]
        # Points are chronological, so the first one is when the container was first seen.
        start_time = points[0][0]
        for epoch_seconds, memory_usage_bytes in points:
            print(f"{epoch_seconds - start_time}|{memory_usage_bytes}")
        # "e" terminates one block of inline ("-") data.
        print("e")
def gnuplot_escape(inp: str) -> str:
    """Return ``inp`` with a backslash inserted before every underscore.

    All other characters are passed through unchanged.
    """
    # NOTE(review): presumably this stops gnuplot from rendering the text after
    # an underscore as a subscript in titles; confirm against the terminal used.
    return inp.replace("_", "\\_")
def take_sample() -> Tuple[Sample, dict[ContainerId, ContainerName]]:
    """Run ``docker stats`` once and parse its per-container memory usage.

    Returns:
        A tuple of the sample, timestamped with ``datetime.now()`` after the
        command has returned, and a mapping from container ID to container
        name for every container in that sample. Both are empty when the
        command reports no containers.

    Raises:
        Exception: Propagated from ``parse_mem_usage`` when a ``MemUsage``
            value cannot be parsed.
    """
    docker_stats = subprocess.run(
        ["docker", "stats", "--no-stream", "--no-trunc", "--format", "json"],
        stdout=subprocess.PIPE,
    )
    if docker_stats.returncode != 0:
        # Keep going with whatever was printed, but make the failure visible:
        # otherwise it is indistinguishable from "no containers are running".
        logging.warning(
            "docker stats exited with status %d; treating its output as the sample",
            docker_stats.returncode,
        )

    labels: dict[ContainerId, ContainerName] = {}
    stats: dict[ContainerId, Stats] = {}
    # The output is parsed as one JSON object per line.
    for line in docker_stats.stdout.decode("utf8").splitlines():
        if not line.strip():
            # A blank line is not a JSON document; skip it instead of crashing.
            continue
        container_stat = json.loads(line)
        if not container_stat["ID"]:
            # When containers are starting up, they sometimes have no ID and "--" as the name.
            continue
        container_id = ContainerId(container_stat["ID"])
        labels[container_id] = ContainerName(container_stat["Name"])
        stats[container_id] = Stats(
            memory_usage_bytes=parse_mem_usage(container_stat["MemUsage"])
        )

    for container_id, recorded in stats.items():
        logging.info(
            "Recorded stat %s: %s bytes",
            labels[container_id],
            recorded.memory_usage_bytes,
        )
    return Sample(instant=datetime.now(), stats=stats), labels
# Bytes per unit for the binary (IEC) unit suffixes, each 1024 times the previous.
_MEM_USAGE_MULTIPLIERS = {
    unit: 1024**power
    for power, unit in enumerate(["B", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB"])
}


def parse_mem_usage(mem_usage: str) -> int:
    """Convert the leading quantity of a memory usage string to bytes.

    Only the first ``<number><unit>`` token at the start of the string is
    read, so ``"1.5GiB / 7.7GiB"`` yields the bytes for ``1.5GiB``.

    Args:
        mem_usage: Text starting with a decimal number immediately followed
            by one of the units ``B``, ``KiB``, ``MiB``, ``GiB``, ``TiB``,
            ``PiB`` or ``EiB``.

    Returns:
        The quantity in bytes, truncated to an integer.

    Raises:
        ValueError: If the string does not start with a number and unit, or
            the unit is not one of the recognized ones.
    """
    parsed_mem_usage = re.match(
        r"(?P<number>[0-9]+\.?[0-9]*)(?P<unit>[^\s]+)", mem_usage
    )
    if parsed_mem_usage is None:
        raise ValueError(f"Invalid Mem Usage: {mem_usage}")

    unit = parsed_mem_usage.group("unit")
    try:
        multiplier = _MEM_USAGE_MULTIPLIERS[unit]
    except KeyError:
        raise ValueError(f"Unrecognized unit: {unit}") from None
    return int(float(parsed_mem_usage.group("number")) * multiplier)
# Start recording only when run as a script, not when imported as a module.
if __name__ == "__main__":
    main()