diff --git a/terraform/modules/bigquery/bigquery.tf b/terraform/modules/bigquery/bigquery.tf index 9b9b2e4..6374899 100644 --- a/terraform/modules/bigquery/bigquery.tf +++ b/terraform/modules/bigquery/bigquery.tf @@ -33,8 +33,13 @@ resource "random_id" "bigquery_etl_keyring" { byte_length = 4 } -resource "google_kms_crypto_key" "bigquery_etl_key" { - name = "bigquery-etl-key" +resource "google_kms_crypto_key" "topic_key" { + name = "bigquery-etl-topic-key" + key_ring = google_kms_key_ring.bigquery_etl_keyring.id +} + +resource "google_kms_crypto_key" "dataflow_key" { + name = "bigquery-etl-dataflow-key" key_ring = google_kms_key_ring.bigquery_etl_keyring.id } @@ -57,7 +62,7 @@ resource "google_kms_key_ring" "bigquery_etl_keyring" { resource "google_pubsub_topic" "bigquery_etl" { project = var.project name = "bigquery-etl" - kms_key_name = google_kms_crypto_key.bigquery_etl_key.id + kms_key_name = google_kms_crypto_key.topic_key.id depends_on = [ google_project_iam_binding.pubsub_kms ] @@ -82,3 +87,39 @@ resource "google_pubsub_subscription" "bigquery_etl" { minimum_backoff = "10s" } } + +#################### Dataflow ############################# + +resource "google_project_service" "dataflow" { + project = var.project + service = "dataflow.googleapis.com" + disable_dependent_services = true +} + +resource "google_storage_bucket" "temp_storage" { + project = var.project + name = "${var.project}-bigquery-etl" + force_destroy = true +} + +resource "google_dataflow_job" "etl_job" { + project = var.project + name = "bigquery-etl-job" + region = var.region + template_gcs_path = "gs://dataflow-templates-us-central1/latest/PubSub_Subscription_to_BigQuery" + temp_gcs_location = "${google_storage_bucket.temp_storage.url}/temp" + enable_streaming_engine = true + max_workers = 3 + # Can't use kms key with streaming mode :-( + # kms_key_name = google_kms_crypto_key.dataflow_key.name + + + parameters = { + inputSubscription = google_pubsub_subscription.bigquery_etl.id + outputTableSpec = "your-project:your-dataset.your-table-name" + } + + additional_experiments = [ + "enable_streaming_engine" + ] +}