diff --git a/fdbclient/VersionedMap.h b/fdbclient/VersionedMap.h index f56b883892..8b7a82063f 100644 --- a/fdbclient/VersionedMap.h +++ b/fdbclient/VersionedMap.h @@ -474,11 +474,25 @@ public: typedef Reference< PTreeT > Tree; Version oldestVersion, latestVersion; - std::map roots; - Tree *latestRoot; + //std::map roots; + std::deque> roots; + //Tree *latestRoot; + struct compare { + bool operator()(const std::pair& value, const Version& key) + { + return (value.first < key); + } + bool operator()(const Version& key, const std::pair& value) + { + return (key < value.first); + } + }; + + // TODO: NEELAM: why is it implemented like this? Why not roots[v]? Tree const& getRoot( Version v ) const { - auto r = roots.upper_bound(v); + //auto r = roots.upper_bound(v); + auto r = upper_bound(roots.begin(), roots.end(), v, compare()); --r; return r->second; } @@ -488,36 +502,52 @@ public: struct iterator; VersionedMap() : oldestVersion(0), latestVersion(0) { - latestRoot = &roots[0]; + //latestRoot = &roots[0]; + roots.emplace_back(0, Tree()); + //latestRoot = &(roots.emplace_back(0, Tree()).second); } VersionedMap( VersionedMap&& v ) BOOST_NOEXCEPT : oldestVersion(v.oldestVersion), latestVersion(v.latestVersion), roots(std::move(v.roots)) { - latestRoot = &roots[latestVersion]; + //latestRoot = &roots[latestVersion]; + //latestRoot = &(roots.back()->second); } void operator = (VersionedMap && v) BOOST_NOEXCEPT { oldestVersion = v.oldestVersion; latestVersion = v.latestVersion; roots = std::move(v.roots); - latestRoot = &roots[latestVersion]; + //latestRoot = &roots[latestVersion]; + //latestRoot = &(roots.back()->second); } Version getLatestVersion() const { return latestVersion; } Version getOldestVersion() const { return oldestVersion; } - Version getNextOldestVersion() const { return roots.upper_bound(oldestVersion)->first; } + //Version getNextOldestVersion() const { return roots.upper_bound(oldestVersion)->first; } + //front element should be the oldest version in the deque, hence the net oldest should be at index 1 + Version getNextOldestVersion() const { return roots[1]->first; } void forgetVersionsBefore(Version newOldestVersion) { ASSERT( newOldestVersion <= latestVersion ); - roots[newOldestVersion] = getRoot(newOldestVersion); - roots.erase(roots.begin(), roots.lower_bound(newOldestVersion)); + // since the specified newOldestVersion might not exist, we copy the root from next lower version to newOldestVersion position + //roots[newOldestVersion] = getRoot(newOldestVersion); + //roots.erase(roots.begin(), roots.lower_bound(newOldestVersion)); + auto r = upper_bound(roots.begin(), roots.end(), newOldestVersion, compare()); + r--; + roots.insert(upper_bound(roots.begin(), roots.end(), newOldestVersion, compare()), *r); + roots.erase(roots.begin(), lower_bound(roots.begin(), roots.end(), newOldestVersion, compare())); oldestVersion = newOldestVersion; } Future forgetVersionsBeforeAsync( Version newOldestVersion, TaskPriority taskID = TaskPriority::DefaultYield ) { ASSERT( newOldestVersion <= latestVersion ); - roots[newOldestVersion] = getRoot(newOldestVersion); + // since the specified newOldestVersion might not exist, we copy the root from next lower version to newOldestVersion position + //roots[newOldestVersion] = getRoot(newOldestVersion); + auto r = upper_bound(roots.begin(), roots.end(), newOldestVersion, compare()); + r--; + roots.insert(upper_bound(roots.begin(), roots.end(), newOldestVersion, compare()), *r); vector toFree; toFree.reserve(10000); - auto newBegin = roots.lower_bound(newOldestVersion); + //auto newBegin = roots.lower_bound(newOldestVersion); + auto newBegin = lower_bound(roots.begin(), roots.end(), newOldestVersion, compare()); Tree *lastRoot = nullptr; for(auto root = roots.begin(); root != newBegin; ++root) { if(root->second) { @@ -541,8 +571,10 @@ public: if (version > latestVersion) { latestVersion = version; Tree r = getRoot(version); - latestRoot = &roots[version]; - *latestRoot = r; + //latestRoot = &roots[version]; + roots.emplace_back(version, Tree()); + //latestRoot = &(roots.emplace_back(version, Tree()).second); + //*latestRoot = r; } else ASSERT( version == latestVersion ); } @@ -551,14 +583,18 @@ public: insert( k, t, latestVersion ); } void insert(const K& k, const T& t, Version insertAt) { - if (PTreeImpl::contains( *latestRoot, latestVersion, k )) PTreeImpl::remove( *latestRoot, latestVersion, k ); // FIXME: Make PTreeImpl::insert do this automatically (see also WriteMap.h FIXME) - PTreeImpl::insert( *latestRoot, latestVersion, MapPair>(k,std::make_pair(t,insertAt)) ); + //if (PTreeImpl::contains( *latestRoot, latestVersion, k )) PTreeImpl::remove( *latestRoot, latestVersion, k ); // FIXME: Make PTreeImpl::insert do this automatically (see also WriteMap.h FIXME) + //PTreeImpl::insert( *latestRoot, latestVersion, MapPair>(k,std::make_pair(t,insertAt)) ); + if (PTreeImpl::contains(roots.back().second, latestVersion, k )) PTreeImpl::remove( roots.back().second, latestVersion, k ); // FIXME: Make PTreeImpl::insert do this automatically (see also WriteMap.h FIXME) + PTreeImpl::insert( roots.back().second, latestVersion, MapPair>(k,std::make_pair(t,insertAt)) ); } void erase(const K& begin, const K& end) { - PTreeImpl::remove( *latestRoot, latestVersion, begin, end ); + //PTreeImpl::remove( *latestRoot, latestVersion, begin, end ); + PTreeImpl::remove( roots.back().second, latestVersion, begin, end ); } void erase(const K& key ) { // key must be present - PTreeImpl::remove( *latestRoot, latestVersion, key ); + PTreeImpl::remove( roots.back().second, latestVersion, key ); + //PTreeImpl::remove( *latestRoot, latestVersion, key ); } void erase(iterator const& item) { // iterator must be in latest version! // SOMEDAY: Optimize to use item.finger and avoid repeated search @@ -653,7 +689,8 @@ public: }; ViewAtVersion at( Version v ) const { return ViewAtVersion(getRoot(v), v); } - ViewAtVersion atLatest() const { return ViewAtVersion(*latestRoot, latestVersion); } + ViewAtVersion atLatest() const { return ViewAtVersion(roots.back().second, latestVersion); } + //ViewAtVersion atLatest() const { return ViewAtVersion(*latestRoot, latestVersion); } // TODO: getHistory?