Pig Memory Usage Improvement

Problem Statement

  1. Pig hogs memory. In the 0.2.0 version, the expansion factor of data on disk to data in memory is 2-3x. This causes Pig problems in terms of efficiently executing users' programs. It is caused largely by extensive use of Java objects (Integer, etc.) to store internal data.
  2. Java memory management and its garbage collector are poorly suited to the workload of intensive data processing. Pig needs better control over where data is stored and when memory is deallocated. For a complete discussion of this issue see M. A. Shah et al., Java Support for Data-Intensive Systems: Experiences Building the Telegraph Dataflow System. In particular, this paper points out that a memory allocation and garbage collection scheme that is beyond the control of the programmer is a bad fit for a large data processing system.

  3. Currently Pig waits until memory is low to begin spilling bags to disk. This has two issues:
    a. It is difficult to accurately determine when available memory is too low.
    b. The system tries to spill and continue processing simultaneously. Sometimes the continued processing outruns the spilling and the system still runs out of memory.

Proposed Solution

Switching from Java containers and objects to large memory buffers and a page cache will address these issues.

Architecture

A basic page caching mechanism will be built. Traditional OS page caches use fairly small pages (often a few kilobytes); this cache will use large pages (default size around 10MB). This size is chosen for a couple of reasons:

  1. It is convenient to constrain all of the scalar values in a tuple to fit into one buffer. In order to do this the buffer sizes need to be large.
  2. Data in Pig tends to be written all at once and read only one or two times. Additionally, data reads generally consist of scans through all of the output of an operator. So there should be relatively little need to bring pages in and out of memory repeatedly.

The page cache will consist of a MemoryManager singleton, TupleBuffers, and DataBuffers. Each TupleBuffer will contain a DataBuffer, along with the logic to manage spilling and reading the DataBuffer from disk. The MemoryManager will track the TupleBuffers that are available for use and the TupleBuffers that are full and eligible to be flushed to disk. It will also track the total number of DataBuffers in memory. The total number of DataBuffers that can be created will be bounded by a configuration value, which should default to something like 250MB (or perhaps 70% of the JVM's memory, if we can determine how much memory the whole JVM has been given).
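
As a rough sketch (not a commitment to the final implementation), the memory bound might be derived as shown below; the property name pig.memory.total.size is only a placeholder, while Runtime.maxMemory() is a real JVM call:

    // Sketch: derive maxDataBuffers from configuration or from the JVM's heap limit.
    static int computeMaxDataBuffers(int bufferSize) {
        long total;
        String configured = System.getProperty("pig.memory.total.size");   // placeholder name
        if (configured != null) {
            // Explicit cap from configuration, in bytes.
            total = Long.parseLong(configured);
        } else {
            long jvmMax = Runtime.getRuntime().maxMemory();
            if (jvmMax != Long.MAX_VALUE) {
                // Use roughly 70% of the memory the JVM has been given.
                total = (long) (jvmMax * 0.7);
            } else {
                // Fall back to the 250MB default when the JVM limit is unknown.
                total = 250L * 1024 * 1024;
            }
        }
        return (int) (total / bufferSize);
    }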

When a TupleBuffer needs to create a new DataBuffer (either to store new data or to read in flushed data from disk), the TupleBuffer will request one from the MemoryManager. If the maximum number of DataBuffers already exists, the MemoryManager will select, via an LRU algorithm, a TupleBuffer that is full and in memory and request that it flush its data to disk. Once it has done that, it will take the now available DataBuffer and return it to the originally requesting TupleBuffer.

In addition to this page caching system, new implementations of the Tuple and Bag interfaces will be written. ManagedTuple will store scalar objects and maps in TupleBuffers as serialized data. It will obtain TupleBuffers from the MemoryManager, thus allowing many tuples to share one TupleBuffer. This should significantly reduce the memory footprint. One tuple will be stored entirely within a given TupleBuffer. As data is written to a TupleBuffer, some amount of space will be saved in the buffer to allow tuples in that buffer to expand. (Question, do we really need this? How often do tuples grow? The case I can think of for this is when a value in a tuple is cast from one type to another.) If a tuple cannot fit entirely in a given TupleBuffer it will request a new TupleBuffer to store its data in. The ManagedTuple will store an array of offsets into the TupleBuffer's DataBuffer corresponding to each scalar and map field. References to tuples and bags will be stored in a separate array of Objects. There is no requirement that the tuples and bags inside a tuple use the same TupleBuffer as the outer tuple.
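
One way the fieldOffsets encoding might work is sketched below; the helper names are illustrative, and the +1 shift is only needed so that index 0 in complexFields stays distinguishable from a real offset of 0:

    // Sketch: scalar fields store a non-negative byte offset into the DataBuffer;
    // complex fields store -(index + 1), an index into the complexFields array.
    private static int encodeComplexIndex(int index) {
        return -(index + 1);
    }

    private static boolean isComplexField(int encoded) {
        return encoded < 0;
    }

    private static int decodeComplexIndex(int encoded) {
        return -encoded - 1;
    }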

ManagedTuple will handle translating between serialized data in TupleBuffers and the Java objects used in the Tuple interface. Hopefully, with autoboxing for numeric types, this will be relatively fast. Strings and maps will be somewhat slower. Changes will also be necessary to DataByteArray to allow it to reference underlying data with an offset and a length, so that it need not copy bytes in and out of the TupleBuffer. A new implementation of Map will also be necessary. This ManagedMap will lay out its keys and values in a TupleBuffer and support reading values from that serialized data. In the future we will investigate changing the Tuple interface to include non-object get methods for numeric types, so that the tuple can return the data without going through an object conversion. This interface could be used by arithmetic operators, which operate on numbers anyway.
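
To illustrate the DataByteArray change, a minimal sketch of an offset/length view over a shared buffer follows; the class and method names are illustrative, not the existing DataByteArray API:

    // Sketch: a byte-array view that references a region of a shared buffer
    // instead of copying the bytes out of the TupleBuffer.
    class ByteArrayView {
        private final byte[] backing;   // the TupleBuffer's DataBuffer contents
        private final int offset;
        private final int length;

        ByteArrayView(byte[] backing, int offset, int length) {
            this.backing = backing;
            this.offset = offset;
            this.length = length;
        }

        byte get(int i) {
            return backing[offset + i];
        }

        // Copy only when a caller really needs a standalone byte[].
        byte[] copy() {
            byte[] b = new byte[length];
            System.arraycopy(backing, offset, b, 0, length);
            return b;
        }
    }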

In order to facilitate this translation to objects, ManagedTuple will store one byte with each field that records the type of the field, and whether or not the field is null. In the future we can investigate an optimized version of ManagedTuple that takes a schema and thus avoids the need to store type info for every tuple. In this case null information could be stored efficiently in a bit vector.
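
For concreteness, here is a sketch of how one scalar field might be written and read back under the one-byte type tag layout; the type codes and helper names are illustrative only:

    // Illustrative field layout: [1 byte type][4 byte length, byte/char array only][data].
    static final byte T_NULL = 0, T_INT = 1;

    static int writeIntField(byte[] buf, int off, Integer val) {
        if (val == null) {
            buf[off] = T_NULL;
            return off + 1;
        }
        buf[off] = T_INT;
        int v = val;
        buf[off + 1] = (byte) (v >>> 24);
        buf[off + 2] = (byte) (v >>> 16);
        buf[off + 3] = (byte) (v >>> 8);
        buf[off + 4] = (byte) v;
        return off + 5;
    }

    static Object readField(byte[] buf, int off) {
        switch (buf[off]) {
            case T_NULL: return null;
            case T_INT:  return ((buf[off + 1] & 0xff) << 24) | ((buf[off + 2] & 0xff) << 16)
                              | ((buf[off + 3] & 0xff) << 8)  | (buf[off + 4] & 0xff);
            default:     throw new IllegalArgumentException("unknown type " + buf[off]);
        }
    }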

Detailed Design

    /**
     * This is just a wrapper so I can store a byte[] in a container.  Entire class is
     * package level as no one outside of the data package should
     * be interacting with it.
     */
    class DataBuffer {

        static int bufferSize;

        static {
            String bufsize = System.getProperty("pig.memory.databuffer.size", "10240");
            bufferSize = Integer.parseInt(bufsize) * 1024;
        }

        // Allocated as one large page, default 10MB (pig.memory.databuffer.size is in KB).
        byte[] data = new byte[bufferSize];
    }

    /**
     * A buffer to manage large byte arrays, count tuples referencing them,
     * and manage spilling them to disk.  Entire class is
     * package level as no one outside of the data package should
     * be interacting with it.
     */
    class TupleBuffer {

        // number of tuples with references in the buffer
        private int refCnt;
        private int nextOffset;
        private boolean isDirty;
        private boolean isOnDisk;
        // Package level so others can see it without the overhead of a read call.
        DataBuffer data;
        File diskCache;

        TupleBuffer() {
            data = MemoryManager.getMemoryManager().getDataBuffer();
            recycle();
        }

        void recycle() {
            diskCache = null;
            nextOffset = 0;
            refCnt = 0;
            isDirty = false;
            isOnDisk = false;
        }
            
        /**
         * Called by a tuple when it is going to store its data in this TupleBuffer.
         */
        void attach() {
            refCnt++;
        }

        /**
         * Called by a tuple when it is deallocated.
         */
        void detach() {
            if (--refCnt < 1) {
                MemoryManager.getMemoryManager().recycle(this);
            }
        }

        /**
         * Write data to the buffer.
         * @param data bytes to write to the buffer
         * @return offset where data written starts
         */
        int write(byte[] data) {
            bringIntoMemory();
            if there is insufficient space, return -1;
            isDirty = true;
            copy data into this.data at nextOffset;
            advance nextOffset by data.length;
            return the value of nextOffset before the write;
        }

        /**
         * Bring the buffer into memory.  This must be called
         * before the tuple begins to read the data.
         */
        void bringIntoMemory() {
            if (isOnDisk) {
                data = MemoryManager.getMemoryManager().getDataBuffer();
                read diskCache into data;
                isDirty = false;
                // Push the buffer back onto the full queue, so it can be
                // flushed again if necessary.
                MemoryManager.getMemoryManager().markFull(this);
                isOnDisk = false;
            }
        }

        /**
         * Determine if there is space in the buffer for a new tuple.
         * @param size estimated size (in bytes) of the new tuple.
         * @return true if there's room
         */
        boolean isSpaceForNew(int size) {
            assert in memory, otherwise no one should be trying to add;
            // The configurable % should probably default to around 80% so there's room for growth.
            // We need to play with this and see what's optimal.
            boolean isspace = ((double) (nextOffset + size) / bufferSize) < some configurable %;
            if (!isspace) MemoryManager.getMemoryManager().markFull(this);
            return isspace;
        }

        /**
         * Determine if there is space in the buffer to expand
         * an existing tuple.
         * @param size bytes to write to the buffer.  This size
         * should be accurate, or at least a guaranteed upper bound.
         * @return true if there's room.
         */
        boolean isSpaceForGrowth(int size) {
            bringIntoMemory();
            return nextOffset + size < bufferSize;
        }

        /**
         * Flush the data to disk and give up this buffer's DataBuffer.
         * @return the DataBuffer this TupleBuffer was using.
         */
        DataBuffer flush() {
            if (isDirty) {
                open diskCache, creating a temporary file if one does not yet exist;
                write data to diskCache;
                diskCache.deleteOnExit();
                isDirty = false;
            }
            isOnDisk = true;
            return data;
        }
    }

    /**
     * A class to manage all of the memory and data buffers.  Entire class is
     * package level as no one outside of the data package should
     * be interacting with it.
     *
     * This class tracks TupleBuffers and DataBuffers.  We can create as many
     * TupleBuffers as necessary.  But the number of DataBuffers is bounded
     * by the available memory provided by the system.
     */
    class MemoryManager {

        private static MemoryManager self = null;

        private Set<TupleBuffer> availableTupleBuffers;
        private List<TupleBuffer> fullTupleBuffers;
        private List<DataBuffer> availableDataBuffers;
        private int numCreatedDataBuffers;
        private int maxDataBuffers;

        static MemoryManager getMemoryManager() {
            if (self == null) {
                self = new MemoryManager();
            }
            return MemoryManager.self;
        }

        MemoryManager() {
            Determine available memory based on configuration.
            set maxDataBuffers
        }

        /**
         * Mark a TupleBuffer as no longer available to store new tuples.
         * It may still have room to grow tuples currently stored in it.
         * The TupleBuffer will also be put on the full list so that it
         * is eligible for flushing to disk if necessary.
         * @param tb TupleBuffer to take off the available list.
         */
        void markFull(TupleBuffer tb) {
            availableTupleBuffers.remove(tb);
            fullTupleBuffers.add(tb);
        }

        /**
         * Mark a TupleBuffer as available to take new tuples.  All 
         * existing data in this tuple buffer will be dropped.
         */
        void recycle(TupleBuffer tb) {
            tb.recycle();
            availableTupleBuffers.add(tb);
        }

        /**
         * Get a TupleBuffer.  If there's one to use on the available list
         * then use it, otherwise create a new one.
         */
        TupleBuffer getTupleBuffer() {
            if (!availableTupleBuffers.isEmpty()) {
                return availableTupleBuffers.iterator().next();
            } else {
                TupleBuffer tb = new TupleBuffer();
                availableTupleBuffers.add(tb);
                return tb;
            }
        }

        /**
         * Get a DataBuffer.  If there's one to use on the available list
         * then return it.  Otherwise, if we have not yet reached the maximum
         * number of data buffers, create a new one.  As a last resort, tell
         * an existing full TupleBuffer to flush and then use its DataBuffer.
         */
        DataBuffer getDataBuffer() {
            if (!availableDataBuffers.isEmpty()) {
                return availableDataBuffers.remove(availableDataBuffers.size() - 1);
            } else if (numCreatedDataBuffers < maxDataBuffers) {
                numCreatedDataBuffers++;
                return new DataBuffer();
            } else {
                // fullTupleBuffers is kept roughly in the order buffers were filled,
                // so the buffer at the front of the list is the LRU victim.
                TupleBuffer victim = fullTupleBuffers.remove(0);
                return victim.flush();
            }
        }

    }

    /**
     * An implementation of Tuple that works with managed memory.  It stores
     * complex types (tuples, bags, maps) as objects in an internal array.
     * Scalar types are laid out in a TupleBuffer as raw data.  The format
     * of the layout is 1 byte for type, 4 bytes for length (byte array and
     * char array types only), then the data.
     */
    class ManagedTuple implements Tuple {

        // Reference to the TupleBuffer holding data for this tuple.
        TupleBuffer buf;

        // Offsets into the tuple buffer for each field.  A negative value
        // indicates that the field is a complex type, and the absolute value
        // yields the offset into the complexFields array.  This is stored
        // as an array to avoid the overhead of an ArrayList.
        private int[] fieldOffsets;

        // Array of complex fields (tuples, bags, maps) in this tuple.  Stored
        // as an array to avoid the overhead of an array list.
        private Object[] complexFields;

        // Do not add any more member variables.  We want to keep the memory
        // footprint of ManagedTuple to an absolute minimum.

        ManagedTuple() {
            buf = MemoryManager.getMemoryManager().getTupleBuffer();
            buf.attach();
        }

        protected void finalize() {
            buf.detach();
        }

        public void append(Object val) {
            if (val is complex) {
                grow fieldOffsets by 1;
                grow complexFields by 1;
                append val to complexFields;
                append negative offset to fieldOffsets;
            } else {
                buf.bringIntoMemory();
                if (buf.isSpaceForNew(val.sizeof())) {
                    grow fieldOffsets by 1;
                    fieldOffsets[last] = buf.write(val.toBytes());
                } else {
                    move tuple to a new TupleBuffer, adding the new last field;
                }
            }
        }

        public Object get(int fieldNum) {
            if (fieldOffsets[fieldNum] < 0) {
                return complexFields[fieldOffsets[fieldNum] * -1];
            } else {
                buf.bringIntoMemory();
                determine type from buf.data.data[fieldOffsets[fieldNum]];
                instantiate correct type of object;
                return it;
            }
        }

        public void set(int fieldNum, Object val) {
            if (val is map) {
                m = new ManagedMap();
                m.putAll((Map)val);
                // may actually need to grow complex fields if this field
                // didn't use to be complex or if it was null.
                complexFields[fieldOffsets[fieldNum] * -1] = m;
            } else if (val is tuple or bag) {
                // may actually need to grow complex fields if this field
                // didn't use to be complex or if it was null.
                complexFields[fieldOffsets[fieldNum] * -1] = val;
            } else {
                if (buf.isSpaceForGrowth(val.sizeof())) {
                    buf.bringIntoMemory();
                    fieldOffsets[fieldNum] = buf.write(val.toBytes());
                } else {
                    move tuple to new TupleBuffer;
                }
            }
        }
    }

    /**
     * An implementation of Map for use with managed memory.  Package level
     * access because it will only be used by ManagedTuple.
     */
    class ManagedMap implements Map<String, Object> {

        // Reference to the TupleBuffer holding data for this map.
        private TupleBuffer buf;

        void put(String key, Object val) {
            // throw, we can't amend an existing map.
        }

        void putAll(Map<String, Object> m) {
            buf.bringIntoMemory();
            write keys and values into buf, using length to delineate;
            value objects should be serialized the same as in tuple with a type;
        }

        Object get(String key) {
            buf.bringIntoMemory();
            Look through the data, skipping from key to key until we find the right key;
            Deserialize the value out of the map into the appropriate object type and return it.
        }
    }

A new class ManagedBag will be created that does not extend DefaultAbstractBag. This bag will not support spilling. It will aggressively minimize the use of member variables to keep its memory footprint to a minimum.
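
A minimal sketch of what that might look like, assuming the java.util collections and the existing Tuple interface (this is illustrative, not a finished design):

    // Sketch: a bag whose only state is the list of tuples it holds.  No spill
    // file, no size bookkeeping, no registration with a memory manager.
    class ManagedBag /* implements DataBag */ {
        private final List<Tuple> tuples = new ArrayList<Tuple>();

        public void add(Tuple t) { tuples.add(t); }

        public Iterator<Tuple> iterator() { return tuples.iterator(); }

        public long size() { return tuples.size(); }
    }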

Proposed Methodology

As they say in the financial world, this document contains many forward-looking statements that may or may not work out. We should prototype along the way to assure ourselves that this will in fact bring the proposed improvement while maintaining performance.

Step one should be to prototype ManagedTuple and ManagedMap. Then a stand-alone tool can be written that creates tuples until it runs out of memory. It can be run with the existing DefaultTuples and with ManagedTuples to see whether a significant improvement results; the improvement should be in the range of cutting the memory footprint in half. This test should be run with all-scalar data and with data that includes maps and bags of maps (since this is a common use case for our users).
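
A sketch of the proposed tool follows; TupleMaker is a placeholder for "construct a DefaultTuple" versus "construct a ManagedTuple", and nothing here beyond the Tuple interface is existing Pig code:

    // Sketch: allocate tuples until the heap is exhausted and report how many fit.
    // A larger count for ManagedTuple means a smaller per-tuple footprint.
    interface TupleMaker { Tuple make(); }

    static long countTuplesUntilOOM(TupleMaker maker) {
        List<Tuple> held = new ArrayList<Tuple>();
        try {
            while (true) {
                held.add(maker.make());
            }
        } catch (OutOfMemoryError e) {
            long count = held.size();
            held.clear();   // release the tuples so there is room to report the result
            return count;
        }
    }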

Assuming step one shows promising results, the next step should be to prototype the page caching system. Queries that are known to produce GC overhead errors with the existing code should be run with the page caching system to show that it handles the situation without those errors.

Thoughts for Future Work

In the Telegraph paper referenced above, the developers took the next step and managed not just the large memory buffers but also the creation and deletion of objects. This meant a couple of things:

  1. They needed to add explicit deallocation to their programming.
  2. They needed to keep pools of available objects for recycling.

We may want to look at these options, though option 1 in particular may be difficult, as Java programmers (especially those who haven't programmed in C++ or a similar language) have no concept of deallocating objects when they are finished with them. However, the Telegraph developers noted that by totally circumventing the Java garbage collector they got roughly a 2.5x speedup of their system. So it might be worth investigating.
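
If we do explore option 2, a pool might look roughly like the following sketch (TuplePool is illustrative; only TupleFactory is an existing Pig class):

    // Sketch: a simple pool with explicit release, in the spirit of the Telegraph
    // approach.  Callers promise not to touch a tuple after releasing it.
    class TuplePool {
        private final Deque<Tuple> free = new ArrayDeque<Tuple>();

        Tuple acquire() {
            Tuple t = free.poll();
            return (t != null) ? t : TupleFactory.getInstance().newTuple();
        }

        void release(Tuple t) {
            // A real pool would also clear the tuple's fields here.
            free.push(t);
        }
    }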

Reader Feedback

Ted Dunning commented in http://mail-archives.apache.org/mod_mbox/hadoop-pig-dev/200905.mbox/%3cc7d45fc70905141943v72591b09u81009cf29b9f58b8@mail.gmail.com%3e

Response: http://mail-archives.apache.org/mod_mbox/hadoop-pig-dev/200905.mbox/%3cE3AC3B63-ADB2-4C49-83FB-49A251CD95FB@yahoo-inc.com%3e

Thejas Nair commented in http://mail-archives.apache.org/mod_mbox/hadoop-pig-dev/200905.mbox/%3CC633011F.42186%25tejas@yahoo-inc.com%3E

Response: http://mail-archives.apache.org/mod_mbox/hadoop-pig-dev/200905.mbox/%3C55CAA9B7-C415-4C63-B80B-62D7A753947B@yahoo-inc.com%3E

Chris Olston made a few comments in a conversation:

1. LRU will sometimes be a bad replacement choice. In cases where one batch of data for the pipeline is larger than the total of all the memory pages, MRU would be a better choice.

Response: Agreed, but it seems that in the case where an operator needs a few pages of memory but not all of them, LRU may be a better choice (assuming there are not other operators taking all the other pages of memory). Since we don't know the size of the input to an operator a priori, I don't know how to choose which is better.

2. It might be useful to expand the interface to allow control of how many buffer pages go to a given operator. This has a couple of benefits. One, it is possible to prevent one

Response: I agree that allowing assignments of memory pages to specific operators could be useful. I am concerned that Pig's planner is not sophisticated enough to make intelligent choices here. I would like to leave this as an area for future work.

An Alternative Approach

And now for something completely different.

There seems to be general agreement that changing Pig's tuples to store data as byte arrays rather than objects is a good idea. Initial tests indicate that storing data as bytes but operating on it as objects is much too expensive in terms of serialization costs. In talking with the HBase team, who just went through a similar change, we learned that their choice was to keep data in byte format at all times and only handle serialization on the client. Pig could not do this completely, as we need to honor the existing UDF interfaces and would need to deserialize objects to pass them to UDFs. But we could change internal operators to work on bytes, change loaders to store bytes, etc. We could even add a byte oriented interface for UDFs to make use of if they wanted. HBase reported a 10-100x performance speedup when they made this change. I do not expect a similar speedup for Pig, but hopefully we would get some boost. This is a very extensive project, as it involves rewriting many Pig operators and built-in functions.
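
As an example of the kind of byte-oriented operation this implies, two serialized int fields could be compared without creating any Integer objects; this sketch assumes the type-byte layout described earlier and is not existing Pig or HBase code:

    // Sketch: compare two serialized int fields in place.  The +1 skips the type byte.
    static int compareSerializedInts(byte[] a, int aOff, byte[] b, int bOff) {
        int x = readRawInt(a, aOff + 1);
        int y = readRawInt(b, bOff + 1);
        return (x < y) ? -1 : (x == y ? 0 : 1);
    }

    static int readRawInt(byte[] buf, int off) {
        return ((buf[off] & 0xff) << 24) | ((buf[off + 1] & 0xff) << 16)
             | ((buf[off + 2] & 0xff) << 8) | (buf[off + 3] & 0xff);
    }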

There also seems to be general agreement that having Pig manage a buffer pool is a bad idea, or at least too complicated to take on. One observation that is particularly relevant here is that the only case where Pig usually gets into memory trouble is with large bags. If it is possible to solve the memory issues for large bags alone, then we do not need such a large-scale general solution. The following proposal attempts to solve it just for bags.

There are two main cases where bags grow too large. One, the user's data contains large bags, so that Pig is unable to load even a single record in the map phase. This is less common. Two, one or a few groups on the reduce side are so large that they cannot fit into memory. This is quite common, and affects joins and group bys where the UDFs are not algebraic.

Hadoop has recently (in trunk, not yet in a release) added a new MarkableIterator class. This iterator allows the user to set a mark in the iterator provided by the reduce and at a later point reset to that mark. The way this is implemented (as explained to me by Owen O'Malley) is that it remembers its position in the list when the user marks. When the user resets, it restarts the merge, going until it hits the mark. This means that, one, it never writes to disk or runs out of disk space, and two, resetting is not free, because it rereads from disk and re-merges the data.

I propose to use this iterator as follows. A new type of bag will be created, MarkableBag. It will be used by POPackage and POJoinPackage to store records coming out of the reduce. It will store X records in memory (see below for details on how X will be decided). Once it reaches X, it will quit reading records from the iterator and place a mark at that point in the iterator. The MarkableBagIterator returned by MarkableBag.iterator() will return those tuples in memory, and then use the underlying reduce iterator to read any remaining records. More details on how this will work:

    class MarkableBag implements DataBag {

        Collection<Tuple> tuples;
        MarkableIterator iter;
        long guid = 0;

        // All the regular bag functions

        private long generateGuid(); // generate a unique id for each new iterator.

        class MarkableBagIterator implements Iterator<Tuple> {
            private MarkableBag parent;
            private long guid;

            MarkableBagIterator(MarkableBag parent) {
                this.parent = parent;
                guid = parent.generateGuid();
                // no need for locking, as this is single threaded
                parent.guid = guid;
                parent.iter.reset();
            }

            boolean hasNext() { ... }

            Tuple next() {
                if (parent.guid != guid) throw new ConcurrentModificationException();

                if (still in memory) return tuples.next();
                else return parent.iter.next();
            }
        }
    }

The limit here is that only one iterator can read from the bag at a time. Given that Pig's use case is that the bag be handed to each UDF in turn, this restriction is acceptable. The only case we cannot support is where a given UDF wants to read the bag with multiple iterators at a time. I am not aware of any UDF currently doing this, but a user might want to.

The MarkableBag will not work for large bags in the user's input data (the map-phase case above), and will not work for UDFs that wish to have multiple iterators into the bag at the same time. For this reason we will add a second type of bag, SpillableBag. Like MarkableBag it will store X records in memory, and then spill remaining records to disk. It will be used in the map phase to store any user generated data. If a UDF in the reduce phase implements a new interface ReadConcurrently (I'm open to a better name here) then SpillableBag will be used.

The difficult question is how to define X (that is, the number of tuples held in memory in each bag). There are two options. One is to make X an absolute number. It would be a Java property and would default to, say, 1000. The advantages are simplicity and speed (the bag just needs to check its count). Two is to make X a number of bytes, again a Java property, that would default to a few megabytes. A bag would estimate the size of each tuple in it (by sampling a few) and stop holding new tuples in memory when it reaches that limit. This is slightly more precise, but much more expensive. I would vote for starting with option one.
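
A sketch of the option one check follows; the property name and the spillOrMark() helper are placeholders:

    // Sketch: hold at most X tuples in memory, then hand the rest to the spill
    // file (SpillableBag) or stop reading and mark the iterator (MarkableBag).
    static final int MAX_IN_MEMORY =
        Integer.parseInt(System.getProperty("pig.bag.max.inmemory.tuples", "1000"));

    void add(Tuple t) {
        if (tuples.size() < MAX_IN_MEMORY) {
            tuples.add(t);      // the cheap case: just a count check
        } else {
            spillOrMark(t);     // placeholder for the bag-specific overflow behavior
        }
    }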

The current SpillableMemoryManager and the bags that register with it would be removed.

The advantage of this approach is that it is much simpler than the page cache mechanism proposed above. It also removes the need for the proposed ReadOnce and Accumulating interfaces for UDFs (see https://issues.apache.org/jira/browse/PIG-844 and https://issues.apache.org/jira/browse/PIG-807). It also removes the current spilling mechanism, which is faulty and a drag on performance.
