Index: src/uk/ac/gla/terrier/indexing/hadoop/Hadoop_BasicSinglePassIndexer.java
===================================================================
--- src/uk/ac/gla/terrier/indexing/hadoop/Hadoop_BasicSinglePassIndexer.java	(revision 2659)
+++ src/uk/ac/gla/terrier/indexing/hadoop/Hadoop_BasicSinglePassIndexer.java	(working copy)
@@ -50,6 +50,7 @@
 import org.apache.hadoop.mapred.TaskAttemptID;
 import org.apache.hadoop.mapred.TaskID;
 
+import uk.ac.gla.terrier.compression.BitIn;
 import uk.ac.gla.terrier.compression.BitOutputStream;
 import uk.ac.gla.terrier.indexing.BasicSinglePassIndexer;
 import uk.ac.gla.terrier.indexing.Document;
@@ -74,6 +75,7 @@
 import uk.ac.gla.terrier.structures.indexing.singlepass.hadoop.MapEmittedPostingList;
 import uk.ac.gla.terrier.structures.indexing.singlepass.hadoop.MapEmittedTerm;
 import uk.ac.gla.terrier.structures.indexing.singlepass.hadoop.MapEmittedTermByMapPartitioner;
+import uk.ac.gla.terrier.structures.indexing.singlepass.hadoop.MapEmittedTermBySplitPartitioner;
 import uk.ac.gla.terrier.structures.indexing.singlepass.hadoop.SplitAwareWrapper;
 import uk.ac.gla.terrier.utility.ApplicationSetup;
 import uk.ac.gla.terrier.utility.ArrayUtils;
@@ -291,7 +293,7 @@
 		logger.info("Map "+mapTaskID+", flush requested, containing "+numberOfDocsSinceFlush+" documents, flush "+flushNo);
 		if (mp == null)
 			throw new IOException("Map flushed before any documents were indexed");
-		mp.finish(new HadoopRunWriter(outputPostingListCollector, mapTaskID, flushNo));
+		mp.finish(new HadoopRunWriter(outputPostingListCollector, mapTaskID, splitnum, flushNo));
 		RunData.writeInt(currentId);
 		currentReporter.incrCounter(Counters.INDEXER_FLUSHES, 1);
 		System.gc();
@@ -323,7 +325,8 @@
 		final Document doc = value.getObject();
 		
 		if (start) {
-			splitnum = value.getSplitIndex();
+			//splitnum = value.getSplitIndex(); //NOTE(review): with this disabled, nothing in this patch assigns splitnum before it is written below and passed to HadoopRunWriter - confirm it is initialised elsewhere
+			RunData.writeInt(splitnum);
 			start = false;
 		}
 		
@@ -455,10 +458,8 @@
 		final LinkedList<MapData> runData = new LinkedList<MapData>();
 		DataInputStream runDataIn;
 	
-		final MapEmittedTermByMapPartitioner partitionChecker = new MapEmittedTermByMapPartitioner();
-		partitionChecker.configure(jc);
 		final String jobId = TaskAttemptID.forName(jc.get("mapred.task.id")).getJobID().toString().replaceAll("job", "task");
-		final int thisPartition = TaskAttemptID.forName(jc.get("mapred.task.id")).getTaskID().getId();
+		
 		final FileStatus[] files = FileSystem.get(jc).listStatus(
 			FileOutputFormat.getOutputPath(jc), 
 			new org.apache.hadoop.fs.PathFilter()
@@ -471,12 +472,12 @@
 						return false;
 					
 					//2. is this run part of the maps allocated to us?
-					final TaskID t = TaskID.forName(name.replaceAll("\\.runs$", "") );
-					final int targetP = partitionChecker.calculatePartition(t, jc.getNumReduceTasks());
-					if (thisPartition != targetP)
-					{
-						return false;
-					}
+					//final TaskID t = TaskID.forName(name.replaceAll("\\.runs$", "") );
+					//final int targetP = partitionChecker.calculatePartition(t, jc.getNumReduceTasks());
+					//if (thisPartition != targetP)
+					//{
+					//	return false;
+					//}
 					return true;
 				}
 			}
@@ -487,6 +488,11 @@
 			throw new IOException("No run status files found in "+FileOutputFormat.getOutputPath(jc));
 		}
 		
