Spark - DSE Cassandra Reference Guide

Resources

By default, spark will launch new jobs using all cluster resources. It’s often a best practice to reduce the default configuration of the number of core and memory used to avoid having a long running job (spark shell or sql) stealing all resources.
Finding the best amount of memory and cores for a job isn’t always easy. The following can help define a good default value.

  • Having many core will likely speed-up the computation. Start with spark.cores.max = 3-4 x number executor
  • Having more memory will allow bigger partition. 4-8GB is a good start (see details below).
  • The executor memory is split between all its core. Having 1 executor running 4 cores on a 1GB heap is likely wrong.
  • It’s generally best to define a number of core being a multiple of your number of executor
  • The number of core per executor is computed by totalNumberOfCore / numberOfExecutor
  • Sparklint can be useful to optimize cluster utilization

Running multiple spark application: dynamic resource allocation

Dynamic allocation can be used to allow a spark job to dynamically extends its resources (request more executors). see Spark documentation detail (typically a long running job with small periodic jobs)

Running jobs within a single spark application: fair scheduler

By default, spark will execute jobs in a FIFO order. If multiple jobs need to be running at the same time, the fair scheduler can be used to run multiple jobs in parallel (multiple pools can be defined, each pools can be FIFO or FAIR).
see Spark documentation detail

Running your job

How to submit

There are multiple ways to submit a spark job with DSE:

It’s easier to submit jars to DSEFS, especially when running on cluster mode.


    //send jar to dsefs. DSEFS HTTP api can also be used.
    dse fs "put /home/ss/testspark-0.0.1-SNAPSHOT.jar /testspark-0.0.1-SNAPSHOT.jar"
    //start job
    dse spark-submit --class TestSpark --files /home/ss/logback-spark-test.xml --conf "spark.driver.extraJavaOptions=-Dlogback.configurationFile=logback-spark-test.xml" --conf "spark.executor.extraJavaOptions=-Dlogback.configurationFile=logback-spark-test.xml" --deploy-mode cluster dsefs://localhost:5598/testspark-0.0.1-SNAPSHOT.jar
    

Monitoring apis

Memory & partition size

Memory

By default, spark starts its executors with 1GB of memory. That’s also what you get when you start a spark shell of a sparkSQL with dse sql.
Over those 1GB, 300MB are allocated to spark.
Over the 700MB remaining (1GB-300MB), 60% (by default) are reserved to spark execution (execution and persisted data).
That left roughly 300MB of memory for user. If you use more than this amount of memory in 1 spark partition (task), you’ll get an OOM exception.
Cloudera “How-to: Tune Your Apache Spark Jobs”

Partition number

Understanding spark partitioning

Make sure you read this blog post on Spark Partitioning

Dataframe

The following global parameter configures the number of partitions that are used when a shuffle is performed over sparkSQL/Dataframe (joins, aggregations…).
In addition, spark nows includes a .repartition() method that can be used if necessary.


    spark.sql.shuffle.partitions
    

This setting can’t be changed for a specific operation. Spark will use (max(spark.sql.shuffle.partitions, actualDataframePartitionNumber)) partitions during its join operations.
If you know that your operation will require a lot of partitions, repartition your DF before or use smaller cassandra split size.

RDD

The same parameter is available for RDDs on operations like join, reduceByKey, and parallelize (when not set explicitly by the user).


    spark.default.parallelism
    

How many partitions?

The number of partition is generally driven by the data size. As a good default, try to aim for a partition size of 128MB (128MB = how many data is handled by a partition in memory).
Most of the jobs use from 200 to 2000 partition.
When tuning the number of partitions, consider the following objectives:

  • Avoid OOM
  • Avoid GC saturation (the webui show you the GC time of each task)
  • Avoid spilling to disk
  • Avoid many small operations

Few partitions will result to no/bad parallelism, too many add overhead.
Here is a formula to get a rough idea of the ideal number of partition.
size.of.stage can be find by saving the data and checking its size in the web-ui.


    //If data is being cached:
    memory.available.per.task = (spark.executor.memory - overhead) x spark.memory.fraction x (1 - spark.memory.storageFraction) / spark.executor.cores
    partition.number = size.of.stage / memory.available.per.task
                     = size.of.stage / ((spark.executor.memory - overhead) x spark.memory.fraction x (1 - spark.memory.storageFraction) / spark.executor.cores)
                     = size.of.stage / ((spark.executor.memory - 300) x 0.6 x (1 - 0.5) / spark.executor.cores)
                     = size.of.stage / ((spark.executor.memory - 300) x 0.3 / spark.executor.cores)
    //If no data is being cached, spark.memory.storageFraction = 0, so in this case:
    partition.number = size.of.stage / ((spark.executor.memory - 300) x 0.6 / spark.executor.cores)
    

