PySpark Coding Quick Start

On most cloud platforms, writing PySpark code is a must when data needs to be processed faster than HiveQL allows. Here is the cheat sheet I keep for myself when writing that code.

Import the commonly used SQL functions and types –

import pyspark
from pyspark.sql import functions as F
from pyspark.sql import Window
from pyspark.sql.functions import col, udf, explode, array, lit, concat, desc, substring_index
from pyspark.sql.types import IntegerType, FloatType, StringType, ArrayType, StructType, StructField, DoubleType


Pull data from Hive – 
Using Python variables in the SQL string helps keep the code organized:

input_table='db.table_a'
df=spark.sql('''
select * from {input_table}
'''.format(**vars()))
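
For a plain full-table read, spark.table gives the same dataframe without a SQL string (just an alternative, reusing the same input_table variable):

df=spark.table(input_table)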

Basic SQL Operations –

#condition filtering -
df_new=df.filter(col('col_1')==lit('abc'))
df_new=df.filter(col('col_2')>=lit(5))
df_new=df.filter(~(col('col_2')==lit(5)))
#in condition
condition_1=['a','b','c','d']
df_new=df.filter(col('col_3').isin(condition_1))

#generate new col
df_new=df.withColumn('col_new', concat(col('col_1'), lit('|'), col('col_2')))
df_new=df.withColumn('col_new', col('col_1')-lit(5))
df_new=df.withColumn('col_new', col('col_1')-col('col_2'))

#case when statement
df_new=df.withColumn('col_new', F.when(col('col_1').isNull(), lit('NONE')).otherwise(col('col_1')))
df_new=df.withColumn('col_new', F.when((col('col_1')>=1)&(col('col_2')>2), lit(5)).otherwise(col('col_1')))
#conditions are evaluated in order, so put the more restrictive condition first
df_new=df.withColumn('col_new', F.when(col('col_1')>=2, lit(2)).when(col('col_1')>=1, lit(1)).otherwise(col('col_1')))

#select
df_new=df.select('col_1','col_2','col_3')

#alias
df_new=df.select(col('col_1').alias('col_2'))

#drop
df_new=df.drop('col_1','col_2')

#rename a single column: withColumnRenamed(existing_name, new_name)
df=df.withColumnRenamed('old_name', 'new_name')
#to rename every column of the dataframe at once
new_names=['a','b','c','d']
df=df.toDF(*new_names)

#group by
df_new=df.groupBy('col_1').count()
df_new=df.groupBy('col_1').agg(F.countDistinct('col_2').alias('col_2_cnt'))
#the dict form names the output columns sum(col_2), count(col_3), etc., so they usually need renaming afterwards (see the alias example below)
df_new=df.groupBy('col_1').agg({'col_2':'sum', 'col_3':'count', 'col_4':'max'})
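#one way to control the output column names directly: aggregate with explicit aliases (same columns as above)
df_new=df.groupBy('col_1').agg(F.sum('col_2').alias('col_2_sum'), F.count('col_3').alias('col_3_cnt'), F.max('col_4').alias('col_4_max'))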

#join
#when both dataframes use the same column names for the join keys, passing them via on avoids duplicated columns
df_new=df.join(tmp, on=['a','b','c'], how='left_outer')
#when the two dataframes have different column names
df_new=df.join(tmp, (df.name1==tmp.nm1)&(df.name2==tmp.nm2), how='inner')
#pyspark also supports 'left_anti' to keep rows from the left whose join keys have no match on the right (swap the dataframes for the right-side version); example below
#cross join
df_new=df.crossJoin(tmp)
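#anti join example: keep rows of df whose keys 'a','b','c' have no match in tmp
df_new=df.join(tmp, on=['a','b','c'], how='left_anti')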

#window function
w1=Window.partitionBy('col_1').orderBy('col_2')
df_new=df.withColumn('col_new', F.sum('col_3').over(w1))
w2=Window.partitionBy('col_1').orderBy('col_2').rangeBetween(Window.unboundedPreceding, 0)
df_new=df.withColumn('col_new', F.sum('col_3').over(w2))
w3=Window.partitionBy('col_1').orderBy('col_2').rangeBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df_new=df.withColumn('col_new', F.sum('col_3').over(w3))
#ranking functions take no column argument; they only need the partitionBy/orderBy
df_new=df.withColumn('col_new', F.row_number().over(w1))
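#one common use of row_number: keep a single row per col_1, here the one with the largest col_2 (rn is just a temporary helper column)
w4=Window.partitionBy('col_1').orderBy(desc('col_2'))
df_new=df.withColumn('rn', F.row_number().over(w4)).filter(col('rn')==1).drop('rn')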

#explode
#create a table with col_1, col_2, and one row per element of col_3 (given that col_3 is an array column)
df_new=df.select('col_1', 'col_2', explode(col('col_3')).alias('col_3_exp'))

#collect_set / collect_list
#collect_set returns the unique values, collect_list keeps duplicates
#neither guarantees ordering after a shuffle unless the result is sorted explicitly
df_new=df.groupBy('col_1','col_2').agg(F.collect_set('col_3_exp').alias('col_3'))
df_new=df.groupBy('col_1','col_2').agg(F.collect_list('col_3_exp').alias('col_3'))
#collect list for two columns (3&4)
df_new=df.groupBy('col_1','col_2').agg(F.collect_list(F.struct('col_3','col_4')).alias('col_5'))
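#one way to make the collected list deterministic: sort the array after collecting (sorts by the struct fields in order)
df_new=df.groupBy('col_1','col_2').agg(F.sort_array(F.collect_list(F.struct('col_3','col_4'))).alias('col_5'))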

#create schema
#True means nullable
schema=StructType([StructField('col_1', StringType(), True), StructField('col_2', ArrayType(IntegerType()), True)])
#createDataFrame needs data as well as the schema
tb=spark.createDataFrame([('abc', [1, 2, 3])], schema)

#write into Hive
df.select('col_1','col_2').write.insertInto('fnl_table')
df.select('col_1','col_2').write.format('parquet').mode('overwrite').saveAsTable('fnl_table')
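#if the target table is partitioned (assuming a partition column such as part_dt exists in df), partitionBy can be added
df.write.mode('overwrite').format('parquet').partitionBy('part_dt').saveAsTable('fnl_table')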