+		final int thisPartition = TaskAttemptID.forName(jc.get("mapred.task.id")).getTaskID().getId();
+		final MapEmittedTermBySplitPartitioner partitionChecker = new MapEmittedTermBySplitPartitioner();
+		partitionChecker.configure(jc);
+		
+		
 		//TaskID previousMapTaskID = null;
 		MapData tempHRD;
 		for (FileStatus file : files) 
@@ -494,11 +500,11 @@
 			logger.info("Run data file "+ file.getPath().toString()+" has length "+Files.length(file.getPath().toString()));
 			runDataIn = new DataInputStream(Files.openFileStream(file.getPath().toString()));
 			tempHRD = new MapData(runDataIn);
-			// Sanity Check the file ordering
+			//skip run files whose split belongs to another reducer; close the stream first so it is not leaked
+			if (partitionChecker.calculatePartition(tempHRD.getSplitnum(), jc.getNumReduceTasks()) != thisPartition)
+				{ runDataIn.close(); continue; }
 			
 			mapTaskIDs.add(tempHRD.getMap());
-			//TaskID thisMapTaskID = TaskID.forName(tempHRD.getMap());
-			//previousMapTaskID = thisMapTaskID;
 			runData.add(tempHRD);
 			runDataIn.close();
 		}
@@ -661,7 +667,7 @@
 		try{
 			tempRM.setBos(new BitOutputStream(
 					currentIndex.getPath() + ApplicationSetup.FILE_SEPARATOR 
-					+ currentIndex.getPrefix() + ".inverted.bf"));
+					+ currentIndex.getPrefix() + ".inverted" + BitIn.USUAL_EXTENSION));
 		} catch (IOException ioe) {
 			ioe.printStackTrace();
 		}
Index: src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/HadoopRunWriter.java
===================================================================
--- src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/HadoopRunWriter.java	(revision 2659)
+++ src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/HadoopRunWriter.java	(working copy)
@@ -45,6 +45,8 @@
 	protected String mapId;
 	/** flushNo is the number of times this map task is being flushed */
 	protected int flushNo;
