Merge pull request #4205 from sfc-gh-anoyes/anoyes/merge-release-6.2

Merge release 6.2 into release 6.3
This commit is contained in:
Markus Pilman 2021-01-15 10:55:41 -07:00 committed by GitHub
commit 9bca167cc2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 83 additions and 33 deletions

View File

@ -1716,15 +1716,39 @@ public:
class BackupFile : public IBackupFile, ReferenceCounted<BackupFile> {
public:
BackupFile(std::string fileName, Reference<IAsyncFile> file, std::string finalFullPath) : IBackupFile(fileName), m_file(file), m_finalFullPath(finalFullPath) {}
BackupFile(std::string fileName, Reference<IAsyncFile> file, std::string finalFullPath)
: IBackupFile(fileName), m_file(file), m_finalFullPath(finalFullPath), m_writeOffset(0)
{
m_buffer.reserve(m_buffer.arena(), CLIENT_KNOBS->BACKUP_LOCAL_FILE_WRITE_BLOCK);
}
Future<Void> append(const void *data, int len) {
Future<Void> r = m_file->write(data, len, m_offset);
m_offset += len;
m_buffer.append(m_buffer.arena(), (const uint8_t *)data, len);
if(m_buffer.size() >= CLIENT_KNOBS->BACKUP_LOCAL_FILE_WRITE_BLOCK) {
return flush(CLIENT_KNOBS->BACKUP_LOCAL_FILE_WRITE_BLOCK);
}
return Void();
}
Future<Void> flush(int size) {
ASSERT(size <= m_buffer.size());
// Keep a reference to the old buffer
Standalone<VectorRef<uint8_t>> old = m_buffer;
// Make a new buffer, initialized with the excess bytes over the block size from the old buffer
m_buffer = Standalone<VectorRef<uint8_t>>(old.slice(size, old.size()));
// Write the old buffer to the underlying file and update the write offset
Future<Void> r = holdWhile(old, m_file->write(old.begin(), size, m_writeOffset));
m_writeOffset += size;
return r;
}
ACTOR static Future<Void> finish_impl(Reference<BackupFile> f) {
wait(f->flush(f->m_buffer.size()));
wait(f->m_file->truncate(f->size())); // Some IAsyncFile implementations extend in whole block sizes.
wait(f->m_file->sync());
std::string name = f->m_file->getFilename();
@ -1733,6 +1757,10 @@ public:
return Void();
}
int64_t size() const {
return m_buffer.size() + m_writeOffset;
}
Future<Void> finish() {
return finish_impl(Reference<BackupFile>::addRef(this));
}
@ -1742,6 +1770,8 @@ public:
private:
Reference<IAsyncFile> m_file;
Standalone<VectorRef<uint8_t>> m_buffer;
int64_t m_writeOffset;
std::string m_finalFullPath;
};
@ -1874,7 +1904,7 @@ public:
class BackupFile : public IBackupFile, ReferenceCounted<BackupFile> {
public:
BackupFile(std::string fileName, Reference<IAsyncFile> file) : IBackupFile(fileName), m_file(file) {}
BackupFile(std::string fileName, Reference<IAsyncFile> file) : IBackupFile(fileName), m_file(file), m_offset(0) {}
Future<Void> append(const void *data, int len) {
Future<Void> r = m_file->write(data, len, m_offset);
@ -1887,11 +1917,16 @@ public:
return map(m_file->sync(), [=](Void _) { self->m_file.clear(); return Void(); });
}
int64_t size() const {
return m_offset;
}
void addref() final { return ReferenceCounted<BackupFile>::addref(); }
void delref() final { return ReferenceCounted<BackupFile>::delref(); }
private:
Reference<IAsyncFile> m_file;
int64_t m_offset;
};
Future<Reference<IBackupFile>> writeFile(std::string path) final {

View File

@ -40,7 +40,7 @@ Future<Version> timeKeeperVersionFromDatetime(std::string const &datetime, Datab
// TODO: Move the log file and range file format encoding/decoding stuff to this file and behind interfaces.
class IBackupFile {
public:
IBackupFile(std::string fileName) : m_fileName(fileName), m_offset(0) {}
IBackupFile(std::string fileName) : m_fileName(fileName) {}
virtual ~IBackupFile() {}
// Backup files are append-only and cannot have more than 1 append outstanding at once.
virtual Future<Void> append(const void *data, int len) = 0;
@ -48,16 +48,13 @@ public:
inline std::string getFileName() const {
return m_fileName;
}
inline int64_t size() const {
return m_offset;
}
virtual int64_t size() const = 0;
virtual void addref() = 0;
virtual void delref() = 0;
Future<Void> appendStringRefWithLen(Standalone<StringRef> s);
protected:
std::string m_fileName;
int64_t m_offset;
};
// Structures for various backup components

View File

@ -274,6 +274,7 @@ public:
Counter transactionsCommitCompleted;
Counter transactionKeyServerLocationRequests;
Counter transactionKeyServerLocationRequestsCompleted;
Counter transactionStatusRequests;
Counter transactionsTooOld;
Counter transactionsFutureVersions;
Counter transactionsNotCommitted;

View File

@ -122,6 +122,7 @@ void ClientKnobs::initialize(bool randomize) {
init( TASKBUCKET_MAX_TASK_KEYS, 1000 ); if( randomize && BUGGIFY ) TASKBUCKET_MAX_TASK_KEYS = 20;
//Backup
init( BACKUP_LOCAL_FILE_WRITE_BLOCK, 1024*1024 ); if( randomize && BUGGIFY ) BACKUP_LOCAL_FILE_WRITE_BLOCK = 100;
init( BACKUP_CONCURRENT_DELETES, 100 );
init( BACKUP_SIMULATED_LIMIT_BYTES, 1e6 ); if( randomize && BUGGIFY ) BACKUP_SIMULATED_LIMIT_BYTES = 1000;
init( BACKUP_GET_RANGE_LIMIT_BYTES, 1e6 );

View File

@ -118,6 +118,7 @@ public:
int TASKBUCKET_MAX_TASK_KEYS;
// Backup
int BACKUP_LOCAL_FILE_WRITE_BLOCK;
int BACKUP_CONCURRENT_DELETES;
int BACKUP_SIMULATED_LIMIT_BYTES;
int BACKUP_GET_RANGE_LIMIT_BYTES;

View File

@ -651,8 +651,7 @@ DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<ClusterConnectionF
: connectionFile(connectionFile), clientInfo(clientInfo), clientInfoMonitor(clientInfoMonitor), taskID(taskID),
clientLocality(clientLocality), enableLocalityLoadBalance(enableLocalityLoadBalance), lockAware(lockAware),
apiVersion(apiVersion), switchable(switchable), provisional(false), cc("TransactionMetrics"),
transactionReadVersions("ReadVersions", cc),
transactionReadVersionsThrottled("ReadVersionsThrottled", cc),
transactionReadVersions("ReadVersions", cc), transactionReadVersionsThrottled("ReadVersionsThrottled", cc),
transactionReadVersionsCompleted("ReadVersionsCompleted", cc),
transactionReadVersionBatches("ReadVersionBatches", cc),
transactionBatchReadVersions("BatchPriorityReadVersions", cc),
@ -673,12 +672,13 @@ DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<ClusterConnectionF
transactionsCommitStarted("CommitStarted", cc), transactionsCommitCompleted("CommitCompleted", cc),
transactionKeyServerLocationRequests("KeyServerLocationRequests", cc),
transactionKeyServerLocationRequestsCompleted("KeyServerLocationRequestsCompleted", cc),
transactionsTooOld("TooOld", cc), transactionsFutureVersions("FutureVersions", cc),
transactionsNotCommitted("NotCommitted", cc), transactionsMaybeCommitted("MaybeCommitted", cc),
transactionsResourceConstrained("ResourceConstrained", cc), transactionsThrottled("Throttled", cc),
transactionsProcessBehind("ProcessBehind", cc), outstandingWatches(0), latencies(1000), readLatencies(1000),
commitLatencies(1000), GRVLatencies(1000), mutationsPerCommit(1000), bytesPerCommit(1000), mvCacheInsertLocation(0),
healthMetricsLastUpdated(0), detailedHealthMetricsLastUpdated(0), internal(internal),
transactionStatusRequests("StatusRequests", cc), transactionsTooOld("TooOld", cc),
transactionsFutureVersions("FutureVersions", cc), transactionsNotCommitted("NotCommitted", cc),
transactionsMaybeCommitted("MaybeCommitted", cc), transactionsResourceConstrained("ResourceConstrained", cc),
transactionsThrottled("Throttled", cc), transactionsProcessBehind("ProcessBehind", cc), outstandingWatches(0),
latencies(1000), readLatencies(1000), commitLatencies(1000), GRVLatencies(1000), mutationsPerCommit(1000),
bytesPerCommit(1000), mvCacheInsertLocation(0), healthMetricsLastUpdated(0), detailedHealthMetricsLastUpdated(0),
internal(internal),
specialKeySpace(std::make_unique<SpecialKeySpace>(specialKeys.begin, specialKeys.end, /* test */ false)) {
dbId = deterministicRandom()->randomUniqueID();
connected = clientInfo->get().proxies.size() ? Void() : clientInfo->onChange();
@ -716,6 +716,7 @@ DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<ClusterConnectionF
[](ReadYourWritesTransaction* ryw) -> Future<Optional<Value>> {
if (ryw->getDatabase().getPtr() &&
ryw->getDatabase()->getConnectionFile()) {
++ryw->getDatabase()->transactionStatusRequests;
return getJSON(ryw->getDatabase());
} else {
return Optional<Value>();
@ -761,22 +762,35 @@ DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<ClusterConnectionF
}
}
DatabaseContext::DatabaseContext( const Error &err ) : deferredError(err), cc("TransactionMetrics"), transactionReadVersions("ReadVersions", cc), transactionReadVersionsThrottled("ReadVersionsThrottled", cc),
transactionReadVersionsCompleted("ReadVersionsCompleted", cc), transactionReadVersionBatches("ReadVersionBatches", cc), transactionBatchReadVersions("BatchPriorityReadVersions", cc),
transactionDefaultReadVersions("DefaultPriorityReadVersions", cc), transactionImmediateReadVersions("ImmediatePriorityReadVersions", cc),
transactionBatchReadVersionsCompleted("BatchPriorityReadVersionsCompleted", cc), transactionDefaultReadVersionsCompleted("DefaultPriorityReadVersionsCompleted", cc),
transactionImmediateReadVersionsCompleted("ImmediatePriorityReadVersionsCompleted", cc), transactionLogicalReads("LogicalUncachedReads", cc), transactionPhysicalReads("PhysicalReadRequests", cc),
transactionPhysicalReadsCompleted("PhysicalReadRequestsCompleted", cc), transactionGetKeyRequests("GetKeyRequests", cc), transactionGetValueRequests("GetValueRequests", cc),
transactionGetRangeRequests("GetRangeRequests", cc), transactionWatchRequests("WatchRequests", cc), transactionGetAddressesForKeyRequests("GetAddressesForKeyRequests", cc),
transactionBytesRead("BytesRead", cc), transactionKeysRead("KeysRead", cc), transactionMetadataVersionReads("MetadataVersionReads", cc), transactionCommittedMutations("CommittedMutations", cc),
transactionCommittedMutationBytes("CommittedMutationBytes", cc), transactionSetMutations("SetMutations", cc), transactionClearMutations("ClearMutations", cc),
transactionAtomicMutations("AtomicMutations", cc), transactionsCommitStarted("CommitStarted", cc), transactionsCommitCompleted("CommitCompleted", cc),
transactionKeyServerLocationRequests("KeyServerLocationRequests", cc), transactionKeyServerLocationRequestsCompleted("KeyServerLocationRequestsCompleted", cc), transactionsTooOld("TooOld", cc),
transactionsFutureVersions("FutureVersions", cc), transactionsNotCommitted("NotCommitted", cc), transactionsMaybeCommitted("MaybeCommitted", cc),
transactionsResourceConstrained("ResourceConstrained", cc), transactionsThrottled("Throttled", cc), transactionsProcessBehind("ProcessBehind", cc), latencies(1000), readLatencies(1000), commitLatencies(1000),
GRVLatencies(1000), mutationsPerCommit(1000), bytesPerCommit(1000),
internal(false) {}
DatabaseContext::DatabaseContext(const Error& err)
: deferredError(err), cc("TransactionMetrics"), transactionReadVersions("ReadVersions", cc),
transactionReadVersionsThrottled("ReadVersionsThrottled", cc),
transactionReadVersionsCompleted("ReadVersionsCompleted", cc),
transactionReadVersionBatches("ReadVersionBatches", cc),
transactionBatchReadVersions("BatchPriorityReadVersions", cc),
transactionDefaultReadVersions("DefaultPriorityReadVersions", cc),
transactionImmediateReadVersions("ImmediatePriorityReadVersions", cc),
transactionBatchReadVersionsCompleted("BatchPriorityReadVersionsCompleted", cc),
transactionDefaultReadVersionsCompleted("DefaultPriorityReadVersionsCompleted", cc),
transactionImmediateReadVersionsCompleted("ImmediatePriorityReadVersionsCompleted", cc),
transactionLogicalReads("LogicalUncachedReads", cc), transactionPhysicalReads("PhysicalReadRequests", cc),
transactionPhysicalReadsCompleted("PhysicalReadRequestsCompleted", cc),
transactionGetKeyRequests("GetKeyRequests", cc), transactionGetValueRequests("GetValueRequests", cc),
transactionGetRangeRequests("GetRangeRequests", cc), transactionWatchRequests("WatchRequests", cc),
transactionGetAddressesForKeyRequests("GetAddressesForKeyRequests", cc), transactionBytesRead("BytesRead", cc),
transactionKeysRead("KeysRead", cc), transactionMetadataVersionReads("MetadataVersionReads", cc),
transactionCommittedMutations("CommittedMutations", cc),
transactionCommittedMutationBytes("CommittedMutationBytes", cc), transactionSetMutations("SetMutations", cc),
transactionClearMutations("ClearMutations", cc), transactionAtomicMutations("AtomicMutations", cc),
transactionsCommitStarted("CommitStarted", cc), transactionsCommitCompleted("CommitCompleted", cc),
transactionKeyServerLocationRequests("KeyServerLocationRequests", cc),
transactionKeyServerLocationRequestsCompleted("KeyServerLocationRequestsCompleted", cc),
transactionStatusRequests("StatusRequests", cc), transactionsTooOld("TooOld", cc),
transactionsFutureVersions("FutureVersions", cc), transactionsNotCommitted("NotCommitted", cc),
transactionsMaybeCommitted("MaybeCommitted", cc), transactionsResourceConstrained("ResourceConstrained", cc),
transactionsThrottled("Throttled", cc), transactionsProcessBehind("ProcessBehind", cc), latencies(1000),
readLatencies(1000), commitLatencies(1000), GRVLatencies(1000), mutationsPerCommit(1000), bytesPerCommit(1000),
internal(false) {}
Database DatabaseContext::create(Reference<AsyncVar<ClientDBInfo>> clientInfo, Future<Void> clientInfoMonitor, LocalityData clientLocality, bool enableLocalityLoadBalance, TaskPriority taskID, bool lockAware, int apiVersion, bool switchable) {
return Database( new DatabaseContext( Reference<AsyncVar<Reference<ClusterConnectionFile>>>(), clientInfo, clientInfoMonitor, taskID, clientLocality, enableLocalityLoadBalance, lockAware, true, apiVersion, switchable ) );

View File

@ -1234,6 +1234,7 @@ Future< Optional<Value> > ReadYourWritesTransaction::get( const Key& key, bool s
} else {
if (key == LiteralStringRef("\xff\xff/status/json")) {
if (tr.getDatabase().getPtr() && tr.getDatabase()->getConnectionFile()) {
++tr.getDatabase()->transactionStatusRequests;
return getJSON(tr.getDatabase());
} else {
return Optional<Value>();