diff -urN terrier-3.0-original/src/core/org/terrier/structures/indexing/singlepass/hadoop/BitPostingIndexInputFormat.java terrier-3.0/src/core/org/terrier/structures/indexing/singlepass/hadoop/BitPostingIndexInputFormat.java
--- terrier-3.0-original/src/core/org/terrier/structures/indexing/singlepass/hadoop/BitPostingIndexInputFormat.java	2010-03-10 16:06:31.000000000 +0000
+++ terrier-3.0/src/core/org/terrier/structures/indexing/singlepass/hadoop/BitPostingIndexInputFormat.java	2010-04-22 18:20:48.000000000 +0100
@@ -412,6 +412,8 @@
 				@SuppressWarnings("unchecked")
 				public void incrCounter(Enum arg0, long arg1) {}
 				public void incrCounter(String arg0, String arg1, long arg2) {}
+				public org.apache.hadoop.mapred.Counters.Counter getCounter(Enum arg0) {return null;}
+				public org.apache.hadoop.mapred.Counters.Counter getCounter(String arg0, String arg1) {return null;}
 				public void setStatus(String arg0) {}
 				public void progress() {}}
 			);
diff -urN terrier-3.0-original/src/core/org/terrier/structures/indexing/singlepass/hadoop/FileCollectionRecordReader.java terrier-3.0/src/core/org/terrier/structures/indexing/singlepass/hadoop/FileCollectionRecordReader.java
--- terrier-3.0-original/src/core/org/terrier/structures/indexing/singlepass/hadoop/FileCollectionRecordReader.java	2010-03-10 16:06:51.000000000 +0000
+++ terrier-3.0/src/core/org/terrier/structures/indexing/singlepass/hadoop/FileCollectionRecordReader.java	2010-04-22 18:20:48.000000000 +0100
@@ -39,6 +39,7 @@
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.lib.CombineFileSplit;
 import org.apache.log4j.Logger;
 import org.terrier.indexing.Collection;
 import org.terrier.indexing.CollectionFactory;
@@ -55,7 +56,7 @@
  * @version $Revision: 1.2 $
  */
 public class FileCollectionRecordReader 
