Apache Beam is a unified model for defining both batch and streaming data-parallel processing pipelines. It provides software development kits (SDKs) for defining and constructing data processing pipelines, as well as runners to execute them.
The Capability Matrix summarizes the capabilities of the individual runners and provides a high-level view of each runner that supports the Beam model. Based on the Capability Matrix, users can decide which runner best suits their use case.
As mentioned on the official website, there are many Apache Beam use cases available for reference.
A few interesting case studies:
The key concepts in the programming model are:
Put simply, a PipelineRunner executes a Pipeline, and a Pipeline consists of PCollections and PTransforms.
Let’s start creating Beam pipelines using Python and Colab. The main focus of this blog is batch processing.
Google Colaboratory (Colab) is a free Jupyter notebook interactive development environment that runs entirely in the cloud.
For the initial setup in Colab, open a new notebook and install Apache Beam by running the following commands in a notebook cell:
!pip install apache-beam
!pip install apache-beam[interactive]
Creating directories and listing files on the machine.
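In a notebook cell, shell commands are prefixed with `!`. For example (the directory name here is just an illustration):

```shell
# create an output directory and list the working directory
mkdir -p output
ls -l
```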
Uploading files from the local machine:
from google.colab import files
upload = files.upload()
Source transforms are one of the transform types; examples include TextIO.Read and Create. A source transform conceptually has no input.
In the code below, we pass a list of elements to create a PCollection and write it to a text file.
import apache_beam as beam

p3 = beam.Pipeline()
lines1 = (
    p3
    | beam.Create([100, 101, 102, 103, 104, 105, 106, 107, 108, 109])
    | beam.io.WriteToText('output/file2')
)
p3.run()
Output: Result of the above program
Sample data used for the below examples
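The sample file itself is shown above; what the pipelines below rely on is only that each line is comma-separated with the department in the fourth field (index 3). A hypothetical row illustrates the indexing:

```python
# hypothetical employee row; only the index-3 (department) position
# is assumed by the pipelines below
row = "101,Asha,F,HR,40000"
fields = row.split(',')
print(fields[3])  # -> 'HR'
```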
Here we fetch the HR department employee count using Map and Filter transforms.
import apache_beam as beam

p1 = beam.Pipeline()
emp_count = (
    p1
    | beam.io.ReadFromText('emp1.txt')               # read the file line by line
    | beam.Map(lambda record: record.split(','))     # split each CSV row into fields
    | beam.Filter(lambda record: record[3] == 'HR')  # keep only HR rows
    | beam.Map(lambda record: (record[3], 1))        # pair each row with 1
    | beam.CombinePerKey(sum)                        # sum the 1s per department
    | beam.io.WriteToText('output/emp_output1')      # write the result
)
p1.run()
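For intuition, the same chain can be mimicked in plain Python (the rows here are made up; the department is assumed to sit at index 3, as in the pipeline above):

```python
# plain-Python analogue of the Map/Filter/CombinePerKey chain
rows = ["1,Asha,F,HR,40000", "2,Ravi,M,Accounts,35000", "3,Mary,F,HR,42000"]
records = [r.split(',') for r in rows]          # beam.Map: split rows
hr = [r for r in records if r[3] == 'HR']       # beam.Filter: keep HR
pairs = [(r[3], 1) for r in hr]                 # beam.Map: pair with 1
counts = {}
for k, v in pairs:                              # beam.CombinePerKey(sum)
    counts[k] = counts.get(k, 0) + v
print(counts)  # -> {'HR': 2}
```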
Brief explanation of the pipeline: ReadFromText reads the file line by line, Map splits each line into fields on commas, Filter keeps only rows whose department (index 3) is HR, the second Map pairs each remaining row with 1, CombinePerKey sums those 1s per department, and WriteToText writes the result.
A branching pipeline runs two or more tasks on the same data.
For easier comparison, we use the same sample data and calculate department-wise counts.
In the code below, we compute the employee count for each department.
import apache_beam as beam

p = beam.Pipeline()
input_collection = (
    p
    | "Read from text file" >> beam.io.ReadFromText('emp1.txt')
    | "Splitting rows based on delimiters" >> beam.Map(lambda record: record.split(','))
)
accounts_empcount = (
    input_collection
    | 'Filter Accounts' >> beam.Filter(lambda record: record[3] == 'Accounts')
    | 'Pair each accounts with 1' >> beam.Map(lambda record: (record[3], 1))
    | 'Group and sum1' >> beam.CombinePerKey(sum)
    #| 'Write the results of ACCOUNTS' >> beam.io.WriteToText('output/emp4')
)
hr_empcount = (
    input_collection
    | 'Filter HR' >> beam.Filter(lambda record: record[3] == 'HR')
    | 'Pair each HR with 1' >> beam.Map(lambda record: (record[3], 1))
    | 'Group and sum' >> beam.CombinePerKey(sum)
    #| 'Write the results of HR' >> beam.io.WriteToText('output/emp5')
)
empcount = (
    input_collection
    | 'Pair each dept with 1' >> beam.Map(lambda record: ("ALL_Dept", 1))
    | 'Group and sum2' >> beam.CombinePerKey(sum)
    #| 'Write the results of all dept' >> beam.io.WriteToText('output/emp7')
)
output = (
    (accounts_empcount, hr_empcount, empcount)
    | beam.Flatten()
    | beam.io.WriteToText('output/emp6')
)
p.run()
import apache_beam as beam

class SplitRow(beam.DoFn):
    def process(self, element):
        # return type -> list
        return [element.split(',')]

class FilterAccountsEmp(beam.DoFn):
    def process(self, element):
        if element[3] == 'Accounts':
            return [element]

class pairDept(beam.DoFn):
    def process(self, element):
        return [(element[3], 1)]

class empcount(beam.DoFn):
    def process(self, element):
        (key, values) = element  # e.g. ('Accounts', [1, 1])
        return [(key, sum(values))]

p1 = beam.Pipeline()
account_empcount = (
    p1
    | beam.io.ReadFromText('emp1.txt')
    | beam.ParDo(SplitRow())
    | beam.ParDo(FilterAccountsEmp())
    | beam.ParDo(pairDept())
    | 'Group ' >> beam.GroupByKey()
    | 'Sum using ParDo' >> beam.ParDo(empcount())
    | beam.io.WriteToText('output/file5')
)
p1.run()
Results from the above code are shown below.
import apache_beam as beam

def SplitRow(element):
    return element.split(',')

class MyCompositeTransform(beam.PTransform):
    def expand(self, input_coll):
        a = (
            input_coll
            | 'Pair each respective department with 1' >> beam.Map(lambda record: (record[3], 1))
            | 'Grouping & sum' >> beam.CombinePerKey(sum)
        )
        return a

p = beam.Pipeline()
input_collection = (
    p
    | "Read from text file" >> beam.io.ReadFromText('emp1.txt')
    | "Splitting rows based on delimiters" >> beam.Map(SplitRow)
)
accounts_empcount = (
    input_collection
    | 'Filter Accounts' >> beam.Filter(lambda record: record[3] == 'Accounts')
    | 'accounts MyCompositeTransform' >> MyCompositeTransform()
)
hr_empcount = (
    input_collection
    | 'Filter HR' >> beam.Filter(lambda record: record[3] == 'HR')
    | 'hr MyCompositeTransform' >> MyCompositeTransform()
)
output = (
    (accounts_empcount, hr_empcount)
    | beam.Flatten()
    | beam.io.WriteToText('output/emp6')
)
p.run()
Results of the above code:
In this blog, I have tried to explain batch processing using the Beam model; in the next blog, we will cover stream processing.
Apache Beam is a powerful model for data-parallel processing for both streaming and batch. It is also portable and flexible.
The most capable runners at this point in time are clearly Google Cloud Dataflow (for running on GCP) and Apache Flink (for on-premise and non-Google cloud), as detailed by the capability matrix.
Can Apache Beam be the future of data processing? Share your thoughts in the comments and don’t forget to sign up for the next post.
Ready to optimize your Data Analytics for the future?