+	/** splitId is the split number given by the map task; it is emitted as part of each term key */
+	protected int splitId;
 	
 	/** Create a new HadoopRunWriter, specifying the output collector of the map task
 	 * the run number and the flush number.
@@ -53,7 +55,9 @@
 	 * @param _flushNo the number of times that this map task has flushed
+	 * @param _splitId the split number of the map task, emitted with every term key
 	 */
 	public HadoopRunWriter(OutputCollector<MapEmittedTerm, MapEmittedPostingList> _outputCollector,
-			String _mapId, int _flushNo)
+			String _mapId, int _splitId, int _flushNo)
 	{
 		this.outputCollector = _outputCollector;
+		this.splitId = _splitId;
 		this.mapId = _mapId;
@@ -80,7 +84,7 @@
 		
 		//emit the term and its posting list
 		outputCollector.collect(
-				MapEmittedTerm.create_TextPlus(term, mapId, flushNo), 
+				MapEmittedTerm.create_TextPlus(term, mapId, splitId, flushNo), 
 				MapEmittedPostingList.create_Hadoop_WritableRunPostingData(
 						mapId,
 						flushNo, 
Index: src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/MapEmittedPostingList.java
===================================================================
--- src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/MapEmittedPostingList.java	(revision 2659)
+++ src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/MapEmittedPostingList.java	(working copy)
@@ -45,6 +45,8 @@
 	/** The Flush Number */
 	protected int Run;
 	
+	protected int splitNo;
+	
 	/**
 	 * Constructor
 	 * @param map - Map task id
@@ -128,14 +130,14 @@
 	/**
 	 * Reads this object from the input stream 'in' 
 	 */
-	public void readFields(DataInput arg0) throws IOException {
-		arraylength = arg0.readInt();
-		Map = arg0.readUTF();
-		Run = arg0.readInt();
-		DocumentFreq = arg0.readInt();
-		TermFreq = arg0.readInt();
+	public void readFields(DataInput in) throws IOException {
+		arraylength = in.readInt();
+		Map = in.readUTF();
+		Run = in.readInt();
+		DocumentFreq = in.readInt();
+		TermFreq = in.readInt();
 		array = new byte[arraylength];
-		arg0.readFully(array);
+		in.readFully(array);
 		//System.err.println("DEBUG: Finished Read, ArrayL:"+arraylength+" RunNo:"+Run+" DocF:"+DocumentFreq+" TermF:"+TermFreq+" Buffer:"+array.toString());
 		
 	}
@@ -143,15 +145,15 @@
 	/**
 	 * Reads this object from the input stream 'in' apart from the
 	 * array. 
-	 * @param arg0
+	 * @param in the input to read the array length, map id, run number and frequencies from
 	 * @throws IOException
 	 */
-	public void readFieldsMinusArray(DataInput arg0) throws IOException {
-		arraylength = arg0.readInt();
-		Map = arg0.readUTF();
-		Run = arg0.readInt();
-		DocumentFreq = arg0.readInt();
-		TermFreq = arg0.readInt();
+	public void readFieldsMinusArray(DataInput in) throws IOException {
+		arraylength = in.readInt();
+		Map = in.readUTF();
+		Run = in.readInt();
+		DocumentFreq = in.readInt();
+		TermFreq = in.readInt();
 		array = new byte[1];
 		//System.err.println("DEBUG: Finished Read, ArrayL:"+arraylength+" RunNo:"+Run+" DocF:"+DocumentFreq+" TermF:"+TermFreq+" Buffer:"+array.toString());
 		
@@ -158,13 +160,13 @@
 	}
 
 	/** Write this object to the output stream 'out' */
-	public void write(DataOutput arg0) throws IOException {
-		arg0.writeInt(array.length);
-		arg0.writeUTF(Map);
-		arg0.writeInt(Run);
-		arg0.writeInt(DocumentFreq);
-		arg0.writeInt(TermFreq);
-		arg0.write(array);
+	public void write(DataOutput out) throws IOException {
+		out.writeInt(array.length);
+		out.writeUTF(Map);
+		out.writeInt(Run);
+		out.writeInt(DocumentFreq);
+		out.writeInt(TermFreq);
+		out.write(array);
 		//System.err.println("DEBUG: Finished Write, ArrayL:"+array.length+" RunNo:"+Run+" DocF:"+DocumentFreq+" TermF:"+TermFreq+" Buffer:"+array.toString());
 	}
 	
Index: src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/MapEmittedTerm.java
===================================================================
--- src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/MapEmittedTerm.java	(revision 2659)
+++ src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/MapEmittedTerm.java	(working copy)
@@ -185,6 +185,7 @@
 	
 	/** The Map this Term was processed from */
 	protected String mapTaskID;
+	protected int splitId;
 	/** The Flush number this term was from */
 	protected int flushNumber;
 	/** The Term */
@@ -203,8 +204,9 @@
 	 * @param _mapTaskID - Map Number
 	 * @param _flushNumber - Flush Number
 	 */
-	public MapEmittedTerm(String s, String _mapTaskID, int _flushNumber) {
+	public MapEmittedTerm(String s, String _mapTaskID, int _splitId, int _flushNumber) {
 		mapTaskID = _mapTaskID;
+		splitId = _splitId;
 		flushNumber = _flushNumber;
 		text = new Text(s);
 	}
@@ -216,9 +218,10 @@
 	 * @param b - Flush Number
 	 * @return a newly created Indexing_TextPlus
 	 */
-	public static MapEmittedTerm create_TextPlus(String s, String a, int b) {
+	public static MapEmittedTerm create_TextPlus(String s, String a, int splitId,  int b) {
 		MapEmittedTerm temp = new MapEmittedTerm();
 		temp.setMap(a);
+		temp.setSplitId(splitId);
 		temp.setFlush(b);
 		temp.setText(new Text(s));
 		return temp;
@@ -253,6 +256,7 @@
 	 */
 	public void readFields(DataInput in) throws IOException {
 		mapTaskID = in.readUTF();
+		splitId = in.readInt();
 		flushNumber = in.readInt();
 		text = new Text();
 		text.readFields(in);
@@ -264,6 +268,7 @@
 	 */
 	public void write(DataOutput out) throws IOException {
 		out.writeUTF(mapTaskID);
+		out.writeInt(splitId);
 		out.writeInt(flushNumber);
 		text.write(out);
 	}
@@ -275,4 +280,20 @@
 		return this.getText().compareTo(o.getText());
 	}
 
+	/**
+	 * Returns the split id stored in this key.
+	 * @return the split id
+	 */
+	public int getSplitId() {
+		return splitId;
+	}
+
+	/**
+	 * Sets the split id stored in this key.
+	 * @param splitId the split id
+	 */
+	public void setSplitId(int splitId) {
+		this.splitId = splitId;
+	}
+
 }
Index: src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/MapEmittedTermBySplitPartitioner.java
===================================================================
--- src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/MapEmittedTermBySplitPartitioner.java	(revision 0)
+++ src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/MapEmittedTermBySplitPartitioner.java	(revision 0)
@@ -0,0 +1,87 @@
+/*
+ * Terrier - Terabyte Retriever
+ * Webpage: http://ir.dcs.gla.ac.uk/terrier
+ * Contact: terrier{a.}dcs.gla.ac.uk
+ * University of Glasgow - Department of Computing Science
+ * http://www.gla.uk
+ *
+ * The contents of this file are subject to the Mozilla Public License
+ * Version 1.1 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ * http://www.mozilla.org/MPL/
+ *
+ * Software distributed under the License is distributed on an "AS IS"
+ * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+ * the License for the specific language governing rights and limitations
+ * under the License.
+ *
+ * The Original Code is MapEmittedTermBySplitPartitioner.java.
+ *
+ * The Original Code is Copyright (C) 2004-2009 the University of Glasgow.
+ * All Rights Reserved.
+ *
+ * Contributor(s):
+ *   Richard McCreadie <richardm{a.}dcs.gla.ac.uk> (original author)
+ *   Craig Macdonald <craigm{a.}dcs.gla.ac.uk> 
+ */
+
+package uk.ac.gla.terrier.structures.indexing.singlepass.hadoop;
+
+
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
+import org.apache.hadoop.mapred.Partitioner;
+
+/**
+ * Partitions the term posting lists by split id, such that the
+ * created index is partitioned evenly across the reducers. Split ids are
+ * grouped into contiguous ranges of ceil(numSplits / numPartitions) splits.
+ * @author Richard McCreadie and Craig Macdonald
+ * @version $Revision: 1.2 $
+ * @since 3.0
+ */
+public class MapEmittedTermBySplitPartitioner implements JobConfigurable, Partitioner<MapEmittedTerm, MapEmittedPostingList>  {
+
+	/** The number of input splits of the job; stays at -1 if configure(JobConf) failed or was never called */
+	protected int numSplits = -1;
+
+	/**
+	 * Obtains the number of input splits by asking the input format of the job to compute them.
+	 * A failure is reported on stderr and leaves numSplits at -1.
+	 * @param job the configuration of the job
+	 */
+	public void configure(JobConf job)
+	{
+		//TODO: this looks like a hack, because it is expensive to calculate
+		try{
+			numSplits = job.getInputFormat().getSplits(job, job.getNumMapTasks()).length;
+		} catch (Exception e) {
+			//report the cause as well, otherwise the failure cannot be diagnosed
+			System.err.println("Failed to calculate number of splits correctly: " + e);
+		}
+	}
+	
+	/**
+	 * Calculates the partition that the specified split is allocated to.
+	 * @param splitId the id of the split
+	 * @param numPartitions the number of partitions
+	 * @return splitId divided by ceil(numSplits / numPartitions)
+	 * @throws IllegalStateException if the number of splits is not known
+	 */
+	public int calculatePartition(int splitId, int numPartitions) {
+		//an unknown split count would otherwise surface as a division by zero or a negative partition
+		if (numSplits < 1)
+			throw new IllegalStateException("Number of splits is not known (" + numSplits + "), was configure(JobConf) successful?");
+		final int partitionSize = (int) Math.ceil((double)numSplits / (double) numPartitions);
+		return splitId / partitionSize;
+	}
+	
+	/**
+	 * Forces map output from adjacent splits to mostly go to the same reducer
+	 */
+	public int getPartition(MapEmittedTerm key, MapEmittedPostingList value, int numPartitions)
+	{
+		return this.calculatePartition(key.getSplitId(), numPartitions);
+	}
+
+}

