Hadoop: Processing ZIP files in Map/Reduce


Due to popular demand, I’ve updated my simple framework for processing ZIP files in Hadoop Map/Reduce jobs. Previously the only easy solution was to unzip files locally and then upload them to the Hadoop Distributed File System (HDFS) for processing. This adds a lot of unnecessary complexity when you are dealing with thousands of ZIP files; Java already has a ZipInputStream – it should be a lot easier.


Map/Reduce Background

When you create a Map/Reduce job in Java, you set the InputFormat & OutputFormat you wish to use. Most people have heard of and used the basic ones like TextInputFormat and SequenceFileInputFormat – but it’s fairly trivial to extend FileInputFormat to create new ones.

There are two parts to the puzzle:

  1. InputFormat which opens the data source and splits the data into chunks,
  2. RecordReader which actually parses the chunks into Key/Value pairs.

Once you’ve implemented these two pieces, the rest of the Map/Reduce framework can use your new-fangled InputFormat as normal. The same is true for OutputFormats, except that you implement a RecordWriter instead of a RecordReader.
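To make that concrete, here is a minimal, generic skeleton of the two pieces using the standard org.apache.hadoop.mapreduce classes. The MyInputFormat / MyRecordReader names are placeholders for illustration, not the actual ZipFile implementation described below:

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

// Part 1: the InputFormat controls how files are split and hands out RecordReaders
public class MyInputFormat extends FileInputFormat<Text, BytesWritable>
{
    @Override
    protected boolean isSplitable( JobContext context, Path filename )
    {
        // Return false for formats (like ZIP) which cannot be split
        return false;
    }

    @Override
    public RecordReader<Text, BytesWritable> createRecordReader( InputSplit split, TaskAttemptContext context )
        throws IOException, InterruptedException
    {
        return new MyRecordReader();
    }
}

// Part 2: the RecordReader parses the raw bytes into Key/Value pairs
class MyRecordReader extends RecordReader<Text, BytesWritable>
{
    @Override
    public void initialize( InputSplit split, TaskAttemptContext context )
        throws IOException, InterruptedException
    {
        // Open the file referenced by the InputSplit
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException
    {
        // Parse the next record; return false when the split is exhausted
        return false;
    }

    @Override
    public Text getCurrentKey() { return new Text(); }

    @Override
    public BytesWritable getCurrentValue() { return new BytesWritable(); }

    @Override
    public float getProgress() { return 0.0f; }

    @Override
    public void close() throws IOException { }
}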


Introducing ZipFileInputFormat & ZipFileRecordReader

As you might have guessed, ZipFileInputFormat & ZipFileRecordReader do exactly what their names suggest. ZipFileInputFormat is actually just a very thin wrapper around FileInputFormat that prevents it from splitting files – ZIP isn’t a splittable file format. The real meat of the code is in ZipFileRecordReader, which uses Java’s built-in ZipInputStream to parse out each ZipEntry.

Each file inside a ZIP archive is a separate ZipEntry; among other attributes, you can extract the original file name and decompress the contents. Although most ZIP software presents a hierarchical directory structure, the namespace is effectively ‘flat’ across ZipEntrys, i.e. you get “subdir2/subdir2subdir/Ulysses-22.txt” as a filename instead of a nested directory structure.
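This is essentially what ZipFileRecordReader does internally. A standalone sketch of the same loop, outside Hadoop and with “archive.zip” as a stand-in path, looks like this:

import java.io.FileInputStream;
import java.io.IOException;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;

public class ZipEntryWalker
{
    public static void main( String[] args ) throws IOException
    {
        // "archive.zip" is a placeholder path for this example
        try ( ZipInputStream zip = new ZipInputStream( new FileInputStream( "archive.zip" ) ) )
        {
            ZipEntry entry;
            while ( ( entry = zip.getNextEntry() ) != null )
            {
                // Decompress the contents of this entry
                byte[] buffer = new byte[8192];
                long size = 0;
                int read;
                while ( ( read = zip.read( buffer ) ) != -1 )
                    size += read;

                // Entry names are 'flat', e.g. "subdir2/subdir2subdir/Ulysses-22.txt"
                System.out.println( entry.getName() + " (" + size + " bytes)" );
            }
        }
    }
}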

Since filename and content neatly fit the Key/Value format Map/Reduce is accustomed to, it is a no-brainer to use them as the Key/Value pair for ZipFileInputFormat & ZipFileRecordReader. Passing the filename to the Mapper allows you to easily filter out the files you desire from ZIP archives while ignoring the others.

Zen and the Art of Tolerating Failure

Anyone who has worked with ZIP files before has probably encountered corrupt files at some point. Having worked on a Map/Reduce use-case that trawled through tens of thousands of ZIP files (of varying quality), I have encountered Jobs that fail because of tiny file corruptions.

ZipFileInputFormat has a handy method “setLenient( boolean lenient )”. This defaults to false, meaning any error processing a ZIP file is fatal to the overall Job. If you are dealing with ZIP files of varying quality, however, you can “setLenient( true )” and ZIP parsing problems will be quietly ignored.
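Assuming the static setter from the GitHub source, enabling lenient mode is a one-liner when you configure the Job:

// Quietly skip corrupt ZIP archives instead of failing the whole Job
ZipFileInputFormat.setLenient( true );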

Note: with “setLenient( true )” ZIP files may be partially processed. Take the example of a truncated ZIP file: the contents of the archive up to the point of truncation will be passed to Mappers for processing. Upon encountering the corruption, the Mapper will simply be informed that the file is “finished / completed”, and the job will move on to the Reducer phase.

A word of caution…

One consequence of working with ZIP files is that they are not splittable. If your use-case is a single ZIP file with gigabytes of content inside it, you might be better off extracting the files into HDFS and processing them as normal. This ZipFileInputFormat solution is ideally suited to use-cases which need to process the contents of hundreds or thousands of ZIP files: because each ZIP file is processed entirely by one Mapper slot, it scales nicely.

That’s great, so how do I actually use it? 

I’m glad you asked. In my initial post last year I kinda neglected to demonstrate exactly how to use it, so to make amends I’ve updated the source code and added 7 test cases to provide examples of its usage.

ZipFileTest showcases ZipFileInputFormat for common use-cases; because it is based on FileInputFormat you can pass it individual files, directories (which are recursed) or simple wildcard patterns to match multiple files.

This example demonstrates the simplest of use-cases:
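A minimal driver sketch follows. WordCountMapper is defined in the next example, IntSumReducer is Hadoop’s stock summing reducer, and the com.cotdp.hadoop package name and the input/output paths are illustrative assumptions based on the GitHub project:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer;

import com.cotdp.hadoop.ZipFileInputFormat;

public class ZipWordCount
{
    public static void main( String[] args ) throws Exception
    {
        Job job = new Job( new Configuration(), "ZipWordCount" );
        job.setJarByClass( ZipWordCount.class );

        // Plug in the ZIP handling
        job.setInputFormatClass( ZipFileInputFormat.class );

        // WordCountMapper receives (filename, contents) pairs -- see below
        job.setMapperClass( WordCountMapper.class );
        job.setReducerClass( IntSumReducer.class );
        job.setOutputKeyClass( Text.class );
        job.setOutputValueClass( IntWritable.class );
        job.setOutputFormatClass( TextOutputFormat.class );

        // Individual files, directories (recursed) or wildcards all work here
        ZipFileInputFormat.setInputPaths( job, new Path( "input/*.zip" ) );
        TextOutputFormat.setOutputPath( job, new Path( "output" ) );

        System.exit( job.waitForCompletion( true ) ? 0 : 1 );
    }
}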

Once you’ve prepared the Job, you need to make sense of the data in your Mapper. The example below is based on the infamous Wordcount Map/Reduce example:
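This sketch is reconstructed in the spirit of Wordcount rather than being the verbatim original; the .txt filter simply picks out the text files from each archive:

import java.io.IOException;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountMapper extends Mapper<Text, BytesWritable, Text, IntWritable>
{
    private static final IntWritable ONE = new IntWritable( 1 );

    @Override
    public void map( Text key, BytesWritable value, Context context )
        throws IOException, InterruptedException
    {
        // The key is the filename inside the ZIP archive,
        // e.g. "subdir2/subdir2subdir/Ulysses-22.txt"
        String filename = key.toString();

        // Only process the text files, ignore everything else (e.g. images)
        if ( filename.endsWith( ".txt" ) == false )
            return;

        // The value contains the uncompressed file contents
        String contents = new String( value.getBytes(), 0, value.getLength(), "UTF-8" );

        // Classic Wordcount from here on
        for ( String word : contents.split( "\\s+" ) )
        {
            if ( word.length() > 0 )
                context.write( new Text( word ), ONE );
        }
    }
}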

The most important thing to note is the Key/Value arguments to the Mapper.

  • Text key – contains the filename
  • BytesWritable value – contains the uncompressed file contents

What you do with it from there is entirely up to you. The code in GitHub includes a bunch of test files featuring Text, Images and Subdirectories, as well as non-ZIP data and an example of ZIP file corruption. Clone it from GitHub and run “mvn test” to try it out on sample data (tested and compatible with Cloudera CDH3 and Amazon’s EMR).

If you have any more questions, feel free to comment / contact me. When I get around to it I might post this patch for inclusion in Hadoop, but until then please grab the source and play. Always check the GitHub project page for the updated source.


Comments

24 responses to “Hadoop: Processing ZIP files in Map/Reduce”

  1. […] Due to popular request, I've updated my simple framework for processing ZIP files in Hadoop Map/Reduce jobs. Previously the only easy solution was to unzip files locally and then upload them to the Hadoop Distributed File …  […]

  2. Hi Michael,

    Have you tried this code against CDH4? I was able to compile it with no problems but have not used it.

    Also, can this ZipFileInputFormat be used for Hive’s InputFormat? This is where I will mainly use this to query zip files.

    Thanks,
    Ben

  3. Girish

    I have updated this to work with Hadoop 2.0.0-cdh4.1.2, and also added changes so that it does not get an OutOfMemory error when reading huge zipfile content. I can clean it up and send out a pull request.
    The code is here: https://github.com/girish-a1/com-cotdp-hadoop/commit/6641f8704ecd30563e1e8905118e7cc97017377e

  4. Mihai

    @Girish, thanks for getting this working with Hadoop 2.0.0-cdh4.1.2

  5. Raja

    If a zip file is very large (like 100 GB) and/or the files within the zip file are large (like 5 GB), then the Java zip input stream does not work properly.
    There is a limitation in it. Is there any other API to unzip large zip files?

  6. aparnesh gaurav

    Will this work for tar files lying in HDFS?

    1. cotdp

      Hi Aparnesh, the code above only works for ZIP files but you could achieve something similar using the TAR support in Apache commons-compress: http://commons.apache.org/proper/commons-compress/

      1. Aparnesh Gaurav

        First of all, thanks for the super fast reply 🙂 . Actually what I read in the Definitive Guide is that .tar files are not supported in HDFS, so even if I use the above approach I will have to pull the data to local. I had used TarInputStream, which works locally but fails to recognize the tar file format on HDFS.

        Actually someone made tar files on Linux and then pushed them to HDFS, not knowing that they are not supported on HDFS. Now the data is big and I don’t want to create sequence files every time by bringing the data to local.

        Any ideas for this use case? What should be the input format for tar files in HDFS if I want to process each tar file in a mapper? Thanks.

      2. Hi Michael, can this code run on MapReduce 2 with YARN? I tried but it’s failing.

  7. Khas

    Thank you Michael for the info :), so the mapper is called for each zipfile? Let’s say I have 100 zipfiles in my directory and I am passing the directory as the input – the mapper is called once for each zipfile, correct?

    1. cotdp

      Hi Khas, ZIP files are not splittable so each ZIP file will be processed by exactly one Mapper. The trick I’ve developed here is that the ZIP file will be uncompressed and each file INSIDE the ZIP file will be passed into your Map function. If you have 100 ZIP files then the Map/Reduce job will see those as 100 inputs; however, this InputFormat will generate many more map inputs, one for each file inside each ZIP file. :o)

  8. Peter

    Thanks Michael! Great Job!

    I am a bit confused about the following line of code:
    if ( filename.endsWith(".txt") == false )
    The question is: isn’t the input file extension .zip? Where does .txt come from?

  9. Abhishek

    I’m having a big compressed ‘zip’ file, greater than 1 GB in size. How do I process that since it’s not splittable?

  10. […] trying to run a streaming job where the input files are csv inside zip files. I tried using this, however it doesn’t seem for work with CDH4 (I get the error class […]

  11. Rajendra Joshi

    To compile, ZipFileInputFormat requires ZipFileRecordReader and vice versa. So how do I compile these classes?
    Rajendra joshi

  12. siva ramakrishna

    Hi, I have used your logic and implemented it for XML files. It’s giving me a Java heap error. How do I resolve it?
    If possible, can you share the logic used to work with XML files with bz2?

    1. Venkat

      Hi, I am also facing the same problem. Please can you share the logic to avoid this if you have one? Thanks

  13. rishi

    How do I access nested ZIPs in this program?

  14. jayaraman

    I have a compressed file in the Snappy format containing a text file (.gz). I cannot refer to it via an HDFS path, and if I grep it, it comes out as unreadable text. How can I get readable content?

  15. Sudarshankumar Kumar

    I am getting the below exception for a very large zip file:

    2016-10-12 02:30:31,051 FATAL [main] org.apache.hadoop.mapred.YarnChild: Error running child : java.lang.OutOfMemoryError: Java heap space
        at java.util.Arrays.copyOf(Arrays.java:3236)
        at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)

  16. Sudarshankumar Kumar

    Similarly, can you please share code for a ZipFileOutputStream also, so that we can get the reducer output as a .zip file?

  17. Aditya Sawant

    Hey, my requirement is that I have to convert a folder recursively into a zip file. I have figured out that I will have to write a customised FileInputFormat and RecordReader. I am confused about where I need to create the ZipOutputStream: at the mapper end or the reducer end, in the scenario where the file is greater than the block size.

    The basic requirement is to convert a folder present on HDFS to a ZIP file using the MapReduce framework. It would be helpful if you could provide info on the same.

  18. Manjot Kaur

    Hey hi,
    I am trying out this example, but along with my main zip file it is also processing 2 files which I did not pass. Are these some meta files?

  19. Hi,

    the code examples in the main article are not visible. I understand this article is 5 years old, but I would greatly appreciate it if you could fix that part, Michael.

    Merci
