Compare commits

...

2 Commits

Author | SHA1 | Message | Date
Tom Alexander | f91d6774ca | Add a schema for source location. | 2025-05-05 22:49:06 -04:00
Tom Alexander | c6f417f323 | fixup! Add a simple python script to stream logs from google cloud. | 2025-05-05 22:47:52 -04:00
5 changed files with 48 additions and 193 deletions

View File

@@ -1,127 +0,0 @@
from __future__ import annotations
from collections import deque
from datetime import datetime, timezone
from logging import getLogger
import json
from typing import Callable, Final, Iterator
from pydantic import ValidationError
import requests
from log_stream.log_entry import create_log_entry, LogEntry


BATCH_SIZE: Final[int] = 1000
REQ_TIMEOUT: Final[int] = 60
logger = getLogger("LogStream")


class LogStream:
    _query: str
    _project: str
    _start_date: datetime
    _end_date: datetime
    _buffer: deque[dict]
    _finished: bool
    _next_page_token: str | None
    _session: requests.Session
    _access_token: Callable[[], str]

    def __init__(
        self,
        query: str,
        project: str,
        start_date: datetime,
        end_date: datetime | None,
        access_token: Callable[[], str],
    ) -> None:
        self._query = query
        self._project = project
        self._start_date = start_date
        self._end_date = (
            end_date if end_date is not None else datetime.now(timezone.utc)
        )
        self._buffer = deque(maxlen=BATCH_SIZE)
        self._finished = False
        self._next_page_token = None
        self._session = requests.Session()
        self._access_token = access_token

    def parsed(self) -> ParsedLogStream:
        return ParsedLogStream(self)

    def __iter__(self) -> Iterator[dict]:
        return self

    def __next__(self) -> dict:
        if len(self._buffer) == 0 and not self._finished:
            self._populate_buffer()
        if len(self._buffer) == 0:
            self._finished = True
            raise StopIteration()
        return self._buffer.popleft()

    def _populate_buffer(self) -> None:
        # https://cloud.google.com/logging/docs/reference/v2/rest/v2/entries/list
        data = {
            "filter": "\n".join((self._get_timestamp_filter(), self._query)),
            "orderBy": "timestamp desc",
            "pageSize": BATCH_SIZE,
            "resourceNames": [f"projects/{self._project}"],
        }
        if self._next_page_token is not None:
            data["pageToken"] = self._next_page_token
        headers = {
            "Authorization": f"Bearer {self._access_token()}",
            "x-goog-user-project": self._project,
        }
        req = requests.Request(
            "POST",
            "https://logging.googleapis.com/v2/entries:list",
            data=data,
            headers=headers,
        )
        prepared = self._session.prepare_request(req)
        logger.info(f"Sending a request to {prepared.url}")
        response = self._session.send(
            prepared,
            timeout=REQ_TIMEOUT,
        )
        response.raise_for_status()
        response_body = response.json()
        if (token := response_body.get("nextPageToken")) is not None:
            self._next_page_token = token
        else:
            self._finished = True
            self._next_page_token = None
        self._buffer.extend(response_body.get("entries", []))

    def _get_timestamp_filter(self) -> str:
        time_format = "%Y-%m-%dT%H:%M:%S.%fZ"
        self._start_date.isoformat()
        # TODO: convert start/end date to UTC
        start = self._start_date.strftime(time_format)
        end = self._end_date.strftime(time_format)
        return f"""
        timestamp>="{start}"
        timestamp<="{end}"
        """


class ParsedLogStream:
    _inner: LogStream

    def __init__(self, inner: LogStream) -> None:
        self._inner = inner

    def __iter__(self) -> Iterator[LogEntry]:
        return self

    def __next__(self) -> LogEntry:
        raw = next(self._inner)
        try:
            return create_log_entry(raw)
        except ValidationError as e:
            logger.error("Failed to parse log message: %s", json.dumps(raw))
            raise
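
For reference, a quick illustration (not code from this change) of the filter string that _populate_buffer ends up sending: _get_timestamp_filter() and the caller's query are joined with a newline, and the Logging query language treats the resulting whitespace-separated comparisons as an implicit AND. The dates and query below are invented for the example.

from datetime import datetime, timezone

time_format = "%Y-%m-%dT%H:%M:%S.%fZ"
start = datetime(2025, 4, 28, tzinfo=timezone.utc).strftime(time_format)
end = datetime(2025, 5, 5, tzinfo=timezone.utc).strftime(time_format)
timestamp_filter = f"""
timestamp>="{start}"
timestamp<="{end}"
"""
query = 'severity>="ERROR"'
# Mirrors the "filter" field built in _populate_buffer; note that the
# leading/trailing newlines from the triple-quoted string survive here,
# just as they do in the real request body.
print("\n".join((timestamp_filter, query)))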

View File

@@ -28,8 +28,8 @@ class BaseLogEntry(BaseModel):
     span_id: Annotated[str | None, Field(alias="spanId", default=None)]
     trace_sampled: Annotated[bool | None, Field(alias="traceSampled", default=None)]
     source_location: Annotated[
-        dict | None, Field(alias="sourceLocation", default=None)
-    ] # https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry#LogEntrySourceLocation
+        SourceLocation | None, Field(alias="sourceLocation", default=None)
+    ]
     split: Annotated[
         dict | None, Field(default=None)
     ] # https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry#LogSplit
@@ -56,6 +56,12 @@ class JsonPayloadLogEntry(BaseLogEntry):
     json_payload: Annotated[dict, Field(alias="jsonPayload")]
 
 
+LogEntry = Annotated[
+    Union[ProtoPayloadLogEntry, JsonPayloadLogEntry, TextPayloadLogEntry, BaseLogEntry],
+    Field(),
+]
+
+
 class LogSeverity(str, Enum):
     # https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry#LogSeverity
     DEFAULT = "DEFAULT"
@@ -69,10 +75,11 @@ class LogSeverity(str, Enum):
     EMERGENCY = "EMERGENCY"
 
 
-LogEntry = Annotated[
-    Union[ProtoPayloadLogEntry, JsonPayloadLogEntry, TextPayloadLogEntry, BaseLogEntry],
-    Field(),
-]
+class SourceLocation(BaseModel):
+    # https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry#LogEntrySourceLocation
+    file: str
+    line: str
+    function: Annotated[str | None, Field(default=None)]
 
 
 def create_log_entry(entry: dict) -> LogEntry:
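
To make the effect of this schema change concrete, here is a minimal, self-contained sketch (assuming pydantic v2 and model shapes as in the hunk above; the payload values are invented): "sourceLocation" now validates into a typed SourceLocation model instead of staying a bare dict.

from typing import Annotated
from pydantic import BaseModel, Field


class SourceLocation(BaseModel):
    file: str
    line: str
    function: Annotated[str | None, Field(default=None)]


class BaseLogEntry(BaseModel):
    source_location: Annotated[
        SourceLocation | None, Field(alias="sourceLocation", default=None)
    ]


entry = BaseLogEntry.model_validate(
    {"sourceLocation": {"file": "app/main.py", "line": "42", "function": "handler"}}
)
print(entry.source_location.file)  # typed attribute access instead of dict lookups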

View File

@@ -1,9 +1,14 @@
 from __future__ import annotations
 from collections import deque
 from datetime import datetime, timezone
 from logging import getLogger
-from typing import Callable, Final
+import json
+from typing import Callable, Final, Iterator
+from pydantic import ValidationError
 import requests
+from log_stream.log_entry import create_log_entry, LogEntry
+
+
 BATCH_SIZE: Final[int] = 1000
 REQ_TIMEOUT: Final[int] = 60
 logger = getLogger("LogStream")
@@ -40,10 +45,13 @@ class LogStream:
         self._session = requests.Session()
         self._access_token = access_token
 
-    def __iter__(self):
+    def parsed(self) -> ParsedLogStream:
+        return ParsedLogStream(self)
+
+    def __iter__(self) -> Iterator[dict]:
         return self
 
-    def __next__(self):
+    def __next__(self) -> dict:
         if len(self._buffer) == 0 and not self._finished:
             self._populate_buffer()
@@ -53,7 +61,8 @@ class LogStream:
         return self._buffer.popleft()
 
-    def _populate_buffer(self):
+    def _populate_buffer(self) -> None:
         # https://cloud.google.com/logging/docs/reference/v2/rest/v2/entries/list
         data = {
             "filter": "\n".join((self._get_timestamp_filter(), self._query)),
             "orderBy": "timestamp desc",
@@ -98,3 +107,21 @@ class LogStream:
         timestamp>="{start}"
         timestamp<="{end}"
         """
+
+
+class ParsedLogStream:
+    _inner: LogStream
+
+    def __init__(self, inner: LogStream) -> None:
+        self._inner = inner
+
+    def __iter__(self) -> Iterator[LogEntry]:
+        return self
+
+    def __next__(self) -> LogEntry:
+        raw = next(self._inner)
+        try:
+            return create_log_entry(raw)
+        except ValidationError as e:
+            logger.error("Failed to parse log message: %s", json.dumps(raw))
+            raise
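
As a usage note: because LogStream takes a zero-argument callable for the access token, the token can be re-fetched on every request rather than baked in at construction time. Below is a hedged sketch of how the new parsed() iterator might be driven; the gcloud-based callable and the project id are assumptions for illustration, not part of this change.

import subprocess
from datetime import datetime, timedelta, timezone

from log_stream.log_stream import LogStream


def gcloud_access_token() -> str:
    # Ask the gcloud CLI for a fresh OAuth access token on every call.
    return subprocess.run(
        ["gcloud", "auth", "print-access-token"],
        capture_output=True,
        text=True,
        check=True,
    ).stdout.strip()


now = datetime.now(timezone.utc)
stream = LogStream(
    'severity>="ERROR"',
    "my-project",  # hypothetical project id
    now - timedelta(days=1),
    now,
    gcloud_access_token,
)
for entry in stream.parsed():  # typed LogEntry models; iterate `stream` itself for raw dicts
    print(entry.model_dump_json())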

View File

@@ -4,6 +4,8 @@ import logging.config
 import json
 import os
 
+from pydantic import ValidationError
+
 from log_stream.log_stream import LogStream
@@ -19,8 +21,8 @@ def main():
     access_token = os.environ.get("ACCESS_TOKEN")
     assert isinstance(access_token, str)
     logs = LogStream(log_query, project, one_week_ago, now, lambda: access_token)
-    for log in logs:
-        print(json.dumps(log))
+    for log in logs.parsed():
+        print(log.model_dump_json())
 
 
 def setup_logging():

View File

@@ -1,54 +0,0 @@
from datetime import datetime, timezone, timedelta
import logging
import logging.config
import json
import os

from pydantic import ValidationError

from log_stream.log_stream import LogStream


def main():
    setup_logging()
    log_query = """
    severity>="ERROR"
    """
    now = datetime.now(timezone.utc)
    one_week_ago = now - timedelta(days=7)
    project = os.environ.get("CLOUDSDK_CORE_PROJECT")
    assert isinstance(project, str)
    access_token = os.environ.get("ACCESS_TOKEN")
    assert isinstance(access_token, str)
    logs = LogStream(log_query, project, one_week_ago, now, lambda: access_token)
    for log in logs.parsed():
        print(log.model_dump_json())


def setup_logging():
    logging_config = {
        "version": 1,
        "disable_existing_loggers": False,
        "formatters": {
            "regular": {
                "format": "%(asctime)s %(name)-12s %(levelname)-8s %(message)s",
            }
        },
        "handlers": {
            "console": {
                "class": "logging.StreamHandler",
                "level": "INFO",
                "formatter": "regular",
            }
        },
        "root": {
            "handlers": ["console"],
            "level": "INFO",
        },
        "loggers": {"LogStream": {"level": "WARNING"}},
    }
    logging.config.dictConfig(logging_config)


if __name__ == "__main__":
    main()
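
A small illustration of what this dictConfig achieves (assuming setup_logging() has already run): the "LogStream" logger is capped at WARNING, so the per-request INFO line from LogStream._populate_buffer is silenced, while other loggers still reach the console handler at INFO through the root logger.

import logging

logging.getLogger("LogStream").info("suppressed: below the logger's WARNING level")
logging.getLogger("LogStream").warning("printed: WARNING and above propagate to root")
logging.getLogger("something.else").info("printed: root level is INFO")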