Another approximation for the size.of.stage could be the following : shuffle write * Shuffle spill (memory) / Shuffle spill (disk). Shuffle write can be find in the stage detail.

FYI: Shuffle spill (memory) is the size of the deserialized form of the data in memory at the time when we spill it, whereas shuffle spill (disk) is the size of the serialized form of the data on disk after we spill it

Skew data is painful and are likely to create big partitions. Salt technique are generally use to mitigate this issue.
https://dzone.com/articles/why-your-spark-apps-are-slow-or-failing-part-ii-da

Note that Spark handles partitions differently when there is more than 2000 partition. If your partition number is close to this number, try to increase it to more than 2000.

The previous formula is quite theoretical and isn’t always required. The following strategy can be used instead:

  • Start with “decent” default executor memory (4-8g)
  • having OOM or lot of GC time, data is spilling to disk ? =>
    • Increase memory
    • Increase number of partition
    • Reduce C* split size (spark.cassandra.input.split.size_in_mb, see below)
  • Repeat

Spark perf tuning from Holden Karau and Rachel Warren is a good reference.

Cassandra split size

While reading Cassandra table, the connector will try to split the data by spark partition off a given size.
1 spark partition contains N Cassandra partitions. however 1 Cassandra partition will always be loaded in a unique spark partition.
Spark partition size are controlled by the following parameter:


    spark.cassandra.input.split.size_in_mb = 128
    

Note: DSE 5.1 increase this value to 512mb by default for you.

Keep in mind that this setting is just an estimate based on Cassandra table size estimation. To have a better idea of your partition size, check the in/out parameter of each task in the spark webui.
If your spark application is running out of memory, increase spark executor heap and/or lower the original split size/
This value define the partition size when reading data from Cassandra only, it doesn’t apply for joins operation.

Repartition

Repartition is generally used to increase the number of partition of your RDD or Dataframe. It’s an expensive operation since it will require to shuffle all data.
Avoid unless necessary. It’s generally a better practice to lower the original partition size.
It’s generally best to repartition to a number being a multiple of your total number of core.

Coaelesce

Coaelesce reduce the number of partition in a RDD or Dataframe. Unless the shuffle flag is set to true, it’s a simple operation, partitions will just be merged on each executors.

RDD vs DataFrame

Dataframes use Catalyst, a query optimizer with code generations. Spark analyse your operations and builds a tree containing all your steps, generating code on the fly. It also allows spark to be smart and re-order operations if needed (filter will be applied before joins etc.) RDD are a lower level of abstraction and should be used only when Dataframe can’t be used, or in specific case like joins. Switching from one to another has a cost. It’ll change serialization from tungsten to RDD, and also break spark Dataframe lineage, preventing potential optimization.

Serialization

Java Objects consume a lot of memory. Typically, a String will use:

  • 8(mark word) + 4(klass pointer) = 12 bytes for the header
  • 8 bytes for the hashCode
  • 8 bytes for the char[] array reference
  • 4 bytes for the char[] array lenght
  • and finally 2 bytes per char in the string

Dataframe

To solve this issue, Dataframes use Tungsten, a serialization engine designed to store binary data. It allows byte-level comparison, preventing costly object creation. All operations manipulating columns will beneficiate from Tungsten. Transformation/Closure operations (.map(…), .filter(row => …) etc.) will trigger an expensive Ser/Deser step.

Typed Dataset

Since spark 2.0, default builtin codec are available for case classes, making the use of Dataset (typed api) easier.
However, this api will have to serialize/deserialize Java object, loosing Dataframe ability to mutate column in a very efficient way. Custom encoders can be built to provide serialization for non-supported type.


        spark.createDataFrame(purchases, new StructType()
          .add(StructField("id", StringType, true))
          .add(StructField("val1", DoubleType, true))
          .add(StructField("val2", DoubleType, true)))
    

Cassandra Read & Write throughput - data locality

General recommendations

Here are a few advise to make sure you control your Read/Write throughput. Note that the default settings are usually good:

  • A custom connection factory (connection.factory=com.MyClass) can be used to increase the max number of connection and the query pool size
  • Parameters like output.batch.grouping.buffer.size, output.batch.size.bytes, output.concurrent.writes can be tuned to increase the throughput, but the default values are usually good.
  • The spark cassandra connector will try to group data per partition to increase write efficiency. A saveToCassandra operation will likely be faster on an ordered RDD. Do not order your data just for that reason.
  • Unless it’s a free operation, it’s generally best not to focus too much on data locality.
  • Spark will very likely hammer your cluster while reading/joining from a Cassandra table. In some cases, it can be worth limiting the maximum throughput of the connector for a specific operation. use input.reads_per_sec (with continuous paging only) or output.throughput_mb_per_sec.

