Fixed file offset not reset issue.
This commit is contained in:
parent
89519343a7
commit
915ab8c402
|
@ -529,6 +529,7 @@ ACTOR Future<Void> fetchCheckpointFile(Database cx,
|
|||
state int64_t offset = 0;
|
||||
state Reference<IAsyncFile> asyncFile;
|
||||
loop {
|
||||
offset = 0;
|
||||
try {
|
||||
asyncFile = Reference<IAsyncFile>();
|
||||
++attempt;
|
||||
|
@ -559,7 +560,8 @@ ACTOR Future<Void> fetchCheckpointFile(Database cx,
|
|||
offset += rep.data.size();
|
||||
}
|
||||
} catch (Error& e) {
|
||||
if (e.code() != error_code_end_of_stream) {
|
||||
if (e.code() != error_code_end_of_stream ||
|
||||
(g_network->isSimulated() && attempt == 1 && deterministicRandom()->coinflip())) {
|
||||
TraceEvent("FetchCheckpointFileError")
|
||||
.errorUnsuppressed(e)
|
||||
.detail("RemoteFile", remoteFile)
|
||||
|
|
|
@ -2246,6 +2246,7 @@ ACTOR Future<Void> fetchCheckpointQ(StorageServer* self, FetchCheckpointRequest
|
|||
}
|
||||
|
||||
try {
|
||||
state int64_t totalSize = 0;
|
||||
reader = newCheckpointReader(it->second, deterministicRandom()->randomUniqueID());
|
||||
wait(reader->init(req.token));
|
||||
|
||||
|
@ -2255,12 +2256,14 @@ ACTOR Future<Void> fetchCheckpointQ(StorageServer* self, FetchCheckpointRequest
|
|||
FetchCheckpointReply reply(req.token);
|
||||
reply.data = data;
|
||||
req.reply.send(reply);
|
||||
totalSize += data.size();
|
||||
}
|
||||
} catch (Error& e) {
|
||||
if (e.code() == error_code_end_of_stream) {
|
||||
req.reply.sendError(end_of_stream());
|
||||
TraceEvent("ServeFetchCheckpointEnd", self->thisServerID)
|
||||
.detail("CheckpointID", req.checkpointID)
|
||||
.detail("TotalSize", totalSize)
|
||||
.detail("Token", req.token);
|
||||
} else {
|
||||
TraceEvent(SevWarnAlways, "ServerFetchCheckpointFailure")
|
||||
|
|
|
@ -75,9 +75,11 @@ struct SSCheckpointRestoreWorkload : TestWorkload {
|
|||
state KeyRange testRange = KeyRangeRef(key, endKey);
|
||||
state std::vector<CheckpointMetaData> records;
|
||||
|
||||
TraceEvent("TestCheckpointRestoreBegin");
|
||||
int ignore = wait(setDDMode(cx, 0));
|
||||
state Version version = wait(self->writeAndVerify(self, cx, key, oldValue));
|
||||
|
||||
TraceEvent("TestCreatingCheckpoint").detail("Range", testRange);
|
||||
// Create checkpoint.
|
||||
state Transaction tr(cx);
|
||||
state CheckpointFormat format = deterministicRandom()->coinflip() ? RocksDBColumnFamily : RocksDB;
|
||||
|
|
Loading…
Reference in New Issue