Join, Group By, and Aggregate in Cloud Data Fusion
Good news!
Cloud Data Fusion is now GA. Announced at Google Next ‘19 UK on November 21, 2019, Cloud Data Fusion is a fully managed, cloud-native, enterprise data integration service for quickly building and managing data pipelines. Cloud Data Fusion web UI allows you to build scalable data integration solutions to clean, prepare, blend, transfer, and transform data, without having to manage the infrastructure. Cloud Data Fusion is powered by the open source project
CDAP. Wondering how to get started with Cloud Data Fusion? This post shows you how to simply build and use the Wrangler and Data Pipelines features in Cloud Data Fusion to clean, transform, and process flight data.
The output is similar to the following:
The output is similar to the following:
That’s it! You’ve just created and ran a complete data pipeline process on Cloud Data Fusion.
ETL Process
The diagram below shows the transformations which are going to take place. This includes reading the two files, transforming the data, and loading it into one output: Total Flights per Airline.Objectives
- Connect Cloud Data Fusion to data sources.
- Apply basic transformations.
- Join and Group By data sources.
- Write to a sink.
Log into the GC Console
First, go to the GCP console and log in using your Google account.Select or Create a GC project
Select a project. If you don’t have any projects, go to the project selector page to create one. For this post, I've created a specific project named flights-analysis. I highly recommend you create a new project for this walkthrough. See Creating your project for more information.Create GCS Bucket and Copy Data
You need data! The two small datasets are located in a GCS bucket that you must copy to your own bucket. First, create your bucket. You can do this by typing "bucket" in the resources and products search field and then selecting Create bucket. In the Create a Bucket page, enter the name of your bucket (1). Remember that these are globally unique and for the purpose of this demonstration we'll use flights-analysis-data2. Select Region (2) for Location type, and select the desired Location (3). We'll use northamerica-northeast1 for this demonstration. Click Create (4) to create the bucket.Activate Cloud Shell
Next, activate Cloud Shell. In the GCP console, click the Open Cloud Shell icon (top-right of the toolbar). You can click Continue when the dialog box appears. It takes a few moments to provision and connect to the environment. When you are connected, you are already authenticated and the project is set to your PROJECT_ID. The output is similar to the following: You need to execute the following commands to copy the required sample files and structure to your GCS bucket. Replace [BUCKET_NAME] with the name of the bucket you created earlier.gsutil cp gs://flights-analysis-sample-data/input/airlines.csv gs://[BUCKET_NAME]]/input/
gsutil cp gs://flights-analysis-sample-data/input/flights_small.csv gs://[BUCKET_NAME]/input/
Create a Cloud Data Fusion Instance
You are now ready to create your Cloud Data Fusion instance.- Go to the Cloud Data Fusion page. You can do this by typing "data fusion" in the resources and products search field and then selecting Data Fusion.
- If the Cloud Data Fusion API is not already enabled, activate it by clicking Enable. This might take a few moments to complete.
- Navigate back to the Data Fusion page. You are now ready to create a Data Fusion instance. Click Create An Instance.
- Enter an Instance name (1), select your Region (2), select Basic for Edition (3), and click Create (4) to deploy your instance. For the purpose of this exercise, we're using flights_data-etl as the name of the instance and northamerica-northeast1 as the region. You can enter your own values for these fields. Note: This requires several minutes to complete.
- After the instance deploys successfully, a green check mark appears. Click View Instance to continue.
- The next page provides details about the instance. Click View instance to go to your Cloud Data Fusion instance.
- Great work! You are now in your Cloud Data Fusion instance. Because you will be designing your data pipeline, click Studio to continue.
Building the Data Pipeline
- After accessing the Studio canvas, you are now ready to build the data pipeline. Make sure you are in Source (1) view, and then click GCS (2). A GCS Source appears on the canvas. Click Properties (3) from the GCS Source to continue.
- The GCS Properties configuration page appears. Specify the following values: - Label (1): “GCS - Flights Data” - Reference Name (2): “gcs_flights_data” - Path (3): Enter the path to the GCS Bucket you created earlier (where you saved the flight_small.csv file). For the purpose of this exercise, I included mine.
- Click Validate (4) to validate the information on the page. You should see (in green) "No errors found." Click X (5) to close/save the GSC properties.
- The next step is to "wrangle" the flights_small.csv dataset. Make sure you are in Transform (1) view, and then click the Wrangler (2). A Wrangler transform appears on the canvas.
- Connect (3) by clicking and dragging the small arrow from the GCS - Flights Data source to the Wrangler transform. Click Properties (4) in the Wrangler transform to continue.
- The Wrangler Properties configuration page appears. Enter “Wrangler - Flights Data” for Label (1). click Wrangle (2) to continue.
- Select the flights_small.csv file located in the GCS bucket you created earlier to continue.
- Next is a series of steps to parse and remove unwanted columns. Click the drop-down [Column transformations] in the body (1) column, and go to Parse -> CSV (2). Click Comma (3) as the delimiter, check Set first row as header (4), then click Apply (5) to continue.
- The result of the parse column transform added new columns that we must remove. Select the highlighted columns (1), click the drop-down [Column transformations] in the body (2) column, then select Delete selected columns (3) to continue.
- The results now show only the columns that you need to move forward. Click Apply to continue.
- Notice that the Recipe box is now populated with the directives you just specified. Click Validate (1) to validate the page. You should see (in green) "No errors found." Lastly, click X (2) to close/save the Wrangler Properties page.
- Now, let’s add the arlines.csv dataset. Make sure you are in Source (1) view, then click GCS (2). Another GCS source appears on the canvas. Click Properties (3) from the GCS Source to continue.
- The GCS Properties configuration page appears. Specify the following values: - Label (1): “GCS - Airlines Data” - Reference Name (2): “gcs_airlines_data” - Path (3): Enter the path to the GCS Bucket you created earlier (where you saved the airlines.csv file). For the purpose of this exercise, I include mine.
- Click Validate (4) to validate the page. You should see (in green) "No errors found." Lastly, click X (5) to close/save the GSC Properties page.
- Similar to what you did earlier for the flights_small.csv dataset, we now must wrangle the airlines.csv dataset. Make sure you are in Transform (1) view, then click Wrangler (2). Another Wrangler transform appears on the canvas.
- Connect (3) by clicking and dragging the small arrow from the GCS - Airlines Data source to the newly created Wrangler transform. Click Properties (4) in the Wrangler transform to continue.
- The Wrangler Properties configuration page appears. Enter “Wrangler - Airlines Data” in Label (1). Click Wrangle (2) to continue.
- Similar to what you did earlier, select the airlines.csv file in your GCS bucket to continue.
- Next is a series of steps to parse and remove unwanted columns. Click the drop-down [Column transformations] in the body (1) column, and go to Parse -> CSV (2). Select Comma (3) as the delimiter, select Set first row as header (4), then click Apply (5) to continue.
- The result of the parse column transform added new columns that we must remove from the body column. Click the drop-down [Column transformations] in the body (1) column, select Delete column (2), then click Apply (3) to continue.
- Notice that the Recipe box is now populated with the directives you just created. Click Validate (1) to validate the page. You should see (in green) "No errors found." Lastly, click X (2) to close/save the Wrangler Properties page.
- You are now ready to join the two datasets. Make sure you are in Analytics (1) view, then click Joiner (2). A Joiner task appears on the canvas.
- Connect (3) by clicking and dragging the small arrow from the Wrangler - Flights Data to the Joiner. Do the same for the Wrangler - Airlines Data. Click Properties (4) from the Joiner task to continue.
- The Joiner Properties configuration box appears. In the Join - Fields section, expand the Wrangler - Airlines Data (1) and clear the Code (2) checkbox. Enter “Airline_name” in the Description (3) field and select Inner (4) for the Join Type.
- In the Join Condition (5) section, enter “Airline” in the Wrangler - Flight Data field, and “Code” in the Wrangler - Airlines Data field. Click Get (6) for the schema, then click Validate (7) to validate the page. You should see (in green) "No errors found." Lastly, click X (8) to close/save the Joiner Properties page.
- Next, we must "group by" and aggregate. Make sure you are in Analytics (1) view, then click Group By (2). A Group By task appears on the canvas.
- Connect (3) by clicking and dragging the small arrow from the Joiner to the Group By task. Click Properties (4) from the Group By task to continue.
- The Group By Properties configuration box appears. In the Group by fields section (1), add the following fields: "Airline," "Airline_code," and "Airline_name."
- In the Aggregates (2) section, enter “Departure_schedule”, select Count as the aggregation, and enter “Flight_count” as an alias. Click Get Schema (3), then click Validate (4) to validate the page. You should see (in green) "No errors found." Lastly, click X (5) to close/save the Group By Properties page.
- To complete your data pipeline, you must set up a sink to output the results. Make sure you are in Sink (1) view, then click GCS (2). A GCS Sink appears on the canvas.
- Connect (3) by selecting and dragging the small arrow from the Group By task to the GCS Sink. Click Properties (4) from the GCS Sink to continue.
- The GCS Properties configuration page appears. Specify the following: - Label (1): “GCS - Flight Count” - Reference Name (2): “gcs_flight_count” - Path (3): Enter the path to the GCS Bucket you created earlier. For the purpose of this exercise, I include mine.
- Select csv for the Format (4) of the output file, then click Validate (5) to validate the page. You should see (in green) "No errors found." Lastly, click X (6) to close/save the GSC Properties page.
- Congrats! You have completed building your data pipeline. Now give your pipeline a Name (1), Save (2) it, then Deploy (3) it. It takes a few moments to finalize and complete the deployment process.
- You are now ready to execute your pipeline. Click Run.
- The pipeline goes through different Status cycles: Deployed, Provisioning, Starting, Running, Deprovisioning and Succeeded. When the pipeline Status is "Succeeded," examine the output file in your GCS bucket.
- To validate the output sink of your pipeline, go to the GCS bucket output folder and execute the following gsutil command to view the results. Make sure to replace [BUCKET_NAME] and [REPLACE_WITH_YOUR_FOLDER_DATE] with your information:
gsutil cat -h gs://[BUCKET_NAME]]/output/flight_count/[REPLACE_WITH_YOUR_FOLDER_DATE]/part-r-00000
Cleanup
To avoid incurring charges to your Google Cloud platform account for the resources used in this post, do the following: If you want to delete the entire project:- In the GC Console, go to the Manage resources page.
- In the project list, select the project you want to delete and click Delete.
- In the dialog, type the project ID, and then click Shut down to delete the project.
- To view your existing Cloud Data Fusion instances, open the Instances page.
- To select an instance, check the box next to the instance name.
- To delete the instance, click Delete.
Share this
You May Also Like
These Related Stories
Why Choose Google Cloud for Your Data Warehouse Migration
Why Choose Google Cloud for Your Data Warehouse Migration
May 11, 2022
3
min read
Pythian EDP QuickStart Components: Part 4
Pythian EDP QuickStart Components: Part 4
Sep 14, 2022
4
min read
Why Migrate Your PostgreSQL, MySQL or SQL Server to Google Cloud SQL?
Why Migrate Your PostgreSQL, MySQL or SQL Server to Google Cloud SQL?
May 2, 2022
3
min read
No Comments Yet
Let us know what you think