Full table scan - RDD, mapper, Dataset or Dataframe?

DSE continuous paging

Continuous paging allows DSE server to continue to fetch data while spark is processing a page. See DSE blog post for more information It generally boosts cassandra read time and can be enabled with the following option:

spark.dse.continuous_paging_enabled=true

Full table scan with RDD and mapper:

Cassandra connector provide a mapper allowing to automatically mount row to scala case classes. It’s handy but comes at a cost and will add pressure on the gc. It might be preferable not to use the mapper on the hot path.


    spark.sparkContext.cassandraTable[Purchase](DataLoader.Model.ks, table).map(..).count()
    

Full table scan with RDD:


    spark.sparkContext.cassandraTable(DataLoader.Model.ks, table).map(..).count()
    

Full table scan with Dataset:


    spark.read.cassandraFormat(table, DataLoader.Model.ks).load().as[Purchase].map(..).count()
    

Full table scan with Dataframe:


    spark.read.cassandraFormat(table, DataLoader.Model.ks).load().map(..).count()
    

Continuous paging clearly speeds up full scan. Make sure you read this post before enabling it.

RDD and DF are similar especially since we use a .map() operation (we just want to compare C* read time, not DF operation)

The mapper comes at a cost and should be used wisely.

Pushdown filters

Spark can push down filters to the source. If you perform a filter on a partition key, clustering column or token range, it can be pushed-down to Cassandra. explain() can give you an idea of what’s going on, but if you want to be sure a predicat is effectively pushed-down to cassandra, change the following log level:


        Logger.getLogger("org.apache.spark.sql.cassandra").setLevel(Level.DEBUG)
    

