Google Cloud Dataproc in ETL pipeline - part 1 (logging)
Google Cloud Dataproc, now generally available, provides access to fully managed Hadoop and Apache Spark clusters, and leverages open source data tools for querying, batch/stream processing, and at-scale machine learning. For more technical detail on the specifics of the platform, refer to Google's original blog post and product home page. Having fully managed Hadoop/Spark technology and the powerful Machine Learning Library (MLlib) as part of Google Cloud Platform makes perfect sense: it lets you reuse existing code and helps many overcome the fear of being "locked into" one specific vendor while taking a step into big data processing in the cloud. That said, I would still recommend evaluating Google Cloud Dataflow first when implementing new projects and processes, for its efficiency, simplicity, and semantic-rich analytics capabilities, especially around stream processing.

When Cloud Dataproc was first released to the public, it received positive reviews. Many blogs were written on the subject, with a few putting it through some "tough" challenges on its promise to deliver cluster startup in "less than 90 seconds". In general the product was well received, with the overall consensus that it is well positioned against the AWS EMR offering.

Being able to start a Spark cluster in a matter of minutes, without any knowledge of the Hadoop ecosystem, and having access to a powerful interactive shell such as Jupyter or Zeppelin is no doubt a data scientist's dream. But with extremely fast startup/shutdown, "by the minute" billing, and a widely adopted technology stack, Cloud Dataproc also appears to be a perfect candidate for a processing block in bigger ETL pipelines. Orchestration, workflow engines, and logging are all crucial aspects of such solutions, and I am planning to publish a few blog entries as I go through an evaluation of each of these areas, starting with logging in this post.
Cloud Dataproc Logging
A cluster's system and daemon logs are accessible through the cluster UIs as well as by SSH-ing to the cluster, but there is a much better way to do this. By default these logs are also pushed to Google Cloud Logging, consolidating all logs in one place with a flexible Log Viewer UI and filtering. You can even create custom log-based metrics and use them for baselining and/or alerting purposes. All cluster logs are aggregated under a "dataproc-hadoop" tag, but the "structPayload.filename" field can be used as a filter for a specific log file.

In addition to the Logs Viewer UI, there is a way to export specific log messages into Cloud Storage or BigQuery for analysis. To get an idea of what logs are available by default, I exported all Cloud Dataproc messages into BigQuery and queried the new table with the following query:

```sql
SELECT structPayload.filename AS file_name, count(*) AS cnt
FROM [dataproc_logs.dataproc_hadoop_20160217]
WHERE metadata.labels.key='dataproc.googleapis.com/cluster_id'
  AND metadata.labels.value = 'cluster-2:205c03ea-6bea-4c80-bdca-beb6b9ffb0d6'
GROUP BY file_name
```

- hadoop-hdfs-namenode-cluster-2-m.log
- yarn-yarn-nodemanager-cluster-2-w-0.log
- container_1455740844290_0001_01_000004.stderr
- hadoop-hdfs-secondarynamenode-cluster-2-m.log
- hive-metastore.log
- hadoop-hdfs-datanode-cluster-2-w-1.log
- hive-server2.log
- container_1455740844290_0001_01_000001.stderr
- container_1455740844290_0001_01_000002.stderr
- hadoop-hdfs-datanode-cluster-2-w-0.log
- yarn-yarn-nodemanager-cluster-2-w-1.log
- yarn-yarn-resourcemanager-cluster-2-m.log
- container_1455740844290_0001_01_000003.stderr
- mapred-mapred-historyserver-cluster-2-m.log
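When inspecting several of these files across many clusters, building the Logs Viewer advanced filter by hand gets tedious. The helper below is a hypothetical convenience of my own (the function name and label syntax are assumptions); the field names mirror the ones used in the BigQuery query above:

```python
def dataproc_log_filter(cluster_id, filename=None):
    """Build a Logs Viewer advanced filter string for one Dataproc cluster.

    Hypothetical helper: field names follow the exported log schema
    (metadata.labels, structPayload.filename) shown above.
    """
    lines = [
        'metadata.serviceName="dataproc.googleapis.com"',
        'metadata.labels."dataproc.googleapis.com/cluster_id"="{0}"'.format(cluster_id),
    ]
    if filename:
        # Narrow the filter down to a single log file on the cluster.
        lines.append('structPayload.filename="{0}"'.format(filename))
    return '\n'.join(lines)
```

For example, `dataproc_log_filter("cluster-2:205c03ea-...", "hive-server2.log")` produces a three-line filter that can be pasted into the Logs Viewer.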
Application Logging
You can submit a job to the cluster using the Cloud Console, Cloud SDK, or REST API. Cloud Dataproc automatically gathers driver (console) output from all the workers and makes it available through the Cloud Console. Logs from the job are also uploaded to the staging bucket specified when starting a cluster and can be accessed from there.

Note: One thing I found confusing is that referencing the driver output directory in the Cloud Dataproc staging bucket requires the cluster ID (dataproc-cluster-uuid), which is not yet listed on the Cloud Dataproc Console. Having this ID, or a direct link to the directory, available from the cluster Overview page is especially critical when starting/stopping many clusters as part of scheduled jobs. One way to get the dataproc-cluster-uuid and a few other useful references is to navigate from the cluster "Overview" section to "VM Instances", click on the master or any worker node, and scroll down to the "Custom metadata" section. You can also get it using the "gcloud beta dataproc clusters describe <CLUSTER_NAME> | grep clusterUuid" command, but it would be nice to have it available through the console in the first place.

The job (driver) output, however, is currently dumped to the console ONLY (refer to /etc/spark/conf/log4j.properties on the master node) and, although accessible through the Dataproc Job interface, it is not currently available in Cloud Logging.
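If you script cluster startup, the grep step above can be replaced with a small parser. This is a sketch of my own (the function name is an assumption), assuming the YAML-style `clusterUuid: <value>` line that the describe command prints:

```python
def extract_cluster_uuid(describe_output):
    """Pull clusterUuid out of `gcloud beta dataproc clusters describe` output.

    Hypothetical helper: assumes the CLI's YAML-style `clusterUuid: <value>`
    line; returns None when no such line is present.
    """
    for line in describe_output.splitlines():
        line = line.strip()
        if line.startswith('clusterUuid:'):
            # Keep everything after the first colon, trimming whitespace.
            return line.split(':', 1)[1].strip()
    return None
```

You could feed it the output of a `subprocess` call to the describe command and use the returned UUID to build the driver output path in the staging bucket.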
The easiest way around this issue, which can be implemented as part of cluster initialization actions, is to modify /etc/spark/conf/log4j.properties by replacing "log4j.rootCategory=INFO, console" with "log4j.rootCategory=INFO, console, file" and adding the following appender:

```properties
# Adding file appender
log4j.appender.file=org.apache.log4j.RollingFileAppender
log4j.appender.file.File=/var/log/spark/spark-log4j.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.conversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c: %m%n
```

The existing Cloud Dataproc fluentd configuration automatically tails all files under the /var/log/spark directory, adding events to Cloud Logging, and should pick up messages going into /var/log/spark/spark-log4j.log.

You can verify that logs from the job have started to appear in Cloud Logging by firing up one of the examples provided with Cloud Dataproc and filtering the Logs Viewer with the following rule:

```
node.metadata.serviceName="dataproc.googleapis.com"
structPayload.filename="spark-log4j.log"
```

If messages are still not appearing in Cloud Logging after this change, try restarting the fluentd daemon by running "/etc/init.d/google-fluentd restart" on the master node.

Once the changes are implemented and the output is verified, you can declare a logger in your process as:

```python
import pyspark

sc = pyspark.SparkContext()
logger = sc._jvm.org.apache.log4j.Logger.getLogger(__name__)
```

and submit the job, redefining the logging level (INFO by default) using "--driver-log-levels".
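For reference, the whole log4j.properties edit can be expressed as a small text transformation. This is a sketch of what an initialization action would do (a real init action would more likely run sed on the master node; the function and constant names here are mine):

```python
LOG4J_APPENDER = """# Adding file appender
log4j.appender.file=org.apache.log4j.RollingFileAppender
log4j.appender.file.File=/var/log/spark/spark-log4j.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.conversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c: %m%n
"""

def add_file_appender(log4j_text):
    """Route the Spark root logger to a file as well as the console.

    Sketch of the edit described above: rewrite the rootCategory line,
    then append the RollingFileAppender definition.
    """
    patched = log4j_text.replace(
        'log4j.rootCategory=INFO, console',
        'log4j.rootCategory=INFO, console, file')
    return patched.rstrip('\n') + '\n\n' + LOG4J_APPENDER
```

Applied to the contents of /etc/spark/conf/log4j.properties, it yields a file whose root logger writes to both the console and /var/log/spark/spark-log4j.log.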