Index: src/uk/ac/gla/terrier/indexing/hadoop/Hadoop_BasicSinglePassIndexer.java
===================================================================
--- src/uk/ac/gla/terrier/indexing/hadoop/Hadoop_BasicSinglePassIndexer.java	(revision 2622)
+++ src/uk/ac/gla/terrier/indexing/hadoop/Hadoop_BasicSinglePassIndexer.java	(working copy)
@@ -65,6 +65,7 @@
 import uk.ac.gla.terrier.structures.indexing.singlepass.FieldPostingInRun;
 import uk.ac.gla.terrier.structures.indexing.singlepass.RunsMerger;
 import uk.ac.gla.terrier.structures.indexing.singlepass.SimplePostingInRun;
+import uk.ac.gla.terrier.structures.indexing.singlepass.hadoop.ByMapPartitioner;
 import uk.ac.gla.terrier.structures.indexing.singlepass.hadoop.HadoopRunIteratorFactory;
 import uk.ac.gla.terrier.structures.indexing.singlepass.hadoop.HadoopRunWriter;
 import uk.ac.gla.terrier.structures.indexing.singlepass.hadoop.HadoopRunsMerger;
@@ -434,15 +435,30 @@
 		final LinkedList<MapData> runData = new LinkedList<MapData>();
 		DataInputStream runDataIn;
 	
+		final ByMapPartitioner partitionChecker = new ByMapPartitioner();
+		partitionChecker.configure(jc);
 		final String jobId = TaskAttemptID.forName(jc.get("mapred.task.id")).getJobID().toString().replaceAll("job", "task");
+		final int thisPartition = TaskAttemptID.forName(jc.get("mapred.task.id")).getTaskID().getId();
 		final FileStatus[] files = FileSystem.get(jc).listStatus(
 			FileOutputFormat.getOutputPath(jc), 
 			new org.apache.hadoop.fs.PathFilter()
 			{ 
 				public boolean accept(Path path)
 				{ 
+					
 					final String name = path.getName();
-					return name.startsWith( jobId )  && name.endsWith(".runs"); 
+					//1. is this a run file
+					if (!(  name.startsWith( jobId )  && name.endsWith(".runs")))
+						return false;
+					
+					//2. is this run part of the maps allocated to us?
+					final TaskID t = TaskID.forName(name.replaceAll("\\.runs$", "") );
+					final int targetP = partitionChecker.calculatePartition(t, jc.getNumReduceTasks());
+					if (thisPartition != targetP)
+					{
+						return false;
+					}
+					return true;
 				}
 			}
 		);
Index: src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/ByMapPartitioner.java
===================================================================
--- src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/ByMapPartitioner.java	(revision 2606)
+++ src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/ByMapPartitioner.java	(working copy)
@@ -1,67 +0,0 @@
-/*
- * Terrier - Terabyte Retriever
- * Webpage: http://ir.dcs.gla.ac.uk/terrier
- * Contact: terrier{a.}dcs.gla.ac.uk
- * University of Glasgow - Department of Computing Science
- * http://www.gla.uk
- *
- * The contents of this file are subject to the Mozilla Public License
- * Version 1.1 (the "License"); you may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- * http://www.mozilla.org/MPL/
- *
- * Software distributed under the License is distributed on an "AS IS"
- * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
- * the License for the specific language governing rights and limitations
- * under the License.
- *
- * The Original Code is ByMapPartitioner.java.
- *
- * The Original Code is Copyright (C) 2004-2009 the University of Glasgow.
- * All Rights Reserved.
- *
- * Contributor(s):
- *   Richard McCreadie <richardm{a.}dcs.gla.ac.uk> (original author)
- *   Craig Macdonald <craigm{a.}dcs.gla.ac.uk> 
- */
-
-package uk.ac.gla.terrier.structures.indexing.singlepass.hadoop;
-
-
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobConfigurable;
-import org.apache.hadoop.mapred.Partitioner;
-import org.apache.hadoop.mapred.TaskID;
-
-/**
- * Partitions the term postings lists from the map function,
- * such that the created indexes is partitioned evenly across
- * the reducers. This partitioner partitions by an even number of
- * maps, assuming that Map sizes are approximately equal.
- * @author Richard McCreadie and Craig Macdonald
- * @version $Revision: 1.2 $
- * @since 2.2
- */
-public class ByMapPartitioner implements JobConfigurable, Partitioner<MapEmittedTerm, MapEmittedPostingList>{
-
-	protected int numMapTasks = -1;
-	public void configure(JobConf job)
-	{
-		numMapTasks = job.getNumMapTasks();
-	}
-	
-	/**
-	 * Forces each Map output to get its own reduce step
-	 */
-	public int getPartition(MapEmittedTerm key, MapEmittedPostingList value, int numPartitions) {
-		final int mapNumber = getTaskID(value.getMap());
-		final int partitionSize = (int) (Math.ceil( ((double)numMapTasks)/(double)numPartitions));
-		return mapNumber / partitionSize;
-	}
-
-	protected static int getTaskID(String id)
-	{
-		return TaskID.forName(id).getId();
-	}
-
-}
Index: src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/MapEmittedTermByMapPartitioner.java
===================================================================
--- src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/MapEmittedTermByMapPartitioner.java	(revision 2606)
+++ src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/MapEmittedTermByMapPartitioner.java	(working copy)
@@ -15,7 +15,7 @@
  * the License for the specific language governing rights and limitations
  * under the License.
  *
