Index: src/uk/ac/gla/terrier/utility/Skipable.java
===================================================================
--- src/uk/ac/gla/terrier/utility/Skipable.java	(revision 0)
+++ src/uk/ac/gla/terrier/utility/Skipable.java	(revision 0)
@@ -0,0 +1,7 @@
+package uk.ac.gla.terrier.utility;
+
+import java.io.IOException;
+
+public interface Skipable {
+	public void skip(int numEntries) throws IOException;
+}
Index: src/uk/ac/gla/terrier/structures/collections/FSArrayFile.java
===================================================================
--- src/uk/ac/gla/terrier/structures/collections/FSArrayFile.java	(revision 2549)
+++ src/uk/ac/gla/terrier/structures/collections/FSArrayFile.java	(working copy)
@@ -11,6 +11,7 @@
 
 import uk.ac.gla.terrier.structures.seralization.FixedSizeWriteableFactory;
 import uk.ac.gla.terrier.utility.Files;
+import uk.ac.gla.terrier.utility.Skipable;
 import uk.ac.gla.terrier.utility.io.RandomDataInput;
 
 /** A file for accessing Writable classes written on disk. These must be of fixed size.
@@ -90,7 +91,7 @@
 	}
 	
 	
-	public static class ArrayFileIterator<V extends Writable> implements Iterator<V>, Closeable
+	public static class ArrayFileIterator<V extends Writable> implements Iterator<V>, Closeable, Skipable
 	{
 		FixedSizeWriteableFactory<V> valueFactory;
 		DataInputStream dis;
@@ -103,6 +104,15 @@
 			numberOfEntries = (int) (Files.length(filename) / ((long) valueFactory.getSize()));
 		}
 		
+		public void skip(int numEntries) throws IOException
+		{
+			final long targetSkip = numEntries * ((long) valueFactory.getSize());
+			long skipped = 0;
+			do{
+				skipped += dis.skip(targetSkip);
+			} while(skipped < targetSkip);
+		}
+		
 		public boolean hasNext() {
 			return count < numberOfEntries;
 		}
Index: src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/BitPostingIndexInputFormat.java
===================================================================
--- src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/BitPostingIndexInputFormat.java	(revision 0)
+++ src/uk/ac/gla/terrier/structures/indexing/singlepass/hadoop/BitPostingIndexInputFormat.java	(revision 0)
@@ -0,0 +1,176 @@
+package uk.ac.gla.terrier.structures.indexing.singlepass.hadoop;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+
+import uk.ac.gla.terrier.structures.BitIndexPointer;
+import uk.ac.gla.terrier.structures.BitPostingIndexInputStream;
+import uk.ac.gla.terrier.structures.Index;
+import uk.ac.gla.terrier.structures.postings.IterablePosting;
+import uk.ac.gla.terrier.utility.Wrapper;
+
+/** An InputFormat for a BitPostingIndex. Splits the main posting file into InputSplits, according to the block
+ * size of the underlying file. The following JobConf properties are used:
+ * <ul>
+ * <li><tt>mapred.index.path</tt> and <tt>mapred.index.prefix</tt> - where to find the index.</li>
+ * <li><tt>mapred.bitpostingindex.structure</tt> - which structure are we splitting?</li>
+ * <li><tt>mapred.bitpostingindex.lookup.structure</tt> - which structure's inputstream is the Iterator of BitIndexPointers?</li>
+ * </ul>
+ */
+public class BitPostingIndexInputFormat extends FileInputFormat<IntWritable, Wrapper<IterablePosting>> {
+
+	public RecordReader<IntWritable, Wrapper<IterablePosting>> getRecordReader(
+				final InputSplit _split, final JobConf job, final Reporter reporter) 
+			throws IOException {
+		
+		final BitPostingIndexInputSplit split = (BitPostingIndexInputSplit)_split;
+		final Index index = Index.createIndex(job.get("mapred.index.path"), job.get("mapred.index.prefix"));
+		final String bitPostingStructureName = job.get("mapred.bitpostingindex.structure");
+		
+		final BitPostingIndexInputStream postingStream = (BitPostingIndexInputStream)index.getIndexStructure(bitPostingStructureName);
+		postingStream.skip(split.getStartingEntryIndex());
+		return new BitPostingIndexRecordReader(postingStream, split.getStartingEntryIndex(), split.getEntryCount());
+	}
+	
+	static class BitPostingIndexRecordReader implements RecordReader<IntWritable, Wrapper<IterablePosting>>
+	{
+		int entryIndex = 0;
+		int entryCount = 0;
+		BitPostingIndexInputStream postingStream;
+		
+		BitPostingIndexRecordReader(BitPostingIndexInputStream _postingStream, int _entryIndex, int _entryCount)
+		{
+			this.postingStream = _postingStream;
+			this.entryIndex = _entryIndex;
+			this.entryCount = _entryCount;
+		}		
+		
+		public void close() throws IOException { }
+
+		public IntWritable createKey() {
+			return new IntWritable();
+		}
+
+		public Wrapper<IterablePosting> createValue() {
+			return new Wrapper<IterablePosting>();
+		}
+
+		public long getPos() throws IOException {
+			return postingStream.getPos().getOffset();
+		}
+
+		public float getProgress() throws IOException {
+			return (float)entryIndex/(float)entryCount;
+		}
+
+		public boolean next(IntWritable docid, Wrapper<IterablePosting> wrapperPostingList)
+				throws IOException {
+			if (! postingStream.hasNext())
+				return false;
+			docid.set(entryIndex++);
+			wrapperPostingList.setObject(postingStream.next());			
+			return true;
+		}
+		
+	}
+
+	@SuppressWarnings("unchecked")
+	/** Make the splits of the index structure */
+	public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+		Index index = Index.createIndex(job.get("mapred.index.path"), job.get("mapred.index.prefix"));
+		String lookupStructureName = job.get("mapred.bitpostingindex.lookup.structure");
+		String bitPostingStructureName = job.get("mapred.bitpostingindex.structure");
+		
+		final Path bitPostingStructureFile = new Path(BitPostingIndexInputStream.getFilename(index, bitPostingStructureName));
+		FileSystem fs = FileSystem.get(job);
+		FileStatus fss = fs.getFileStatus(bitPostingStructureFile);
+		final long bitPostingStructureFSBlockSize = fss.getBlockSize();
+		
+		Iterator<BitIndexPointer> offsetIterator = (Iterator<BitIndexPointer>)index.getIndexStructureInputStream(lookupStructureName);
+		List<InputSplit> splitList = new ArrayList<InputSplit>();
+		int entryIndex = -1;
+		long blockSize = 0; int entriesInBlock = 0;
+		long lastByteOffset = 0;
+		long endOfLastSplit = 0;
+		
+		//iterate through the lookup iterator
+		//split the target bit posting index structure into chunks of size bitPostingStructureFSBlockSize
+		while(offsetIterator.hasNext())
+		{
+			entryIndex++; entriesInBlock++;
+			long offset = offsetIterator.next().getOffset();
+			if (offset - lastByteOffset == 0)
+				continue;
+			long entrySize = offset - lastByteOffset;
+			blockSize += entrySize;
+			//OK, this block is large enough
+			if (blockSize > bitPostingStructureFSBlockSize)
+			{
+				BlockLocation[] blkLocations = fs.getFileBlockLocations(fss, offset - blockSize, blockSize);
+				endOfLastSplit = offset;
+				splitList.add(
+					new BitPostingIndexInputSplit(
+						bitPostingStructureFile, 
+						offset - blockSize, blockSize, 
+						blkLocations[0].getHosts(), entryIndex, entriesInBlock));
+				blockSize = 0; entriesInBlock = 0;
+			}
+			lastByteOffset = offset;			
+		}
+		if (blockSize > 0)
+		{
+			BlockLocation[] blkLocations = fs.getFileBlockLocations(fss, lastByteOffset - endOfLastSplit, blockSize);
+			splitList.add(
+				new BitPostingIndexInputSplit(
+					bitPostingStructureFile, 
+					lastByteOffset - endOfLastSplit, blockSize, 
+					blkLocations[0].getHosts(), entryIndex, entriesInBlock));
+		}
+	
+		return splitList.toArray(new InputSplit[splitList.size()]);
+	}
+	
+	static class BitPostingIndexInputSplit extends FileSplit
+	{
+		int startingEntryIndex;
+		int entryCount;
+		public BitPostingIndexInputSplit(
+				Path file, 
+				long start, long length,
+				String[] hosts, int _startingEntryIndex, int _entryCount) {
+			super(file, start, length, hosts);			
+			startingEntryIndex = _startingEntryIndex;
+			entryCount = _entryCount;
+		}
+		
+		public int getStartingEntryIndex()
+		{
+			return startingEntryIndex;
+		}
+	
+		public int getEntryCount()
+		{
+			return entryCount;
+		}
+	}
+
+	/** Unimplemented */
+	public void validateInput(JobConf job) throws IOException {
+		//TODO
+	}
+
+}
Index: src/uk/ac/gla/terrier/structures/BitPostingIndexInputStream.java
===================================================================
--- src/uk/ac/gla/terrier/structures/BitPostingIndexInputStream.java	(revision 2549)
+++ src/uk/ac/gla/terrier/structures/BitPostingIndexInputStream.java	(working copy)
@@ -7,6 +7,7 @@
 import uk.ac.gla.terrier.compression.BitInputStream;
 import uk.ac.gla.terrier.structures.postings.IterablePosting;
 import uk.ac.gla.terrier.utility.FieldScore;
