add an option to read from blob storage for moving keys (#7848)

Author: Hui Liu (2022-08-23 06:07:17 -07:00), committed by GitHub
Parent: 8a77769211
Commit: 33411fd07e
4 changed files with 97 additions and 5 deletions
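
The change gates fetchKeys blob reads behind a new server knob, FETCH_USING_BLOB (default false). A minimal sketch of turning it on, assuming the usual --knob_<name> command-line convention for fdbserver knobs (the exact flag spelling and the BG_URL value are illustrative assumptions, not taken from this commit):

    fdbserver <usual arguments> \
        --knob_fetch_using_blob=true \
        --knob_bg_url=blobstore://key:secret@host/fetchkeys?bucket=granules

Note that the storage server below only constructs a blob connection when BG_METADATA_SOURCE is not "tenant", so that knob's value matters as well.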


@@ -719,6 +719,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
     init( STORAGE_LIMIT_BYTES, 500000 );
     init( BUGGIFY_LIMIT_BYTES, 1000 );
     init( FETCH_USING_STREAMING, false ); if( randomize && isSimulated && BUGGIFY ) FETCH_USING_STREAMING = true; // Determines if fetch keys uses streaming reads
+    init( FETCH_USING_BLOB, false );
     init( FETCH_BLOCK_BYTES, 2e6 );
     init( FETCH_KEYS_PARALLELISM_BYTES, 4e6 ); if( randomize && BUGGIFY ) FETCH_KEYS_PARALLELISM_BYTES = 3e6;
     init( FETCH_KEYS_PARALLELISM, 2 );
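
The new knob is initialized without the BUGGIFY randomization its neighbor gets; a hypothetical equivalent (not part of this commit) would mirror the FETCH_USING_STREAMING line above:

    init( FETCH_USING_BLOB, false ); if( randomize && isSimulated && BUGGIFY ) FETCH_USING_BLOB = true;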


@@ -678,6 +678,7 @@ public:
     int STORAGE_LIMIT_BYTES;
     int BUGGIFY_LIMIT_BYTES;
     bool FETCH_USING_STREAMING;
+    bool FETCH_USING_BLOB;
     int FETCH_BLOCK_BYTES;
     int FETCH_KEYS_PARALLELISM_BYTES;
     int FETCH_KEYS_PARALLELISM;


@@ -518,12 +518,12 @@ public:
     state ISimulator::ProcessInfo* currentProcess = g_simulator.getCurrentProcess();
     state TaskPriority currentTaskID = g_network->getCurrentTask();
-    if (++openCount >= 3000) {
+    if (++openCount >= 6000) {
         TraceEvent(SevError, "TooManyFiles").log();
         ASSERT(false);
     }
-    if (openCount == 2000) {
+    if (openCount == 4000) {
         TraceEvent(SevWarnAlways, "DisableConnectionFailures_TooManyFiles").log();
         g_simulator.speedUpSimulation = true;
         g_simulator.connectionFailuresDisableDuration = 1e6;


@@ -23,6 +23,7 @@
 #include <type_traits>
 #include <unordered_map>
+#include "fdbclient/BlobGranuleCommon.h"
 #include "fmt/format.h"
 #include "fdbclient/CommitTransaction.h"
 #include "fdbclient/FDBTypes.h"

@@ -41,6 +42,8 @@
 #include "fdbclient/Tracing.h"
 #include "flow/Util.h"
 #include "fdbclient/Atomic.h"
+#include "fdbclient/BlobConnectionProvider.h"
+#include "fdbclient/BlobGranuleReader.actor.h"
 #include "fdbclient/CommitProxyInterface.h"
 #include "fdbclient/DatabaseContext.h"
 #include "fdbclient/FDBTypes.h"
@@ -1206,6 +1209,9 @@ public:
     Reference<EventCacheHolder> storageServerSourceTLogIDEventHolder;

+    // Connection to blob store for fetchKeys()
+    Reference<BlobConnectionProvider> blobConn;
+
     StorageServer(IKeyValueStore* storage,
                   Reference<AsyncVar<ServerDBInfo> const> const& db,
                   StorageServerInterface const& ssi)
@@ -1271,6 +1277,14 @@ public:
         this->storage.kvGets = &counters.kvGets;
         this->storage.kvScans = &counters.kvScans;
         this->storage.kvCommits = &counters.kvCommits;
+
+        if (SERVER_KNOBS->BG_METADATA_SOURCE != "tenant") {
+            try {
+                blobConn = BlobConnectionProvider::newBlobConnectionProvider(SERVER_KNOBS->BG_URL);
+            } catch (Error& e) {
+                // Skip any error when establishing the blob connection
+            }
+        }
     }

     //~StorageServer() { fclose(log); }
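
BG_URL above is the existing blob-granule storage-location knob. Plausible values, borrowing the blobstore URL scheme used elsewhere in FoundationDB (the host, credentials, and bucket below are placeholders, and the file:// form for local tests is an assumption):

    blobstore://<api_key>:<secret>@<host>:<port>/fetchkeys?bucket=<bucket>
    file:///tmp/fdb_granules/

Because the catch block swallows any construction error, a storage server with a bad URL simply runs with blobConn unset; tryGetRangeFromBlob later guards against that with ASSERT(blobConn.isValid()).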
@@ -5442,6 +5456,11 @@ public:
 };

 ACTOR Future<Void> tryGetRange(PromiseStream<RangeResult> results, Transaction* tr, KeyRange keys) {
+    if (SERVER_KNOBS->FETCH_USING_STREAMING) {
+        wait(tr->getRangeStream(results, keys, GetRangeLimits(), Snapshot::True));
+        return Void();
+    }
+
     state KeySelectorRef begin = firstGreaterOrEqual(keys.begin);
     state KeySelectorRef end = firstGreaterOrEqual(keys.end);
@@ -5475,6 +5494,73 @@ ACTOR Future<Void> tryGetRange(PromiseStream<RangeResult> results, Transaction*
     }
 }

+// Read the blob granule mapping from the system keyspace, retrying until maxRetryCount is reached.
+ACTOR Future<Standalone<VectorRef<BlobGranuleChunkRef>>> tryReadBlobGranules(Transaction* tr,
+                                                                             KeyRange keys,
+                                                                             Version fetchVersion,
+                                                                             int maxRetryCount = 10) {
+    state int retryCount = 0;
+    state Version readVersion = fetchVersion;
+    loop {
+        try {
+            Standalone<VectorRef<BlobGranuleChunkRef>> chunks = wait(tr->readBlobGranules(keys, 0, readVersion));
+            return chunks;
+        } catch (Error& e) {
+            if (retryCount >= maxRetryCount) {
+                throw e;
+            }
+            wait(tr->onError(e));
+            retryCount += 1;
+        }
+    }
+}
+
+// Read keys from blob storage if they exist. Fall back to tryGetRange, which reads keys
+// from storage servers with locally attached disks.
+ACTOR Future<Void> tryGetRangeFromBlob(PromiseStream<RangeResult> results,
+                                       Transaction* tr,
+                                       KeyRange keys,
+                                       Version fetchVersion,
+                                       Reference<BlobConnectionProvider> blobConn) {
+    ASSERT(blobConn.isValid());
+    try {
+        state Standalone<VectorRef<BlobGranuleChunkRef>> chunks = wait(tryReadBlobGranules(tr, keys, fetchVersion));
+        if (chunks.size() == 0) {
+            throw blob_granule_transaction_too_old(); // no data on blob
+        }
+        // TODO: check whether blob storage covers the whole expected key range
+        for (const BlobGranuleChunkRef& chunk : chunks) {
+            state KeyRangeRef chunkRange = chunk.keyRange;
+            state RangeResult rows = wait(readBlobGranule(chunk, keys, 0, fetchVersion, blobConn));
+            TraceEvent("ReadBlobData")
+                .detail("Rows", rows.size())
+                .detail("ChunkRange", chunkRange.toString())
+                .detail("Keys", keys.toString());
+            if (rows.size() == 0) {
+                rows.readThrough = KeyRef(rows.arena(), chunkRange.end);
+            }
+            results.send(rows);
+        }
+        results.sendError(end_of_stream()); // end of range read
+    } catch (Error& e) {
+        TraceEvent(SevWarn, "ReadBlobDataFailure")
+            .suppressFor(5.0)
+            .detail("Keys", keys.toString())
+            .detail("FetchVersion", fetchVersion)
+            .detail("Error", e.what());
+        tr->reset();
+        tr->setVersion(fetchVersion);
+        tr->trState->taskID = TaskPriority::FetchKeys;
+        wait(tryGetRange(results, tr, keys)); // fall back to reading from storage servers
+    }
+    return Void();
+}
+
 // We have to store the version the change feed was stopped at in the SS instead of just the stopped status
 // In addition to simplifying stopping logic, it enables communicating stopped status when fetching change feeds
 // from other SS correctly
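
For orientation, a minimal sketch (not part of this commit; written against the flow idioms visible above) of how a consumer drains the PromiseStream that tryGetRange and tryGetRangeFromBlob feed, treating end_of_stream as normal completion:

    ACTOR Future<Void> drainRangeResults(PromiseStream<RangeResult> results) {
        loop {
            try {
                RangeResult rep = waitNext(results.getFuture());
                // ... apply rep to the local shard here ...
            } catch (Error& e) {
                if (e.code() == error_code_end_of_stream) {
                    return Void(); // tryGetRangeFromBlob signals completion this way
                }
                throw;
            }
        }
    }

fetchKeys itself consumes the stream inline rather than through a helper like this, as the next hunk shows.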
@@ -6351,9 +6437,13 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
     tr.setVersion(fetchVersion);
     tr.trState->taskID = TaskPriority::FetchKeys;
     state PromiseStream<RangeResult> results;
-    state Future<Void> hold = SERVER_KNOBS->FETCH_USING_STREAMING
-                                  ? tr.getRangeStream(results, keys, GetRangeLimits(), Snapshot::True)
-                                  : tryGetRange(results, &tr, keys);
+    state Future<Void> hold;
+    if (SERVER_KNOBS->FETCH_USING_BLOB) {
+        hold = tryGetRangeFromBlob(results, &tr, keys, fetchVersion, data->blobConn);
+    } else {
+        hold = tryGetRange(results, &tr, keys);
+    }
+
     state Key nfk = keys.begin;

     try {