- * The Original Code is ByMapPartitioner.java.
+ * The Original Code is MapEmittedTermByMapPartitioner.java.
  *
  * The Original Code is Copyright (C) 2004-2009 the University of Glasgow.
  * All Rights Reserved.
@@ -28,7 +28,6 @@
 package uk.ac.gla.terrier.structures.indexing.singlepass.hadoop;
 
 
-import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobConfigurable;
 import org.apache.hadoop.mapred.Partitioner;
 import org.apache.hadoop.mapred.TaskID;
@@ -42,26 +41,13 @@
  * @version $Revision: 1.2 $
  * @since 2.2
  */
-public class ByMapPartitioner implements JobConfigurable, Partitioner<MapEmittedTerm, MapEmittedPostingList>{
+public class MapEmittedTermByMapPartitioner extends ByMapPartitioner implements JobConfigurable, Partitioner<MapEmittedTerm, MapEmittedPostingList>  {
 
-	protected int numMapTasks = -1;
-	public void configure(JobConf job)
-	{
-		numMapTasks = job.getNumMapTasks();
-	}
-	
 	/**
 	 * Forces each Map output to get its own reduce step
 	 */
 	public int getPartition(MapEmittedTerm key, MapEmittedPostingList value, int numPartitions) {
-		final int mapNumber = getTaskID(value.getMap());
-		final int partitionSize = (int) (Math.ceil( ((double)numMapTasks)/(double)numPartitions));
-		return mapNumber / partitionSize;
-	}
-
-	protected static int getTaskID(String id)
-	{
-		return TaskID.forName(id).getId();
+		return super.calculatePartition(TaskID.forName(value.getMap()), numPartitions);
 	}
 
 }
Index: src/uk/ac/gla/terrier/applications/HadoopIndexing.java
===================================================================
--- src/uk/ac/gla/terrier/applications/HadoopIndexing.java	(revision 2606)
+++ src/uk/ac/gla/terrier/applications/HadoopIndexing.java	(working copy)
@@ -47,7 +47,7 @@
 import uk.ac.gla.terrier.indexing.hadoop.Hadoop_BasicSinglePassIndexer;
 import uk.ac.gla.terrier.indexing.hadoop.Hadoop_BlockSinglePassIndexer;
 import uk.ac.gla.terrier.structures.Index;
-import uk.ac.gla.terrier.structures.indexing.singlepass.hadoop.ByMapPartitioner;
+import uk.ac.gla.terrier.structures.indexing.singlepass.hadoop.MapEmittedTermByMapPartitioner;
 import uk.ac.gla.terrier.structures.indexing.singlepass.hadoop.MapEmittedPostingList;
 import uk.ac.gla.terrier.structures.indexing.singlepass.hadoop.MapEmittedTerm;
 import uk.ac.gla.terrier.structures.indexing.singlepass.hadoop.MultiFileCollectionInputFormat;
@@ -107,6 +107,12 @@
 			logger.info("Partitioned Mode, "+Integer.parseInt(args[1])+" output indices.");
 			numberOfReducers = Integer.parseInt(args[1]);
 		}
+		else
+		{
+			final int r = Integer.parseInt(ApplicationSetup.getProperty("terrier.hadoop.indexing.reducers", "0"));
+			if (r != 0)
+				numberOfReducers = r;
+		}
 		
 		boolean blockIndexing = ApplicationSetup.BLOCK_INDEXING;
 		if (blockIndexing)
@@ -142,7 +148,7 @@
 		conf.setNumReduceTasks(numberOfReducers);
 		if (numberOfReducers> 1)
 		{
-			conf.setPartitionerClass(ByMapPartitioner.class);
+			conf.setPartitionerClass(MapEmittedTermByMapPartitioner.class);
 		}
 		else
 		{

