80 lines
3.1 KiB
Python
80 lines
3.1 KiB
Python
from __future__ import annotations
|
|
from enum import Enum
|
|
from typing import Annotated, Union
|
|
from pydantic import BaseModel, Field, TypeAdapter
|
|
|
|
|
|
class BaseLogEntry(BaseModel):
    # Fields common to every log entry model in this module; the payload
    # specific subclasses add exactly one payload field on top of these.
    # Reference: https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry
    #
    # Fields declared with an alias are populated from the camelCase key
    # named in that alias.

    # --- Fields without a default (must be present in the input) ---
    log_name: str = Field(alias="logName")
    # https://cloud.google.com/logging/docs/reference/v2/rest/v2/MonitoredResource
    resource: dict
    timestamp: str
    receive_timestamp: str = Field(alias="receiveTimestamp")
    # LogSeverity is defined further down in this module; the annotation is
    # resolved lazily thanks to `from __future__ import annotations`.
    severity: LogSeverity
    insert_id: str = Field(alias="insertId")

    # --- Optional fields (None when the key is absent) ---
    # https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry#HttpRequest
    http_request: dict | None = Field(alias="httpRequest", default=None)
    labels: dict[str, str] | None = None
    # https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry#MonitoredResourceMetadata
    metadata: dict | None = None
    # https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry#LogEntryOperation
    operation: dict | None = None
    trace: str | None = None
    span_id: str | None = Field(alias="spanId", default=None)
    trace_sampled: bool | None = Field(alias="traceSampled", default=None)
    # https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry#LogEntrySourceLocation
    source_location: dict | None = Field(alias="sourceLocation", default=None)
    # https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry#LogSplit
    split: dict | None = None
    # https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry#LogErrorGroup
    error_groups: list[dict] | None = Field(alias="errorGroups", default=None)
    # https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry#AppHub
    apphub: dict | None = None
|
|
|
|
|
|
class ProtoPayloadLogEntry(BaseLogEntry):
    # Log entry variant that additionally requires a `protoPayload` key.
    # Reference: https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry

    # Kept as an untyped dict; its contents are not validated here.
    proto_payload: dict = Field(alias="protoPayload")
|
|
|
|
|
|
class TextPayloadLogEntry(BaseLogEntry):
    # Log entry variant that additionally requires a `textPayload` key.
    # Reference: https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry

    # The payload is carried as a plain string.
    text_payload: str = Field(alias="textPayload")
|
|
|
|
|
|
class JsonPayloadLogEntry(BaseLogEntry):
    # Log entry variant that additionally requires a `jsonPayload` key.
    # Reference: https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry

    # Kept as an untyped dict; its contents are not validated here.
    json_payload: dict = Field(alias="jsonPayload")
|
|
|
|
|
|
class LogSeverity(str, Enum):
    # Closed set of severity names accepted for a log entry's `severity`.
    # Reference: https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry#LogSeverity
    #
    # Mixing in `str` makes each member compare equal to its own name, so
    # members can be used wherever the raw string is expected.

    DEFAULT = "DEFAULT"
    DEBUG = "DEBUG"
    INFO = "INFO"
    NOTICE = "NOTICE"
    WARNING = "WARNING"
    ERROR = "ERROR"
    CRITICAL = "CRITICAL"
    ALERT = "ALERT"
    EMERGENCY = "EMERGENCY"
|
|
|
|
|
|
# Union of every log entry model defined in this module. The three
# payload-specific models come first and the payload-less BaseLogEntry last.
# NOTE(review): the ordering presumably makes BaseLogEntry the fallback when
# no payload key is present -- confirm against pydantic's union-mode rules.
LogEntry = Annotated[
    Union[
        ProtoPayloadLogEntry,
        JsonPayloadLogEntry,
        TextPayloadLogEntry,
        BaseLogEntry,
    ],
    Field(),
]
|
|
|
|
|
|
# Built on first use and then reused: constructing a TypeAdapter compiles the
# validation schema for the whole LogEntry union, which is wasteful to repeat
# for every single entry.
_log_entry_adapter: TypeAdapter | None = None


def create_log_entry(entry: dict) -> LogEntry:
    """Validate a raw log-entry mapping against the ``LogEntry`` union.

    Args:
        entry: The log entry as a plain dict, keyed by the aliases declared
            on the models (e.g. ``logName``, ``insertId``).

    Returns:
        An instance of one of the models in the ``LogEntry`` union.

    Raises:
        pydantic.ValidationError: If ``entry`` does not validate against any
            member of the union.
    """
    global _log_entry_adapter
    if _log_entry_adapter is None:
        # Deferred until the first call so importing the module stays cheap
        # and every model referenced by LogEntry is already defined.
        _log_entry_adapter = TypeAdapter(LogEntry)
    return _log_entry_adapter.validate_python(entry)
|