How to Deploy Spark in DataStax Cassandra 5.1

Spark is an open-source, distributed processing system for big data workloads. It uses in-memory caching and optimized query execution to run fast analytic queries against data of any size. Simply put, Spark is used to process data on a very large scale.
How does Spark work?
Spark is a distributed computing engine used to process and analyse huge amounts of data. It distributes the data across the cluster and processes it in parallel. Spark uses a master/worker architecture: one centralized master coordinates many workers.
The master communicates with the distributed workers, which host the executors. Each executor is a separate Java process. A Spark application consists of a driver program and its own executors. When Spark is launched on a set of machines, the master acts as the interface to the cluster and monitors the state of each node as jobs are submitted.
What can you do with Spark?
Spark can process terabytes or petabytes of data distributed across the nodes of a cluster. It has an extensive set of libraries for developers and supports several programming languages, including Python, Scala, Java, and R. It is often used with backend data stores such as Cassandra, MapR, HBase, and Hadoop HDFS.
Typical use-cases include:
- Processing ETL jobs that scan large datasets.
- Machine Learning.
- Processing financial/time series data.
- Processing complex history data for trends.
How can you integrate Spark with DSE Cassandra 5.1?
Execute the following steps on every node in the cluster, except where a step says otherwise.
1. Create a Spark user. Execute on ANY ONE node:
CREATE ROLE spark WITH PASSWORD = 'some_password' AND LOGIN = true;
GRANT SELECT ON ALL KEYSPACES TO spark;
GRANT MODIFY ON ALL KEYSPACES TO spark;
GRANT EXECUTE ON REMOTE OBJECT DseResourceManager TO spark;
GRANT EXECUTE ON REMOTE OBJECT DseClientTool TO spark;
GRANT CREATE ON ANY WORKPOOL TO spark;
GRANT MODIFY ON ANY SUBMISSION TO spark;
GRANT EXECUTE ON ALL REMOTE CALLS TO spark;
2. Create the following Spark directories and give the cassandra user ownership of them:
mkdir -p /var/lib/cassandra/spark/worker
mkdir -p /var/lib/cassandra/spark/rdd
mkdir -p /var/lib/cassandra/spark/dsefs
mkdir -p /var/lib/cassandra/spark/dsefs/data
mkdir -p /var/log/spark/worker
mkdir -p /var/log/spark/master
chown -R cassandra:cassandra /var/lib/cassandra/spark
chown -R cassandra:cassandra /var/log/spark
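Because this step has to be repeated on every node, it can help to wrap the directory creation in a small, repeatable function. The following is only a sketch: the function name make_spark_dirs and its optional root argument are illustrative assumptions (not part of DSE), added so the layout can be tried under a scratch directory first. Ownership is still set with the chown commands from the step itself.

```shell
#!/bin/sh
# Sketch: create the Spark directories from step 2 under an optional root prefix.
# make_spark_dirs and its root argument are hypothetical helpers, not DSE tooling.
make_spark_dirs() {
    root="${1:-}"   # empty prefix means the real filesystem root
    for dir in \
        /var/lib/cassandra/spark/worker \
        /var/lib/cassandra/spark/rdd \
        /var/lib/cassandra/spark/dsefs/data \
        /var/log/spark/worker \
        /var/log/spark/master
    do
        # mkdir -p also creates missing parents (e.g. .../spark/dsefs)
        mkdir -p "${root}${dir}" || return 1
    done
}

# Dry run in a scratch directory: list what would be created.
scratch="$(mktemp -d)"
make_spark_dirs "$scratch" && find "$scratch" -type d | sort
rm -rf "$scratch"
```

Calling make_spark_dirs with no argument (as root) creates the real paths. Since mkdir -p succeeds when a directory already exists, running it again on a node is harmless.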
3. Start Cassandra in Spark mode:
bin/dse cassandra -k
4. Update Spark environment:
vi /opt/cassandra/dse/resources/spark/conf/spark-env.sh

# Make sure port numbers are same on all nodes
export SPARK_MASTER_PORT=7077
export SPARK_MASTER_WEBUI_PORT=7080
export SPARK_WORKER_WEBUI_PORT=7081
export SPARK_CASSANDRA_CONNECTION_HOST="<IP OF HOST>"
export SPARK_DRIVER_HOST="<IP OF HOST>"
export SPARK_WORKER_MEMORY=2048m
export SPARK_WORKER_CORES=4
export SPARK_DRIVER_MEMORY="1024M"
# Update below paths
export SPARK_WORKER_DIR="/var/lib/cassandra/spark/worker"
export SPARK_LOCAL_DIRS="/var/lib/cassandra/spark/rdd"
export SPARK_WORKER_LOG_DIR="/var/log/spark/worker"
export SPARK_MASTER_LOG_DIR="/var/log/spark/master"
SPARK_WORKER_OPTS="$SPARK_WORKER_OPTS -Dspark.hadoop.cassandra.username=<cql_username> -Dspark.hadoop.cassandra.password=<cql_password>"
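Since the port numbers must match on every node, one way to check is to pull just the port settings out of each node's spark-env.sh and compare them. The helper below is a minimal sketch of that idea; the function name spark_ports is a hypothetical helper, and the sample file only mirrors the values shown in this step.

```shell
#!/bin/sh
# Sketch: print the SPARK_*PORT exports from a spark-env.sh file, sorted,
# so the output from two nodes can be compared with diff.
# spark_ports is a hypothetical helper name, not part of DSE.
spark_ports() {
    grep -E '^[[:space:]]*export[[:space:]]+SPARK_[A-Z_]*PORT=' "$1" \
        | sed 's/^[[:space:]]*export[[:space:]]*//' \
        | LC_ALL=C sort
}

# Demo against a sample file that mirrors the settings in this step.
sample="$(mktemp)"
cat > "$sample" <<'EOF'
export SPARK_MASTER_PORT=7077
export SPARK_MASTER_WEBUI_PORT=7080
export SPARK_WORKER_WEBUI_PORT=7081
export SPARK_WORKER_MEMORY=2048m
EOF
spark_ports "$sample"
rm -f "$sample"
```

On a real node you would pass /opt/cassandra/dse/resources/spark/conf/spark-env.sh instead of the sample file, save the output per node, and diff the results. Any difference points to a port mismatch.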
5. Enable DSE file system:
vi /opt/cassandra/dse/resources/dse/conf/dse.yaml

spark_security_enabled: true
spark_security_encryption_enabled: true
spark_ui_options:
    # encryption: inherit
dsefs_options:
    enabled:
    keyspace_name: dsefs
    work_dir: /var/lib/cassandra/spark/dsefs
    public_port: 5598
    private_port: 5599
    data_directories:
        - dir: /var/lib/cassandra/spark/dsefs/data
6. Enable authentication for the Spark UI:
vi /opt/cassandra/dse/resources/spark/conf/spark-daemon-defaults.conf

spark.ui.filters com.datastax.bdp.auth.SparkUIAuthFilter
7. Repeat steps 2 to 6 on the remaining nodes.
8. Do a rolling restart of Cassandra, one node at a time, then verify that Spark is running and check the logs:
service dse stop
service dse start

$>dsetool ring
Address       DC   Rack   Workload       Graph  Status  State   Load      Owns  VNodes  Health [0,1]
192.168.1.11  dc1  rack1  Analytics(SM)  no     Up      Normal  1.35 GiB  ?     32      1.0
192.168.1.12  dc1  rack1  Analytics(SW)  no     Up      Normal  1.38 GiB  ?     32      1.0
192.168.1.13  dc1  rack1  Analytics(SW)  no     Up      Normal  1.24 GiB  ?     32      1.0
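In the dsetool ring output above, the Workload column marks one node as Analytics(SM) (Spark master) and the others as Analytics(SW) (Spark workers). As a rough sketch, that check can be scripted by counting those markers. The function name spark_roles is a hypothetical helper, and the expectation of exactly one master assumes a single analytics datacenter like the one shown here.

```shell
#!/bin/sh
# Sketch: count Spark masters (SM) and workers (SW) in `dsetool ring`
# output read from stdin. Returns non-zero unless exactly one master is found.
# spark_roles is a hypothetical helper; the one-master rule assumes a
# single analytics datacenter, as in the sample output of this post.
spark_roles() {
    awk '
        /Analytics\(SM\)/ { masters++ }
        /Analytics\(SW\)/ { workers++ }
        END {
            printf "masters=%d workers=%d\n", masters, workers
            exit (masters == 1 ? 0 : 1)
        }
    '
}

# Demo with the rows from the sample output; prints "masters=1 workers=2".
spark_roles <<'EOF'
192.168.1.11 dc1 rack1 Analytics(SM) no Up Normal 1.35 GiB ? 32 1.0
192.168.1.12 dc1 rack1 Analytics(SW) no Up Normal 1.38 GiB ? 32 1.0
192.168.1.13 dc1 rack1 Analytics(SW) no Up Normal 1.24 GiB ? 32 1.0
EOF
```

On a live cluster the same function can be fed directly with: dsetool ring | spark_roles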
Set up a Spark connection
1. On any Spark node, export the Spark configuration needed to establish a connection:
$ dse client-tool configuration export spark-config.jar
2. Copy spark-config.jar to the client node:
$ scp spark-config.jar <user>@<client-node-ip>:/path/location
3. On the client node, copy the DSE 5.1.18 software and set the Java and DSE 5.1 paths appropriately:
export JAVA_HOME=/opt/cassandra/java
export CASSANDRA_HOME=/opt/cassandra/dse-5.1.18
export CASSANDRA_CONF=$CASSANDRA_HOME/resources/cassandra/conf
export PATH=$CASSANDRA_HOME/bin:$JAVA_HOME/bin:$PATH
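Before importing the configuration, it may be worth confirming that these variables point at real binaries. The snippet below is a small sketch of such a check; check_client_env is a hypothetical helper name, and it only assumes what the exports above imply: a java binary under $JAVA_HOME/bin and the dse command under $CASSANDRA_HOME/bin.

```shell
#!/bin/sh
# Sketch: verify that JAVA_HOME and CASSANDRA_HOME point at usable binaries.
# check_client_env is a hypothetical helper, not part of DSE.
check_client_env() {
    status=0
    for bin in "${JAVA_HOME:-}/bin/java" "${CASSANDRA_HOME:-}/bin/dse"; do
        if [ ! -x "$bin" ]; then
            echo "missing or not executable: $bin" >&2
            status=1
        fi
    done
    return "$status"
}

# Report the result for the current shell environment.
if check_client_env; then
    echo "client environment looks usable"
else
    echo "fix the paths above before continuing" >&2
fi
```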
4. On the client node, import Spark config:
Make sure to remove or move any existing cqlshrc file in /home/cassandra/.cassandra/ first.
$ dse client-tool configuration import spark-config.jar
5. Launch Spark connection:
$ dse -u spark -p xxxx spark --master dse://<ip-address>:9042 --num-executors 2 --executor-memory 1GB --total-executor-cores 1
The log file is at /home/cassandra/.spark-shell.log
Creating a new Spark Session
Spark context Web UI available at http://192.168.1.10:4040
Spark Context available as 'sc' (master = dse://192.168.1.11:9042, app id = app-20200109022447-0007).
Spark Session available as 'spark'.
Spark SqlContext (Deprecated use Spark Session instead) available as 'sqlContext'
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.0.2.22
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_112)
Type in expressions to have them evaluated.
Type :help for more information.

scala> :quit
(cassandra@test01):[/home/cassandra]
$>
6. Open the Spark UI and authenticate with the credentials of the spark user:
http://<any_node_ip_address>:7080
I hope you find this post helpful. Feel free to drop any questions in the comments and don’t forget to sign up for the next post.