### Eclipse Workspace Patch 1.0 #P terrierSVNtrunk Index: src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/HadoopRunsMerger.java =================================================================== --- src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/HadoopRunsMerger.java (revision 2726) +++ src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/HadoopRunsMerger.java (working copy) @@ -31,8 +31,6 @@ import java.util.LinkedList; import java.util.ListIterator; -import org.apache.hadoop.mapred.TaskID; - import uk.ac.gla.terrier.structures.BasicLexiconEntry; import uk.ac.gla.terrier.structures.LexiconEntry; import uk.ac.gla.terrier.structures.LexiconOutputStream; @@ -91,9 +89,9 @@ if (posting.getDf() > maxDF) maxDF = posting.getDf(); - final int _runMapID = TaskID.forName(_run.getMapNo()).getId(); - final int runNumber = run.getRunNo(); - final int docOffset = getDocumentOffset(_runMapID, runNumber); + // document offsets are looked up by split number (not by map task id), + // because run data is now partitioned between reducers by split + final int docOffset = getDocumentOffset(_run.getSplitNo(), _run.getRunNo()); lastDocument = posting.append(bos, lastDocument, docOffset); if (le == null) le = posting.getLexiconEntry(); @@ -126,12 +124,12 @@ this.numReducers = numReducers; } - public int getDocumentOffset(int mapNumber, int flushNumber) throws IOException { + public int getDocumentOffset(int splitNo, int flushNumber) throws IOException { int NumPreDocs = 0; MapData correctHRD = null; for (MapData tempHRD : mapData) { - if (mapNumber == tempHRD.getMapId() ) { + if (splitNo == tempHRD.getSplitnum() ) { //System.out.println("Reducer number : "+reduceNumber+", Splitnum"+tempSplitnum+", Run Map Number : "+_run.getMapNo()); correctHRD = tempHRD; break; @@ -139,7 +137,7 @@ NumPreDocs += tempHRD.getMapDocs(); } if (correctHRD == null) - throw new IOException("Did not find map data for "+ mapNumber); + throw new IOException("Did not find map data for split "+ splitNo); // Add the 
FlushShift int currentFlushDocs=0; Index: src/uk/ac/gla/terrier/structures/indexing/singlepass/RunIterator.java =================================================================== --- src/uk/ac/gla/terrier/structures/indexing/singlepass/RunIterator.java (revision 2726) +++ src/uk/ac/gla/terrier/structures/indexing/singlepass/RunIterator.java (working copy) @@ -41,8 +41,8 @@ protected Class postingClass; /** current posting */ protected PostingInRun posting; - /** Run number that the current posting came from */ - protected int runNo; + /** Run/Flush that the current posting came from */ + protected int flushNo; /** create a new instance of this class. * @param _postingClass Class of the PostingInRun type that postings in this run have @@ -50,7 +50,7 @@ protected RunIterator(Class _postingClass, int _runNo) { this.postingClass = _postingClass; - this.runNo = _runNo; + this.flushNo = _runNo; } /** Create a new posting */ @@ -58,10 +58,10 @@ posting = postingClass.newInstance(); } - /** Get the run number that the current posting came from */ + /** Get the run/flush number that the current posting came from */ public int getRunNo() { - return runNo; + return flushNo; } /** iterator implementation */ Index: src/uk/ac/gla/terrier/indexing/hadoop/Hadoop_BasicSinglePassIndexer.java =================================================================== --- src/uk/ac/gla/terrier/indexing/hadoop/Hadoop_BasicSinglePassIndexer.java (revision 2726) +++ src/uk/ac/gla/terrier/indexing/hadoop/Hadoop_BasicSinglePassIndexer.java (working copy) @@ -48,8 +48,8 @@ import org.apache.hadoop.mapred.Reducer; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.TaskAttemptID; -import org.apache.hadoop.mapred.TaskID; +import uk.ac.gla.terrier.compression.BitIn; import uk.ac.gla.terrier.compression.BitOutputStream; import uk.ac.gla.terrier.indexing.BasicSinglePassIndexer; import uk.ac.gla.terrier.indexing.Document; @@ -74,9 +74,8 @@ import 
uk.ac.gla.terrier.structures.indexing.singlepass.hadoop.IDComparator; import uk.ac.gla.terrier.structures.indexing.singlepass.hadoop.MapData; import uk.ac.gla.terrier.structures.indexing.singlepass.hadoop.MapEmittedPostingList; -import uk.ac.gla.terrier.structures.indexing.singlepass.hadoop.MapEmittedTerm; -import uk.ac.gla.terrier.structures.indexing.singlepass.hadoop.MapEmittedTermByMapPartitioner; import uk.ac.gla.terrier.structures.indexing.singlepass.hadoop.SplitAwareWrapper; +import uk.ac.gla.terrier.structures.indexing.singlepass.hadoop.SplitEmittedTerm; import uk.ac.gla.terrier.utility.ApplicationSetup; import uk.ac.gla.terrier.utility.ArrayUtils; import uk.ac.gla.terrier.utility.FieldScore; @@ -109,8 +108,8 @@ */ public class Hadoop_BasicSinglePassIndexer extends BasicSinglePassIndexer - implements Mapper, MapEmittedTerm, MapEmittedPostingList>, - Reducer + implements Mapper, SplitEmittedTerm, MapEmittedPostingList>, + Reducer { public static void main(String[] args) throws Exception @@ -247,7 +246,7 @@ */ /** output collector for the current map indexing process */ - protected OutputCollector outputPostingListCollector; + protected OutputCollector outputPostingListCollector; /** Current map number */ protected String mapTaskID; @@ -274,6 +273,7 @@ new Path(indexDestination, mapTaskID+".runs").toString()) ); RunData.writeUTF(mapTaskID); + start = true; createMemoryPostings(); super.emptyDocIndexEntry = new SimpleDocumentIndexEntry(); super.docIndexBuilder = new DocumentIndexBuilder(currentIndex, "document"); @@ -297,7 +297,7 @@ logger.info("Map "+mapTaskID+", flush requested, containing "+numberOfDocsSinceFlush+" documents, flush "+flushNo); if (mp == null) throw new IOException("Map flushed before any documents were indexed"); - mp.finish(new HadoopRunWriter(outputPostingListCollector, mapTaskID, flushNo)); + mp.finish(new HadoopRunWriter(outputPostingListCollector, mapTaskID, splitnum, flushNo)); RunData.writeInt(currentId); 
currentReporter.incrCounter(Counters.INDEXER_FLUSHES, 1); System.gc(); @@ -319,7 +319,7 @@ */ public void map( Text key, SplitAwareWrapper value, - OutputCollector _outputPostingListCollector, + OutputCollector _outputPostingListCollector, Reporter reporter) throws IOException { @@ -330,6 +330,8 @@ if (start) { splitnum = value.getSplitIndex(); + logger.debug("Map "+mapTaskID+" is processing split "+splitnum); + //RunData.writeInt(splitnum); start = false; } @@ -469,10 +471,8 @@ final LinkedList runData = new LinkedList(); DataInputStream runDataIn; - final MapEmittedTermByMapPartitioner partitionChecker = new MapEmittedTermByMapPartitioner(); - partitionChecker.configure(jc); final String jobId = TaskAttemptID.forName(jc.get("mapred.task.id")).getJobID().toString().replaceAll("job", "task"); - final int thisPartition = TaskAttemptID.forName(jc.get("mapred.task.id")).getTaskID().getId(); + final FileStatus[] files = FileSystem.get(jc).listStatus( FileOutputFormat.getOutputPath(jc), new org.apache.hadoop.fs.PathFilter() @@ -485,12 +485,12 @@ return false; //2. is this run part of the maps allocated to us? 
- final TaskID t = TaskID.forName(name.replaceAll("\\.runs$", "") ); - final int targetP = partitionChecker.calculatePartition(t, jc.getNumReduceTasks()); - if (thisPartition != targetP) - { - return false; - } + //final TaskID t = TaskID.forName(name.replaceAll("\\.runs$", "") ); + //final int targetP = partitionChecker.calculatePartition(t, jc.getNumReduceTasks()); + //if (thisPartition != targetP) + //{ + // return false; + //} return true; } } @@ -501,6 +501,11 @@ throw new IOException("No run status files found in "+FileOutputFormat.getOutputPath(jc)); } + final int thisPartition = TaskAttemptID.forName(jc.get("mapred.task.id")).getTaskID().getId(); + final SplitEmittedTerm.SETPartitioner partitionChecker = new SplitEmittedTerm.SETPartitioner(); + partitionChecker.configure(jc); + + //TaskID previousMapTaskID = null; MapData tempHRD; for (FileStatus file : files) @@ -508,16 +513,18 @@ logger.info("Run data file "+ file.getPath().toString()+" has length "+Files.length(file.getPath().toString())); runDataIn = new DataInputStream(Files.openFileStream(file.getPath().toString())); tempHRD = new MapData(runDataIn); - // Sanity Check the file ordering + //check whether this run data file belongs to a split allocated to this reducer; if not, close it and skip it + if (partitionChecker.calculatePartition(tempHRD.getSplitnum(), jc.getNumReduceTasks()) != thisPartition) + { runDataIn.close(); continue; } mapTaskIDs.add(tempHRD.getMap()); - //TaskID thisMapTaskID = TaskID.forName(tempHRD.getMap()); - //previousMapTaskID = thisMapTaskID; runData.add(tempHRD); runDataIn.close(); } + // Sort by splitnum Collections.sort(runData); Collections.sort(mapTaskIDs, new IDComparator(runData)); + // A list of the index shards MapIndexPrefixes = mapTaskIDs.toArray(new String[0]); return runData; } @@ -549,21 +556,21 @@ * @param reporter Used to report progress */ public void reduce( - MapEmittedTerm Term, + SplitEmittedTerm Term, Iterator postingIterator, OutputCollector output, Reporter reporter) throws IOException { //if (logger.isDebugEnabled()) 
- logger.debug("Reduce for term "+Term.getText()); + logger.debug("Reduce for term "+Term.getTerm()); - reporter.setStatus("Reducer is merging term " + Term.getText()); + reporter.setStatus("Reducer is merging term " + Term.getTerm()); if (! reduceStarted) { final LinkedList runData = loadRunData(); startReduce(runData); reduceStarted = true; } - String term = Term.getText().toString().trim(); + String term = Term.getTerm().trim(); if (term.length() == 0) return; runIteratorF.setRunPostingIterator(postingIterator); @@ -682,7 +689,7 @@ try{ tempRM.setBos(new BitOutputStream( currentIndex.getPath() + ApplicationSetup.FILE_SEPARATOR - + currentIndex.getPrefix() + ".inverted.bf")); + + currentIndex.getPrefix() + ".inverted" + BitIn.USUAL_EXTENSION)); } catch (IOException ioe) { ioe.printStackTrace(); } Index: src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/MapEmittedTerm.java =================================================================== --- src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/MapEmittedTerm.java (revision 2726) +++ src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/MapEmittedTerm.java (working copy) @@ -185,6 +185,7 @@ /** The Map this Term was processed from */ protected String mapTaskID; + protected int splitId; /** The Flush number this term was from */ protected int flushNumber; /** The Term */ @@ -203,8 +204,9 @@ * @param _mapTaskID - Map Number * @param _flushNumber - Flush Number */ - public MapEmittedTerm(String s, String _mapTaskID, int _flushNumber) { + public MapEmittedTerm(String s, String _mapTaskID, int _splitId, int _flushNumber) { mapTaskID = _mapTaskID; + splitId = _splitId; flushNumber = _flushNumber; text = new Text(s); } @@ -216,9 +218,10 @@ * @param b - Flush Number * @return a newly created Indexing_TextPlus */ - public static MapEmittedTerm create_TextPlus(String s, String a, int b) { + public static MapEmittedTerm create_TextPlus(String s, String a, int splitId, int b) { MapEmittedTerm temp = new MapEmittedTerm(); temp.setMap(a); + 
temp.setSplitId(splitId); temp.setFlush(b); temp.setText(new Text(s)); return temp; @@ -253,6 +256,7 @@ */ public void readFields(DataInput in) throws IOException { mapTaskID = in.readUTF(); + splitId = in.readInt(); flushNumber = in.readInt(); text = new Text(); text.readFields(in); @@ -264,6 +268,7 @@ */ public void write(DataOutput out) throws IOException { out.writeUTF(mapTaskID); + out.writeInt(splitId); out.writeInt(flushNumber); text.write(out); } @@ -275,4 +280,12 @@ return this.getText().compareTo(o.getText()); } + public int getSplitId() { + return splitId; + } + + public void setSplitId(int splitId) { + this.splitId = splitId; + } + } Index: src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/MapEmittedPostingList.java =================================================================== --- src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/MapEmittedPostingList.java (revision 2726) +++ src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/MapEmittedPostingList.java (working copy) @@ -43,19 +43,23 @@ /** The Map Number */ protected String Map; /** The Flush Number */ - protected int Run; + protected int flushNo; + /** The Split Number */ + protected int splitNo; /** * Constructor * @param map - Map task id - * @param run - Flush Number + * @param flush - Flush Number + * @param split - Split Number * @param c - Document Frequency * @param c2 - Term Frequency */ - public MapEmittedPostingList (String map, int run, int c, int c2) { + public MapEmittedPostingList (String map, int flush, int split, int c, int c2) { super(c,c2); Map = map; - Run =run; + flushNo =flush; + splitNo = split; } /** @@ -78,13 +82,14 @@ * Factory Method * @param mapTaskID - Map Number * @param flushNo - Flush Number + * @param splitNo - Split Number * @param postingList - Posting List * @param DocumentFreq - Document Frequency * @param TermFreq - Term Frequency * @return a newly created Indexing_WritableRunPostingData */ - public static MapEmittedPostingList 
create_Hadoop_WritableRunPostingData (String mapTaskID, int flushNo, byte[] postingList, int DocumentFreq, int TermFreq) { - MapEmittedPostingList w = new MapEmittedPostingList(mapTaskID, flushNo, DocumentFreq, TermFreq); + public static MapEmittedPostingList create_Hadoop_WritableRunPostingData (String mapTaskID, int flushNo, int splitNo, byte[] postingList, int DocumentFreq, int TermFreq) { + MapEmittedPostingList w = new MapEmittedPostingList(mapTaskID, flushNo, splitNo, DocumentFreq, TermFreq); w.setArray(postingList); return w; } @@ -106,7 +111,7 @@ * Returns the Map & Flush Number */ public String toString() { - return "MapNo="+Map+ ",FlushNo="+Run; + return "MapNo="+Map+ ",FlushNo="+flushNo; } public String getMap() { @@ -117,25 +122,40 @@ Map = map; } - public int getRun() { - return Run; + public int getFlushNo() { + return flushNo; + } + + public void setFlushNo(int flush) { + flushNo = flush; + } + + /** + * @return the splitNo + */ + public int getSplitNo() { + return splitNo; } - public void setRun(int run) { - Run = run; + /** + * @param splitNo the splitNo to set + */ + public void setSplitNo(int splitNo) { + this.splitNo = splitNo; } /** * Reads this object from the input stream 'in' */ - public void readFields(DataInput arg0) throws IOException { - arraylength = arg0.readInt(); - Map = arg0.readUTF(); - Run = arg0.readInt(); - DocumentFreq = arg0.readInt(); - TermFreq = arg0.readInt(); + public void readFields(DataInput in) throws IOException { + arraylength = in.readInt(); + Map = in.readUTF(); + flushNo = in.readInt(); + splitNo = in.readInt(); + DocumentFreq = in.readInt(); + TermFreq = in.readInt(); array = new byte[arraylength]; - arg0.readFully(array); + in.readFully(array); //System.err.println("DEBUG: Finished Read, ArrayL:"+arraylength+" RunNo:"+Run+" DocF:"+DocumentFreq+" TermF:"+TermFreq+" Buffer:"+array.toString()); } @@ -143,28 +163,30 @@ /** * Reads this object from the input stream 'in' apart from the * array. 
- * @param arg0 + * @param in * @throws IOException */ - public void readFieldsMinusArray(DataInput arg0) throws IOException { - arraylength = arg0.readInt(); - Map = arg0.readUTF(); - Run = arg0.readInt(); - DocumentFreq = arg0.readInt(); - TermFreq = arg0.readInt(); + public void readFieldsMinusArray(DataInput in) throws IOException { + arraylength = in.readInt(); + Map = in.readUTF(); + flushNo = in.readInt(); + splitNo = in.readInt(); + DocumentFreq = in.readInt(); + TermFreq = in.readInt(); array = new byte[1]; //System.err.println("DEBUG: Finished Read, ArrayL:"+arraylength+" RunNo:"+Run+" DocF:"+DocumentFreq+" TermF:"+TermFreq+" Buffer:"+array.toString()); } /** Write this object to the output stream 'out' */ - public void write(DataOutput arg0) throws IOException { - arg0.writeInt(array.length); - arg0.writeUTF(Map); - arg0.writeInt(Run); - arg0.writeInt(DocumentFreq); - arg0.writeInt(TermFreq); - arg0.write(array); + public void write(DataOutput out) throws IOException { + out.writeInt(array.length); + out.writeUTF(Map); + out.writeInt(flushNo); + out.writeInt(splitNo); + out.writeInt(DocumentFreq); + out.writeInt(TermFreq); + out.write(array); //System.err.println("DEBUG: Finished Write, ArrayL:"+array.length+" RunNo:"+Run+" DocF:"+DocumentFreq+" TermF:"+TermFreq+" Buffer:"+array.toString()); } Index: src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/HadoopRunPostingIterator.java =================================================================== --- src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/HadoopRunPostingIterator.java (revision 2726) +++ src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/HadoopRunPostingIterator.java (working copy) @@ -43,6 +43,8 @@ protected String mapNo; /** Term that we're processing */ protected String term; + /** The Split that the current posting comes from */ + protected int splitNo; /** Constructs a new RunPostingIterator. 
* @param postingClass is the name of the class to use to read the postings @@ -80,7 +82,8 @@ posting.setDf(post.getDocumentFreq()); posting.setTF(post.getTermFreq()); mapNo = post.getMap(); - runNo = post.getRun(); + flushNo = post.getFlushNo(); + splitNo = post.getSplitNo(); } catch (IOException ioe) { throw new Error(ioe); } @@ -89,4 +92,13 @@ /** Returns the map that the current posting came from */ public String getMapNo() { return mapNo; } + + /** + * @return the splitNo + */ + public int getSplitNo() { + return splitNo; + } + + } Index: src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/HadoopRunWriter.java =================================================================== --- src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/HadoopRunWriter.java (revision 2726) +++ src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/HadoopRunWriter.java (working copy) @@ -40,11 +40,12 @@ * @version $Revision: 1.2 $ */ public class HadoopRunWriter extends RunWriter { /** output collector of Map task */ - protected OutputCollector outputCollector = null; + protected OutputCollector outputCollector = null; /** map task id that is being flushed */ protected String mapId; /** flushNo is the number of times this map task is being flushed */ protected int flushNo; + /** index of the input split whose postings are being flushed */ protected int splitId; /** Create a new HadoopRunWriter, specifying the output collector of the map task * the run number and the flush number. 
@@ -52,12 +53,13 @@ * @param _mapId the task id of the map currently being processed * @param _flushNo the number of times that this map task has flushed */ - public HadoopRunWriter(OutputCollector _outputCollector, - String _mapId, int _flushNo) + public HadoopRunWriter(OutputCollector _outputCollector, + String _mapId, int _splitId, int _flushNo) { this.outputCollector = _outputCollector; this.mapId = _mapId; this.flushNo = _flushNo; + this.splitId = _splitId; this.info = "HadoopRunWriter(Map "+ mapId +", flush "+flushNo+")"; } @@ -80,10 +82,11 @@ //emit the term and its posting list outputCollector.collect( - MapEmittedTerm.create_TextPlus(term, mapId, flushNo), + SplitEmittedTerm.createNewTerm(term, splitId, flushNo), MapEmittedPostingList.create_Hadoop_WritableRunPostingData( mapId, flushNo, + splitId, buffer, post.getDocF(), post.getTF())); } Index: src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/MapEmittedTermBySplitPartitioner.java =================================================================== --- src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/MapEmittedTermBySplitPartitioner.java (revision 0) +++ src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/MapEmittedTermBySplitPartitioner.java (revision 0) @@ -0,0 +1,74 @@ +/* + * Terrier - Terabyte Retriever + * Webpage: http://ir.dcs.gla.ac.uk/terrier + * Contact: terrier{a.}dcs.gla.ac.uk + * University of Glasgow - 
Department of Computing Science + * http://www.gla.uk + * + * The contents of this file are subject to the Mozilla Public License + * Version 1.1 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * http://www.mozilla.org/MPL/ + * + * Software distributed under the License is distributed on an "AS IS" + * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See + * the License for the specific language governing rights and limitations + * under the License. + * + * The Original Code is MapEmittedTermBySplitPartitioner.java. + * + * The Original Code is Copyright (C) 2004-2009 the University of Glasgow. + * All Rights Reserved. + * + * Contributor(s): + * Richard McCreadie (original author) + * Craig Macdonald + */ + +package uk.ac.gla.terrier.structures.indexing.singlepass.hadoop; + + +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.JobConfigurable; +import org.apache.hadoop.mapred.Partitioner; + + +/** + * Partitions the term posting lists by the id of the split they were emitted from, + * such that the created indexes are partitioned evenly across + * the reducers.
Each reducer receives a contiguous range of split ids. + * @author Richard McCreadie and Craig Macdonald + * @version $Revision: 1.2 $ + * @since 3.0 + */ +public class MapEmittedTermBySplitPartitioner implements JobConfigurable, Partitioner { + + protected int numSplits = -1; + + + public void configure(JobConf job) + { + //TODO: this looks like a hack, because it is expensive to calculate + try{ + numSplits = job.getInputFormat().getSplits(job, job.getNumMapTasks()).length; + } catch (Exception e) { + System.err.println("Failed to calculate number of splits correctly: " + e); + } + } + + public int calculatePartition(int splitId, int numPartitions) { + // partitionSize is at least 1 (numSplits stays -1 if configure failed), and the result is + // clamped so that an underestimated numSplits cannot produce a partition >= numPartitions + final int partitionSize = Math.max(1, (int) (Math.ceil((double)numSplits / (double) numPartitions ))); + return Math.min(splitId / partitionSize, numPartitions - 1); + } + + /** + * Forces map output from adjacent splits to mostly go to the same reducer + */ + public int getPartition(MapEmittedTerm key, MapEmittedPostingList value, int numPartitions) + { + return this.calculatePartition(key.getSplitId(), numPartitions); + } + +} Index: src/uk/ac/gla/terrier/applications/HadoopIndexing.java =================================================================== --- src/uk/ac/gla/terrier/applications/HadoopIndexing.java (revision 2726) +++ src/uk/ac/gla/terrier/applications/HadoopIndexing.java (working copy) @@ -49,9 +49,8 @@ import uk.ac.gla.terrier.indexing.hadoop.Hadoop_BlockSinglePassIndexer; import uk.ac.gla.terrier.structures.Index; import uk.ac.gla.terrier.structures.indexing.singlepass.hadoop.MapEmittedPostingList; -import uk.ac.gla.terrier.structures.indexing.singlepass.hadoop.MapEmittedTerm; -import uk.ac.gla.terrier.structures.indexing.singlepass.hadoop.MapEmittedTermByMapPartitioner; import uk.ac.gla.terrier.structures.indexing.singlepass.hadoop.MultiFileCollectionInputFormat; +import 
uk.ac.gla.terrier.structures.indexing.singlepass.hadoop.SplitEmittedTerm; import uk.ac.gla.terrier.utility.ApplicationSetup; import uk.ac.gla.terrier.utility.Files; import uk.ac.gla.terrier.utility.io.HadoopPlugin; @@ -127,7 +126,7 @@ conf.setReducerClass(Hadoop_BasicSinglePassIndexer.class); } FileOutputFormat.setOutputPath(conf, new Path(ApplicationSetup.TERRIER_INDEX_PATH)); - conf.setMapOutputKeyClass(MapEmittedTerm.class); + conf.setMapOutputKeyClass(SplitEmittedTerm.class); conf.setMapOutputValueClass(MapEmittedPostingList.class); if (! conf.get("mapred.job.tracker").equals("local")) @@ -141,8 +140,8 @@ conf.setInputFormat(MultiFileCollectionInputFormat.class); conf.setOutputFormat(NullOutputFormat.class); - conf.setOutputKeyComparatorClass(MapEmittedTerm.TermMapFlushComparator.class); - conf.setOutputValueGroupingComparator(MapEmittedTerm.TermComparator.class); + conf.setOutputKeyComparatorClass(SplitEmittedTerm.SETRawComparatorTermSplitFlush.class); + conf.setOutputValueGroupingComparator(SplitEmittedTerm.SETRawComparatorTerm.class); conf.setReduceSpeculativeExecution(false); //parse the collection.spec BufferedReader specBR = Files.openFileReader(ApplicationSetup.COLLECTION_SPEC); @@ -159,7 +158,7 @@ conf.setNumReduceTasks(numberOfReducers); if (numberOfReducers> 1) { - conf.setPartitionerClass(MapEmittedTermByMapPartitioner.class); + conf.setPartitionerClass(SplitEmittedTerm.SETPartitioner.class); } else { Index: src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/SplitEmittedTerm.java =================================================================== --- src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/SplitEmittedTerm.java (revision 0) +++ src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/SplitEmittedTerm.java (revision 0) @@ -0,0 +1,252 @@ +package uk.ac.gla.terrier.structures.indexing.singlepass.hadoop; + +import java.io.ByteArrayInputStream; +import java.io.DataInput; +import java.io.DataInputStream; +import 
java.io.DataOutput; +import java.io.IOException; + + +import org.apache.hadoop.io.RawComparator; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.JobConfigurable; +import org.apache.hadoop.mapred.Partitioner; + +/** + * Represents a Term key used during MapReduce Indexing. Term keys are emitted from + * each map task, and are used for sorting and partitioning the output. + * @author richardm + * + */ +public class SplitEmittedTerm implements WritableComparable{ + + /** The term */ + private String term; + /** The split that this instance of the term has been processed by */ + private int splitno; + /** The flush within the split that this instance of the term was emitted by */ + private int flushno; + + /** + * Empty Constructor + */ + public SplitEmittedTerm() {} + + /** + * Constructor for a Term key. Is used for sorting map output and partitioning + * posting lists between reducers. Each term is only unique in conjunction with + * the split and flush that it was emitted from.
+ * @param term the term text + * @param splitno the index of the split the term was emitted from + * @param flushno the flush (within that split) the term was emitted by + */ + public SplitEmittedTerm(String term, int splitno, int flushno) { + this.term = term; + this.splitno = splitno; + this.flushno = flushno; + } + + /** + * Factory method for creating a new Term key object + * @param term the term text + * @param splitno the index of the split the term was emitted from + * @param flushno the flush (within that split) the term was emitted by + * @return a new SplitEmittedTerm holding the given values + */ + public static SplitEmittedTerm createNewTerm(String term, int splitno, int flushno) { + return new SplitEmittedTerm(term, splitno, flushno); + } + + + @Override + public String toString() { + return term + ":" + splitno + ":" + flushno; + } + + @Override + /** + * Read in a Term key object from the input stream 'in' + */ + public void readFields(DataInput in) throws IOException { + term = in.readUTF(); + splitno = in.readInt(); + flushno = in.readInt(); + } + + @Override + /** + * Write out this Term key to output stream 'out' + */ + public void write(DataOutput out) throws IOException { + out.writeUTF(term); + out.writeInt(splitno); + out.writeInt(flushno); + } + + @Override + /** + * Compares this Term key to another term key. Note that terms are + * unique only in conjunction with their associated split and flush.
+ */ + public int compareTo(SplitEmittedTerm term2) { + int result; + if ((result = term.compareTo(term2.getTerm()))!=0) return result; + if (splitno != term2.getSplitno()) return splitno < term2.getSplitno() ? -1 : 1; + return flushno < term2.getFlushno() ? -1 : (flushno == term2.getFlushno() ? 0 : 1); + } + + + + /** + * @return the term + */ + public String getTerm() { + return term; + } + + /** + * @param term the term to set + */ + public void setTerm(String term) { + this.term = term; + } + + /** + * @return the splitno + */ + public int getSplitno() { + return splitno; + } + + /** + * @param splitno the splitno to set + */ + public void setSplitno(int splitno) { + this.splitno = splitno; + } + + /** + * @return the flushno + */ + public int getFlushno() { + return flushno; + } + + /** + * @param flushno the flushno to set + */ + public void setFlushno(int flushno) { + this.flushno = flushno; + } + + public static class SETRawComparatorTerm implements RawComparator + { + @Override + /** + * Compares raw Term key 1 to raw Term key 2. Note that only terms are considered.
+ */ + public int compare(byte[] bterm1, int offset1, int length1, byte[] bterm2, int offset2, + int length2) + { + /** Term objects used during raw Term comparisons */ + SplitEmittedTerm term1; + SplitEmittedTerm term2; + // Convert to streams so that the read methods can be used + DataInputStream b1S = new DataInputStream(new ByteArrayInputStream(bterm1, offset1, length1)); + DataInputStream b2S = new DataInputStream(new ByteArrayInputStream(bterm2, offset2, length2)); + try { + term1 = new SplitEmittedTerm(); + term1.readFields(b1S); + term2 = new SplitEmittedTerm(); + term2.readFields(b2S); + b1S.close(); + b2S.close(); + + return term1.getTerm().compareTo(term2.getTerm()); + } catch (IOException e) { + // an undecodable key must not compare as equal: that would silently merge distinct terms + throw new RuntimeException("ERROR during raw comparision of term objects, unable to read input streams.", e); + } + // not reached: the try block returns and the catch block throws + + } + + @Override + public int compare(SplitEmittedTerm o1, SplitEmittedTerm o2) { + return o1.getTerm().compareTo(o2.getTerm()); + } + } + + public static class SETRawComparatorTermSplitFlush implements RawComparator + { + @Override + /** + * Compares raw Term key 1 to raw Term key 2. Note that terms are + * unique only in conjunction with their associated split and flush.
+ */ + public int compare(byte[] bterm1, int offset1, int length1, byte[] bterm2, int offset2, + int length2) { + /** Term objects used during raw Term comparisons */ + SplitEmittedTerm term1; + SplitEmittedTerm term2; + // Convert to streams so that the read methods can be used + DataInputStream b1S = new DataInputStream(new ByteArrayInputStream(bterm1, offset1, length1)); + DataInputStream b2S = new DataInputStream(new ByteArrayInputStream(bterm2, offset2, length2)); + try { + term1 = new SplitEmittedTerm(); + term1.readFields(b1S); + term2 = new SplitEmittedTerm(); + term2.readFields(b2S); + b1S.close(); + b2S.close(); + + return term1.compareTo(term2); + } catch (IOException e) { + // an undecodable key must not compare as equal: that would corrupt the sort order + throw new RuntimeException("ERROR during raw comparision of term objects, unable to read input streams.", e); + } + // not reached: the try block returns and the catch block throws + } + + @Override + /** + * Compares Term key 1 to Term key 2. Note that terms are + * unique only in conjunction with their associated split and flush. + */ + public int compare(SplitEmittedTerm term1, SplitEmittedTerm term2) { + return term1.compareTo(term2); + } + } + + + public static class SETPartitioner implements Partitioner, JobConfigurable + { + /** The number of chunks the collection was split into */ + private int numSplits; + + @Override + /** + * Configure the partitioner functionality, i.e. calculate the + * number of splits there were.
+ */ + public void configure(JobConf conf) { + // assumes one split per map task; NOTE(review): getNumMapTasks() is a configured hint and may differ from the real split count - confirm + numSplits = conf.getNumMapTasks(); + } + + @Override + public int getPartition(SplitEmittedTerm term, MapEmittedPostingList posting, + int numPartitions) { + // no logging here: this is invoked once for every map output record + return calculatePartition(term.getSplitno(), numPartitions); + } + + /** Assigns each reducer a contiguous range of split ids; the result lies in [0, numPartitions-1]. */ + public int calculatePartition(int splitno, int numPartitions) { + final int partitionSize = Math.max(1, (int) (Math.ceil((double)numSplits / (double) numPartitions ))); + return Math.min(splitno / partitionSize, numPartitions - 1); + } + } + + +}