#!/usr/bin/env python from __future__ import annotations import json import logging import re import subprocess from dataclasses import dataclass from datetime import datetime, timedelta from time import sleep from typing import Collection, Final, NewType, Tuple ContainerId = NewType("ContainerId", str) ContainerName = NewType("ContainerName", str) SAMPLE_INTERVAL_SECONDS: Final[int] = 2 @dataclass class Sample: instant: datetime stats: dict[ContainerId, Stats] @dataclass class Stats: memory_usage_bytes: int def main(): logging.basicConfig(level=logging.INFO) samples: list[Sample] = [] labels: dict[ContainerId, ContainerName] = {} first_pass = True # First wait for any docker container to exist. while True: sample, labels_in_sample = take_sample() if labels_in_sample: break if first_pass: first_pass = False logging.info("Waiting for a docker container to exist to start recording.") sleep(1) # And then record memory until no containers exist. while True: sample, labels_in_sample = take_sample() if not labels_in_sample: break samples.append(sample) labels = {**labels, **labels_in_sample} sleep(SAMPLE_INTERVAL_SECONDS) if labels: # Draws a red horizontal line at 32 GiB since that is the memory limit for cloud run. write_plot( samples, labels, horizontal_lines=[(32 * 1024**3, "red", "Cloud Run Max Memory")], ) def write_plot( samples: Collection[Sample], labels: dict[ContainerId, ContainerName], *, horizontal_lines: Collection[Tuple[int, str, str | None]] = [], ): starting_time_per_container = { container_id: min( (sample.instant for sample in samples if container_id in sample.stats) ) for container_id in labels.keys() } print( """set terminal svg background '#FFFFFF' set title 'Docker Memory Usage' set xdata time set timefmt '%s' set format x '%tH:%tM:%tS' # Please note this is in SI units (base 10), not IEC (base 2). So, for example, this would show a Gigabyte, not a Gibibyte. set format y '%.0s%cB' set datafile separator "|" """ ) for y_value, color, label in horizontal_lines: print( f'''set arrow from graph 0, first {y_value} to graph 1, first {y_value} nohead linewidth 2 linecolor rgb "{color}"''' ) if label is not None: print(f"""set label "{label}" at graph 0, first {y_value} offset 1,-0.5""") # Include the horizontal lines in the range if len(horizontal_lines) > 0: print(f"""set yrange [*:{max(x[0] for x in horizontal_lines)}<*]""") line_definitions = ", ".join( [ f""""-" using 1:2 title '{gnuplot_escape(name)}' with lines""" for container_id, name in sorted(labels.items()) ] ) print("plot", line_definitions) for container_id in sorted(labels.keys()): start_time = int(starting_time_per_container[container_id].timestamp()) for sample in sorted(samples, key=lambda x: x.instant): if container_id in sample.stats: print( "|".join( [ str(int((sample.instant).timestamp()) - start_time), str(sample.stats[container_id].memory_usage_bytes), ] ) ) print("e") def gnuplot_escape(inp: str) -> str: out = "" for c in inp: if c == "_": out += "\\" out += c return out def take_sample() -> Tuple[Sample, dict[ContainerId, ContainerName]]: labels: dict[ContainerId, ContainerName] = {} stats: dict[ContainerId, Stats] = {} docker_inspect = subprocess.run( ["docker", "stats", "--no-stream", "--no-trunc", "--format", "json"], stdout=subprocess.PIPE, ) for container_stat in ( json.loads(l) for l in docker_inspect.stdout.decode("utf8").splitlines() ): if not container_stat["ID"]: # When containers are starting up, they sometimes have no ID and "--" as the name. continue labels[ContainerId(container_stat["ID"])] = ContainerName( container_stat["Name"] ) memory_usage = parse_mem_usage(container_stat["MemUsage"]) stats[ContainerId(container_stat["ID"])] = Stats( memory_usage_bytes=memory_usage ) for container_id, container_stat in stats.items(): logging.info( f"Recorded stat {labels[container_id]}: {container_stat.memory_usage_bytes} bytes" ) return Sample(instant=datetime.now(), stats=stats), labels def parse_mem_usage(mem_usage: str) -> int: parsed_mem_usage = re.match( r"(?P[0-9]+\.?[0-9]*)(?P[^\s]+)", mem_usage ) if parsed_mem_usage is None: raise Exception(f"Invalid Mem Usage: {mem_usage}") number = float(parsed_mem_usage.group("number")) unit = parsed_mem_usage.group("unit") for multiplier, identifier in enumerate(["B", "KiB", "MiB", "GiB", "TiB"]): if unit == identifier: return int(number * (1024**multiplier)) raise Exception(f"Unrecognized unit: {unit}") if __name__ == "__main__": main()