Fix memory bytes accounting
Avoid duplicated counting of arena memory since messages from peek cursor can share arena.
This commit is contained in:
parent
17915e13b0
commit
01eff0fc03
|
@ -64,6 +64,10 @@ struct VersionedMessage {
|
|||
}
|
||||
};
|
||||
|
||||
static bool sameArena(const Arena& a, const Arena& b) {
|
||||
return a.impl.getPtr() == b.impl.getPtr();
|
||||
}
|
||||
|
||||
struct BackupData {
|
||||
const UID myId;
|
||||
const Tag tag; // LogRouter tag for this worker, i.e., (-2, i)
|
||||
|
@ -320,7 +324,7 @@ struct BackupData {
|
|||
|
||||
if (messages.size() == num) {
|
||||
messages.clear();
|
||||
lock->release();
|
||||
lock->release(lock->activePermits());
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -329,7 +333,7 @@ struct BackupData {
|
|||
for (int i = 0; i < num; i++) {
|
||||
const Arena& a = messages[i].arena;
|
||||
const Arena& b = messages[i + 1].arena;
|
||||
if (a.impl.getPtr() != b.impl.getPtr()) {
|
||||
if (!sameArena(a, b)) {
|
||||
bytes += a.getSize();
|
||||
}
|
||||
}
|
||||
|
@ -693,7 +697,7 @@ ACTOR Future<Void> saveMutationsToFile(BackupData* self, Version popVersion, int
|
|||
}
|
||||
logFileFutures.push_back(it->second.container.get().get()->writeTaggedLogFile(
|
||||
it->second.lastSavedVersion, popVersion + 1, blockSize, self->tag.id, self->totalTags));
|
||||
ASSERT(self->messages.empty() || self->messages[0].getVersion() >= it->second.lastSavedVersion);
|
||||
// ASSERT(self->messages.empty() || self->messages[0].getVersion() >= it->second.lastSavedVersion);
|
||||
it++;
|
||||
}
|
||||
|
||||
|
@ -879,9 +883,13 @@ ACTOR Future<Void> pullAsyncData(BackupData* self) {
|
|||
|
||||
// Note we aggressively peek (uncommitted) messages, but only committed
|
||||
// messages/mutations will be flushed to disk/blob in uploadData().
|
||||
state Arena prev;
|
||||
while (r->hasMessage()) {
|
||||
self->messages.emplace_back(r->version(), r->getMessage(), r->getTags(), r->arena());
|
||||
wait(self->lock->take(TaskPriority::DefaultYield, r->arena().getSize()));
|
||||
if (!sameArena(prev, r->arena())) {
|
||||
wait(self->lock->take(TaskPriority::DefaultYield, r->arena().getSize()));
|
||||
prev = r->arena();
|
||||
}
|
||||
r->nextMessage();
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue