Index: src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/FileCollectionRecordReader.java
===================================================================
--- src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/FileCollectionRecordReader.java	(revision 2563)
+++ src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/FileCollectionRecordReader.java	(working copy)
@@ -38,7 +38,6 @@
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MultiFileSplit;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.log4j.Logger;
 
@@ -46,7 +45,6 @@
 import uk.ac.gla.terrier.indexing.CollectionFactory;
 import uk.ac.gla.terrier.indexing.Document;
 import uk.ac.gla.terrier.utility.ApplicationSetup;
-import uk.ac.gla.terrier.utility.Wrapper;
 import uk.ac.gla.terrier.utility.io.CountingInputStream;
 
 /**
@@ -58,8 +56,8 @@
  * @version $Revision: 1.2 $
  */
 public class FileCollectionRecordReader 
-		extends CollectionRecordReader<MultiFileSplit> 
-		implements RecordReader<Text, Wrapper<Document>>
+		extends CollectionRecordReader<PositionAwareSplit<MultiFileSplit>> 
+		implements RecordReader<Text, SplitAwareWrapper<Document>>
 {
 
     /** The logger used */
@@ -71,6 +69,8 @@
 	//TODO: start is unused currently?
 	/** where we started in this file */
 	protected long start;
+	/** length of the file */
+	protected long length;
 	/** factory for accessing compressed files */
 	protected CompressionCodecFactory compressionCodecs = null;
 	
@@ -81,7 +81,7 @@
 	 * @param split - Input Split (multiple Files)
 	 * @throws IOException
 	 */
-	public FileCollectionRecordReader(JobConf jobConf, MultiFileSplit split) throws IOException 
+	public FileCollectionRecordReader(JobConf jobConf, PositionAwareSplit<MultiFileSplit> split) throws IOException 
 	{	
 		super(jobConf, split);
 		compressionCodecs = new CompressionCodecFactory(config);
@@ -100,19 +100,25 @@
 	 * Returns the progress of the reading
 	 */
 	public float getProgress() throws IOException {
-		return (float)collectionIndex/(float)(split.getNumPaths());
+		float fileProgress = 0;
+		final float numPaths = (float)(((MultiFileSplit)split.getSplit()).getNumPaths());
+		if (inputStream != null && length != start)
+			fileProgress = (float)inputStream.getPos()/(float)(length - start);
+		return (fileProgress + (float)collectionIndex)/numPaths;
 	}
 	
+	
+	
 	/** Opens a collection on the next file. */
 	@Override
 	protected Collection openCollectionSplit(int index) throws IOException
 	{
-		if (index >= split.getNumPaths())
+		if (index >= ((MultiFileSplit)split.getSplit()).getNumPaths())
 		{
 			//no more splits left to process
 			return null;
 		}
-		Path file = split.getPath(index);
+		Path file = ((MultiFileSplit)split.getSplit()).getPath(index);
 		logger.info("Opening "+file);
 		long offset = 0;//TODO populate from split?
 		FileSystem fs = file.getFileSystem(config);
@@ -122,7 +128,8 @@
 		//Hadoop cant
 		CompressionCodec codec = compressionCodecs.getCodec(
 			new Path(file.toString().replaceAll("\\.GZ$", ".gz")));
-
+		
+		length = fs.getFileStatus(file).getLen();
 		FSDataInputStream _input = fs.open(file); //TODO: we could use utility.Files here if
 		//no codec was found	
 		InputStream internalInputStream = null;
Index: src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/IDComparator.java
===================================================================
--- src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/IDComparator.java	(revision 0)
+++ src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/IDComparator.java	(revision 0)
@@ -0,0 +1,35 @@
+package uk.ac.gla.terrier.structures.indexing.singlepass.hadoop;
+
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+
+/**
+ * Compares String objects. It is used to sort the Map ID's for the MapData files loaded
+ * during the reduce step. It makes sure that the Map ID's are in order of the documents
+ * that they processed. 
+ * @author richardm
+ *
+ */
+public class IDComparator implements Comparator<String> {
+
+	Map<String, MapData> id2splitData = new HashMap<String,MapData>();
+	MapData[] splitData;
+	
+	public IDComparator(LinkedList<MapData> mapData) {
+		splitData = mapData.toArray(new MapData[mapData.size()]);
+		for(MapData m : splitData)
+		{
+			id2splitData.put(m.getMap(), m);
+		}
+	}
+	
+	public final int compare(String id1, String id2) {
+		final MapData md1 = id2splitData.get(id1);
+		final MapData md2 = id2splitData.get(id2);
+		if (md1 == null || md2 == null) return -1;
+		return md1.compareTo(md2);
+	}
+	
+}
\ No newline at end of file
Index: src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/HadoopRunsMerger.java
===================================================================
--- src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/HadoopRunsMerger.java	(revision 2563)
+++ src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/HadoopRunsMerger.java	(working copy)
@@ -71,62 +71,26 @@
 		RunIterator run = runsSource.createRunIterator(-1);
 		HadoopRunPostingIterator _run = (HadoopRunPostingIterator)run;
 		
-		boolean firstMap = true;
 		lastTermWritten = null;
 		lastFreq = 0;
 		lastDocFreq= 0;
+		lastDocument = -1;
 		long startOffset = this.getByteOffset();
 		byte startBitOffset = this.getBitOffset();
 		// for each run in the list 
 		int counter = 0;
 		//for one term: for each set of postings for that term
-		final int partitionSize = (int) (Math.ceil( ((double)mapData.size())/(double)this.getNumReducers()));
 		while (run.hasNext()) {
-			
 			PostingInRun posting = run.next();
 			lastTermWritten = posting.getTerm();
-			final int reduceNumber = (TaskID.forName(_run.getMapNo()).getId()/partitionSize);
-			//
+			
 			if (posting.getDf() > maxDF) 
 				maxDF = posting.getDf();
 			
-			int NumPreDocs = 0;
-			MapData correctHRD = null;			
-			for (MapData tempHRD : mapData)
-			{
-				if (tempHRD.getMap().equals(_run.getMapNo()))
-				{
-					correctHRD = tempHRD;
-					break;
-				}
-				//otherwise, we need to check that the map is within our partition
-				//only if it is, then that map should contribute to our docid offset
-				final int tempMapId = TaskID.forName(tempHRD.getMap()).getId();
-				if (reduceNumber == (tempMapId/partitionSize))
-				{
-					NumPreDocs += tempHRD.getMapDocs();
-				}
-			}
-			if (correctHRD == null)
-				throw new IOException("Did not find map data for "+ _run.getMapNo());
-			
-			// Add the FlushShift
-			int currentFlushDocs=0;
-			
-			ListIterator<Integer> LI = correctHRD.getFlushDocSizes().listIterator(0);
-			int currentFlush =0;
-			while (currentFlush<run.getRunNo()) {
-				currentFlushDocs += LI.next(); 
-				currentFlush++;
-			}
-	
-			if (firstMap) {
-				lastDocument = posting.append(bos, -1, NumPreDocs+currentFlushDocs);
-				firstMap = false;
-			}
-			else {
-				lastDocument = posting.append(bos, lastDocument, NumPreDocs+currentFlushDocs);
-			}
+			final int _runMapID = TaskID.forName(_run.getMapNo()).getId();
+			final int runNumber = run.getRunNo();
+			final int docOffset = getDocumentOffset(_runMapID, runNumber);
+			lastDocument = posting.append(bos, lastDocument, docOffset);
 			lastFreq += posting.getTF();
 			lastDocFreq += posting.getDf();
 			counter++;
@@ -151,6 +115,34 @@
 	public void setNumReducers(int numReducers) {
 		this.numReducers = numReducers;
 	}
-
+	
+	public int getDocumentOffset(int mapNumber, int flushNumber) throws IOException {
+		int NumPreDocs = 0;
+		MapData correctHRD = null;
+		for (MapData tempHRD : mapData)
+		{
+			if (mapNumber == tempHRD.getMapId() ) {
+				//System.out.println("Reducer number : "+reduceNumber+", Splitnum"+tempSplitnum+", Run Map Number : "+_run.getMapNo());
+				correctHRD = tempHRD;
+				break;
+			}
+			NumPreDocs += tempHRD.getMapDocs();
+		}
+		if (correctHRD == null)
+			throw new IOException("Did not find map data for "+ mapNumber);
+		
+		// Add the FlushShift
+		int currentFlushDocs=0;
+		ListIterator<Integer> LI = correctHRD.getFlushDocSizes().listIterator(0);
+		//System.out.println("Runs Flush number : "+run.getRunNo()+", Size of HRD :"+correctHRD.getFlushDocSizes().size());
+		int currentFlush =0;
+		while (currentFlush<flushNumber) {
+			//System.out.println("Runs Flush number : "+run.getRunNo()+", FlushCheck : "+currentFlush+", Size of HRD :"+correctHRD.getFlushDocSizes().size());
+			currentFlushDocs += LI.next(); 
+			currentFlush++;
+		}
+		
+		return NumPreDocs+currentFlushDocs;
+	}
 
 }
Index: src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/SplitAwareWrapper.java
===================================================================
--- src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/SplitAwareWrapper.java	(revision 0)
+++ src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/SplitAwareWrapper.java	(revision 0)
@@ -0,0 +1,22 @@
+package uk.ac.gla.terrier.structures.indexing.singlepass.hadoop;
+
+import uk.ac.gla.terrier.utility.Wrapper;
+
+public class SplitAwareWrapper<T> extends Wrapper<T> {
+	protected int splitIndex;
+	
+	public SplitAwareWrapper(int index)
+	{
+		super();
+		splitIndex = index;
+	}
+	
+	public int getSplitIndex(){
+		return splitIndex;
+	}
+	
+	public void setSplitIndex(int index)
+	{
+		splitIndex = index;
+	}
+}
Index: src/uk/ac/gla/terrier/indexing/BasicSinglePassIndexer.java
===================================================================
--- src/uk/ac/gla/terrier/indexing/BasicSinglePassIndexer.java	(revision 2563)
+++ src/uk/ac/gla/terrier/indexing/BasicSinglePassIndexer.java	(working copy)
@@ -178,7 +178,7 @@
 				Document doc = collection.getDocument();
 				if (doc == null)
 					continue;
-				numberOfDocuments++;
+				//numberOfDocuments++;
 				/* setup for parsing */
 				createDocumentPostings();
 
@@ -260,6 +260,30 @@
 		finishedInvertedIndexBuild();
 	}
 
+	/** check to see if a flush is required, and perform if necessary */
+	protected void checkFlush() throws IOException
+	{
+		if(docsPerCheck != numberOfDocsSinceCheck)
+			return;
+		if (! memoryCheck.checkMemory())
+		{
+			numberOfDocsSinceCheck = 0;
+			return;
+		}
+		numberOfDocsSinceCheck = 0;
+		try {
+			mp.finish(finishMemoryPosting());
+		} catch (IOException ioe) {
+			logger.error("Failed writing run", ioe);
+		} catch (Error e) {
+			logger.error("Error writing run out", e);
+		}
+		System.gc();
+		createMemoryPostings();
+		memoryCheck.reset();
+		numberOfDocsSinceFlush = 0;		
+	}
+	
 	/**
 	 * Adds the terms and possibly the field information of a document in
 	 * the current lexicon and in the direct index. It also updates the document index.
@@ -271,26 +295,12 @@
 	protected void indexDocument(Map<String,String> docProperties, DocumentPostingList termsInDocument)
 	{
 		if (termsInDocument.getDocumentLength() > 0) {
-			numberOfDocsSinceCheck++; numberOfDocsSinceFlush++;
-			if(docsPerCheck == numberOfDocsSinceCheck){
-				if (memoryCheck.checkMemory())
-				{
-					try {
-						mp.finish(finishMemoryPosting());
-					} catch (IOException ioe) {
-						logger.error("Failed writing run at doc "+docProperties.get("docno"), ioe);
-					} catch (Error e) {
-						logger.error("Error writing run out at doc "+docProperties.get("docno"), e);
-					}
-					System.gc();
-					createMemoryPostings();
-					memoryCheck.reset();
-					numberOfDocsSinceFlush = 0;
-				}
-				numberOfDocsSinceCheck = 0;
-			}
+			numberOfDocsSinceCheck++;
+			numberOfDocsSinceFlush++;
 			
 			try{
+				checkFlush();
+				
 				mp.addTerms(termsInDocument, currentId);
 				docIndexBuilder.addEntryToBuffer(new SimpleDocumentIndexEntry(termsInDocument.getDocumentStatistics()));
 				metaBuilder.writeDocumentEntry(docProperties);
@@ -298,6 +308,7 @@
 				logger.error("Failed to index "+docProperties.get("docno"), ioe);
 			}
 			currentId++;
+			numberOfDocuments++;
 		}
 	}
 
@@ -401,7 +412,6 @@
 
 	@Override
 	protected void load_indexer_properties() {
-		// TODO Auto-generated method stub
 		super.load_indexer_properties();
 		docsPerCheck = ApplicationSetup.DOCS_CHECK_SINGLEPASS;
 		memoryCheck = new RuntimeMemoryChecker();
Index: src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/PositionAwareSplit.java
===================================================================
--- src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/PositionAwareSplit.java	(revision 0)
+++ src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/PositionAwareSplit.java	(revision 0)
@@ -0,0 +1,83 @@
+package uk.ac.gla.terrier.structures.indexing.singlepass.hadoop;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.mapred.InputSplit;
+
+public class PositionAwareSplit<T extends InputSplit> implements InputSplit{
+
+	
+	protected T split;
+	protected int splitnum;
+	
+	public PositionAwareSplit() {
+		splitnum=-1;
+	}
+	
+	public PositionAwareSplit(T _split, int _splitnum) {
+		split = _split;
+		splitnum = _splitnum;
+	}
+
+	/**
+	 * @return the splitnum
+	 */
+	public int getSplitIndex() {
+		return splitnum;
+	}
+
+	/**
+	 * @param splitnum the splitnum to set
+	 */
+	public void setSplitIndex(int splitnum) {
+		this.splitnum = splitnum;
+	}
+
+	/**
+	 * @return the split
+	 */
+	public InputSplit getSplit() {
+		return split;
+	}
+
+	/**
+	 * @param split the split to set
+	 */
+	public void setSplit(T split) {
+		this.split = split;
+	}
+
+	public long getLength() throws IOException {
+		return split.getLength();
+	}
+
+	public String[] getLocations() throws IOException {
+		return split.getLocations();
+	}
+
+	@SuppressWarnings("unchecked")
+	public void readFields(DataInput arg0) throws IOException {
+		try {
+			String className = arg0.readUTF();
+
+			Class<?> c = Class.forName(className, false, this.getClass().getClassLoader());
+			split = (T)c.getConstructor(new Class[0]) .newInstance(new Object[0]);
+
+			split.readFields(arg0);
+			splitnum = arg0.readInt();
+		} catch (Exception e) {
+			throw new IOException("Error during the reading of fields of a new PositionAwareSplit");
+		} 
+	}
+
+	public void write(DataOutput arg0) throws IOException {
+		arg0.writeUTF(split.getClass().getName());
+		split.write(arg0);
+		arg0.writeInt(splitnum);
+	}
+	
+	
+	
+}
Index: src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/CollectionRecordReader.java
===================================================================
--- src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/CollectionRecordReader.java	(revision 2563)
+++ src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/CollectionRecordReader.java	(working copy)
@@ -29,16 +29,16 @@
 package uk.ac.gla.terrier.structures.indexing.singlepass.hadoop;
 
 import java.io.IOException;
-import org.apache.hadoop.mapred.JobConf;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
 
 import uk.ac.gla.terrier.indexing.Collection;
 import uk.ac.gla.terrier.indexing.Document;
-import uk.ac.gla.terrier.utility.Wrapper;
-import uk.ac.gla.terrier.utility.io.WrappedIOException;
 import uk.ac.gla.terrier.utility.io.HadoopUtility;
+import uk.ac.gla.terrier.utility.io.WrappedIOException;
 
 /** An abstract class which provides ways to index a collection, based on
  * a predetermined InputSplit type.
@@ -46,8 +46,9 @@
  * @version $Revision: 1.2 $
  * @param <SPLITTYPE> The subclass of InputSplit that this class should work with
  */
-public abstract class CollectionRecordReader<SPLITTYPE extends InputSplit> {
-
+public abstract class CollectionRecordReader<SPLITTYPE extends PositionAwareSplit<?>>
+	implements RecordReader<Text, SplitAwareWrapper<Document>>
+{
 	/** document collection currently being iterated through. starts as null */
 	protected Collection documentCollection = null;
 	/** the files in this split */
@@ -90,8 +91,8 @@
 	/** Create a new Text value,
 	 * each value is a document
 	 */
-	public Wrapper<Document> createValue() {
-		return new Wrapper<Document>();
+	public SplitAwareWrapper<Document> createValue() {
+		return new SplitAwareWrapper<Document>(split.getSplitIndex());
 	}
 	
 	/**
@@ -112,7 +113,7 @@
 	 * document. Returns true if another document exists
 	 * otherwise returns false.
 	 */
-	public boolean next(Text DocID, Wrapper<Document> document) throws IOException { 
+	public boolean next(Text DocID, SplitAwareWrapper<Document> document) throws IOException { 
 		if (documentCollection == null)
 		{	
 			documentCollection = openCollectionSplit(collectionIndex);
Index: src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/MapData.java
===================================================================
--- src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/MapData.java	(revision 2563)
+++ src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/MapData.java	(working copy)
@@ -30,6 +30,8 @@
 import java.io.IOException;
 import java.util.LinkedList;
 
+import org.apache.hadoop.mapred.TaskID;
+
 /**
  * Storage class for information about each Map. 
  * Stores the number of the Map, the number of documents processed
@@ -39,7 +41,7 @@
  * @since 2.2
  * @version $Revision: 1.2 $
  */
-public class MapData {
+public class MapData implements Comparable<MapData>{
 	
 	/** TaskID of the Map */
 	protected String mapTaskID;
@@ -47,6 +49,9 @@
 	protected int numMapDocs;
 	/** Number of Documents in each flush of the map */
 	protected LinkedList<Integer> flushDocSizes = new LinkedList<Integer>();
+	/** The Split number **/
+	protected int splitnum;
+	protected int int_mapTaskId;
 	
 	/**
 	 * Constructor - Loads the Map Information from the DataInputStream Provided
@@ -55,18 +60,26 @@
 	public MapData(DataInputStream in) throws IOException{
 		super();
 		mapTaskID = in.readUTF();
+		int_mapTaskId = TaskID.forName(mapTaskID).getId();
 		int flushSize;
 		while ((flushSize = in.readInt()) != -1)
 		{
 			flushDocSizes.add(flushSize);
 		}
 		numMapDocs = in.readInt();
-		System.err.printf("map %s had %d docs, with %d flushes\n", mapTaskID, numMapDocs, flushDocSizes.size());
+		splitnum = in.readInt();
+		System.err.printf("map %s processed split %d which had %d docs, with %d flushes\n", mapTaskID, splitnum, numMapDocs, flushDocSizes.size());
 	}
 
 	public String getMap() {
 		return mapTaskID;
 	}
+	
+	public int getMapId()
+	{
+		return int_mapTaskId;
+	}
+	
 	public int getMapDocs() {
 		return numMapDocs;
 	}
@@ -79,4 +92,17 @@
 	public LinkedList<Integer> getFlushDocSizes() {
 		return flushDocSizes;//size of each run in documents
 	}
+
+	public int compareTo(MapData o) {
+		return splitnum-o.splitnum;
+	}
+
+	/**
+	 * @return the splitnum
+	 */
+	public int getSplitnum() {
+		return splitnum;
+	}
+	
+	
 }
Index: src/uk/ac/gla/terrier/indexing/hadoop/Hadoop_BasicSinglePassIndexer.java
===================================================================
--- src/uk/ac/gla/terrier/indexing/hadoop/Hadoop_BasicSinglePassIndexer.java	(revision 2563)
+++ src/uk/ac/gla/terrier/indexing/hadoop/Hadoop_BasicSinglePassIndexer.java	(working copy)
@@ -31,6 +31,7 @@
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.Map;
@@ -67,13 +68,14 @@
 import uk.ac.gla.terrier.structures.indexing.singlepass.hadoop.HadoopRunIteratorFactory;
 import uk.ac.gla.terrier.structures.indexing.singlepass.hadoop.HadoopRunWriter;
 import uk.ac.gla.terrier.structures.indexing.singlepass.hadoop.HadoopRunsMerger;
+import uk.ac.gla.terrier.structures.indexing.singlepass.hadoop.IDComparator;
 import uk.ac.gla.terrier.structures.indexing.singlepass.hadoop.MapData;
 import uk.ac.gla.terrier.structures.indexing.singlepass.hadoop.MapEmittedPostingList;
 import uk.ac.gla.terrier.structures.indexing.singlepass.hadoop.MapEmittedTerm;
+import uk.ac.gla.terrier.structures.indexing.singlepass.hadoop.SplitAwareWrapper;
 import uk.ac.gla.terrier.utility.ApplicationSetup;
 import uk.ac.gla.terrier.utility.FieldScore;
 import uk.ac.gla.terrier.utility.Files;
-import uk.ac.gla.terrier.utility.Wrapper;
 import uk.ac.gla.terrier.utility.io.HadoopPlugin;
 import uk.ac.gla.terrier.utility.io.HadoopUtility;
 import uk.ac.gla.terrier.utility.io.WrappedIOException;
@@ -102,7 +104,7 @@
  */
 public class Hadoop_BasicSinglePassIndexer 
 	extends BasicSinglePassIndexer 
-	implements Mapper<Text, Wrapper<Document>, MapEmittedTerm, MapEmittedPostingList>,
+	implements Mapper<Text, SplitAwareWrapper<Document>, MapEmittedTerm, MapEmittedPostingList>,
 	Reducer<MapEmittedTerm, MapEmittedPostingList, Object, Object>
 {
 
@@ -144,6 +146,10 @@
 	/** JobConf of the current running job */	
 	protected JobConf jc;
 
+	/** The split that these documents came form **/
+	protected int splitnum;
+	protected boolean start;
+	
 	/**
 	 * Empty constructor. 
 	 */
@@ -271,21 +277,29 @@
 	 * @throws IOException
 	 */
 	public void map(
-			Text key, Wrapper<Document> value, 
+			Text key, SplitAwareWrapper<Document> value, 
 			OutputCollector<MapEmittedTerm, MapEmittedPostingList> _outputPostingListCollector, 
 			Reporter reporter) 
 		throws IOException 
 	{
+		
+		
 		final String docno = key.toString();
 		reporter.setStatus("Currently indexing "+docno);
 		final Document doc = value.getObject();
+		
+		if (start) {
+			splitnum = value.getSplitIndex();
+			start = false;
+		}
+		
 		this.outputPostingListCollector = _outputPostingListCollector;
 		
 		/* setup for parsing */
 		createDocumentPostings();
 		String term;//term we're currently processing
 		numOfTokensInDocument = 0;
-		numberOfDocuments++;
+		//numberOfDocuments++;
 		//get each term in the document
 		while (!doc.endOfDocument()) {
 			reporter.progress();
@@ -319,29 +333,27 @@
 			reporter.progress();
 		}
 		termsInDocument.clear();
-		
-		// check to see if we should flush
-		numberOfDocsSinceCheck++; numberOfDocsSinceFlush++;
-		if(docsPerCheck == numberOfDocsSinceCheck)
+	}
+	
+	protected void checkFlush() throws IOException
+	{
+		if(docsPerCheck != numberOfDocsSinceCheck)
+			return;
+		if (! memoryCheck.checkMemory())
 		{
-			if (memoryCheck.checkMemory())
-			{
-				logger.info("Memory running low, flush requested");
-				forceFlush();
-				// clear memory
-				createMemoryPostings();
-				currentId = 0;
-				numberOfDocsSinceFlush = 0;
-				flushNo++;
-				System.gc();
-				memoryCheck.reset();
-			}
 			numberOfDocsSinceCheck = 0;
+			return;
 		}
-
+		numberOfDocsSinceCheck = 0;
+		forceFlush();
+		System.gc();
+		createMemoryPostings();
+		memoryCheck.reset();
+		numberOfDocsSinceFlush = 0;
+		currentId = 0;
+		flushNo++;
 	}
 	
-	
 	/**
 	 * Write the empty document to the inverted index
 	 */
@@ -374,6 +386,7 @@
 		currentIndex.close();
 		RunData.writeInt(-1);
 		RunData.writeInt(numberOfDocuments);
+		RunData.writeInt(splitnum);
 		RunData.close();
 		logger.info("Map "+mapTaskID+ " finishing, indexed "+numberOfDocuments+ " in "+flushNo+" flushes");
 	}
@@ -395,6 +408,7 @@
 	protected void configureReduce() throws Exception
 	{	
 		super.init();
+		start = true;
 		//load in the current index
 		final Path indexDestination = FileOutputFormat.getWorkOutputPath(jc);
 		path = indexDestination.toString();
@@ -459,6 +473,8 @@
 			runData.add(tempHRD);
 			runDataIn.close();
 		}
+		Collections.sort(runData);
+		Collections.sort(mapTaskIDs, new IDComparator(runData));
 		MapIndexPrefixes = mapTaskIDs.toArray(new String[0]);
 		return runData;
 	}
Index: src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/MapEmittedTerm.java
===================================================================
--- src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/MapEmittedTerm.java	(revision 2563)
+++ src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/MapEmittedTerm.java	(working copy)
@@ -133,6 +133,56 @@
 
 	}
 	
+	/**
+	 * Raw Comparator class to compare MapEmittedTerm objects
+	 * stage 1. (Order by term, then by map number). In this case
+	 * the map number is stored as an int rather than as a Map
+	 * task ID and therefore is parsed. This is used for the
+	 * SimpleIndexing variants as they do not flush out data in
+	 * the same manner.
+	 * @author Richard McCreadie and Craig Macdonald
+	 * @since 2.2.1
+	 * @version $Revision: 1.0 $
+	 */
+	public static class TermMapComparator implements RawComparator<MapEmittedTerm> {
+		
+		protected MapEmittedTerm tempT = new MapEmittedTerm();
+		protected MapEmittedTerm tempT2 = new MapEmittedTerm();
+		
+		public int compare(MapEmittedTerm a, MapEmittedTerm b) {
+			throw new Error("Unsupported method Indexing_CompareTextPlusKey.compare(Indexing_TextPlus,Indexing_TextPlus) was called");
+			//richard's documentation say that this method is not used
+			//Richard: Indeed, it is not used - because it is a RawComparator it uses the byte[] compare instead.
+		}
+
+		/**
+		 * Compare by term (bit comparison on Text object) then by map number (int)
+		 * then by flush number (int).
+		 */
+		public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+			// Convert to streams so that the read methods can be used
+			DataInputStream b1S = new DataInputStream(new ByteArrayInputStream(b1, s1, l1));
+			DataInputStream b2S = new DataInputStream(new ByteArrayInputStream(b2, s2, l2));
+			try {
+				// Read in the TextPlus Objects
+				tempT.readFields(b1S);
+				tempT2.readFields(b2S);
+				b1S.close();
+				b2S.close();
+				// Do Comparison Text
+				int value = tempT.getText().compareTo(tempT2.getText());
+				if (value != 0)
+					return value;
+				// If same do Comparison on map task id
+				value = Integer.parseInt(tempT.getMap()) - Integer.parseInt(tempT2.getMap());
+				return value;
+			} catch (IOException e) {
+				return 0;
+			}
+		}
+
+	}
+	
 	/** The Map this Term was processed from */
 	protected String mapTaskID;
 	/** The Flush number this term was from */
Index: src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/MultiFileSplit.java
===================================================================
--- src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/MultiFileSplit.java	(revision 0)
+++ src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/MultiFileSplit.java	(revision 0)
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package uk.ac.gla.terrier.structures.indexing.singlepass.hadoop;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+
+/**
+ * A sub-collection of input files. Unlike {@link FileSplit}, MultiFileSplit 
+ * class does not represent a split of a file, but a split of input files 
+ * into smaller sets. The atomic unit of split is a file. <br> 
+ * MultiFileSplit can be used to implement {@link RecordReader}'s, with 
+ * reading one record per file.
+ * @see FileSplit
+ * @see MultiFileInputFormat 
+ */
+public class MultiFileSplit implements InputSplit {
+
+  private Path[] paths;
+  private long[] lengths;
+  private long totLength;
+  private JobConf job;
+
+  public MultiFileSplit() {}
+  
+  public MultiFileSplit(JobConf job, Path[] files, long[] lengths) {
+    this.job = job;
+    this.lengths = lengths;
+    this.paths = files;
+    this.totLength = 0;
+    for(long length : lengths) {
+      totLength += length;
+    }
+  }
+
+  public long getLength() {
+    return totLength;
+  }
+  
+  /** Returns an array containing the lengths of the files in 
+   * the split*/ 
+  public long[] getLengths() {
+    return lengths;
+  }
+  
+  /** Returns the length of the i<sup>th</sup> Path */
+  public long getLength(int i) {
+    return lengths[i];
+  }
+  
+  /** Returns the number of Paths in the split */
+  public int getNumPaths() {
+    return paths.length;
+  }
+
+  /** Returns the i<sup>th</sup> Path */
+  public Path getPath(int i) {
+    return paths[i];
+  }
+  
+  /** Returns all the Paths in the split */
+  public Path[] getPaths() {
+    return paths;
+  }
+
+  public String[] getLocations() throws IOException {
+    HashSet<String> hostSet = new HashSet<String>();
+    JobClient jClient = new JobClient(job);
+    FileSystem fs = jClient.getFs();
+    for (Path file : paths) {
+      FileStatus status = fs.getFileStatus(file);
+      BlockLocation[] blkLocations = fs.getFileBlockLocations(status,
+                                          0, status.getLen());
+      if (blkLocations != null && blkLocations.length > 0) {
+        addToSet(hostSet, blkLocations[0].getHosts());
+      }
+    }
+    return hostSet.toArray(new String[hostSet.size()]);
+  }
+
+  private void addToSet(Set<String> set, String[] array) {
+    for(String s:array)
+      set.add(s); 
+  }
+
+  public void readFields(DataInput in) throws IOException {
+    totLength = in.readLong();
+    int arrLength = in.readInt();
+    lengths = new long[arrLength];
+    for(int i=0; i<arrLength;i++) {
+      lengths[i] = in.readLong();
+    }
+    int filesLength = in.readInt();
+    paths = new Path[filesLength];
+    for(int i=0; i<filesLength;i++) {
+      paths[i] = new Path(Text.readString(in));
+    }
+  }
+
+  public void write(DataOutput out) throws IOException {
+    out.writeLong(totLength);
+    out.writeInt(lengths.length);
+    for(long length : lengths)
+      out.writeLong(length);
+    out.writeInt(paths.length);
+    for(Path p : paths) {
+      Text.writeString(out, p.toString());
+    }
+  }
+  
+  @Override
+  public String toString() {
+    StringBuffer sb = new StringBuffer();
+    for(int i=0; i < paths.length; i++) {
+      sb.append(paths[i].toUri().getPath() + ":0+" + lengths[i]);
+      if (i < paths.length -1) {
+        sb.append("\n");
+      }
+    }
+
+    return sb.toString();
+  }
+}
+
Index: src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/MultiFileCollectionInputFormat.java
===================================================================
--- src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/MultiFileCollectionInputFormat.java	(revision 2563)
+++ src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/MultiFileCollectionInputFormat.java	(working copy)
@@ -37,13 +37,11 @@
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MultiFileInputFormat;
-import org.apache.hadoop.mapred.MultiFileSplit;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.log4j.Logger;
 
 import uk.ac.gla.terrier.indexing.Document;
-import uk.ac.gla.terrier.utility.Wrapper;
 
 /**
  * Input Format Class for Hadoop Indexing. Splits the input collection into
@@ -55,12 +53,13 @@
  * @since 2.2
  * @version $Revision: 1.2 $
  */
-public class MultiFileCollectionInputFormat extends MultiFileInputFormat<Text, Wrapper<Document>>
+public class MultiFileCollectionInputFormat extends MultiFileInputFormat<Text, SplitAwareWrapper<Document>>
 {
 
 	/** logger for this class */
 	protected static final Logger logger = Logger.getLogger(MultiFileCollectionInputFormat.class);
 	
+	@SuppressWarnings("unchecked")
 	@Override
 	/**
 	 * Instantiates a FileCollectionRecordReader using the specified spit (which is
@@ -69,14 +68,14 @@
 	 * @param job JobConf of this job
 	 * @param reported To report progress
 	 */
-	public RecordReader<Text, Wrapper<Document>> getRecordReader(
+	public RecordReader<Text, SplitAwareWrapper<Document>> getRecordReader(
 			InputSplit genericSplit, 
 			JobConf job,
             Reporter reporter) 
 		throws IOException 
 	{
 		reporter.setStatus(genericSplit.toString());
-	    return new FileCollectionRecordReader(job, (MultiFileSplit) genericSplit);
+	    return new FileCollectionRecordReader(job, (PositionAwareSplit<MultiFileSplit>) genericSplit);
 	}
 	
 	@Override
@@ -107,7 +106,7 @@
 			numSplits = 1;
 		}
 		logger.info("Allocating "+paths.length+ " files across "+numSplits +" map tasks");
-		List<MultiFileSplit> splits = new ArrayList<MultiFileSplit>(numSplits);
+		List<PositionAwareSplit<MultiFileSplit>> splits = new ArrayList<PositionAwareSplit<MultiFileSplit>>(numSplits);
 		long[] lengths = new long[paths.length];
 		long totLength = 0;
 		final FileSystem fs = FileSystem.get(job);	
@@ -121,7 +120,9 @@
 		final int numberOfFilesPerSplit = paths.length / numSplits;
 
 		int index = 0;
+		int splitnum = 0;
 		// for each split except the last one (which may be larger than numberOfFilesPerSplit)
+		MultiFileSplit mfs;
 		for(int i=0; i<numSplits-1; i++) 
 		{
 			Path[] splitPaths = new Path[numberOfFilesPerSplit];
@@ -132,7 +133,9 @@
 				index++;
 				pathsUsed++;
 			}
-			splits.add(new MultiFileSplit(job, splitPaths, splitLengths));
+			mfs = new MultiFileSplit(job, splitPaths, splitLengths);
+			splits.add(new PositionAwareSplit<MultiFileSplit>(mfs, splitnum));
+			splitnum++;
 		}
 
 		// Now do the last one containing remaining files
@@ -145,12 +148,14 @@
 			index++;
 			pathsUsed++;
 		}
-		splits.add(new MultiFileSplit(job, splitPaths, splitLengths)); 
+		mfs = new MultiFileSplit(job, splitPaths, splitLengths);
+		splits.add(new PositionAwareSplit<MultiFileSplit>(mfs, splitnum));
+		splitnum++;
 
 		if (!(pathsUsed==paths.length)) {
 			throw new IOException("Number of used paths does not equal total available paths!");
 		}
-		return splits.toArray(new MultiFileSplit[splits.size()]);    
+		return splits.toArray(new PositionAwareSplit[splits.size()]);    
 	}
 
 }