Search request can also be pushed down to DSE. Search predicate allow instant count(*) operation, and are fast for queries retrieving a very small amount of data (<2-3%). It’s usually faster to scan all the table after this limit.
Enable search push-down the following parameter:


        //Globally: spark.sql.dse.solr.enable_optimization=true
        //Or locally per read
        val solrEnabledDataSet = spark.read
            .format("org.apache.spark.sql.cassandra")
            .options(Map(
            "keyspace" -> "ks",
            "table" -> "tab",
            "spark.sql.dse.solr.enable_optimization" -> "true")
            .load()
    

Enable debug to make sure the filter is applied:


        Logger.getLogger("org.apache.spark.sql.SolrPredicateRules").setLevel(Level.DEBUG)
    

Datastax search analytics integration

Shuffle

Shuffle are expensive. Dataframe will try to optimize your lineage to ensure operations like reduces or filters are executed before the shuffle to reduce the amount of data.
When working with RDDs, it’s up to you to make those kind of optimization. This also includes reduce operations (if your final operation is to perform a reduce, always reduce before grouping).
The number of object created during those operation shouldn’t be too high. Always prefer mutable structures or Array[] to object instantiation in the hot path.
It’s easy to detect shuffle in spark webUI (each new stage in the DAG is usually caused by a shuffle).
The DF and RDD .explain() or .toDebugString can also be used:


        #RDD Join with shuffle (search for CoGrouped operations)
        (5) MapPartitionsRDD[12] at join at BenchmarkJoins.scala:126 []
         |  MapPartitionsRDD[11] at join at BenchmarkJoins.scala:126 []
         |  CoGroupedRDD[10] at join at BenchmarkJoins.scala:126 []
         +-(5) MapPartitionsRDD[7] at keyBy at BenchmarkJoins.scala:124 []
         |  |  CassandraTableScanRDD[6] at RDD at CassandraRDD.scala:19 []
         +-(5) MapPartitionsRDD[9] at keyBy at BenchmarkJoins.scala:125 []
            |  CassandraTableScanRDD[8] at RDD at CassandraRDD.scala:19 []

        #RDD Join without shuffle (search for CoGrouped operations)
        (5) MapPartitionsRDD[29] at join at BenchmarkJoins.scala:107 []
         |  MapPartitionsRDD[28] at join at BenchmarkJoins.scala:107 []
         |  CoGroupedRDD[27] at join at BenchmarkJoins.scala:107 []
         |  CassandraTableScanRDD[22] at RDD at CassandraRDD.scala:19 []
         |  CassandraTableScanRDD[26] at RDD at CassandraRDD.scala:19 []

        #Dataframe join
        == Physical Plan ==
        *BroadcastHashJoin [_1#116.user_id], [_2#117.user_id], Inner, BuildRight
        :- *Project [named_struct(user_id, user_id#103L, firstname, firstname#104, lastname, lastname#105, shop_id, shop_id#106L, title, title#107, zipcode, zipcode#108) AS _1#116]
        :  +- *Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@60c8909a [zipcode#108,shop_id#106L,title#107,user_id#103L,firstname#104,lastname#105] ReadSchema: struct<_1:struct<user_id:bigint,firstname:string,lastname:string,shop_id:bigint,title:string,zipc...
        +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, struct<user_id:bigint,purchase_id:bigint,item:string,price:int>, false].user_id))
           +- *Project [named_struct(user_id, user_id#94L, purchase_id, purchase_id#95L, item, item#96, price, price#97) AS _2#117]
              +- *Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@61503d00 [user_id#94L,purchase_id#95L,item#96,price#97] ReadSchema: struct<_2:struct<user_id:bigint,purchase_id:bigint,item:string,price:int>>

    

Avoid creating objects in the “hot path”, it will overload the GC. Prefer mutable object / Array.

Joins

Joining small dataset

Dataframe

Spark usually pick a SortMergeJoin to perform joins. However, if the data being joined is small enough, it’ll use a broadcast join. By doing so it’ll first fetch the data and then send it once to each executor, saving unnecessary calls to Cassandra. The threshold is delimited by

spark.sql.autoBroadcastJoinThreshold = 10M

      val shop = spark.read.cassandraFormat(DataLoader.Model.shopTable, DataLoader.Model.ks).load()
      val users = spark.read.cassandraFormat(userTable, DataLoader.Model.ks).load()
      //Force a broadcast join, will override conf threshold. Current limit to 2GB (block size, SPARK-6235)
      import org.apache.spark.sql.functions.broadcast
      val join = users.join(broadcast(shop), "shop_id")
    

RDD

The same logic can be applied to RDD. The small RDD must first be collected and then broadcasted:


      //Start by collecting the small RDD and broadcast it:
      val shops = spark.sparkContext.cassandraTable[Shop](DataLoader.Model.ks, shopTable).keyBy(s => s.shop_id).collect().toMap
      val broadcastShops = spark.sparkContext.broadcast(shops)
      val users = spark.sparkContext.cassandraTable[User](DataLoader.Model.ks, userTable)
      val join: RDD[(User, Shop)] = users.flatMap(user => {
        broadcastShops.value.get(user.shop_id).map {shop => (user, shop)}
      })
    

Normal joins

RDD joins

Joins can be costly. If the 2 RDD being joined aren’t sharing the same partitioner, a shuffle will be require to distribute the key to the same partitions. To prevent that, a common partitioner can be used. Partitioner aren’t preserved by any transformation operation.

To join 1 RDD with a non-cassandra RDD, the connector allows you to use .applyPartitionerFrom(otherRDD). All Partition Keys columns must also be present in the keys of the target RDD.

Join with a Cassandra table

The RDD joinWithCassandra operation will trigger 1 query for each RDD row and won’t trigger shuffle. DataFrame join will first perform 2 full table scan, and then perform a join on those 2 Dataframe, which is likely to be less efficient. Spark SQL performs the same way. This has a very high cost especially when joins are executed against a small percentage of your table. If you want to join 10 rows against a table having 1M entries, spark will start by reading 1M rows and then shuffle to only keep 10 rows. Depending of the join cardinality, it’s generally faster to perform a Full Table Scan of the biggest table and then perform a join against the smaller table.
The following snippets are different joins technique between Users and Purchases.
In these examples, 1 user has 10 to 20 purchases, and we want to join users and purchases (RDD[User, Purchase] or RDD[User, Seq[Purchase]]).

joinWithCassandraTable

    spark.sparkContext.cassandraTable(DataLoader.Model.ks, userTable).joinWithCassandraTable(DataLoader.Model.ks, purchaseTable)
    
joinWithCassandraTable with mapper

    //mapper comes at a cost
    spark.sparkContext.cassandraTable[User](DataLoader.Model.ks, userTable).joinWithCassandraTable[Purchase](DataLoader.Model.ks, purchaseTable)
    
RDD join keyBy

    //Inefficient, will trigger a shuffle, don't do that.
    val purchases = spark.sparkContext.cassandraTable(DataLoader.Model.ks, purchaseTable).keyBy(p => p.user_id)
    val users = spark.sparkContext.cassandraTable(DataLoader.Model.ks, userTable).keyBy(u => u.user_id)
    users.join(purchases)
    
RDD join keyBy same partitioner

    //2 Full table scans and then 1 join without shuffle. Requires an object (Tuple1), Usually less efficient.
      val purchases = spark.sparkContext.cassandraTable(DataLoader.Model.ks, purchaseTable).keyBy[Tuple1[Long]]("user_id")
      val users = spark.sparkContext.cassandraTable(DataLoader.Model.ks, userTable).keyBy[Tuple1[Long]]("user_id").applyPartitionerFrom(purchases)
      users.join(purchases)
    
RDD join span by key

    //Inefficient, will trigger a shuffle, don't do that.
      val purchases = spark.sparkContext.cassandraTable(DataLoader.Model.ks, purchaseTable).spanBy(p => p.user_id)
      val users = spark.sparkContext.cassandraTable(DataLoader.Model.ks, userTable).keyBy(u => u.user_id)
      val joinrdd = purchases.join(users)
    
joinWithCassandraTable span by key

      spark.sparkContext.cassandraTable(DataLoader.Model.ks, purchaseTable).spanBy(r => r.getLong("user_id")).joinWithCassandraTable(DataLoader.Model.ks, userTable)
    
Dataframe joins

    val purchases = spark.read.cassandraFormat(purchaseTable, DataLoader.Model.ks).load()
    val users = spark.read.cassandraFormat(userTable, DataLoader.Model.ks).load()
    val joinDF = users.join(purchases, purchases("user_id") === users("user_id"))
    
Dataset joins

    val purchases = spark.read.cassandraFormat(purchaseTable, DataLoader.Model.ks).load().as[Purchase]
    val users = spark.read.cassandraFormat(userTable, DataLoader.Model.ks).load().as[User]
    val joinDS = users.join(purchases, purchases("user_id") === users("user_id"))
    

Dataframe vs SparkSQL

SparkSQL is easy to read and will result in the same operation as standard Dataframe operation. UDF can also be registered to be accessible from a SparkSQL query.


    df.createOrReplaceTempView("myTable")
    spark.sqlContext.udf.register("strLen", (s: String) => s.length())

    

Cache/persist vs Checkpoint

Use cache() to avoid re-computing your DAG twice when it forks and checkpoint() in long and complex jobs to avoid having to recompute all the lineage if a stage fails.

Consistency

Keep in mind that the default WRITE consistency level is LOCAL_QUORUM, but the default READ consistency consistency is LOCAL_ONE. If you want to use a sparkSQL query to ensure a LOCAL_QUORUM update is successful, switch input.consistency.level to LOCAL_QUORUM while starting a dse sql sesssion.

Thrift server

DSE includes a thrift server, creating 1 spark context ready to execute SparkSQL queries. It’s easy to plug any SQL sources over your Cassandra tables using the ODBC or JDBC driver. If your queries fail due to Out Of Memory errors, change the following settings:

  • Select the minimum field required
  • Limit max data returned to the driver: spark.driver.maxResultSize=200m.
    The spark job/query will fail if more than 200m are returned to the driver (will happen for queries without limit).
  • Increase driver (thrift server) memory: --conf spark.driver.memory=2g.
    Should be >> spark.driver.maxResultSize. (1GB or 2GB is usually enough, could be more if a fair scheduler is used with multiple spark requests running in parallel)
  • Increase executor memory: --conf spark.executor.memory=4g. Should be at least 4GB. If multiple cores are used per executor, each task will roughly have (spark.executor.memory / numberOfCorePerExecutor / 2) RAM available for each spark partition.
  • Make sure your number of cores is in phase with the memory assigned to each executor: --conf spark.cores.max=6.
  • Reduce spark partition size (of the first request): --conf spark.cassandra.input.split.size_in_mb=128
  • Ultimatly, incrementalCollect could be enabled: --conf spark.sql.thriftServer.incrementalCollect=true. Could have performances penalty, avoid if possible

    dse spark-sql-thriftserver start --conf spark.driver.memory=2g --conf spark.cores.max=6 --conf spark.executor.memory=5g --conf spark.cassandra.input.split.size_in_mb=128 --conf spark.driver.maxResultSize=200m --conf spark.sql.thriftServer.incrementalCollect=false
    #test connection with beeline:
    dse beeline
    !connect jdbc:hive2://localhost:10001
    select count(*) from ks.mytable;
    

Extra resources

DSE Analytics
https://docs.datastax.com/en/dse/6.0/dse-dev/datastax_enterprise/featuresDSE.html

https://www.gitbook.com/book/umbertogriffo/apache-spark-best-practices-and-tuning/details

Performance Tuning Spark:

Concurrency In Spark

Datastax Spark Dependency Management

Spark-Sql-Thriftserver