File System Operation in Pyspark

Sometimes (unfortunately) we need to do file system operations directly in Pyspark. Here is one way to do that:

sc=spark.sparkContext
URI = sc._gateway.jvm.java.net.URI
Path = sc._gateway.jvm.org.apache.hadoop.fs.Path
FileSystem = sc._gateway.jvm.org.apache.hadoop.fs.FileSystem
Configuration = sc._gateway.jvm.org.apache.hadoop.conf.Configuration

fs=FileSystem.get(sc._jsc.hadoopConfiguration())
outputPath="./"
status=fs.listStatus(Path(outputPath))
path_result=[file.getPath().getName() for file in status if file.getPath().getName()!='.sparkStaging']

# example operations: rename each path to a backup name, then delete the renamed copy
for i in path_result:
    fs.rename(Path(i), Path(i+'_backup'))
    fs.delete(Path(i+'_backup'), True)  # delete(path, recursive)
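A slightly more defensive variant (just a sketch, reusing the same fs handle with a hypothetical path name) checks that a path exists before touching it:

# hypothetical path, for illustration only
target=Path(outputPath+'tmp_dir')
if fs.exists(target):
    fs.rename(target, Path(outputPath+'tmp_dir_backup'))
    fs.delete(Path(outputPath+'tmp_dir_backup'), True)  # recursive delete of the renamed directory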


Pyspark vector to list

In Pyspark, the inputs and outputs of ml functions are normally vectors, but sometimes we want to convert them to or from lists.

list to vector

from pyspark.ml.linalg import Vectors, VectorUDT, DenseVector
from pyspark.sql.functions import col, udf

list2vec=udf(lambda l: Vectors.dense(l), VectorUDT())
df=df.withColumn('col_1_vec', list2vec(col('col_1')))

dense/sparse vector to list (Array)

from pyspark.sql.types import ArrayType, FloatType

def vec2list(v):
  return [float(x) for x in DenseVector(v)]

vec2list_wrap=udf(vec2list, ArrayType(FloatType()))

df=df.withColumn('col_1_list', vec2list_wrap(col('col_1')))
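On newer Spark versions there are also built-in helpers for this. An alternative sketch, assuming Spark 3.0+ for vector_to_array and 3.1+ for array_to_vector:

from pyspark.ml.functions import vector_to_array, array_to_vector

df=df.withColumn('col_1_list', vector_to_array(col('col_1')))      # vector -> array column
df=df.withColumn('col_1_vec', array_to_vector(col('col_1_list')))  # array -> vector column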

Pyspark UDF

UDFs are particularly useful when writing Pyspark code. We can define the function we want and then apply it back to dataframes.

Import everything

import pyspark
from pyspark.sql import functions as F
from pyspark.sql import Window
from pyspark.sql.functions import col, udf, explode, array, lit, concat, desc, substring_index
from pyspark.sql.types import IntegerType, FloatType, StringType, ArrayType, StructType, StructField, DoubleType

Create Function

#this function just returns the length of a string
def slen(s):
  return len(s)

Make it a UDF

slen=udf(slen, IntegerType())
#we can use lambda function to shorten this:
slen=udf(lambda s:len(s), IntegerType())

Call this UDF

df=df.withColumn('col_1_len', slen(col('col_1')))

Key notes:
1) We need to carefully define the return type. Even IntegerType and FloatType are different: if the function returns an integer but the udf is defined with FloatType, we will get all nulls.
2) If we return a list of integers, we should define the return type as ArrayType(IntegerType()).
3) If the input is null for some corner cases, we will observe a failure when the code runs (when collect() or show() is called). In this case we need to first check whether the input is null using "is None" in the Python function (see the sketch after this list).
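For example, a minimal null-safe version of the slen udf (just a sketch; df and col_1 are the same hypothetical dataframe and column used above):

from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType

def safe_slen(s):
  #return None (a null in the dataframe) instead of failing on null inputs
  if s is None:
    return None
  return len(s)

safe_slen=udf(safe_slen, IntegerType())
df=df.withColumn('col_1_len', safe_slen(col('col_1')))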

Write a UDF with multiple columns as inputs.
Sometimes we want to take multiple columns as inputs and then do some calculation.

def slen(s1, s2):
  return len(s1)+len(s2)

slen=udf(slen, IntegerType())
df=df.withColumn('col_1_len', slen(col('col_1'), col('col_2')))

Write UDF with columns and variables as inputs
Sometimes we want to pass a variable as an input too. In this case, we can use a Python closure. Here is an example:

# add t to the length of a string
def slen(s, t):
  return(len(s)+t)

def slen_wrap(t):
  return udf(lambda s: slen(s,t), IntegerType())

#call the udf
tt=5
df=df.withColumn('col_1_len', slen_wrap(tt)(col('col_1')))
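An alternative sketch, instead of a closure, is to pass the constant in as a literal column; a plain two-argument udf handles it the same way:

from pyspark.sql.functions import col, lit, udf
from pyspark.sql.types import IntegerType

def slen_plus(s, t):
  return len(s)+t

slen_plus=udf(slen_plus, IntegerType())
df=df.withColumn('col_1_len', slen_plus(col('col_1'), lit(5)))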


Pyspark Coding Quick Start

On most cloud platforms, writing Pyspark code is a must if we want to process data faster than with HiveQL. Here is the cheat sheet I use when writing such code.

Import most of the sql functions and types –

import pyspark
from pyspark.sql import functions as F
from pyspark.sql import Window
from pyspark.sql.functions import col, udf, explode, array, lit, concat, desc, substring_index
from pyspark.sql.types import IntegerType, FloatType, StringType, ArrayType, StructType, StructField, DoubleType


Pull data from Hive –
Using Python variables in the query string can help us better organize the code.

input_table='db.table_a'
df=spark.sql('''
select * from {input_table}
'''.format(**vars()))

Basic SQL Operations –

#condition filtering -
df_new=df.filter(col('col_1')==lit('abc'))
df_new=df.filter(col('col_2')>=lit(5))
df_new=df.filter(~(col('col_2')==lit(5)))
#in condition
condition_1=['a','b','c','d']
df_new=df.filter(col('col_3').isin(condition_1))

#generate new col
df_new=df.withColumn('col_new', concat(col('col_1'), lit('|'), col('col_2')))
df_new=df.withColumn('col_new', col('col_1')-lit(5))
df_new=df.withColumn('col_new', col('col_1')-col('col_2'))

#case when statement
df_new=df.withColumn('col_new', F.when(col('col_1').isNull(), lit('NONE')).otherwise(col('col_1')))
df_new=df.withColumn('col_new', F.when((col('col_1')>=1)&(col('col_2')>2), lit(5)).otherwise(col('col_1')))
df_new=df.withColumn('col_new', F.when(col('col_1')>=2, lit(2)).when(col('col_1')>=1, lit(1)).otherwise(col('col_1')))

#select
df_new=df.select('col_1','col_2','col_3')

#alias
df_new=df.select(col('col_1').alias('col_2'))

#drop
df_new=df.drop('col_1','col_2')

#rename a single column
df=df.withColumnRenamed('old_name', 'new_name')
#to rename the whole dataframe
new_name=['a','b','c','d']
df=df.toDF(*new_name)

#group by
df_new=df.groupBy('col_1').count()
df_new=df.groupBy('col_1').agg(F.countDistinct('col_2').alias('col_2_cnt'))
#this will name the columns sum(col_2), etc.; we will need to rename them back
df_new=df.groupBy('col_1').agg({'col_2':'sum', 'col_3':'count', 'col_4':'max'})

#join
#when the two data frames share the join column names and we don't want duplicated columns
df_new=df.join(tmp, on=['a','b','c'], how='left_outer')
#when the two data frames have different column names
df_new=df.join(tmp, (df.name1==tmp.nm1)&(df.name2==tmp.nm2), how='inner')
#pyspark has 'left_anti' to get the rows from the left table whose join column values do not appear in the right table (swap the tables for the other direction)
#cross join
df_new=df.crossJoin(tmp)

#window function
w1=Window.partitionBy('col_1').orderBy('col_2')
df_new=df.withColumn('col_new', F.sum('col_3').over(w1))
w2=Window.partitionBy('col_1').orderBy('col_2').rangeBetween(Window.unboundedPreceding, 0)
df_new=df.withColumn('col_new', F.sum('col_3').over(w2))
w3=Window.partitionBy('col_1').orderBy('col_2').rangeBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df_new=df.withColumn('col_new', F.row_number().over(w3))

#explode
#create a table with col_1, col_2, and exploded col_3 (given that col_3 is a list)
df_new=df.select('col_1', 'col_2', explode(col('col_3')).alias('col_3_exp'))

#collect_set / collect_list
#collect_set returns a set of unique records without a guaranteed order
#collect_list returns a list that keeps duplicates
df_new=df.groupBy('col_1','col_2').agg(F.collect_set('col_3_exp').alias('col_3'))
df_new=df.groupBy('col_1','col_2').agg(F.collect_list('col_3_exp').alias('col_3'))
#collect list for two columns (3&4)
df_new=df.groupBy('col_1','col_2').agg(F.collect_list(F.struct('col_3','col_4')).alias('col_5'))

#create schema
#True means nullable
schema=StructType([StructField('col_1', StringType(), True), StructField('col_2', ArrayType(IntegerType()), True)])
tb=spark.createDataFrame(data, schema)  #data is a list of rows/tuples matching the schema
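For example (hypothetical rows, just to make the data argument concrete):

data=[('a', [1, 2, 3]), ('b', [4, 5, 6])]  #hypothetical sample rows
tb=spark.createDataFrame(data, schema)
tb.printSchema()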

#write into Hive
df.select('col_1','col_2').write.insertInto('fnl_table')
df.select('col_1','col_2').write.format('parquet').saveAsTable('fnl_table', mode='OVERWRITE')

Hadoop-Streaming (Using Python) Takeaways

There are a couple of resources online (http://www.michael-noll.com/tutorials/writing-an-hadoop-mapreduce-program-in-python/) that introduce how to write Python map-reduce code using the hadoop-streaming API. The key idea is to pass data between map and reduce via stdin and stdout. The mapper code transforms the data into key-value strings (separated by '\t'). The hadoop-streaming framework then reads the mapper output, sorts it, and allocates records to reducers based on the keys. The reducer code takes the sorted output of the mapper job and aggregates the results by key.

The simplest example is word count. Suppose we have a text file called "input", and we want to count the word frequencies –
I am a student you are a doctor he is a teacher we are good friends now hadoop hive pig yarn spark scala Capital one facebook amex google amazon linkedin

The mapper code creates the key-value mapping. Here each output line can be as simple as the word and the count 1, separated by a tab:

import sys

for line in sys.stdin:
    line=line.strip()
    words=line.split(' ')
    for w in words:
        print(w+'\t'+str(1))

The reducer code goes through the (sorted) key-value pairs and summarizes the word frequencies.

import sys

curr_word=None
curr_count=0
for line in sys.stdin:
    word, count=line.strip().split('\t')
    if word==curr_word:
        curr_count+=int(count)
    else:
        #the key changed: emit the previous word's count, then reset
        if curr_word is not None:
            print(curr_word+'\t'+str(curr_count))
        curr_word=word
        curr_count=int(count)

if curr_word is not None:
    print(curr_word+'\t'+str(curr_count))
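An equivalent reducer can be written with itertools.groupby (a sketch; it relies on the fact that hadoop-streaming delivers the mapper output to each reducer sorted by key):

import sys
from itertools import groupby

def parse(lines):
    for line in lines:
        word, count=line.strip().split('\t')
        yield word, int(count)

#groupby works here only because the input arrives sorted by key
for word, group in groupby(parse(sys.stdin), key=lambda kv: kv[0]):
    print(word+'\t'+str(sum(count for _, count in group)))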

The hadoop streaming command line is:

hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming-2.6.0-cdh5.10.0.jar \
-file mapper.py \
-mapper "python mapper.py" \
-file reducer.py \
-reducer "python reducer.py" \
-input /qle/input \
-output /qle/output

Here are some takeaways to help avoid common errors:
1) You need to locate the jar file for hadoop-streaming depending on your Hadoop distribution (you can google it).
2) The input file needs to be uploaded to HDFS first (using hadoop fs -put local_dir hadoop_dir).
3) We don't need to upload mapper.py and reducer.py to HDFS, as long as we specify them with the -file arguments.
4) If we do upload mapper.py and reducer.py to HDFS, we don't need the -file arguments.
5) mapper.py and reducer.py should both be executable (using chmod +x).
6) If you choose to use a shebang (#!/usr/bin/env python), you need to make sure "/usr/bin/env" can find the Python interpreter on the Hadoop nodes (not just locally). Based on my experience, in both Cloudera and MapR that is not the case; if so, you will get an error like code=2 "No such file or directory". With a working shebang you can simply write: -mapper mapper.py \ -reducer reducer.py \
7) If you don't want to waste time finding the interpreter on the Hadoop nodes, you can delete the shebang and just use -mapper "python mapper.py" \ -reducer "python reducer.py" \. That works fine too.
8) You need to remove the output directory in Hadoop before writing to the same output directory again.
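Before submitting the job, a quick local smoke test catches most scripting errors. A sketch (it assumes mapper.py, reducer.py, and the input file are in the current directory and that 'python' points at the interpreter you intend to use):

import subprocess

#simulate map -> shuffle/sort -> reduce locally
with open('input') as f:
    mapped=subprocess.run(['python', 'mapper.py'], stdin=f, capture_output=True, text=True).stdout
shuffled='\n'.join(sorted(mapped.splitlines()))+'\n'
reduced=subprocess.run(['python', 'reducer.py'], input=shuffled, capture_output=True, text=True).stdout
print(reduced)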


MANOVA and multiple comparison

MANOVA is used when we want to compare two or more groups (populations) on a list of KPIs. For example, suppose there are three groups of students and each group uses a different study material. We want to test whether that results in any difference in their final exam scores. Here the exam scores may cover different subjects, like Verbal and Math. In this case, we have two continuous dependent variables (the Verbal and Math scores) and three groups of students, so we need a one-way MANOVA to test the differences among groups considering the two dependent variables together. Running multiple separate ANOVAs is not recommended here, because it increases the risk of Type I error (even if you control the Type I error in each ANOVA, performing several ANOVAs at the same time inflates the overall Type I error).

MANOVA is easy to run in SAS, as shown below. We use proc glm and add the manova statement to get the MANOVA results. Normally we look at the statistic called Wilks' Lambda to decide whether the differences are statistically significant. Wilks' Lambda can be interpreted as the proportion of the variance in the outcomes that is not explained by the effect.

proc glm data = manova;
  class group;
  model verbal math = group / SS3;
  manova h = group;
run;

Keep in mind that we still need to check the following assumptions before running MANOVA:
The data from group i has common mean vector μi
The data from all groups have common variance-covariance matrix Σ.
The subjects are independently sampled.
The data are multivariate normally distributed.

If we observe that the differences in KPIs among the three groups are statistically significant, we will have two follow-up questions:

  1. For the two dependent variables (Verbal and Math scores), which one (or both) differs among the three groups with statistical significance? Here we have two options: 1) perform multiple ANOVAs, or 2) use the following statement in proc glm: manova h=group m=(1 0);
  2. Considering the two KPIs together, which group performs best and which performs worst? This is a multiple comparison question. Unfortunately, Tukey's method does not work for MANOVA. There are two options: 1) use Bonferroni pairwise comparisons (by controlling the Type I error in each comparison); however, the Bonferroni correction is quite conservative; 2) use the FDR (False Discovery Rate) to control the overall error, which is more liberal. For the detailed steps, we can refer to the paper by Benjamini and Hochberg (1995). For option 1, we can use the following code, but we will need to adjust the Type I error for each comparison.

proc glm data = manova;
class group;
model verbal math = group / SS3;
contrast "group 1 vs. group 2" group 1 -1 0;
manova h = group;
run;

For option 2, it is a little complicated to achieve in SAS. However, it is relatively easy in R using the following code:

p.adjust(p, method ="fdr", n = length(p))
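For reference, a minimal sketch of the same Benjamini-Hochberg adjustment written in Python (equivalent in spirit to R's p.adjust with method="fdr"):

import numpy as np

def fdr_bh(pvals):
    #Benjamini-Hochberg adjusted p-values
    p=np.asarray(pvals, dtype=float)
    m=p.size
    order=np.argsort(p)                  #ascending p-values
    ranked=p[order]*m/np.arange(1, m+1)
    adjusted=np.minimum.accumulate(ranked[::-1])[::-1]  #enforce monotonicity from the largest p down
    adjusted=np.minimum(adjusted, 1.0)
    out=np.empty(m)
    out[order]=adjusted
    return out

print(fdr_bh([0.01, 0.04, 0.03, 0.20]))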

Collaborative Filtering Quick Start

Collaborative Filtering (CF) is the technology most widely used to build recommendation engines. There are two types of CF: 1) user-based and 2) item-based. In this type of problem you normally have an n*m matrix, with n users and m items, where each value is one user's rating of a specific item. The key idea of CF is to build a similarity matrix between users (or between items). To predict the rating of a given item by a given user, we first choose the top-N most similar users (or items). In user-based CF, we predict the rating based on other similar users who have already rated this item. In item-based CF, we predict the rating of this item based on other similar items that were rated by this user. The similarity matrix is normally built with Pearson correlation, cosine similarity, or other methods. More details on these methods can be found at: files.grouplens.org/papers/FnT%20CF%20Recsys%20Survey.pdf
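To make the idea concrete, here is a minimal user-based CF sketch in Python with cosine similarity (a toy ratings matrix where 0 means "not rated"; this is an illustration, not any particular library's implementation):

import numpy as np

#toy ratings matrix: rows = users, columns = items, 0 = not rated
R=np.array([[5, 3, 0, 1],
            [4, 0, 0, 1],
            [1, 1, 0, 5],
            [0, 0, 5, 4]], dtype=float)

def cosine_sim(A):
    #pairwise cosine similarity between the rows of A
    norms=np.linalg.norm(A, axis=1, keepdims=True)
    norms[norms==0]=1.0
    X=A/norms
    return X @ X.T

def predict_user_based(R, user, item, k=2):
    #predict R[user, item] from the k most similar users who rated the item
    sim=cosine_sim(R)[user]
    raters=np.where(R[:, item]>0)[0]
    raters=raters[raters!=user]
    if raters.size==0:
        return 0.0
    top=raters[np.argsort(sim[raters])[::-1][:k]]
    weights=sim[top]
    if weights.sum()==0:
        return float(R[top, item].mean())
    return float(np.dot(weights, R[top, item])/weights.sum())

print(predict_user_based(R, user=1, item=1))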

There are a couple of tools for building CF in Python: graphlab, crab, pysuggest, etc. In R, recommenderlab is one of the tools that can be used. Here is a quick example using recommenderlab:


library("recommenderlab")
m<-matrix(sample(c(as.numeric(0:5), NA), 50, replace=TRUE, prob=c(rep(.4/6,6),.6)), ncol=10,
dimnames=list(user=paste("u", 1:5, sep=''),item=paste("i", 1:10, sep='')))

r <- as(m, "realRatingMatrix")
getRatingMatrix(r)
r_m<-normalize(r)
getRatingMatrix(r_m)

#User-based CF
r_ubcf<-Recommender(r, method = "UBCF")
recom <- predict(r_ubcf, r, n=5, type="ratingMatrix")
getRatingMatrix(recom)
#item-based CF
r_ibcf<-Recommender(r, method = "IBCF")
recom2 <- predict(r_ibcf, r, n=5, type="ratingMatrix")
getRatingMatrix(recom2)


Read Data from Dropbox to R

I am currently working on Shinyapps in R and want to save a csv file in Dropbox, then read the data from Dropbox into R. Here is how I achieve this:

  1. Authentication – we need to have a Dropbox account and then run the following R code
    library(rdrop2)
    token<-drop_auth()
    

    By executing this code, my web browser automatically asks me to connect to Dropbox with my id and password. Then in the Dropbox settings, under "apps linked", I can see "rdrop2" with full Dropbox access.

  2. Save the Token – run the code:
    saveRDS(token, "droptoken.rds")
    

    By executing this code, I save my token on my PC. This token file is used for future connections and data exchange. If I want to run my R code on a remote server (like Shinyapps), I need to copy this token file to the remote server together with my main R code.

  3. Pull Data – run the code:
    token<-readRDS("droptoken.rds")
    drop_acc(dtoken=token)
    input_data<-drop_read_csv("my_file.txt", dtoken=token, header=FALSE, sep='\t', col.names=c('c1','c2'))
    

    By executing this code on the server, I first read the token (that has been saved on the remote server). Then I use drop_acc to log in to my Dropbox account, and drop_read_csv to access the data with the given token.

    We can also do other data exchange operations using the drop_* functions in the rdrop2 library. Most of the functions are documented at: https://github.com/karthik/rdrop2

    One mistake I made was forgetting to include dtoken=token in my drop_read_csv call. When I run the code locally, it asks me to log in to Dropbox from the web browser. When I run the code on the remote server, there is an error: oauth_listener() needs an interactive environment.


    Also, there is a small issue on Shinyapps: if we have a string variable such as "1-Mar-2015", we cannot use %b to convert it to a date variable. Here is the code I use to work around this:

    df$issue_year<-substr(df$issue_d,5,8)
    df$issue_month<-substr(df$issue_d,1,3)
    df$issue_date<- as.Date(with(df, paste(issue_year, match((issue_month),c('Jan','Feb','Mar','Apr','May','Jun','Jul','Aug','Sep','Oct','Nov','Dec')), '1', sep='-')))
    print(df$issue_date)
    

Working with AWS redshift

AWS Redshift is an MPP database on AWS, so working with Redshift is very similar to working with Teradata. Since it is MPP, we need to pay extra attention to distribution/sort keys and join operations, because those operations are performed at the individual node level and then aggregated.

To use Redshift, we need to create roles in IAM management; for details see: http://docs.aws.amazon.com/redshift/latest/gsg/rs-gsg-create-an-iam-role.html

Normally we upload a flat data file to AWS S3, then create the table in Redshift and COPY the data in. Here is Python code using the psycopg2 and sqlalchemy packages. The key thing is to figure out the connection string and the engine.


import pandas as pd
from sqlalchemy import create_engine
import psycopg2
connection_string = "dbname='dev' port='5439' user='XXXX' password='XXXX' host='XXXX-redshift.XXXX.us-east-1.redshift.amazonaws.com'"
conn = psycopg2.connect(connection_string)
cur = conn.cursor()
cur.execute(
'''create table lc_table(
id decimal(18,4) not null distkey sortkey,
member_id decimal(18,4) ,
loan_amnt decimal(18,4) ,
funded_amnt decimal(18,4) ,
funded_amnt_inv decimal(18,4) ,
term varchar(50) ,
int_rate varchar(50) ,
installment decimal(18,4) ,
grade varchar(50) ,
sub_grade varchar(50) ,
emp_length varchar(50) ,
home_ownership varchar(50) ,
annual_inc decimal(18,4) ,
verification_status varchar(50) ,
issue_d varchar(50) ,
loan_status varchar(200) ,
pymnt_plan varchar(200) ,
url varchar(500) ,
purpose varchar(500) ,
zip_code varchar(50) ,
addr_state varchar(50) ,
dti decimal(18,4) ,
delinq_2yrs decimal(18,4) ,
earliest_cr_line varchar(50) ,
fico_range_low decimal(18,4) ,
fico_range_high decimal(18,4) ,
inq_last_6mths decimal(18,4) ,
mths_since_last_delinq decimal(18,4) ,
mths_since_last_record decimal(18,4) ,
open_acc decimal(18,4) ,
pub_rec decimal(18,4) ,
revol_bal decimal(18,4) ,
revol_util varchar(50) ,
total_acc decimal(18,4) ,
initial_list_status varchar(50) ,
out_prncp decimal(18,4) ,
out_prncp_inv decimal(18,4) ,
total_pymnt decimal(18,4) ,
total_pymnt_inv decimal(18,4) ,
total_rec_prncp decimal(18,4) ,
total_rec_int decimal(18,4) ,
total_rec_late_fee decimal(18,4) ,
recoveries decimal(18,4) ,
collection_recovery_fee decimal(18,4) ,
last_pymnt_d varchar(50) ,
last_pymnt_amnt decimal(18,4) ,
next_pymnt_d varchar(50) ,
last_credit_pull_d varchar(50) ,
last_fico_range_high decimal(18,4) ,
last_fico_range_low decimal(18,4) ,
collections_12_mths_ex_med decimal(18,4) ,
mths_since_last_major_derog decimal(18,4) ,
policy_code varchar(200) ,
application_type varchar(200) ,
annual_inc_joint decimal(18,4) ,
dti_joint decimal(18,4) ,
verification_status_joint varchar(200) ,
acc_now_delinq decimal(18,4) ,
tot_coll_amt decimal(18,4) ,
tot_cur_bal decimal(18,4) ,
open_acc_6m decimal(18,4) ,
open_il_6m decimal(18,4) ,
open_il_12m decimal(18,4) ,
open_il_24m decimal(18,4) ,
mths_since_rcnt_il decimal(18,4) ,
total_bal_il decimal(18,4) ,
il_util decimal(18,4) ,
open_rv_12m decimal(18,4) ,
open_rv_24m decimal(18,4) ,
max_bal_bc decimal(18,4) ,
all_util decimal(18,4) ,
total_rev_hi_lim decimal(18,4) ,
inq_fi decimal(18,4) ,
total_cu_tl decimal(18,4) ,
inq_last_12m decimal(18,4) ,
acc_open_past_24mths decimal(18,4) ,
avg_cur_bal decimal(18,4) ,
bc_open_to_buy decimal(18,4) ,
bc_util decimal(18,4) ,
chargeoff_within_12_mths decimal(18,4) ,
delinq_amnt decimal(18,4) ,
mo_sin_old_il_acct decimal(18,4) ,
mo_sin_old_rev_tl_op decimal(18,4) ,
mo_sin_rcnt_rev_tl_op decimal(18,4) ,
mo_sin_rcnt_tl decimal(18,4) ,
mort_acc decimal(18,4) ,
mths_since_recent_bc decimal(18,4) ,
mths_since_recent_bc_dlq decimal(18,4) ,
mths_since_recent_inq decimal(18,4) ,
mths_since_recent_revol_delinq decimal(18,4) ,
num_accts_ever_120_pd decimal(18,4) ,
num_actv_bc_tl decimal(18,4) ,
num_actv_rev_tl decimal(18,4) ,
num_bc_sats decimal(18,4) ,
num_bc_tl decimal(18,4) ,
num_il_tl decimal(18,4) ,
num_op_rev_tl decimal(18,4) ,
num_rev_accts decimal(18,4) ,
num_rev_tl_bal_gt_0 decimal(18,4) ,
num_sats decimal(18,4) ,
num_tl_120dpd_2m decimal(18,4) ,
num_tl_30dpd decimal(18,4) ,
num_tl_90g_dpd_24m decimal(18,4) ,
num_tl_op_past_12m decimal(18,4) ,
pct_tl_nvr_dlq decimal(18,4) ,
percent_bc_gt_75 decimal(18,4) ,
pub_rec_bankruptcies decimal(18,4) ,
tax_liens decimal(18,4) ,
tot_hi_cred_lim decimal(18,4) ,
total_bal_ex_mort decimal(18,4) ,
total_bc_limit decimal(18,4) ,
total_il_high_credit_limit decimal(18,4)
)
''')
conn.commit()

cur = conn.cursor()
cur.execute('''copy lc_table from 's3://lc-analysis/final_lc_data.txt'
credentials 'aws_iam_role=arn:aws:iam::XXXXX:role/redshift_upload'
delimiter '\t' region 'us-east-1';
''')
conn.commit()

engine=create_engine('redshift+psycopg2://XXXX:XXXX@XXXX-redshift.XXXX.us-east-1.redshift.amazonaws.com:5439/dev')
query='''
select * from lc_table where id=4555054;
'''
out=pd.read_sql_query(query, engine)
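For small lookup tables or quick experiments, the same sqlalchemy engine can also be used to write a pandas dataframe back to Redshift (a sketch with a hypothetical table name; for bulk loads the COPY-from-S3 path above is the right choice):

small_df=pd.DataFrame({'id': [1, 2, 3], 'label': ['a', 'b', 'c']})  #hypothetical lookup table
small_df.to_sql('lc_lookup', engine, index=False, if_exists='replace')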


AWS S3 Upload

Here is a quick snippet using Python boto3 to upload flat files into AWS S3.


import boto3

def s3_upload(filename, outfile, bucket_name):
    #upload a local file to the given S3 bucket under the key 'outfile'
    s3=boto3.resource('s3')
    with open(filename, 'rb') as data:
        s3.Bucket(bucket_name).put_object(Key=outfile, Body=data)

s3_upload('D:\\LC_Project\\final_data\\final_lc_data.txt', 'final_lc_data', 'lc-analysis')
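For larger files, boto3 also provides managed (multipart-capable) transfer helpers; a sketch reusing the same bucket and file, with a hypothetical local path for the download:

import boto3

s3=boto3.resource('s3')
s3.Bucket('lc-analysis').upload_file('D:\\LC_Project\\final_data\\final_lc_data.txt', 'final_lc_data')
s3.Bucket('lc-analysis').download_file('final_lc_data', 'D:\\LC_Project\\final_data\\final_lc_data_copy.txt')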
