Orchestrating dbt Pipelines With Google Cloud: Part 1
In my previous post I showed you how to use dbt to expedite data preparation tasks on Google BigQuery. This time, I’ll show you how to integrate those dbt pipelines into workflows that load, validate and transform data.
We’ll use two serverless, pay-per-use products to simplify the solution and reduce costs: Google Cloud Run and Google Workflows. Cloud Run is a scalable compute platform for containerized applications. Each application must expose HTTP endpoints so that its operations can be invoked through HTTP requests. Workflows, in turn, is an orchestrator for HTTP-based cloud services. You define workflows in YAML, with support for error handling, conditional steps and retry logic.
This post is divided into two parts. In part one, we’ll look at how to define and deploy the Cloud Run services. In part two, we’ll see how to define and deploy Google Workflows that orchestrate those services.
Overview
The solution consists of two Cloud Run services: bq-load-svc loads CSV data from GCS into BigQuery, and bq-dbt-svc performs data transformations using the dbt CLI. Additionally, three Google Workflows orchestrate both services, executing steps in the right order and handling validation errors. Finally, Google Cloud Build builds and deploys the artifacts, Google Container Registry stores the service images, and Google Secret Manager holds the credentials for BigQuery.
Here’s a diagram showing the components of the solution:
dbt project
The dbt project for this solution is practically the same as the one in my previous post. It uses the same macros, models and tests to prepare and validate raw data before loading it into the data warehouse. The difference is that src_usda.yml now contains BigQuery load job configuration parameters under the “meta” key. This is a handy feature, because it means you don’t need extra configuration files in your solution. For example, the code below shows the definition of the stdref_fd_group source table:
```yaml
# src_usda.yml
version: 2

sources:
  - name: usda
    tables:
      - name: stdref_fd_group
        meta:
          load_job_config:
            source_format: 'CSV'
            field_delimiter: '^'
            quote_character: '~'
            write_disposition: 'WRITE_TRUNCATE'
            create_disposition: 'CREATE_IF_NEEDED'
        columns:
          - name: fdgrp_cd
          - name: fdgrp_desc
      ...
```
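The post doesn’t show how bq-dbt-svc looks up a table in src_usda.yml. As a rough illustration only, here is a hypothetical sketch (the function name find_source_definition is mine, not the service’s) that takes the file already parsed into a dict, for example with PyYAML’s yaml.safe_load (not shown), and returns one table wrapped in the same {"name", "table"} shape that the source_definition parameter has in the load request example further down. The load job settings then sit under table.meta.load_job_config.

```python
def find_source_definition(sources_doc, source_name, table_name):
    """Return {"name": source, "table": table} for one table of a dbt sources file.

    sources_doc is the parsed YAML, e.g. what yaml.safe_load returns for
    src_usda.yml (parsing is assumed to happen elsewhere).
    """
    for source in sources_doc.get("sources", []):
        if source.get("name") != source_name:
            continue
        for table in source.get("tables", []):
            if table.get("name") == table_name:
                return {"name": source_name, "table": table}
    raise KeyError("{}.{} not found".format(source_name, table_name))


# The src_usda.yml excerpt above, written as the equivalent Python dict
SRC_USDA = {
    "version": 2,
    "sources": [{
        "name": "usda",
        "tables": [{
            "name": "stdref_fd_group",
            "meta": {"load_job_config": {
                "source_format": "CSV",
                "field_delimiter": "^",
                "quote_character": "~",
                "write_disposition": "WRITE_TRUNCATE",
                "create_disposition": "CREATE_IF_NEEDED",
            }},
            "columns": [{"name": "fdgrp_cd"}, {"name": "fdgrp_desc"}],
        }],
    }],
}

definition = find_source_definition(SRC_USDA, "usda", "stdref_fd_group")
# These keys are what bq-load-svc later passes as LoadJobConfig(**config)
load_job_config = definition["table"]["meta"]["load_job_config"]
```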
bq-dbt-svc
The service provides a way to interact with the dbt project through HTTP requests. It is a Flask app that implements two endpoints: source and dbt.
The source endpoint fetches the definition of a table from src_usda.yml and returns it as JSON, while the dbt endpoint runs the dbt CLI in a subprocess and returns the results as JSON. The dbt endpoint is similar to the cli_args method of the dbt RPC server.
I actually considered using the dbt RPC server as the dbt service, but it keeps its state in memory, which makes it unsuitable for Cloud Run. Applications on Cloud Run need to be stateless containers, since Cloud Run may stop container instances after a period of inactivity or start multiple instances under heavy load. The code below shows the function that handles requests to the dbt endpoint:
```python
# main.py
import os
import shlex
import subprocess

from flask import Flask, request

app = Flask(__name__)


# Execute a dbt command
@app.route("/dbt", methods=["POST"])
def run():
    app.logger.info("Started processing request on endpoint {}".format(
        request.base_url))
    command = ["dbt"]
    arguments = []

    # Parse the request data
    request_data = request.get_json()
    app.logger.info("Request data: {}".format(request_data))
    if request_data:
        if "cli" in request_data.get("params", {}):
            # shlex.split keeps quoted arguments intact and ignores repeated spaces
            arguments = shlex.split(request_data["params"]["cli"])
            command.extend(arguments)

    # Add an argument for the project dir if not specified
    if not any("--project-dir" in c for c in command):
        project_dir = os.environ.get("DBT_PROJECT_DIR", None)
        if project_dir:
            command.extend(["--project-dir", project_dir])

    # Execute the dbt command, merging stderr into stdout
    result = subprocess.run(command,
                            text=True,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.STDOUT)

    # Format the response
    response = {
        "result": {
            "status": "ok" if result.returncode == 0 else "error",
            "args": result.args,
            "return_code": result.returncode,
            "command_output": result.stdout,
        }
    }
    app.logger.info("Command output: {}".format(
        response["result"]["command_output"]))
    app.logger.info("Command status: {}".format(response["result"]["status"]))
    app.logger.info("Finished processing request on endpoint {}".format(
        request.base_url))
    return response, 200
```
Here’s an example of calling the dbt endpoint locally to compile the staging models:
```shell
curl -X POST \
  -H 'Content-Type: application/json' \
  -d '{"params": {"cli": "compile --project-dir=/workspaces/bq-dbt-poc/bq-dbt-svc/dbt --models=models/staging/usda/*"}}' \
  http://172.17.0.2:8080/dbt | jq .
```

Response:

```json
{
  "result": {
    "args": [
      "dbt",
      "compile",
      "--project-dir=/workspaces/bq-dbt-poc/bq-dbt-svc/dbt",
      "--models=models/staging/usda/*"
    ],
    "command_output": "Running with dbt=0.19.1\nFound 13 models, 6 tests, 0 snapshots, 4 analyses, 357 macros, 4 operations, 0 seed files, 4 sources, 0 exposures\n\n20:06:38 | Concurrency: 8 threads (target='dev')\n20:06:38 | \n20:06:39 | Done.\n",
    "return_code": 0,
    "status": "ok"
  }
}
```
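The curl call above translates directly to Python. Here is a minimal, standard-library-only sketch; the function names build_dbt_request, check_dbt_result and run_dbt are hypothetical and not part of the service. One detail is worth encoding on the client side: the dbt endpoint answers HTTP 200 even when dbt itself fails, so a caller has to inspect result.status in the body rather than rely on the status code.

```python
import json
import urllib.request


def build_dbt_request(base_url, cli):
    """Build the same POST /dbt request as the curl example."""
    payload = json.dumps({"params": {"cli": cli}}).encode("utf-8")
    return urllib.request.Request(
        base_url.rstrip("/") + "/dbt",
        data=payload,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def check_dbt_result(body):
    """Raise if dbt failed; the endpoint replies with HTTP 200 either way."""
    result = body["result"]
    if result["status"] != "ok":
        raise RuntimeError("dbt exited with code {}:\n{}".format(
            result["return_code"], result["command_output"]))
    return result


def run_dbt(base_url, cli, timeout=600):
    """Send a dbt command to bq-dbt-svc and return the parsed result."""
    with urllib.request.urlopen(build_dbt_request(base_url, cli),
                                timeout=timeout) as resp:
        return check_dbt_result(json.load(resp))
```

Assuming the container is reachable at the same address as in the curl example, run_dbt("http://172.17.0.2:8080", "compile --models=models/staging/usda/*") sends an equivalent request (the service fills in --project-dir from DBT_PROJECT_DIR) and raises if dbt reports an error.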
Finally, below is the Dockerfile. As you can see, both the Flask app and the dbt project are copied into the Docker image:
```dockerfile
FROM fishtownanalytics/dbt:0.19.1

ENV PYTHONUNBUFFERED True
ENV APP_HOME /bq-dbt-svc
ENV PORT 8080
ENV DBT_PROFILES_DIR ${APP_HOME}/profiles
ENV DBT_PROJECT_DIR ${APP_HOME}/dbt
ENV FLASK_SERVICE_DIR ${APP_HOME}/flask

# Deploy the code
WORKDIR ${APP_HOME}
COPY dbt/ ${DBT_PROJECT_DIR}/
COPY profiles/profiles.yml ${DBT_PROFILES_DIR}/
COPY flask ${FLASK_SERVICE_DIR}/

# Install dbt dependencies
WORKDIR ${DBT_PROJECT_DIR}
RUN dbt deps

# Install flask service dependencies
WORKDIR ${FLASK_SERVICE_DIR}
RUN python -m venv venv \
    && venv/bin/python -m pip install -r requirements.txt

# Start the flask service
ENTRYPOINT exec venv/bin/python -m gunicorn \
    --bind :${PORT} \
    --workers 1 \
    --threads 8 \
    --timeout 0 \
    main:app
```
bq-load-svc
The service loads CSV files from GCS into BigQuery tables. It’s a Flask app that listens for HTTP POST requests on the load endpoint. After receiving a request, it parses the JSON in its body and launches a BigQuery load job. The code below shows the function that handles the requests for the load endpoint:
```python
# main.py
from flask import Flask, request
from google.cloud import bigquery

app = Flask(__name__)


# Stand-in for the error_response helper, which this excerpt does not show;
# its response shape and the 400 status code are assumptions.
def error_response(message):
    return {"result": {"status": "error", "message": message}}, 400


# Load a GCS file into BigQuery
@app.route("/load", methods=["POST"])
def run():
    app.logger.info("Started processing request on endpoint {}".format(
        request.base_url))

    # Parse the request data
    request_data = request.get_json()
    app.logger.info("Request data: {}".format(request_data))
    request_params = request_data.get("params", {})
    source_definition = request_params.get("source_definition", None)
    source_file_uri = request_params.get("source_file_uri", None)
    project_id = request_params.get("project_id", None)
    dataset_id = request_params.get("dataset_id", None)

    # Verification of missing parameters
    if not source_definition:
        return error_response("The 'source_definition' parameter is required")
    if not source_file_uri:
        return error_response("The 'source_file_uri' parameter is required")
    if not project_id:
        return error_response("The 'project_id' parameter is required")
    if not dataset_id:
        return error_response("The 'dataset_id' parameter is required")

    # Load job configuration
    table_name = source_definition.get("table", {}).get("name", None)
    if not table_name:
        return error_response(
            "No table name specified in 'source_definition.table.name'")
    client = bigquery.Client(project=project_id)
    # DatasetReference replaces the deprecated client.dataset() helper
    table_ref = bigquery.DatasetReference(project_id, dataset_id).table(table_name)
    table_load_config = source_definition.get("table", {}).get("meta", {}).get(
        "load_job_config", {})
    job_config = bigquery.LoadJobConfig(**table_load_config)

    # Configure table schema (type from the column's meta.data_type, else string)
    _schema = []
    table_columns = source_definition.get("table", {}).get("columns", [])
    for c in table_columns:
        if not c.get("name", None):
            return error_response(
                "No name specified for column in 'source_definition.table.columns'"
            )
        field = bigquery.SchemaField(
            c["name"], c.get("meta", {}).get("data_type", "string"))
        _schema.append(field)
    job_config.schema = _schema

    try:
        job = client.load_table_from_uri(source_file_uri,
                                         table_ref,
                                         job_config=job_config)
        job.result()  # Wait for the table load to complete
        response = {"result": {"status": "ok"}}
    except Exception as e:
        # Not every exception has a .message attribute; str(e) always works
        response = {"result": {"status": "error", "message": str(e)}}
    finally:
        app.logger.info("Finished processing request on endpoint {}".format(
            request.base_url))

    if response["result"]["status"] == "ok":
        return response, 200
    else:
        return response, 500
```
Here’s an example of calling the load endpoint locally to load the gs://gcs-ingestion/SR-Leg_ASC/FD_GROUP.txt file into the stdref_fd_group table:
```shell
read -r -d '' DATA << EOM
{
  "params": {
    "project_id": "bigquery-sandbox",
    "dataset_id": "bq_demo_ldg",
    "source_file_uri": "gs://gcs-ingestion/SR-Leg_ASC/FD_GROUP.txt",
    "source_definition": {
      "name": "usda",
      "table": {
        "columns": [
          { "name": "fdgrp_cd" },
          { "name": "fdgrp_desc" }
        ],
        "meta": {
          "load_job_config": {
            "create_disposition": "CREATE_IF_NEEDED",
            "field_delimiter": "^",
            "quote_character": "~",
            "source_format": "CSV",
            "write_disposition": "WRITE_TRUNCATE"
          }
        },
        "name": "stdref_fd_group"
      }
    }
  }
}
EOM

# Quote $DATA so the shell passes the whole JSON document as one argument
curl -X POST \
  -H 'Content-Type: application/json' \
  -d "$DATA" \
  http://172.17.0.2:8080/load | jq .
```

Response:

```json
{
  "result": {
    "status": "ok"
  }
}
```
The content of the source_definition property comes from calling the source endpoint on the bq-dbt-svc service. I put together the JSON for the example, but as we’ll see in part two, a workflow will take care of calling bq-dbt-svc and building the request for bq-load-svc.
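Until the workflow takes over in part two, the glue step can be sketched in Python. This is a hypothetical, standard-library-only sketch (build_load_payload and post_load are my names, not part of either service) that wraps a source definition, assumed to be the JSON returned by the source endpoint, into the request body shown above. Unlike the dbt endpoint, the load endpoint signals a failed job with an HTTP 500, which urllib raises as HTTPError, so post_load reads the JSON error body from the exception.

```python
import json
import urllib.error
import urllib.request


def build_load_payload(source_definition, project_id, dataset_id, source_file_uri):
    """Assemble the /load request body around a source definition."""
    table = source_definition.get("table", {})
    # Mirror the service's own checks so bad input fails before the HTTP call
    if not table.get("name"):
        raise ValueError("source_definition.table.name is required")
    if any(not c.get("name") for c in table.get("columns", [])):
        raise ValueError("every column in source_definition.table.columns needs a name")
    return {
        "params": {
            "project_id": project_id,
            "dataset_id": dataset_id,
            "source_file_uri": source_file_uri,
            "source_definition": source_definition,
        }
    }


def post_load(base_url, payload, timeout=600):
    """POST the payload to /load and return the parsed JSON response."""
    req = urllib.request.Request(
        base_url.rstrip("/") + "/load",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    try:
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            return json.load(resp)
    except urllib.error.HTTPError as err:
        # Non-2xx responses still carry {"result": {"status": "error", ...}}
        return json.load(err)
```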
Deployment to Google Cloud (GCP)
Deploying the Cloud Run services to GCP is straightforward. The code below shows the Cloud Build config for bq-dbt-svc. First, it builds the Docker image and pushes it to Container Registry. Second, it deploys the image to Cloud Run with the specified parameters. Finally, it adds an IAM policy binding that grants the service account the roles/run.invoker role on the service, so that Workflows can invoke it. The --update-secrets argument mounts the latest version of the bq-dbt-sa-key secret from Secret Manager as a volume, exposing it in the container as the file /bq-dbt-svc/keys/sa-key.json. The secret is the JSON key for the service account, and the dbt CLI uses it to authenticate to BigQuery.
```yaml
# cb-bq-dbt-svc.yml
steps:
  - name: 'gcr.io/cloud-builders/docker'
    args: ['build', '-t', 'gcr.io/$PROJECT_ID/bq-dbt-svc', '.']
  - name: 'gcr.io/cloud-builders/docker'
    args: ['push', 'gcr.io/$PROJECT_ID/bq-dbt-svc']
  - name: 'gcr.io/google.com/cloudsdktool/cloud-sdk'
    entrypoint: gcloud
    args: ['beta', 'run', 'deploy', 'bq-dbt-svc',
           '--image', 'gcr.io/$PROJECT_ID/bq-dbt-svc',
           '--region', 'us-central1',
           '--platform', 'managed',
           '--port', '8080',
           '--cpu', '1',
           '--memory', '512Mi',
           '--concurrency', '1',
           '--service-account', 'bq-dbt-sa@bigquery-sandbox.iam.gserviceaccount.com',
           '--update-secrets', '/bq-dbt-svc/keys/sa-key.json=bq-dbt-sa-key:latest',
           '--no-allow-unauthenticated']
  - name: 'gcr.io/google.com/cloudsdktool/cloud-sdk'
    entrypoint: gcloud
    args: ['beta', 'run', 'services', 'add-iam-policy-binding', 'bq-dbt-svc',
           '--region', 'us-central1',
           '--member', 'serviceAccount:bq-dbt-sa@bigquery-sandbox.iam.gserviceaccount.com',
           '--role', 'roles/run.invoker']
images:
  - 'gcr.io/$PROJECT_ID/bq-dbt-svc'
```
The command below starts the deployment by submitting the build config to Cloud Build:
```shell
gcloud builds submit ./bq-dbt-svc \
  --config=./cloud-build/services/cb-bq-dbt-svc.yml \
  --project bigquery-sandbox
```
Make sure the service account has the IAM policy bindings below; otherwise, it won’t be able to access all the components:
```shell
gcloud projects add-iam-policy-binding bigquery-sandbox \
  --member=serviceAccount:bq-dbt-sa@bigquery-sandbox.iam.gserviceaccount.com \
  --role=roles/bigquery.dataOwner

gcloud projects add-iam-policy-binding bigquery-sandbox \
  --member=serviceAccount:bq-dbt-sa@bigquery-sandbox.iam.gserviceaccount.com \
  --role=roles/bigquery.jobUser

gcloud projects add-iam-policy-binding bigquery-sandbox \
  --member=serviceAccount:bq-dbt-sa@bigquery-sandbox.iam.gserviceaccount.com \
  --role=roles/secretmanager.secretAccessor

gcloud projects add-iam-policy-binding bigquery-sandbox \
  --member=serviceAccount:bq-dbt-sa@bigquery-sandbox.iam.gserviceaccount.com \
  --role=roles/storage.objectViewer

gcloud projects add-iam-policy-binding bigquery-sandbox \
  --member=serviceAccount:bq-dbt-sa@bigquery-sandbox.iam.gserviceaccount.com \
  --role=roles/logging.logWriter

gcloud projects add-iam-policy-binding bigquery-sandbox \
  --member=serviceAccount:bq-dbt-sa@bigquery-sandbox.iam.gserviceaccount.com \
  --role=roles/workflows.invoker
```
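Because the services are deployed with --no-allow-unauthenticated, the local curl calls shown earlier won’t work against the deployed URLs as-is. Each request must carry a Google-signed identity token whose audience is the service URL, sent in an Authorization: Bearer header, and the roles/run.invoker binding is what allows the service account’s tokens through. Code running on Google Cloud (for example another Cloud Run service or a Compute Engine VM) can get such a token from the metadata server. The sketch below assumes that environment; the helper names are hypothetical and the service URL in the usage note is a placeholder.

```python
import json
import urllib.parse
import urllib.request

# Metadata server endpoint for identity tokens; only reachable from code
# running on Google Cloud (Cloud Run, Compute Engine, GKE, ...).
METADATA_IDENTITY_URL = (
    "http://metadata.google.internal/computeMetadata/v1/"
    "instance/service-accounts/default/identity")


def build_identity_token_request(audience):
    """Request an ID token whose audience is the Cloud Run service URL."""
    query = urllib.parse.urlencode({"audience": audience})
    return urllib.request.Request(
        METADATA_IDENTITY_URL + "?" + query,
        headers={"Metadata-Flavor": "Google"},  # required by the metadata server
    )


def fetch_identity_token(audience, timeout=10):
    """Fetch the token as a plain string (only works on Google Cloud)."""
    with urllib.request.urlopen(build_identity_token_request(audience),
                                timeout=timeout) as resp:
        return resp.read().decode("utf-8")


def build_authenticated_request(service_url, path, payload, token):
    """Build a POST request for a private Cloud Run service, e.g. path='/dbt'."""
    return urllib.request.Request(
        service_url.rstrip("/") + path,
        data=json.dumps(payload).encode("utf-8"),
        headers={
            "Content-Type": "application/json",
            "Authorization": "Bearer " + token,
        },
        method="POST",
    )
```

On Google Cloud, token = fetch_identity_token(service_url) followed by urllib.request.urlopen(build_authenticated_request(service_url, "/dbt", {"params": {"cli": "debug"}}, token)) would call the private dbt endpoint, where service_url is the URL Cloud Run assigned to bq-dbt-svc.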
Here’s a screenshot of the two services after being deployed to GCP:
Conclusion
In this post we saw how to define and deploy two data services to Cloud Run. We can now load files from GCS to BigQuery or apply data transformations with dbt by sending HTTP requests. Moreover, we don’t need to provision any infrastructure and the services can scale up under heavy loads. In part two, we will see how to use Google Workflows to orchestrate the calls to these services.
Thanks for reading! Please leave a comment if you have any thoughts or questions and don’t forget to sign up for updates.