File System Operations in PySpark

Sometimes (unfortunately) we need to perform file system operations directly from PySpark. We can do this by reaching through the Py4J gateway to the JVM's Hadoop FileSystem classes:

sc = spark.sparkContext
URI = sc._gateway.jvm.java.net.URI
Path = sc._gateway.jvm.org.apache.hadoop.fs.Path
FileSystem = sc._gateway.jvm.org.apache.hadoop.fs.FileSystem
Configuration = sc._gateway.jvm.org.apache.hadoop.conf.Configuration

# Get a FileSystem handle from the active Hadoop configuration
fs = FileSystem.get(sc._jsc.hadoopConfiguration())
outputPath = "./"

# List the directory, skipping Spark's own staging directory
status = fs.listStatus(Path(outputPath))
path_result = [f.getPath().getName() for f in status
               if f.getPath().getName() != '.sparkStaging']

for name in path_result:
    # Rename each entry to a *_backup copy ...
    fs.rename(Path(name), Path(name + '_backup'))
    # ... then delete the backup (True = recursive, needed for directories)
    fs.delete(Path(name + '_backup'), True)
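The JVM calls above need a live SparkSession, but the listing-and-renaming logic itself is plain Python, so it can be pulled into small helpers and tested locally. A minimal sketch (the `data_paths` and `backup_name` helpers are illustrative names, not part of the PySpark or Hadoop API):

```python
# Pure-Python helpers for the pattern above. Only the JVM FileSystem
# calls require Spark; the name handling can be exercised on its own.

def data_paths(names):
    """Filter Spark's staging directory out of a listing."""
    return [n for n in names if n != '.sparkStaging']

def backup_name(name, suffix='_backup'):
    """Name a path is renamed to before deletion."""
    return name + suffix

# Usage with a real FileSystem handle (requires Spark, shown for context):
#   status = fs.listStatus(Path(outputPath))
#   for name in data_paths(f.getPath().getName() for f in status):
#       fs.rename(Path(name), Path(backup_name(name)))
#       fs.delete(Path(backup_name(name)), True)
```

Keeping the name logic separate from the gateway calls makes the loop easier to check before pointing it at a real cluster directory.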
