Merge pull request #4503 from sfc-gh-ajbeamon/merge-release-6.3-into-master
Final merge of release-6.3 into master
@@ -2,6 +2,12 @@
Release Notes
#############

6.2.33
======
* Fixed an issue where storage servers could shut down with ``unknown_error``. `(PR #4437) <https://github.com/apple/foundationdb/pull/4437>`_
* Fix a backup agent stall when writing to a local filesystem with slow metadata operations. `(PR #4428) <https://github.com/apple/foundationdb/pull/4428>`_
* Backup agent no longer uses the 4k block caching layer on local output files so that write operations are larger. `(PR #4428) <https://github.com/apple/foundationdb/pull/4428>`_

6.2.32
======
* Fix an issue where symbolic links in cmake-built RPMs are broken if you unpack the RPM to a custom directory. `(PR #4380) <https://github.com/apple/foundationdb/pull/4380>`_
@@ -8,28 +8,18 @@ Release Notes

6.3.11
======
* Added a hint field in the trace event when all replicas of some data are lost. `(PR #4209) <https://github.com/apple/foundationdb/pull/4209>`_
* Rewrote SQLite injected fault handling. `(PR #4212) <https://github.com/apple/foundationdb/pull/4212>`_
* Add a SevWarnAlways trace line to help debug a rare failure. `(PR #4214) <https://github.com/apple/foundationdb/pull/4214>`_
* Use VFSAsyncFile::checkInjectedError to detect injected faults. `(PR #4253) <https://github.com/apple/foundationdb/pull/4253>`_
* Build on Windows using VS 2019 + LLVM/Clang. `(PR #4258) <https://github.com/apple/foundationdb/pull/4258>`_
* RateControl support in AFCCached to enable write op throttling. The feature is disabled by default. `(PR #4229) <https://github.com/apple/foundationdb/pull/4229>`_
* Add knobs for prefix bloom filters and larger block cache for RocksDB. `(PR #4201) <https://github.com/apple/foundationdb/pull/4201>`_
* Added debug tools to the FDB runtime image. `(PR #4247) <https://github.com/apple/foundationdb/pull/4247>`_
* Fix bug in simulated coordinator selection. `(PR #4285) <https://github.com/apple/foundationdb/pull/4285>`_
* Add option to prevent synchronous file deletes on reads for RocksDB. `(PR #4270) <https://github.com/apple/foundationdb/pull/4270>`_
* Report warning when TLS verification fails. `(PR #4299) <https://github.com/apple/foundationdb/pull/4299>`_
* Support multiple worker threads for each version of client that is loaded so that each cluster will be serviced by a client thread. `(PR #4269) <https://github.com/apple/foundationdb/pull/4269>`_
* Reboot simulated process on io_timeout error. `(PR #4345) <https://github.com/apple/foundationdb/pull/4345>`_
* Fix Snapshot backup test failure. `(PR #4372) <https://github.com/apple/foundationdb/pull/4372>`_

* Support multiple worker threads for each client version that is loaded. `(PR #4269) <https://github.com/apple/foundationdb/pull/4269>`_
* fdbcli: Output errors and warnings to stderr. `(PR #4332) <https://github.com/apple/foundationdb/pull/4332>`_
* Do not generate machine id in locality field if it is set by the user. `(PR #4022) <https://github.com/apple/foundationdb/pull/4022>`_
* Make the RocksDB init method idempotent. `(PR #4400) <https://github.com/apple/foundationdb/pull/4400>`_
* Fix bugs turned up by _GLIBCXX_DEBUG. `(PR #4301) <https://github.com/apple/foundationdb/pull/4301>`_
* Add new unit and integration tests, and associated infrastructure. `(PR #4366) <https://github.com/apple/foundationdb/pull/4366>`_
* Do not rely on shared memory to generate a machine id if it is set explicitly. `(Issue #4022) <https://github.com/apple/foundationdb/pull/4022>`_
* Added ``workload.transactions.rejected_for_queued_too_long`` to status to report the number of transaction commits that failed because they were queued too long and could no longer be checked for conflicts. `(PR #4353) <https://github.com/apple/foundationdb/pull/4353>`_
* Add knobs for prefix bloom filters and larger block cache for RocksDB. `(PR #4201) <https://github.com/apple/foundationdb/pull/4201>`_
* Add option to prevent synchronous file deletes on reads for RocksDB. `(PR #4270) <https://github.com/apple/foundationdb/pull/4270>`_
* Build on Windows using VS 2019 + LLVM/Clang. `(PR #4258) <https://github.com/apple/foundationdb/pull/4258>`_

6.3.10
======

* Make fault tolerance metric calculation in HA clusters consistent with the 6.2 branch. `(PR #4175) <https://github.com/apple/foundationdb/pull/4175>`_
* Bug fix: stack overflow in the Redwood storage engine. `(PR #4161) <https://github.com/apple/foundationdb/pull/4161>`_
* Bug fix: getting certain special keys could fail. `(PR #4128) <https://github.com/apple/foundationdb/pull/4128>`_
@@ -1431,30 +1431,41 @@ BackupContainerFileSystem::VersionProperty BackupContainerFileSystem::logType()
namespace backup_test {

int chooseFileSize(std::vector<int>& sizes) {
int size = 1000;
if (!sizes.empty()) {
size = sizes.back();
int size = sizes.back();
sizes.pop_back();
return size;
}
return size;
return deterministicRandom()->randomInt(0, 2e6);
}

ACTOR Future<Void> writeAndVerifyFile(Reference<IBackupContainer> c, Reference<IBackupFile> f, int size) {
state Standalone<StringRef> content;
if (size > 0) {
content = makeString(size);
for (int i = 0; i < content.size(); ++i)
mutateString(content)[i] = (uint8_t)deterministicRandom()->randomInt(0, 256);
ACTOR Future<Void> writeAndVerifyFile(Reference<IBackupContainer> c, Reference<IBackupFile> f, int size, FlowLock* lock) {
state Standalone<VectorRef<uint8_t>> content;

wait(f->append(content.begin(), content.size()));
wait(lock->take(TaskPriority::DefaultYield, size));
state FlowLock::Releaser releaser(*lock, size);

printf("writeAndVerify size=%d file=%s\n", size, f->getFileName().c_str());
content.resize(content.arena(), size);
for (int i = 0; i < content.size(); ++i) {
content[i] = (uint8_t)deterministicRandom()->randomInt(0, 256);
}

state VectorRef<uint8_t> sendBuf = content;
while (sendBuf.size() > 0) {
state int n = std::min(sendBuf.size(), deterministicRandom()->randomInt(1, 16384));
wait(f->append(sendBuf.begin(), n));
sendBuf.pop_front(n);
}
wait(f->finish());

state Reference<IAsyncFile> inputFile = wait(c->readFile(f->getFileName()));
int64_t fileSize = wait(inputFile->size());
ASSERT(size == fileSize);
if (size > 0) {
state Standalone<StringRef> buf = makeString(size);
int b = wait(inputFile->read(mutateString(buf), buf.size(), 0));
state Standalone<VectorRef<uint8_t>> buf;
buf.resize(buf.arena(), fileSize);
int b = wait(inputFile->read(buf.begin(), buf.size(), 0));
ASSERT(b == buf.size());
ASSERT(buf == content);
}
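In the updated test, writeAndVerifyFile takes a FlowLock and claims size units from it before building its random buffer, returning them automatically when the FlowLock::Releaser goes out of scope; together with the 100e6-unit lock created in testBackupContainer below, this caps how much pending test data can sit in memory at once. As a rough stand-alone illustration of that take/release pattern, here is a plain C++ sketch using a mutex and condition variable instead of Flow primitives (ByteBudget and Releaser below are illustrative names, not FoundationDB types):

// Illustrative sketch only: a byte budget analogous to the FlowLock used by
// the test to cap the total size of in-flight write buffers.
#include <condition_variable>
#include <mutex>

class ByteBudget {
public:
	explicit ByteBudget(long capacity) : available(capacity) {}

	// Block until 'amount' units of budget are free, then claim them.
	void take(long amount) {
		std::unique_lock<std::mutex> guard(m);
		cv.wait(guard, [&] { return available >= amount; });
		available -= amount;
	}

	// Return 'amount' units of budget and wake any waiters.
	void release(long amount) {
		{
			std::lock_guard<std::mutex> guard(m);
			available += amount;
		}
		cv.notify_all();
	}

private:
	std::mutex m;
	std::condition_variable cv;
	long available;
};

// RAII releaser, mirroring FlowLock::Releaser: the budget is returned even if
// the work that follows throws.
class Releaser {
public:
	Releaser(ByteBudget& b, long amount) : budget(b), amount(amount) {}
	~Releaser() { budget.release(amount); }

private:
	ByteBudget& budget;
	long amount;
};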
@@ -1491,6 +1502,8 @@ ACTOR static Future<Void> testWriteSnapshotFile(Reference<IBackupFile> file, Key
}

ACTOR static Future<Void> testBackupContainer(std::string url) {
state FlowLock lock(100e6);

printf("BackupContainerTest URL %s\n", url.c_str());

state Reference<IBackupContainer> c = IBackupContainer::openContainer(url);

@@ -1514,7 +1527,11 @@ ACTOR static Future<Void> testBackupContainer(std::string url) {
state Version v = deterministicRandom()->randomInt64(0, std::numeric_limits<Version>::max() / 2);

// List of sizes to use to test edge cases on underlying file implementations
state std::vector<int> fileSizes = { 0, 10000000, 5000005 };
state std::vector<int> fileSizes = { 0 };
if (StringRef(url).startsWith(LiteralStringRef("blob"))) {
fileSizes.push_back(CLIENT_KNOBS->BLOBSTORE_MULTIPART_MIN_PART_SIZE);
fileSizes.push_back(CLIENT_KNOBS->BLOBSTORE_MULTIPART_MIN_PART_SIZE + 10);
}

loop {
state Version logStart = v;

@@ -1541,7 +1558,7 @@ ACTOR static Future<Void> testBackupContainer(std::string url) {
int size = chooseFileSize(fileSizes);
snapshotSizes.rbegin()->second += size;
// Write in actual range file format, instead of random data.
// writes.push_back(writeAndVerifyFile(c, range, size));
// writes.push_back(writeAndVerifyFile(c, range, size, &lock));
wait(testWriteSnapshotFile(range, begin, end, blockSize));

if (deterministicRandom()->random01() < .2) {

@@ -1562,7 +1579,7 @@ ACTOR static Future<Void> testBackupContainer(std::string url) {
state Reference<IBackupFile> log = wait(c->writeLogFile(logStart, v, 10));
logs[logStart] = log->getFileName();
int size = chooseFileSize(fileSizes);
writes.push_back(writeAndVerifyFile(c, log, size));
writes.push_back(writeAndVerifyFile(c, log, size, &lock));

// Randomly stop after a snapshot has finished and all manually seeded file sizes have been used.
if (fileSizes.empty() && !snapshots.empty() && snapshots.rbegin()->second.empty() &&
@@ -31,21 +31,29 @@ namespace {
class BackupFile : public IBackupFile, ReferenceCounted<BackupFile> {
public:
BackupFile(const std::string& fileName, Reference<IAsyncFile> file, const std::string& finalFullPath)
: IBackupFile(fileName), m_file(file), m_finalFullPath(finalFullPath), m_writeOffset(0) {
m_buffer.reserve(m_buffer.arena(), CLIENT_KNOBS->BACKUP_LOCAL_FILE_WRITE_BLOCK);
: IBackupFile(fileName), m_file(file), m_finalFullPath(finalFullPath), m_writeOffset(0), m_blockSize(CLIENT_KNOBS->BACKUP_LOCAL_FILE_WRITE_BLOCK) {
if (BUGGIFY) {
m_blockSize = deterministicRandom()->randomInt(100, 20000);
}
m_buffer.reserve(m_buffer.arena(), m_blockSize);
}

Future<Void> append(const void* data, int len) override {
m_buffer.append(m_buffer.arena(), (const uint8_t*)data, len);

if (m_buffer.size() >= CLIENT_KNOBS->BACKUP_LOCAL_FILE_WRITE_BLOCK) {
return flush(CLIENT_KNOBS->BACKUP_LOCAL_FILE_WRITE_BLOCK);
if (m_buffer.size() >= m_blockSize) {
return flush(m_blockSize);
}

return Void();
}

Future<Void> flush(int size) {
// Avoid empty write
if (size == 0) {
return Void();
}

ASSERT(size <= m_buffer.size());

// Keep a reference to the old buffer
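BackupFile now buffers appended data and flushes it in units of m_blockSize, which BUGGIFY shrinks to a random small value in simulation so that partial-block and multi-block flush paths are exercised rather than only the 1 MB default. A rough stand-alone sketch of that block-buffered write policy in plain C++ (BlockBufferedWriter is an illustrative name, and a std::string stands in for the wrapped async file):

#include <cstdint>
#include <string>
#include <vector>

// Sketch of a block-buffered writer: data accumulates in a staging buffer and
// is passed to the sink only in whole blocks; any tail smaller than a block is
// written when the file is finished.
class BlockBufferedWriter {
public:
	explicit BlockBufferedWriter(size_t blockSize) : blockSize(blockSize) {}

	void append(const void* data, size_t len) {
		const uint8_t* p = static_cast<const uint8_t*>(data);
		buffer.insert(buffer.end(), p, p + len);
		while (buffer.size() >= blockSize) {
			writeBlock(blockSize);
		}
	}

	void finish() {
		// Flush whatever is left, even if it is smaller than a block.
		if (!buffer.empty()) {
			writeBlock(buffer.size());
		}
	}

	const std::string& contents() const { return sink; }

private:
	void writeBlock(size_t n) {
		sink.append(reinterpret_cast<const char*>(buffer.data()), n);
		buffer.erase(buffer.begin(), buffer.begin() + n);
	}

	size_t blockSize;
	std::vector<uint8_t> buffer;
	std::string sink; // stands in for the wrapped IAsyncFile
};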
@@ -66,7 +74,7 @@ public:
wait(f->m_file->sync());
std::string name = f->m_file->getFilename();
f->m_file.clear();
renameFile(name, f->m_finalFullPath);
wait(IAsyncFileSystem::filesystem()->renameFile(name, f->m_finalFullPath));
return Void();
}

@@ -82,6 +90,7 @@ private:
Standalone<VectorRef<uint8_t>> m_buffer;
int64_t m_writeOffset;
std::string m_finalFullPath;
int m_blockSize;
};

ACTOR static Future<BackupContainerFileSystem::FilesAndSizesT> listFiles_impl(std::string path, std::string m_path) {

@@ -249,7 +258,7 @@ Future<Reference<IAsyncFile>> BackupContainerLocalDirectory::readFile(const std:
}

Future<Reference<IBackupFile>> BackupContainerLocalDirectory::writeFile(const std::string& path) {
int flags = IAsyncFile::OPEN_NO_AIO | IAsyncFile::OPEN_CREATE | IAsyncFile::OPEN_ATOMIC_WRITE_AND_CREATE |
int flags = IAsyncFile::OPEN_NO_AIO | IAsyncFile::OPEN_UNCACHED | IAsyncFile::OPEN_CREATE | IAsyncFile::OPEN_ATOMIC_WRITE_AND_CREATE |
IAsyncFile::OPEN_READWRITE;
std::string fullPath = joinPath(m_path, path);
platform::createDirectory(parentDirectory(fullPath));
@@ -1205,7 +1205,7 @@ struct BackupRangeTaskFunc : BackupTaskFuncBase {
wait(rangeFile.writeKey(nextKey));

if (BUGGIFY) {
rangeFile.padEnd();
wait(rangeFile.padEnd());
}

bool usedFile = wait(
@@ -124,7 +124,7 @@ void ClientKnobs::initialize(bool randomize) {
init( TASKBUCKET_MAX_TASK_KEYS, 1000 ); if( randomize && BUGGIFY ) TASKBUCKET_MAX_TASK_KEYS = 20;

//Backup
init( BACKUP_LOCAL_FILE_WRITE_BLOCK, 1024*1024 ); if( randomize && BUGGIFY ) BACKUP_LOCAL_FILE_WRITE_BLOCK = 100;
init( BACKUP_LOCAL_FILE_WRITE_BLOCK, 1024*1024 );
init( BACKUP_CONCURRENT_DELETES, 100 );
init( BACKUP_SIMULATED_LIMIT_BYTES, 1e6 ); if( randomize && BUGGIFY ) BACKUP_SIMULATED_LIMIT_BYTES = 1000;
init( BACKUP_GET_RANGE_LIMIT_BYTES, 1e6 );
@@ -1236,52 +1236,64 @@ std::vector<std::pair<std::string, bool>> MultiVersionApi::copyExternalLibraryPe
ASSERT_GE(threadCount, 1);
// Copy library for each thread configured per version
std::vector<std::pair<std::string, bool>> paths;
// It's tempting to use the so once without copying. However, we don't know
// if the thing we're about to copy is the shared object executing this code
// or not, so this optimization is unsafe.
// paths.push_back({path, false});
for (int ii = 0; ii < threadCount; ++ii) {
std::string filename = basename(path);

char tempName[PATH_MAX + 12];
sprintf(tempName, "/tmp/%s-XXXXXX", filename.c_str());
int tempFd = mkstemp(tempName);
int fd;
if (threadCount == 1) {
paths.push_back({ path, false });
} else {
// It's tempting to use the so once without copying. However, we don't know
// if the thing we're about to copy is the shared object executing this code
// or not, so this optimization is unsafe.
// paths.push_back({path, false});
for (int ii = 0; ii < threadCount; ++ii) {
std::string filename = basename(path);

if ((fd = open(path.c_str(), O_RDONLY)) == -1) {
TraceEvent("ExternalClientNotFound").detail("LibraryPath", path);
throw file_not_found();
}
char tempName[PATH_MAX + 12];
sprintf(tempName, "/tmp/%s-XXXXXX", filename.c_str());
int tempFd = mkstemp(tempName);
int fd;

constexpr size_t buf_sz = 4096;
char buf[buf_sz];
while (1) {
ssize_t readCount = read(fd, buf, buf_sz);
if (readCount == 0) {
// eof
break;
if ((fd = open(path.c_str(), O_RDONLY)) == -1) {
TraceEvent("ExternalClientNotFound").detail("LibraryPath", path);
throw file_not_found();
}
if (readCount == -1) {
TraceEvent(SevError, "ExternalClientCopyFailedReadError").GetLastError().detail("LibraryPath", path);
throw platform_error();
}
ssize_t written = 0;
while (written != readCount) {
ssize_t writeCount = write(tempFd, buf + written, readCount - written);
if (writeCount == -1) {
TraceEvent(SevError, "ExternalClientCopyFailedWriteError")

TraceEvent("CopyingExternalClient")
.detail("FileName", filename)
.detail("LibraryPath", path)
.detail("TempPath", tempName);

constexpr size_t buf_sz = 4096;
char buf[buf_sz];
while (1) {
ssize_t readCount = read(fd, buf, buf_sz);
if (readCount == 0) {
// eof
break;
}
if (readCount == -1) {
TraceEvent(SevError, "ExternalClientCopyFailedReadError")
.GetLastError()
.detail("LibraryPath", path);
throw platform_error();
}
written += writeCount;
ssize_t written = 0;
while (written != readCount) {
ssize_t writeCount = write(tempFd, buf + written, readCount - written);
if (writeCount == -1) {
TraceEvent(SevError, "ExternalClientCopyFailedWriteError")
.GetLastError()
.detail("LibraryPath", path);
throw platform_error();
}
written += writeCount;
}
}

close(fd);
close(tempFd);

paths.push_back({ tempName, true }); // use + delete temporary copies of the library.
}

close(fd);
close(tempFd);

paths.push_back({ tempName, true }); // use + delete temporary copies of the library.
}

return paths;
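When more than one client thread per version is configured, the code above copies the external client library to a fresh temporary file for each thread so every thread can load its own instance; with a single thread the library is used in place. A condensed, self-contained sketch of that copy step on POSIX, with error handling reduced to a return value (copyToTemp is an illustrative helper, not part of MultiVersionApi):

#include <fcntl.h>
#include <limits.h>
#include <stdio.h>
#include <stdlib.h>
#include <string>
#include <unistd.h>

// Copy the file at 'path' to a unique /tmp/<filename>-XXXXXX file and return
// the temporary path, or an empty string on failure. Mirrors the read/write
// loop in copyExternalLibraryPerThread above.
static std::string copyToTemp(const std::string& path, const std::string& filename) {
	char tempName[PATH_MAX + 12];
	snprintf(tempName, sizeof(tempName), "/tmp/%s-XXXXXX", filename.c_str());
	int tempFd = mkstemp(tempName);
	if (tempFd == -1) return "";

	int fd = open(path.c_str(), O_RDONLY);
	if (fd == -1) {
		close(tempFd);
		return "";
	}

	char buf[4096];
	ssize_t readCount;
	while ((readCount = read(fd, buf, sizeof(buf))) > 0) {
		ssize_t written = 0;
		while (written != readCount) {
			ssize_t writeCount = write(tempFd, buf + written, readCount - written);
			if (writeCount == -1) { readCount = -1; break; }
			written += writeCount;
		}
		if (readCount == -1) break;
	}

	close(fd);
	close(tempFd);
	return readCount == -1 ? "" : std::string(tempName);
}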
@@ -1415,16 +1427,24 @@ void MultiVersionApi::setupNetwork() {
if (externalClients.count(filename) == 0) {
externalClients[filename] = {};
for (const auto& tmp : copyExternalLibraryPerThread(path)) {
TraceEvent("AddingExternalClient")
.detail("FileName", filename)
.detail("LibraryPath", path)
.detail("TempPath", tmp.first);
externalClients[filename].push_back(Reference<ClientInfo>(
new ClientInfo(new DLApi(tmp.first, tmp.second /*unlink on load*/), path)));
}
}
}

if (externalClients.empty() && localClientDisabled) {
// SOMEDAY: this should be allowed when it's possible to add external clients after the
// network is setup.
//
// Typically we would create a more specific error for this case, but since we expect
// this case to go away soon, we can use a trace event and a generic error.
TraceEvent(SevWarn, "CannotSetupNetwork")
.detail("Reason", "Local client is disabled and no external clients configured");

throw client_invalid_operation();
}

networkStartSetup = true;

if (externalClients.empty()) {
@@ -1545,8 +1565,7 @@ Reference<IDatabase> MultiVersionApi::createDatabase(const char* clusterFilePath
}
std::string clusterFile(clusterFilePath);

if (threadCount > 1 || localClientDisabled) {
ASSERT(localClientDisabled);
if (localClientDisabled) {
ASSERT(!bypassMultiClientApi);

int threadIdx = nextThread;

@@ -1562,6 +1581,8 @@ Reference<IDatabase> MultiVersionApi::createDatabase(const char* clusterFilePath

lock.leave();

ASSERT_LE(threadCount, 1);

auto db = localClient->api->createDatabase(clusterFilePath);
if (bypassMultiClientApi) {
return db;
@@ -136,6 +136,8 @@ public:
// The address of the machine that opened the file
NetworkAddress openedAddress;

bool aio;

private:
// The wrapped IAsyncFile
Reference<IAsyncFile> file;

@@ -173,8 +175,10 @@ private:
AsyncFileNonDurable(const std::string& filename,
Reference<IAsyncFile> file,
Reference<DiskParameters> diskParameters,
NetworkAddress openedAddress)
: openedAddress(openedAddress), pendingModifications(uint64_t(-1)), approximateSize(0), reponses(false) {
NetworkAddress openedAddress,
bool aio)
: openedAddress(openedAddress), pendingModifications(uint64_t(-1)), approximateSize(0), reponses(false),
aio(aio) {

// This is only designed to work in simulation
ASSERT(g_network->isSimulated());

@@ -198,7 +202,8 @@ public:
ACTOR static Future<Reference<IAsyncFile>> open(std::string filename,
std::string actualFilename,
Future<Reference<IAsyncFile>> wrappedFile,
Reference<DiskParameters> diskParameters) {
Reference<DiskParameters> diskParameters,
bool aio) {
state ISimulator::ProcessInfo* currentProcess = g_simulator.getCurrentProcess();
state TaskPriority currentTaskID = g_network->getCurrentTask();
state Future<Void> shutdown = success(currentProcess->shutdownSignal.getFuture());

@@ -225,7 +230,7 @@ public:
}

state Reference<AsyncFileNonDurable> nonDurableFile(
new AsyncFileNonDurable(filename, file, diskParameters, currentProcess->address));
new AsyncFileNonDurable(filename, file, diskParameters, currentProcess->address, aio));

// Causes the approximateSize member to be set
state Future<int64_t> sizeFuture = nonDurableFile->size();
@@ -462,20 +467,39 @@ private:

debugFileCheck("AsyncFileNonDurableWriteAfterWait", self->filename, dataCopy.begin(), offset, length);

// Only page-aligned writes are supported
ASSERT(offset % 4096 == 0 && length % 4096 == 0);
// In AIO mode, only page-aligned writes are supported
ASSERT(!self->aio || (offset % 4096 == 0 && length % 4096 == 0));

// Non-durable writes should introduce errors at the page level and corrupt at the sector level
// Otherwise, we can perform the entire write at once
int pageLength = saveDurable ? length : 4096;
int sectorLength = saveDurable ? length : 512;
int diskPageLength = saveDurable ? length : 4096;
int diskSectorLength = saveDurable ? length : 512;

vector<Future<Void>> writeFutures;
for (int writeOffset = 0; writeOffset < length; writeOffset += pageLength) {
for (int writeOffset = 0; writeOffset < length;) {
// Number of bytes until the next diskPageLength file offset within the write or the end of the write.
int pageLength = diskPageLength;
if (!self->aio && !saveDurable) {
// If not in AIO mode, and the save is not durable, then we can't perform the entire write all at once
// and the first and last pages touched by the write could be partial.
pageLength = std::min<int64_t>((int64_t)length - writeOffset,
diskPageLength - ((offset + writeOffset) % diskPageLength));
}

// choose a random action to perform on this page write (write correctly, corrupt, or don't write)
KillMode pageKillMode = (KillMode)deterministicRandom()->randomInt(0, self->killMode + 1);

for (int pageOffset = 0; pageOffset < pageLength; pageOffset += sectorLength) {
for (int pageOffset = 0; pageOffset < pageLength;) {
// Number of bytes until the next diskSectorLength file offset within the write or the end of the write.
int sectorLength = diskSectorLength;
if (!self->aio && !saveDurable) {
// If not in AIO mode, and the save is not durable, then we can't perform the entire write all at
// once and the first and last sectors touched by the write could be partial.
sectorLength =
std::min<int64_t>((int64_t)length - (writeOffset + pageOffset),
diskSectorLength - ((offset + writeOffset + pageOffset) % diskSectorLength));
}

// If saving durable, then perform the write correctly. Otherwise, perform the write correcly with a
// probability of 1/3. If corrupting the write, then this sector will be written correctly with a 1/4
// chance

@@ -550,7 +574,11 @@ private:
.detail("Filename", self->filename);
TEST(true); // AsyncFileNonDurable dropped write
}

pageOffset += sectorLength;
}

writeOffset += pageLength;
}

wait(waitForAll(writeFutures));
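The rewritten loops above split a possibly unaligned, non-durable write into chunks that never cross a 4096-byte page boundary (and, within a page, a 512-byte sector boundary), so each simulated corruption or dropped write affects at most one page or sector, while a durable save is still issued as a single chunk. A small stand-alone sketch of just that chunk-length arithmetic (plain C++; chunkLengths is an illustrative helper, not part of the FoundationDB code):

#include <algorithm>
#include <cstdint>
#include <cstdio>
#include <vector>

// Split a write of 'length' bytes at file 'offset' into chunks that never
// cross a 'unit'-byte boundary (e.g. unit = 4096 for pages, 512 for sectors).
// The first and last chunks may be partial if the write is unaligned.
static std::vector<int> chunkLengths(int64_t offset, int length, int unit) {
	std::vector<int> chunks;
	for (int written = 0; written < length;) {
		int n = std::min<int64_t>((int64_t)length - written,
		                          unit - ((offset + written) % unit));
		chunks.push_back(n);
		written += n;
	}
	return chunks;
}

int main() {
	// A 10000-byte write starting at offset 1000 touches three 4096-byte pages:
	// 3096 bytes to finish the first page, one full page, then a 2808-byte tail.
	for (int n : chunkLengths(1000, 10000, 4096)) {
		printf("%d ", n);
	}
	printf("\n"); // prints: 3096 4096 2808
	return 0;
}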
@@ -681,8 +681,8 @@ private:
opId.shortString().c_str(),
size);

if (size == 0) {
// KAIO will return EINVAL, as len==0 is an error.
// KAIO will return EINVAL, as len==0 is an error.
if ((self->flags & IAsyncFile::OPEN_NO_AIO) == 0 && size == 0) {
throw io_error();
}

@@ -2457,7 +2457,8 @@ Future<Reference<class IAsyncFile>> Sim2FileSystem::open(const std::string& file
AsyncFileNonDurable::open(filename,
actualFilename,
SimpleFile::open(filename, flags, mode, diskParameters, false),
diskParameters);
diskParameters,
(flags & IAsyncFile::OPEN_NO_AIO) == 0);
}
Future<Reference<IAsyncFile>> f = AsyncFileDetachable::open(machineCache[actualFilename]);
if (FLOW_KNOBS->PAGE_WRITE_CHECKSUM_HISTORY > 0)
@@ -44,6 +44,7 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi

// TLogs
init( TLOG_TIMEOUT, 0.4 ); //cannot buggify because of availability
init( TLOG_SLOW_REJOIN_WARN_TIMEOUT_SECS, 60 ); if( randomize && BUGGIFY ) TLOG_SLOW_REJOIN_WARN_TIMEOUT_SECS = deterministicRandom()->randomInt(5,10);
init( RECOVERY_TLOG_SMART_QUORUM_DELAY, 0.25 ); if( randomize && BUGGIFY ) RECOVERY_TLOG_SMART_QUORUM_DELAY = 0.0; // smaller might be better for bug amplification
init( TLOG_STORAGE_MIN_UPDATE_INTERVAL, 0.5 );
init( BUGGIFY_TLOG_STORAGE_MIN_UPDATE_INTERVAL, 30 );

@@ -42,6 +42,7 @@ public:

// TLogs
double TLOG_TIMEOUT; // tlog OR commit proxy failure - master's reaction time
double TLOG_SLOW_REJOIN_WARN_TIMEOUT_SECS; // Warns if a tlog takes too long to rejoin
double RECOVERY_TLOG_SMART_QUORUM_DELAY; // smaller might be better for bug amplification
double TLOG_STORAGE_MIN_UPDATE_INTERVAL;
double BUGGIFY_TLOG_STORAGE_MIN_UPDATE_INTERVAL;
@@ -3027,35 +3027,52 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
logServers,
FutureStream<struct TLogRejoinRequest> rejoinRequests) {
state std::map<UID, ReplyPromise<TLogRejoinReply>> lastReply;
state std::set<UID> logsWaiting;
state double startTime = now();
state Future<Void> warnTimeout = delay(SERVER_KNOBS->TLOG_SLOW_REJOIN_WARN_TIMEOUT_SECS);

for (const auto& log : logServers) {
logsWaiting.insert(log.first->get().id());
}

try {
loop {
TLogRejoinRequest req = waitNext(rejoinRequests);
int pos = -1;
for (int i = 0; i < logServers.size(); i++) {
if (logServers[i].first->get().id() == req.myInterface.id()) {
pos = i;
break;
loop choose {
when(TLogRejoinRequest req = waitNext(rejoinRequests)) {
int pos = -1;
for (int i = 0; i < logServers.size(); i++) {
if (logServers[i].first->get().id() == req.myInterface.id()) {
pos = i;
logsWaiting.erase(logServers[i].first->get().id());
break;
}
}
if (pos != -1) {
TraceEvent("TLogJoinedMe", dbgid)
.detail("TLog", req.myInterface.id())
.detail("Address", req.myInterface.commit.getEndpoint().getPrimaryAddress().toString());
if (!logServers[pos].first->get().present() ||
req.myInterface.commit.getEndpoint() !=
logServers[pos].first->get().interf().commit.getEndpoint()) {
TLogInterface interf = req.myInterface;
filterLocalityDataForPolicyDcAndProcess(logServers[pos].second, &interf.filteredLocality);
logServers[pos].first->setUnconditional(OptionalInterface<TLogInterface>(interf));
}
lastReply[req.myInterface.id()].send(TLogRejoinReply{ false });
lastReply[req.myInterface.id()] = req.reply;
} else {
TraceEvent("TLogJoinedMeUnknown", dbgid)
.detail("TLog", req.myInterface.id())
.detail("Address", req.myInterface.commit.getEndpoint().getPrimaryAddress().toString());
req.reply.send(true);
}
}
if (pos != -1) {
TraceEvent("TLogJoinedMe", dbgid)
.detail("TLog", req.myInterface.id())
.detail("Address", req.myInterface.commit.getEndpoint().getPrimaryAddress().toString());
if (!logServers[pos].first->get().present() ||
req.myInterface.commit.getEndpoint() !=
logServers[pos].first->get().interf().commit.getEndpoint()) {
TLogInterface interf = req.myInterface;
filterLocalityDataForPolicyDcAndProcess(logServers[pos].second, &interf.filteredLocality);
logServers[pos].first->setUnconditional(OptionalInterface<TLogInterface>(interf));
when(wait(warnTimeout)) {
for (const auto& logId : logsWaiting) {
TraceEvent(SevWarnAlways, "TLogRejoinSlow", dbgid)
.detail("Elapsed", startTime - now())
.detail("LogId", logId);
}
lastReply[req.myInterface.id()].send(TLogRejoinReply{ false });
lastReply[req.myInterface.id()] = req.reply;
} else {
TraceEvent("TLogJoinedMeUnknown", dbgid)
.detail("TLog", req.myInterface.id())
.detail("Address", req.myInterface.commit.getEndpoint().getPrimaryAddress().toString());
req.reply.send(true);
warnTimeout = Never();
}
}
} catch (...) {
@@ -394,12 +394,19 @@ public:
UID UIDofLongest;
for (const auto& kv : startTimeMap) {
const double currentRunningTime = currentTime - kv.second;
if (longest < currentRunningTime) {
if (longest <= currentRunningTime) {
longest = currentRunningTime;
UIDofLongest = kv.first;
}
}
return { longest, keyRangeMap.at(UIDofLongest) };
if (BUGGIFY) {
UIDofLongest = deterministicRandom()->randomUniqueID();
}
auto it = keyRangeMap.find(UIDofLongest);
if (it != keyRangeMap.end()) {
return { longest, it->second };
}
return { -1, emptyKeyRange };
}

int numRunning() const { return startTimeMap.size(); }
@@ -84,3 +84,13 @@ ENV FDB_COORDINATOR ""
ENV FDB_COORDINATOR_PORT 4500
ENV FDB_CLUSTER_FILE_CONTENTS ""
ENV FDB_PROCESS_CLASS unset

# Adding tini as PID 1 https://github.com/krallin/tini
ARG TINI_VERSION=v0.19.0
RUN curl -sLO https://github.com/krallin/tini/releases/download/${TINI_VERSION}/tini-amd64 && \
    curl -sLO https://github.com/krallin/tini/releases/download/${TINI_VERSION}/tini-amd64.sha256sum && \
    sha256sum -c tini-amd64.sha256sum && \
    rm -f tini-amd64.sha256sum && \
    chmod +x tini-amd64 && \
    mv tini-amd64 /usr/bin/tini
ENTRYPOINT ["/usr/bin/tini", "-g", "--"]
@@ -40,7 +40,7 @@ fdb.options.set_trace_enable(args.client_log_dir)
fdb.options.set_knob("min_trace_severity=5")

if not args.skip_so_files:
print "Loading .so files"
print("Loading .so files")
fdb.options.set_external_client_directory(args.build_dir + '/lib')

if args.threads > 0: