Spark + Cassandra Best Practices
Spark was created in 2009 as a response to difficulties with MapReduce in Hadoop, particularly in supporting machine learning and other interactive data analysis. Spark simplifies the processing and analysis of data, reducing the number of steps and easing development. It also reduces latency, since processing is done in memory. Spark can be used to process and analyze data to and from a variety of data storage sources and destinations. In this blog post, we will discuss Spark in conjunction with data stored in Cassandra. Querying and manipulating data in Spark has several advantages over doing so directly in Cassandra, not the least of which is being able to join data performantly. This feature is useful for analytics projects.
Spark Use Cases
Typical use cases for Spark when used with Cassandra are: aggregating data (for example, calculating averages by day or grouping counts by category) and archiving data (for example, sending external data to cold storage before deleting from Cassandra). Spark is also used for batched inserts to Cassandra. Other use cases not particular to Cassandra include a variety of machine learning topics.
Spark in the Data Lifecycle
A data analysis project starts with data ingestion into data storage. From there, data is cleansed and otherwise processed. The resulting data is analyzed, reviewing for patterns and other qualities. It may then be further analyzed using a variety of machine learning methods. End-users will be able to run ad hoc queries and use interfaces to visualize data patterns. Spark has a star role within this data flow architecture.
Spark can be used independently to load data in batches from a variety of data sources (including Cassandra tables) into distributed data structures (RDDs) used in Spark to parallelize analytic jobs. Since one of the key features of RDDs is the ability to do this processing in memory, loading large amounts of data without server-side filtering will slow down your project. The spark-cassandra-connector has this filtering and other capabilities. (See https://github.com/datastax/spark-cassandra-connector/.) The limitation on memory resources also implies that, once the data is analyzed, it should be persisted (e.g., to a file or database).
To avoid some of the limitations of this batch processing, streaming functionality was added to Spark. In Spark 1, this functionality was offered through DStreams. (See https://spark.apache.org/docs/latest/streaming-programming-guide.html.) Spark 2 -- a more robust version of Spark in general -- includes Structured Streaming. (See https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html.) With Structured Streaming, consider that instead of creating a static table based on a batch input, the table is constantly updated with new data from the source. The data will be stored in a data frame and continuously updated with the new data. (Another benefit of using dataframes over RDDs is that the data is intuitively abstracted into columns and rows.) Available data sources on the source side for streaming include the commonly used Apache Kafka. Kafka buffers the ingest, which is key for high-volume streams. See https://kafka.apache.org/ for more details on Kafka.
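As a rough illustration of server-side filtering on a batch read, here is a minimal PySpark-style sketch. It assumes an existing SparkSession (passed in as spark) that was started with the spark-cassandra-connector available; the helper names and the keyspace, table, and column names are hypothetical. Whether a given predicate is actually pushed down to Cassandra depends on the table's primary key, indexes, and column types, so treat this as a starting point rather than a guarantee.

```python
# Sketch only: `spark` is an existing SparkSession started with the
# spark-cassandra-connector on its classpath. All helper names and the
# keyspace/table/column names supplied by callers are hypothetical.

CASSANDRA_FORMAT = "org.apache.spark.sql.cassandra"


def cassandra_options(keyspace, table):
    """Options that identify one Cassandra table for the DataFrame reader."""
    return {"keyspace": keyspace, "table": table}


def day_filter(day_column, first_day):
    """Build a SQL predicate that keeps rows on or after first_day."""
    return f"{day_column} >= '{first_day}'"


def load_filtered(spark, keyspace, table, day_column, first_day):
    """Read a Cassandra table lazily and filter it before any action runs."""
    df = (
        spark.read.format(CASSANDRA_FORMAT)
        .options(**cassandra_options(keyspace, table))
        .load()
    )
    # Filtering the lazy dataframe gives the connector the chance to push
    # eligible predicates to Cassandra instead of loading every row first.
    return df.filter(day_filter(day_column, first_day))
```

A call such as load_filtered(spark, "shop", "events", "day", "2019-01-01") returns a dataframe; calling explain() on it is one way to check which filters were pushed down.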
Although we are focusing on Cassandra as the data storage in this post, other storage sources and destinations are possible. Another frequently used data storage option is Hadoop HDFS. The previously mentioned spark-cassandra-connector has capabilities to write results to Cassandra, and in the case of batch loading, to read data directly from Cassandra. Native data output formats available include both JSON and Parquet. The Parquet format in particular is useful for writing to AWS S3. See https://aws.amazon.com/about-aws/whats-new/2018/09/amazon-s3-announces-new-features-for-s3-select/ for more information on querying S3 files stored in Parquet format. A good use case for this is archiving data from Cassandra.
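The archiving use case could look roughly like the sketch below. It assumes df is a dataframe already loaded from Cassandra and that the cluster is configured with S3 credentials and the s3a filesystem; the bucket layout, function names, and column names are hypothetical.

```python
# Sketch only: `df` is a Spark dataframe read from Cassandra. The bucket
# layout and all names here are hypothetical, and the s3a:// scheme assumes
# the Hadoop S3 connector and credentials are already configured.


def archive_path(bucket, table, day):
    """Build one archive directory per table and day."""
    return f"s3a://{bucket}/archive/{table}/day={day}"


def archive_day(df, bucket, table, day_column, day):
    """Write one day's rows to S3 as Parquet and return the target path."""
    target = archive_path(bucket, table, day)
    (
        df.filter(f"{day_column} = '{day}'")
        .write.mode("overwrite")  # re-running the job replaces that day's files
        .parquet(target)
    )
    return target
```

Deleting the archived rows from Cassandra is deliberately left out; it is safer to do that as a separate step after verifying the Parquet files.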
Data cleansing involves dealing with questionable data (such as null values) and other preprocessing tasks (such as converting categorical data to mapped integers). Once data is stored in a data frame, it can be transformed into new dataframes based on filters. Other than the fact you have the capability to do this cleansing within the same code (e.g., the Scala script running Spark), Spark does not provide magic to clean data; after all, this takes knowledge about the data and the business to understand and code particular transformation tasks.
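To make the two preprocessing tasks mentioned above concrete, here is a small plain-Python stand-in (not Spark code) that drops rows with a null category and maps the remaining categories to integers. The function and field names are hypothetical. In Spark itself, comparable steps would typically be done with dataframe operations such as df.na.drop and MLlib's StringIndexer, though StringIndexer assigns codes differently than this sketch.

```python
# Plain-Python illustration of two cleansing steps: removing rows with a
# null category and converting categories to mapped integers.
# All names are hypothetical.


def build_category_codes(values):
    """Map each distinct non-null category to a stable integer code."""
    distinct = sorted({value for value in values if value is not None})
    return {category: code for code, category in enumerate(distinct)}


def cleanse(rows, category_field):
    """Drop rows whose category is null and add an integer-coded field."""
    kept = [row for row in rows if row.get(category_field) is not None]
    codes = build_category_codes(row[category_field] for row in kept)
    code_field = category_field + "_code"
    return [
        {**row, code_field: codes[row[category_field]]}
        for row in kept
    ]
```

The mapping itself encodes a business decision (which categories exist and how they are ordered), which is the kind of knowledge the paragraph above says Spark cannot supply for you.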
Spark dataframes can be easily explored for basic patterns using commands like describe, which will show the count, mean, standard deviation, minimum value, and maximum value of selected columns. Dataframes can be further transformed with functions like agg (aggregate). Spark SQL can be used for complex joins and similar tasks, using the SQL language that will be familiar to many data analysts.
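A minimal PySpark-style sketch of these three tools follows. It assumes an existing SparkSession (spark) and a dataframe (df) with a day column and a numeric column; the function, view, and column names are hypothetical.

```python
# Sketch only: `spark` is an existing SparkSession and `df` a dataframe.
# The view and column names are hypothetical.


def daily_average_sql(view, day_column, value_column):
    """Build a Spark SQL query that averages one column per day."""
    return (
        f"SELECT {day_column}, AVG({value_column}) AS avg_{value_column} "
        f"FROM {view} GROUP BY {day_column} ORDER BY {day_column}"
    )


def explore(spark, df, day_column, value_column, view="readings"):
    """Show summary statistics, then compute daily averages two ways."""
    # describe() reports count, mean, stddev, min, and max for the column.
    df.describe(value_column).show()
    # Dictionary form of agg(): column name -> aggregate function name.
    df.groupBy(day_column).agg({value_column: "avg"}).show()
    # The same aggregation through Spark SQL, using a temporary view.
    df.createOrReplaceTempView(view)
    return spark.sql(daily_average_sql(view, day_column, value_column))
```

The dataframe and SQL forms compute the same result; which one to use is mostly a matter of what your analysts are comfortable reading.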
Machine Learning and Data Mining
Machine learning and data mining encompass a broad range of data modeling algorithms intended to make predictions or to discover unknown meaning within data. From Spark 2 onward, the main library for Spark machine learning is based on data frames instead of RDDs. You may see this new data frame-based library referred to as Spark ML, but the library name hasn't changed -- it is still MLlib. (See https://spark.apache.org/docs/latest/ml-guide.html.) Some things that are possible with Spark in this area are recommendation engines, anomaly detection, semantic analysis, and risk analysis.
Ad Hoc Queries
Spark SQL is available to use within any code used with Spark, or from the command line interface; however, the requirement to run ad hoc queries generally implies that business end-users want to access a GUI to both ask questions of the data and create visualizations. This activity could take place using the eventual destination datastore as the backend. Directly from Spark, there are enterprise options such as Tableau, which has a Spark connector. For query speed, a memory-based cache such as Apache Ignite could be used as the analytics backend; to maintain that speed by avoiding disk I/O, the data being used for queries should fit into memory.
Depending on the programming language and platform used, there may be libraries available to directly visualize results. For example, if Python is used in a Jupyter notebook, then Matplotlib will be available. (See https://matplotlib.org/.) In general, except for investigating data in order to clean it, etc., visualization will be done on the data after it is written to the destination. For business end-users, the above discussion in Ad Hoc Queries applies.
The general architecture for a Spark + Cassandra project is apparent from the discussion of the Data Lifecycle above. The core elements are source data storage, a queueing technology, the Spark cluster, and destination data storage. In the case of Cassandra, the source data storage is of course a cluster. Spark will only query a single data center, and to avoid load on a production cluster, this is what you want. Most installations will set up a second data center replicating data from the production data center cluster and attach Spark to this second data center. If you are unable to set up a separate data center to connect to Spark (and we strongly recommend setting it up), be sure to carefully tune the write variables in the spark-cassandra-connector. In addition, if DataStax Enterprise is being used, then DSE Search should be isolated on a third data center. Another consideration is whether to set up Spark on dedicated machines. It is possible to run Spark on the same nodes as Cassandra, and this is the default with DSE Analytics. Again, this is not advised on the main production cluster, but can be done on a second, separate cluster.
Spark code can be written in Python, Scala, Java, or R. SQL can also be used within much of Spark code.
Spark Connector Configuration
Slowing down the throughput (output.throughput_mb_per_sec) can alleviate latency. For writing, the Spark batch size (spark.cassandra.output.batch.size.bytes) should be within the Cassandra configured batch size (batch_size_fail_threshold_in_kb). Writing more frequently will reduce the size of the write, reducing the latency. Increasing the batch size will reduce the number of times Cassandra has to deal with timestamp management. Spark can cause allocation failures if the batch size is too big. For further documentation on connector settings, see the spark-cassandra-connector documentation.
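The relationship between the connector batch size and the Cassandra threshold can be checked before a job is submitted. The sketch below is one way to do that. The function name and example values are hypothetical, and the full property name spark.cassandra.output.throughput_mb_per_sec is an assumption about the prefix (the exact spelling differs between connector versions, so check the reference for the version you run).

```python
# Sketch only: builds connector write settings and refuses a batch size that
# exceeds Cassandra's batch_size_fail_threshold_in_kb. The throughput
# property name is an assumption and varies between connector versions.

BATCH_SIZE_KEY = "spark.cassandra.output.batch.size.bytes"
THROUGHPUT_KEY = "spark.cassandra.output.throughput_mb_per_sec"  # assumed name


def connector_write_conf(batch_size_bytes, throughput_mb_per_sec,
                         cassandra_fail_threshold_kb):
    """Return Spark conf entries for writes, validated against Cassandra."""
    limit_bytes = cassandra_fail_threshold_kb * 1024
    if batch_size_bytes > limit_bytes:
        raise ValueError(
            f"batch size {batch_size_bytes} B exceeds Cassandra's "
            f"fail threshold of {limit_bytes} B"
        )
    return {
        BATCH_SIZE_KEY: str(batch_size_bytes),
        THROUGHPUT_KEY: str(throughput_mb_per_sec),
    }
```

The returned dictionary can be passed as --conf key=value pairs to spark-submit or set on a SparkConf. The threshold argument has to be read from the cassandra.yaml of the cluster you are writing to.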
Security
Security has to be explicitly configured in Spark; it is not on by default. However, the configuration doesn't cover all risk vectors, so review the options carefully. Also, most of these settings can be overridden in code accessing Spark, so it is important to audit your codebase and, most importantly, to limit connections to specific hosts that are further protected by user authentication.
Authentication is turned off by default. It is enabled through two configuration variables, using a shared secret: spark.authenticate (default is false; set to true) and spark.authenticate.secret (set to the shared secret string). If YARN is used, then much of this is done by default. If not, then set these two in the spark-defaults.conf file, and use this secret key when submitting jobs. Note that anyone who has the secret key can submit jobs, so protect it well.
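As a small sketch of the non-YARN case, the following generates a random shared secret and renders the two settings as spark-defaults.conf lines (one property name and value per line, separated by whitespace). The helper names are hypothetical, and how the resulting file is distributed and protected is left to your deployment.

```python
# Sketch only: produces the two authentication settings named above in
# spark-defaults.conf form. Helper names are hypothetical.
import secrets


def new_shared_secret(num_bytes=32):
    """Generate a random hex string to use as the shared secret."""
    return secrets.token_hex(num_bytes)


def auth_conf(shared_secret):
    """The two properties that turn on shared-secret authentication."""
    return {
        "spark.authenticate": "true",
        "spark.authenticate.secret": shared_secret,
    }


def render_spark_defaults(conf):
    """Render properties as spark-defaults.conf lines, sorted by key."""
    return "\n".join(f"{key} {value}" for key, value in sorted(conf.items()))
```

Because the rendered file contains the secret in clear text, it needs the same filesystem protection as any other credential.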
Enable logging for all submitted jobs
You can set spark.eventLog.enabled in the spark-defaults.conf file, but it can be overridden in a user's code (e.g., in the SparkConf) or in shell commands, so it has to be enforced by business policy. Note also that the log file itself (configured via spark.eventLog.dir) should be protected with filesystem permissions to avoid users snooping data within it.
Block Java debugging options in JVM
Make sure the JVM configuration does not include the following options: -Xdebug, -Xrunjdwp, -agentlib:jdwp.
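One way to enforce this is a small check run against whatever JVM option strings your jobs use. The sketch below scans a string for the three options listed above; the function name is hypothetical, and it is an assumption that your options reach the JVM through settings such as spark.driver.extraJavaOptions and spark.executor.extraJavaOptions.

```python
# Sketch only: flags JVM debugging options in an options string, for example
# the value of spark.driver.extraJavaOptions (assumed source of the string).

FORBIDDEN_PREFIXES = ("-Xdebug", "-Xrunjdwp", "-agentlib:jdwp")


def find_debug_options(java_options):
    """Return every whitespace-separated option that enables JVM debugging."""
    return [
        option
        for option in java_options.split()
        if option.startswith(FORBIDDEN_PREFIXES)
    ]
```

An empty list means none of the three options were found. Since users can override these settings in their own code, a check like this belongs in code review or a submission wrapper as well as in the cluster defaults.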
Redact environment data in WebUI
You can disable the whole UI via spark.ui.enabled, but other than that, or overriding the EnvironmentListener with alternate custom code, there is no way to redact the information in the Environment tab of the UI specifically. It is recommended to enable ACLs for both the WebUI and the history server, which will protect the entirety of the web-based information.
Enable and enforce SASL RPC encryption
The recommendation with Spark is to enable AES encryption since version 2.2, unless using an external Shuffle service. To enable SASL, set the following to true:
Enable encryption on all UIs and clients
To enable AES encryption for data going across the wire, in addition to turning on authentication as above, also set the following to true: spark.network.crypto.enabled. Choose a key length and set via spark.network.crypto.keyLength, and choose an algorithm from those available in your JRE and set via spark.network.crypto.keyFactoryAlgorithm. Don't forget to also set configuration from any database (e.g., Cassandra) to Spark, to encrypt that traffic.
Enable encryption on Shuffle service
In addition to the above encryption configuration, set the following to true: spark.network.crypto.saslFallback. To encrypt the temporary files created by the Shuffle service, set this to true: spark.io.encryption.enabled. The key size and algorithm can also be set via spark.io.encryption.keySizeBits and spark.io.encryption.keygen.algorithm, but these have reasonable defaults.
Disable Spark REST server
The REST server presents a serious risk, as it does not allow for encryption. Set the following to false:
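Because most of these settings can be overridden per job, it can help to audit an effective configuration against the values recommended in this section. The sketch below does that for boolean settings only. The function name is hypothetical. The first group of properties is named in the text above; the second group (the two SASL properties and the REST server switch) is an assumption taken from the Spark security documentation, since the text does not name them, so verify them against the Spark version you run.

```python
# Sketch only: compares a Spark configuration (a dict of property -> value)
# with the boolean settings recommended in this section.

EXPECTED = {
    # Named in the text above.
    "spark.authenticate": "true",
    "spark.eventLog.enabled": "true",
    "spark.network.crypto.enabled": "true",
    "spark.network.crypto.saslFallback": "true",
    "spark.io.encryption.enabled": "true",
    # Assumptions: not named in the text; taken from Spark's security docs.
    "spark.authenticate.enableSaslEncryption": "true",
    "spark.network.sasl.serverAlwaysEncrypt": "true",
    "spark.master.rest.enabled": "false",
}


def audit(conf):
    """Return {property: (actual, expected)} for every mismatch."""
    findings = {}
    for key, expected in EXPECTED.items():
        actual = conf.get(key)
        if actual is None or str(actual).lower() != expected:
            findings[key] = (actual, expected)
    return findings
```

An empty result means every listed property has the recommended value; a missing property is reported with an actual value of None, which is a reminder that it is falling back to whatever the Spark default is.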
With experience as an open-source DBA and developer for software-as-a-service environments, Valerie has expertise in web-scale data storage and data delivery, including MySQL, Cassandra, Postgres, and MongoDB.