On most cloud platforms, writing PySpark code is a must if you want to process data faster than with HiveQL. Here is the cheat sheet I keep for myself when writing that code.
Import most of the SQL functions and types –
import pyspark
from pyspark.sql import functions as F
from pyspark.sql import Window
from pyspark.sql.functions import col, udf, explode, array, lit, concat, desc, substring_index
from pyspark.sql.types import IntegerType, FloatType, StringType, ArrayType, StructType, StructField, DoubleType
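All of the snippets in this cheat sheet assume a SparkSession named spark is already available (most managed notebooks provide one). If you need to create it yourself, a minimal sketch with Hive support looks like this; the app name is just a placeholder:

from pyspark.sql import SparkSession

#minimal session with Hive support; skip this where the environment already provides `spark`
spark=(SparkSession.builder
       .appName('pyspark_cheatsheet')
       .enableHiveSupport()
       .getOrCreate())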
Pull data from Hive –
Using Python variables inside the query string can help us better organize the code.
input_table='db.table_a'
df=spark.sql('''
select *
from {input_table}
'''.format(**vars()))
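The same pull can also be parameterized with an f-string (Python 3.6+), which avoids the .format(**vars()) call; the table name below is a placeholder:

input_table='db.table_a' #placeholder table name
df=spark.sql(f'''
select *
from {input_table}
''')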
Basic SQL Operations –
#condition filtering
df_new=df.filter(col('col_1')==lit('abc'))
df_new=df.filter(col('col_2')>=lit(5))
df_new=df.filter(~(col('col_2')==lit(5)))

#in condition
condition_1=['a','b','c','d']
df_new=df.filter(col('col_3').isin(condition_1))

#generate new col
df_new=df.withColumn('col_new', concat(col('col_1'), lit('|'), col('col_2')))
df_new=df.withColumn('col_new', col('col_1')-lit(5))
df_new=df.withColumn('col_new', col('col_1')-col('col_2'))

#case when statement
df_new=df.withColumn('col_new', F.when(col('col_1').isNull(), lit('NONE')).otherwise(col('col_1')))
df_new=df.withColumn('col_new', F.when((col('col_1')>=1)&(col('col_2')>2), lit(5)).otherwise(col('col_1')))
df_new=df.withColumn('col_new', F.when(col('col_1')>=1, lit(1)).when(col('col_1')>=2, lit(2)).otherwise(col('col_1')))

#select
df_new=df.select('col_1','col_2','col_3')

#rename with alias
df_new=df.select(col('col_1').alias('col_2'))

#drop
df_new=df.drop('col_1','col_2')

#rename a single column (existing name first, new name second)
df=df.withColumnRenamed('old_name', 'new_name')

#to rename the whole dataframe
new_name=['a','b','c','d']
df=df.toDF(*new_name)

#group by
df_new=df.groupBy('col_1').count()
df_new=df.groupBy('col_1').agg(F.countDistinct('col_2').alias('col_2_cnt'))
#the dict form names the output columns sum(col_2), count(col_3), etc., so rename them back afterwards
df_new=df.groupBy('col_1').agg({'col_2':'sum', 'col_3':'count', 'col_4':'max'})

#join
#when the two data frames share the join column names and we don't want duplicated columns
df_new=df.join(tmp, on=['a','b','c'], how='left_outer')
#when the two data frames have different column names
df_new=df.join(tmp, (df.name1==tmp.nm1)&(df.name2==tmp.nm2), how='inner')
#pyspark also has 'left_anti' (and 'left_semi') to keep rows from the left table whose join keys are absent from (present in) the right table; swap the data frames for the right-side equivalent

#cross join
df_new=df.crossJoin(tmp)

#window function
w1=Window.partitionBy('col_1').orderBy('col_2')
df_new=df.withColumn('col_new', F.sum('col_3').over(w1))
w2=Window.partitionBy('col_1').orderBy('col_2').rangeBetween(Window.unboundedPreceding, 0)
df_new=df.withColumn('col_new', F.sum('col_3').over(w2))
w3=Window.partitionBy('col_1').orderBy('col_2').rangeBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df_new=df.withColumn('col_new', F.sum('col_3').over(w3))
#row_number takes no column argument and numbers rows within each partition
df_new=df.withColumn('col_new', F.row_number().over(w1))

#explode
#create table with col_1, col_2, and explode col_3 (given that col_3 is an array column)
df_new=df.select('col_1', 'col_2', explode(col('col_3')).alias('col_3_exp'))

#collect_set / collect_list
#collect_set returns a set of unique records without a guaranteed order
#collect_list returns a list that keeps every record, including duplicates
df_new=df.groupBy('col_1','col_2').agg(F.collect_set('col_3_exp').alias('col_3'))
df_new=df.groupBy('col_1','col_2').agg(F.collect_list('col_3_exp').alias('col_3'))
#collect list for two columns (3&4)
df_new=df.groupBy('col_1','col_2').agg(F.collect_list(F.struct('col_3','col_4')).alias('col_5'))

#create schema
#True means nullable
schema=StructType([StructField('col_1', StringType(), True), StructField('col_2', ArrayType(IntegerType()), True)])
tb=spark.createDataFrame(data, schema) #data is a list of tuples or Rows matching the schema

#write into Hive
df.select('col_1','col_2').write.insertInto('fnl_table')
df.select('col_1','col_2').write.format('parquet').mode('overwrite').saveAsTable('fnl_table')
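To show how these pieces fit together, here is a small end-to-end sketch that filters a table, joins a lookup, keeps the latest record per key with a window, and writes the result back to Hive. It only uses the functions covered above, but the table and column names (db.orders, db.customers, customer_id, order_date, fnl_orders_latest) are placeholders I made up for illustration:

#placeholder source tables
orders=spark.sql('select * from db.orders')
customers=spark.sql('select * from db.customers')

#filter, derive a flag column, and join on a shared key
df=(orders
    .filter(col('status').isin(['shipped','delivered']))
    .withColumn('is_large', F.when(col('amount')>=100, lit(1)).otherwise(lit(0)))
    .join(customers, on=['customer_id'], how='left_outer'))

#keep only the most recent order per customer with row_number over a window
w=Window.partitionBy('customer_id').orderBy(col('order_date').desc())
df_latest=(df
           .withColumn('rn', F.row_number().over(w))
           .filter(col('rn')==lit(1))
           .drop('rn'))

#write the result back to Hive
df_latest.write.mode('overwrite').format('parquet').saveAsTable('db.fnl_orders_latest')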