### Eclipse Workspace Patch 1.0 #P terrierSVNtrunk Index: src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/HadoopRunsMerger.java =================================================================== --- src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/HadoopRunsMerger.java (revision 2549) +++ src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/HadoopRunsMerger.java (working copy) @@ -80,52 +80,25 @@ // for each run in the list int counter = 0; //for one term: for each set of postings for that term - final int partitionSize = (int) (Math.ceil( ((double)mapData.size())/(double)this.getNumReducers())); + //final int partitionSize = (int) (Math.ceil( ((double)mapData.size())/(double)this.getNumReducers())); while (run.hasNext()) { - + //long now2 = System.currentTimeMillis(); PostingInRun posting = run.next(); lastTermWritten = posting.getTerm(); - final int reduceNumber = (TaskID.forName(_run.getMapNo()).getId()/partitionSize); - // + if (posting.getDf() > maxDF) maxDF = posting.getDf(); - int NumPreDocs = 0; - MapData correctHRD = null; - for (MapData tempHRD : mapData) - { - if (tempHRD.getMap().equals(_run.getMapNo())) - { - correctHRD = tempHRD; - break; - } - //otherwise, we need to check that the map is within our partition - //only if it is, then that map should contribute to our docid offset - final int tempMapId = TaskID.forName(tempHRD.getMap()).getId(); - if (reduceNumber == (tempMapId/partitionSize)) - { - NumPreDocs += tempHRD.getMapDocs(); - } - } - if (correctHRD == null) - throw new IOException("Did not find map data for "+ _run.getMapNo()); - - // Add the FlushShift - int currentFlushDocs=0; - - ListIterator LI = correctHRD.getFlushDocSizes().listIterator(0); - int currentFlush =0; - while (currentFlush LI = correctHRD.getFlushDocSizes().listIterator(0); + //System.out.println("Runs Flush number : "+run.getRunNo()+", Size of HRD :"+correctHRD.getFlushDocSizes().size()); + int currentFlush =0; + while (currentFlush, MapEmittedTerm, MapEmittedPostingList>, + implements Mapper, MapEmittedTerm, MapEmittedPostingList>, Reducer { @@ -112,6 +114,10 @@ /** JobConf of the current running job */ protected JobConf jc; + /** The split that these documents came form **/ + protected int splitnum; + protected boolean start; + /** * Empty constructor. */ @@ -191,9 +197,13 @@ protected void configureMap() throws Exception { super.init(); + start = true; Path indexDestination = FileOutputFormat.getWorkOutputPath(jc); mapTaskID = TaskAttemptID.forName(jc.get("mapred.task.id")).getTaskID().toString(); currentIndex = Index.createNewIndex(indexDestination.toString(), mapTaskID); + //during reduce, we dont want to load indices into memory, as we only use + //them as streams + currentIndex.setIndexProperty("index.preloadIndices.disabled", "true"); RunData = new DataOutputStream( Files.writeFileStream( new Path(indexDestination, mapTaskID+".runs").toString()) @@ -227,14 +237,22 @@ * @throws IOException */ public void map( - Text key, Wrapper value, + Text key, Wrapper value, OutputCollector _outputPostingListCollector, Reporter reporter) throws IOException { + + final String docno = key.toString(); reporter.setStatus("Currently indexing "+docno); - final Document doc = value.getObject(); + final PositionAwareDocument doc = value.getObject(); + + if (start) { + splitnum = doc.getPosition(); + start = false; + } + this.outputPostingListCollector = _outputPostingListCollector; /* setup for parsing */ @@ -330,6 +348,7 @@ currentIndex.close(); RunData.writeInt(-1); RunData.writeInt(numberOfDocuments); + RunData.writeInt(splitnum); RunData.close(); logger.info("Map "+mapTaskID+ " finishing, indexed "+numberOfDocuments+ " in "+flushNo+" flushes"); } @@ -369,6 +388,7 @@ reduceStarted = false; } + @SuppressWarnings("unchecked") protected LinkedList loadRunData() throws IOException { // Load in Run Data @@ -415,6 +435,8 @@ runData.add(tempHRD); runDataIn.close(); } + Collections.sort(runData); + Collections.sort(mapTaskIDs, new IDComparator(runData)); MapIndexPrefixes = mapTaskIDs.toArray(new String[0]); return runData; } @@ -451,6 +473,7 @@ throws IOException { if (logger.isDebugEnabled()) logger.debug("Reduce for term "+Term.getText()); + reporter.setStatus("Reducer is merging term " + Term.getText()); if (! reduceStarted) { final LinkedList runData = loadRunData(); Index: src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/MultiFileSplit.java =================================================================== --- src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/MultiFileSplit.java (revision 0) +++ src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/MultiFileSplit.java (revision 0) @@ -0,0 +1,152 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package uk.ac.gla.terrier.structures.indexing.singlepass.hadoop; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.HashSet; +import java.util.Set; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.BlockLocation; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobClient; +import org.apache.hadoop.mapred.JobConf; + +/** + * A sub-collection of input files. Unlike {@link FileSplit}, MultiFileSplit + * class does not represent a split of a file, but a split of input files + * into smaller sets. The atomic unit of split is a file.
+ * MultiFileSplit can be used to implement {@link RecordReader}'s, with + * reading one record per file. + * @see FileSplit + * @see MultiFileInputFormat + */ +public class MultiFileSplit implements InputSplit { + + private Path[] paths; + private long[] lengths; + private long totLength; + private JobConf job; + + public MultiFileSplit() {} + + public MultiFileSplit(JobConf job, Path[] files, long[] lengths) { + this.job = job; + this.lengths = lengths; + this.paths = files; + this.totLength = 0; + for(long length : lengths) { + totLength += length; + } + } + + public long getLength() { + return totLength; + } + + /** Returns an array containing the lengths of the files in + * the split*/ + public long[] getLengths() { + return lengths; + } + + /** Returns the length of the ith Path */ + public long getLength(int i) { + return lengths[i]; + } + + /** Returns the number of Paths in the split */ + public int getNumPaths() { + return paths.length; + } + + /** Returns the ith Path */ + public Path getPath(int i) { + return paths[i]; + } + + /** Returns all the Paths in the split */ + public Path[] getPaths() { + return paths; + } + + public String[] getLocations() throws IOException { + HashSet hostSet = new HashSet(); + JobClient jClient = new JobClient(job); + FileSystem fs = jClient.getFs(); + for (Path file : paths) { + FileStatus status = fs.getFileStatus(file); + BlockLocation[] blkLocations = fs.getFileBlockLocations(status, + 0, status.getLen()); + if (blkLocations != null && blkLocations.length > 0) { + addToSet(hostSet, blkLocations[0].getHosts()); + } + } + return hostSet.toArray(new String[hostSet.size()]); + } + + private void addToSet(Set set, String[] array) { + for(String s:array) + set.add(s); + } + + public void readFields(DataInput in) throws IOException { + totLength = in.readLong(); + int arrLength = in.readInt(); + lengths = new long[arrLength]; + for(int i=0; i flushDocSizes = new LinkedList(); + /** The Split number **/ + protected int splitnum; + protected int int_mapTaskId; /** * Constructor - Loads the Map Information from the DataInputStream Provided @@ -55,18 +60,26 @@ public MapData(DataInputStream in) throws IOException{ super(); mapTaskID = in.readUTF(); + int_mapTaskId = TaskID.forName(mapTaskID).getId(); int flushSize; while ((flushSize = in.readInt()) != -1) { flushDocSizes.add(flushSize); } numMapDocs = in.readInt(); - System.err.printf("map %s had %d docs, with %d flushes\n", mapTaskID, numMapDocs, flushDocSizes.size()); + splitnum = in.readInt(); + System.err.printf("map %s processed split %d which had %d docs, with %d flushes\n", mapTaskID, splitnum, numMapDocs, flushDocSizes.size()); } public String getMap() { return mapTaskID; } + + public int getMapId() + { + return int_mapTaskId; + } + public int getMapDocs() { return numMapDocs; } @@ -79,4 +92,18 @@ public LinkedList getFlushDocSizes() { return flushDocSizes;//size of each run in documents } + + @Override + public int compareTo(Object o) { + return splitnum-((MapData)o).splitnum; + } + + /** + * @return the splitnum + */ + public int getSplitnum() { + return splitnum; + } + + } Index: src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/MapEmittedTerm.java =================================================================== --- src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/MapEmittedTerm.java (revision 2549) +++ src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/MapEmittedTerm.java (working copy) @@ -133,6 +133,56 @@ } + /** + * Raw Comparator class to compare MapEmittedTerm objects + * stage 1. (Order by term, then by map number). In this case + * the map number is stored as an int rather than as a Map + * task ID and therefore is parsed. This is used for the + * SimpleIndexing variants as they do not flush out data in + * the same manner. + * @author Richard McCreadie and Craig Macdonald + * @since 2.2.1 + * @version $Revision: 1.0 $ + */ + public static class TermMapComparator implements RawComparator { + + protected MapEmittedTerm tempT = new MapEmittedTerm(); + protected MapEmittedTerm tempT2 = new MapEmittedTerm(); + + public int compare(MapEmittedTerm a, MapEmittedTerm b) { + throw new Error("Unsupported method Indexing_CompareTextPlusKey.compare(Indexing_TextPlus,Indexing_TextPlus) was called"); + //richard's documentation say that this method is not used + //Richard: Indeed, it is not used - because it is a RawComparator it uses the byte[] compare instead. + } + + /** + * Compare by term (bit comparison on Text object) then by map number (int) + * then by flush number (int). + */ + public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { + // Convert to streams so that the read methods can be used + DataInputStream b1S = new DataInputStream(new ByteArrayInputStream(b1, s1, l1)); + DataInputStream b2S = new DataInputStream(new ByteArrayInputStream(b2, s2, l2)); + try { + // Read in the TextPlus Objects + tempT.readFields(b1S); + tempT2.readFields(b2S); + b1S.close(); + b2S.close(); + // Do Comparison Text + int value = tempT.getText().compareTo(tempT2.getText()); + if (value != 0) + return value; + // If same do Comparison on map task id + value = Integer.parseInt(tempT.getMap()) - Integer.parseInt(tempT2.getMap()); + return value; + } catch (IOException e) { + return 0; + } + } + + } + /** The Map this Term was processed from */ protected String mapTaskID; /** The Flush number this term was from */ Index: src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/PositionAwareDocument.java =================================================================== --- src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/PositionAwareDocument.java (revision 0) +++ src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/PositionAwareDocument.java (revision 0) @@ -0,0 +1,87 @@ +package uk.ac.gla.terrier.structures.indexing.singlepass.hadoop; + +import java.io.Reader; +import java.util.Map; +import java.util.Set; + +import uk.ac.gla.terrier.indexing.Document; + +public class PositionAwareDocument implements Document{ + + Document doc; + int position; + + public PositionAwareDocument() { + position = -1; + } + + public PositionAwareDocument(Document doc, int position) { + super(); + this.doc = doc; + this.position = position; + } + + + /** + * @return the doc + */ + public Document getDoc() { + return doc; + } + + + /** + * @param doc the doc to set + */ + public void setDoc(Document doc) { + this.doc = doc; + } + + + /** + * @return the position + */ + public int getPosition() { + return position; + } + + + /** + * @param position the position to set + */ + public void setPosition(int position) { + this.position = position; + } + + + @Override + public boolean endOfDocument() { + return doc.endOfDocument(); + } + + @Override + public Map getAllProperties() { + return doc.getAllProperties(); + } + + @Override + public Set getFields() { + return doc.getFields(); + } + + @Override + public String getNextTerm() { + return doc.getNextTerm(); + } + + @Override + public String getProperty(String name) { + return doc.getProperty(name); + } + + @Override + public Reader getReader() { + return doc.getReader(); + } + +} Index: src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/FileCollectionRecordReader.java =================================================================== --- src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/FileCollectionRecordReader.java (revision 2549) +++ src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/FileCollectionRecordReader.java (working copy) @@ -38,13 +38,11 @@ import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.CompressionCodecFactory; import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.MultiFileSplit; import org.apache.hadoop.mapred.RecordReader; import org.apache.log4j.Logger; import uk.ac.gla.terrier.indexing.Collection; import uk.ac.gla.terrier.indexing.CollectionFactory; -import uk.ac.gla.terrier.indexing.Document; import uk.ac.gla.terrier.utility.ApplicationSetup; import uk.ac.gla.terrier.utility.Wrapper; import uk.ac.gla.terrier.utility.io.CountingInputStream; @@ -58,8 +56,8 @@ * @version $Revision: 1.2 $ */ public class FileCollectionRecordReader - extends CollectionRecordReader - implements RecordReader> + extends CollectionRecordReader> + implements RecordReader> { /** The logger used */ @@ -81,7 +79,7 @@ * @param split - Input Split (multiple Files) * @throws IOException */ - public FileCollectionRecordReader(JobConf jobConf, MultiFileSplit split) throws IOException + public FileCollectionRecordReader(JobConf jobConf, PositionAwareSplit split) throws IOException { super(jobConf, split); compressionCodecs = new CompressionCodecFactory(config); @@ -100,19 +98,19 @@ * Returns the progress of the reading */ public float getProgress() throws IOException { - return (float)collectionIndex/(float)(split.getNumPaths()); + return (float)collectionIndex/(float)(((MultiFileSplit)split.getSplit()).getNumPaths()); } /** Opens a collection on the next file. */ @Override protected Collection openCollectionSplit(int index) throws IOException { - if (index >= split.getNumPaths()) + if (index >= ((MultiFileSplit)split.getSplit()).getNumPaths()) { //no more splits left to process return null; } - Path file = split.getPath(index); + Path file = ((MultiFileSplit)split.getSplit()).getPath(index); logger.info("Opening "+file); long offset = 0;//TODO populate from split? FileSystem fs = file.getFileSystem(config); Index: src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/IDComparator.java =================================================================== --- src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/IDComparator.java (revision 0) +++ src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/IDComparator.java (revision 0) @@ -0,0 +1,36 @@ +package uk.ac.gla.terrier.structures.indexing.singlepass.hadoop; + +import java.util.Comparator; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.Map; + +/** + * Compares String objects. It is used to sort the Map ID's for the MapData files loaded + * during the reduce step. It makes sure that the Map ID's are in order of the documents + * that they processed. + * @author richardm + * + */ +public class IDComparator implements Comparator { + + Map id2splitData = new HashMap(); + MapData[] splitData; + + public IDComparator(LinkedList mapData) { + splitData = mapData.toArray(new MapData[mapData.size()]); + for(MapData m : splitData) + { + id2splitData.put(m.getMap(), m); + } + } + + @Override + public final int compare(String id1, String id2) { + final MapData md1 = id2splitData.get(id1); + final MapData md2 = id2splitData.get(id2); + if (md1 == null || md2 == null) return -1; + return md1.compareTo(md2); + } + +} \ No newline at end of file Index: src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/CollectionRecordReader.java =================================================================== --- src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/CollectionRecordReader.java (revision 2549) +++ src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/CollectionRecordReader.java (working copy) @@ -32,7 +32,6 @@ import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapred.InputSplit; import uk.ac.gla.terrier.indexing.Collection; import uk.ac.gla.terrier.indexing.Document; @@ -46,7 +45,7 @@ * @version $Revision: 1.2 $ * @param The subclass of InputSplit that this class should work with */ -public abstract class CollectionRecordReader { +public abstract class CollectionRecordReader> { /** document collection currently being iterated through. starts as null */ protected Collection documentCollection = null; @@ -90,8 +89,8 @@ /** Create a new Text value, * each value is a document */ - public Wrapper createValue() { - return new Wrapper(); + public Wrapper createValue() { + return new Wrapper(); } /** @@ -112,7 +111,7 @@ * document. Returns true if another document exists * otherwise returns false. */ - public boolean next(Text DocID, Wrapper document) throws IOException { + public boolean next(Text DocID, Wrapper document) throws IOException { if (documentCollection == null) { documentCollection = openCollectionSplit(collectionIndex); @@ -145,7 +144,7 @@ { DocID.set(documentCollection.getDocid()); } - document.setObject(tempDoc); + document.setObject(new PositionAwareDocument(tempDoc, split.getSplitnum())); currentDocument++; return true; } Index: src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/MultiFileCollectionInputFormat.java =================================================================== --- src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/MultiFileCollectionInputFormat.java (revision 2549) +++ src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/MultiFileCollectionInputFormat.java (working copy) @@ -37,12 +37,10 @@ import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.MultiFileInputFormat; -import org.apache.hadoop.mapred.MultiFileSplit; import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.Reporter; import org.apache.log4j.Logger; -import uk.ac.gla.terrier.indexing.Document; import uk.ac.gla.terrier.utility.Wrapper; /** @@ -55,7 +53,7 @@ * @since 2.2 * @version $Revision: 1.2 $ */ -public class MultiFileCollectionInputFormat extends MultiFileInputFormat> +public class MultiFileCollectionInputFormat extends MultiFileInputFormat> { /** logger for this class */ @@ -69,14 +67,14 @@ * @param job JobConf of this job * @param reported To report progress */ - public RecordReader> getRecordReader( + public RecordReader> getRecordReader( InputSplit genericSplit, JobConf job, Reporter reporter) throws IOException { reporter.setStatus(genericSplit.toString()); - return new FileCollectionRecordReader(job, (MultiFileSplit) genericSplit); + return new FileCollectionRecordReader(job, (PositionAwareSplit) genericSplit); } @Override @@ -107,7 +105,7 @@ numSplits = 1; } logger.info("Allocating "+paths.length+ " files across "+numSplits +" map tasks"); - List splits = new ArrayList(numSplits); + List> splits = new ArrayList>(numSplits); long[] lengths = new long[paths.length]; long totLength = 0; final FileSystem fs = FileSystem.get(job); @@ -121,7 +119,9 @@ final int numberOfFilesPerSplit = paths.length / numSplits; int index = 0; + int splitnum = 0; // for each split except the last one (which may be larger than numberOfFilesPerSplit) + MultiFileSplit mfs; for(int i=0; i(mfs, splitnum)); + splitnum++; } // Now do the last one containing remaining files @@ -145,12 +147,14 @@ index++; pathsUsed++; } - splits.add(new MultiFileSplit(job, splitPaths, splitLengths)); + mfs = new MultiFileSplit(job, splitPaths, splitLengths); + splits.add(new PositionAwareSplit(mfs, splitnum)); + splitnum++; if (!(pathsUsed==paths.length)) { throw new IOException("Number of used paths does not equal total available paths!"); } - return splits.toArray(new MultiFileSplit[splits.size()]); + return splits.toArray(new PositionAwareSplit[splits.size()]); } } Index: src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/PositionAwareSplit.java =================================================================== --- src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/PositionAwareSplit.java (revision 0) +++ src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/PositionAwareSplit.java (revision 0) @@ -0,0 +1,88 @@ +package uk.ac.gla.terrier.structures.indexing.singlepass.hadoop; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.mapred.InputSplit; + +import uk.ac.gla.terrier.utility.io.WrappedIOException; + +public class PositionAwareSplit implements InputSplit{ + + + protected T split; + protected int splitnum; + + public PositionAwareSplit() { + splitnum=-1; + } + + public PositionAwareSplit(T _split, int _splitnum) { + split = _split; + splitnum = _splitnum; + } + + /** + * @return the splitnum + */ + public int getSplitnum() { + return splitnum; + } + + /** + * @param splitnum the splitnum to set + */ + public void setSplitnum(int splitnum) { + this.splitnum = splitnum; + } + + /** + * @return the split + */ + public InputSplit getSplit() { + return split; + } + + /** + * @param split the split to set + */ + public void setSplit(T split) { + this.split = split; + } + + @Override + public long getLength() throws IOException { + return split.getLength(); + } + + @Override + public String[] getLocations() throws IOException { + return split.getLocations(); + } + + @Override + public void readFields(DataInput arg0) throws WrappedIOException { + try { + String className = arg0.readUTF(); + + Class c = Class.forName(className, false, this.getClass().getClassLoader()); + split = (T)c.getConstructor(new Class[0]).newInstance(new Object[0]); + + split.readFields(arg0); + splitnum = arg0.readInt(); + } catch (Exception e) { + throw new WrappedIOException("Error during the reading of fields of a new PositionAwareSplit"); + } + } + + @Override + public void write(DataOutput arg0) throws IOException { + arg0.writeUTF(split.getClass().getName()); + split.write(arg0); + arg0.writeInt(splitnum); + } + + + +}