package uk.ac.gla.terrier.structures.indexing.singlepass.hadoop; import java.io.IOException; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.JobConf; import org.apache.log4j.Logger; import uk.ac.gla.terrier.compression.BitIn; import uk.ac.gla.terrier.compression.BitInputStream; import uk.ac.gla.terrier.utility.FieldScore; public class InvertedIndexSplit extends FileSplit{ /** logger for this class */ protected static final Logger logger = Logger.getLogger(InvertedIndexSplit.class); /** The gamma compressed file containing the terms. */ protected BitIn fileStream; /** filename of the underlying bitfile */ protected String filename = null; /** Is the index open for this Split? **/ protected boolean isIndexOpen = false; /** Indicates whether field information is used.*/ final boolean useFieldInformation = FieldScore.USE_FIELD_INFORMATION; /** Lexicon information for the documents in this split **/ protected long[] endByteOffsets; protected byte[] endBitOffsets; protected String[] terms; /** index of current document **/ protected int index; /** Number of this Split **/ int splitNumber; public InvertedIndexSplit(Path file, long start, long length, JobConf conf, int numDocs, long[] _endByteOffsets, byte[] _endBitOffsets, String[] _terms, int splitnum) { // TODO this constructor for FileSplit is @deprecated, the // new constructor needs the hosts for the file instead of // job configuration super(file, start, length, conf); filename = file.toString(); endByteOffsets = _endByteOffsets; endBitOffsets = _endBitOffsets; terms = _terms; index = -1; splitNumber = splitnum; } protected boolean openInvertedIndexASplitStart() { try { fileStream = new BitInputStream(filename); } catch (IOException e) { logger.error("Could Not Open Inverted Index"); return false; } isIndexOpen = true; return true; } public int[][] getNextDocuments() { if (!isIndexOpen) { if (openInvertedIndexASplitStart() == false) return null; if (index==-1) next(); } try { return getNextDocuments(endByteOffsets.length, endByteOffsets[index], endBitOffsets[index]); } catch (IOException e) { logger.error("Split "+splitNumber+": Failed to read Document "+index+" from the "+endByteOffsets.length+" document inverted index input split."); return null; } } protected int[][] getNextDocuments(int df, long endByteOffset, byte endBitOffset) throws IOException { int[][] documentTerms = null; final int fieldCount = FieldScore.FIELDS_COUNT; if (useFieldInformation) { //if there are tag information to process documentTerms = new int[3][df]; documentTerms[0][0] = fileStream.readGamma() - 1; documentTerms[1][0] = fileStream.readUnary(); documentTerms[2][0] = fileStream.readBinary(fieldCount); for (int i = 1; i < df; i++) { documentTerms[0][i] = fileStream.readGamma() + documentTerms[0][i - 1]; documentTerms[1][i] = fileStream.readUnary(); documentTerms[2][i] = fileStream.readBinary(fieldCount); } } else { //no tag information to process documentTerms = new int[2][df]; //new documentTerms[0][0] = fileStream.readGamma() - 1; documentTerms[1][0] = fileStream.readUnary(); for(int i = 1; i < df; i++){ documentTerms[0][i] = fileStream.readGamma() + documentTerms[0][i - 1]; documentTerms[1][i] = fileStream.readUnary(); } } return documentTerms; } public String getCurrentTerm() { return terms[index]; } public boolean next() { index++; if (index >= endByteOffsets.length) { return false; } return true; } public int getIndex() { return index; } public int getNumDocs() { return endByteOffsets.length; } }