infra_snippets/docker/scripts/graph_docker_memory.py

168 lines
5.3 KiB
Python
Executable File

#!/usr/bin/env python
from __future__ import annotations
import json
import logging
import re
import subprocess
from dataclasses import dataclass
from datetime import datetime, timedelta
from time import sleep
from typing import Collection, Final, NewType, Tuple
ContainerId = NewType("ContainerId", str)
ContainerName = NewType("ContainerName", str)
SAMPLE_INTERVAL_SECONDS: Final[int] = 2
@dataclass
class Sample:
instant: datetime
stats: dict[ContainerId, Stats]
@dataclass
class Stats:
memory_usage_bytes: int
def main():
logging.basicConfig(level=logging.INFO)
samples: list[Sample] = []
labels: dict[ContainerId, ContainerName] = {}
first_pass = True
# First wait for any docker container to exist.
while True:
sample, labels_in_sample = take_sample()
if labels_in_sample:
break
if first_pass:
first_pass = False
logging.info("Waiting for a docker container to exist to start recording.")
sleep(1)
# And then record memory until no containers exist.
while True:
sample, labels_in_sample = take_sample()
if not labels_in_sample:
break
samples.append(sample)
labels = {**labels, **labels_in_sample}
sleep(SAMPLE_INTERVAL_SECONDS)
if labels:
# Draws a red horizontal line at 32 GiB since that is the memory limit for cloud run.
write_plot(
samples,
labels,
horizontal_lines=[(32 * 1024**3, "red", "Cloud Run Max Memory")],
)
def write_plot(
samples: Collection[Sample],
labels: dict[ContainerId, ContainerName],
*,
horizontal_lines: Collection[Tuple[int, str, str | None]] = [],
):
starting_time_per_container = {
container_id: min(
(sample.instant for sample in samples if container_id in sample.stats)
)
for container_id in labels.keys()
}
print(
"""set terminal svg background '#FFFFFF'
set title 'Docker Memory Usage'
set xdata time
set timefmt '%s'
set format x '%tH:%tM:%tS'
# Please note this is in SI units (base 10), not IEC (base 2). So, for example, this would show a Gigabyte, not a Gibibyte.
set format y '%.0s%cB'
set datafile separator "|"
"""
)
for y_value, color, label in horizontal_lines:
print(
f'''set arrow from graph 0, first {y_value} to graph 1, first {y_value} nohead linewidth 2 linecolor rgb "{color}"'''
)
if label is not None:
print(f"""set label "{label}" at graph 0, first {y_value} offset 1,-0.5""")
# Include the horizontal lines in the range
if len(horizontal_lines) > 0:
print(f"""set yrange [*:{max(x[0] for x in horizontal_lines)}<*]""")
line_definitions = ", ".join(
[
f""""-" using 1:2 title '{gnuplot_escape(name)}' with lines"""
for container_id, name in sorted(labels.items())
]
)
print("plot", line_definitions)
for container_id in sorted(labels.keys()):
start_time = int(starting_time_per_container[container_id].timestamp())
for sample in sorted(samples, key=lambda x: x.instant):
if container_id in sample.stats:
print(
"|".join(
[
str(int((sample.instant).timestamp()) - start_time),
str(sample.stats[container_id].memory_usage_bytes),
]
)
)
print("e")
def gnuplot_escape(inp: str) -> str:
out = ""
for c in inp:
if c == "_":
out += "\\"
out += c
return out
def take_sample() -> Tuple[Sample, dict[ContainerId, ContainerName]]:
labels: dict[ContainerId, ContainerName] = {}
stats: dict[ContainerId, Stats] = {}
docker_inspect = subprocess.run(
["docker", "stats", "--no-stream", "--no-trunc", "--format", "json"],
stdout=subprocess.PIPE,
)
for container_stat in (
json.loads(l) for l in docker_inspect.stdout.decode("utf8").splitlines()
):
if not container_stat["ID"]:
# When containers are starting up, they sometimes have no ID and "--" as the name.
continue
labels[ContainerId(container_stat["ID"])] = ContainerName(
container_stat["Name"]
)
memory_usage = parse_mem_usage(container_stat["MemUsage"])
stats[ContainerId(container_stat["ID"])] = Stats(
memory_usage_bytes=memory_usage
)
for container_id, container_stat in stats.items():
logging.info(
f"Recorded stat {labels[container_id]}: {container_stat.memory_usage_bytes} bytes"
)
return Sample(instant=datetime.now(), stats=stats), labels
def parse_mem_usage(mem_usage: str) -> int:
parsed_mem_usage = re.match(
r"(?P<number>[0-9]+\.?[0-9]*)(?P<unit>[^\s]+)", mem_usage
)
if parsed_mem_usage is None:
raise Exception(f"Invalid Mem Usage: {mem_usage}")
number = float(parsed_mem_usage.group("number"))
unit = parsed_mem_usage.group("unit")
for multiplier, identifier in enumerate(["B", "KiB", "MiB", "GiB", "TiB"]):
if unit == identifier:
return int(number * (1024**multiplier))
raise Exception(f"Unrecognized unit: {unit}")
if __name__ == "__main__":
main()