fixup! Add a simple python script to stream logs from google cloud.

Tom Alexander 2025-05-05 22:47:52 -04:00
parent 52571da12e
commit c6f417f323
Signed by: talexander
GPG Key ID: D3A179C9A53C0EDE
5 changed files with 35 additions and 187 deletions

View File

@@ -1,127 +0,0 @@
from __future__ import annotations
from collections import deque
from datetime import datetime, timezone
from logging import getLogger
import json
from typing import Callable, Final, Iterator
from pydantic import ValidationError
import requests
from log_stream.log_entry import create_log_entry, LogEntry
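# One page per request; also the local buffer's maximum length.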
BATCH_SIZE: Final[int] = 1000
REQ_TIMEOUT: Final[int] = 60
logger = getLogger("LogStream")
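
# Streams Cloud Logging entries matching a query, fetching pages lazily via entries:list.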
class LogStream:
    _query: str
    _project: str
    _start_date: datetime
    _end_date: datetime
    _buffer: deque[dict]
    _finished: bool
    _next_page_token: str | None
    _session: requests.Session
    _access_token: Callable[[], str]

    def __init__(
        self,
        query: str,
        project: str,
        start_date: datetime,
        end_date: datetime | None,
        access_token: Callable[[], str],
    ) -> None:
        self._query = query
        self._project = project
        self._start_date = start_date
        self._end_date = (
            end_date if end_date is not None else datetime.now(timezone.utc)
        )
        self._buffer = deque(maxlen=BATCH_SIZE)
        self._finished = False
        self._next_page_token = None
        self._session = requests.Session()
        self._access_token = access_token
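
    # Wrap the stream so iteration yields validated LogEntry models instead of dicts.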
    def parsed(self) -> ParsedLogStream:
        return ParsedLogStream(self)

    def __iter__(self) -> Iterator[dict]:
        return self
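
    # Pop one entry from the buffer, fetching the next page when it runs dry.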
    def __next__(self) -> dict:
        if len(self._buffer) == 0 and not self._finished:
            self._populate_buffer()
        if len(self._buffer) == 0:
            self._finished = True
            raise StopIteration()
        return self._buffer.popleft()

    def _populate_buffer(self) -> None:
        # https://cloud.google.com/logging/docs/reference/v2/rest/v2/entries/list
        data = {
            "filter": "\n".join((self._get_timestamp_filter(), self._query)),
            "orderBy": "timestamp desc",
            "pageSize": BATCH_SIZE,
            "resourceNames": [f"projects/{self._project}"],
        }
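        # resume from the previous response's nextPageToken, if any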
        if self._next_page_token is not None:
            data["pageToken"] = self._next_page_token
        headers = {
            "Authorization": f"Bearer {self._access_token()}",
            "x-goog-user-project": self._project,
        }
        req = requests.Request(
            "POST",
            "https://logging.googleapis.com/v2/entries:list",
            json=data,  # the API expects a JSON body, not form-encoded data
            headers=headers,
        )
        prepared = self._session.prepare_request(req)
        logger.info("Sending a request to %s", prepared.url)
        response = self._session.send(
            prepared,
            timeout=REQ_TIMEOUT,
        )
        response.raise_for_status()
        response_body = response.json()
        if (token := response_body.get("nextPageToken")) is not None:
            self._next_page_token = token
        else:
            self._finished = True
            self._next_page_token = None
        self._buffer.extend(response_body.get("entries", []))
    def _get_timestamp_filter(self) -> str:
        time_format = "%Y-%m-%dT%H:%M:%S.%fZ"
        # normalize both bounds to UTC so the trailing "Z" is accurate
        start = self._start_date.astimezone(timezone.utc).strftime(time_format)
        end = self._end_date.astimezone(timezone.utc).strftime(time_format)
        return f"""
        timestamp>="{start}"
        timestamp<="{end}"
        """
class ParsedLogStream:
    _inner: LogStream

    def __init__(self, inner: LogStream) -> None:
        self._inner = inner

    def __iter__(self) -> Iterator[LogEntry]:
        return self

    def __next__(self) -> LogEntry:
        raw = next(self._inner)
        try:
            return create_log_entry(raw)
        except ValidationError:
            logger.error("Failed to parse log message: %s", json.dumps(raw))
            raise
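
Both copies of this module import create_log_entry and LogEntry from log_stream.log_entry, which this commit does not touch. For orientation, a minimal sketch of what that module plausibly contains, inferred from the pydantic ValidationError handling above and the model_dump_json() calls below; the field list is illustrative, not the project's actual schema:

from __future__ import annotations
from pydantic import BaseModel

class LogEntry(BaseModel):
    # Illustrative subset of the LogEntry resource fields; by default
    # pydantic v2 ignores any extra keys in the raw dict.
    timestamp: str
    severity: str | None = None
    logName: str | None = None
    textPayload: str | None = None
    jsonPayload: dict | None = None

def create_log_entry(raw: dict) -> LogEntry:
    # pydantic v2: validate the raw dict, raising ValidationError on mismatch.
    return LogEntry.model_validate(raw)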

View File

@@ -1,9 +1,14 @@
 from __future__ import annotations
 from collections import deque
 from datetime import datetime, timezone
 from logging import getLogger
-from typing import Callable, Final
+import json
+from typing import Callable, Final, Iterator
+from pydantic import ValidationError
 import requests
+from log_stream.log_entry import create_log_entry, LogEntry
 BATCH_SIZE: Final[int] = 1000
 REQ_TIMEOUT: Final[int] = 60
 logger = getLogger("LogStream")
@@ -40,10 +45,13 @@ class LogStream:
         self._session = requests.Session()
         self._access_token = access_token
-    def __iter__(self):
+    def parsed(self) -> ParsedLogStream:
+        return ParsedLogStream(self)
+    def __iter__(self) -> Iterator[dict]:
         return self
-    def __next__(self):
+    def __next__(self) -> dict:
         if len(self._buffer) == 0 and not self._finished:
             self._populate_buffer()
@@ -53,7 +61,8 @@ class LogStream:
         return self._buffer.popleft()
-    def _populate_buffer(self):
+    def _populate_buffer(self) -> None:
         # https://cloud.google.com/logging/docs/reference/v2/rest/v2/entries/list
         data = {
             "filter": "\n".join((self._get_timestamp_filter(), self._query)),
             "orderBy": "timestamp desc",
@@ -98,3 +107,21 @@ class LogStream:
         timestamp>="{start}"
         timestamp<="{end}"
         """
+class ParsedLogStream:
+    _inner: LogStream
+
+    def __init__(self, inner: LogStream) -> None:
+        self._inner = inner
+
+    def __iter__(self) -> Iterator[LogEntry]:
+        return self
+
+    def __next__(self) -> LogEntry:
+        raw = next(self._inner)
+        try:
+            return create_log_entry(raw)
+        except ValidationError:
+            logger.error("Failed to parse log message: %s", json.dumps(raw))
+            raise
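
The buffering above is what flattens the paged entries:list API into a plain iterator: each response may carry a nextPageToken, and the stream is exhausted once a response omits it. The same protocol in eager form, for comparison (session, headers, and body stand in for the objects LogStream builds per request):

import requests

def fetch_all_entries(
    session: requests.Session, headers: dict, body: dict
) -> list[dict]:
    # Follow nextPageToken until the API stops returning one.
    entries: list[dict] = []
    page_token: str | None = None
    while True:
        payload = dict(body)
        if page_token is not None:
            payload["pageToken"] = page_token
        response = session.post(
            "https://logging.googleapis.com/v2/entries:list",
            json=payload,
            headers=headers,
            timeout=60,
        )
        response.raise_for_status()
        data = response.json()
        entries.extend(data.get("entries", []))
        page_token = data.get("nextPageToken")
        if page_token is None:
            return entries

Collecting everything up front like this is fine for small queries; LogStream instead holds at most one page in memory, which is why a week of logs can stream through it.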

View File

@@ -4,6 +4,8 @@ import logging.config
 import json
 import os
+from pydantic import ValidationError
 from log_stream.log_stream import LogStream
@@ -19,8 +21,8 @@ def main():
     access_token = os.environ.get("ACCESS_TOKEN")
     assert isinstance(access_token, str)
     logs = LogStream(log_query, project, one_week_ago, now, lambda: access_token)
-    for log in logs:
-        print(json.dumps(log))
+    for log in logs.parsed():
+        print(log.model_dump_json())
 def setup_logging():
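
The script reads the token from the environment, so something has to mint it. A small helper for local runs, assuming the gcloud CLI is installed and authenticated (gcloud_access_token is hypothetical, not part of this commit):

import subprocess

def gcloud_access_token() -> str:
    # gcloud prints a short-lived OAuth2 access token on stdout.
    result = subprocess.run(
        ["gcloud", "auth", "print-access-token"],
        capture_output=True,
        text=True,
        check=True,
    )
    return result.stdout.strip()

Because LogStream calls its access_token callable before every request, passing gcloud_access_token itself (instead of the lambda over a fixed ACCESS_TOKEN) would keep long-running streams working past a single token's roughly one-hour lifetime.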

View File

@ -1,54 +0,0 @@
from datetime import datetime, timezone, timedelta
import logging
import logging.config
import json
import os
from pydantic import ValidationError
from log_stream.log_stream import LogStream
def main():
    setup_logging()
    log_query = """
    severity>="ERROR"
    """
    now = datetime.now(timezone.utc)
    one_week_ago = now - timedelta(days=7)
    project = os.environ.get("CLOUDSDK_CORE_PROJECT")
    assert isinstance(project, str)
    access_token = os.environ.get("ACCESS_TOKEN")
    assert isinstance(access_token, str)
    logs = LogStream(log_query, project, one_week_ago, now, lambda: access_token)
    for log in logs.parsed():
        print(log.model_dump_json())
def setup_logging():
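    # Console INFO logging; the LogStream logger is deliberately quieter.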
    logging_config = {
        "version": 1,
        "disable_existing_loggers": False,
        "formatters": {
            "regular": {
                "format": "%(asctime)s %(name)-12s %(levelname)-8s %(message)s",
            }
        },
        "handlers": {
            "console": {
                "class": "logging.StreamHandler",
                "level": "INFO",
                "formatter": "regular",
            }
        },
        "root": {
            "handlers": ["console"],
            "level": "INFO",
        },
        "loggers": {"LogStream": {"level": "WARNING"}},
    }
    logging.config.dictConfig(logging_config)
if __name__ == "__main__":
    main()
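
One detail of that config worth calling out: the root logger and console handler allow INFO, but the "LogStream" logger is capped at WARNING, so the stream's per-request "Sending a request to ..." line is suppressed. A standalone sketch of the same interaction:

import logging

logging.basicConfig(level=logging.INFO)
# Mirror the dictConfig above: cap the LogStream logger at WARNING.
logging.getLogger("LogStream").setLevel(logging.WARNING)

logging.getLogger("LogStream").info("suppressed: below the logger's own level")
logging.getLogger("LogStream").warning("emitted: meets the logger's level")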