Spark Cheat Sheet

Posted by xcTorres on March 7, 2021

I have been using Spark clusters for about four years, starting when I was a postgraduate student. At work in particular I have come to appreciate how powerful Spark is: it makes handling big data easy, and it is now an essential tool in our daily work. Here I want to summarize some of the basic Spark SQL that I have used.

Spark Init

Jupyter

import os
# SPARK_HOME points at the Spark installation; PYSPARK_PYTHON is the interpreter
# used by the executors and PYSPARK_DRIVER_PYTHON the one used by the driver
os.environ["SPARK_HOME"] = '/usr/share/spark-2.4'
os.environ["PYSPARK_PYTHON"] = './python-3.6/pyarrow/bin/python'
os.environ["PYSPARK_DRIVER_PYTHON"] = '/ldap_home/chong.xie/.conda/envs/foody/bin/python'
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
name = 'demo'
port = 30011
SPARK_SERVICE = None
# Executor resources, Spark UI port and YARN queue for this session
SPARK_CONF = SparkConf().set('spark.locality.wait', '1000ms') \
    .set('spark.executor.instances', '64') \
    .set('spark.executor.cores', '3') \
    .set('spark.executor.memory', '10g') \
    .set('spark.ui.port', port) \
    .set('spark.yarn.queue', 'ds-regular')
# Create the session with Hive support, or reuse one that already exists
spark = SparkSession.builder \
        .config(conf=SPARK_CONF) \
        .enableHiveSupport() \
        .getOrCreate()

Command

We can also use spark-submit to run the Spark job.

spark-submit --conf spark.pyspark.python=/usr/share/miniconda2/envs/py36/bin/python \
             --conf spark.pyspark.driver.python=/ldap_home/chong.xie/.conda/envs/foody/bin/python \
             ./feature/merchant_features.py  

Configuration

Here are some configuration properties worth knowing.

  • spark.executor.instances
    Controls the number of executors requested.

  • spark.executor.cores
    Controls the number of concurrent tasks an executor can run.

  • spark.executor.memory
    Controls the heap size of each executor.

  • spark.sql.session.timeZone
    Sets the session time zone. It defaults to the JVM system local time zone if not set.

  • spark.sql.shuffle.partitions
    Configures the number of partitions that are used when shuffling data for joins or aggregations.
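
As a rough sanity check on these settings, the sketch below works out what the values from the Jupyter example above (64 executors, 3 cores and 10g each) add up to. The helper name cluster_capacity is made up for illustration, and the factor of 2 tasks per core is only a rule of thumb in the spirit of the Spark tuning guide, not a Spark default.

```python
def cluster_capacity(instances, cores, memory_gb, tasks_per_core=2):
    """Estimate the capacity implied by the executor settings (hypothetical helper)."""
    task_slots = instances * cores  # tasks that can run at the same time
    return {
        "task_slots": task_slots,
        # Executor heap only; memory overhead is not included
        "total_heap_gb": instances * memory_gb,
        # Rule of thumb (an assumption, not a Spark default): a few tasks per core
        "suggested_shuffle_partitions": task_slots * tasks_per_core,
    }

capacity = cluster_capacity(instances=64, cores=3, memory_gb=10)
print(capacity)
# {'task_slots': 192, 'total_heap_gb': 640, 'suggested_shuffle_partitions': 384}
```

The default value of spark.sql.shuffle.partitions is 200, which happens to be close to the 192 task slots here. On a much larger or smaller cluster it may be worth setting it explicitly.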

Read & Write

From json

# A JSON dataset is pointed to by path.
# The path can be either a single text file or a directory storing text files
path = "examples/src/main/resources/people.json"
peopleDF = spark.read.json(path)
# The inferred schema can be visualized using the printSchema() method
peopleDF.printSchema()
# root
#  |-- age: long (nullable = true)
#  |-- name: string (nullable = true)

From parquet

# Read
df = spark.read.parquet(path)
# Write
df.write.parquet(path, mode='overwrite') # Overwrite
df.write.parquet(path, mode='append') # Append

From kafka streaming

  • Stream
topic = 'topic'
servers="ip:port"
df = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", servers) \
        .option("subscribe", topic) \
        .option("startingOffsets", "latest") \
        .option("failOnDataLoss", False) \
        .load() \
        .selectExpr("CAST(value AS STRING)")
query = df.writeStream\
          .format('console')\
          .start()
  • Batch query
topic = 'topic'
servers="ip:port"
df = spark.read \
        .format("kafka") \
        .option("kafka.bootstrap.servers", servers) \
        .option("subscribe", topic) \
        .option("startingOffsets", "earliest") \
        .option("endingOffsets", "latest") \
        .option("failOnDataLoss", False) \
        .load() \
        .selectExpr("CAST(value AS STRING)")

Spark DataFrame

Select & from

sql = '''
    select *
    from {}
    where create_time >= unix_timestamp('{}')
      and create_time < unix_timestamp('{}')
'''.format(table, time_from, time_to)
df = spark.sql(sql)

udf

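import pyspark.sql.functions as F
import pyspark.sql.types as T
# Returns False as soon as one point in the segment has no order_info, True otherwise
@F.udf(returnType=T.BooleanType())
def filter_no_delivered_traj(segment):
    no_delivered_flag = True
    for pt in segment:
        if not pt.order_info:
            no_delivered_flag = False
            break
    return no_delivered_flag
# Store the UDF result in a new boolean column
filter_df = df.withColumn('is_delivery', filter_no_delivered_traj('segment'))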

unix_timestamp

Converts a time string with the given pattern ('yyyy-MM-dd HH:mm:ss' by default) to a Unix timestamp (in seconds), using the default time zone and the default locale. Returns null if the conversion fails.

import pyspark.sql.functions as F 
df.select(F.unix_timestamp('dt', 'yyyy-MM-dd').alias('unix_time'))

from_unixtime

Converts the number of seconds from unix epoch (1970-01-01 00:00:00 UTC) to a string representing the timestamp of that moment in the current system time zone in the given format.

import pyspark.sql.functions as F
import pyspark.sql.types as T
df = df.withColumn('date_time', F.from_unixtime(F.col('timestamp')))
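
To make the two conversions concrete, here is a plain-Python sketch of what unix_timestamp and from_unixtime compute, assuming the session time zone is UTC. It uses only the standard library, so it mirrors the semantics without needing a Spark session. The helper names are mine and are not Spark APIs. Note that a Spark pattern such as 'yyyy-MM-dd' corresponds to '%Y-%m-%d' in Python.

```python
from datetime import datetime, timezone

def to_unix_seconds(text, fmt="%Y-%m-%d %H:%M:%S"):
    """Parse a time string as UTC and return seconds since the epoch, or None on failure."""
    try:
        parsed = datetime.strptime(text, fmt).replace(tzinfo=timezone.utc)
    except ValueError:
        return None  # mirrors the "return null if fail" behaviour described above
    return int(parsed.timestamp())

def from_unix_seconds(seconds, fmt="%Y-%m-%d %H:%M:%S"):
    """Format seconds since the epoch as a UTC time string."""
    return datetime.fromtimestamp(seconds, tz=timezone.utc).strftime(fmt)

ts = to_unix_seconds("2021-03-07", "%Y-%m-%d")
print(ts)                             # 1615075200
print(from_unix_seconds(ts))          # 2021-03-07 00:00:00
print(to_unix_seconds("not a date"))  # None
```

With a session time zone other than UTC the same string maps to a different number of seconds, which is why spark.sql.session.timeZone matters for these functions.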

filter value

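from pyspark.sql.functions import col
# Keep only the rows where dt_mvmt is null
null_df = df.where(col("dt_mvmt").isNull())
# Drop the rows where dt_mvmt is null (two equivalent ways)
non_null_df = df.na.drop(subset=["dt_mvmt"])
non_null_df = df.filter(col("dt_mvmt").isNotNull())

# Condition
df = df.filter(col("col") > 2)
df = df.filter('col > 2')

# Multiple conditions
df = df.filter('(col>2) and (col<34)')
df = df.filter((col("col") > 2) & (col("col") < 34))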

summary

df.select('origin_speed').summary().show()
# +-------+------------------+
# |summary|      origin_speed|
# +-------+------------------+
# |  count|           7066188|
# |   mean|4.3542590483886805|
# | stddev| 4.069686621970696|
# |    min|              -2.0|
# |    25%|        0.55032253|
# |    50%|         3.6093123|
# |    75%|          7.272925|
# |    max|             210.0|
# +-------+------------------+
# min, max, avg
df.select(F.min('create_time')).show()
df.select(F.max('create_time')).show()
df.select(F.avg('create_time')).show()

quantile

# Quantile. The last argument is the relative error; 0 asks for exact quantiles, which can be expensive.
df.approxQuantile("origin_speed", [0.80, 0.90, 0.95, 0.99, 0.995, 0.999], 0)
# [8.28824234008789, 10.089457511901855, 11.575858116149902, 14.782051086425781, 16.42580223083496]
# Spark SQL
sql = '''
        select 
            experiment_group,
            count(*),
            percentile_approx(CreatedOnToConfirm, 0.25) as CreatedOnToConfirmQuantile25,
            percentile_approx(CreatedOnToConfirm, 0.5) as CreatedOnToConfirmQuantile50,
            percentile_approx(CreatedOnToConfirm, 0.75) as CreatedOnToConfirmQuantile75
        from order_complete_info
        group by experiment_group

      '''
a = spark.sql(sql)
sql = '''
        select percentile_approx(create_time, array(0.25,0.5,0.75)) as create_quantile
        from tmp
      '''
a = spark.sql(sql)

nan value count

from pyspark.sql.functions import isnan, when, count, col
# Count NaN values per column. isnan does not match null; use col(c).isNull() to count nulls.
df.select([count(when(isnan(c), c)).alias(c) for c in ['create_time']]).show()

repartition

Sometimes the records must be distributed across partitions under some constraint. For example, all records of a given user_id must be in a single partition so that you can group them by key inside a mapPartitions function.

stages_time_df = raw_order_status_log_df\
.repartition('order_id') \
.rdd \
.mapPartitions(calculate_stages_time).toDF(sampleRatio=0.1)

Sometimes the current number of partitions is very small, which is inefficient because the extra executors sit idle. In that case we can repartition the data into more partitions so that all of the executor resources you requested are fully utilized.
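
The calculate_stages_time function passed to mapPartitions in the example above is not shown in this post. The following is a minimal sketch of what such a function could look like, assuming each record has order_id, status and ts fields (hypothetical names). Because repartition('order_id') only guarantees that all records of one order land in the same partition, not that they are adjacent, the sketch sorts the partition before using itertools.groupby. Plain dicts stand in for Spark Row objects so the logic can be run without a cluster.

```python
from itertools import groupby
from operator import itemgetter

def calculate_stages_time_sketch(records):
    """Yield (order_id, from_status, to_status, seconds) for consecutive statuses.

    `records` is an iterator over one partition; the field names are assumptions.
    """
    # Materialize and sort so each order's records are contiguous and time-ordered
    rows = sorted(records, key=itemgetter("order_id", "ts"))
    for order_id, group in groupby(rows, key=itemgetter("order_id")):
        events = list(group)
        # Pair each event with the next one to get the time spent in each stage
        for prev, curr in zip(events, events[1:]):
            yield (order_id, prev["status"], curr["status"], curr["ts"] - prev["ts"])

# One "partition" holding interleaved records of two orders
partition = iter([
    {"order_id": 2, "status": "created", "ts": 100},
    {"order_id": 1, "status": "created", "ts": 10},
    {"order_id": 1, "status": "confirmed", "ts": 25},
    {"order_id": 2, "status": "confirmed", "ts": 160},
    {"order_id": 1, "status": "delivered", "ts": 70},
])
stages = list(calculate_stages_time_sketch(partition))
print(stages)
# [(1, 'created', 'confirmed', 15), (1, 'confirmed', 'delivered', 45), (2, 'created', 'confirmed', 60)]
```

Sorting a whole partition in memory is fine for a sketch, but it may be too expensive when partitions are very large.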

mapPartitions

mapPartitions runs the function once per partition, so you can set up state that is shared by all records of that partition. For example, if you need to send HTTP requests, it is more efficient to maintain one HTTP connection pool per partition.

SCHEMA_RESPONSE = sdf.schema.add(
    T.StructField("mm_response", T.StringType(), True)
)
# Here map_matching maintains one connection pool for all of the records within a partition.
mm_df = sdf.rdd.mapPartitions(map_matching).toDF(schema=SCHEMA_RESPONSE)
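
map_matching itself is not shown in this post. As a sketch of the pattern, the function below creates one client per partition and reuses it for every record. FakeClient is a stand-in I made up so that the example runs without a network; in real code it could be something like an HTTP session object. Rows are modeled as plain tuples, with the response appended as the last field to match a schema like SCHEMA_RESPONSE.

```python
class FakeClient:
    """Hypothetical stand-in for an HTTP connection pool."""
    instances = 0  # counts how many clients were created

    def __init__(self):
        FakeClient.instances += 1

    def match(self, payload):
        return "matched:{}".format(payload)

    def close(self):
        pass

def map_matching_sketch(rows):
    """Process one partition: create the client once, reuse it for every row."""
    client = FakeClient()  # set up once per partition, not once per record
    try:
        for row in rows:
            # Append the response as the extra column (mm_response in the post)
            yield tuple(row) + (client.match(row[0]),)
    finally:
        client.close()  # release the client when the partition is done

partition = iter([("a", 1), ("b", 2), ("c", 3)])
result = list(map_matching_sketch(partition))
print(result)
# [('a', 1, 'matched:a'), ('b', 2, 'matched:b'), ('c', 3, 'matched:c')]
print(FakeClient.instances)  # 1
```

Writing the function as a generator keeps memory usage low, because rows are produced one at a time instead of being collected into a list first.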

groupBy

Groups the DataFrame using the specified columns, so we can run aggregation on them. See GroupedData for all the available aggregate functions.

values = [(1, 2.0), (1, 4.0), (2, 0.0), (2, 100.0), (3, 5.0)]
columns = ['id', 'count']
df = spark.createDataFrame(values, columns)
df.show()
# +---+-----+
# | id|count|
# +---+-----+
# |  1|  2.0|
# |  1|  4.0|
# |  2|  0.0|
# |  2|100.0|
# |  3|  5.0|
# +---+-----+
df.groupBy('id').agg(F.mean('count').alias('count_mean')).show()
# +---+----------+
# | id|count_mean|
# +---+----------+
# |  1|       3.0|
# |  3|       5.0|
# |  2|      50.0|
# +---+----------+
# groupBy and collect all of the records into one array
df.groupBy('id').agg(F.collect_list('count').alias('count_list')).show()
# +---+------------+
# | id|  count_list|
# +---+------------+
# |  1|  [4.0, 2.0]|
# |  3|       [5.0]|
# |  2|[0.0, 100.0]|
# +---+------------+

explode to multi rows

If a column is of array type and you want to expand every element of the array into its own row, the explode function does this.

values = [(1, [2,3,4])]
columns = ['id', 'array']
df = spark.createDataFrame(values, columns)
df.show()
# +---+---------+
# | id|    array|
# +---+---------+
# |  1|[2, 3, 4]|
# +---+---------+
df.select('id', F.explode('array').alias('element')).show()
# +---+-------+
# | id|element|
# +---+-------+
# |  1|      2|
# |  1|      3|
# |  1|      4|
# +---+-------+
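
For intuition, the plain-Python sketch below mimics what explode does to (id, array) rows: each element becomes its own row, and rows whose array is empty or null produce no output at all (explode_outer is the variant that keeps such rows, with a null element). The function name and the tuple layout are illustrative only.

```python
def explode_rows(rows):
    """Mimic explode on (id, array) pairs: one output row per array element."""
    for row_id, array in rows:
        for element in array or []:  # None or [] yields nothing, like explode
            yield (row_id, element)

rows = [(1, [2, 3, 4]), (2, []), (3, None)]
exploded = list(explode_rows(rows))
print(exploded)  # [(1, 2), (1, 3), (1, 4)]
```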

explode to multi columns

Sometimes a column is a StructType and we want to expand it into multiple columns, so that every StructField becomes its own column. We can use the .* syntax for this.

    df = df.select('order.*')
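
As a plain-Python analogy (not Spark code), selecting 'order.*' behaves like lifting the fields of a nested record to the top level. The record layout below is invented for illustration.

```python
def expand_struct(record, struct_field):
    """Return the nested dict's fields as top-level columns, like select('order.*')."""
    return dict(record[struct_field])

record = {"id": 7, "order": {"order_id": 42, "status": "delivered"}}
expanded = expand_struct(record, "order")
print(expanded)  # {'order_id': 42, 'status': 'delivered'}
```

Note that only the fields of the struct survive. To keep other columns as well, list them next to the expansion, for example df.select('id', 'order.*').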

Reference

https://spark.apache.org/docs/2.3.0/tuning.html
https://medium.com/@ch.nabarun/apache-spark-optimization-techniques-54864d4fdc0c
https://stackoverflow.com/questions/47669895/how-to-add-multiple-columns-using-udf/51908455
https://spark.apache.org/docs/2.2.0/structured-streaming-kafka-integration.html


