/*-
 * See the file LICENSE for redistribution information.
 *
 * Copyright (c) 2009 Oracle.  All rights reserved.
 *
 * $Id$
 */

#ifndef _DB_STL_DBC_H
#define _DB_STL_DBC_H

#include <set>
#include <errno.h>

#include "dbstl_common.h"
#include "dbstl_dbt.h"
#include "dbstl_exception.h"
#include "dbstl_container.h"
#include "dbstl_resource_manager.h"

START_NS(dbstl)

// Forward declarations.
class db_container;
class DbCursorBase;
template<Typename data_dt>
class RandDbCursor;
class DbstlMultipleKeyDataIterator;
class DbstlMultipleRecnoDataIterator;
using std::set;

/////////////////////////////////////////////////////////////////////
/////////////////////////////////////////////////////////////////////
//
// LazyDupCursor class template definition.
//
// This class allows us to make a shallow copy on construction. When the
// cursor pointer is first dereferenced a deep copy is made.
//
// The allowed types for BaseType are DbCursor<> and RandDbCursor<>.
// The expected usage of this class is:
// 1. Create an iterator in container::begin(); the iterator::pcsr.csr_ptr_
// already points to an object, so there is no need to duplicate.
// 2. Create the iterator with the default argument, so both
// iterator::pcsr.csr_ptr_ and dup_src_ are NULL. Such an iterator may be
// copied many times via the copy constructor, but no cursor is duplicated
// until the cursor is actually used.
//
// There is an informing mechanism between an instance of this class and
// its dup_src_ cursor: when that cursor is about to change state, it
// informs all registered LazyDupCursor "listeners" of the change, so that
// each of them duplicates from the cursor before the change, because that
// is the cursor state the listeners expect.
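//
// A minimal sketch of the lazy duplication (hypothetical element types and
// variable names, not taken from this header):
//
//	LazyDupCursor<DbCursor<int, int> > pcsr1(new DbCursor<int, int>());
//	LazyDupCursor<DbCursor<int, int> > pcsr2(pcsr1);// Shallow copy only:
//							// pcsr2 registers as a
//							// listener of pcsr1's
//							// cursor.
//	pcsr2->first();	// First dereference: only now is the cursor
//			// duplicated for pcsr2.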

template <Typename BaseType>
class LazyDupCursor
{
	// dup_src_ is used internally by this class to duplicate another
	// cursor into csr_ptr_; it is assigned in the copy constructor from
	// another LazyDupCursor object's csr_ptr_. csr_ptr_ is the actual
	// pointer used to perform cursor operations.
	//
	//
	BaseType *csr_ptr_, *dup_src_;
	typedef LazyDupCursor<BaseType> self;

public:
	////////////////////////////////////////////////////////////////////
	//
	// Begin public constructors and destructor.
	//
	inline LazyDupCursor()
	{
		csr_ptr_ = NULL;
		dup_src_ = NULL;
	}

	// Used in all iterator types' constructors; dbcptr is created
	// solely for this object and its cursor is not yet opened, so we
	// simply assign it to csr_ptr_.
	explicit inline LazyDupCursor(BaseType *dbcptr)
	{
		csr_ptr_ = dbcptr;
		// Already have pointer, do not need to duplicate.
		dup_src_ = NULL;
	}

	// Do not copy into csr_ptr_; make only a shallow copy from
	// dp2.csr_ptr_.
	LazyDupCursor(const self& dp2)
	{
		csr_ptr_ = NULL;
		if (dp2.csr_ptr_)
			dup_src_ = dp2.csr_ptr_;
		else
			dup_src_ = dp2.dup_src_;
		if (dup_src_)
			dup_src_->add_dupper(this);
	}

	~LazyDupCursor()
	{
		// Not duplicated yet, remove from dup_src_.
		if (csr_ptr_ == NULL && dup_src_ != NULL)
			dup_src_->erase_dupper(this);
		if (csr_ptr_)
			delete csr_ptr_;// Delete the cursor.
	}

	////////////////////////////////////////////////////////////////////

	// Deep copy.
	inline const self& operator=(const self &dp2)
	{
		BaseType *dcb;

		dcb = dp2.csr_ptr_ ? dp2.csr_ptr_ : dp2.dup_src_;
		this->operator=(dcb);

		return dp2;
	}

	// Deep copy.
	inline BaseType *operator=(BaseType *dcb)
	{

		if (csr_ptr_) {
			// Only dup_src_ will inform this, not csr_ptr_.
			delete csr_ptr_;
			csr_ptr_ = NULL;
		}

		if (dcb)
			csr_ptr_ = new BaseType(*dcb);
		if (dup_src_ != NULL) {
			dup_src_->erase_dupper(this);
			dup_src_ = NULL;
		}

		return dcb;
	}

	void set_cursor(BaseType *dbc)
	{
		assert(dbc != NULL);
		if (csr_ptr_) {
			// Only dup_src_ will inform this, not csr_ptr_.
			delete csr_ptr_;
			csr_ptr_ = NULL;
		}

		csr_ptr_ = dbc;
		if (dup_src_ != NULL) {
			dup_src_->erase_dupper(this);
			dup_src_ = NULL;
		}
	}

	// If dup_src_ is informing this object, pass false for erase_dupper.
	inline BaseType* duplicate(bool erase_dupper = true)
	{
		assert(dup_src_ != NULL);
		if (csr_ptr_) {
			// Only dup_src_ will inform this, not csr_ptr_.
			delete csr_ptr_;
			csr_ptr_ = NULL;
		}
		csr_ptr_ = new BaseType(*dup_src_);
		if (erase_dupper)
			dup_src_->erase_dupper(this);
		dup_src_ = NULL;
		return csr_ptr_;
	}

	inline BaseType* operator->()
	{
		if (csr_ptr_)
			return csr_ptr_;

		return duplicate();
	}

	inline operator bool()
	{
		return csr_ptr_ != NULL;
	}

	inline bool operator!()
	{
		return !csr_ptr_;
	}

	inline bool operator==(void *p)
	{
		return csr_ptr_ == p;
	}

	inline BaseType* base_ptr(){
		if (csr_ptr_)
			return csr_ptr_;
		return duplicate();
	}
};


/////////////////////////////////////////////////////////////////////////
/////////////////////////////////////////////////////////////////////////
//
// DbCursorBase class definition.
//
// DbCursorBase is the base class for the DbCursor<> class template. It
// wraps the Berkeley DB cursor so that the ResourceManager can close the
// Berkeley DB cursor and set the pointer to NULL.
// If we don't set the cursor to NULL, the handle could become valid again,
// since Berkeley DB recycles handles. DB STL would then try to use the same
// handle across different instances, which is not supported.
//
// Whenever a cursor is opened, ResourceManager stores its DbCursorBase*
// pointer, so that when the cursor needs to be closed, it can call the
// DbCursorBase::close() function.
//
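// A minimal sketch of the close sequence (hypothetical caller; normally
// ResourceManager drives this through the stored pointer):
//
//	DbCursorBase *pcsrbase;	// Assume it wraps an open Dbc handle.
//	pcsrbase->close();	// Closes the Dbc and sets csr_ to NULL so a
//				// recycled handle can't be reused through
//				// this wrapper.
//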
class DbCursorBase
{
protected:
	Dbc *csr_;
	DbTxn *owner_txn_;
	Db *owner_db_;
	int csr_status_;

public:
	enum DbcGetSkipOptions{SKIP_KEY, SKIP_DATA, SKIP_NONE};
	inline DbTxn *get_owner_txn() const { return owner_txn_;}
	inline void set_owner_txn(DbTxn *otxn) { owner_txn_ = otxn;}

	inline Db *get_owner_db() const { return owner_db_;}
	inline void set_owner_db(Db *odb) { owner_db_ = odb;}

	inline Dbc *get_cursor()  const { return csr_;}
	inline Dbc *&get_cursor_reference() { return csr_;}
	inline void set_cursor(Dbc*csr1)
	{
		if (csr_)
			ResourceManager::instance()->remove_cursor(this);
		csr_ = csr1;
	}

	inline int close()
	{
		int ret = 0;

		if (csr_ != NULL && (((DBC *)csr_)->flags & DBC_ACTIVE) != 0) {
			ret = csr_->close();
			csr_ = NULL;
		}
		return ret;
	}

	DbCursorBase(){
		owner_txn_ = NULL;
		owner_db_ = NULL;
		csr_ = NULL;
		csr_status_ = 0;
	}

	DbCursorBase(const DbCursorBase &csrbase)
	{
		this->operator=(csrbase);
	}

	const DbCursorBase &operator=(const DbCursorBase &csrbase)
	{
		owner_txn_ = csrbase.owner_txn_;
		owner_db_ = csrbase.owner_db_;
		csr_ = NULL; // Need to call DbCursor<>::dup to duplicate.
		csr_status_ = 0;
		return csrbase;
	}

	virtual ~DbCursorBase()
	{
		close();
	}
}; // DbCursorBase

////////////////////////////////////////////////////////////////////////
///////////////////////////////////////////////////////////////////////
//
// DbCursor class template definition
//
// DbCursor is the connection between Berkeley DB and the dbstl container
// classes. It is the wrapper class for the Berkeley DB Dbc* cursor, used by
// the iterator classes of the Berkeley DB backed STL container classes.
// Requirements:
// 1. Deep copy using Dbc->dup.
// 2. Dbc* cursor management via the ResourceManager class.
// 3. Provide methods to do increment, decrement and advance operations;
//    advance is only available for random access iterators from DB_RECNO
//    containers.
//
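// A minimal usage sketch from an iterator's point of view (hypothetical
// db_container pointer "pcont" and int key/data types, not taken from this
// header):
//
//	DbCursor<int, int> csr;
//	csr.open(pcont, 0);		// Opened via ResourceManager.
//	if (csr.move_to(10) == 0) {	// Position on key 10 with DB_SET.
//		int k, d;
//		csr.get_current_key_data(k, d);
//		csr.next();		// Step to the next key/data pair.
//	}
//	csr.close();
//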

template<typename key_dt, typename data_dt>
class DbCursor : public DbCursorBase{
protected:
	// Lazy duplication support: store the LazyDupCursor objects which
	// will duplicate from this cursor.
	typedef LazyDupCursor<DbCursor<key_dt, data_dt> > dupper_t;
	typedef LazyDupCursor<RandDbCursor<data_dt> > dupperr_t;
	typedef set<LazyDupCursor<DbCursor<key_dt, data_dt> >* > dupset_t;
	typedef set<LazyDupCursor<RandDbCursor<data_dt> >* > dupsetr_t;

	set<LazyDupCursor<DbCursor<key_dt, data_dt> >* > sduppers1_;
	set<LazyDupCursor<RandDbCursor<data_dt> >* > sduppers2_;

	// We must use DB_DBT_USERMEM for Dbc::get and Db::get if they are
	// used in a multi-threaded application, so we use the key_buf_ and
	// data_buf_ data members for get operations, and initialize them
	// to use user memory.
	Dbt key_buf_, data_buf_;

	// Similar to Berkeley DB C++ API's classes, used to iterate through
	// bulk retrieved key/data pairs.
	DbstlMultipleKeyDataIterator *multi_itr_;
	DbstlMultipleRecnoDataIterator *recno_itr_;

	// Whether to use bulk retrieval. If non-zero, bulk retrieval is used
	// and this member is the bulk buffer size; otherwise no bulk read is
	// done. This member is 0 by default.
	u_int32_t bulk_retrieval_;
	// Whether to use DB_RMW flag in Dbc::get, by default false.
	bool rmw_get_;

	// Whether to poll data from the cursor's current position on every
	// get_current_key/data call.
	// Note that the curr_key_/curr_data_ members are always maintained
	// to contain the current key/data values of the pair pointed to by
	// csr_. If doing bulk retrieval, this flag is ignored; we always
	// read data from the bulk buffer.
	bool directdb_get_;

	// Inform the LazyDupCursor objects registered in this object to do
	// duplication because this cursor is about to change.
	// This function should be called in any function of
	// DbCursor and RandDbCursor whenever the cursor is about to change
	// state (move/close, etc).
	inline void inform_duppers()
	{
		typename dupset_t::iterator i1;
		typename dupsetr_t::iterator i2;
		for (i1 = sduppers1_.begin(); i1 != sduppers1_.end(); i1++)
			(*i1)->duplicate(false);
		for (i2 = sduppers2_.begin(); i2 != sduppers2_.end(); i2++)
			(*i2)->duplicate(false);
		sduppers1_.clear();
		sduppers2_.clear();
	}

public:
	friend class DataItem;

	// Current key/data pair pointed to by the "csr_" Dbc cursor. Both are
	// maintained on cursor movement. If directdb_get_ is true,
	// both are refreshed on every get_current{[_key][_data]} call and
	// the retrieved key/data pair is returned to the user.
	DataItem curr_key_;
	DataItem curr_data_;

	typedef DbCursor<key_dt, data_dt> self;

	// This function is used by all iterators to do equality comparison.
	// Random access iterators also use it to do less than/greater than
	// comparisons.
	// Internally, the page number difference or index difference is
	// returned, so for btree and hash databases, if two cursors point to
	// the same key/data pair, 0 is returned, meaning they are equal;
	// a non-zero return value means no more than that they are not
	// equal. We can't assume any order information between the two
	// cursors. For recno databases, we use the recno to do less than
	// and greater than comparisons, so the return value gives reliable
	// knowledge of the relative position of two iterators.
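	// For example (hypothetical cursors csr1 and csr2 positioned on the
	// same database):
	//
	//	if (csr1->compare(csr2) == 0)
	//		; // Both point to the same key/data pair.
	//
	// For btree/hash databases a non-zero result only means "not equal";
	// only for recno databases does the sign indicate which cursor is
	// positioned first.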
	int compare(const self *csr2) const{
		int res, ret;

		BDBOP(((DBC *)csr_)->cmp((DBC *)csr_, (DBC *)csr2->csr_,
		    &res, 0), ret);
		return res;
	}

	////////////////////////////////////////////////////////////////////
	//
	// Add and remove cursor change event listeners.
	//
	inline void add_dupper(dupper_t *dupper)
	{
		sduppers1_.insert(dupper);
	}

	inline void add_dupper(dupperr_t *dupper)
	{
		sduppers2_.insert(dupper);
	}

	inline void erase_dupper(dupper_t *dup1)
	{
		sduppers1_.erase(dup1);
	}

	inline void erase_dupper(dupperr_t *dup1)
	{
		sduppers2_.erase(dup1);
	}

	////////////////////////////////////////////////////////////////////

public:

	inline bool get_rmw()
	{
		return rmw_get_;
	}

	bool set_rmw(bool rmw, DB_ENV *env = NULL )
	{
		u_int32_t flag = 0;
		DB_ENV *dbenv = NULL;
		int ret;

		if (env)
			dbenv = env;
		else
			dbenv = ((DBC*)csr_)->dbenv;
		BDBOP(dbenv->get_open_flags(dbenv, &flag), ret);

		// DB_RMW flag requires locking subsystem started.
		if (rmw && ((flag & DB_INIT_LOCK) || (flag & DB_INIT_CDB) ||
		    (flag & DB_INIT_TXN)))
			rmw_get_ = true;
		else
			rmw_get_ = false;
		return rmw_get_;
	}

	// Modify the bulk buffer size. Bulk read is enabled when an iterator
	// is created, so users can later only change the bulk buffer size
	// to another value, but can't enable/disable bulk read while the
	// iterator is alive.
	// Returns true on success, false otherwise.
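	// For example (assuming bulk read was enabled when the iterator was
	// created, and a hypothetical 64KB size):
	//
	//	csr.set_bulk_buffer(64 * 1024);	// OK, adjusts the buffer size.
	//	csr.set_bulk_buffer(0);		// Fails: can't disable bulk read.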
	inline bool set_bulk_buffer(u_int32_t sz)
	{
		if (bulk_retrieval_ && sz) {
			normalize_bulk_bufsize(sz);
			bulk_retrieval_ = sz;
			return true;
		}

		return false;

	}

	inline u_int32_t get_bulk_bufsize()
	{
		return bulk_retrieval_;
	}

	inline void enlarge_dbt(Dbt &d, u_int32_t sz)
	{
		void *p;

		p = DbstlReAlloc(d.get_data(), sz);
		dbstl_assert(p != NULL);
		d.set_ulen(sz);
		d.set_data(p);
		d.set_size(sz);
	}
	// Move forward or backward, often by one key/data pair; different
	// flags can be passed to the Dbc::get function. Then update the
	// key/data pair and the csr_status_ member.
	//
	int increment(int flag)
	{
		int ret = 0;
		Dbt &k = key_buf_, &d = data_buf_;
		u_int32_t sz, getflags = 0, bulk_bufsz;

		if (csr_ == NULL)
			return INVALID_ITERATOR_CURSOR;
		curr_key_.reset();
		curr_data_.reset();
		inform_duppers();

		// Berkeley DB cursor get flags are enumerated values, not bit
		// masks, so we can't use bit operations here.
		//
		if (this->bulk_retrieval_ != 0)
			switch (flag) {
			case DB_PREV:
			case DB_PREV_DUP:
			case DB_PREV_NODUP:
			case DB_LAST:
			case DB_JOIN_ITEM:
			case DB_GET_RECNO:
			case DB_SET_RECNO: 
				break;
			default:
				getflags |= DB_MULTIPLE_KEY;
				if (data_buf_.get_ulen() != bulk_retrieval_)
					enlarge_dbt(data_buf_, bulk_retrieval_);
				break;
			}

		if (this->rmw_get_)
			getflags |= DB_RMW;

		// Do not use BDBOP or BDBOP2 here because it is likely
		// that an iteration will step onto end() position.
retry:		ret = csr_->get(&k, &d, flag | getflags);
		if (ret == 0) {
			if (bulk_retrieval_ && (getflags & DB_MULTIPLE_KEY)) {
				// A new retrieval, so any existing multi_itr_
				// or recno_itr_ must be released and rebuilt.
				if (((DBC*)csr_)->dbtype == DB_RECNO) {
					if (recno_itr_) {
						delete recno_itr_;
						recno_itr_ = NULL;
					}
					recno_itr_ =
				    new DbstlMultipleRecnoDataIterator(d);
				} else {
					if (multi_itr_) {
						delete multi_itr_;
						multi_itr_ = NULL;
					}
					multi_itr_ = new
					    DbstlMultipleKeyDataIterator(d);
				}
			} else {
				// Non bulk retrieval succeeded.
				curr_key_.set_dbt(k, false);
				curr_data_.set_dbt(d, false);
				limit_buf_size_after_use();
			}
		} else if (ret == DB_BUFFER_SMALL) {
			// Either the key or the data Dbt might trigger a
			// DB_BUFFER_SMALL return. Only enlarge the Dbt if it
			// is actually too small.
			if (((sz = d.get_size()) > 0) && (sz > d.get_ulen()))
				enlarge_dbt(d, sz);

			if (((sz = k.get_size()) > 0) && (sz > k.get_ulen()))
				enlarge_dbt(k, sz);

			goto retry;
		} else {
			if (ret == DB_NOTFOUND) {
				ret = INVALID_ITERATOR_POSITION;
				this->curr_key_.reset();
				this->curr_data_.reset();
			} else if (bulk_retrieval_ &&
			    (getflags & DB_MULTIPLE_KEY)){
				BDBOP(((DBC*)csr_)->dbp->
				    get_pagesize(((DBC*)csr_)->
				    dbp, &bulk_bufsz), ret);
				if (bulk_bufsz > d.get_ulen()) {// buf size error
					normalize_bulk_bufsize(bulk_bufsz);
					bulk_retrieval_ = bulk_bufsz;
					enlarge_dbt(d, bulk_bufsz);
					goto retry;
				} else
					throw_bdb_exception(
					    "DbCursor<>::increment", ret);
			} else
				throw_bdb_exception(
				    "DbCursor<>::increment", ret);
		}

		csr_status_ = ret;
		return ret;
	}

	// After each use of key_buf_ and data_buf_, limit their buffer sizes
	// to a reasonable value so that they don't waste too much memory.
	inline void limit_buf_size_after_use() 
	{
		if (bulk_retrieval_)
			// The bulk buffer has to be large, so don't limit it.
			return;

		if (key_buf_.get_ulen() > DBSTL_MAX_KEY_BUF_LEN) {
			key_buf_.set_data(DbstlReAlloc(key_buf_.get_data(),
			    DBSTL_MAX_KEY_BUF_LEN));
			key_buf_.set_ulen(DBSTL_MAX_KEY_BUF_LEN);
		}
		if (data_buf_.get_ulen() > DBSTL_MAX_DATA_BUF_LEN) {
			data_buf_.set_data(DbstlReAlloc(data_buf_.get_data(),
			    DBSTL_MAX_DATA_BUF_LEN));
			data_buf_.set_ulen(DBSTL_MAX_DATA_BUF_LEN);
		}
	}

	// Duplicate this object's cursor and assign the duplicate to dbc1.
	//
	inline int dup(DbCursor<key_dt, data_dt>& dbc1) const
	{
		Dbc* pcsr = 0;
		int ret;

		if (csr_ != 0 && csr_->dup(&pcsr, DB_POSITION) == 0) {
			dbc1.set_cursor(pcsr);
			dbc1.set_owner_db(this->get_owner_db());
			dbc1.set_owner_txn(this->get_owner_txn());
			ResourceManager::instance()->add_cursor(
			    this->get_owner_db(), &dbc1);
			ret = 0;
		} else
			ret = ITERATOR_DUP_ERROR;

		return ret;
	}

public:
	// Open a cursor without moving it; it is at an invalid position.
	// All cursors should be opened using this method.
	//
	inline int open(db_container *pdbc, int flags)
	{
		int ret;

		Db *pdb = pdbc->get_db_handle();
		if (pdb == NULL)
			return 0;
		if (csr_) // Already open; close before reopening.
			return 0;
		ret = ResourceManager::instance()->
		    open_cursor(this, pdb, flags);
		set_rmw(rmw_get_);
		this->csr_status_ = ret;
		return ret;
	}

	// Move the Berkeley DB cursor to the specified key k. DB_SET is used
	// by default, but DB_SET_RANGE may also be used.
	//
	int move_to(const key_dt&k, u_int32_t flag = DB_SET)
	{
		Dbt &d1 = data_buf_;
		int ret;
		u_int32_t sz;
		DataItem k1(k, true);

		if (csr_ == NULL)
			return INVALID_ITERATOR_CURSOR;

		curr_key_.reset();
		curr_data_.reset();
		inform_duppers();

		// It is likely that k is not in the database, causing
		// get(DB_SET) to fail; we should not throw an exception
		// because of this.
		//
		if (rmw_get_)
			flag |= DB_RMW;
retry:		ret = csr_->get(&k1.get_dbt(), &d1, flag);
		if (ret == 0) {
			curr_key_ = k1;
			curr_data_.set_dbt(d1, false);
			limit_buf_size_after_use();
		} else if (ret == DB_BUFFER_SMALL) {
			sz = d1.get_size();
			assert(sz > 0);
			enlarge_dbt(d1, sz);
			goto retry;
		} else {
			if (ret == DB_NOTFOUND) {
				ret = INVALID_ITERATOR_POSITION;
				// Invalidate current values because it is
				// at an invalid position.
				this->curr_key_.reset();
				this->curr_data_.reset();
			} else
				throw_bdb_exception("DbCursor<>::move_to", ret);
		}

		csr_status_ = ret;
		return ret;
	}

	// Return the number of duplicate data items for the current key.
	inline size_t count()
	{
		int ret;
		db_recno_t cnt;

		BDBOP2(csr_->count(&cnt, 0), ret, close());
		return (size_t)cnt;
	}

	int insert(const key_dt&k, const data_dt& d, int pos = DB_BEFORE)
	{
		// !!!XXX:
		//         We do a deep copy of the input data into a local
		//         variable. Apparently not doing so causes issues
		//         when using gcc, even though the put completes
		//         before this function call returns.
		//         It would be best to avoid this additional copy.
		int ret;
		// (k, d) pair may be a temporary pair, so we must copy them.
		DataItem k1(k, false), d1(d, false);

		inform_duppers();
		if (pos == DB_AFTER) {
			ret = this->csr_->put(&k1.get_dbt(), &d1.get_dbt(),
			    pos);
			// This flag may be used on an empty database, because
			// the begin() iterator of an empty db_vector equals
			// its end() iterator; in that case the caller retries
			// with DB_KEYLAST.
			//
			if (ret == EINVAL || ret == 0)
				return ret;
			else if (ret)
				throw_bdb_exception("DbCursor<>::insert", ret);
		}
		if (pos == DB_NODUPDATA)
			BDBOP3(this->csr_->put(&k1.get_dbt(), &d1.get_dbt(),
			    pos), ret, DB_KEYEXIST, close());
		else
			BDBOP2(this->csr_->put(&k1.get_dbt(), &d1.get_dbt(),
			    pos), ret, close());
		this->csr_status_ = ret;
		if (ret == 0) {
			curr_key_ = k1;
			curr_data_ = d1;
		}
		// This cursor points to the new key/data pair now.
		return ret;
	}

	// Replace current cursor-pointed data item with d.
	inline int replace(const data_dt& d)
	{
		Dbt k1;
		int ret;
		// !!!XXX:
		//         We do a deep copy of the input data into a local
		//         variable. Apparently not doing so causes issues
		//         when using gcc, even though the put completes
		//         before this function call returns.
		//         It would be best to avoid this additional copy.
		// d may be a temporary object, so we must copy it.
		DataItem d1(d, false);

		
		BDBOP2(this->csr_->put(&k1, &d1.get_dbt(), DB_CURRENT),
		    ret, close());
		curr_data_ = d1; // Update current data.
		
		this->csr_status_ = ret;
		return ret;
	}

	// Remove the old key and insert the new key with pseudo data. Insert
	// first, then move to the old key and remove it, so that the cursor
	// remains at the old key's position, according to the DB
	// documentation. In practice, however, the cursor does not appear to
	// stay at the old position after the delete: a for loop iteration
	// exits prematurely and not all elements are visited.
	//
	inline int replace_key(const key_dt&k)
	{
		data_dt d;
		key_dt k0;
		int ret;

		this->get_current_key_data(k0, d);
		if (k0 == k)
			return 0;

		DbCursor<key_dt, data_dt> csr2;
		this->dup(csr2);
		// Delete current, then insert new key/data pair.
		ret = csr2.del(); 
		ret = csr2.insert(k, d, DB_KEYLAST);
		this->csr_status_ = ret;
		
		// Now this->csr_ sits at an invalid position and its
		// iterator is invalidated. It must first be moved to the
		// next position before being used.
		return ret;
	}

	inline int del()
	{
		int ret;

		inform_duppers();
		BDBOP2(csr_->del(0), ret, close());

		// By default pos.csr_ stays where it was after the delete,
		// which is now an invalid position. So we need to move to
		// the next position to conform to the STL specification, but
		// we don't move it here; iterator::erase should move the
		// iterator itself forward.
		//
		this->csr_status_ = ret;
		return ret;
	}

	// Make sure the bulk buffer is large enough, and a multiple of 1KB.
	// Because this function may be called prior to cursor
	// initialization, it is not possible to verify here that the buffer
	// size is a multiple of the page size.
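	// For example (hypothetical numbers), with a 4-byte data_dt and
	// bulksz == 40: doubling gives 80 (>= 16 * 4), and rounding up to
	// the next 1KB boundary gives 1024. A value already on a 1KB
	// boundary is bumped to the next one, e.g. 2048 becomes 3072.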
	u_int32_t normalize_bulk_bufsize(u_int32_t &bulksz)
	{
		if (bulksz == 0)
			return 0;

		while (bulksz < 16 * sizeof(data_dt))
			bulksz *= 2;

		bulksz = bulksz + 1024 - bulksz % 1024;

		return bulksz;
	}

	////////////////////////////////////////////////////////////////////
	//
	// Begin public constructors and destructor.
	//
	explicit DbCursor(u_int32_t b_bulk_retrieval = 0, bool brmw1 = false,
	    bool directdbget = true) : DbCursorBase(),
	    curr_key_(sizeof(key_dt)), curr_data_(sizeof(data_dt))
	{
		u_int32_t bulksz = sizeof(data_dt); // non-bulk
		rmw_get_ = brmw1;
		this->bulk_retrieval_ = 
		    normalize_bulk_bufsize(b_bulk_retrieval);
		recno_itr_ = NULL;
		multi_itr_ = NULL;

		if (bulk_retrieval_) {
			if (bulksz <= bulk_retrieval_)
				bulksz = bulk_retrieval_;
			else {
				normalize_bulk_bufsize(bulksz);
				bulk_retrieval_ = bulksz;
			}
		}
		key_buf_.set_data(DbstlMalloc(sizeof(key_dt)));
		key_buf_.set_ulen(sizeof(key_dt));
		key_buf_.set_flags(DB_DBT_USERMEM);
		data_buf_.set_data(DbstlMalloc(bulksz));
		data_buf_.set_ulen(bulksz);
		data_buf_.set_flags(DB_DBT_USERMEM);
		directdb_get_ = directdbget;
	}

	// Copy constructor; duplicate the cursor here.
	DbCursor(const DbCursor<key_dt, data_dt>& dbc) :
	    DbCursorBase(dbc),
	    curr_key_(dbc.curr_key_), curr_data_(dbc.curr_data_)
	{
		void *pk, *pd;

		dbc.dup(*this);
		csr_status_ = dbc.csr_status_;
		if (csr_ || dbc.csr_)
			this->rmw_get_ = set_rmw(dbc.rmw_get_,
			    ((DBC*)dbc.csr_)->dbenv);
		else
			rmw_get_ = dbc.rmw_get_;

		bulk_retrieval_ = dbc.bulk_retrieval_;

		// Now we have to copy key_buf_ and data_buf_ to support
		// bulk retrieval.
		key_buf_.set_data(pk = DbstlMalloc(dbc.key_buf_.get_ulen()));
		key_buf_.set_ulen(dbc.key_buf_.get_ulen());
		key_buf_.set_size(dbc.key_buf_.get_size());
		key_buf_.set_flags(DB_DBT_USERMEM);
		memcpy(pk, dbc.key_buf_.get_data(), key_buf_.get_ulen());

		data_buf_.set_data(pd = DbstlMalloc(dbc.data_buf_.get_ulen()));
		data_buf_.set_ulen(dbc.data_buf_.get_ulen());
		data_buf_.set_size(dbc.data_buf_.get_size());
		data_buf_.set_flags(DB_DBT_USERMEM);
		memcpy(pd, dbc.data_buf_.get_data(), data_buf_.get_ulen());
		if (dbc.recno_itr_) {
			recno_itr_ = new DbstlMultipleRecnoDataIterator(
			    data_buf_);
			recno_itr_->set_pointer(dbc.recno_itr_->get_pointer());
		} else
			recno_itr_ = NULL;
		if (dbc.multi_itr_) {
			multi_itr_ = new DbstlMultipleKeyDataIterator(
			    data_buf_);
			multi_itr_->set_pointer(dbc.multi_itr_->get_pointer());

		} else
			multi_itr_ = NULL;

		directdb_get_ = dbc.directdb_get_;

		// Do not copy sduppers; they are private to each DbCursor<>
		// object.
	}

	virtual ~DbCursor()
	{
		close(); // Call close() ahead of freeing following buffers.
		free(key_buf_.get_data());
		free(data_buf_.get_data());
		if (multi_itr_)
			delete multi_itr_;
		if (recno_itr_)
			delete recno_itr_;
	}

	////////////////////////////////////////////////////////////////////

	const DbCursor<key_dt, data_dt>& operator=
	    (const DbCursor<key_dt, data_dt>& dbc)
	{
		void *pk;
		u_int32_t ulen;

		DbCursorBase::operator =(dbc);
		dbc.dup(*this);
		curr_key_ = dbc.curr_key_;
		curr_data_ = dbc.curr_data_;
		rmw_get_ = dbc.rmw_get_;
		this->bulk_retrieval_ = dbc.bulk_retrieval_;
		this->directdb_get_ = dbc.directdb_get_;
		// Now we have to copy key_buf_ and data_buf_ to support
		// bulk retrieval.
		key_buf_.set_data(pk = DbstlReAlloc(key_buf_.get_data(),
		    ulen = dbc.key_buf_.get_ulen()));
		key_buf_.set_ulen(ulen);
		key_buf_.set_size(dbc.key_buf_.get_size());
		key_buf_.set_flags(DB_DBT_USERMEM);
		memcpy(pk, dbc.key_buf_.get_data(), ulen);

		data_buf_.set_data(pk = DbstlReAlloc(data_buf_.get_data(),
		    ulen = dbc.data_buf_.get_ulen()));
		data_buf_.set_ulen(ulen);
		data_buf_.set_size(dbc.data_buf_.get_size());
		data_buf_.set_flags(DB_DBT_USERMEM);
		memcpy(pk, dbc.data_buf_.get_data(), ulen);

		if (dbc.recno_itr_) {
			if (recno_itr_) {
				delete recno_itr_;
				recno_itr_ = NULL;
			}
			recno_itr_ = new DbstlMultipleRecnoDataIterator(
			    data_buf_);
			recno_itr_->set_pointer(dbc.recno_itr_->get_pointer());
		} else if (recno_itr_) {
			delete recno_itr_;
			recno_itr_ = NULL;
		}

		if (dbc.multi_itr_) {
			if (multi_itr_) {
				delete multi_itr_;
				multi_itr_ = NULL;
			}
			multi_itr_ = new DbstlMultipleKeyDataIterator(
			    data_buf_);
			multi_itr_->set_pointer(dbc.multi_itr_->get_pointer());

		} else if (multi_itr_) {
			delete multi_itr_;
			multi_itr_ = NULL;
		}

		return dbc;
		// Do not copy sduppers; they are private to each DbCursor<>
		// object.

	}

	// Move the Dbc* cursor to the next position. If doing bulk read,
	// read from the bulk buffer; if the bulk buffer is exhausted, do
	// another bulk read from the database and then read from the bulk
	// buffer. Quit if there is no more data in the database.
	//
	int next(int flag = DB_NEXT)
	{
		Dbt k, d;
		db_recno_t recno;
		int ret;

retry:		if (bulk_retrieval_) {
			if (multi_itr_) {
				if (multi_itr_->next(k, d)) {
					curr_key_.set_dbt(k, false);
					curr_data_.set_dbt(d, false);
					return 0;
				} else {
					delete multi_itr_;
					multi_itr_ = NULL;
				}
			}
			if (recno_itr_) {
				if (recno_itr_->next(recno, d)) {
					curr_key_.set_dbt(k, false);
					curr_data_.set_dbt(d, false);
					return 0;
				} else {
					delete recno_itr_;
					recno_itr_ = NULL;
				}
			}
		}
		ret = increment(flag);
		if (bulk_retrieval_ && ret == 0)
			goto retry;
		return ret;
	}

	inline int prev(int flag = DB_PREV)
	{
		return increment(flag);
	}

	// Move the Dbc* cursor to the first element. If doing bulk read,
	// read the data from the bulk buffer.
	int first()
	{
		Dbt k, d;
		db_recno_t recno;
		int ret;

		ret = increment(DB_FIRST);
		if (bulk_retrieval_) {
			if (multi_itr_) {
				if (multi_itr_->next(k, d)) {
					curr_key_.set_dbt(k, false);
					curr_data_.set_dbt(d, false);
					return 0;
				} else {
					delete multi_itr_;
					multi_itr_ = NULL;
				}
			}
			if (recno_itr_) {
				if (recno_itr_->next(recno, d)) {
					curr_key_.set_dbt(k, false);
					curr_data_.set_dbt(d, false);
					return 0;
				} else {
					delete recno_itr_;
					recno_itr_ = NULL;
				}
			}
		}

		return ret;
	}

	inline int last()
	{
		return increment(DB_LAST);
	}

	// Get the current key/data pair, shallow copy. Return 0 on success,
	// INVALID_KEY_DATA if there is no data.
	inline int get_current_key_data(key_dt&k, data_dt&d)
	{
		if (directdb_get_)
			update_current_key_data_from_db(
			    DbCursorBase::SKIP_NONE);
		if (curr_key_.get_data(k) == 0 && curr_data_.get_data(d) == 0)
			return 0;
		else 
			return INVALID_KEY_DATA;
	}

	// Get the current data, shallow copy. Return 0 on success,
	// INVALID_KEY_DATA if there is no data.
	inline int get_current_data(data_dt&d)
	{
		if (directdb_get_)
			update_current_key_data_from_db(DbCursorBase::SKIP_KEY);
		if (curr_data_.get_data(d) == 0)
			return 0;
		else 
			return INVALID_KEY_DATA;
	}

	// Get the current key, shallow copy. Return 0 on success,
	// INVALID_KEY_DATA if there is no data.
	inline int get_current_key(key_dt&k)
	{
		if (directdb_get_)
			update_current_key_data_from_db(
			    DbCursorBase::SKIP_DATA);
		if (curr_key_.get_data(k) == 0)
			return 0;
		else 
			return INVALID_KEY_DATA;
	}

	inline void close()
	{
		if (csr_) {
			inform_duppers();
			ResourceManager::instance()->remove_cursor(this);
		}
		csr_ = NULL;
	}

	// The skipkd parameter specifies whether to skip retrieving the key
	// or the data:
	// SKIP_NONE: don't skip, retrieve both;
	// SKIP_KEY: skip retrieving the key;
	// SKIP_DATA: skip retrieving the data.
	// Do not poll from the db again if doing bulk retrieval.
	void update_current_key_data_from_db(DbcGetSkipOptions skipkd) {
		int ret;
		u_int32_t sz, sz1, kflags = DB_DBT_USERMEM,
		    dflags = DB_DBT_USERMEM;
		// Do not poll from db again if doing bulk retrieval.
		if (this->bulk_retrieval_)
			return;
		if (this->csr_status_ != 0) {
			curr_key_.reset();
			curr_data_.reset();
			return;
		}
		
		// We will modify the flags if skipping the key or data, so
		// cache the old values and restore them after the get calls.
		if (skipkd != DbCursorBase::SKIP_NONE) {
			kflags = key_buf_.get_flags();
			dflags = data_buf_.get_flags();
		}
		if (skipkd == DbCursorBase::SKIP_KEY) {
			key_buf_.set_dlen(0);
			key_buf_.set_flags(DB_DBT_PARTIAL | DB_DBT_USERMEM);
		}

		if (skipkd == DbCursorBase::SKIP_DATA) {
			data_buf_.set_dlen(0);
			data_buf_.set_flags(DB_DBT_PARTIAL | DB_DBT_USERMEM);
		}
retry:		ret = csr_->get(&key_buf_, &data_buf_, DB_CURRENT);
		if (ret == 0) {
			if (skipkd != DbCursorBase::SKIP_KEY)
				curr_key_ = key_buf_;
			if (skipkd != DbCursorBase::SKIP_DATA)
				curr_data_ = data_buf_;
			limit_buf_size_after_use();
		} else if (ret == DB_BUFFER_SMALL) {
			if ((sz = key_buf_.get_size()) > 0)
				enlarge_dbt(key_buf_, sz);
			if ((sz1 = data_buf_.get_size()) > 0) 
				enlarge_dbt(data_buf_, sz1);
			if (sz == 0 && sz1 == 0)
				THROW0(InvalidDbtException);
			goto retry;
		} else {
			if (skipkd != DbCursorBase::SKIP_NONE) {
				key_buf_.set_flags(kflags);
				data_buf_.set_flags(dflags);
			}
			throw_bdb_exception(
			"DbCursor<>::update_current_key_data_from_db", ret);
		}

		if (skipkd != DbCursorBase::SKIP_NONE) {
			key_buf_.set_flags(kflags);
			data_buf_.set_flags(dflags);
		}
	}
}; // DbCursor<>

////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////
//
// RandDbCursor class template definition
//
// RandDbCursor is a random-access cursor wrapper for use by
// db_vector_iterator; it derives from the DbCursor<> class. It has a fixed
// key data type, which is index_type.
//
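// A minimal usage sketch (hypothetical db_container pointer "pvec" and int
// element type, not taken from this header):
//
//	RandDbCursor<int> rcsr;
//	rcsr.open(pvec, 0);	// Inherited from DbCursor<>.
//	rcsr.first();
//	rcsr.advance(3);	// Jump forward three records.
//	index_type ndx = rcsr.get_current_index();	// Recno, starts at 1.
//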
typedef db_recno_t index_type;
template<Typename data_dt>
class RandDbCursor : public DbCursor<index_type, data_dt>
{
protected:
	friend class DataItem;
	typedef ssize_t difference_type;
public:
	typedef RandDbCursor<data_dt> self;
	typedef DbCursor<index_type, data_dt> base;

	// Return the index in the recno database of the element to which
	// csr_ currently points (indexes start from 1). csr_ must be open
	// and point to an existing key/data pair.
	//
	inline index_type get_current_index() const
	{
		index_type ndx;

		if (this->directdb_get_)
			((self *)this)->update_current_key_data_from_db(
			    DbCursorBase::SKIP_DATA);
		this->curr_key_.get_data(ndx);
		return ndx;
	}

	inline int compare(const self *csr2) const{
		index_type i1, i2;

		i1 = this->get_current_index();
		i2 = csr2->get_current_index();
		return i1 - i2;
	}

	// Insert data d before/after current position.
	int insert(const data_dt& d, int pos = DB_BEFORE){
		int k = 1, ret;
		//data_dt dta;

		// When inserting into an empty db, the key must be set to 1.
		if (pos == DB_KEYLAST)
			k = 1;

		ret = base::insert(k, d, pos);

		// When inserting into an empty db using a begin() iterator,
		// the flag is DB_AFTER and the insert is sure to fail, so
		// switch to DB_KEYLAST and try again.
		if (ret == EINVAL) {
			k = 1;
			pos = DB_KEYLAST;
			ret = base::insert(k, d, pos);
		}
		this->csr_status_ = ret;
		return ret;
	}

	/*
	 * Move the cursor n positions; if it reaches the beginning or the
	 * end, return INVALID_ITERATOR_POSITION.
	 */
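	// For example (hypothetical position): with the cursor on record 5,
	// advance(-2) repositions it on record 3, while advance(-5) would
	// land before record 1 and return INVALID_ITERATOR_POSITION.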
	int advance(difference_type n)
	{
		int ret = 0;
		index_type indx;
		u_int32_t sz, flags = 0;

		indx = this->get_current_index();
		if (n == 0)
			return 0;

		index_type i = (index_type)n;
		indx += i;

		if (n < 0 && indx < 1) { // Index in recno db starts from 1.

			ret = INVALID_ITERATOR_POSITION;
			return ret;
		}
		this->inform_duppers();

		// Do a search to determine whether new position is valid.
		Dbt k, &d = this->data_buf_;

		
		k.set_data(&indx);
		k.set_size(sizeof(indx));
		if (this->rmw_get_)
			flags |= DB_RMW;

retry:		if (this->csr_ && 
		    ((ret = this->csr_->get(&k, &d, DB_SET)) == DB_NOTFOUND)) {
			this->csr_status_ = ret = INVALID_ITERATOR_POSITION;
			this->curr_key_.reset();
			this->curr_data_.reset();
		} else if (ret == DB_BUFFER_SMALL) {
			sz = d.get_size();
			assert(sz > 0);
			this->enlarge_dbt(d, sz);
			goto retry;
		} else if (ret == 0) {
			this->curr_key_.set_dbt(k, false);
			this->curr_data_.set_dbt(d, false);
			this->limit_buf_size_after_use();
		} else
			throw_bdb_exception("RandDbCursor<>::advance", ret);
		this->csr_status_ = ret;
		return ret;
	}

	// Return the last index of the recno db (indexes start from 1); this
	// also moves the underlying cursor to the last key/data pair.
	//
	inline index_type last_index()
	{
		int ret;

		ret = this->last();
		if (ret)
			return 0;// Invalid position.
		else
			return get_current_index();
	}

	explicit RandDbCursor(u_int32_t b_bulk_retrieval = 0,
	    bool b_rmw1 = false, bool directdbget = true)
	    : base(b_bulk_retrieval, b_rmw1, directdbget)
	{
	}

	RandDbCursor(const RandDbCursor<data_dt>& rdbc) : base(rdbc)
	{
	}

	explicit RandDbCursor(Dbc* csr1, int posidx = 0) : base(csr1)
	{
	}

	virtual ~RandDbCursor()
	{
	}

}; // RandDbCursor<>

END_NS //ns dbstl

#endif // !_DB_STL_DBC_H