-		extends CollectionRecordReader<PositionAwareSplit<MultiFileSplit>> 
+		extends CollectionRecordReader<PositionAwareSplit<CombineFileSplit>> 
 		implements RecordReader<Text, SplitAwareWrapper<Document>>
 {
 
@@ -80,7 +81,7 @@
 	 * @param split - Input Split (multiple Files)
 	 * @throws IOException
 	 */
-	public FileCollectionRecordReader(JobConf jobConf, PositionAwareSplit<MultiFileSplit> split) throws IOException 
+	public FileCollectionRecordReader(JobConf jobConf, PositionAwareSplit<CombineFileSplit> split) throws IOException 
 	{	
 		super(jobConf, split);
 		compressionCodecs = new CompressionCodecFactory(config);
@@ -100,7 +101,7 @@
 	 */
 	public float getProgress() throws IOException {
 		float fileProgress = 0;
-		final float numPaths = (float)(((MultiFileSplit)split.getSplit()).getNumPaths());
+		final float numPaths = (float)(((CombineFileSplit)split.getSplit()).getNumPaths());
 		if (inputStream != null && length != start)
 			fileProgress = (float)inputStream.getPos()/(float)(length - start);
 		return (fileProgress + (float)collectionIndex)/numPaths;
@@ -112,12 +113,12 @@
 	@Override
 	protected Collection openCollectionSplit(int index) throws IOException
 	{
-		if (index >= ((MultiFileSplit)split.getSplit()).getNumPaths())
+		if (index >= ((CombineFileSplit)split.getSplit()).getNumPaths())
 		{
 			//no more splits left to process
 			return null;
 		}
-		Path file = ((MultiFileSplit)split.getSplit()).getPath(index);
+		Path file = ((CombineFileSplit)split.getSplit()).getPath(index);
 		logger.info("Opening "+file);
 		long offset = 0;//TODO populate from split?
 		FileSystem fs = file.getFileSystem(config);
diff -urN terrier-3.0-original/src/core/org/terrier/structures/indexing/singlepass/hadoop/MultiFileCollectionInputFormat.java terrier-3.0/src/core/org/terrier/structures/indexing/singlepass/hadoop/MultiFileCollectionInputFormat.java
--- terrier-3.0-original/src/core/org/terrier/structures/indexing/singlepass/hadoop/MultiFileCollectionInputFormat.java	2010-03-10 16:06:36.000000000 +0000
+++ terrier-3.0/src/core/org/terrier/structures/indexing/singlepass/hadoop/MultiFileCollectionInputFormat.java	2010-04-22 20:41:57.000000000 +0100
@@ -26,12 +26,19 @@
  */
 package org.terrier.structures.indexing.singlepass.hadoop;
 
+import gnu.trove.TObjectLongProcedure;
+import gnu.trove.TObjectLongHashMap;
 
 import java.io.IOException;
+import java.lang.reflect.Array;
+import java.util.Arrays;
+import java.util.Comparator;
 import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.FileInputFormat;
@@ -40,6 +47,7 @@
 import org.apache.hadoop.mapred.MultiFileInputFormat;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.lib.CombineFileSplit;
 import org.apache.log4j.Logger;
 import org.terrier.indexing.Document;
 
@@ -63,8 +71,8 @@
 	@Override
 	/**
 	 * Instantiates a FileCollectionRecordReader using the specified spit (which is
-	 * assumed to be a MultiFileSplit.
-	 * @param genericSplit contains files to be processed, assumed to be a MultiFileSplit
+	 * assumed to be a CombineFileSplit.
+	 * @param genericSplit contains files to be processed, assumed to be a CombineFileSplit
 	 * @param job JobConf of this job
 	 * @param reported To report progress
 	 */
@@ -75,7 +83,7 @@
 		throws IOException 
 	{
 		reporter.setStatus(genericSplit.toString());
-	    return new FileCollectionRecordReader(job, (PositionAwareSplit<MultiFileSplit>) genericSplit);
+	    return new FileCollectionRecordReader(job, (PositionAwareSplit<CombineFileSplit>) genericSplit);
 	}
 	
 	@Override
@@ -106,15 +114,31 @@
 			numSplits = 1;
 		}
 		logger.info("Allocating "+paths.length+ " files across "+numSplits +" map tasks");
-		List<PositionAwareSplit<MultiFileSplit>> splits = new ArrayList<PositionAwareSplit<MultiFileSplit>>(numSplits);
+		List<PositionAwareSplit<CombineFileSplit>> splits = new ArrayList<PositionAwareSplit<CombineFileSplit>>(numSplits);
 		final int numPaths = paths.length;
 		long[] lengths = new long[numPaths];
+		TObjectLongHashMap<String>[] locations = (TObjectLongHashMap<String>[])Array.newInstance(TObjectLongHashMap.class, numPaths);
 		long totLength = 0;
-		final FileSystem fs = FileSystem.get(job);	
+		final FileSystem fs = FileSystem.get(job);
 		for(int i=0; i<paths.length; i++) 
 		{
-			lengths[i] = fs.getFileStatus(paths[i]).getLen();
+			final FileStatus fss = fs.getFileStatus(paths[i]);
+			lengths[i] = fss.getLen();
 			totLength += lengths[i];
+			final TObjectLongHashMap<String> location2size = locations[i] = new TObjectLongHashMap<String>();
+			//bytes of this file held per host; a non-positive block size would never advance the loop, so use the whole file
+			final long normalblocksize = fss.getBlockSize() > 0 ? fss.getBlockSize() : Math.max(1L, lengths[i]);
+			for(long offset = 0; offset < lengths[i]; offset += normalblocksize)
+			{
+				//length (not end offset) of the block starting at offset: the final block may be shorter
+				final long blocksize = Math.min(normalblocksize, lengths[i] - offset);
+				final BlockLocation[] blockLocations = fs.getFileBlockLocations(fss, offset, blocksize);
+				for(BlockLocation bl : blockLocations) {
+					for (String host : bl.getHosts()) {
+						location2size.adjustOrPutValue(host, blocksize, blocksize);
+					}
+				}
+			}
 		}
 		
 		//we need to over-estimate using ceil, to ensure that the last split is not /too/ big
@@ -122,7 +146,7 @@
 		
 		int pathsUsed = 0;
 		int splitnum = 0;
-		MultiFileSplit mfs;
+		CombineFileSplit mfs;
 		// for each split except the last one (which may be smaller than numberOfFilesPerSplit)
 		while(pathsUsed < numPaths)
 		{
@@ -134,6 +158,43 @@
 			//arrays of information for split
 			Path[] splitPaths = new Path[splitSizeForThisSplit];
 			long[] splitLengths = new long[splitSizeForThisSplit];
+			long[] splitStarts = new long[splitSizeForThisSplit];
+			final TObjectLongHashMap<String> allLocationsForSplit = new TObjectLongHashMap<String>();
+			String[] splitLocations = null; //final recommended locations for this split.
+			//total up, for each host, the bytes it holds locally across all files of this split
+			for(int i=0;i<splitSizeForThisSplit;i++)
+			{
+				locations[pathsUsed+i].forEachEntry(new  TObjectLongProcedure<String>() {
+					public boolean execute(String a, long b)
+					{
+						allLocationsForSplit.adjustOrPutValue(a, b, b); return true;
+					}
+				});
+			}
+			//choose the recommended hosts once, after every file is tallied: ranking them
+			//inside the loop above would repeat the sort per file and keep only the last result
+			final String[] candidateHosts = allLocationsForSplit.keys(new String[allLocationsForSplit.size()]);
+			if ( candidateHosts.length <= 3 )
+			{
+				//three or fewer candidate hosts: recommend them all
+				splitLocations = candidateHosts;
+			}
+			else
+			{
+				//order hosts by descending local byte count, then keep the best three
+				Arrays.sort(candidateHosts, new Comparator<String>() {
+					public int compare(String o1, String o2) {
+						//compare the totals directly rather than via the sign of a subtraction
+						final long size1 = allLocationsForSplit.get(o1);
+						final long size2 = allLocationsForSplit.get(o2);
+						if (size1 == size2) return 0;
+						return size1 > size2 ? -1 : 1;
+					}
+				});
+				splitLocations = new String[3];
+				System.arraycopy(candidateHosts, 0, splitLocations, 0, 3);
+			}
+			
 			
 			//copy information for this split
 			System.arraycopy(lengths, pathsUsed, splitLengths, 0, splitSizeForThisSplit);
@@ -143,8 +204,8 @@
 			
 			//make the actual split object
 			//logger.info("New split of size " + splitSizeForThisSplit);
-			mfs = new MultiFileSplit(job, splitPaths, splitLengths);
-			splits.add(new PositionAwareSplit<MultiFileSplit>(mfs, splitnum));
+			mfs = new CombineFileSplit(job, splitPaths, splitStarts, splitLengths, splitLocations);
+			splits.add(new PositionAwareSplit<CombineFileSplit>(mfs, splitnum));
 			splitnum++;
 		}
 
diff -urN terrier-3.0-original/src/core/org/terrier/structures/indexing/singlepass/hadoop/MultiFileSplit.java terrier-3.0/src/core/org/terrier/structures/indexing/singlepass/hadoop/MultiFileSplit.java
--- terrier-3.0-original/src/core/org/terrier/structures/indexing/singlepass/hadoop/MultiFileSplit.java	2010-03-10 16:06:30.000000000 +0000
+++ terrier-3.0/src/core/org/terrier/structures/indexing/singlepass/hadoop/MultiFileSplit.java	2010-04-22 18:20:49.000000000 +0100
@@ -34,10 +34,10 @@
  * @author Richard McCreadie and Craig Macdonald
  * @since 3.0
  */
-public class MultiFileSplit extends org.apache.hadoop.mapred.MultiFileSplit {
+public class MultiFileSplit extends org.apache.hadoop.mapred.lib.CombineFileSplit {
 
 	public MultiFileSplit() {
-		super(null, null, new long[0]);
+		super();
 	}
 	
 	public MultiFileSplit(JobConf arg0, Path[] arg1, long[] arg2) {
diff -urN terrier-3.0-original/src/core/org/terrier/utility/io/HadoopPlugin.java terrier-3.0/src/core/org/terrier/utility/io/HadoopPlugin.java
--- terrier-3.0-original/src/core/org/terrier/utility/io/HadoopPlugin.java	2010-03-10 16:06:31.000000000 +0000
+++ terrier-3.0/src/core/org/terrier/utility/io/HadoopPlugin.java	2010-04-22 19:01:47.000000000 +0100
@@ -280,8 +280,14 @@
 		Configuration globalConf = getGlobalConfiguration();
 		
 		try {
+			JobConf jc_sampleConf = new JobConf();
 			//see if the current hadoop configuration has a real job tracker configured
-			if (! globalConf.get("mapred.job.tracker").equals("local"))
+			String jt = globalConf.get("mapred.job.tracker"); //may be null: the global configuration need not define it
+			if (jt == null)
+			{
+				jt = jc_sampleConf.get("mapred.job.tracker"); //fall back to the JobConf built above instead of constructing a second one
+			}
+			if (jt != null && ! jt.equals("local")) //only a configured, non-local job tracker selects the direct factory
 			{
 				if (logger.isDebugEnabled()) logger.debug("Default configuration has job tracker set to " + globalConf.get("mapred.job.tracker"));	
 				return new DirectJobFactory(/*globalConf*/);

