Merge branch 'apple:master' into task/backup

This commit is contained in:
Vishesh Yadav 2021-08-06 10:45:32 -07:00 committed by GitHub
commit 232f668226
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
25 changed files with 521 additions and 212 deletions

View File

@ -436,7 +436,8 @@ if __name__ == '__main__':
# assertions will fail if fdbcli does not work as expected
process_number = int(sys.argv[3])
if process_number == 1:
advanceversion()
# TODO: disable for now, the change can cause the database unavailable
#advanceversion()
cache_range()
consistencycheck()
datadistribution()

View File

@ -1,61 +1,73 @@
function(compile_boost)
# Initialize function incoming parameters
set(options)
set(oneValueArgs TARGET)
set(multiValueArgs BUILD_ARGS CXXFLAGS LDFLAGS)
cmake_parse_arguments(MY "${options}" "${oneValueArgs}"
cmake_parse_arguments(COMPILE_BOOST "${options}" "${oneValueArgs}"
"${multiValueArgs}" ${ARGN} )
# Configure the boost toolset to use
set(BOOTSTRAP_ARGS "--with-libraries=context")
set(B2_COMMAND "./b2")
set(BOOST_COMPILER_FLAGS -fvisibility=hidden -fPIC -std=c++14 -w)
# Configure bootstrap command
set(BOOTSTRAP_COMMAND "./bootstrap.sh")
set(BOOTSTRAP_LIBRARIES "context")
set(BOOST_CXX_COMPILER "${CMAKE_CXX_COMPILER}")
if(APPLE)
set(BOOST_TOOLSET "clang-darwin")
# this is to fix a weird macOS issue -- by default
# cmake would otherwise pass a compiler that can't
# compile boost
set(BOOST_CXX_COMPILER "/usr/bin/clang++")
elseif(CLANG)
if(CLANG)
set(BOOST_TOOLSET "clang")
list(APPEND BOOTSTRAP_ARGS "${BOOTSTRAP_COMMAND} --with-toolset=clang")
if(APPLE)
# this is to fix a weird macOS issue -- by default
# cmake would otherwise pass a compiler that can't
# compile boost
set(BOOST_CXX_COMPILER "/usr/bin/clang++")
endif()
else()
set(BOOST_TOOLSET "gcc")
endif()
if(APPLE OR USE_LIBCXX)
list(APPEND BOOST_COMPILER_FLAGS -stdlib=libc++)
endif()
set(BOOST_ADDITIONAL_COMPILE_OPTIOINS "")
foreach(flag IN LISTS BOOST_COMPILER_FLAGS MY_CXXFLAGS)
string(APPEND BOOST_ADDITIONAL_COMPILE_OPTIOINS "<cxxflags>${flag} ")
endforeach()
foreach(flag IN LISTS MY_LDFLAGS)
string(APPEND BOOST_ADDITIONAL_COMPILE_OPTIOINS "<linkflags>${flag} ")
endforeach()
configure_file(${CMAKE_SOURCE_DIR}/cmake/user-config.jam.cmake ${CMAKE_BINARY_DIR}/user-config.jam)
message(STATUS "Use ${BOOST_TOOLSET} to build boost")
# Configure b2 command
set(B2_COMMAND "./b2")
set(BOOST_COMPILER_FLAGS -fvisibility=hidden -fPIC -std=c++17 -w)
set(BOOST_LINK_FLAGS "")
if(APPLE OR CLANG OR USE_LIBCXX)
list(APPEND BOOST_COMPILER_FLAGS -stdlib=libc++ -nostdlib++)
list(APPEND BOOST_LINK_FLAGS -static-libgcc -lc++ -lc++abi)
endif()
# Update the user-config.jam
set(BOOST_ADDITIONAL_COMPILE_OPTIOINS "")
foreach(flag IN LISTS BOOST_COMPILER_FLAGS COMPILE_BOOST_CXXFLAGS)
string(APPEND BOOST_ADDITIONAL_COMPILE_OPTIONS "<cxxflags>${flag} ")
endforeach()
#foreach(flag IN LISTS BOOST_LINK_FLAGS COMPILE_BOOST_LDFLAGS)
# string(APPEND BOOST_ADDITIONAL_COMPILE_OPTIONS "<linkflags>${flag} ")
#endforeach()
configure_file(${CMAKE_SOURCE_DIR}/cmake/user-config.jam.cmake ${CMAKE_BINARY_DIR}/user-config.jam)
set(USER_CONFIG_FLAG --user-config=${CMAKE_BINARY_DIR}/user-config.jam)
# Build boost
include(ExternalProject)
set(BOOST_INSTALL_DIR "${CMAKE_BINARY_DIR}/boost_install")
ExternalProject_add("${MY_TARGET}Project"
ExternalProject_add("${COMPILE_BOOST_TARGET}Project"
URL "https://boostorg.jfrog.io/artifactory/main/release/1.72.0/source/boost_1_72_0.tar.bz2"
URL_HASH SHA256=59c9b274bc451cf91a9ba1dd2c7fdcaf5d60b1b3aa83f2c9fa143417cc660722
CONFIGURE_COMMAND ./bootstrap.sh ${BOOTSTRAP_ARGS}
BUILD_COMMAND ${B2_COMMAND} link=static ${MY_BUILD_ARGS} --prefix=${BOOST_INSTALL_DIR} ${USER_CONFIG_FLAG} install
CONFIGURE_COMMAND ${BOOTSTRAP_COMMAND} ${BOOTSTRAP_ARGS} --with-libraries=${BOOTSTRAP_LIBRARIES} --with-toolset=${BOOST_TOOLSET}
BUILD_COMMAND ${B2_COMMAND} link=static ${COMPILE_BOOST_BUILD_ARGS} --prefix=${BOOST_INSTALL_DIR} ${USER_CONFIG_FLAG} install
BUILD_IN_SOURCE ON
INSTALL_COMMAND ""
UPDATE_COMMAND ""
BUILD_BYPRODUCTS "${BOOST_INSTALL_DIR}/boost/config.hpp"
"${BOOST_INSTALL_DIR}/lib/libboost_context.a")
add_library(${MY_TARGET}_context STATIC IMPORTED)
add_dependencies(${MY_TARGET}_context ${MY_TARGET}Project)
set_target_properties(${MY_TARGET}_context PROPERTIES IMPORTED_LOCATION "${BOOST_INSTALL_DIR}/lib/libboost_context.a")
add_library(${COMPILE_BOOST_TARGET}_context STATIC IMPORTED)
add_dependencies(${COMPILE_BOOST_TARGET}_context ${COMPILE_BOOST_TARGET}Project)
set_target_properties(${COMPILE_BOOST_TARGET}_context PROPERTIES IMPORTED_LOCATION "${BOOST_INSTALL_DIR}/lib/libboost_context.a")
add_library(${MY_TARGET} INTERFACE)
target_include_directories(${MY_TARGET} SYSTEM INTERFACE ${BOOST_INSTALL_DIR}/include)
target_link_libraries(${MY_TARGET} INTERFACE ${MY_TARGET}_context)
endfunction()
add_library(${COMPILE_BOOST_TARGET} INTERFACE)
target_include_directories(${COMPILE_BOOST_TARGET} SYSTEM INTERFACE ${BOOST_INSTALL_DIR}/include)
target_link_libraries(${COMPILE_BOOST_TARGET} INTERFACE ${COMPILE_BOOST_TARGET}_context)
endfunction(compile_boost)
if(USE_SANITIZER)
if(WIN32)
@ -72,10 +84,20 @@ if(USE_SANITIZER)
return()
endif()
list(APPEND CMAKE_PREFIX_PATH /opt/boost_1_72_0)
# since boost 1.72 boost installs cmake configs. We will enforce config mode
set(Boost_USE_STATIC_LIBS ON)
set(BOOST_HINT_PATHS /opt/boost_1_72_0)
# Clang and Gcc will have different name mangling to std::call_once, etc.
if (UNIX AND CMAKE_CXX_COMPILER_ID MATCHES "Clang$")
list(APPEND CMAKE_PREFIX_PATH /opt/boost_1_72_0_clang)
set(BOOST_HINT_PATHS /opt/boost_1_72_0_clang)
message(STATUS "Using Clang version of boost::context")
else ()
list(APPEND CMAKE_PREFIX_PATH /opt/boost_1_72_0)
set(BOOST_HINT_PATHS /opt/boost_1_72_0)
message(STATUS "Using g++ version of boost::context")
endif ()
if(BOOST_ROOT)
list(APPEND BOOST_HINT_PATHS ${BOOST_ROOT})
endif()

View File

@ -261,10 +261,6 @@ else()
if (CLANG)
add_compile_options()
# Clang has link errors unless `atomic` is specifically requested.
if(NOT APPLE)
#add_link_options(-latomic)
endif()
if (APPLE OR USE_LIBCXX)
add_compile_options($<$<COMPILE_LANGUAGE:CXX>:-stdlib=libc++>)
if (NOT APPLE)

View File

@ -1 +1 @@
using @BOOST_TOOLSET@ : : @BOOST_CXX_COMPILER@ : @BOOST_ADDITIONAL_COMPILE_OPTIOINS@ ;
using @BOOST_TOOLSET@ : : @BOOST_CXX_COMPILER@ : @BOOST_ADDITIONAL_COMPILE_OPTIONS@ ;

View File

@ -7,6 +7,7 @@ set(FDBCLI_SRCS
FlowLineNoise.h
ForceRecoveryWithDataLossCommand.actor.cpp
MaintenanceCommand.actor.cpp
SetClassCommand.actor.cpp
SnapshotCommand.actor.cpp
ThrottleCommand.actor.cpp
Util.cpp

View File

@ -0,0 +1,123 @@
/*
* SetClassCommand.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2021 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fdbcli/fdbcli.actor.h"
#include "fdbclient/FDBOptions.g.h"
#include "fdbclient/IClientApi.h"
#include "fdbclient/Knobs.h"
#include "flow/Arena.h"
#include "flow/FastRef.h"
#include "flow/ThreadHelper.actor.h"
#include "flow/actorcompiler.h" // This must be the last #include.
namespace {
ACTOR Future<Void> printProcessClass(Reference<IDatabase> db) {
state Reference<ITransaction> tr = db->createTransaction();
loop {
tr->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
try {
// Hold the reference to the memory
state ThreadFuture<RangeResult> classTypeFuture =
tr->getRange(fdb_cli::processClassTypeSpecialKeyRange, CLIENT_KNOBS->TOO_MANY);
state ThreadFuture<RangeResult> classSourceFuture =
tr->getRange(fdb_cli::processClassSourceSpecialKeyRange, CLIENT_KNOBS->TOO_MANY);
wait(success(safeThreadFutureToFuture(classSourceFuture)) &&
success(safeThreadFutureToFuture(classTypeFuture)));
RangeResult processTypeList = classTypeFuture.get();
RangeResult processSourceList = classSourceFuture.get();
ASSERT(processSourceList.size() == processTypeList.size());
if (!processTypeList.size())
printf("No processes are registered in the database.\n");
printf("There are currently %zu processes in the database:\n", processTypeList.size());
for (int index = 0; index < processTypeList.size(); index++) {
std::string address =
processTypeList[index].key.removePrefix(fdb_cli::processClassTypeSpecialKeyRange.begin).toString();
// check the addresses are the same in each list
std::string addressFromSourceList =
processSourceList[index]
.key.removePrefix(fdb_cli::processClassSourceSpecialKeyRange.begin)
.toString();
ASSERT(address == addressFromSourceList);
printf(" %s: %s (%s)\n",
address.c_str(),
processTypeList[index].value.toString().c_str(),
processSourceList[index].value.toString().c_str());
}
return Void();
} catch (Error& e) {
wait(safeThreadFutureToFuture(tr->onError(e)));
}
}
};
ACTOR Future<bool> setProcessClass(Reference<IDatabase> db, KeyRef network_address, KeyRef class_type) {
state Reference<ITransaction> tr = db->createTransaction();
loop {
tr->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
try {
tr->set(network_address.withPrefix(fdb_cli::processClassTypeSpecialKeyRange.begin), class_type);
wait(safeThreadFutureToFuture(tr->commit()));
return true;
} catch (Error& e) {
wait(safeThreadFutureToFuture(tr->onError(e)));
}
}
}
} // namespace
namespace fdb_cli {
const KeyRangeRef processClassSourceSpecialKeyRange =
KeyRangeRef(LiteralStringRef("\xff\xff/configuration/process/class_source/"),
LiteralStringRef("\xff\xff/configuration/process/class_source0"));
const KeyRangeRef processClassTypeSpecialKeyRange =
KeyRangeRef(LiteralStringRef("\xff\xff/configuration/process/class_type/"),
LiteralStringRef("\xff\xff/configuration/process/class_type0"));
ACTOR Future<bool> setClassCommandActor(Reference<IDatabase> db, std::vector<StringRef> tokens) {
if (tokens.size() != 3 && tokens.size() != 1) {
printUsage(tokens[0]);
return false;
} else if (tokens.size() == 1) {
wait(printProcessClass(db));
} else {
bool successful = wait(setProcessClass(db, tokens[1], tokens[2]));
return successful;
}
return true;
}
CommandFactory setClassFactory(
"setclass",
CommandHelp("setclass [<ADDRESS> <CLASS>]",
"change the class of a process",
"If no address and class are specified, lists the classes of all servers.\n\nSetting the class to "
"`default' resets the process class to the class specified on the command line. The available "
"classes are `unset', `storage', `transaction', `resolution', `commit_proxy', `grv_proxy', "
"`master', `test', "
"`stateless', `log', `router', `cluster_controller', `fast_restore', `data_distributor', "
"`coordinator', `ratekeeper', `storage_cache', `backup', and `default'."));
} // namespace fdb_cli

View File

@ -566,15 +566,6 @@ void initHelp() {
"pair in <ADDRESS...> or any LocalityData (like dcid, zoneid, machineid, processid), removes any "
"matching exclusions from the excluded servers and localities list. "
"(A specified IP will match all IP:* exclusion entries)");
helpMap["setclass"] =
CommandHelp("setclass [<ADDRESS> <CLASS>]",
"change the class of a process",
"If no address and class are specified, lists the classes of all servers.\n\nSetting the class to "
"`default' resets the process class to the class specified on the command line. The available "
"classes are `unset', `storage', `transaction', `resolution', `commit_proxy', `grv_proxy', "
"`master', `test', "
"`stateless', `log', `router', `cluster_controller', `fast_restore', `data_distributor', "
"`coordinator', `ratekeeper', `storage_cache', `backup', and `default'.");
helpMap["status"] =
CommandHelp("status [minimal|details|json]",
"get the status of a FoundationDB cluster",
@ -2742,45 +2733,6 @@ ACTOR Future<bool> createSnapshot(Database db, std::vector<StringRef> tokens) {
return false;
}
ACTOR Future<bool> setClass(Database db, std::vector<StringRef> tokens) {
if (tokens.size() == 1) {
vector<ProcessData> _workers = wait(makeInterruptable(getWorkers(db)));
auto workers = _workers; // strip const
if (!workers.size()) {
printf("No processes are registered in the database.\n");
return false;
}
std::sort(workers.begin(), workers.end(), ProcessData::sort_by_address());
printf("There are currently %zu processes in the database:\n", workers.size());
for (const auto& w : workers)
printf(" %s: %s (%s)\n",
w.address.toString().c_str(),
w.processClass.toString().c_str(),
w.processClass.sourceString().c_str());
return false;
}
AddressExclusion addr = AddressExclusion::parse(tokens[1]);
if (!addr.isValid()) {
fprintf(stderr, "ERROR: '%s' is not a valid network endpoint address\n", tokens[1].toString().c_str());
if (tokens[1].toString().find(":tls") != std::string::npos)
printf(" Do not include the `:tls' suffix when naming a process\n");
return true;
}
ProcessClass processClass(tokens[2].toString(), ProcessClass::DBSource);
if (processClass.classType() == ProcessClass::InvalidClass && tokens[2] != LiteralStringRef("default")) {
fprintf(stderr, "ERROR: '%s' is not a valid process class\n", tokens[2].toString().c_str());
return true;
}
wait(makeInterruptable(setClass(db, addr, processClass)));
return false;
};
Reference<ReadYourWritesTransaction> getTransaction(Database db,
Reference<ReadYourWritesTransaction>& tr,
FdbOptions* options,
@ -3689,14 +3641,9 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
}
if (tokencmp(tokens[0], "setclass")) {
if (tokens.size() != 3 && tokens.size() != 1) {
printUsage(tokens[0]);
bool _result = wait(makeInterruptable(setClassCommandActor(db2, tokens)));
if (!_result)
is_error = true;
} else {
bool err = wait(setClass(db, tokens));
if (err)
is_error = true;
}
continue;
}

View File

@ -64,7 +64,9 @@ extern const KeyRef consistencyCheckSpecialKey;
// maintenance
extern const KeyRangeRef maintenanceSpecialKeyRange;
extern const KeyRef ignoreSSFailureSpecialKey;
// setclass
extern const KeyRangeRef processClassSourceSpecialKeyRange;
extern const KeyRangeRef processClassTypeSpecialKeyRange;
// help functions (Copied from fdbcli.actor.cpp)
// compare StringRef with the given c string
@ -81,6 +83,8 @@ ACTOR Future<bool> consistencyCheckCommandActor(Reference<ITransaction> tr, std:
ACTOR Future<bool> forceRecoveryWithDataLossCommandActor(Reference<IDatabase> db, std::vector<StringRef> tokens);
// maintenance command
ACTOR Future<bool> maintenanceCommandActor(Reference<IDatabase> db, std::vector<StringRef> tokens);
// setclass command
ACTOR Future<bool> setClassCommandActor(Reference<IDatabase> db, std::vector<StringRef> tokens);
// snapshot command
ACTOR Future<bool> snapshotCommandActor(Reference<IDatabase> db, std::vector<StringRef> tokens);
// throttle command

View File

@ -18,6 +18,8 @@
* limitations under the License.
*/
#include <atomic>
#include "fdbclient/AsyncTaskThread.h"
#include "flow/UnitTest.h"
#include "flow/actorcompiler.h" // This must be the last #include.
@ -30,13 +32,22 @@ public:
bool isTerminate() const override { return true; }
};
ACTOR Future<Void> asyncTaskThreadClient(AsyncTaskThread* asyncTaskThread, int* sum, int count) {
ACTOR Future<Void> asyncTaskThreadClient(AsyncTaskThread* asyncTaskThread, std::atomic<int> *sum, int count, int clientId, double meanSleep) {
state int i = 0;
state double randomSleep = 0.0;
for (; i < count; ++i) {
randomSleep = deterministicRandom()->random01() * 2 * meanSleep;
wait(delay(randomSleep));
wait(asyncTaskThread->execAsync([sum = sum] {
++(*sum);
sum->fetch_add(1);
return Void();
}));
TraceEvent("AsyncTaskThreadIncrementedSum")
.detail("Index", i)
.detail("Sum", sum->load())
.detail("ClientId", clientId)
.detail("RandomSleep", randomSleep)
.detail("MeanSleep", meanSleep);
}
return Void();
}
@ -51,7 +62,7 @@ AsyncTaskThread::~AsyncTaskThread() {
bool wakeUp = false;
{
std::lock_guard<std::mutex> g(m);
wakeUp = queue.push(std::make_shared<TerminateTask>());
wakeUp = queue.push(std::make_unique<TerminateTask>());
}
if (wakeUp) {
cv.notify_one();
@ -61,7 +72,7 @@ AsyncTaskThread::~AsyncTaskThread() {
void AsyncTaskThread::run(AsyncTaskThread* self) {
while (true) {
std::shared_ptr<IAsyncTask> task;
std::unique_ptr<IAsyncTask> task;
{
std::unique_lock<std::mutex> lk(self->m);
self->cv.wait(lk, [self] { return !self->queue.canSleep(); });
@ -75,14 +86,30 @@ void AsyncTaskThread::run(AsyncTaskThread* self) {
}
TEST_CASE("/asynctaskthread/add") {
state int sum = 0;
state std::atomic<int> sum = 0;
state AsyncTaskThread asyncTaskThread;
state int numClients = 10;
state int incrementsPerClient = 100;
std::vector<Future<Void>> clients;
clients.reserve(10);
for (int i = 0; i < 10; ++i) {
clients.push_back(asyncTaskThreadClient(&asyncTaskThread, &sum, 100));
clients.reserve(numClients);
for (int clientId = 0; clientId < numClients; ++clientId) {
clients.push_back(asyncTaskThreadClient(&asyncTaskThread, &sum, incrementsPerClient, clientId, deterministicRandom()->random01() * 0.01));
}
wait(waitForAll(clients));
ASSERT_EQ(sum, 1000);
ASSERT_EQ(sum.load(), numClients * incrementsPerClient);
return Void();
}
TEST_CASE("/asynctaskthread/error") {
state AsyncTaskThread asyncTaskThread;
try {
wait(asyncTaskThread.execAsync([]{
throw operation_failed();
return Void();
}));
ASSERT(false);
} catch (Error &e) {
ASSERT_EQ(e.code(), error_code_operation_failed);
}
return Void();
}

View File

@ -48,7 +48,7 @@ public:
};
class AsyncTaskThread {
ThreadSafeQueue<std::shared_ptr<IAsyncTask>> queue;
ThreadSafeQueue<std::unique_ptr<IAsyncTask>> queue;
std::condition_variable cv;
std::mutex m;
std::thread thread;
@ -60,7 +60,7 @@ class AsyncTaskThread {
bool wakeUp = false;
{
std::lock_guard<std::mutex> g(m);
wakeUp = queue.push(std::make_shared<AsyncTask<F>>(func));
wakeUp = queue.push(std::make_unique<AsyncTask<F>>(func));
}
if (wakeUp) {
cv.notify_one();
@ -88,6 +88,7 @@ public:
auto funcResult = func();
onMainThreadVoid([promise, funcResult] { promise.send(funcResult); }, nullptr, priority);
} catch (Error& e) {
TraceEvent("ErrorExecutingAsyncTask").error(e);
onMainThreadVoid([promise, e] { promise.sendError(e); }, nullptr, priority);
}
});

View File

@ -717,11 +717,22 @@ protected:
template <>
inline Tuple Codec<Reference<IBackupContainer>>::pack(Reference<IBackupContainer> const& bc) {
return Tuple().append(StringRef(bc->getURL()));
Tuple tuple;
tuple.append(StringRef(bc->getURL()));
if (bc->getEncryptionKeyFileName().present()) {
tuple.append(bc->getEncryptionKeyFileName().get());
}
return tuple;
}
template <>
inline Reference<IBackupContainer> Codec<Reference<IBackupContainer>>::unpack(Tuple const& val) {
return IBackupContainer::openContainer(val.getString(0).toString());
ASSERT(val.size() == 1 || val.size() == 2);
auto url = val.getString(0).toString();
Optional<std::string> encryptionKeyFileName;
if (val.size() == 2) {
encryptionKeyFileName = val.getString(1).toString();
}
return IBackupContainer::openContainer(url, encryptionKeyFileName);
}
class BackupConfig : public KeyBackedConfig {

View File

@ -284,11 +284,11 @@ Reference<IBackupContainer> IBackupContainer::openContainer(const std::string& u
#ifdef BUILD_AZURE_BACKUP
else if (u.startsWith("azure://"_sr)) {
u.eat("azure://"_sr);
auto address = NetworkAddress::parse(u.eat("/"_sr).toString());
auto accountName = u.eat("@"_sr).toString();
auto endpoint = u.eat("/"_sr).toString();
auto containerName = u.eat("/"_sr).toString();
auto accountName = u.eat("/"_sr).toString();
r = makeReference<BackupContainerAzureBlobStore>(
address, containerName, accountName, encryptionKeyFileName);
endpoint, accountName, containerName, encryptionKeyFileName);
}
#endif
else {
@ -296,6 +296,7 @@ Reference<IBackupContainer> IBackupContainer::openContainer(const std::string& u
throw backup_invalid_url();
}
r->encryptionKeyFileName = encryptionKeyFileName;
r->URL = url;
return r;
} catch (Error& e) {

View File

@ -298,12 +298,14 @@ public:
static std::vector<std::string> getURLFormats();
static Future<std::vector<std::string>> listContainers(const std::string& baseURL);
std::string getURL() const { return URL; }
std::string const &getURL() const { return URL; }
Optional<std::string> const &getEncryptionKeyFileName() const { return encryptionKeyFileName; }
static std::string lastOpenError;
private:
std::string URL;
Optional<std::string> encryptionKeyFileName;
};
namespace fileBackup {

View File

@ -20,37 +20,69 @@
#include "fdbclient/BackupContainerAzureBlobStore.h"
#include "fdbrpc/AsyncFileEncrypted.h"
#include <future>
#include "flow/actorcompiler.h" // This must be the last #include.
namespace {
std::string const notFoundErrorCode = "404";
void printAzureError(std::string const& operationName, azure::storage_lite::storage_error const& err) {
printf("(%s) : Error from Azure SDK : %s (%s) : %s",
operationName.c_str(),
err.code_name.c_str(),
err.code.c_str(),
err.message.c_str());
}
template <class T>
T waitAzureFuture(std::future<azure::storage_lite::storage_outcome<T>>&& f, std::string const& operationName) {
auto outcome = f.get();
if (outcome.success()) {
return outcome.response();
} else {
printAzureError(operationName, outcome.error());
throw backup_error();
}
}
} // namespace
class BackupContainerAzureBlobStoreImpl {
public:
using AzureClient = azure::storage_lite::blob_client;
class ReadFile final : public IAsyncFile, ReferenceCounted<ReadFile> {
AsyncTaskThread& asyncTaskThread;
AsyncTaskThread* asyncTaskThread;
std::string containerName;
std::string blobName;
AzureClient* client;
std::shared_ptr<AzureClient> client;
public:
ReadFile(AsyncTaskThread& asyncTaskThread,
const std::string& containerName,
const std::string& blobName,
AzureClient* client)
: asyncTaskThread(asyncTaskThread), containerName(containerName), blobName(blobName), client(client) {}
std::shared_ptr<AzureClient> const& client)
: asyncTaskThread(&asyncTaskThread), containerName(containerName), blobName(blobName), client(client) {}
void addref() override { ReferenceCounted<ReadFile>::addref(); }
void delref() override { ReferenceCounted<ReadFile>::delref(); }
Future<int> read(void* data, int length, int64_t offset) {
return asyncTaskThread.execAsync([client = this->client,
containerName = this->containerName,
blobName = this->blobName,
data,
length,
offset] {
Future<int> read(void* data, int length, int64_t offset) override {
TraceEvent(SevDebug, "BCAzureBlobStoreRead")
.detail("Length", length)
.detail("Offset", offset)
.detail("ContainerName", containerName)
.detail("BlobName", blobName);
return asyncTaskThread->execAsync([client = this->client,
containerName = this->containerName,
blobName = this->blobName,
data,
length,
offset] {
std::ostringstream oss(std::ios::out | std::ios::binary);
client->download_blob_to_stream(containerName, blobName, offset, length, oss);
waitAzureFuture(client->download_blob_to_stream(containerName, blobName, offset, length, oss),
"download_blob_to_stream");
auto str = std::move(oss).str();
memcpy(data, str.c_str(), str.size());
return static_cast<int>(str.size());
@ -61,19 +93,23 @@ public:
Future<Void> truncate(int64_t size) override { throw file_not_writable(); }
Future<Void> sync() override { throw file_not_writable(); }
Future<int64_t> size() const override {
return asyncTaskThread.execAsync([client = this->client,
containerName = this->containerName,
blobName = this->blobName] {
return static_cast<int64_t>(client->get_blob_properties(containerName, blobName).get().response().size);
});
TraceEvent(SevDebug, "BCAzureBlobStoreReadFileSize")
.detail("ContainerName", containerName)
.detail("BlobName", blobName);
return asyncTaskThread->execAsync(
[client = this->client, containerName = this->containerName, blobName = this->blobName] {
auto resp =
waitAzureFuture(client->get_blob_properties(containerName, blobName), "get_blob_properties");
return static_cast<int64_t>(resp.size);
});
}
std::string getFilename() const override { return blobName; }
int64_t debugFD() const override { return 0; }
};
class WriteFile final : public IAsyncFile, ReferenceCounted<WriteFile> {
AsyncTaskThread& asyncTaskThread;
AzureClient* client;
AsyncTaskThread* asyncTaskThread;
std::shared_ptr<AzureClient> client;
std::string containerName;
std::string blobName;
int64_t m_cursor{ 0 };
@ -88,8 +124,8 @@ public:
WriteFile(AsyncTaskThread& asyncTaskThread,
const std::string& containerName,
const std::string& blobName,
AzureClient* client)
: asyncTaskThread(asyncTaskThread), containerName(containerName), blobName(blobName), client(client) {}
std::shared_ptr<AzureClient> const& client)
: asyncTaskThread(&asyncTaskThread), containerName(containerName), blobName(blobName), client(client) {}
void addref() override { ReferenceCounted<WriteFile>::addref(); }
void delref() override { ReferenceCounted<WriteFile>::delref(); }
@ -114,22 +150,33 @@ public:
return Void();
}
Future<Void> sync() override {
TraceEvent(SevDebug, "BCAzureBlobStoreSync")
.detail("Length", buffer.size())
.detail("ContainerName", containerName)
.detail("BlobName", blobName);
auto movedBuffer = std::move(buffer);
buffer.clear();
return asyncTaskThread.execAsync([client = this->client,
containerName = this->containerName,
blobName = this->blobName,
buffer = std::move(movedBuffer)] {
std::istringstream iss(std::move(buffer));
auto resp = client->append_block_from_stream(containerName, blobName, iss).get();
return Void();
});
buffer = {};
if (!movedBuffer.empty()) {
return asyncTaskThread->execAsync([client = this->client,
containerName = this->containerName,
blobName = this->blobName,
buffer = std::move(movedBuffer)] {
std::istringstream iss(std::move(buffer));
waitAzureFuture(client->append_block_from_stream(containerName, blobName, iss),
"append_block_from_stream");
return Void();
});
}
return Void();
}
Future<int64_t> size() const override {
return asyncTaskThread.execAsync(
TraceEvent(SevDebug, "BCAzureBlobStoreSize")
.detail("ContainerName", containerName)
.detail("BlobName", blobName);
return asyncTaskThread->execAsync(
[client = this->client, containerName = this->containerName, blobName = this->blobName] {
auto resp = client->get_blob_properties(containerName, blobName).get().response();
ASSERT(resp.valid()); // TODO: Should instead throw here
auto resp =
waitAzureFuture(client->get_blob_properties(containerName, blobName), "get_blob_properties");
return static_cast<int64_t>(resp.size);
});
}
@ -163,44 +210,53 @@ public:
static bool isDirectory(const std::string& blobName) { return blobName.size() && blobName.back() == '/'; }
// Hack to get around the fact that macros don't work inside actor functions
static Reference<IAsyncFile> encryptFile(Reference<IAsyncFile> const& f, AsyncFileEncrypted::Mode mode) {
Reference<IAsyncFile> result = f;
#if ENCRYPTION_ENABLED
result = makeReference<AsyncFileEncrypted>(result, mode);
#endif
return result;
}
ACTOR static Future<Reference<IAsyncFile>> readFile(BackupContainerAzureBlobStore* self, std::string fileName) {
bool exists = wait(self->blobExists(fileName));
if (!exists) {
throw file_not_found();
}
Reference<IAsyncFile> f =
makeReference<ReadFile>(self->asyncTaskThread, self->containerName, fileName, self->client.get());
#if ENCRYPTION_ENABLED
makeReference<ReadFile>(self->asyncTaskThread, self->containerName, fileName, self->client);
if (self->usesEncryption()) {
f = makeReference<AsyncFileEncrypted>(f, false);
f = encryptFile(f, AsyncFileEncrypted::Mode::READ_ONLY);
}
#endif
return f;
}
ACTOR static Future<Reference<IBackupFile>> writeFile(BackupContainerAzureBlobStore* self, std::string fileName) {
TraceEvent(SevDebug, "BCAzureBlobStoreCreateWriteFile")
.detail("ContainerName", self->containerName)
.detail("FileName", fileName);
wait(self->asyncTaskThread.execAsync(
[client = self->client.get(), containerName = self->containerName, fileName = fileName] {
auto outcome = client->create_append_blob(containerName, fileName).get();
[client = self->client, containerName = self->containerName, fileName = fileName] {
waitAzureFuture(client->create_append_blob(containerName, fileName), "create_append_blob");
return Void();
}));
auto f = makeReference<WriteFile>(self->asyncTaskThread, self->containerName, fileName, self->client.get());
#if ENCRYPTION_ENABLED
Reference<IAsyncFile> f =
makeReference<WriteFile>(self->asyncTaskThread, self->containerName, fileName, self->client);
if (self->usesEncryption()) {
f = makeReference<AsyncFileEncrypted>(f, true);
f = encryptFile(f, AsyncFileEncrypted::Mode::APPEND_ONLY);
}
#endif
return makeReference<BackupFile>(fileName, f);
}
static void listFiles(AzureClient* client,
static void listFiles(std::shared_ptr<AzureClient> const& client,
const std::string& containerName,
const std::string& path,
std::function<bool(std::string const&)> folderPathFilter,
BackupContainerFileSystem::FilesAndSizesT& result) {
auto resp = client->list_blobs_segmented(containerName, "/", "", path).get().response();
auto resp = waitAzureFuture(client->list_blobs_segmented(containerName, "/", "", path), "list_blobs_segmented");
for (const auto& blob : resp.blobs) {
if (isDirectory(blob.name) && folderPathFilter(blob.name)) {
if (isDirectory(blob.name) && (!folderPathFilter || folderPathFilter(blob.name))) {
listFiles(client, containerName, blob.name, folderPathFilter, result);
} else {
result.emplace_back(blob.name, blob.content_length);
@ -214,8 +270,12 @@ public:
BackupContainerFileSystem::FilesAndSizesT files = wait(self->listFiles());
filesToDelete = files.size();
}
wait(self->asyncTaskThread.execAsync([containerName = self->containerName, client = self->client.get()] {
client->delete_container(containerName).wait();
TraceEvent(SevDebug, "BCAzureBlobStoreDeleteContainer")
.detail("FilesToDelete", filesToDelete)
.detail("ContainerName", self->containerName)
.detail("TrackNumDeleted", pNumDeleted != nullptr);
wait(self->asyncTaskThread.execAsync([containerName = self->containerName, client = self->client] {
waitAzureFuture(client->delete_container(containerName), "delete_container");
return Void();
}));
if (pNumDeleted) {
@ -224,36 +284,44 @@ public:
return Void();
}
ACTOR static Future<Void> create(BackupContainerAzureBlobStore* self) {
state Future<Void> f1 =
self->asyncTaskThread.execAsync([containerName = self->containerName, client = self->client.get()] {
client->create_container(containerName).wait();
return Void();
});
state Future<Void> f2 = self->usesEncryption() ? self->encryptionSetupComplete() : Void();
return f1 && f2;
}
};
Future<bool> BackupContainerAzureBlobStore::blobExists(const std::string& fileName) {
return asyncTaskThread.execAsync(
[client = this->client.get(), containerName = this->containerName, fileName = fileName] {
auto resp = client->get_blob_properties(containerName, fileName).get().response();
return resp.valid();
});
TraceEvent(SevDebug, "BCAzureBlobStoreCheckExists")
.detail("FileName", fileName)
.detail("ContainerName", containerName);
return asyncTaskThread.execAsync([client = this->client, containerName = this->containerName, fileName = fileName] {
auto outcome = client->get_blob_properties(containerName, fileName).get();
if (outcome.success()) {
return true;
} else {
auto const& err = outcome.error();
if (err.code == notFoundErrorCode) {
return false;
} else {
printAzureError("get_blob_properties", err);
throw backup_error();
}
}
});
}
BackupContainerAzureBlobStore::BackupContainerAzureBlobStore(const NetworkAddress& address,
BackupContainerAzureBlobStore::BackupContainerAzureBlobStore(const std::string& endpoint,
const std::string& accountName,
const std::string& containerName,
const Optional<std::string>& encryptionKeyFileName)
: containerName(containerName) {
setEncryptionKey(encryptionKeyFileName);
std::string accountKey = std::getenv("AZURE_KEY");
const char* _accountKey = std::getenv("AZURE_KEY");
if (!_accountKey) {
TraceEvent(SevError, "EnvironmentVariableNotFound").detail("EnvVariable", "AZURE_KEY");
// TODO: More descriptive error?
throw backup_error();
}
std::string accountKey = _accountKey;
auto credential = std::make_shared<azure::storage_lite::shared_key_credential>(accountName, accountKey);
auto storageAccount = std::make_shared<azure::storage_lite::storage_account>(
accountName, credential, false, format("http://%s/%s", address.toString().c_str(), accountName.c_str()));
accountName, credential, true, format("https://%s", endpoint.c_str()));
client = std::make_unique<AzureClient>(storageAccount, 1);
}
@ -265,12 +333,30 @@ void BackupContainerAzureBlobStore::delref() {
}
Future<Void> BackupContainerAzureBlobStore::create() {
return BackupContainerAzureBlobStoreImpl::create(this);
TraceEvent(SevDebug, "BCAzureBlobStoreCreateContainer").detail("ContainerName", containerName);
Future<Void> createContainerFuture =
asyncTaskThread.execAsync([containerName = this->containerName, client = this->client] {
waitAzureFuture(client->create_container(containerName), "create_container");
return Void();
});
Future<Void> encryptionSetupFuture = usesEncryption() ? encryptionSetupComplete() : Void();
return createContainerFuture && encryptionSetupFuture;
}
Future<bool> BackupContainerAzureBlobStore::exists() {
return asyncTaskThread.execAsync([containerName = this->containerName, client = this->client.get()] {
auto resp = client->get_container_properties(containerName).get().response();
return resp.valid();
TraceEvent(SevDebug, "BCAzureBlobStoreCheckContainerExists").detail("ContainerName", containerName);
return asyncTaskThread.execAsync([containerName = this->containerName, client = this->client] {
auto outcome = client->get_container_properties(containerName).get();
if (outcome.success()) {
return true;
} else {
auto const& err = outcome.error();
if (err.code == notFoundErrorCode) {
return false;
} else {
printAzureError("got_container_properties", err);
throw backup_error();
}
}
});
}
@ -285,22 +371,23 @@ Future<Reference<IBackupFile>> BackupContainerAzureBlobStore::writeFile(const st
Future<BackupContainerFileSystem::FilesAndSizesT> BackupContainerAzureBlobStore::listFiles(
const std::string& path,
std::function<bool(std::string const&)> folderPathFilter) {
return asyncTaskThread.execAsync([client = this->client.get(),
containerName = this->containerName,
path = path,
folderPathFilter = folderPathFilter] {
FilesAndSizesT result;
BackupContainerAzureBlobStoreImpl::listFiles(client, containerName, path, folderPathFilter, result);
return result;
});
TraceEvent(SevDebug, "BCAzureBlobStoreListFiles").detail("ContainerName", containerName).detail("Path", path);
return asyncTaskThread.execAsync(
[client = this->client, containerName = this->containerName, path = path, folderPathFilter = folderPathFilter] {
FilesAndSizesT result;
BackupContainerAzureBlobStoreImpl::listFiles(client, containerName, path, folderPathFilter, result);
return result;
});
}
Future<Void> BackupContainerAzureBlobStore::deleteFile(const std::string& fileName) {
return asyncTaskThread.execAsync(
[containerName = this->containerName, fileName = fileName, client = client.get()]() {
client->delete_blob(containerName, fileName).wait();
return Void();
});
TraceEvent(SevDebug, "BCAzureBlobStoreDeleteFile")
.detail("ContainerName", containerName)
.detail("FileName", fileName);
return asyncTaskThread.execAsync([containerName = this->containerName, fileName = fileName, client = client]() {
client->delete_blob(containerName, fileName).wait();
return Void();
});
}
Future<Void> BackupContainerAzureBlobStore::deleteContainer(int* pNumDeleted) {
@ -313,5 +400,5 @@ Future<std::vector<std::string>> BackupContainerAzureBlobStore::listURLs(const s
}
std::string BackupContainerAzureBlobStore::getURLFormat() {
return "azure://<ip>:<port>/<accountname>/<container>/<path_to_file>";
return "azure://<accountname>@<endpoint>/<container>/";
}

View File

@ -33,7 +33,7 @@ class BackupContainerAzureBlobStore final : public BackupContainerFileSystem,
ReferenceCounted<BackupContainerAzureBlobStore> {
using AzureClient = azure::storage_lite::blob_client;
std::unique_ptr<AzureClient> client;
std::shared_ptr<AzureClient> client;
std::string containerName;
AsyncTaskThread asyncTaskThread;
@ -42,7 +42,7 @@ class BackupContainerAzureBlobStore final : public BackupContainerFileSystem,
friend class BackupContainerAzureBlobStoreImpl;
public:
BackupContainerAzureBlobStore(const NetworkAddress& address,
BackupContainerAzureBlobStore(const std::string& endpoint,
const std::string& accountName,
const std::string& containerName,
const Optional<std::string>& encryptionKeyFileName);

View File

@ -71,7 +71,7 @@ class SimpleConfigTransactionImpl {
if (reply.value.present()) {
return reply.value.get().toValue();
} else {
return {};
return Optional<Value>{};
}
}

View File

@ -399,6 +399,14 @@ ACTOR Future<Void> releaseResolvingAfter(ProxyCommitData* self, Future<Void> rel
return Void();
}
ACTOR static Future<ResolveTransactionBatchReply> trackResolutionMetrics(Reference<Histogram> dist,
Future<ResolveTransactionBatchReply> in) {
state double startTime = now();
ResolveTransactionBatchReply reply = wait(in);
dist->sampleSeconds(now() - startTime);
return reply;
}
namespace CommitBatch {
struct CommitBatchContext {
@ -579,6 +587,7 @@ ACTOR Future<Void> preresolutionProcessing(CommitBatchContext* self) {
TEST(pProxyCommitData->latestLocalCommitBatchResolving.get() < localBatchNumber - 1); // Wait for local batch
wait(pProxyCommitData->latestLocalCommitBatchResolving.whenAtLeast(localBatchNumber - 1));
double queuingDelay = g_network->now() - timeStart;
pProxyCommitData->stats.commitBatchQueuingDist->sampleSeconds(queuingDelay);
if ((queuingDelay > (double)SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS / SERVER_KNOBS->VERSIONS_PER_SECOND ||
(g_network->isSimulated() && BUGGIFY_WITH_PROB(0.01))) &&
SERVER_KNOBS->PROXY_REJECT_BATCH_QUEUED_TOO_LONG && canReject(trs)) {
@ -619,6 +628,7 @@ ACTOR Future<Void> preresolutionProcessing(CommitBatchContext* self) {
pProxyCommitData->commitVersionRequestNumber++,
pProxyCommitData->mostRecentProcessedRequestNumber,
pProxyCommitData->dbgid);
state double beforeGettingCommitVersion = now();
GetCommitVersionReply versionReply = wait(brokenPromiseToNever(
pProxyCommitData->master.getCommitVersion.getReply(req, TaskPriority::ProxyMasterVersionReply)));
@ -626,6 +636,7 @@ ACTOR Future<Void> preresolutionProcessing(CommitBatchContext* self) {
pProxyCommitData->stats.txnCommitVersionAssigned += trs.size();
pProxyCommitData->stats.lastCommitVersionAssigned = versionReply.version;
pProxyCommitData->stats.getCommitVersionDist->sampleSeconds(now() - beforeGettingCommitVersion);
self->commitVersion = versionReply.version;
self->prevVersion = versionReply.prevVersion;
@ -646,6 +657,7 @@ ACTOR Future<Void> preresolutionProcessing(CommitBatchContext* self) {
}
ACTOR Future<Void> getResolution(CommitBatchContext* self) {
state double resolutionStart = now();
// Sending these requests is the fuzzy border between phase 1 and phase 2; it could conceivably overlap with
// resolution processing but is still using CPU
ProxyCommitData* pProxyCommitData = self->pProxyCommitData;
@ -674,8 +686,9 @@ ACTOR Future<Void> getResolution(CommitBatchContext* self) {
std::vector<Future<ResolveTransactionBatchReply>> replies;
for (int r = 0; r < pProxyCommitData->resolvers.size(); r++) {
requests.requests[r].debugID = self->debugID;
replies.push_back(brokenPromiseToNever(
pProxyCommitData->resolvers[r].resolve.getReply(requests.requests[r], TaskPriority::ProxyResolverReply)));
replies.push_back(trackResolutionMetrics(pProxyCommitData->stats.resolverDist[r],
brokenPromiseToNever(pProxyCommitData->resolvers[r].resolve.getReply(
requests.requests[r], TaskPriority::ProxyResolverReply))));
}
self->transactionResolverMap.swap(requests.transactionResolverMap);
@ -700,6 +713,7 @@ ACTOR Future<Void> getResolution(CommitBatchContext* self) {
std::vector<ResolveTransactionBatchReply> resolutionResp = wait(getAll(replies));
self->resolution.swap(*const_cast<std::vector<ResolveTransactionBatchReply>*>(&resolutionResp));
self->pProxyCommitData->stats.resolutionDist->sampleSeconds(now() - resolutionStart);
if (self->debugID.present()) {
g_traceBatch.addEvent(
"CommitDebug", self->debugID.get().first(), "CommitProxyServer.commitBatch.AfterResolution");
@ -1055,6 +1069,7 @@ ACTOR Future<Void> assignMutationsToStorageServers(CommitBatchContext* self) {
}
ACTOR Future<Void> postResolution(CommitBatchContext* self) {
state double postResolutionStart = now();
state ProxyCommitData* const pProxyCommitData = self->pProxyCommitData;
state std::vector<CommitTransactionRequest>& trs = self->trs;
state const int64_t localBatchNumber = self->localBatchNumber;
@ -1064,6 +1079,8 @@ ACTOR Future<Void> postResolution(CommitBatchContext* self) {
bool queuedCommits = pProxyCommitData->latestLocalCommitBatchLogging.get() < localBatchNumber - 1;
TEST(queuedCommits); // Queuing post-resolution commit processing
wait(pProxyCommitData->latestLocalCommitBatchLogging.whenAtLeast(localBatchNumber - 1));
state double postResolutionQueuing = now();
pProxyCommitData->stats.postResolutionDist->sampleSeconds(postResolutionQueuing - postResolutionStart);
wait(yield(TaskPriority::ProxyCommitYield1));
self->computeStart = g_network->timer();
@ -1212,10 +1229,12 @@ ACTOR Future<Void> postResolution(CommitBatchContext* self) {
1e9 * pProxyCommitData->commitComputePerOperation[self->latencyBucket]);
}
pProxyCommitData->stats.processingMutationDist->sampleSeconds(now() - postResolutionQueuing);
return Void();
}
ACTOR Future<Void> transactionLogging(CommitBatchContext* self) {
state double tLoggingStart = now();
state ProxyCommitData* const pProxyCommitData = self->pProxyCommitData;
state Span span("MP:transactionLogging"_loc, self->span.context);
@ -1249,11 +1268,12 @@ ACTOR Future<Void> transactionLogging(CommitBatchContext* self) {
pProxyCommitData->txsPopVersions.emplace_back(self->commitVersion, self->msg.popTo);
}
pProxyCommitData->logSystem->popTxs(self->msg.popTo);
pProxyCommitData->stats.tlogLoggingDist->sampleSeconds(now() - tLoggingStart);
return Void();
}
ACTOR Future<Void> reply(CommitBatchContext* self) {
state double replyStart = now();
state ProxyCommitData* const pProxyCommitData = self->pProxyCommitData;
state Span span("MP:reply"_loc, self->span.context);
@ -1385,7 +1405,7 @@ ACTOR Future<Void> reply(CommitBatchContext* self) {
pProxyCommitData->commitBatchesMemBytesCount -= self->currentBatchMemBytesCount;
ASSERT_ABORT(pProxyCommitData->commitBatchesMemBytesCount >= 0);
wait(self->releaseFuture);
pProxyCommitData->stats.replyCommitDist->sampleSeconds(now() - replyStart);
return Void();
}
@ -1856,7 +1876,10 @@ ACTOR Future<Void> commitProxyServerCore(CommitProxyInterface proxy,
commitData.resolvers = commitData.db->get().resolvers;
ASSERT(commitData.resolvers.size() != 0);
for (int i = 0; i < commitData.resolvers.size(); ++i) {
commitData.stats.resolverDist.push_back(Histogram::getHistogram(
LiteralStringRef("CommitProxy"), "ToResolver_" + commitData.resolvers[i].id().toString(), Histogram::Unit::microseconds));
}
auto rs = commitData.keyResolvers.modify(allKeys);
for (auto r = rs.begin(); r != rs.end(); ++r)
r->value().emplace_back(0, 0);

View File

@ -57,6 +57,8 @@ struct GrvProxyStats {
Deque<int> requestBuckets;
double lastBucketBegin;
double bucketInterval;
Reference<Histogram> grvConfirmEpochLiveDist;
Reference<Histogram> grvGetCommittedVersionRpcDist;
void updateRequestBuckets() {
while (now() - lastBucketBegin > bucketInterval) {
@ -112,7 +114,13 @@ struct GrvProxyStats {
SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
recentRequests(0), lastBucketBegin(now()),
bucketInterval(FLOW_KNOBS->BASIC_LOAD_BALANCE_UPDATE_RATE / FLOW_KNOBS->BASIC_LOAD_BALANCE_BUCKETS) {
bucketInterval(FLOW_KNOBS->BASIC_LOAD_BALANCE_UPDATE_RATE / FLOW_KNOBS->BASIC_LOAD_BALANCE_BUCKETS),
grvConfirmEpochLiveDist(Histogram::getHistogram(LiteralStringRef("GrvProxy"),
LiteralStringRef("GrvConfirmEpochLive"),
Histogram::Unit::microseconds)),
grvGetCommittedVersionRpcDist(Histogram::getHistogram(LiteralStringRef("GrvProxy"),
LiteralStringRef("GrvGetCommittedVersionRpc"),
Histogram::Unit::microseconds)) {
// The rate at which the limit(budget) is allowed to grow.
specialCounter(cc, "SystemGRVQueueSize", [this]() { return this->systemGRVQueueSize; });
specialCounter(cc, "DefaultGRVQueueSize", [this]() { return this->defaultGRVQueueSize; });
@ -526,6 +534,8 @@ ACTOR Future<GetReadVersionReply> getLiveCommittedVersion(SpanID parentSpan,
// and no other proxy could have already committed anything without first ending the epoch
state Span span("GP:getLiveCommittedVersion"_loc, parentSpan);
++grvProxyData->stats.txnStartBatch;
state double grvStart = now();
state Future<GetRawCommittedVersionReply> replyFromMasterFuture;
replyFromMasterFuture = grvProxyData->master.getLiveCommittedVersion.getReply(
GetRawCommittedVersionRequest(span.context, debugID), TaskPriority::GetLiveCommittedVersionReply);
@ -537,6 +547,8 @@ ACTOR Future<GetReadVersionReply> getLiveCommittedVersion(SpanID parentSpan,
wait(grvProxyData->lastCommitTime.whenAtLeast(now() - SERVER_KNOBS->REQUIRED_MIN_RECOVERY_DURATION));
}
state double grvConfirmEpochLive = now();
grvProxyData->stats.grvConfirmEpochLiveDist->sampleSeconds(grvConfirmEpochLive - grvStart);
if (debugID.present()) {
g_traceBatch.addEvent(
"TransactionDebug", debugID.get().first(), "GrvProxyServer.getLiveCommittedVersion.confirmEpochLive");
@ -546,6 +558,7 @@ ACTOR Future<GetReadVersionReply> getLiveCommittedVersion(SpanID parentSpan,
grvProxyData->minKnownCommittedVersion =
std::max(grvProxyData->minKnownCommittedVersion, repFromMaster.minKnownCommittedVersion);
grvProxyData->stats.grvGetCommittedVersionRpcDist->sampleSeconds(now() - grvConfirmEpochLive);
GetReadVersionReply rep;
rep.version = repFromMaster.version;
rep.locked = repFromMaster.locked;

View File

@ -31,6 +31,7 @@
#include "fdbserver/MutationTracking.h"
#include "flow/Arena.h"
#include "flow/Error.h"
#include "flow/Histogram.h"
#include "flow/IndexedSet.h"
#include "flow/Knobs.h"
#include "fdbrpc/ReplicationPolicy.h"
@ -57,6 +58,7 @@ public:
std::vector<Reference<AsyncVar<OptionalInterface<TLogInterface>>>> logRouters;
std::vector<Reference<AsyncVar<OptionalInterface<BackupInterface>>>> backupWorkers;
std::vector<Reference<ConnectionResetInfo>> connectionResetTrackers;
std::vector<Reference<Histogram>> tlogPushDistTrackers;
int32_t tLogWriteAntiQuorum;
int32_t tLogReplicationFactor;
std::vector<LocalityData> tLogLocalities; // Stores the localities of the log servers

View File

@ -74,6 +74,15 @@ struct ProxyStats {
int64_t maxComputeNS;
int64_t minComputeNS;
Reference<Histogram> commitBatchQueuingDist;
Reference<Histogram> getCommitVersionDist;
std::vector<Reference<Histogram>> resolverDist;
Reference<Histogram> resolutionDist;
Reference<Histogram> postResolutionDist;
Reference<Histogram> processingMutationDist;
Reference<Histogram> tlogLoggingDist;
Reference<Histogram> replyCommitDist;
int64_t getAndResetMaxCompute() {
int64_t r = maxComputeNS;
maxComputeNS = 0;
@ -113,7 +122,28 @@ struct ProxyStats {
id,
SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
maxComputeNS(0), minComputeNS(1e12) {
maxComputeNS(0), minComputeNS(1e12),
commitBatchQueuingDist(Histogram::getHistogram(LiteralStringRef("CommitProxy"),
LiteralStringRef("CommitBatchQueuing"),
Histogram::Unit::microseconds)),
getCommitVersionDist(Histogram::getHistogram(LiteralStringRef("CommitProxy"),
LiteralStringRef("GetCommitVersion"),
Histogram::Unit::microseconds)),
resolutionDist(Histogram::getHistogram(LiteralStringRef("CommitProxy"),
LiteralStringRef("Resolution"),
Histogram::Unit::microseconds)),
postResolutionDist(Histogram::getHistogram(LiteralStringRef("CommitProxy"),
LiteralStringRef("PostResolutionQueuing"),
Histogram::Unit::microseconds)),
processingMutationDist(Histogram::getHistogram(LiteralStringRef("CommitProxy"),
LiteralStringRef("ProcessingMutation"),
Histogram::Unit::microseconds)),
tlogLoggingDist(Histogram::getHistogram(LiteralStringRef("CommitProxy"),
LiteralStringRef("TlogLogging"),
Histogram::Unit::microseconds)),
replyCommitDist(Histogram::getHistogram(LiteralStringRef("CommitProxy"),
LiteralStringRef("ReplyCommit"),
Histogram::Unit::microseconds)) {
specialCounter(cc, "LastAssignedCommitVersion", [this]() { return this->lastCommitVersionAssigned; });
specialCounter(cc, "Version", [pVersion]() { return *pVersion; });
specialCounter(cc, "CommittedVersion", [pCommittedVersion]() { return pCommittedVersion->get(); });

View File

@ -527,6 +527,7 @@ struct TagPartitionedLogSystem final : ILogSystem, ReferenceCounted<TagPartition
}
ACTOR static Future<TLogCommitReply> recordPushMetrics(Reference<ConnectionResetInfo> self,
Reference<Histogram> dist,
NetworkAddress addr,
Future<TLogCommitReply> in) {
state double startTime = now();
@ -541,6 +542,7 @@ struct TagPartitionedLogSystem final : ILogSystem, ReferenceCounted<TagPartition
self->fastReplies++;
}
}
dist->sampleSeconds(now() - startTime);
return t;
}
@ -563,12 +565,21 @@ struct TagPartitionedLogSystem final : ILogSystem, ReferenceCounted<TagPartition
it->connectionResetTrackers.push_back(makeReference<ConnectionResetInfo>());
}
}
if (it->tlogPushDistTrackers.empty()) {
for (int i = 0; i < it->logServers.size(); i++) {
it->tlogPushDistTrackers.push_back(
Histogram::getHistogram("ToTlog_" + it->logServers[i]->get().interf().uniqueID.toString(),
it->logServers[i]->get().interf().address().toString(),
Histogram::Unit::microseconds));
}
}
vector<Future<Void>> tLogCommitResults;
for (int loc = 0; loc < it->logServers.size(); loc++) {
Standalone<StringRef> msg = data.getMessages(location);
data.recordEmptyMessage(location, msg);
allReplies.push_back(recordPushMetrics(
it->connectionResetTrackers[loc],
it->tlogPushDistTrackers[loc],
it->logServers[loc]->get().interf().address(),
it->logServers[loc]->get().interf().commit.getReply(TLogCommitRequest(spanContext,
msg.arena(),

View File

@ -219,6 +219,7 @@ public:
template <class U>
Optional(const U& t) : impl(std::in_place, t) {}
Optional(T&& t) : impl(std::in_place, std::move(t)) {}
/* This conversion constructor was nice, but combined with the prior constructor it means that Optional<int> can be
converted to Optional<Optional<int>> in the wrong way (a non-present Optional<int> converts to a non-present

View File

@ -117,10 +117,12 @@ void Histogram::writeToLog() {
TraceEvent e(SevInfo, "Histogram");
e.detail("Group", group).detail("Op", op).detail("Unit", UnitToStringMapper[(size_t)unit]);
int totalCount = 0;
for (uint32_t i = 0; i < 32; i++) {
uint64_t value = uint64_t(1) << (i + 1);
if (buckets[i]) {
totalCount += buckets[i];
switch (unit) {
case Unit::microseconds:
e.detail(format("LessThan%u.%03u", value / 1000, value % 1000), buckets[i]);
@ -140,6 +142,7 @@ void Histogram::writeToLog() {
}
}
}
e.detail("TotalCount", totalCount);
}
std::string Histogram::drawHistogram() {

View File

@ -90,6 +90,7 @@ EncryptionStreamCipher::EncryptionStreamCipher(const StreamCipher::Key& key, con
}
StringRef EncryptionStreamCipher::encrypt(unsigned char const* plaintext, int len, Arena& arena) {
TEST(true); // Encrypting data with StreamCipher
auto ciphertext = new (arena) unsigned char[len + AES_BLOCK_SIZE];
int bytes{ 0 };
EVP_EncryptUpdate(cipher.getCtx(), ciphertext, &bytes, plaintext, len);
@ -110,6 +111,7 @@ DecryptionStreamCipher::DecryptionStreamCipher(const StreamCipher::Key& key, con
}
StringRef DecryptionStreamCipher::decrypt(unsigned char const* ciphertext, int len, Arena& arena) {
TEST(true); // Decrypting data with StreamCipher
auto plaintext = new (arena) unsigned char[len];
int bytesDecrypted{ 0 };
EVP_DecryptUpdate(cipher.getCtx(), plaintext, &bytesDecrypted, ciphertext, len);

View File

@ -52,6 +52,7 @@ class ThreadSafeQueue : NonCopyable {
struct Node : BaseNode, FastAllocated<Node> {
T data;
Node(T const& data) : data(data) {}
Node(T&& data) : data(std::move(data)) {}
};
std::atomic<BaseNode*> head;
BaseNode* tail;
@ -131,9 +132,9 @@ public:
}
// If push() returns true, the consumer may be sleeping and should be woken
bool push(T const& data) {
Node* n = new Node(data);
n->data = data;
template <class U>
bool push(U&& data) {
Node* n = new Node(std::forward<U>(data));
return pushNode(n) == &sleeping;
}