blob restore: Log and skip data copy if we miss data for a certain tenant (#10621)

Hui Liu 2023-07-19 09:52:30 -07:00 committed by GitHub
parent 63d387eb0b
commit 7c8c24bc8d
9 changed files with 185 additions and 137 deletions

View File

@@ -1178,6 +1178,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( BLOB_RESTORE_MANIFEST_RETENTION_MAX, 10 );
init( BLOB_RESTORE_MLOGS_RETENTION_SECS, isSimulated ? 180 : 3600 * 24 * 14 );
init( BLOB_RESTORE_LOAD_KEY_VERSION_MAP_STEP_SIZE, 10000 );
+init( BLOB_RESTORE_SKIP_EMPTY_RANGES, true );
init( BLOB_GRANULES_FLUSH_BATCH_SIZE, isSimulated ? 2 : 64 );
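Knobs initialized here are read elsewhere through the global SERVER_KNOBS pointer. A minimal sketch (a fragment, mirroring how the new knob is consulted in the storage-server hunks later in this diff) of the skip decision it gates:

    // Sketch: tolerate a missing-data error only when the knob is on.
    // SERVER_KNOBS and error_code_blob_granule_transaction_too_old are used exactly
    // this way in the tryReadBlobGranuleChunks hunk below.
    if (SERVER_KNOBS->BLOB_RESTORE_SKIP_EMPTY_RANGES &&
        e.code() == error_code_blob_granule_transaction_too_old) {
        // return an empty result instead of failing the whole restore
        return Standalone<VectorRef<BlobGranuleChunkRef>>();
    }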

View File

@@ -1200,6 +1200,7 @@ public:
int BLOB_RESTORE_MLOGS_RETENTION_SECS;
int BLOB_RESTORE_LOAD_KEY_VERSION_MAP_STEP_SIZE;
int BLOB_GRANULES_FLUSH_BATCH_SIZE;
+bool BLOB_RESTORE_SKIP_EMPTY_RANGES;
// Blob metadata
int64_t BLOB_METADATA_CACHE_TTL;

View File

@@ -5674,7 +5674,7 @@ ACTOR Future<Void> truncateMutations(Reference<BlobManagerData> bmData, Version
state std::string timestamp = BackupAgentBase::formatTime(epochs);
state Version truncVersion = wait(timeKeeperVersionFromDatetime(timestamp, bmData->db));
-if (truncVersion > 0) {
+if (truncVersion > 0 && truncVersion < flushVersion) {
state std::string mlogsUrl =
wait(BlobGranuleBackupConfig().mutationLogsUrl().getD(SystemDBWriteLockedNow(bmData->db.getReference())));
state Reference<IBackupContainer> bc = IBackupContainer::openContainer(mlogsUrl, {}, {});
@@ -5682,6 +5682,8 @@ ACTOR Future<Void> truncateMutations(Reference<BlobManagerData> bmData, Version
bmData->stats.lastMLogTruncationVersion = truncVersion;
TraceEvent("TruncateMutationLogs").detail("Version", truncVersion).detail("Timestamp", timestamp);
CODE_PROBE(true, "Flush blob granules and truncate mutation logs");
+} else {
+TraceEvent("SkipTruncateMutations").detail("Version", truncVersion).detail("FlushVer", flushVersion);
}
return Void();
}
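The tightened guard only truncates when the retention-derived version is strictly below the granule flush frontier; otherwise the new SkipTruncateMutations event is logged and nothing is dropped. A minimal sketch of that decision, assuming (as the knobs above suggest) that the truncation point comes from a retention window; the helper name is hypothetical:

    // Hypothetical helper illustrating the guard: truncate only mutation logs
    // that are both past retention and already flushed to blob storage.
    Version chooseTruncationVersion(Version retentionVersion, Version flushVersion) {
        if (retentionVersion > 0 && retentionVersion < flushVersion) {
            return retentionVersion; // safe to drop logs below this point
        }
        return invalidVersion; // skip truncation, as the hunk above now logs
    }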
@@ -5696,12 +5698,11 @@ ACTOR Future<Void> backupManifest(Reference<BlobManagerData> bmData) {
bmData->dbInfo,
manifestStore,
bmData->epoch,
-bmData->manifestDumperSeqNo,
+bmData->manifestDumperSeqNo++,
bmData->enableManifestEncryption));
bmData->stats.lastManifestSeqNo = bmData->manifestDumperSeqNo;
bmData->stats.manifestSizeInBytes += bytes;
bmData->stats.lastManifestDumpTs = now();
-bmData->manifestDumperSeqNo++;
bmData->manifestCompletitionTrigger.trigger();
return Void();
}
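Note the post-increment move: the dump call still receives the pre-increment sequence number, but the stats assignment that follows now reads the already-advanced counter. A tiny ordering sketch (dump() is a stand-in, values illustrative):

    int64_t seqNo = 5;
    dump(seqNo++);        // dump() sees 5; seqNo is 6 before the next statement runs
    int64_t last = seqNo; // last == 6: the stats now record the next sequence number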

View File

@@ -111,7 +111,9 @@ private:
if (granules.empty()) {
TraceEvent("EmptyBlobGranules", self->interf_.id()).log();
CODE_PROBE(true, "Blob restore with no blob granules");
-wait(BlobRestoreController::setError(controller, "No blob granules"));
+wait(canRestore(self));
+wait(preloadApplyMutationsKeyVersionMap(self));
+wait(BlobRestoreController::setPhase(controller, APPLYING_MLOGS, self->interf_.id()));
return Void();
}
@@ -256,13 +258,17 @@ private:
decodeKeyServersValue(UIDtoTagMap, it.value, src, dest, srcId, destId);
if (std::find_if(src.begin(), src.end(), BlobMigratorInterface::isBlobMigrator) == src.end() &&
-std::find_if(dest.begin(), dest.end(), BlobMigratorInterface::isBlobMigrator) ==
-dest.end()) {
+dest.empty()) {
continue; // not owned by blob migrator
}
state KeyRangeRef range(it.key, keyServers[i + 1].key);
int64_t bytes = sizeInBytes(self, range);
+if (bytes == 0) {
+// set a non-zero value even if it's an empty range, so that we don't
+// move to next phase early
+bytes = 1;
+}
dprint(" incompleted {}, size: {}\n", range.toString(), bytes);
incompleted += bytes;
}
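The one-byte floor keeps an empty-but-still-owned range visible to the progress check: a zero-size range would add nothing to `incompleted`, letting the migrator advance phase before ownership actually moves. A small illustration of the arithmetic (values hypothetical; std::max is from <algorithm>):

    // Illustrative only: two ranges still owned by the migrator, one of them empty.
    int64_t incompleted = 0;
    int64_t sizes[] = { 0, 4096 }; // hypothetical sizeInBytes() results
    for (int64_t bytes : sizes) {
        incompleted += std::max<int64_t>(bytes, 1); // an empty range still counts as pending
    }
    // incompleted == 4097 (nonzero), so the copy phase cannot be declared done
    // while any range, even an empty one, is still owned by the migrator.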

View File

@@ -7922,9 +7922,7 @@ ACTOR Future<Void> tryGetRange(PromiseStream<RangeResult> results, Transaction*
// The key range should not cross tenant boundary.
ACTOR Future<Standalone<VectorRef<BlobGranuleChunkRef>>> tryReadBlobGranuleChunks(Transaction* tr,
KeyRange keys,
-Version fetchVersion,
-int maxRetryCount = 10) {
-state int retryCount = 0;
+Version fetchVersion) {
state Version readVersion = fetchVersion;
loop {
try {
@@ -7935,11 +7933,14 @@ ACTOR Future<Standalone<VectorRef<BlobGranuleChunkRef>>> tryReadBlobGranuleChunk
.detail("FetchVersion", fetchVersion);
return chunks;
} catch (Error& e) {
-if (retryCount >= maxRetryCount) {
-throw e;
+if (SERVER_KNOBS->BLOB_RESTORE_SKIP_EMPTY_RANGES &&
+e.code() == error_code_blob_granule_transaction_too_old) {
+CODE_PROBE(true, "Skip blob ranges for restore", probe::decoration::rare);
+TraceEvent(SevWarn, "SkipBlobGranuleForRestore").error(e).detail("Keys", keys);
+Standalone<VectorRef<BlobGranuleChunkRef>> empty;
+return empty;
}
wait(tr->onError(e));
-retryCount += 1;
}
}
}
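With the bounded retry counter gone, every error other than the skipped one now goes through the transaction's normal backoff indefinitely; only blob_granule_transaction_too_old (with the knob on) short-circuits to an empty result. A compact sketch of the resulting policy (doRead() is a stand-in for the granule read above):

    loop {
        try {
            Standalone<VectorRef<BlobGranuleChunkRef>> chunks = wait(doRead());
            return chunks;
        } catch (Error& e) {
            if (SERVER_KNOBS->BLOB_RESTORE_SKIP_EMPTY_RANGES &&
                e.code() == error_code_blob_granule_transaction_too_old) {
                return Standalone<VectorRef<BlobGranuleChunkRef>>(); // restore what is still readable
            }
            wait(tr->onError(e)); // transient errors: back off and retry without a cap
        }
    }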
@@ -7971,14 +7972,14 @@ ACTOR Future<Void> tryGetRangeFromBlob(PromiseStream<RangeResult> results,
try {
state Standalone<VectorRef<BlobGranuleChunkRef>> chunks =
wait(readBlobGranuleChunks(tr, cx, keys, fetchVersion));
-TraceEvent("ReadBlobGranules").detail("Keys", keys).detail("Chunks", chunks.size());
+TraceEvent(SevDebug, "ReadBlobGranuleChunks").detail("Keys", keys).detail("Chunks", chunks.size());
state int i;
for (i = 0; i < chunks.size(); ++i) {
state KeyRangeRef chunkRange = chunks[i].keyRange;
// Chunk is empty if no snapshot file. Skip it
if (!chunks[i].snapshotFile.present()) {
-TraceEvent("SkipBlobChunk")
+TraceEvent("SkipEmptyBlobChunkForRestore")
.detail("Chunk", chunks[i].keyRange)
.detail("Version", chunks[i].includedVersion);
RangeResult rows;
@@ -7986,23 +7987,37 @@ ACTOR Future<Void> tryGetRangeFromBlob(PromiseStream<RangeResult> results,
rows.readThrough = KeyRef(rows.arena(), std::min(chunkRange.end, keys.end));
continue;
}
-state Reference<BlobConnectionProvider> blobConn = wait(loadBStoreForTenant(tenantData, chunkRange));
-state RangeResult rows = wait(readBlobGranule(chunks[i], keys, 0, fetchVersion, blobConn));
+try {
+state Reference<BlobConnectionProvider> blobConn = wait(loadBStoreForTenant(tenantData, chunkRange));
+state RangeResult rows = wait(readBlobGranule(chunks[i], keys, 0, fetchVersion, blobConn));
-TraceEvent(SevDebug, "ReadBlobData")
-.detail("Rows", rows.size())
-.detail("ChunkRange", chunkRange)
-.detail("FetchVersion", fetchVersion);
-// It should read all the data from that chunk
-ASSERT(!rows.more);
-if (i == chunks.size() - 1) {
-// set more to false when it's the last chunk
-rows.more = false;
-} else {
-rows.more = true;
-rows.readThrough = KeyRef(rows.arena(), std::min(chunkRange.end, keys.end));
-}
+TraceEvent(SevDebug, "ReadBlobData")
+.detail("Rows", rows.size())
+.detail("ChunkRange", chunkRange)
+.detail("FetchVersion", fetchVersion);
+// It should read all the data from that chunk
+ASSERT(!rows.more);
+if (i == chunks.size() - 1) {
+// set more to false when it's the last chunk
+rows.more = false;
+} else {
+rows.more = true;
+rows.readThrough = KeyRef(rows.arena(), std::min(chunkRange.end, keys.end));
+}
+results.send(rows);
+} catch (Error& err) {
+if (SERVER_KNOBS->BLOB_RESTORE_SKIP_EMPTY_RANGES &&
+(err.code() == error_code_file_not_found ||
+err.code() == error_code_blob_granule_transaction_too_old)) {
+// skip no data ranges and restore as much data as we can
+TraceEvent(SevWarn, "SkipBlobChunkForRestore").error(err).detail("ChunkRange", chunkRange);
+RangeResult rows;
+results.send(rows);
+CODE_PROBE(true, "Skip blob chunks for restore", probe::decoration::rare);
+} else {
+throw;
+}
+}
-results.send(rows);
}
if (chunks.size() == 0) {
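The `more`/`readThrough` bookkeeping above is the contract with whoever drains the PromiseStream: each RangeResult is complete through `readThrough`, and `more == false` marks the final batch for the requested range. A minimal consumer sketch under that contract (drainResults and applyRows are illustrative, not the actual caller):

    ACTOR Future<Void> drainResults(PromiseStream<RangeResult> results) {
        loop {
            RangeResult rows = waitNext(results.getFuture());
            applyRows(rows); // hypothetical: persist the fetched rows locally
            if (!rows.more) {
                return Void(); // last batch for the requested range
            }
            // rows.readThrough is the key the stream has provably covered so far
        }
    }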
@@ -8383,10 +8398,11 @@ ACTOR Future<Version> fetchChangeFeed(StorageServer* data,
}
// There are two reasons for change_feed_not_registered:
-// 1. The feed was just created, but the ss mutation stream is ahead of the GRV that fetchChangeFeedApplier
-// uses to read the change feed data from the database. In this case we need to wait and retry
-// 2. The feed was destroyed, but we missed a metadata update telling us this. In this case we need to destroy
-// the feed
+// 1. The feed was just created, but the ss mutation stream is ahead of the GRV that
+// fetchChangeFeedApplier uses to read the change feed data from the database. In this case we need to
+// wait and retry
+// 2. The feed was destroyed, but we missed a metadata update telling us this. In this case we need to
+// destroy the feed
// endVersion >= the metadata create version, so we can safely use it as a proxy
if (beginVersion != 0 || seenNotRegistered || endVersion <= data->desiredOldestVersion.get()) {
// If any of these are true, the feed must be destroyed.
@@ -8398,8 +8414,8 @@ ACTOR Future<Version> fetchChangeFeed(StorageServer* data,
.detail("Version", cleanupVersion);
if (g_network->isSimulated() && !g_simulator->restarted) {
-// verify that the feed was actually destroyed and it's not an error in this inference logic. Restarting
-// tests produce false positives because the validation state isn't kept across tests
+// verify that the feed was actually destroyed and it's not an error in this inference logic.
+// Restarting tests produce false positives because the validation state isn't kept across tests
ASSERT(g_simulator->validationData.allDestroyedChangeFeedIDs.count(changeFeedInfo->id.toString()));
}
@@ -8441,9 +8457,9 @@ ACTOR Future<std::vector<Key>> fetchChangeFeedMetadata(StorageServer* data,
PromiseStream<Key> destroyedFeeds,
UID fetchKeysID) {
-// Wait for current TLog batch to finish to ensure that we're fetching metadata at a version >= the version of the
-// ChangeServerKeys mutation. This guarantees we don't miss any metadata between the previous batch's version
-// (data->version) and the mutation version.
+// Wait for current TLog batch to finish to ensure that we're fetching metadata at a version >= the version of
+// the ChangeServerKeys mutation. This guarantees we don't miss any metadata between the previous batch's
+// version (data->version) and the mutation version.
wait(data->version.whenAtLeast(data->version.get() + 1));
state Version fetchVersion = data->version.get();
@@ -8453,9 +8469,11 @@ ACTOR Future<std::vector<Key>> fetchChangeFeedMetadata(StorageServer* data,
.detail("FKID", fetchKeysID);
state OverlappingChangeFeedsInfo feedMetadata = wait(data->cx->getOverlappingChangeFeeds(keys, fetchVersion));
-// rest of this actor needs to happen without waits that might yield to scheduler, to avoid races in feed metadata.
+// rest of this actor needs to happen without waits that might yield to scheduler, to avoid races in feed
+// metadata.
-// Find set of feeds we currently have that were not present in fetch, to infer that they may have been destroyed.
+// Find set of feeds we currently have that were not present in fetch, to infer that they may have been
+// destroyed.
state std::unordered_map<Key, Version> missingFeeds;
auto ranges = data->keyChangeFeed.intersectingRanges(keys);
for (auto& r : ranges) {
@@ -8476,8 +8494,8 @@ ACTOR Future<std::vector<Key>> fetchChangeFeedMetadata(StorageServer* data,
}
}
}
-// FIXME: might want to inject delay here sometimes in simulation, so that races that would only happen when a feed
-// destroy causes a wait are more prominent?
+// FIXME: might want to inject delay here sometimes in simulation, so that races that would only happen when a
+// feed destroy causes a wait are more prominent?
std::vector<Key> feedIds;
feedIds.reserve(feedMetadata.feeds.size());
@@ -8564,8 +8582,8 @@ ACTOR Future<std::vector<Key>> fetchChangeFeedMetadata(StorageServer* data,
continue;
}
-// we checked all feeds we already owned in this range at the start to reset them if they were removing, and
-// this actor would have been cancelled if a later remove happened
+// we checked all feeds we already owned in this range at the start to reset them if they were removing,
+// and this actor would have been cancelled if a later remove happened
ASSERT(!changeFeedInfo->removing);
if (cfEntry.stopVersion < changeFeedInfo->stopVersion) {
CODE_PROBE(true, "Change feed updated stop version from fetch metadata");
@@ -8613,10 +8631,10 @@ ACTOR Future<std::vector<Key>> fetchChangeFeedMetadata(StorageServer* data,
Version fetchedMetadataVersion = feedMetadata.getFeedMetadataVersion(existingEntry->second->range);
Version lastMetadataVersion = feed.second;
-// Look for case where feed's range was moved away, feed was destroyed, and then feed's range was moved back.
-// This happens where feed is removing, the fetch metadata is higher than the moved away version, and the feed
-// isn't in the fetched response. In that case, the feed must have been destroyed between lastMetadataVersion
-// and fetchedMetadataVersion
+// Look for case where feed's range was moved away, feed was destroyed, and then feed's range was moved
+// back. This happens where feed is removing, the fetch metadata is higher than the moved away version, and
+// the feed isn't in the fetched response. In that case, the feed must have been destroyed between
+// lastMetadataVersion and fetchedMetadataVersion
if (lastMetadataVersion >= fetchedMetadataVersion) {
CODE_PROBE(true, "Change Feed fetched higher metadata version before moved away", probe::decoration::rare);
continue;
@@ -8793,13 +8811,14 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
data->counters.bytesFetched,
data->counters.kvFetched);
-// Set read options to use non-caching reads and set Fetch type unless low priority data fetching is disabled by a
-// knob
+// Set read options to use non-caching reads and set Fetch type unless low priority data fetching is disabled by
+// a knob
state ReadOptions readOptions = ReadOptions(
{}, SERVER_KNOBS->FETCH_KEYS_LOWER_PRIORITY ? ReadType::FETCH : ReadType::NORMAL, CacheResult::False);
-// need to set this at the very start of the fetch, to handle any private change feed destroy mutations we get for
-// this key range, that apply to change feeds we don't know about yet because their metadata hasn't been fetched yet
+// need to set this at the very start of the fetch, to handle any private change feed destroy mutations we get
+// for this key range, that apply to change feeds we don't know about yet because their metadata hasn't been
+// fetched yet
data->changeFeedDestroys[fetchKeysID] = destroyedFeeds;
// delay(0) to force a return to the run loop before the work of fetchKeys is started.
@@ -8807,9 +8826,9 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
try {
wait(data->coreStarted.getFuture() && delay(0));
-// On SS Reboot, durableVersion == latestVersion, so any mutations we add to the mutation log would be skipped
-// if added before latest version advances. To ensure this doesn't happen, we wait for version to increase by
-// one if this fetchKeys was initiated by a changeServerKeys from restoreDurableState
+// On SS Reboot, durableVersion == latestVersion, so any mutations we add to the mutation log would be
+// skipped if added before latest version advances. To ensure this doesn't happen, we wait for version to
+// increase by one if this fetchKeys was initiated by a changeServerKeys from restoreDurableState
if (data->version.get() == data->durableVersion.get()) {
wait(data->version.whenAtLeast(data->version.get() + 1));
wait(delay(0));
@@ -8865,8 +8884,8 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
// until all mutations for a version have been processed. We need to take the durableVersionLock to ensure
// data->version is greater than the version of the mutation which caused the fetch to be initiated.
-// We must also ensure we have fetched all change feed metadata BEFORE changing the phase to fetching to ensure
-// change feed mutations get applied correctly
+// We must also ensure we have fetched all change feed metadata BEFORE changing the phase to fetching to
+// ensure change feed mutations get applied correctly
state std::vector<Key> changeFeedsToFetch;
state Reference<BlobRestoreController> restoreController = makeReference<BlobRestoreController>(data->cx, keys);
state bool isFullRestore = wait(BlobRestoreController::isRestoring(restoreController));
@@ -8887,8 +8906,8 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
state int debug_nextRetryToLog = 1;
state Error lastError;
-// FIXME: The client cache does not notice when servers are added to a team. To read from a local storage server
-// we must refresh the cache manually.
+// FIXME: The client cache does not notice when servers are added to a team. To read from a local storage
+// server we must refresh the cache manually.
data->cx->invalidateCache(Key(), keys);
loop {
@@ -8929,9 +8948,9 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
throw e;
}
-// Note that error in getting GRV doesn't affect any storage server state. Therefore, we catch all
-// errors here without failing the storage server. When error happens, fetchVersion fall back to
-// the above computed fetchVersion.
+// Note that error in getting GRV doesn't affect any storage server state. Therefore, we catch
+// all errors here without failing the storage server. When an error happens, fetchVersion falls
+// back to the above computed fetchVersion.
TraceEvent(SevWarn, "FetchKeyGRVError", data->thisServerID).error(e);
lastError = e;
}
@@ -8951,8 +8970,9 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
state KeyRef rangeEnd;
if (isFullRestore) {
state BlobRestorePhase phase = wait(BlobRestoreController::currentPhase(restoreController));
-// Read from blob only when it's copying data for full restore. Otherwise it may cause data corruptions
-// e.g we don't want to copy from blob any more when it's applying mutation logs(APPLYING_MLOGS)
+// Read from blob only when it's copying data for full restore. Otherwise it may cause data
+// corruption, e.g. we don't want to copy from blob any more when it's applying mutation
+// logs (APPLYING_MLOGS)
if (phase == BlobRestorePhase::COPYING_DATA || phase == BlobRestorePhase::ERROR) {
wait(loadBGTenantMap(&data->tenantData, &tr));
// only copy the range that intersects with full restore range
@@ -9082,11 +9102,11 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
AddingShard* otherShard = data->shards.rangeContaining(blockBegin).value()->adding.get();
keys = shard->keys;
-// Split our prior updates. The ones that apply to our new, restricted key range will go back into
-// shard->updates, and the ones delivered to the new shard will be discarded because it is in
-// WaitPrevious phase (hasn't chosen a fetchVersion yet). What we are doing here is expensive and
-// could get more expensive if we started having many more blocks per shard. May need optimization
-// in the future.
+// Split our prior updates. The ones that apply to our new, restricted key range will go back
+// into shard->updates, and the ones delivered to the new shard will be discarded because it is
+// in WaitPrevious phase (hasn't chosen a fetchVersion yet). What we are doing here is expensive
+// and could get more expensive if we started having many more blocks per shard. May need
+// optimization in the future.
std::deque<Standalone<VerUpdateRef>>::iterator u = updatesToSplit.begin();
for (; u != updatesToSplit.end(); ++u) {
splitMutations(data, data->shards, *u);
@@ -9119,9 +9139,9 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
.detail("FKID", interval.pairID)
.detail("SV", data->storageVersion())
.detail("DV", data->durableVersion.get());
-// Directly commit()ing the IKVS would interfere with updateStorage, possibly resulting in an incomplete version
-// being recovered. Instead we wait for the updateStorage loop to commit something (and consequently also what
-// we have written)
+// Directly commit()ing the IKVS would interfere with updateStorage, possibly resulting in an incomplete
+// version being recovered. Instead we wait for the updateStorage loop to commit something (and consequently
+// also what we have written)
state Future<std::unordered_map<Key, Version>> feedFetchMain = dispatchChangeFeeds(data,
fetchKeysID,
@@ -9146,13 +9166,13 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
.detail("SV", data->storageVersion())
.detail("DV", data->durableVersion.get());
-// Wait to run during update(), after a new batch of versions is received from the tlog but before eager reads
-// take place.
+// Wait to run during update(), after a new batch of versions is received from the tlog but before eager
+// reads take place.
Promise<FetchInjectionInfo*> p;
data->readyFetchKeys.push_back(p);
-// After we add to the promise readyFetchKeys, update() would provide a pointer to FetchInjectionInfo that we
-// can put mutation in.
+// After we add to the promise readyFetchKeys, update() would provide a pointer to FetchInjectionInfo that
+// we can put mutations in.
FetchInjectionInfo* batch = wait(p.getFuture());
TraceEvent(SevDebug, "FKUpdateBatch", data->thisServerID).detail("FKID", interval.pairID);
@@ -9160,13 +9180,15 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
ASSERT(data->version.get() >= fetchVersion);
// Choose a transferredVersion. This choice and timing ensure that
// * The transferredVersion can be mutated in versionedData
-// * The transferredVersion isn't yet committed to storage (so we can write the availability status change)
-// * The transferredVersion is <= the version of any of the updates in batch, and if there is an equal version
+// * The transferredVersion isn't yet committed to storage (so we can write the availability status
+// change)
+// * The transferredVersion is <= the version of any of the updates in batch, and if there is an equal
+// version
// its mutations haven't been processed yet
shard->transferredVersion = data->version.get() + 1;
-// shard->transferredVersion = batch->changes[0].version; //< FIXME: This obeys the documented properties, and
-// seems "safer" because it never introduces extra versions into the data structure, but violates some ASSERTs
-// currently
+// shard->transferredVersion = batch->changes[0].version; //< FIXME: This obeys the documented properties,
+// and seems "safer" because it never introduces extra versions into the data structure, but violates some
+// ASSERTs currently
data->mutableData().createNewVersion(shard->transferredVersion);
ASSERT(shard->transferredVersion > data->storageVersion());
ASSERT(shard->transferredVersion == data->data().getLatestVersion());
@@ -9208,10 +9230,10 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
// the minimal version in updates must be larger than fetchVersion
ASSERT(shard->updates.empty() || shard->updates[0].version > fetchVersion);
-// Put the updates that were collected during the FinalCommit phase into the batch at the transferredVersion.
-// Eager reads will be done for them by update(), and the mutations will come back through
-// AddingShard::addMutations and be applied to versionedMap and mutationLog as normal. The lie about their
-// version is acceptable because this shard will never be read at versions < transferredVersion
+// Put the updates that were collected during the FinalCommit phase into the batch at the
+// transferredVersion. Eager reads will be done for them by update(), and the mutations will come back
+// through AddingShard::addMutations and be applied to versionedMap and mutationLog as normal. The lie about
+// their version is acceptable because this shard will never be read at versions < transferredVersion
for (auto i = shard->updates.begin(); i != shard->updates.end(); ++i) {
i->version = shard->transferredVersion;
@@ -9273,8 +9295,8 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
keys,
true); // keys will be available when getLatestVersion()==transferredVersion is durable
-// Note that since it receives a pointer to FetchInjectionInfo, the thread does not leave this actor until this
-// point.
+// Note that since it receives a pointer to FetchInjectionInfo, the thread does not leave this actor until
+// this point.
// Wait for the transferred version (and therefore the shard data) to be committed and durable.
wait(data->durableVersion.whenAtLeast(feedTransferredVersion));
@@ -10310,9 +10332,9 @@ void changeServerKeys(StorageServer* data,
}
}
-// Shard state depends on nowAssigned and whether the data is available (actually assigned in memory or on the disk)
-// up to the given version. The latter depends on data->newestAvailableVersion, so loop over the ranges of that.
-// SOMEDAY: Could this just use shards? Then we could explicitly do the removeDataRange here when an
+// Shard state depends on nowAssigned and whether the data is available (actually assigned in memory or on the
+// disk) up to the given version. The latter depends on data->newestAvailableVersion, so loop over the ranges
+// of that. SOMEDAY: Could this just use shards? Then we could explicitly do the removeDataRange here when an
// adding/transferred shard is cancelled
auto vr = data->newestAvailableVersion.intersectingRanges(keys);
std::vector<std::pair<KeyRange, Version>> changeNewestAvailable;
@@ -10713,10 +10735,10 @@ void rollback(StorageServer* data, Version rollbackVersion, Version nextVersion)
// to simply restart the storage server actor and restore from the persistent disk state, and then roll
// forward from the TLog's history. It's not quite as efficient, but we rarely have to do this in practice.
-// FIXME: This code is relying for liveness on an undocumented property of the log system implementation: that after
-// a rollback the rolled back versions will eventually be missing from the peeked log. A more sophisticated
-// approach would be to make the rollback range durable and, after reboot, skip over those versions if they appear
-// in peek results.
+// FIXME: This code is relying for liveness on an undocumented property of the log system implementation: that
+// after a rollback the rolled back versions will eventually be missing from the peeked log. A more
+// sophisticated approach would be to make the rollback range durable and, after reboot, skip over those
+// versions if they appear in peek results.
throw please_reboot();
}
@@ -10745,8 +10767,8 @@ void StorageServer::addMutation(Version version,
.detail("ShardEnd", shard.end);
if (!fromFetch) {
-// have to do change feed before applyMutation because nonExpanded wasn't copied into the mutation log arena,
-// and thus would go out of scope if it wasn't copied into the change feed arena
+// have to do change feed before applyMutation because nonExpanded wasn't copied into the mutation log
+// arena, and thus would go out of scope if it wasn't copied into the change feed arena
MutationRefAndCipherKeys encrypt = encryptedMutation;
if (encrypt.mutation.isEncrypted() && mutation.type != MutationRef::SetValue &&
@@ -10836,8 +10858,8 @@ private:
if (processedStartKey) {
// Because of the implementation of the krm* functions, we expect changes in pairs, [begin,end)
-// We can also ignore clearRanges, because they are always accompanied by such a pair of sets with the same
-// keys
+// We can also ignore clearRanges, because they are always accompanied by such a pair of sets with the
+// same keys
ASSERT(m.type == MutationRef::SetValue && m.param1.startsWith(data->sk));
KeyRangeRef keys(startKey.removePrefix(data->sk), m.param1.removePrefix(data->sk));
@@ -10857,8 +10879,9 @@ private:
setAssignedStatus(data, keys, nowAssigned);
// The changes for version have already been received (and are being processed now). We need to
-// fetch the data for change.version-1 (changes from versions < change.version) If emptyRange, treat
-// the shard as empty, see removeKeysFromFailedServer() for more details about this scenario.
+// fetch the data for change.version-1 (changes from versions < change.version). If emptyRange,
+// treat the shard as empty; see removeKeysFromFailedServer() for more details about this
+// scenario.
changeServerKeys(data, keys, nowAssigned, currentVersion - 1, context);
}
}
@@ -10961,8 +10984,8 @@ private:
.detail("PopVersion", popVersion)
.detail("Status", status);
-// Because of data moves, we can get mutations operating on a change feed we don't yet know about, because
-// the metadata fetch hasn't started yet
+// Because of data moves, we can get mutations operating on a change feed we don't yet know about,
+// because the metadata fetch hasn't started yet
bool createdFeed = false;
bool popMutationLog = false;
bool addMutationToLog = false;
@@ -10998,14 +11021,14 @@ private:
} else if (feed != data->uidChangeFeed.end() && feed->second->removing && !feed->second->destroyed &&
status != ChangeFeedStatus::CHANGE_FEED_DESTROY) {
// Because we got a private mutation for this change feed, the feed must have moved back after being
-// moved away. Normally we would later find out about this via a fetch, but in the particular case where
-// the private mutation is the creation of the change feed, and the following race occurred, we must
-// refresh it here:
-// 1. This SS found out about the feed from a fetch, from a SS with a higher version that already got
-// the feed create mutation
+// moved away. Normally we would later find out about this via a fetch, but in the particular case
+// where the private mutation is the creation of the change feed, and the following race occurred,
+// we must refresh it here:
+// 1. This SS found out about the feed from a fetch, from a SS with a higher version that already
+// got the feed create mutation
// 2. The shard was moved away
-// 3. The shard was moved back, and this SS fetched change feed metadata from a different SS that did
-// not yet recieve the private mutation, so the feed was not refreshed
+// 3. The shard was moved back, and this SS fetched change feed metadata from a different SS that
+// did not yet receive the private mutation, so the feed was not refreshed
// 4. This SS gets the private mutation, the feed is still marked as removing
TraceEvent(SevDebug, "ResetChangeFeedInfoFromPrivateMutation", data->thisServerID)
.detail("FeedID", changeFeedId)
@@ -11178,7 +11201,8 @@ private:
m.param1.removePrefix(systemKeys.begin).removePrefix(storageCachePrefix));
data->cachedRangeMap.insert(keys, true);
-// Figure out the affected shard ranges and maintain the cached key-range information in the in-memory map
+// Figure out the affected shard ranges and maintain the cached key-range information in the in-memory
+// map
// TODO revisit- we are not splitting the cached ranges based on shards as of now.
if (0) {
auto cachedRanges = data->shards.intersectingRanges(keys);
@@ -11313,8 +11337,8 @@ ACTOR Future<Void> update(StorageServer* data, bool* pReceivedUpdate) {
// We allow the storage server to make some progress between e-brake periods, referred to as "overage", in
// order to ensure that it advances desiredOldestVersion enough for updateStorage to make enough progress on
-// freeing up queue size. We also increase these limits if speed up simulation was set IF they were buggified to
-// a very small value.
+// freeing up queue size. We also increase these limits if speed up simulation was set IF they were
+// buggified to a very small value.
state int64_t hardLimit = SERVER_KNOBS->STORAGE_HARD_LIMIT_BYTES;
state int64_t hardLimitOverage = SERVER_KNOBS->STORAGE_HARD_LIMIT_BYTES_OVERAGE;
if (g_network->isSimulated() && g_simulator->speedUpSimulation) {
@@ -11488,15 +11512,15 @@ ACTOR Future<Void> update(StorageServer* data, bool* pReceivedUpdate) {
collectingCipherKeys = false;
eager = UpdateEagerReadInfo(enableClearRangeEagerReads);
} else {
-// Any fetchKeys which are ready to transition their shards to the adding,transferred state do so now.
-// If there is an epoch end we skip this step, to increase testability and to prevent inserting a
-// version in the middle of a rolled back version range.
+// Any fetchKeys which are ready to transition their shards to the adding,transferred state do so
+// now. If there is an epoch end we skip this step, to increase testability and to prevent inserting
+// a version in the middle of a rolled back version range.
while (!hasPrivateData && !epochEnd && !data->readyFetchKeys.empty()) {
auto fk = data->readyFetchKeys.back();
data->readyFetchKeys.pop_back();
fk.send(&fii);
-// fetchKeys() would put the data it fetched into the fii. The thread will not return back to this
-// actor until it was completed.
+// fetchKeys() would put the data it fetched into the fii. The thread will not return back to
+// this actor until it was completed.
}
for (auto& c : fii.changes)
@@ -11508,8 +11532,8 @@ ACTOR Future<Void> update(StorageServer* data, bool* pReceivedUpdate) {
CODE_PROBE(
true,
"A fetchKeys completed while we were doing this, so eager might be outdated. Read it again.");
-// SOMEDAY: Theoretically we could check the change counters of individual shards and retry the reads
-// only selectively
+// SOMEDAY: Theoretically we could check the change counters of individual shards and retry the
+// reads only selectively
eager = UpdateEagerReadInfo(enableClearRangeEagerReads);
cloneCursor2 = cursor->cloneNoMore();
}
@@ -11623,8 +11647,8 @@ ACTOR Future<Void> update(StorageServer* data, bool* pReceivedUpdate) {
Span span("SS:update"_loc, spanContext);
-// Drop non-private mutations if TSS fault injection is enabled in simulation, or if this is a TSS in
-// quarantine.
+// Drop non-private mutations if TSS fault injection is enabled in simulation, or if this is a TSS
+// in quarantine.
if (g_network->isSimulated() && data->isTss() && !g_simulator->speedUpSimulation &&
g_simulator->tssMode == ISimulator::TSSMode::EnabledDropMutations &&
data->tssFaultInjectTime.present() && data->tssFaultInjectTime.get() < now() &&
@@ -12124,9 +12148,9 @@ ACTOR Future<Void> updateStorage(StorageServer* data) {
if (data->tenantMap.getLatestVersion() < newOldestVersion) {
data->tenantMap.createNewVersion(newOldestVersion);
}
-// We want to forget things from these data structures atomically with changing oldestVersion (and "before",
-// since oldestVersion.set() may trigger waiting actors) forgetVersionsBeforeAsync visibly forgets
-// immediately (without waiting) but asynchronously frees memory.
+// We want to forget things from these data structures atomically with changing oldestVersion (and
+// "before", since oldestVersion.set() may trigger waiting actors). forgetVersionsBeforeAsync visibly
+// forgets immediately (without waiting) but asynchronously frees memory.
Future<Void> finishedForgetting =
data->mutableData().forgetVersionsBeforeAsync(newOldestVersion, TaskPriority::UpdateStorage) &&
data->tenantMap.forgetVersionsBeforeAsync(newOldestVersion, TaskPriority::UpdateStorage);
@@ -12196,8 +12220,8 @@ ACTOR Future<Void> updateStorage(StorageServer* data) {
auto cleanupPending = data->changeFeedCleanupDurable.find(info->second->id);
if (cleanupPending != data->changeFeedCleanupDurable.end() &&
cleanupPending->second <= newOldestVersion) {
-// due to a race, we just applied a cleanup mutation, but feed updates happen just after. Don't
-// write any mutations for this feed.
+// due to a race, we just applied a cleanup mutation, but feed updates happen just after.
+// Don't write any mutations for this feed.
curFeed++;
continue;
}
@@ -12212,8 +12236,8 @@ ACTOR Future<Void> updateStorage(StorageServer* data) {
KeyValueRef(changeFeedDurableKey(info->second->id, it.version),
changeFeedDurableValue(it.encrypted.present() ? it.encrypted.get() : it.mutations,
it.knownCommittedVersion)));
-// FIXME: there appears to be a bug somewhere where the exact same mutation appears twice in a row
-// in the stream. We should fix this assert to be strictly > and re-enable it
+// FIXME: there appears to be a bug somewhere where the exact same mutation appears twice in a
+// row in the stream. We should fix this assert to be strictly > and re-enable it
ASSERT(it.version >= info->second->storageVersion);
info->second->storageVersion = it.version;
durableChangeFeedMutations++;
@@ -12226,9 +12250,9 @@ ACTOR Future<Void> updateStorage(StorageServer* data) {
if (alreadyFetched > info->second->storageVersion) {
info->second->storageVersion = std::min(alreadyFetched, newOldestVersion);
if (alreadyFetched > info->second->storageVersion) {
-// This change feed still has pending mutations fetched and written to storage that are higher
-// than the new durableVersion. To ensure its storage and durable version get updated, we need
-// to add it back to fetchingChangeFeeds
+// This change feed still has pending mutations fetched and written to storage that are
+// higher than the new durableVersion. To ensure its storage and durable version get
+// updated, we need to add it back to fetchingChangeFeeds
data->fetchingChangeFeeds.insert(info->first);
}
}
@@ -12334,8 +12358,8 @@ ACTOR Future<Void> updateStorage(StorageServer* data) {
curFeed = 0;
while (curFeed < feedFetchVersions.size()) {
auto info = data->uidChangeFeed.find(feedFetchVersions[curFeed].first);
-// Don't update if the feed is pending cleanup. Either it will get cleaned up and destroyed, or it will get
-// fetched again, where the fetch version will get reset.
+// Don't update if the feed is pending cleanup. Either it will get cleaned up and destroyed, or it will
+// get fetched again, where the fetch version will get reset.
if (info != data->uidChangeFeed.end() && !data->changeFeedCleanupDurable.count(info->second->id)) {
if (feedFetchVersions[curFeed].second > info->second->durableFetchVersion.get()) {
info->second->durableFetchVersion.set(feedFetchVersions[curFeed].second);
@@ -13412,8 +13436,8 @@ ACTOR Future<Void> serveGetMappedKeyValuesRequests(StorageServer* self,
loop {
GetMappedKeyValuesRequest req = waitNext(getMappedKeyValues);
-// Warning: This code is executed at extremely high priority (TaskPriority::LoadBalancedEndpoint), so downgrade
-// before doing real work
+// Warning: This code is executed at extremely high priority (TaskPriority::LoadBalancedEndpoint), so
+// downgrade before doing real work
self->actors.add(self->readGuard(req, getMappedKeyValuesQ));
}
}

View File

@@ -83,6 +83,18 @@ struct BlobRestoreWorkload : TestWorkload {
// disable manifest backup and log truncation
wait(disableManifestBackup(cx));
+// check if we have manifest
+Reference<BlobConnectionProvider> manifestStore =
+BlobConnectionProvider::newBlobConnectionProvider(self->blobManifestUrl_.toString());
+Reference<BackupContainerFileSystem> container = manifestStore->getForRead("");
+BackupContainerFileSystem::FilesAndSizesT filesAndSizes = wait(container->listFiles(""));
+if (filesAndSizes.empty()) {
+fmt::print("Skip blob restore test because of missing manifest\n");
+CODE_PROBE(true, "Skip blob restore test because of missing manifest", probe::decoration::rare);
+return Void();
+}
// check if we have mutation logs
wait(store(self->restoreTargetVersion_, getRestoreVersion(cx, self)));
if (self->restoreTargetVersion_ == invalidVersion) {
CODE_PROBE(true, "Skip blob restore test because of missing mutation logs");
@@ -210,12 +222,12 @@ struct BlobRestoreWorkload : TestWorkload {
wait(flushBlobRanges(self->extraDb_, self, {}));
return Void();
}
-// TODO need to define more specific error handling
if (phase == BlobRestorePhase::ERROR) {
auto db = SystemDBWriteLockedNow(self->extraDb_.getReference());
std::string error = wait(BlobGranuleRestoreConfig().error().getD(db));
fmt::print("Unexpected restore error code = {}\n", error);
-return Void();
+ASSERT(phase != BlobRestorePhase::ERROR);
}
wait(delay(5)); // delay to avoid busy loop

View File

@@ -10,6 +10,7 @@ storageEngineExcludeTypes = [4, 5]
bg_consistency_check_enabled = 0
shard_encode_location_metadata = false
bw_throttling_enabled = false
+blob_restore_skip_empty_ranges = false
[[test]]
testTitle = 'SetupBlob'

View File

@@ -10,6 +10,7 @@ storageEngineExcludeTypes = [4, 5]
bg_consistency_check_enabled = 0
shard_encode_location_metadata = false
bw_throttling_enabled = false
+blob_restore_skip_empty_ranges = false
[[test]]
testTitle = 'SetupBlob'

View File

@@ -10,6 +10,7 @@ storageEngineExcludeTypes = [4, 5]
bg_consistency_check_enabled = 0
shard_encode_location_metadata = false
bw_throttling_enabled = false
+blob_restore_skip_empty_ranges = false
[[test]]
testTitle = 'SetupBlob'