Most corporations keep huge amounts of data in a relational database management system (RDBMS). If you only need to migrate a subset of that data to the cloud, the following data ingestion solution built on Google’s Dataflow is efficient and easy to set up.
Here are the benefits:
Google’s Dataflow is based on the open-source Apache Beam project, a unified model and set of APIs for both batch and streaming data processing. This serverless service lets developers build Beam processing pipelines that integrate, clean, and transform large data sets, such as those found in big data analytics applications.
We will implement the pipeline in Java (Python, Scala, and Go are also available) to show how easy the ingestion is. We’ll create a Cloud SQL instance, this time running PostgreSQL, and create a database in it. Then we’ll open a JDBC connection directly to the on-premises SQL database and use the corresponding credentials to connect Dataflow to Cloud SQL.
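For reference, the Cloud SQL instance and database can be created from the command line. This is a minimal sketch; the instance name, region, tier, and database name are placeholders to replace with your own:
gcloud sql instances create my-postgres-instance \
    --database-version=POSTGRES_14 \
    --region=us-central1 \
    --tier=db-custom-1-3840
gcloud sql databases create mydb --instance=my-postgres-instance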
First, initialize the pipeline instance:
// required Beam imports
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

// define the pipeline options; the defaults are enough for now
public interface MySqlOptions extends PipelineOptions {
}
// create the options for the pipeline
MySqlOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(MySqlOptions.class);
// create the pipeline
Pipeline p = Pipeline.create(options);
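In practice you would usually extend the (currently empty) options interface with your own pipeline flags, for example the JDBC URLs or the batch size, instead of reading everything from environment variables. This is a hedged sketch; the option names are hypothetical and not part of the original pipeline:
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;

// hypothetical custom options; the names are illustrative only
public interface MySqlOptions extends PipelineOptions {
  @Description("JDBC URL of the on-premises Oracle database")
  String getSourceJdbcUrl();
  void setSourceJdbcUrl(String value);

  @Description("Number of rows per JDBC write batch")
  @Default.Integer(1000)
  Integer getBatchSize();
  void setBatchSize(Integer value);
}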
Next, define the read step: pull the data from the on-premises Oracle database into a PCollection over a JDBC connection:
import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
import org.apache.beam.sdk.io.jdbc.JdbcIO;
import org.apache.beam.sdk.values.PCollection;

// read from Oracle on prem into a PCollection of TableRows
PCollection<TableRow> rows = p.apply("ReadFromOracle", JdbcIO.<TableRow>read()
    .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create("oracle.jdbc.driver.OracleDriver", connectUrlPerm)
        .withUsername(System.getenv("user"))
        .withPassword(password))
    .withFetchSize(Integer.parseInt(System.getenv("batch_size")))
    .withQuery(getStatement(System.getenv("perm_sql")))
    .withRowMapper(new MySqlIngestion.OracleRowMapper()) // maps each ResultSet row to a TableRow (see sketch below)
    .withCoder(TableRowJsonCoder.of()));
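The JdbcIO read needs a RowMapper that turns each JDBC ResultSet row into a pipeline element. That class isn’t shown here, so this is a minimal, generic sketch, assumed to live as a nested class inside the pipeline class (MySqlIngestion):
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;

// Hypothetical row mapper: copies every column of the current ResultSet row into a TableRow.
static class OracleRowMapper implements JdbcIO.RowMapper<TableRow> {
  @Override
  public TableRow mapRow(ResultSet resultSet) throws Exception {
    TableRow row = new TableRow();
    ResultSetMetaData meta = resultSet.getMetaData();
    for (int i = 1; i <= meta.getColumnCount(); i++) {
      row.set(meta.getColumnLabel(i), resultSet.getObject(i));
    }
    return row;
  }
}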
Then define the write step: insert the PCollection into the Cloud SQL PostgreSQL database:
// write the TableRows to Cloud SQL PostgreSQL
rows.apply("WriteToCloudSql", JdbcIO.<TableRow>write()
    .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create("org.postgresql.Driver", conUrl)
        .withUsername(System.getenv("user"))
        .withPassword(password))
    .withBatchSize(Integer.parseInt(System.getenv("batch_size")))
    .withStatement(getStatement(System.getenv("csql_sql")))
    .withPreparedStatementSetter(new MySqlIngestion.MySqlStatementSetter())); // binds TableRow fields to the INSERT (see sketch below)
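The PreparedStatementSetter binds values from each TableRow to the placeholders of the INSERT statement configured above. It isn’t shown in the post either, so this is a hedged sketch with hypothetical column names:
import java.sql.PreparedStatement;

// Hypothetical statement setter: binds TableRow fields to the "?" placeholders of the INSERT.
// The column names ("id", "name") are illustrative; match them to your target table.
static class MySqlStatementSetter implements JdbcIO.PreparedStatementSetter<TableRow> {
  @Override
  public void setParameters(TableRow row, PreparedStatement statement) throws Exception {
    statement.setObject(1, row.get("id"));
    statement.setObject(2, row.get("name"));
    // add one setObject call per placeholder in the INSERT statement
  }
}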
Finally, trigger the pipeline run and wait for it to finish:
p.run().waitUntilFinish();
}
To launch the job on the Dataflow service, compile and run the pipeline with Maven:
mvn -Pdataflow-runner compile exec:java \
-Dexec.mainClass=MAIN_CLASS \
-Dexec.args="--project=PROJECT_ID \
--gcpTempLocation=gs://BUCKET_NAME/temp/ \
--runner=DataflowRunner \
--region=REGION"
Replace the following:
MAIN_CLASS: the fully qualified name of the class containing your pipeline’s main method
PROJECT_ID: your Cloud project ID
BUCKET_NAME: the name of your Cloud Storage bucket
REGION: a Dataflow regional endpoint, like us-central1
As you can see, you can execute a highly efficient data transfer with just a few lines of code. You can also easily switch to other JDBC drivers and RDBMSs, or to different sources and sinks altogether, for example changing JdbcIO to TextIO for text files or to BigQueryIO for BigQuery, as sketched below.
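For instance, if the destination were BigQuery instead of Cloud SQL, the write step could be swapped for BigQueryIO. This is a minimal sketch; the table reference is a placeholder and the table is assumed to already exist:
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;

// Hypothetical alternative sink: append the same TableRows to an existing BigQuery table.
rows.apply("WriteToBigQuery", BigQueryIO.writeTableRows()
    .to("PROJECT_ID:my_dataset.my_table") // placeholder table reference
    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));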
https://cloud.google.com/dataflow
I hope you find this post helpful. Feel free to drop any questions in the comments and don’t forget to sign up for the next post.
Ready to get the database consulting support you need?