+import uk.ac.gla.terrier.utility.Skipable;
 import uk.ac.gla.terrier.utility.io.WrappedIOException;
 
 public class BitPostingIndexInputStream implements PostingIndexInputStream {
@@ -18,16 +19,32 @@
 
 	protected Class<? extends IterablePosting> postingIteratorClass;
 	
+	public static String getFilename(Index _index, String structureName)
+	{
+		return _index.getPath() + "/" + _index.getPrefix() +"."+ structureName + BitIn.USUAL_EXTENSION;
+	}
+	
 	public BitPostingIndexInputStream(
 			Index _index, String structureName, 
 			Iterator<? extends BitIndexPointer> _pointerList,
 			Class<? extends IterablePosting> _postingIteratorClass) throws IOException
 	{
-		file = new BitInputStream(_index.getPath() + "/" + _index.getPrefix() +"."+ structureName + BitIn.USUAL_EXTENSION);
+		file = new BitInputStream(getFilename(_index, structureName));
 		pointerList = _pointerList;
 		postingIteratorClass = _postingIteratorClass;
 	}
 	
+	public BitFilePosition getPos()
+	{
+		return new FilePosition(file.getByteOffset(), file.getBitOffset());
+	}
+	
+	public void skip(int numEntries) throws IOException
+	{
+		((Skipable)pointerList).skip(numEntries);
+		//TODO how to skip?
+	}
+	
 	/** {@inheritDoc} */
 	public IterablePosting getNextPostings() throws IOException {
 		if (! this.hasNext())
@@ -40,7 +57,7 @@
 		final int fieldCount = FieldScore.FIELDS_COUNT;
 		if (fieldCount > 0)
 		{
-			throw new IOException("Fields not support this way");
+			throw new IOException("Fields not supported this way");
 		}
 		IterablePosting rtr = null;
 		try{
Index: src/uk/ac/gla/terrier/compression/BitIn.java
===================================================================
--- src/uk/ac/gla/terrier/compression/BitIn.java	(revision 2549)
+++ src/uk/ac/gla/terrier/compression/BitIn.java	(working copy)
@@ -82,6 +82,14 @@
 	 * @throws IOException if an I/O error occurs
 	 */
     public void skipBits(int len) throws IOException;
+    
+    /** Skip a number of bytes while reading the bit file.
+     * After this opteration, getBitOffset() == 0, so use
+     * skipBits to get getBitOffset() to desired value.
+	 * @param len The number of bytes to skip
+	 * @throws IOException if an I/O error occurs
+	 */
+    public void skipBytes(long len) throws IOException;
 
     /**
      * Aligns the stream to the next byte
Index: src/uk/ac/gla/terrier/compression/BitFileInMemory.java
===================================================================
--- src/uk/ac/gla/terrier/compression/BitFileInMemory.java	(revision 2549)
+++ src/uk/ac/gla/terrier/compression/BitFileInMemory.java	(working copy)
@@ -26,6 +26,7 @@
 
 package uk.ac.gla.terrier.compression;
 import java.io.DataInputStream;
+import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 
@@ -236,8 +237,18 @@
 			readByteOffset++;
 			bitOffset = len & 7;
 		}
+		
+		/** {@inheritDoc} */
+		public void skipBytes(long len) throws IOException {
+			if (readByteOffset > inBuffer.length)
+				throw new EOFException();
+			readByteOffset+= len;
+			bitOffset = 0;
+		}
 
 		/** nothing to do */
 		public void close() {}
+
+		
 	}
 }
Index: src/uk/ac/gla/terrier/compression/BitFileBuffered.java
===================================================================
--- src/uk/ac/gla/terrier/compression/BitFileBuffered.java	(revision 2549)
+++ src/uk/ac/gla/terrier/compression/BitFileBuffered.java	(working copy)
@@ -187,7 +187,7 @@
 	
 	
 	
-		private void incrByte(int i)
+		private final void incrByte(int i)
 		{
 			try{
 				//System.out.println("skypping");
@@ -302,14 +302,32 @@
 				return;
 			}
 			len +=  bitOffset - 8;
-   	   final int i = len >> 3;
-   	   if (i > 0)
-   	   {
-   	      incrByte(i);
-   	   }
-			incrByte();
+	   	   final int i = len >> 3;
+	   	   if (i > 0)
+	   	   {
+	   	      incrByte(i);
+	   	   }
+	   	   	incrByte();
 			bitOffset = len & 7;
 		}
+    	
+    	/** {@inheritDoc} */
+		public void skipBytes(long len) throws IOException
+		{
+			if (readByteOffset + len > inBuffer.length)
+			{
+				offset += len;
+				parentFile.seek(offset); // we skip the first bytes of the next block
+				inBuffer = new byte[size];
+				readByteOffset = 0;
+				//logger.info("Reading 1024 bytes. pos="+parentFile.getFilePointer());
+				try{
+					parentFile.readFully(inBuffer);
+				} catch (EOFException eofe) { /* ignore this */}
+			}
+			readByteOffset = 0;
+			bitOffset = 0;
+		}
 		
 	public long getByteOffset(){ return offset;}
 	/**
Index: src/uk/ac/gla/terrier/compression/BitInputStream.java
===================================================================
--- src/uk/ac/gla/terrier/compression/BitInputStream.java	(revision 2542)
+++ src/uk/ac/gla/terrier/compression/BitInputStream.java	(working copy)
@@ -29,6 +29,7 @@
 
 
 import java.io.*;
+
 import uk.ac.gla.terrier.utility.Files;
 /**
  * This class reads from a file or an InputStream integers that can be coded with different encoding algorithms.
@@ -204,6 +205,16 @@
 		bitOffset = len & 7;
 	}
 	
+	/** {@inheritDoc} */
+	public void skipBytes(long len) throws IOException {
+		long skipped = 0;
+		do{
+			skipped += dis.skip(len);
+		} while(skipped < len);
+		bitOffset = 0;
+		byteRead = dis.readByte();
+	}
+	
 	/**
 	 * Reads a new byte from the InputStream if we have finished with the current one. 
 	 * @throws IOException if we have reached the end of the file
@@ -318,4 +329,6 @@
 		byteOffset++;		
 		byteRead = dis.readByte();		
 	}
+
+	
 }
Index: src/uk/ac/gla/terrier/compression/BitFile.java
===================================================================
--- src/uk/ac/gla/terrier/compression/BitFile.java	(revision 2549)
+++ src/uk/ac/gla/terrier/compression/BitFile.java	(working copy)
@@ -26,6 +26,7 @@
  */
 package uk.ac.gla.terrier.compression;
 
+import java.io.EOFException;
 import java.io.File;
 import java.io.IOException;
 
@@ -421,6 +422,14 @@
 		readByteOffset++;
 		bitOffset = len & 7;
 	}
+    
+    /** {@inheritDoc} */
+	public void skipBytes(long len) throws IOException {
+		if (readByteOffset + len > inBuffer.length)
+			throw new EOFException();
+		readByteOffset+= len;
+		bitOffset = 0;
+	}
 	
 	/**
 	 * Closes the file. If the file has been written, it is also flushed to disk. 

