Due to popular request, I’ve updated my simple framework for processing ZIP files in Hadoop Map/Reduce jobs. Previously the only easy solution was to unzip files locally and then upload them to the Hadoop Distributed File System (HDFS) for processing. This adds a lot of unnecessary complexity when you are dealing with thousands of ZIP files; Java already has a ZipInputStream – it should be a lot easier.
When you create a Map/Reduce job in Java, you set the InputFormat & OutputFormat you wish to use. Most people have heard of and used the basic ones like TextInputFormat and SequenceFileInputFormat – but it’s fairly trivial to extend FileInputFormat to create new ones.
There are two parts to the puzzle:
- InputFormat which opens the data source and splits the data into chunks,
- RecordReader which actually parses the chunks into Key/Value pairs.
Once you’ve implemented these two pieces, the rest of the Map/Reduce framework can use your new-fangled InputFormat as normal. The same holds for OutputFormats, except you need to implement a RecordWriter instead of a RecordReader.
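A minimal skeleton of those two pieces looks something like this (using the org.apache.hadoop.mapreduce API; the My* class names and the Text/BytesWritable key/value choice are just placeholders, not part of any library):

```java
import java.io.IOException;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

// Part 1: the InputFormat opens the data source and splits it into chunks.
// FileInputFormat already handles listing files and creating splits, so a
// subclass mainly has to say which RecordReader to use.
public class MyInputFormat extends FileInputFormat<Text, BytesWritable> {
    @Override
    public RecordReader<Text, BytesWritable> createRecordReader(
            InputSplit split, TaskAttemptContext context) {
        return new MyRecordReader();
    }
}

// Part 2: the RecordReader parses each split into Key/Value pairs.
class MyRecordReader extends RecordReader<Text, BytesWritable> {
    private Text currentKey;
    private BytesWritable currentValue;

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
            throws IOException {
        // open the underlying file for this split
    }

    @Override
    public boolean nextKeyValue() throws IOException {
        // advance to the next record; return false when the split is exhausted
        return false;
    }

    @Override public Text getCurrentKey()          { return currentKey; }
    @Override public BytesWritable getCurrentValue() { return currentValue; }
    @Override public float getProgress()           { return 0.0f; }
    @Override public void close() throws IOException { /* release resources */ }
}
```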
Introducing ZipFileInputFormat & ZipFileRecordReader
As you might have guessed, ZipFileInputFormat & ZipFileRecordReader do exactly what their names suggest. ZipFileInputFormat is actually just a very thin wrapper around FileInputFormat to prevent it from splitting files – ZIP isn’t a split-able file format. The real meat of the code is in ZipFileRecordReader, which uses Java’s built-in ZipInputStream to parse out each ZipEntry.
Each file inside a ZIP archive is a separate ZipEntry; among other attributes, you can extract the original file name and decompress the contents. Although most ZIP software presents a hierarchical directory structure, the ZipEntry namespace is effectively ‘flat’ – i.e. you get “subdir2/subdir2subdir/Ulysses-22.txt” as a filename instead of a nested directory structure.
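You can see this ‘flat’ namespace with a plain-JDK sketch (no Hadoop involved) that writes a ZIP in memory and reads it back – the entry name carries the full path:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import java.util.zip.ZipOutputStream;

public class ZipEntryDemo {
    public static void main(String[] args) throws Exception {
        // Build a small ZIP in memory containing one "nested" file
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ZipOutputStream zos = new ZipOutputStream(buffer)) {
            zos.putNextEntry(new ZipEntry("subdir2/subdir2subdir/Ulysses-22.txt"));
            zos.write("Stately, plump Buck Mulligan".getBytes("UTF-8"));
            zos.closeEntry();
        }

        // Read it back: each ZipEntry's name is the full flat path
        try (ZipInputStream zis = new ZipInputStream(
                new ByteArrayInputStream(buffer.toByteArray()))) {
            ZipEntry entry;
            while ((entry = zis.getNextEntry()) != null) {
                System.out.println(entry.getName());
                // prints: subdir2/subdir2subdir/Ulysses-22.txt
            }
        }
    }
}
```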
Since filename and content neatly fit the Key/Value format Map/Reduce is accustomed to, it is a no-brainer to use these as the Key/Value pair for ZipFileInputFormat & ZipFileRecordReader. Passing the filename to the mapper allows you to easily filter out the files you desire from ZIP archives while ignoring the others.
Zen and the Art of Tolerating Failure
Anyone who’s worked with ZIP files before has probably encountered corrupt files at some point. Having worked on a Map/Reduce use-case that trawled through tens of thousands of ZIP files (of varying quality), I have already encountered Jobs that fail because of tiny file corruptions.
ZipFileInputFormat has a handy method “setLenient( boolean lenient )”. This defaults to false, meaning any error while processing a ZIP file is fatal to the overall Job. However, if you are dealing with ZIP files of varying quality you can “setLenient( true )”, which means ZIP parsing problems will be quietly ignored.
Note: with “setLenient( true )” ZIP files may be partially processed. Take the example of a truncated ZIP file: the contents of the archive up to the point of the truncation will be passed to Mappers to be processed. Upon encountering the corruption, the Mapper will be informed that the file is “finished / completed” and the Job will move on to the Reducer phase.
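Assuming the static helper described above, enabling lenient parsing is a single call while preparing the Job:

```java
// Lenient mode: corrupt or truncated ZIP entries are skipped rather than
// failing the whole Job. Leave it at the default (false) if you would
// rather fail fast on bad archives.
ZipFileInputFormat.setLenient(true);
```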
A word of caution…
One of the consequences of working with ZIP files is that they are not split-able. If your use-case is a single ZIP file with GBs of content inside it, you might be better off extracting the files into HDFS and processing them as normal. This ZipFileInputFormat solution is ideally suited to use-cases which need to process the contents of hundreds/thousands of ZIP files. Because each ZIP file is processed entirely by one Mapper slot, it scales nicely.
That’s great, so how do I actually use it?
I’m glad you asked – in my initial post last year I kinda neglected to demonstrate exactly how to use it. To make amends I updated the source code and added 7 test cases to provide examples of its usage.
ZipFileTest showcases ZipFileInputFormat for common use-cases, because it is based on FileInputFormat you can pass it individual files, directories (which are recursed) or simple wildcard patterns to match multiple files.
This example demonstrates the simplest of use-cases:
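Something along these lines: a driver that wires ZipFileInputFormat into a wordcount-style Job (a sketch only – MyMapper, MyReducer and the paths are placeholders for your own classes and data):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class ZipWordCountDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "zip-wordcount");
        job.setJarByClass(ZipWordCountDriver.class);

        // Swap in the ZIP-aware InputFormat; everything else is standard
        job.setInputFormatClass(ZipFileInputFormat.class);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Individual files, directories (recursed) and wildcards all work:
        ZipFileInputFormat.setInputPaths(job, new Path("/input/archives/*.zip"));
        FileOutputFormat.setOutputPath(job, new Path("/output/wordcount"));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```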
Once you’ve prepared the Job, you need to make sense of the data in your Mapper, the example below is based on the infamous Wordcount Map/Reduce example:
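A sketch of such a Mapper – the filename filter and UTF-8 assumption are illustrative, not requirements:

```java
import java.io.IOException;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MyMapper extends Mapper<Text, BytesWritable, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();

    @Override
    protected void map(Text key, BytesWritable value, Context context)
            throws IOException, InterruptedException {
        // key   = filename inside the ZIP archive
        // value = that file's uncompressed contents
        String filename = key.toString();
        if ( filename.endsWith(".txt") == false ) {
            return;   // only count words in the text files inside each ZIP
        }

        String contents = new String(value.getBytes(), 0, value.getLength(), "UTF-8");
        for (String token : contents.split("\\W+")) {
            if (!token.isEmpty()) {
                word.set(token.toLowerCase());
                context.write(word, ONE);
            }
        }
    }
}
```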
The most important thing to note is the Key/Value arguments to the Mapper.
- Text key – contains the filename
- BytesWritable value – contains the uncompressed file contents
If you have any more questions, feel free to comment / contact me. When I get around to it I might post this patch for inclusion in Hadoop, but until then please grab the source and play. Always check the GitHub project page for the updated source.
23 thoughts on “Hadoop: Processing ZIP files in Map/Reduce”
Have you tried this code against CDH4? I was able to compile it with no problems but have not used it.
Also, can this ZipFileInputFormat be used for Hive’s InputFormat? This is where I will mainly use this to query zip files.
I have updated this to work with Hadoop 2.0.0-cdh4.1.2, and also added changes so that it does not get an OutOfMemory error when reading huge zipfile content.
I can clean it up and send out a pull request.
The code is here.
@Girish, thanks for getting this working with Hadoop 2.0.0-cdh4.1.2
If a ZIP file is very large (like 100 GB) and/or the files within the ZIP are large (like 5 GB), then the Java zip input stream does not work properly.
There is a limitation in it. Is there any other API to unzip large ZIP files?
Will this work for tar files lying in HDFS?
Hi Aparnesh, the code above only works for ZIP files, but you could achieve something similar using Apache Commons Compress’s TAR support: http://commons.apache.org/proper/commons-compress/
First of all, thanks for the super fast reply 🙂 . Actually, what I read in the Definitive Guide is that .tar files are not supported in HDFS, so even if I use the above approach I will have to pull the data to local. I had used TarInputStream, which works locally but fails to recognize the tar file format on HDFS.
Actually, someone made tar files on Linux and then pushed them to HDFS, not knowing that the format is not supported there. Now the data is big and I don’t want to create sequence files every time by bringing the data to local.
Any ideas for this use case? What should the input format be for tar files in HDFS if I want to process each tar file in a mapper? Thanks.
Hi Michael, can this code run on MapReduce 2 with YARN? I tried but it’s failing.
Thank you Michael for the info :), so the mapper is called for each zipfile? Let’s say I have 100 zipfiles in my directory and I pass the directory as the input; the mapper is called once for each zipfile, correct?
Hi Kas, ZIP files are not splittable so each ZIP file will be processed by exactly one Mapper. The trick I’ve developed here is that the ZIP file will be uncompressed and each file INSIDE the ZIP file will be passed into your Map function. If you have 100 ZIP files then the Map/Reduce job will see those as 100 inputs, however this InputFormat will generate many more Key/Value pairs – one for each file inside each ZIP file. :o)
Thanks Michael! Great Job!
I am a bit confused about the following line of code:
if ( filename.endsWith(".txt") == false )
The question is: isn’t the input file extension .zip? Where does .txt come from?
I’m having a big compressed ‘zip’ file of size greater than 1 GB. How do I process it, since it’s not splittable?
Compiling ZipFileInputFormat requires ZipFileRecordReader and vice versa. So how do I compile these classes?
Hi, I have used your logic and implemented it for XML files. It’s giving me a Java heap error. How do I resolve it?
If possible, can you share the logic used to work with XML files with bz2?
Hi, I am also facing the same problem. Please can you share the logic to avoid this if you have one? Thanks.
How do I access a nested ZIP (a ZIP inside a ZIP) in this program?
I have a Snappy-compressed library format file containing a text file (.gz). I cannot refer to it as an HDFS path. If I grep it, it comes out as garbled text. How can I get readable content?
I am getting the below exception for a very large ZIP file:
2016-10-12 02:30:31,051 FATAL [main] org.apache.hadoop.mapred.YarnChild: Error running child : java.lang.OutOfMemoryError: Java heap space
at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
Similarly, can you please share code for a ZipFileOutputStream as well, so that we can get the reducer output as a .zip file?
Hey, my requirement is to convert a folder recursively into a zip file. I have figured out that I will have to write a customised FileInputFormat and RecordReader. I am confused about where I need to create the ZipOutputStream: at the mapper end or the reducer end, in the scenario where the file is greater than the block size?
The basic requirement is to convert a folder present on HDFS to a ZIP file using the MapReduce framework. It would be helpful if you can provide info on the same.
I am trying out this example, but along with my main zip file it is also processing 2 files which I did not pass. Are these some meta files?