Add support for a second function to log directly to BigQuery.

Tom Alexander 2021-07-21 01:35:45 -04:00
parent 23a0d041ba
commit b0092be5d6
Signed by: talexander
GPG Key ID: D3A179C9A53C0EDE
5 changed files with 99 additions and 23 deletions
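Once applied, the two Terraform outputs below expose HTTPS trigger URLs. A minimal sketch of exercising both paths (the URLs are placeholders, use the real `terraform output` values; the payload fields are hypothetical and must match the target table's schema):

import requests

payload = {"event": "page_view", "count": 1}  # hypothetical fields

endpoints = [
    "https://REGION-PROJECT.cloudfunctions.net/cf-to-pubsub",  # via PubSub (cf_to_pubsub_endpoint)
    "https://REGION-PROJECT.cloudfunctions.net/cf-to-bq",      # direct streaming insert (cf_to_bq_endpoint)
]
for url in endpoints:
    resp = requests.post(url, json=payload)
    print(url, resp.status_code, resp.json())  # expect {"status": "ok", "source": ...}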

View File

@@ -207,16 +207,43 @@ module "cf_to_pubsub" {
   source               = "../modules/cf_to_pubsub"
   project              = var.project
   region               = var.region
-  topic_name           = "bigquery-etl"
+  function_name        = "cf-to-pubsub"
+  function_description = "CloudFunction to PubSub"
+  function_source_name = "cf_to_pubsub"
   source_bucket        = google_storage_bucket.bucket
   service_cloudbuild   = google_project_service.cloudbuild
+  environment_variables = {
+    GCP_PROJECT = var.project
+    GCP_TOPIC   = "bigquery-etl"
+  }
 }

-output "log_to_bq_endpoint" {
-  description = "https endpoint to log to BigQuery."
+output "cf_to_pubsub_endpoint" {
+  description = "https endpoint to log to BigQuery through pubsub."
   value       = module.cf_to_pubsub.https_trigger_url
 }
+
+module "cf_to_bq" {
+  source               = "../modules/cf_to_pubsub"
+  project              = var.project
+  region               = var.region
+  function_name        = "cf-to-bq"
+  function_description = "CloudFunction to BigQuery"
+  function_source_name = "cf_to_bq"
+  source_bucket        = google_storage_bucket.bucket
+  service_cloudbuild   = google_project_service.cloudbuild
+  environment_variables = {
+    BQ_TABLE = "${var.project}.pubsub_etl.pubsub_etl"
+  }
+}
+
+output "cf_to_bq_endpoint" {
+  description = "https endpoint to log to BigQuery directly."
+  value       = module.cf_to_bq.https_trigger_url
+}

 #################### PubSub to BigQuery ###################
 module "bigquery" {
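The BQ_TABLE value points the direct function at `${var.project}.pubsub_etl.pubsub_etl`, the same table the PubSub pipeline loads. The truncated `module "bigquery"` presumably provisions it; a minimal sketch of such a dataset and table (the schema columns are hypothetical, only the dataset and table names come from the BQ_TABLE string above):

resource "google_bigquery_dataset" "pubsub_etl" {
  project    = var.project
  dataset_id = "pubsub_etl"
  location   = "US" # assumption
}

resource "google_bigquery_table" "pubsub_etl" {
  project    = var.project
  dataset_id = google_bigquery_dataset.pubsub_etl.dataset_id
  table_id   = "pubsub_etl"

  schema = jsonencode([
    { name = "event", type = "STRING", mode = "NULLABLE" },  # hypothetical column
    { name = "count", type = "INTEGER", mode = "NULLABLE" }, # hypothetical column
  ])
}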

View File

@@ -11,9 +11,22 @@ variable "region" {
   type = string
 }

-variable "topic_name" {
-  description = "The name of topic where the events should be published."
+variable "function_name" {
+  description = "Name for the cloud function. If unspecified, one will be generated."
   type        = string
+  default     = ""
+}
+
+variable "function_description" {
+  description = "Description for the cloud function."
+  type        = string
+  default     = ""
+}
+
+variable "environment_variables" {
+  description = "Environment variables for the execution of the cloud function."
+  type        = map(any)
+  default     = {}
 }

 variable "source_bucket" {
@@ -23,11 +36,16 @@ variable "source_bucket" {
 variable "function_source_name" {
   description = "Name of the folder containing the source code for the function."
   type        = string
   default     = "cf_to_pubsub"
 }

 variable "service_cloudbuild" {
-  description = "THe cloudbuild google_project_service."
+  description = "The cloudbuild google_project_service."
+}
+
+variable "allow_external" {
+  description = "Whether or not to allow outside traffic ingress."
+  type        = bool
+  default     = true
 }

 output "https_trigger_url" {
@@ -35,6 +53,14 @@ output "https_trigger_url" {
   value = google_cloudfunctions_function.function.https_trigger_url
 }

+locals {
+  function_name = var.function_name == "" ? "cf-${random_id.function_id.hex}" : var.function_name
+}
+
+resource "random_id" "function_id" {
+  byte_length = 4
+}
+
 resource "random_id" "cf_bucket_id" {
   byte_length = 4
 }
@@ -53,8 +79,8 @@ resource "google_storage_bucket_object" "remote_archive" {
 }

 resource "google_cloudfunctions_function" "function" {
-  name        = "cf-to-pubsub"
-  description = "CloudFunction to PubSub"
+  name        = local.function_name
+  description = var.function_description
   runtime     = "python39"

   available_memory_mb = 128
@@ -63,13 +89,9 @@ resource "google_cloudfunctions_function" "function" {
   trigger_http     = true
   entry_point      = "main"
   max_instances    = 4
-  ingress_settings = "ALLOW_ALL"
-  # ingress_settings = "ALLOW_INTERNAL_ONLY"
+  ingress_settings = var.allow_external ? "ALLOW_ALL" : "ALLOW_INTERNAL_ONLY"

-  environment_variables = {
-    GCP_PROJECT = var.project
-    GCP_TOPIC   = var.topic_name
-  }
+  environment_variables = var.environment_variables

   depends_on = [
     var.service_cloudbuild
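Nothing in this commit exercises the new allow_external flag. A hypothetical call of the generalized module (the label and settings are assumptions, not part of the commit) that deploys an internal-only function and relies on the generated-name fallback:

module "cf_internal" {
  source               = "../modules/cf_to_pubsub"
  project              = var.project
  region               = var.region
  function_source_name = "cf_to_bq"
  source_bucket        = google_storage_bucket.bucket
  service_cloudbuild   = google_project_service.cloudbuild
  allow_external       = false # renders ingress_settings = "ALLOW_INTERNAL_ONLY"
  environment_variables = {
    BQ_TABLE = "${var.project}.pubsub_etl.pubsub_etl"
  }
  # function_name omitted: locals fall back to "cf-${random_id.function_id.hex}"
}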

View File

@@ -0,0 +1,25 @@
+import json
+import os
+
+from google.cloud import bigquery
+
+client = bigquery.Client()
+
+
+def push_to_pubsub(request_params):
+    errors = client.insert_rows_json(os.environ["BQ_TABLE"], [request_params])
+    if errors != []:
+        raise Exception("Encountered errors while inserting rows: {}".format(errors))
+
+
+def main(request):
+    request_json = request.get_json(silent=True)
+    request_args = request.args
+    if request_json:
+        push_to_pubsub(request_json)
+    elif request_args:
+        push_to_pubsub(request_args)
+    else:
+        return ("No data provided.", 400)
+    return {"status": "ok", "source": "cf_to_bq"}
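Note the helper keeps the push_to_pubsub name from the cf_to_pubsub template even though it streams rows into BigQuery with insert_rows_json. A quick local smoke test of the handler, assuming application default credentials are configured and the target table exists (the table path and payload fields are hypothetical):

import os
os.environ["BQ_TABLE"] = "my-project.pubsub_etl.pubsub_etl"  # hypothetical table

from flask import Flask, request

import main  # the cf_to_bq handler above

app = Flask(__name__)
with app.test_request_context(json={"event": "test", "count": 1}):  # hypothetical fields
    print(main.main(request))  # expect {"status": "ok", "source": "cf_to_bq"}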

View File

@@ -0,0 +1,2 @@
+Flask==1.1.2
+google-cloud-bigquery==2.22.0

View File

@@ -4,7 +4,7 @@ import os

 from google.cloud import pubsub_v1

 publisher = pubsub_v1.PublisherClient()
-topic = publisher.topic_path(os.environ.get("GCP_PROJECT"), os.environ.get("GCP_TOPIC"))
+topic = publisher.topic_path(os.environ["GCP_PROJECT"], os.environ["GCP_TOPIC"])


 def push_to_pubsub(request_params):
@@ -20,5 +20,5 @@ def main(request):
     elif request_args:
         push_to_pubsub(request_args)
     else:
-        raise Exception("No data provided.")
-    return {"status": "ok"}
+        return ("No data provided.", 400)
+    return {"status": "ok", "source": "cf_to_pubsub"}