From c6f417f323af0fe2d7f02d1bdb072f5af7ec6c06 Mon Sep 17 00:00:00 2001 From: Tom Alexander Date: Mon, 5 May 2025 22:47:52 -0400 Subject: [PATCH] fixup! Add a simple python script to stream logs from google cloud. --- google_cloud_logging/log_stream/log_stream.py | 127 ------------------ .../log_stream/{ => log_stream}/log_entry.py | 0 .../log_stream/log_stream/log_stream.py | 35 ++++- .../log_stream/log_stream/main.py | 6 +- google_cloud_logging/log_stream/main.py | 54 -------- 5 files changed, 35 insertions(+), 187 deletions(-) delete mode 100644 google_cloud_logging/log_stream/log_stream.py rename google_cloud_logging/log_stream/{ => log_stream}/log_entry.py (100%) delete mode 100644 google_cloud_logging/log_stream/main.py diff --git a/google_cloud_logging/log_stream/log_stream.py b/google_cloud_logging/log_stream/log_stream.py deleted file mode 100644 index 65be1cf..0000000 --- a/google_cloud_logging/log_stream/log_stream.py +++ /dev/null @@ -1,127 +0,0 @@ -from __future__ import annotations -from collections import deque -from datetime import datetime, timezone -from logging import getLogger -import json -from typing import Callable, Final, Iterator -from pydantic import ValidationError -import requests - -from log_stream.log_entry import create_log_entry, LogEntry - -BATCH_SIZE: Final[int] = 1000 -REQ_TIMEOUT: Final[int] = 60 -logger = getLogger("LogStream") - - -class LogStream: - _query: str - _project: str - _start_date: datetime - _end_date: datetime - _buffer: deque[dict] - _finished: bool - _next_page_token: str | None - _session: requests.Session - _access_token: Callable[[], str] - - def __init__( - self, - query: str, - project: str, - start_date: datetime, - end_date: datetime | None, - access_token: Callable[[], str], - ) -> None: - self._query = query - self._project = project - self._start_date = start_date - self._end_date = ( - end_date if end_date is not None else datetime.now(timezone.utc) - ) - self._buffer = deque(maxlen=BATCH_SIZE) - self._finished = False - self._next_page_token = None - self._session = requests.Session() - self._access_token = access_token - - def parsed(self) -> ParsedLogStream: - return ParsedLogStream(self) - - def __iter__(self) -> Iterator[dict]: - return self - - def __next__(self) -> dict: - if len(self._buffer) == 0 and not self._finished: - self._populate_buffer() - - if len(self._buffer) == 0: - self._finished = True - raise StopIteration() - - return self._buffer.popleft() - - def _populate_buffer(self) -> None: - # https://cloud.google.com/logging/docs/reference/v2/rest/v2/entries/list - data = { - "filter": "\n".join((self._get_timestamp_filter(), self._query)), - "orderBy": "timestamp desc", - "pageSize": BATCH_SIZE, - "resourceNames": [f"projects/{self._project}"], - } - if self._next_page_token is not None: - data["pageToken"] = self._next_page_token - headers = { - "Authorization": f"Bearer {self._access_token()}", - "x-goog-user-project": self._project, - } - req = requests.Request( - "POST", - "https://logging.googleapis.com/v2/entries:list", - data=data, - headers=headers, - ) - prepared = self._session.prepare_request(req) - logger.info(f"Sending a request to {prepared.url}") - response = self._session.send( - prepared, - timeout=REQ_TIMEOUT, - ) - response.raise_for_status() - response_body = response.json() - if (token := response_body.get("nextPageToken")) is not None: - self._next_page_token = token - else: - self._finished = True - self._next_page_token = None - - self._buffer.extend(response_body.get("entries", [])) - - 
def _get_timestamp_filter(self) -> str: - time_format = "%Y-%m-%dT%H:%M:%S.%fZ" - self._start_date.isoformat() - # TODO: convert start/end date to UTC - start = self._start_date.strftime(time_format) - end = self._end_date.strftime(time_format) - return f""" - timestamp>="{start}" - timestamp<="{end}" - """ - - -class ParsedLogStream: - _inner: LogStream - - def __init__(self, inner: LogStream) -> None: - self._inner = inner - - def __iter__(self) -> Iterator[LogEntry]: - return self - - def __next__(self) -> LogEntry: - raw = next(self._inner) - try: - return create_log_entry(raw) - except ValidationError as e: - logger.error("Failed to parse log message: %s", json.dumps(raw)) - raise diff --git a/google_cloud_logging/log_stream/log_entry.py b/google_cloud_logging/log_stream/log_stream/log_entry.py similarity index 100% rename from google_cloud_logging/log_stream/log_entry.py rename to google_cloud_logging/log_stream/log_stream/log_entry.py diff --git a/google_cloud_logging/log_stream/log_stream/log_stream.py b/google_cloud_logging/log_stream/log_stream/log_stream.py index 6333518..65be1cf 100644 --- a/google_cloud_logging/log_stream/log_stream/log_stream.py +++ b/google_cloud_logging/log_stream/log_stream/log_stream.py @@ -1,9 +1,14 @@ +from __future__ import annotations from collections import deque from datetime import datetime, timezone from logging import getLogger -from typing import Callable, Final +import json +from typing import Callable, Final, Iterator +from pydantic import ValidationError import requests +from log_stream.log_entry import create_log_entry, LogEntry + BATCH_SIZE: Final[int] = 1000 REQ_TIMEOUT: Final[int] = 60 logger = getLogger("LogStream") @@ -40,10 +45,13 @@ class LogStream: self._session = requests.Session() self._access_token = access_token - def __iter__(self): + def parsed(self) -> ParsedLogStream: + return ParsedLogStream(self) + + def __iter__(self) -> Iterator[dict]: return self - def __next__(self): + def __next__(self) -> dict: if len(self._buffer) == 0 and not self._finished: self._populate_buffer() @@ -53,7 +61,8 @@ class LogStream: return self._buffer.popleft() - def _populate_buffer(self): + def _populate_buffer(self) -> None: + # https://cloud.google.com/logging/docs/reference/v2/rest/v2/entries/list data = { "filter": "\n".join((self._get_timestamp_filter(), self._query)), "orderBy": "timestamp desc", @@ -98,3 +107,21 @@ class LogStream: timestamp>="{start}" timestamp<="{end}" """ + + +class ParsedLogStream: + _inner: LogStream + + def __init__(self, inner: LogStream) -> None: + self._inner = inner + + def __iter__(self) -> Iterator[LogEntry]: + return self + + def __next__(self) -> LogEntry: + raw = next(self._inner) + try: + return create_log_entry(raw) + except ValidationError as e: + logger.error("Failed to parse log message: %s", json.dumps(raw)) + raise diff --git a/google_cloud_logging/log_stream/log_stream/main.py b/google_cloud_logging/log_stream/log_stream/main.py index 74ea59a..b31c731 100644 --- a/google_cloud_logging/log_stream/log_stream/main.py +++ b/google_cloud_logging/log_stream/log_stream/main.py @@ -4,6 +4,8 @@ import logging.config import json import os +from pydantic import ValidationError + from log_stream.log_stream import LogStream @@ -19,8 +21,8 @@ def main(): access_token = os.environ.get("ACCESS_TOKEN") assert isinstance(access_token, str) logs = LogStream(log_query, project, one_week_ago, now, lambda: access_token) - for log in logs: - print(json.dumps(log)) + for log in logs.parsed(): + print(log.model_dump_json()) 
def setup_logging(): diff --git a/google_cloud_logging/log_stream/main.py b/google_cloud_logging/log_stream/main.py deleted file mode 100644 index b31c731..0000000 --- a/google_cloud_logging/log_stream/main.py +++ /dev/null @@ -1,54 +0,0 @@ -from datetime import datetime, timezone, timedelta -import logging -import logging.config -import json -import os - -from pydantic import ValidationError - -from log_stream.log_stream import LogStream - - -def main(): - setup_logging() - log_query = """ - severity>="ERROR" - """ - now = datetime.now(timezone.utc) - one_week_ago = now - timedelta(days=7) - project = os.environ.get("CLOUDSDK_CORE_PROJECT") - assert isinstance(project, str) - access_token = os.environ.get("ACCESS_TOKEN") - assert isinstance(access_token, str) - logs = LogStream(log_query, project, one_week_ago, now, lambda: access_token) - for log in logs.parsed(): - print(log.model_dump_json()) - - -def setup_logging(): - logging_config = { - "version": 1, - "disable_existing_loggers": False, - "formatters": { - "regular": { - "format": "%(asctime)s %(name)-12s %(levelname)-8s %(message)s", - } - }, - "handlers": { - "console": { - "class": "logging.StreamHandler", - "level": "INFO", - "formatter": "regular", - } - }, - "root": { - "handlers": ["console"], - "level": "INFO", - }, - "loggers": {"LogStream": {"level": "WARNING"}}, - } - logging.config.dictConfig(logging_config) - - -if __name__ == "__main__": - main()
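
Usage note (commentary, not part of the patch): this fixup moves the package
sources under google_cloud_logging/log_stream/log_stream/ and switches the
consumer loop from raw dicts to validated models. LogStream.parsed() returns a
ParsedLogStream whose iterator runs each raw entry through create_log_entry(),
yielding pydantic LogEntry objects and re-raising ValidationError after logging
the offending payload. A minimal sketch of driving the new API directly,
assuming the same CLOUDSDK_CORE_PROJECT and ACCESS_TOKEN environment variables
that main.py reads (the filter and date range below are illustrative, not part
of this change):

    from datetime import datetime, timedelta, timezone
    import os

    from log_stream.log_stream import LogStream

    # Illustrative query: ERROR-and-above entries from the last day.
    now = datetime.now(timezone.utc)
    logs = LogStream(
        'severity>="ERROR"',                  # Cloud Logging filter expression
        os.environ["CLOUDSDK_CORE_PROJECT"],  # project to query
        now - timedelta(days=1),              # start of the time window
        now,                                  # end of the time window
        lambda: os.environ["ACCESS_TOKEN"],   # token supplier, called per request
    )

    # parsed() validates each entry into a LogEntry pydantic model;
    # iterating the LogStream itself would yield the raw dicts instead.
    for entry in logs.parsed():
        print(entry.model_dump_json())

Because the access-token callable is invoked on every _populate_buffer()
request, a longer-running consumer could pass a closure that refreshes the
credential rather than a fixed environment value, as the lambda above does.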