package uk.ac.gla.terrier.structures.indexing.singlepass.hadoop;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

import uk.ac.gla.terrier.structures.LexiconInputStream;
import uk.ac.gla.terrier.structures.indexing.singlepass.Posting;

/**
 * A {@link FileInputFormat} whose splits are {@code InvertedIndexSplit}s and
 * whose record readers are {@code InvertedIndexRecordReader}s.
 *
 * NOTE(review): {@code Text} and {@code Posting} are imported but not used in
 * the visible code, and the Hadoop types below are used raw. Presumably the
 * generic type arguments were lost at some point -- confirm against the
 * declaration of {@code InvertedIndexRecordReader} before restoring them.
 */
public class InvertedIndexInputFormat extends FileInputFormat{

	/**
	 * Creates the record reader for one split of the inverted index.
	 * The reporter's status is set to the split's string form before the
	 * reader is constructed.
	 *
	 * @param genericSplit the split to read; it is cast to
	 *        {@code InvertedIndexSplit} without a prior type check
	 * @param job the job configuration, passed through to the reader
	 * @param reporter receives the status update
	 * @return a new {@code InvertedIndexRecordReader} over the given split
	 * @throws IOException declared by the overridden method
	 * @throws ClassCastException if {@code genericSplit} is not an
	 *         {@code InvertedIndexSplit}
	 */
	@Override
	public RecordReader getRecordReader(InputSplit genericSplit, JobConf job, Reporter reporter) throws IOException {
		reporter.setStatus(genericSplit.toString());
		return new InvertedIndexRecordReader(job, (InvertedIndexSplit)genericSplit);
	}

	@Override
	public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
		// Input Paths, only one path should be given to the inverted index
		Path[] paths = FileInputFormat.getInputPaths(job);
		// Open the default lexicon
		LexiconInputStream lex = new LexiconInputStream();
		// How many records per map
		int numEntriesPerBlock = lex.numberOfEntries()/numSplits;
		// File Splits
		List splits = new ArrayList(numSplits);
		int index = 1;
		int writtenPerBlock =1;
		// WARNING! : Temp Storing endByteOffsets, endBitOffsets
		// and terms for each term covered in the current split
		// in the lexicon.
This may become large if there is a // small number of splits long[] endByteOffsets = new long[numEntriesPerBlock]; byte[] endBitOffsets = new byte[numEntriesPerBlock]; String[] terms = new String[numEntriesPerBlock]; long endOff =0; lex.readNextEntry(); long startOff = lex.getStartOffset(); // For each lexicon entry while (index<=lex.numberOfEntries()) { // Save the endByteOffset, endBitOffset and term for that entry endOff = lex.getEndOffset(); endByteOffsets[writtenPerBlock-1] = lex.getEndOffset(); endBitOffsets[writtenPerBlock-1] = lex.getEndBitOffset(); terms[writtenPerBlock-1] = lex.getTerm(); // If we have filled a split's quota of entrys // then write the Split if (writtenPerBlock == numEntriesPerBlock) { writtenPerBlock=0; splits.add(new InvertedIndexSplit(paths[0], startOff, endOff-startOff, job, lex.getNt(), endByteOffsets, endBitOffsets, terms, index)); // Clear the temp storage arrays (resize if on the last split) if (lex.numberOfEntries()-index