removed logging
parent 22f5033c6a
commit d7491a8f30
@@ -645,8 +645,6 @@ private:
             throw io_error();
         }

-        TraceEvent("SimpleFileWrite").detail("Size", data.size()).detail("Filename", self->filename).detail("Fd", self->h);
-
         unsigned int write_bytes = 0;
         if ((write_bytes = _write(self->h, (void*)data.begin(), data.size())) == -1) {
             TraceEvent(SevWarn, "SimpleFileIOError").detail("Location", 4);
@@ -689,13 +687,9 @@ private:
             throw io_error();
         }

-        TraceEvent("SimpleFileTruncate1").detail("Size", size).detail("Filename", self->filename).detail("Fd", self->h);
-
         if (self->delayOnWrite)
             wait(waitUntilDiskReady(self->diskParameters, 0));

-        TraceEvent("SimpleFileTruncate2").detail("Size", size).detail("Filename", self->filename).detail("Fd", self->h);
-
         if (_chsize(self->h, (long)size) == -1) {
             TraceEvent(SevWarn, "SimpleFileIOError")
                 .detail("Location", 6)
@@ -1257,9 +1257,6 @@ ACTOR Future<Void> reply(CommitBatchContext* self) {
     const Optional<UID>& debugID = self->debugID;

     if (self->prevVersion && self->commitVersion - self->prevVersion < SERVER_KNOBS->MAX_VERSIONS_IN_FLIGHT / 2) {
-        if (self->commitVersion > 327594994) {
-            TraceEvent("DbgAdvMinCommit").detail("Ver", self->commitVersion);
-        }
         debug_advanceMinCommittedVersion(UID(), self->commitVersion);
     }

@@ -102,70 +102,6 @@ struct StringBuffer {
     }
 };

-#pragma pack(push, 1)
-struct DbgPageHeader {
-    union {
-        UID hash;
-        struct {
-            uint32_t hash32;
-            uint32_t _unused;
-            uint16_t magic;
-            uint16_t implementationVersion;
-        };
-    };
-    uint64_t seq; // seq is the index of the virtually infinite disk queue file. Its unit is bytes.
-    uint64_t popped;
-    int payloadSize;
-};
-// The on disk format depends on the size of PageHeader.
-static_assert(sizeof(DbgPageHeader) == 36, "PageHeader must be 36 bytes");
-
-struct DbgPage : DbgPageHeader {
-    static const int maxPayload = _PAGE_SIZE - sizeof(DbgPageHeader);
-    uint8_t payload[maxPayload];
-
-    DiskQueueVersion diskQueueVersion() const { return static_cast<DiskQueueVersion>(implementationVersion); }
-    int remainingCapacity() const { return maxPayload - payloadSize; }
-    uint64_t endSeq() const { return seq + sizeof(DbgPageHeader) + payloadSize; }
-    UID checksum_hashlittle2() const {
-        // SOMEDAY: Better hash?
-        uint32_t part[2] = { 0x12345678, 0xbeefabcd };
-        hashlittle2(&seq, sizeof(DbgPage) - sizeof(UID), &part[0], &part[1]);
-        return UID(int64_t(part[0]) << 32 | part[1], 0xFDB);
-    }
-    uint32_t checksum_crc32c() const {
-        return crc32c_append(0xfdbeefdb, (uint8_t*)&_unused, sizeof(DbgPage) - sizeof(uint32_t));
-    }
-    void updateHash() {
-        switch (diskQueueVersion()) {
-        case DiskQueueVersion::V0: {
-            hash = checksum_hashlittle2();
-            return;
-        }
-        case DiskQueueVersion::V1:
-        default: {
-            hash32 = checksum_crc32c();
-            return;
-        }
-        }
-    }
-    bool checkHash() {
-        switch (diskQueueVersion()) {
-        case DiskQueueVersion::V0: {
-            return hash == checksum_hashlittle2();
-        }
-        case DiskQueueVersion::V1: {
-            return hash32 == checksum_crc32c();
-        }
-        default:
-            return false;
-        }
-    }
-    void zeroPad() { memset(payload + payloadSize, 0, maxPayload - payloadSize); }
-};
-static_assert(sizeof(DbgPage) == _PAGE_SIZE, "Page must be 4k");
-#pragma pack(pop)
-
 struct SyncQueue : ReferenceCounted<SyncQueue> {
     SyncQueue(int outstandingLimit, Reference<IAsyncFile> file) : outstandingLimit(outstandingLimit), file(file) {
         for (int i = 0; i < outstandingLimit; i++)
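The DbgPage machinery deleted above versions its on-disk checksum: the implementationVersion field selects which checksum routine guards a page, so pages written under an older format remain verifiable after an upgrade. What follows is a minimal, self-contained sketch of that dispatch pattern, not the deleted code itself: fnv1a() and the two derived checksums are stand-ins for hashlittle2/crc32c_append, and the page layout is simplified.

#include <cassert>
#include <cstddef>
#include <cstdint>

enum class DiskQueueVersion : uint16_t { V0 = 0, V1 = 1 };

// Stand-in checksum (FNV-1a). The deleted code used hashlittle2 for V0
// pages and crc32c_append for V1 pages.
static uint32_t fnv1a(const void* data, size_t len) {
    const uint8_t* p = static_cast<const uint8_t*>(data);
    uint32_t h = 2166136261u;
    for (size_t i = 0; i < len; ++i) {
        h ^= p[i];
        h *= 16777619u;
    }
    return h;
}

struct Page { // this member order leaves no padding, so the byte layout is stable
    uint32_t hash32;                // checksum over everything after this field
    uint16_t magic;
    uint16_t implementationVersion; // selects the checksum algorithm
    uint64_t seq;
    uint8_t payload[16];

    DiskQueueVersion version() const { return static_cast<DiskQueueVersion>(implementationVersion); }
    // Two distinct stand-in checksums, hashed from `magic` to the end of the page.
    uint32_t checksumV0() const { return fnv1a(&magic, sizeof(Page) - sizeof(hash32)) ^ 0x12345678u; }
    uint32_t checksumV1() const { return fnv1a(&magic, sizeof(Page) - sizeof(hash32)); }
    void updateHash() {
        switch (version()) {
        case DiskQueueVersion::V0:
            hash32 = checksumV0();
            return;
        case DiskQueueVersion::V1:
        default:
            hash32 = checksumV1();
            return;
        }
    }
    bool checkHash() const {
        switch (version()) {
        case DiskQueueVersion::V0:
            return hash32 == checksumV0();
        case DiskQueueVersion::V1:
            return hash32 == checksumV1();
        default:
            return false; // unknown version: never trust the page
        }
    }
};

int main() {
    Page p{};
    p.implementationVersion = static_cast<uint16_t>(DiskQueueVersion::V1);
    p.seq = 42;
    p.updateHash();
    assert(p.checkHash());
    p.payload[0] ^= 1; // corrupt one byte; verification must now fail
    assert(!p.checkHash());
}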
@@ -186,9 +122,7 @@ private:
     ACTOR static Future<Void> waitAndSync(SyncQueue* self) {
         wait(self->outstanding.front());
         self->outstanding.pop_front();
-        TraceEvent("FileSyncStart").detail("Name", self->file->getFilename());
         wait(self->file->sync());
-        TraceEvent("FileSyncEnd").detail("Name", self->file->getFilename());
         return Void();
     }
 };
@@ -364,9 +298,7 @@ public:
     state int64_t remainingFileSize = wait(file->size());

     for (; remainingFileSize > 0; remainingFileSize -= FLOW_KNOBS->INCREMENTAL_DELETE_TRUNCATE_AMOUNT) {
-        TraceEvent("DiskQueueReplaceTruncate").detail("Size", remainingFileSize);
         wait(file->truncate(remainingFileSize));
-        TraceEvent("DiskQueueReplaceTruncate2").detail("Size", remainingFileSize);
         wait(file->sync());
         wait(delay(FLOW_KNOBS->INCREMENTAL_DELETE_INTERVAL));
     }
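The loop in this hunk is an incremental delete: shrink the file one chunk per iteration, syncing and pausing between truncations, so reclaiming a very large file never causes a single long stall. Below is a rough POSIX sketch of the same idea; the chunk size and interval are invented stand-ins for the INCREMENTAL_DELETE_TRUNCATE_AMOUNT and INCREMENTAL_DELETE_INTERVAL knobs.

#include <chrono>
#include <cstdint>
#include <thread>
#include <fcntl.h>
#include <unistd.h>

// Shrink the file behind `fd` step by step instead of all at once. As in the
// loop above, the first truncate is a no-op at the current size.
static void incrementalTruncate(int fd, int64_t fileSize) {
    const int64_t chunk = int64_t(64) << 20;              // stand-in knob: 64 MB per step
    const auto interval = std::chrono::milliseconds(100); // stand-in knob: pause between steps
    for (int64_t remaining = fileSize; remaining > 0; remaining -= chunk) {
        if (ftruncate(fd, remaining) != 0)
            return; // give up on I/O error
        fsync(fd);  // make the shrink durable before pausing
        std::this_thread::sleep_for(interval);
    }
    ftruncate(fd, 0); // the loop exits above zero; cut the remainder
}

int main() {
    int fd = open("/tmp/incremental_delete_demo", O_RDWR | O_CREAT, 0644);
    if (fd < 0)
        return 1;
    if (ftruncate(fd, int64_t(1) << 30) != 0) // a sparse 1 GB file to reclaim
        return 1;
    incrementalTruncate(fd, int64_t(1) << 30);
    close(fd);
}

The per-step sync is doing real work here: it keeps the filesystem from deferring all the frees and collapsing the paced truncations back into one large burst.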
@@ -419,8 +351,8 @@ public:
         int p = self->files[1].size - self->writingPos;
         if (p > 0) {
             toSync->push_back(self->files[1].syncQueue);
-            TraceEvent("RDQWriteAndSwap", self->dbgid).detail("File1name", self->files[1].dbgFilename).detail("File1size", self->files[1].size)
-                .detail("WritingPos", self->writingPos).detail("WritingBytes", p);
+            //TraceEvent("RDQWriteAndSwap", self->dbgid).detail("File1name", self->files[1].dbgFilename).detail("File1size", self->files[1].size)
+            //    .detail("WritingPos", self->writingPos).detail("WritingBytes", p);
             waitfor.push_back(self->files[1].f->write(pageData.begin(), p, self->writingPos));
             pageData = pageData.substr(p);
         }
@@ -450,9 +382,6 @@ public:
                 .detail("ElidedTruncateSize", maxShrink);
             Reference<IAsyncFile> newFile = wait(replaceFile(self->files[1].f));
             self->files[1].setFile(newFile);
-            TraceEvent("DiskQueueReplaceFile2", self->dbgid)
-                .detail("Filename", self->files[1].f->getFilename())
-                .detail("NewSize", self->fileExtensionBytes);
             waitfor.push_back(self->files[1].f->truncate(self->fileExtensionBytes));
             self->files[1].size = self->fileExtensionBytes;
         } else {
@@ -469,7 +398,7 @@ public:
         } else {
             // Extend self->files[1] to accommodate the new write and about 10MB or 2x current size for future
             // writes.
-            TraceEvent("RDQExtend", self->dbgid).detail("File1name", self->files[1].dbgFilename).detail("File1size", self->files[1].size);
+            //TraceEvent("RDQExtend", self->dbgid).detail("File1name", self->files[1].dbgFilename).detail("File1size", self->files[1].size);
             int64_t minExtension = pageData.size() + self->writingPos - self->files[1].size;
             self->files[1].size += std::min(std::max(self->fileExtensionBytes, minExtension),
                                             self->files[0].size + self->files[1].size + minExtension);
@@ -486,10 +415,8 @@ public:
             // If this is the first write to a brand new disk queue file.
             *self->firstPages[1] = *(const Page*)pageData.begin();
         }
-
-        DbgPage* dbgPage = (DbgPage*)pageData.begin();
-        TraceEvent("RDQWrite", self->dbgid).detail("File1name", self->files[1].dbgFilename).detail("File1size", self->files[1].size)
-            .detail("WritingPos", self->writingPos).detail("WritingBytes", pageData.size()).detail("Seq", dbgPage->seq);
+        //TraceEvent("RDQWrite", self->dbgid).detail("File1name", self->files[1].dbgFilename).detail("File1size", self->files[1].size)
+        //    .detail("WritingPos", self->writingPos).detail("WritingBytes", pageData.size());
         self->files[1].size = std::max(self->files[1].size, self->writingPos + pageData.size());
         toSync->push_back(self->files[1].syncQueue);
         waitfor.push_back(self->files[1].f->write(pageData.begin(), pageData.size(), self->writingPos));
@@ -550,9 +477,9 @@ public:

         self->updatePopped(poppedPages * sizeof(Page));

-        TraceEvent("RDQCommitEnd", self->dbgid).detail("DeltaPopped", poppedPages*sizeof(Page)).detail("PoppedCommitted", self->dbg_file0BeginSeq + self->files[0].popped + self->files[1].popped)
-            .detail("File0Size", self->files[0].size).detail("File1Size", self->files[1].size)
-            .detail("File0Name", self->files[0].dbgFilename).detail("SyncedFiles", syncFiles.size());
+        //TraceEvent("RDQCommitEnd", self->dbgid).detail("DeltaPopped", poppedPages*sizeof(Page)).detail("PoppedCommitted", self->dbg_file0BeginSeq + self->files[0].popped + self->files[1].popped)
+        //    .detail("File0Size", self->files[0].size).detail("File1Size", self->files[1].size)
+        //    .detail("File0Name", self->files[0].dbgFilename).detail("SyncedFiles", syncFiles.size());

         committed.send(Void());
     } catch (Error& e) {
@@ -721,19 +648,6 @@ public:
         reads.push_back(self->files[i].f->read(self->firstPages[i], sizeof(Page), 0));
     wait(waitForAll(reads));

-    DbgPage* dbgp1 = (DbgPage*)self->firstPages[0];
-    DbgPage* dbgp2 = (DbgPage*)self->firstPages[1];
-
-    TraceEvent("RDQFirstPages", self->dbgid)
-        .detail("P1Seq", dbgp1->seq)
-        .detail("P2Seq", dbgp2->seq)
-        .detail("P1Hash", dbgp1->checkHash())
-        .detail("P2Hash", dbgp2->checkHash())
-        .detail("File0Name", self->files[0].dbgFilename)
-        .detail("File0size", self->files[0].size)
-        .detail("File1size", self->files[1].size);
-
-
     // Determine which file comes first
     if (compare(self->firstPages[1], self->firstPages[0])) {
         std::swap(self->firstPages[0], self->firstPages[1]);
@@ -780,14 +694,12 @@ public:
         ASSERT(len == sizeof(Page));

         bool middleValid = compare(self->firstPages[1], middlePage);
-        DbgPage* midpage = (DbgPage*)middlePage;

         TraceEvent("RDQBS", self->dbgid)
             .detail("Begin", begin)
             .detail("End", end)
             .detail("Middle", middle)
             .detail("Valid", middleValid)
-            .detail("MiddleSeq", midpage->seq)
             .detail("File0Name", self->files[0].dbgFilename);

         if (middleValid)
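The RDQBS event in this hunk traces an invariant-based binary search for where the valid data ends: Begin always indexes a page known to be valid, End a page known to be invalid, and each probe at Middle halves the gap. A self-contained sketch of that invariant follows; isValid() is a hypothetical stand-in for reading the page at an index and compare()-ing it against the file's first page.

#include <cstdint>
#include <functional>
#include <iostream>
#include <vector>

// Finds the first invalid page in [begin, end), given that page `begin` is
// known valid, page `end` is known invalid, and validity is a prefix
// property (all valid pages precede all invalid ones).
int64_t findEnd(int64_t begin, int64_t end, const std::function<bool(int64_t)>& isValid) {
    while (begin + 1 < end) {
        int64_t middle = (begin + end) / 2;
        if (isValid(middle))
            begin = middle; // the valid prefix reaches at least `middle`
        else
            end = middle;   // the first invalid page is at or before `middle`
    }
    return end;
}

int main() {
    std::vector<bool> pages = { true, true, true, true, false, false };
    int64_t e = findEnd(0, int64_t(pages.size()), [&](int64_t i) { return bool(pages[i]); });
    std::cout << "first invalid page: " << e << "\n"; // prints 4
}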
@@ -804,11 +716,9 @@ public:
         int len2 = wait(self->files[1].f->read(middlePage, sizeof(Page), begin * sizeof(Page)));
         ASSERT(len2 == sizeof(Page) && compare(self->firstPages[1], middlePage));

-        DbgPage* finalPage = (DbgPage*)middlePage;
         TraceEvent("RDQEndFound", self->dbgid)
             .detail("File0Name", self->files[0].dbgFilename)
             .detail("Pos", begin)
-            .detail("Seq", finalPage->seq)
             .detail("FileSize", self->files[1].size);

         return middlePageAllocation;
@@ -860,7 +770,6 @@ public:
     auto pos = readingPage * sizeof(Page);
     readingPage += len / sizeof(Page);
     ASSERT(int64_t(p) % sizeof(Page) == 0);
-    TraceEvent("RDQFillReadingBuffer").detail("File0name", files[readingFile].dbgFilename).detail("Len", len).detail("Pos", pos);
     return files[readingFile].f->read(p, len, pos);
 }

@@ -1053,10 +962,10 @@ public:
         warnAlwaysForMemory = false;
     }

-    TraceEvent("DQCommit", dbgid).detail("Pages", pushedPageCount()).detail("LastPoppedSeq", lastPoppedSeq).detail("PoppedSeq", poppedSeq).detail("NextPageSeq", nextPageSeq)
-        .detail("RawFile0Size", rawQueue->files[0].size).detail("RawFile1Size",
-        rawQueue->files[1].size).detail("WritingPos", rawQueue->writingPos) .detail("RawFile0Name",
-        rawQueue->files[0].dbgFilename);
+    //TraceEvent("DQCommit", dbgid).detail("Pages", pushedPageCount()).detail("LastPoppedSeq", lastPoppedSeq).detail("PoppedSeq", poppedSeq).detail("NextPageSeq", nextPageSeq)
+    //    .detail("RawFile0Size", rawQueue->files[0].size).detail("RawFile1Size",
+    //    rawQueue->files[1].size).detail("WritingPos", rawQueue->writingPos) .detail("RawFile0Name",
+    //    rawQueue->files[0].dbgFilename);

     lastCommittedSeq = backPage().endSeq();
     auto f = rawQueue->pushAndCommit(
@@ -1445,7 +1354,7 @@ private:
         wait(self->rawQueue->truncateBeforeLastReadPage());
         break;
     }
-    TraceEvent("DQRecPage", self->dbgid).detail("NextReadLoc", self->nextReadLocation).detail("Seq", self->readBufPage->seq).detail("Pop", self->readBufPage->popped).detail("Payload", self->readBufPage->payloadSize).detail("File0Name", self->rawQueue->files[0].dbgFilename);
+    //TraceEvent("DQRecPage", self->dbgid).detail("NextReadLoc", self->nextReadLocation).detail("Seq", self->readBufPage->seq).detail("Pop", self->readBufPage->popped).detail("Payload", self->readBufPage->payloadSize).detail("File0Name", self->rawQueue->files[0].dbgFilename);
     ASSERT(self->readBufPage->seq == pageFloor(self->nextReadLocation));
     self->lastPoppedSeq = self->readBufPage->popped;
 }

@@ -160,7 +160,6 @@ private:
     loop {
         state IDiskQueue::location startloc = self->queue->getNextReadLocation();
         Standalone<StringRef> h = wait(self->queue->readNext(sizeof(uint32_t)));
-        TraceEvent("TLogRestRead1", tLog->dbgid).detail("StartLoc", startloc).detail("Size", h.size());
         if (h.size() != sizeof(uint32_t)) {
             if (h.size()) {
                 TEST(true); // Zero fill within size field
@@ -176,7 +175,6 @@ private:
         ASSERT(payloadSize < (100 << 20));

         Standalone<StringRef> e = wait(self->queue->readNext(payloadSize + 1));
-        TraceEvent("TLogRestRead2", tLog->dbgid).detail("Size", e.size()).detail("Expected", payloadSize + 1);
         if (e.size() != payloadSize + 1) {
             TEST(true); // Zero fill within payload
             zeroFillSize = payloadSize + 1 - e.size();
@@ -190,7 +188,6 @@ private:
         ar >> result;
         const IDiskQueue::location endloc = self->queue->getNextReadLocation();
         self->updateVersionSizes(result, tLog, startloc, endloc);
-        TraceEvent("TLogRestRead3", tLog->dbgid).detail("StartLoc", startloc).detail("EndLoc", endloc);
         return result;
     }
 }
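The two reads in the hunks above imply the queue's record framing during recovery: a uint32_t length prefix, then payloadSize + 1 bytes (the payload plus one trailing flag byte), where a short read at either step means the queue was zero-filled past the last durable record and recovery stops. The following is a self-contained sketch of that framing; Reader is a hypothetical stand-in for IDiskQueue::readNext(), and both the little-endian byte order and the trailing byte of 1 meaning "record complete" are assumptions, not taken from this diff.

#include <algorithm>
#include <cstdint>
#include <cstring>
#include <optional>
#include <vector>

struct Reader {
    const std::vector<uint8_t>& buf;
    size_t pos = 0;
    // Returns up to n bytes; fewer only at the end of the data, like readNext().
    std::vector<uint8_t> readNext(size_t n) {
        size_t avail = std::min(n, buf.size() - pos);
        std::vector<uint8_t> out(buf.begin() + pos, buf.begin() + pos + avail);
        pos += avail;
        return out;
    }
};

std::optional<std::vector<uint8_t>> readRecord(Reader& r) {
    std::vector<uint8_t> h = r.readNext(sizeof(uint32_t));
    if (h.size() != sizeof(uint32_t))
        return std::nullopt; // zero fill within the size field: end of recovery
    uint32_t payloadSize;
    memcpy(&payloadSize, h.data(), sizeof(payloadSize)); // assumed little-endian
    std::vector<uint8_t> e = r.readNext(payloadSize + 1);
    if (e.size() != payloadSize + 1 || e.back() != 1)
        return std::nullopt; // zero fill within payload, or record never completed
    e.pop_back();            // drop the trailing flag byte
    return e;
}

int main() {
    // One record: size = 3, payload "abc", flag 1; then zero fill to the end.
    std::vector<uint8_t> bytes = { 3, 0, 0, 0, 'a', 'b', 'c', 1, 0, 0, 0, 0 };
    Reader r{ bytes };
    while (auto rec = readRecord(r)) {
        // recover *rec ...
    }
}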
@@ -760,7 +757,7 @@ void TLogQueue::push(T const& qe, Reference<LogData> logData) {
     const IDiskQueue::location startloc = queue->getNextPushLocation();
     // FIXME: push shouldn't return anything. We should call getNextPushLocation() again.
     const IDiskQueue::location endloc = queue->push(wr.toValue());
-    TraceEvent("TLogQueueVersionWritten", dbgid).detail("Size", wr.getLength() - sizeof(uint32_t) - sizeof(uint8_t)).detail("Start", startloc).detail("End", endloc).detail("Ver", qe.version);
+    //TraceEvent("TLogQueueVersionWritten", dbgid).detail("Size", wr.getLength() - sizeof(uint32_t) - sizeof(uint8_t)).detail("Start", startloc).detail("End", endloc).detail("Ver", qe.version);
     logData->versionLocation[qe.version] = std::make_pair(startloc, endloc);
 }

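TLogQueue::push above pairs getNextPushLocation() with the location returned by push() to remember exactly which byte range of the disk queue holds each version, so a version's data can later be found (or popped) by position. A small sketch of that bookkeeping, with `location` as a hypothetical stand-in for IDiskQueue::location:

#include <cstdint>
#include <map>
#include <utility>

using location = int64_t; // stand-in for IDiskQueue::location

int main() {
    std::map<int64_t, std::pair<location, location>> versionLocation;
    location next = 0;
    // push() mimics the pairing above: note where the version starts, let the
    // queue advance, and record the half-open byte range.
    auto push = [&](int64_t version, int64_t bytes) {
        location start = next; // getNextPushLocation()
        next += bytes;         // queue->push(...) yields the new end location
        versionLocation[version] = std::make_pair(start, next);
    };
    push(100, 512);
    push(101, 2048);
    // versionLocation[101] == {512, 2560}: the byte range holding version 101.
}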
@@ -1963,10 +1960,7 @@ ACTOR Future<Void> doQueueCommit(TLogData* self,
         logData->recoveryComplete.send(Void());
     }

-    if (ver > 327594994) {
-        TraceEvent("TLogCommitDurable2", self->dbgid).detail("Version", ver);
-    }
-
+    //TraceEvent("TLogCommitDurable2", self->dbgid).detail("Version", ver);
     if (logData->logSystem->get() &&
         (!logData->isPrimary || logData->logRouterPoppedVersion < logData->logRouterPopToVersion)) {
         logData->logRouterPoppedVersion = ver;
@@ -2953,11 +2947,11 @@ ACTOR Future<Void> restorePersistentState(TLogData* self,
         }
     }

-    TraceEvent("TLogRecoveredQE", self->dbgid)
-        .detail("LogId", qe.id)
-        .detail("Ver", qe.version)
-        .detail("MessageBytes", qe.messages.size())
-        .detail("Version", logData->version.get());
+    //TraceEvent("TLogRecoveredQE", self->dbgid)
+    //    .detail("LogId", qe.id)
+    //    .detail("Ver", qe.version)
+    //    .detail("MessageBytes", qe.messages.size())
+    //    .detail("Version", logData->version.get());

     if (logData) {
         if (!self->spillOrder.size() || self->spillOrder.back() != qe.id) {