Index: src/uk/ac/gla/terrier/indexing/hadoop/Hadoop_BasicSinglePassIndexer.java
===================================================================
--- src/uk/ac/gla/terrier/indexing/hadoop/Hadoop_BasicSinglePassIndexer.java	(revision 2744)
+++ src/uk/ac/gla/terrier/indexing/hadoop/Hadoop_BasicSinglePassIndexer.java	(working copy)
@@ -48,8 +48,8 @@
 import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.TaskAttemptID;
-import org.apache.hadoop.mapred.TaskID;
 
+import uk.ac.gla.terrier.compression.BitIn;
 import uk.ac.gla.terrier.compression.BitOutputStream;
 import uk.ac.gla.terrier.indexing.BasicSinglePassIndexer;
 import uk.ac.gla.terrier.indexing.Document;
@@ -74,9 +74,8 @@
 import uk.ac.gla.terrier.structures.indexing.singlepass.hadoop.IDComparator;
 import uk.ac.gla.terrier.structures.indexing.singlepass.hadoop.MapData;
 import uk.ac.gla.terrier.structures.indexing.singlepass.hadoop.MapEmittedPostingList;
-import uk.ac.gla.terrier.structures.indexing.singlepass.hadoop.MapEmittedTerm;
-import uk.ac.gla.terrier.structures.indexing.singlepass.hadoop.MapEmittedTermByMapPartitioner;
 import uk.ac.gla.terrier.structures.indexing.singlepass.hadoop.SplitAwareWrapper;
+import uk.ac.gla.terrier.structures.indexing.singlepass.hadoop.SplitEmittedTerm;
 import uk.ac.gla.terrier.utility.ApplicationSetup;
 import uk.ac.gla.terrier.utility.ArrayUtils;
 import uk.ac.gla.terrier.utility.FieldScore;
@@ -109,8 +108,8 @@
  */
 public class Hadoop_BasicSinglePassIndexer 
 	extends BasicSinglePassIndexer 
-	implements Mapper<Text, SplitAwareWrapper<Document>, MapEmittedTerm, MapEmittedPostingList>,
-	Reducer<MapEmittedTerm, MapEmittedPostingList, Object, Object>
+	implements Mapper<Text, SplitAwareWrapper<Document>, SplitEmittedTerm, MapEmittedPostingList>,
+	Reducer<SplitEmittedTerm, MapEmittedPostingList, Object, Object>
 {
 
 	public static void main(String[] args) throws Exception
@@ -251,7 +250,7 @@
 	 */
 	
 	/** output collector for the current map indexing process */
-	protected OutputCollector<MapEmittedTerm, MapEmittedPostingList> outputPostingListCollector;
+	protected OutputCollector<SplitEmittedTerm, MapEmittedPostingList> outputPostingListCollector;
 	
 	/** Current map number */
 	protected String mapTaskID;
@@ -278,6 +277,7 @@
 						new Path(indexDestination, mapTaskID+".runs").toString())
 				);
 		RunData.writeUTF(mapTaskID);
+		start = true;
 		createMemoryPostings();
 		super.emptyDocIndexEntry = new SimpleDocumentIndexEntry();
 		super.docIndexBuilder = new DocumentIndexBuilder(currentIndex, "document");
@@ -301,7 +301,7 @@
 		logger.info("Map "+mapTaskID+", flush requested, containing "+numberOfDocsSinceFlush+" documents, flush "+flushNo);
 		if (mp == null)
 			throw new IOException("Map flushed before any documents were indexed");
-		mp.finish(new HadoopRunWriter(outputPostingListCollector, mapTaskID, flushNo));
+		mp.finish(new HadoopRunWriter(outputPostingListCollector, mapTaskID, splitnum, flushNo));
 		RunData.writeInt(currentId);
 		if (currentReporter != null)
 			currentReporter.incrCounter(Counters.INDEXER_FLUSHES, 1);
@@ -324,7 +324,7 @@
 	 */
 	public void map(
 			Text key, SplitAwareWrapper<Document> value, 
-			OutputCollector<MapEmittedTerm, MapEmittedPostingList> _outputPostingListCollector, 
+			OutputCollector<SplitEmittedTerm, MapEmittedPostingList> _outputPostingListCollector, 
 			Reporter reporter) 
 		throws IOException 
 	{
@@ -335,6 +335,8 @@
 		
 		if (start) {
 			splitnum = value.getSplitIndex();
+			System.out.println(splitnum);
+			//RunData.writeInt(splitnum);
 			start = false;
 		}
 		
@@ -474,10 +476,8 @@
 		final LinkedList<MapData> runData = new LinkedList<MapData>();
 		DataInputStream runDataIn;
 	
-		final MapEmittedTermByMapPartitioner partitionChecker = new MapEmittedTermByMapPartitioner();
-		partitionChecker.configure(jc);
 		final String jobId = TaskAttemptID.forName(jc.get("mapred.task.id")).getJobID().toString().replaceAll("job", "task");
-		final int thisPartition = TaskAttemptID.forName(jc.get("mapred.task.id")).getTaskID().getId();
+		
 		final FileStatus[] files = FileSystem.get(jc).listStatus(
 			FileOutputFormat.getOutputPath(jc), 
 			new org.apache.hadoop.fs.PathFilter()
@@ -490,12 +490,12 @@
 						return false;
 					
 					//2. is this run part of the maps allocated to us?
-					final TaskID t = TaskID.forName(name.replaceAll("\\.runs$", "") );
-					final int targetP = partitionChecker.calculatePartition(t, jc.getNumReduceTasks());
-					if (thisPartition != targetP)
-					{
-						return false;
-					}
+					//final TaskID t = TaskID.forName(name.replaceAll("\\.runs$", "") );
+					//final int targetP = partitionChecker.calculatePartition(t, jc.getNumReduceTasks());
+					//if (thisPartition != targetP)
+					//{
+					//	return false;
+					//}
 					return true;
 				}
 			}
@@ -506,6 +506,11 @@
 			throw new IOException("No run status files found in "+FileOutputFormat.getOutputPath(jc));
 		}
 		
+		final int thisPartition = TaskAttemptID.forName(jc.get("mapred.task.id")).getTaskID().getId();
+		final SplitEmittedTerm.SETPartitioner partitionChecker = new SplitEmittedTerm.SETPartitioner();
+		partitionChecker.configure(jc);
+		
+		
 		//TaskID previousMapTaskID = null;
 		MapData tempHRD;
 		for (FileStatus file : files) 
@@ -513,16 +518,18 @@
 			logger.info("Run data file "+ file.getPath().toString()+" has length "+Files.length(file.getPath().toString()));
 			runDataIn = new DataInputStream(Files.openFileStream(file.getPath().toString()));
 			tempHRD = new MapData(runDataIn);
-			// Sanity Check the file ordering
+			//check to see if this file contained our split information
+			if (partitionChecker.calculatePartition(tempHRD.getSplitnum(), jc.getNumReduceTasks()) != thisPartition)
+				continue;
 			
 			mapTaskIDs.add(tempHRD.getMap());
-			//TaskID thisMapTaskID = TaskID.forName(tempHRD.getMap());
-			//previousMapTaskID = thisMapTaskID;
 			runData.add(tempHRD);
 			runDataIn.close();
 		}
+		// Sort by splitnum
 		Collections.sort(runData);
 		Collections.sort(mapTaskIDs, new IDComparator(runData));
+		// A list of the index shards
 		MapIndexPrefixes = mapTaskIDs.toArray(new String[0]);
 		return runData;
 	}
@@ -554,21 +561,21 @@
 	 * @param reporter Used to report progress
 	 */
 	public void reduce(
-			MapEmittedTerm Term, 
+			SplitEmittedTerm Term, 
 			Iterator<MapEmittedPostingList> postingIterator, 
 			OutputCollector<Object, Object> output, 
 			Reporter reporter)
 		throws IOException
 	{
 		//if (logger.isDebugEnabled()) logger.debug("Reduce for term "+Term.getText());
-		reporter.setStatus("Reducer is merging term " + Term.getText());
+		reporter.setStatus("Reducer is merging term " + Term.getTerm());
 		if (! reduceStarted)
 		{
 			final LinkedList<MapData> runData = loadRunData();
         	startReduce(runData);
 			reduceStarted = true;
 		}
-		String term = Term.getText().toString().trim();
+		String term = Term.getTerm().trim();
 		if (term.length() == 0)
 			return;
 		runIteratorF.setRunPostingIterator(postingIterator);
@@ -687,7 +694,7 @@
 		try{
 			tempRM.setBos(new BitOutputStream(
 					currentIndex.getPath() + ApplicationSetup.FILE_SEPARATOR 
-					+ currentIndex.getPrefix() + ".inverted.bf"));
+					+ currentIndex.getPrefix() + ".inverted" + BitIn.USUAL_EXTENSION));
 		} catch (IOException ioe) {
 			ioe.printStackTrace();
 		}
Index: src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/HadoopRunsMerger.java
===================================================================
--- src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/HadoopRunsMerger.java	(revision 2692)
+++ src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/HadoopRunsMerger.java	(working copy)
@@ -31,8 +31,6 @@
 import java.util.LinkedList;
 import java.util.ListIterator;
 
-import org.apache.hadoop.mapred.TaskID;
-
 import uk.ac.gla.terrier.structures.BasicLexiconEntry;
 import uk.ac.gla.terrier.structures.LexiconEntry;
 import uk.ac.gla.terrier.structures.LexiconOutputStream;
@@ -91,9 +89,9 @@
 			if (posting.getDf() > maxDF) 
 				maxDF = posting.getDf();
 			
-			final int _runMapID = TaskID.forName(_run.getMapNo()).getId();
-			final int runNumber = run.getRunNo();
-			final int docOffset = getDocumentOffset(_runMapID, runNumber);
+			//final int _runMapID = TaskID.forName(_run.getMapNo()).getId();
+			//final int runNumber = run.getRunNo();
+			final int docOffset = getDocumentOffset(_run.getSplitNo(), _run.getRunNo());
 			lastDocument = posting.append(bos, lastDocument, docOffset);
 			if (le == null)
 				le = posting.getLexiconEntry();
@@ -126,12 +124,12 @@
 		this.numReducers = numReducers;
 	}
 	
-	public int getDocumentOffset(int mapNumber, int flushNumber) throws IOException {
+	public int getDocumentOffset(int splitNo, int flushNumber) throws IOException {
 		int NumPreDocs = 0;
 		MapData correctHRD = null;
 		for (MapData tempHRD : mapData)
 		{
-			if (mapNumber == tempHRD.getMapId() ) {
+			if (splitNo == tempHRD.getSplitnum() ) {
 				//System.out.println("Reducer number : "+reduceNumber+", Splitnum"+tempSplitnum+", Run Map Number : "+_run.getMapNo());
 				correctHRD = tempHRD;
 				break;
@@ -139,7 +137,7 @@
 			NumPreDocs += tempHRD.getMapDocs();
 		}
 		if (correctHRD == null)
-			throw new IOException("Did not find map data for "+ mapNumber);
+			throw new IOException("Did not find map data for split "+ splitNo);
 		
 		// Add the FlushShift
 		int currentFlushDocs=0;
Index: src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/HadoopRunWriter.java
===================================================================
--- src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/HadoopRunWriter.java	(revision 2692)
+++ src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/HadoopRunWriter.java	(working copy)
@@ -40,11 +40,12 @@
  * @version $Revision: 1.2 $ */
 public class HadoopRunWriter extends RunWriter {
 	/** output collector of Map task */
-	protected OutputCollector<MapEmittedTerm, MapEmittedPostingList> outputCollector = null;
+	protected OutputCollector<SplitEmittedTerm, MapEmittedPostingList> outputCollector = null;
 	/** map task id that is being flushed */
 	protected String mapId;
 	/** flushNo is the number of times this map task is being flushed */
 	protected int flushNo;
+	protected int splitId;
 	
 	/** Create a new HadoopRunWriter, specifying the output collector of the map task
 	 * the run number and the flush number.
@@ -52,12 +53,13 @@
 	 * @param _mapId the task id of the map currently being processed
 	 * @param _flushNo the number of times that this map task has flushed
 	 */
-	public HadoopRunWriter(OutputCollector<MapEmittedTerm, MapEmittedPostingList> _outputCollector,
-			String _mapId, int _flushNo)
+	public HadoopRunWriter(OutputCollector<SplitEmittedTerm, MapEmittedPostingList> _outputCollector,
+			String _mapId, int _splitId, int _flushNo)
 	{
 		this.outputCollector = _outputCollector;
 		this.mapId = _mapId;
 		this.flushNo = _flushNo;
+		this.splitId = _splitId;
 		this.info = "HadoopRunWriter(Map "+ mapId +", flush "+flushNo+")"; 
 	}
 	
@@ -80,10 +82,11 @@
 		
 		//emit the term and its posting list
 		outputCollector.collect(
-				MapEmittedTerm.create_TextPlus(term, mapId, flushNo), 
+				SplitEmittedTerm.createNewTerm(term, splitId, flushNo), 
 				MapEmittedPostingList.create_Hadoop_WritableRunPostingData(
 						mapId,
 						flushNo, 
+						splitId,
 						buffer,
 						post.getDocF(), post.getTF()));
 	}
Index: src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/CollectionRecordReader.java
===================================================================
--- src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/CollectionRecordReader.java	(revision 2692)
+++ src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/CollectionRecordReader.java	(working copy)
@@ -147,6 +147,8 @@
 			DocID.set(documentCollection.getDocid());
 		}
 		document.setObject(tempDoc);
+		//System.out.println("Split "+document.getSplitIndex());
+
 		currentDocument++;
 		return true;
 	}
Index: src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/HadoopRunPostingIterator.java
===================================================================
--- src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/HadoopRunPostingIterator.java	(revision 2692)
+++ src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/HadoopRunPostingIterator.java	(working copy)
@@ -43,6 +43,8 @@
 	protected String mapNo;
 	/** Term that we're processing */
 	protected String term;
+	/** The Split that the current posting comes from */
+	protected int splitNo;
 
 	/** Constructs a new RunPostingIterator.
 	  * @param postingClass is the name of the class to use to read the postings
@@ -80,7 +82,8 @@
 			posting.setDf(post.getDocumentFreq());
 			posting.setTF(post.getTermFreq());
 			mapNo = post.getMap();
-			runNo = post.getRun();
+			flushNo = post.getFlushNo();
+			splitNo = post.getSplitNo();
 		} catch (IOException ioe) {
 			throw new Error(ioe);
 		}
@@ -89,4 +92,13 @@
 
 	/** Returns the map that the current posting came from */
 	public String getMapNo() { return mapNo; }
+
+	/**
+	 * @return the number of the split that the current posting came from
+	 */
+	public int getSplitNo() {
+		return splitNo;
+	}
+	
+	
 }
Index: src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/Inv2DirectMultiReduce.java
===================================================================
--- src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/Inv2DirectMultiReduce.java	(revision 2711)
+++ src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/Inv2DirectMultiReduce.java	(working copy)
@@ -324,7 +324,7 @@
 			OutputCollector<Object, Object> collector, Reporter reporter)
 			throws IOException
 	{
-		/*DocumentIndexEntry die = */diis.next();
+		diis.next();
 		final int targetDocid = _targetDocid.get();
 		reporter.setStatus("Reducing for doc "+ targetDocid);
 		if (actualDocid > targetDocid)
@@ -345,12 +345,15 @@
 		}
 		
 		List<Posting> postingList = new ArrayList<Posting>();
+		int doclen = 0;
 		while(documentPostings.hasNext())
 		{
 			final Posting p = documentPostings.next().asWritablePosting();
 			postingList.add(p);
+			doclen += p.getFrequency();
 			reporter.progress();
 		}
+		System.err.println("docid=" + targetDocid + " doclen=" + doclen);
 		Collections.sort(postingList, new PostingIdComparator());
 		//logger.debug("TermId of first of " + postingList.size() + " postings for doc " + targetDocid + " is " + postingList.get(0).getId());
 		BitIndexPointer pointer = postingOutputStream.writePostings(postingList.iterator());
Index: src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/MapEmittedTerm.java
===================================================================
--- src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/MapEmittedTerm.java	(revision 2692)
+++ src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/MapEmittedTerm.java	(working copy)
@@ -1,278 +0,0 @@
-/*
- * Terrier - Terabyte Retriever
- * Webpage: http://ir.dcs.gla.ac.uk/terrier
- * Contact: terrier{a.}dcs.gla.ac.uk
- * University of Glasgow - Department of Computing Science
- * http://www.gla.uk
- *
- * The contents of this file are subject to the Mozilla Public License
- * Version 1.1 (the "License"); you may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- * http://www.mozilla.org/MPL/
- *
- * Software distributed under the License is distributed on an "AS IS"
- * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
- * the License for the specific language governing rights and limitations
- * under the License.
- *
- * The Original Code is MapEmittedTerm.java.
- *
- * The Original Code is Copyright (C) 2004-2009 the University of Glasgow.
- * All Rights Reserved.
- *
- * Contributor(s):
- *   Richard McCreadie <richardm{a.}dcs.gla.ac.uk> (original author)
- *   
- */
-package uk.ac.gla.terrier.structures.indexing.singlepass.hadoop;
-
-import java.io.ByteArrayInputStream;
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapred.TaskID;
-
-/**
- * Represents a term emitted during indexing time. Each term also has a 
- * the map number and flush number that is was emitted from.
- * @author Richard McCreadie
- * @version $Revision: 1.2 $
- * @since 2.2
- */
-public class MapEmittedTerm implements WritableComparable<MapEmittedTerm> {
-	
-	/**
-	 * Comparator for MapEmittedTerm objects - order only by Term.
-	 */
-	public static class TermComparator implements RawComparator<MapEmittedTerm> {
-		/**
-		 * Compares Hadoop_TextPlus objects by comparison of the
-		 * Text variables.
-		 */
-		public int compare(MapEmittedTerm a, MapEmittedTerm b)
-		{
-			return a.getText().compareTo(b.getText());
-		}
-
-		/**
-		 * Raw comparison on text objects
-		 * NOT USED
-		 */
-		public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) 
-		{
-			DataInputStream b1S = new DataInputStream(new ByteArrayInputStream(b1, s1, l1));
-			DataInputStream b2S = new DataInputStream(new ByteArrayInputStream(b2, s2, l2));
-			try {
-				MapEmittedTerm tempT = new MapEmittedTerm();
-				MapEmittedTerm tempT2 = new MapEmittedTerm();
-				tempT.readFields(b1S);
-				tempT2.readFields(b2S);
-				b1S.close();
-				b2S.close();
-				int value = tempT.getText().compareTo(tempT2.getText());
-				return value;
-			} catch (IOException e) {
-				System.err.println("IO Exception during compare");
-				return 0;
-			}
-		}
-
-	}
-	
-	/**
-	 * Raw Comparator class to compare MapEmittedTerm objects
-	 * stage 1. (Order by term, then by map number, then by flush
-	 * number)
-	 * @author Richard McCreadie and Craig Macdonald
-	 * @since 2.2
-	 * @version $Revision: 1.2 $
-	 */
-	public static class TermMapFlushComparator implements RawComparator<MapEmittedTerm> {
-		
-		protected MapEmittedTerm tempT = new MapEmittedTerm();
-		protected MapEmittedTerm tempT2 = new MapEmittedTerm();
-		
-		public int compare(MapEmittedTerm a, MapEmittedTerm b) {
-			throw new Error("Unsupported method Indexing_CompareTextPlusKey.compare(Indexing_TextPlus,Indexing_TextPlus) was called");
-			//richard's documentation say that this method is not used	
-		}
-
-		/**
-		 * Compare by term (bit comparison on Text object) then by map number (int)
-		 * then by flush number (int).
-		 */
-		public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
-			// Convert to streams so that the read methods can be used
-			DataInputStream b1S = new DataInputStream(new ByteArrayInputStream(b1, s1, l1));
-			DataInputStream b2S = new DataInputStream(new ByteArrayInputStream(b2, s2, l2));
-			try {
-				// Read in the TextPlus Objects
-				tempT.readFields(b1S);
-				tempT2.readFields(b2S);
-				b1S.close();
-				b2S.close();
-				// Do Comparison Text
-				int value = tempT.getText().compareTo(tempT2.getText());
-				if (value != 0)
-					return value;
-				// If same do Comparison on map task id
-				value = TaskID.forName( tempT.getMap() ).compareTo( TaskID.forName(tempT2.getMap()) );
-				if (value != 0)
-					return value;
-				//lastly check the flush numbers
-				return tempT.getFlush()-tempT2.getFlush();
-			} catch (IOException e) {
-				return 0;
-			}
-		}
-
-	}
-	
-	/**
-	 * Raw Comparator class to compare MapEmittedTerm objects
-	 * stage 1. (Order by term, then by map number). In this case
-	 * the map number is stored as an int rather than as a Map
-	 * task ID and therefore is parsed. This is used for the
-	 * SimpleIndexing variants as they do not flush out data in
-	 * the same manner.
-	 * @author Richard McCreadie and Craig Macdonald
-	 * @since 2.2.1
-	 * @version $Revision: 1.0 $
-	 */
-	public static class TermMapComparator implements RawComparator<MapEmittedTerm> {
-		
-		protected MapEmittedTerm tempT = new MapEmittedTerm();
-		protected MapEmittedTerm tempT2 = new MapEmittedTerm();
-		
-		public int compare(MapEmittedTerm a, MapEmittedTerm b) {
-			throw new Error("Unsupported method Indexing_CompareTextPlusKey.compare(Indexing_TextPlus,Indexing_TextPlus) was called");
-			//richard's documentation say that this method is not used
-			//Richard: Indeed, it is not used - because it is a RawComparator it uses the byte[] compare instead.
-		}
-
-		/**
-		 * Compare by term (bit comparison on Text object) then by map number (int)
-		 * then by flush number (int).
-		 */
-		public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
-			// Convert to streams so that the read methods can be used
-			DataInputStream b1S = new DataInputStream(new ByteArrayInputStream(b1, s1, l1));
-			DataInputStream b2S = new DataInputStream(new ByteArrayInputStream(b2, s2, l2));
-			try {
-				// Read in the TextPlus Objects
-				tempT.readFields(b1S);
-				tempT2.readFields(b2S);
-				b1S.close();
-				b2S.close();
-				// Do Comparison Text
-				int value = tempT.getText().compareTo(tempT2.getText());
-				if (value != 0)
-					return value;
-				// If same do Comparison on map task id
-				value = Integer.parseInt(tempT.getMap()) - Integer.parseInt(tempT2.getMap());
-				return value;
-			} catch (IOException e) {
-				return 0;
-			}
-		}
-
-	}
-	
-	/** The Map this Term was processed from */
-	protected String mapTaskID;
-	/** The Flush number this term was from */
-	protected int flushNumber;
-	/** The Term */
-	protected Text text=null;
-	
-	/**
-	 * Empty Constructor
-	 */
-	public MapEmittedTerm() {
-		
-	}
-	
-	/**
-	 * Constructor
-	 * @param s - Term
-	 * @param _mapTaskID - Map Number
-	 * @param _flushNumber - Flush Number
-	 */
-	public MapEmittedTerm(String s, String _mapTaskID, int _flushNumber) {
-		mapTaskID = _mapTaskID;
-		flushNumber = _flushNumber;
-		text = new Text(s);
-	}
-	
-	/**
-	 * Factory Method
-	 * @param s - Term
-	 * @param a - Map Number
-	 * @param b - Flush Number
-	 * @return a newly created Indexing_TextPlus
-	 */
-	public static MapEmittedTerm create_TextPlus(String s, String a, int b) {
-		MapEmittedTerm temp = new MapEmittedTerm();
-		temp.setMap(a);
-		temp.setFlush(b);
-		temp.setText(new Text(s));
-		return temp;
-	}
-
-	public String getMap() {
-		return mapTaskID;
-	}
-
-	public void setMap(String id) {
-		mapTaskID = id;
-	}
-
-	public int getFlush() {
-		return flushNumber;
-	}
-
-	public void setFlush(int flush) {
-		flushNumber = flush;
-	}
-
-	public Text getText() {
-		return text;
-	}
-
-	public void setText(Text text) {
-		this.text = text;
-	}
-
-	/**
-	 * Reads in this object from the Input Stream 'in'
-	 */
-	public void readFields(DataInput in) throws IOException {
-		mapTaskID = in.readUTF();
-		flushNumber = in.readInt();
-		text = new Text();
-		text.readFields(in);
-		
-	}
-
-	/**
-	 * Writes this object to the Output Stream 'out'
-	 */
-	public void write(DataOutput out) throws IOException {
-		out.writeUTF(mapTaskID);
-		out.writeInt(flushNumber);
-		text.write(out);
-	}
-
-	/**
-	 * Text Comparator on the Term contained in this object
-	 */
-	public int compareTo(MapEmittedTerm o) {
-		return this.getText().compareTo(o.getText());
-	}
-
-}
Index: src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/MapEmittedPostingList.java
===================================================================
--- src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/MapEmittedPostingList.java	(revision 2692)
+++ src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/MapEmittedPostingList.java	(working copy)
@@ -43,19 +43,23 @@
 	/** The Map Number */
 	protected String Map;
 	/** The Flush Number */
-	protected int Run;
+	protected int flushNo;
+	/** The Split Number */
+	protected int splitNo;
 	
 	/**
 	 * Constructor
 	 * @param map - Map task id
-	 * @param run - Flush Number
+	 * @param flush - Flush Number
+	 * @param split - Split Number
 	 * @param c - Document Frequency
 	 * @param c2 - Term Frequency
 	 */
-	public MapEmittedPostingList (String map, int run, int c, int c2) {
+	public MapEmittedPostingList (String map, int flush, int split, int c, int c2) {
 		super(c,c2);
 		Map = map;
-		Run =run;
+		flushNo =flush;
+		splitNo = split;
 	}
 	
 	/**
@@ -78,13 +82,14 @@
 	 * Factory Method
 	 * @param mapTaskID - Map Number
 	 * @param flushNo - Flush Number
+	 * @param splitNo - Split Number
 	 * @param postingList - Posting List
 	 * @param DocumentFreq - Document Frequency
 	 * @param TermFreq - Term Frequency
 	 * @return a newly created Indexing_WritableRunPostingData
 	 */
-	public static MapEmittedPostingList create_Hadoop_WritableRunPostingData (String mapTaskID, int flushNo, byte[] postingList, int DocumentFreq, int TermFreq) {
-		MapEmittedPostingList w = new MapEmittedPostingList(mapTaskID, flushNo, DocumentFreq, TermFreq);
+	public static MapEmittedPostingList create_Hadoop_WritableRunPostingData (String mapTaskID, int flushNo, int splitNo, byte[] postingList, int DocumentFreq, int TermFreq) {
+		MapEmittedPostingList w = new MapEmittedPostingList(mapTaskID, flushNo, splitNo, DocumentFreq, TermFreq);
 		w.setArray(postingList);
 		return w;
 	}
@@ -106,7 +111,7 @@
 	 * Returns the Map & Flush Number
 	 */
 	public String toString() {
-		return "MapNo="+Map+ ",FlushNo="+Run;
+		return "MapNo="+Map+ ",FlushNo="+flushNo;
 	}
 
 	public String getMap() {
@@ -117,25 +122,40 @@
 		Map = map;
 	}
 	
-	public int getRun() {
-		return Run;
+	public int getFlushNo() {
+		return flushNo;
+	}
+
+	public void setFlushNo(int flush) {
+		flushNo = flush;
+	}
+
+	/**
+	 * @return the splitNo
+	 */
+	public int getSplitNo() {
+		return splitNo;
 	}
 
-	public void setRun(int run) {
-		Run = run;
+	/**
+	 * @param splitNo the splitNo to set
+	 */
+	public void setSplitNo(int splitNo) {
+		this.splitNo = splitNo;
 	}
 
 	/**
 	 * Reads this object from the input stream 'in' 
 	 */
-	public void readFields(DataInput arg0) throws IOException {
-		arraylength = arg0.readInt();
-		Map = arg0.readUTF();
-		Run = arg0.readInt();
-		DocumentFreq = arg0.readInt();
-		TermFreq = arg0.readInt();
+	public void readFields(DataInput in) throws IOException {
+		arraylength = in.readInt();
+		Map = in.readUTF();
+		flushNo = in.readInt();
+		splitNo = in.readInt();
+		DocumentFreq = in.readInt();
+		TermFreq = in.readInt();
 		array = new byte[arraylength];
-		arg0.readFully(array);
+		in.readFully(array);
 		//System.err.println("DEBUG: Finished Read, ArrayL:"+arraylength+" RunNo:"+Run+" DocF:"+DocumentFreq+" TermF:"+TermFreq+" Buffer:"+array.toString());
 		
 	}
@@ -143,28 +163,30 @@
 	/**
 	 * Reads this object from the input stream 'in' apart from the
 	 * array. 
-	 * @param arg0
+	 * @param in
 	 * @throws IOException
 	 */
-	public void readFieldsMinusArray(DataInput arg0) throws IOException {
-		arraylength = arg0.readInt();
-		Map = arg0.readUTF();
-		Run = arg0.readInt();
-		DocumentFreq = arg0.readInt();
-		TermFreq = arg0.readInt();
+	public void readFieldsMinusArray(DataInput in) throws IOException {
+		arraylength = in.readInt();
+		Map = in.readUTF();
+		flushNo = in.readInt();
+		splitNo = in.readInt();
+		DocumentFreq = in.readInt();
+		TermFreq = in.readInt();
 		array = new byte[1];
 		//System.err.println("DEBUG: Finished Read, ArrayL:"+arraylength+" RunNo:"+Run+" DocF:"+DocumentFreq+" TermF:"+TermFreq+" Buffer:"+array.toString());
 		
 	}
 
 	/** Write this object to the output stream 'out' */
-	public void write(DataOutput arg0) throws IOException {
-		arg0.writeInt(array.length);
-		arg0.writeUTF(Map);
-		arg0.writeInt(Run);
-		arg0.writeInt(DocumentFreq);
-		arg0.writeInt(TermFreq);
-		arg0.write(array);
+	public void write(DataOutput out) throws IOException {
+		out.writeInt(array.length);
+		out.writeUTF(Map);
+		out.writeInt(flushNo);
+		out.writeInt(splitNo);
+		out.writeInt(DocumentFreq);
+		out.writeInt(TermFreq);
+		out.write(array);
 		//System.err.println("DEBUG: Finished Write, ArrayL:"+array.length+" RunNo:"+Run+" DocF:"+DocumentFreq+" TermF:"+TermFreq+" Buffer:"+array.toString());
 	}
 	
Index: src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/MapEmittedTermByMapPartitioner.java
===================================================================
--- src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/MapEmittedTermByMapPartitioner.java	(revision 2692)
+++ src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/MapEmittedTermByMapPartitioner.java	(working copy)
@@ -1,70 +0,0 @@
-/*
- * Terrier - Terabyte Retriever
- * Webpage: http://ir.dcs.gla.ac.uk/terrier
- * Contact: terrier{a.}dcs.gla.ac.uk
- * University of Glasgow - Department of Computing Science
- * http://www.gla.uk
- *
- * The contents of this file are subject to the Mozilla Public License
- * Version 1.1 (the "License"); you may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- * http://www.mozilla.org/MPL/
- *
- * Software distributed under the License is distributed on an "AS IS"
- * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
- * the License for the specific language governing rights and limitations
- * under the License.
- *
- * The Original Code is MapEmittedTermByMapPartitioner.java.
- *
- * The Original Code is Copyright (C) 2004-2009 the University of Glasgow.
- * All Rights Reserved.
- *
- * Contributor(s):
- *   Richard McCreadie <richardm{a.}dcs.gla.ac.uk> (original author)
- *   Craig Macdonald <craigm{a.}dcs.gla.ac.uk> 
- */
-
-package uk.ac.gla.terrier.structures.indexing.singlepass.hadoop;
-
-
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobConfigurable;
-import org.apache.hadoop.mapred.Partitioner;
-import org.apache.hadoop.mapred.TaskID;
-
-/**
- * Partitions the term postings lists from the map function,
- * such that the created indexes is partitioned evenly across
- * the reducers. This partitioner partitions by an even number of
- * maps, assuming that Map sizes are approximately equal.
- * @author Richard McCreadie and Craig Macdonald
- * @version $Revision: 1.2 $
- * @since 2.2
- */
-public class MapEmittedTermByMapPartitioner implements JobConfigurable, Partitioner<MapEmittedTerm, MapEmittedPostingList>  {
-
-	protected int numMapTasks = -1;
-
-	protected static int getTaskID(String id) {
-		return TaskID.forName(id).getId();
-	}
-
-	public void configure(JobConf job) {
-		numMapTasks = job.getNumMapTasks();
-	}
-	
-	public int calculatePartition(TaskID task, int numPartitions) {
-		final int mapNumber = task.getId();
-		final int partitionSize = (int) (Math.ceil( ((double)numMapTasks)/(double)numPartitions));
-		return mapNumber / partitionSize;
-	}
-	
-	/**
-	 * Forces each Map output to get its own reduce step
-	 */
-	public int getPartition(MapEmittedTerm key, MapEmittedPostingList value, int numPartitions) {
-		return this.calculatePartition(TaskID.forName(value.getMap()), numPartitions);
-	}
-
-}
Index: src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/SplitEmittedTerm.java
===================================================================
--- src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/SplitEmittedTerm.java	(revision 0)
+++ src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/SplitEmittedTerm.java	(revision 0)
@@ -0,0 +1,314 @@
+package uk.ac.gla.terrier.structures.indexing.singlepass.hadoop;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.charset.CharacterCodingException;
+
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
+import org.apache.hadoop.mapred.Partitioner;
+
+/**
+ * Represents a Term key used during MapReduce Indexing. Term keys are emitted from
+ * each map task, and are used for sorting and partitioning the output. Partitioning
+ * is done by splitno. Two options for sorting (a) term only, (b) term, split, flush
+ * @author richardm
+ * @since 3.0
+ */
+public class SplitEmittedTerm implements WritableComparable<SplitEmittedTerm>{
+
+	/**
+	 * Factory method for creating a new Term key object
+	 * @param term
+	 * @param splitno
+	 * @param flushno
+	 * @return
+	 */
+	public static SplitEmittedTerm createNewTerm(String term, int splitno, int flushno) {
+		return new SplitEmittedTerm(term, splitno, flushno);
+	}
+	
+	/** The term */
+	private String term;
+	/** The split that this instance of the term has been processed by */ 
+	private int splitno;
+	/** The flush within the split that this instance of the term was emitted by */
+	private int flushno;
+	
+	/**
+	 * Empty Constructor
+	 */
+	public SplitEmittedTerm() {}
+	
+	/**
+	 * Constructor for a Term key. Is used for sorting map output and partitioning
+	 * posting lists between reducers. Each term is only unique in conjunction with
+	 * the split and flush that it was emitted from.
+	 * @param term 
+	 * @param splitno
+	 * @param flushno
+	 */
+	public SplitEmittedTerm(String term, int splitno, int flushno) {
+		this.term = term;
+		this.splitno = splitno;
+		this.flushno = flushno;
+	}
+	
+	
+	
+	@Override
+	public boolean equals(Object _o)
+	{
+		if (! (_o instanceof SplitEmittedTerm))
+			return false;
+		SplitEmittedTerm o = (SplitEmittedTerm)_o;
+		return this.term.equals(o.term) && this.splitno == o.splitno && this.flushno == o.flushno;		
+	}
+	
+	@Override
+	public String toString() {
+		return term + ":" + splitno + ":" + flushno;
+	}
+
+	/**
+	 * Read in a Term key object from the input stream 'in'
+	 */
+	public void readFields(DataInput in) throws IOException {
+		term = Text.readString(in); //in.readUTF();
+		splitno = WritableUtils.readVInt(in);
+		flushno = WritableUtils.readVInt(in);
+			
+	}
+
+	/**
+	 * Write out this Term key to output stream 'out'
+	 */
+	public void write(DataOutput out) throws IOException {
+		Text.writeString(out, term); //out.writeUTF(term);
+		WritableUtils.writeVInt(out, splitno);
+		WritableUtils.writeVInt(out, flushno);		
+	}
+
+	/**
+	 * Orders Term keys by term, then split number, then flush number. The integers are
+	 * compared directly rather than subtracted, so mixed-sign values cannot overflow.
+	 */
+	public int compareTo(SplitEmittedTerm term2) {
+		final int byTerm = term.compareTo(term2.getTerm());
+		if (byTerm != 0) return byTerm;
+		if (splitno != term2.getSplitno()) return splitno < term2.getSplitno() ? -1 : 1;
+		return flushno < term2.getFlushno() ? -1 : (flushno == term2.getFlushno() ? 0 : 1);
+	}	
+	
+	
+
+	/**
+	 * @return the term
+	 */
+	public String getTerm() {
+		return term;
+	}
+
+	/**
+	 * @param term the term to set
+	 */
+	public void setTerm(String term) {
+		this.term = term;
+	}
+
+	/**
+	 * @return the splitno
+	 */
+	public int getSplitno() {
+		return splitno;
+	}
+
+	/**
+	 * @param splitno the splitno to set
+	 */
+	public void setSplitno(int splitno) {
+		this.splitno = splitno;
+	}
+
+	/**
+	 * @return the flushno
+	 */
+	public int getFlushno() {
+		return flushno;
+	}
+
+	/**
+	 * @param flushno the flushno to set
+	 */
+	public void setFlushno(int flushno) {
+		this.flushno = flushno;
+	}
+	
+	/** Sorter by term only */
+	public static class SETRawComparatorTerm implements RawComparator<SplitEmittedTerm>
+	{
+		/**
+		 * Compares raw Term key 1 to raw Term key 2. Note that only terms are considered.  
+		 */
+//		public int compare(byte[] bterm1, int offset1, int length1, byte[] bterm2, int offset2,
+//				int length2)
+//		{
+//			return Text.Comparator.compareBytes(bterm1, offset1, length1, bterm2, offset2, length2);
+//		}
+//		
+		
+		/** Compares two serialised keys by term only: just the leading Text-encoded term is decoded,
+		 * so the split and flush VInts that follow it never affect the result. */
+		public int compare(byte[] bterm1, int offset1, int length1, byte[] bterm2, int offset2, int length2)
+		{
+			try {
+				return Text.readString(new DataInputStream(new ByteArrayInputStream(bterm1, offset1, length1)))
+					.compareTo(Text.readString(new DataInputStream(new ByteArrayInputStream(bterm2, offset2, length2))));
+			} catch (IOException e) { // unreadable key bytes: treated as equal, as a best-effort fallback
+				return 0;
+			}
+		}
+
+//		public int compare(byte[] bterm1, int offset1, int length1, byte[] bterm2, int offset2,
+//				int length2) 
+//		{
+//			/* Term objects used during raw Term comparisons */
+//			SplitEmittedTerm term1;
+//			SplitEmittedTerm term2;
+//			// Convert to streams so that the read methods can be used
+//			DataInputStream b1S = new DataInputStream(new ByteArrayInputStream(bterm1, offset1, length1));
+//			DataInputStream b2S = new DataInputStream(new ByteArrayInputStream(bterm2, offset2, length2));
+//			try {
+//				term1 = new SplitEmittedTerm();
+//				term1.readFields(b1S);
+//				term2 = new SplitEmittedTerm();
+//				term2.readFields(b2S);
+//				b1S.close();
+//				b2S.close();
+//				
+//				return term1.getTerm().compareTo(term2.getTerm());
+//			} catch (IOException e) {
+//				System.err.println("ERROR during raw comparision of term objects, unable to read input streams.");
+//				e.printStackTrace();
+//			}
+//			return 0;
+//	
+//		}
+
+		public int compare(SplitEmittedTerm o1, SplitEmittedTerm o2) {
+			return o1.getTerm().compareTo(o2.getTerm());
+		}
+	}
+	
+	public static class SETRawComparatorTermSplitFlush implements RawComparator<SplitEmittedTerm>
+	{
+		/**
+		 * Compares raw Term key 1 to raw Term key 2 by term, then split, then flush, using
+		 * overflow-safe integer comparisons. Returns 0 if either key cannot be read.
+		 */		
+		public int compare(byte[] bterm1, int offset1, int length1, byte[] bterm2, int offset2, int length2)
+		{
+			//this implementation doesnt create SplitEmittedTerm objects, saving a bit on gc
+			DataInputStream b1S = new DataInputStream(new ByteArrayInputStream(bterm1, offset1, length1));
+			DataInputStream b2S = new DataInputStream(new ByteArrayInputStream(bterm2, offset2, length2));
+			try {
+				String t1 = Text.readString(b1S);
+				String t2 = Text.readString(b2S);
+				int result = t1.compareTo(t2);
+				if (result != 0)
+					return result;
+				int i1 = WritableUtils.readVInt(b1S);
+				int i2 = WritableUtils.readVInt(b2S);
+				if (i1 != i2)
+					return i1 < i2 ? -1 : 1;
+				i1 = WritableUtils.readVInt(b1S);
+				i2 = WritableUtils.readVInt(b2S);
+				return i1 < i2 ? -1 : (i1 == i2 ? 0 : 1);
+			} catch (IOException e) {
+				System.err.println("ERROR during raw comparision of term objects, unable to read input streams.");
+				e.printStackTrace();
+				return 0;
+			}			
+		}
+		
+//		public int compare(byte[] bterm1, int offset1, int length1, byte[] bterm2, int offset2,
+//				int length2) {
+//			/** Term objects used during raw Term comparisons */
+//			SplitEmittedTerm term1;
+//			SplitEmittedTerm term2;
+//			// Convert to streams so that the read methods can be used
+//			DataInputStream b1S = new DataInputStream(new ByteArrayInputStream(bterm1, offset1, length1));
+//			DataInputStream b2S = new DataInputStream(new ByteArrayInputStream(bterm2, offset2, length2));
+//			try {
+//				
+//				term1 = new SplitEmittedTerm();
+//				term1.readFields(b1S);
+//				term2 = new SplitEmittedTerm();
+//				term2.readFields(b2S);
+//				b1S.close();
+//				b2S.close();
+//				
+//				return term1.compareTo(term2);
+//			} catch (IOException e) {
+//				System.err.println("ERROR during raw comparision of term objects, unable to read input streams.");
+//				e.printStackTrace();
+//			}
+//			return 0;
+//		}
+
+		/**
+		 * Compares Term key 1 to Term key 2. Note that terms are
+		 * unique only in conjunction with their associated split and flush.  
+		 */
+		public int compare(SplitEmittedTerm term1, SplitEmittedTerm term2) {
+			return term1.compareTo(term2);
+		}
+	}
+	
+	/** Partitions SplitEmittedTerms by split that they came from.
+	 */
+	public static class SETPartitioner implements Partitioner<SplitEmittedTerm, MapEmittedPostingList>, JobConfigurable
+	{
+		/** The number of chunks the collection was split into */
+		private int numSplits;
+		
+		/**
+		 * Configure the partitioner functionality, i.e. calculate the
+		 * number of splits there were.
+		 */
+		public void configure(JobConf conf) {
+			// there is one split per map task
+			numSplits = conf.getNumMapTasks();
+		}
+	
+		/** Returns the partition for the specified term and posting list, given the specified
+		 * number of partitions.
+		 */
+		public int getPartition(SplitEmittedTerm term, MapEmittedPostingList posting,
+				int numPartitions)
+		{
+			//System.err.println("set="+term.toString() + " partition="+ calculatePartition(term.getSplitno(), numPartitions));
+			return calculatePartition(term.getSplitno(), numPartitions);
+		}
+		
+		/** Calculates the partitions for a given split number.
+		 * @param splitno - which split index, starting at 0
+		 * @param numPartitions - number of partitions (reducers) configured
+		 * @return the reduce partition number to allocate the split to. */
+		public int calculatePartition(int splitno, int numPartitions) {
+			final int partitionSize = (int) (Math.ceil((double)numSplits / (double) numPartitions ));
+			return splitno / partitionSize;
+		}
+	}
+
+	
+	
+	
+}
Index: src/uk/ac/gla/terrier/structures/indexing/singlepass/RunIterator.java
===================================================================
--- src/uk/ac/gla/terrier/structures/indexing/singlepass/RunIterator.java	(revision 2692)
+++ src/uk/ac/gla/terrier/structures/indexing/singlepass/RunIterator.java	(working copy)
@@ -41,8 +41,8 @@
 	protected Class<? extends PostingInRun> postingClass;
 	/** current posting */
 	protected PostingInRun posting;
-	/** Run number that the current posting came from */
-	protected int runNo;
+	/** Run/Flush that the current posting came from */
+	protected int flushNo;
 
 	/** create a new instance of this class.
 	  * @param _postingClass Class of the PostingInRun type that postings in this run have
@@ -50,7 +50,7 @@
 	protected RunIterator(Class<? extends PostingInRun> _postingClass, int _runNo)
 	{
 		this.postingClass = _postingClass;
-		this.runNo = _runNo;
+		this.flushNo = _runNo;
 	}
 
 	/** Create a new posting */	
@@ -58,10 +58,10 @@
 		posting = postingClass.newInstance();
 	}
 
-	/** Get the run number that the current posting came from */	
+	/** Get the run/flush number that the current posting came from */	
 	public int getRunNo()
 	{
-		return runNo;
+		return flushNo;
 	}
 
 	/** iterator implementation */	
Index: src/uk/ac/gla/terrier/applications/HadoopIndexing.java
===================================================================
--- src/uk/ac/gla/terrier/applications/HadoopIndexing.java	(revision 2727)
+++ src/uk/ac/gla/terrier/applications/HadoopIndexing.java	(working copy)
@@ -49,9 +49,8 @@
 import uk.ac.gla.terrier.indexing.hadoop.Hadoop_BlockSinglePassIndexer;
 import uk.ac.gla.terrier.structures.Index;
 import uk.ac.gla.terrier.structures.indexing.singlepass.hadoop.MapEmittedPostingList;
-import uk.ac.gla.terrier.structures.indexing.singlepass.hadoop.MapEmittedTerm;
-import uk.ac.gla.terrier.structures.indexing.singlepass.hadoop.MapEmittedTermByMapPartitioner;
 import uk.ac.gla.terrier.structures.indexing.singlepass.hadoop.MultiFileCollectionInputFormat;
+import uk.ac.gla.terrier.structures.indexing.singlepass.hadoop.SplitEmittedTerm;
 import uk.ac.gla.terrier.utility.ApplicationSetup;
 import uk.ac.gla.terrier.utility.Files;
 import uk.ac.gla.terrier.utility.io.HadoopPlugin;
@@ -127,7 +126,7 @@
 			conf.setReducerClass(Hadoop_BasicSinglePassIndexer.class);
 		}
 		FileOutputFormat.setOutputPath(conf, new Path(ApplicationSetup.TERRIER_INDEX_PATH));
-		conf.setMapOutputKeyClass(MapEmittedTerm.class);
+		conf.setMapOutputKeyClass(SplitEmittedTerm.class);
 		conf.setMapOutputValueClass(MapEmittedPostingList.class);
 		
 		if (! conf.get("mapred.job.tracker").equals("local"))
@@ -141,8 +140,8 @@
 		
 		conf.setInputFormat(MultiFileCollectionInputFormat.class);
 		conf.setOutputFormat(NullOutputFormat.class);
-		conf.setOutputKeyComparatorClass(MapEmittedTerm.TermMapFlushComparator.class);
-		conf.setOutputValueGroupingComparator(MapEmittedTerm.TermComparator.class);
+		conf.setOutputKeyComparatorClass(SplitEmittedTerm.SETRawComparatorTermSplitFlush.class);
+		conf.setOutputValueGroupingComparator(SplitEmittedTerm.SETRawComparatorTerm.class);
 		conf.setReduceSpeculativeExecution(false);
 		//parse the collection.spec
 		BufferedReader specBR = Files.openFileReader(ApplicationSetup.COLLECTION_SPEC);
@@ -159,7 +158,7 @@
 		conf.setNumReduceTasks(numberOfReducers);
 		if (numberOfReducers> 1)
 		{
-			conf.setPartitionerClass(MapEmittedTermByMapPartitioner.class);
+			conf.setPartitionerClass(SplitEmittedTerm.SETPartitioner.class);
 		}
 		else
 		{
Index: src_test/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/TestSplitEmittedTerm.java
===================================================================
--- src_test/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/TestSplitEmittedTerm.java	(revision 0)
+++ src_test/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/TestSplitEmittedTerm.java	(revision 0)
@@ -0,0 +1,267 @@
+package uk.ac.gla.terrier.structures.indexing.singlepass.hadoop;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+
+import uk.ac.gla.terrier.structures.indexing.singlepass.hadoop.SplitEmittedTerm.SETPartitioner;
+import uk.ac.gla.terrier.structures.indexing.singlepass.hadoop.SplitEmittedTerm.SETRawComparatorTerm;
+import uk.ac.gla.terrier.structures.indexing.singlepass.hadoop.SplitEmittedTerm.SETRawComparatorTermSplitFlush;
+
+/** Tests for SplitEmittedTerm, including the Comparators and Partitioners */
+public class TestSplitEmittedTerm extends TestCase {
+	
+	
+	public void testMethods() throws Exception
+	{
+		SplitEmittedTerm t1 = new SplitEmittedTerm("t1", 10, 34);
+		assertEquals("t1", t1.getTerm());
+		assertEquals(10, t1.getSplitno());
+		assertEquals(34, t1.getFlushno());
+		
+		t1.setFlushno(11);
+		assertEquals(10, t1.getSplitno());
+		assertEquals(11, t1.getFlushno());
+		
+		t1.setSplitno(5);
+		assertEquals(5, t1.getSplitno());
+		assertEquals(11, t1.getFlushno());
+		
+		t1.setTerm("t2");
+		assertEquals("t2", t1.getTerm());
+	}
+	
+	private void checkWritable(final String t, final int split, final int flush) throws Exception
+	{
+		SplitEmittedTerm t1 = new SplitEmittedTerm(t, split, flush);
+		byte[] b = toBytes(t1);
+		
+		SplitEmittedTerm t2 = new SplitEmittedTerm();
+		t2.readFields(new DataInputStream(new ByteArrayInputStream(b)));
+		assertTrue(t1.equals(t2));
+		assertTrue(t2.equals(t1));
+		assertEquals(t, t2.getTerm());
+		assertEquals(split, t2.getSplitno());
+		assertEquals(flush, t2.getFlushno());
+	}
+	
+	public void testWritable() throws Exception
+	{
+		checkWritable("t1", 10, 34);
+		checkWritable("t1", Integer.MAX_VALUE, Integer.MAX_VALUE);
+	}
+	
+	private byte[] toBytes(Writable w) throws Exception
+	{
+		ByteArrayOutputStream baos = new ByteArrayOutputStream();
+		DataOutputStream dos = new DataOutputStream(baos);
+		w.write(dos);
+		return baos.toByteArray();
+	}
+	
+	private void checkEqualityTerm(String t, int split, int flush) throws Exception
+	{
+		SplitEmittedTerm t1 = new SplitEmittedTerm(t, split, flush);
+		SETRawComparatorTerm compare = new SETRawComparatorTerm();
+		SETRawComparatorTermSplitFlush compare2 = new SETRawComparatorTermSplitFlush();
+		assertEquals(0, t1.compareTo(t1));
+		assertTrue(t1.equals(t1));
+		assertEquals(0, compare.compare(t1, t1));
+		assertEquals(0, compare2.compare(t1, t1));
+		byte[] t1w = toBytes(t1);
+		assertEquals(0, compare.compare(t1w, 0, t1w.length, t1w, 0, t1w.length));
+		assertEquals(0, compare2.compare(t1w, 0, t1w.length, t1w, 0, t1w.length));
+		
+		SplitEmittedTerm t1a = new SplitEmittedTerm(new String(t), split, flush);
+		assertEquals(0, t1.compareTo(t1a));
+		assertEquals(0, t1a.compareTo(t1));
+		assertTrue(t1.equals(t1a));
+		assertTrue(t1a.equals(t1));
+		assertEquals(0, compare.compare(t1, t1a));
+		assertEquals(0, compare.compare(t1a, t1));
+		assertEquals(0, compare2.compare(t1, t1a));
+		assertEquals(0, compare2.compare(t1a, t1));
+	}
+	
+	public void testEqualityTerm() throws Exception
+	{
+		checkEqualityTerm("t1", 0, 0);
+		checkEqualityTerm("t1", Integer.MAX_VALUE, Integer.MAX_VALUE);		
+	}
+	
+	
+	private void checkEqualityTermSplit(String t, int split1, int split2, int flush) throws Exception
+	{
+		SplitEmittedTerm t1 = new SplitEmittedTerm(t, split1, flush);
+		SplitEmittedTerm t2 = new SplitEmittedTerm(new String(t), split2, flush);
+		SETRawComparatorTerm compare = new SETRawComparatorTerm();
+		SETRawComparatorTermSplitFlush compare2 = new SETRawComparatorTermSplitFlush();
+		
+		assertEquals(0, t1.compareTo(t1));
+					
+		assertFalse(t1.equals(t2));
+		assertEquals(0, compare.compare(t1, t2));
+		assertTrue(compare2.compare(t1, t2) < 0);
+		
+		byte[] t1w = toBytes(t1);
+		byte[] t2w = toBytes(t2);
+		assertEquals(0, compare.compare(t1w, 0, t1w.length, t2w, 0, t2w.length));
+		assertTrue("Comparing t1 to t2 as bytes", compare2.compare(t1w, 0, t1w.length, t2w, 0, t2w.length)< 0);
+	}
+	
+	public void testEqualityTermSplit() throws Exception
+	{
+		checkEqualityTermSplit("t1", 0, 1, 0);
+		checkEqualityTermSplit("t1", Integer.MAX_VALUE -1, Integer.MAX_VALUE, Integer.MAX_VALUE);		
+	}
+	
+	private void compareTerm(SplitEmittedTerm t1, SplitEmittedTerm t2) throws Exception
+	{
+		SETRawComparatorTerm compare = new SETRawComparatorTerm();
+		//check for inequality of each pair
+		assertFalse(t1.equals(t2));
+		assertFalse(t2.equals(t1));
+					
+		assertTrue(t1.compareTo(t2) < 0);
+		assertTrue(t2.compareTo(t1) > 0);
+		assertTrue(compare.compare(t1, t2) < 0);
+		assertTrue(compare.compare(t2, t1) > 0);
+		
+		SETRawComparatorTermSplitFlush compare2 = new SETRawComparatorTermSplitFlush();
+		assertTrue(compare2.compare(t1, t2) < 0);
+		assertTrue(compare2.compare(t2, t1) > 0);
+		byte[] t1w = toBytes(t1);
+		byte[] t2w = toBytes(t2);
+		assertTrue(compare.compare(t1w, 0, t1w.length, t2w, 0, t2w.length)< 0);
+		assertTrue("Comparing t1 to t2 as bytes", compare2.compare(t1w, 0, t1w.length, t2w, 0, t2w.length) < 0);
+	}
+
+	public void testCompareTerm() throws Exception
+	{		
+		SplitEmittedTerm t1 = new SplitEmittedTerm("t1", 0, 0);
+		SplitEmittedTerm t2 = new SplitEmittedTerm("t2", 0, 0);
+		compareTerm(t1, t2);
+	
+		t1 = new SplitEmittedTerm("t1", Integer.MAX_VALUE, Integer.MAX_VALUE);
+		t2 = new SplitEmittedTerm("t2", Integer.MAX_VALUE, Integer.MAX_VALUE);
+		compareTerm(t1, t2);
+	}
+	
+	private void compareTermSplit(SplitEmittedTerm t1, SplitEmittedTerm t2) throws Exception
+	{
+		SETRawComparatorTerm compare = new SETRawComparatorTerm();
+		//check for inequality of each pair			
+		assertFalse(t1.equals(t2));
+		assertFalse(t2.equals(t1));
+					
+		assertTrue(t1.compareTo(t2) < 0);
+		assertTrue(t2.compareTo(t1) > 0);
+		assertEquals(0, compare.compare(t1, t2));
+		assertEquals(0, compare.compare(t2, t1));
+		
+		SETRawComparatorTermSplitFlush compare2 = new SETRawComparatorTermSplitFlush();
+		assertTrue(compare2.compare(t1, t2) < 0);
+		assertTrue(compare2.compare(t2, t1) > 0);
+		byte[] t1w = toBytes(t1);
+		byte[] t2w = toBytes(t2);
+		assertEquals(0, compare.compare(t1w, 0, t1w.length, t2w, 0, t2w.length));
+		assertEquals(0, compare.compare(t2w, 0, t2w.length, t1w, 0, t1w.length));
+		assertTrue(compare2.compare(t1w, 0, t1w.length, t2w, 0, t2w.length) < 0);
+		assertTrue(compare2.compare(t2w, 0, t2w.length, t1w, 0, t1w.length) > 0);
+	}
+	
+	public void testCompareTermSplit() throws Exception
+	{
+		
+		SplitEmittedTerm t1 = new SplitEmittedTerm("t1", 0, 0);
+		SplitEmittedTerm t2 = new SplitEmittedTerm("t1", 1, 0);
+		compareTermSplit(t1, t2);
+		
+		t1 = new SplitEmittedTerm("t1", Integer.MAX_VALUE-1, 0);
+		t2 = new SplitEmittedTerm("t1", Integer.MAX_VALUE, 0);
+		compareTermSplit(t1, t2);
+		
+	}
+	
+	private void compareTermFlush(SplitEmittedTerm t1, SplitEmittedTerm t2) throws Exception
+	{
+		SETRawComparatorTerm compare = new SETRawComparatorTerm();
+		//check for inequality of each pair			
+		assertFalse(t1.equals(t2));
+		assertFalse(t2.equals(t1));
+					
+		assertTrue(t1.compareTo(t2) < 0);
+		assertTrue(t2.compareTo(t1) > 0);
+		assertEquals(0, compare.compare(t1, t2));
+		assertEquals(0, compare.compare(t2, t1));
+		
+		SETRawComparatorTermSplitFlush compare2 = new SETRawComparatorTermSplitFlush();
+		assertTrue(compare2.compare(t1, t2) < 0);
+		assertTrue(compare2.compare(t2, t1) > 0);
+		byte[] t1w = toBytes(t1);
+		byte[] t2w = toBytes(t2);
+		assertEquals(0, compare.compare(t1w, 0, t1w.length, t2w, 0, t2w.length));
+		assertEquals(0, compare.compare(t2w, 0, t2w.length, t1w, 0, t1w.length));
+		assertTrue(compare2.compare(t1w, 0, t1w.length, t2w, 0, t2w.length) < 0);
+		assertTrue(compare2.compare(t2w, 0, t2w.length, t1w, 0, t1w.length) > 0);
+	}
+	
+	public void testCompareTermFlush() throws Exception
+	{		
+		SplitEmittedTerm t1 = new SplitEmittedTerm("t1", 0, 0);
+		SplitEmittedTerm t2 = new SplitEmittedTerm("t1", 0, 1);
+		compareTermFlush(t1, t2);
+		t1 = new SplitEmittedTerm("t1", 0, Integer.MAX_VALUE -1);
+		t2 = new SplitEmittedTerm("t1", 0, Integer.MAX_VALUE );
+		compareTermFlush(t1, t2);
+	}
+	
+	/* Test cases for SETPartitioner */
+	
+	/** single map, single reducer */
+	public void testSMSRCalculatePartition() throws Exception
+	{
+		final JobConf j = new JobConf();
+		j.setNumMapTasks(1);
+		final SETPartitioner p = new SETPartitioner();
+		p.configure(j);
+		assertEquals(0, p.calculatePartition(0, 1));
+	}
+	
+	/** multiple map, single reducer */
+	public void testMMSRCalculatePartition() throws Exception
+	{
+		final JobConf j = new JobConf();
+		final int maptasks = 20;
+		j.setNumMapTasks(maptasks);
+		final SETPartitioner p = new SETPartitioner();
+		p.configure(j);
+		assertEquals(0, p.calculatePartition(0, 1));
+		assertEquals(0, p.calculatePartition(19, 1));
+		assertEquals(0, p.calculatePartition(10, 1));
+		
+		
+	}
+	
+	/** multiple map, multiple reducer */
+	public void testMMMRCalculatePartition() throws Exception
+	{
+		final JobConf j = new JobConf();
+		final int maptasks = 20;
+		j.setNumMapTasks(maptasks);
+		final SETPartitioner p = new SETPartitioner();
+		p.configure(j);
+		
+		assertEquals(0, p.calculatePartition(0, 2));
+		assertEquals(0, p.calculatePartition(1, 2));
+		assertEquals(0, p.calculatePartition(9, 2));
+		assertEquals(1, p.calculatePartition(10, 2));
+		assertEquals(1, p.calculatePartition(19, 2));
+	}
+}

