Merge branch 'master' into fix-threadsafedatabase-safety

# Conflicts:
#	fdbclient/NativeAPI.actor.cpp
This commit is contained in:
A.J. Beamon 2019-03-14 14:50:14 -07:00
commit 59b41b03ed
24 changed files with 513 additions and 115 deletions

View File

@ -334,7 +334,7 @@ The ``expire`` subcommand will remove data from a backup prior to some point in
The expiration CUTOFF must be specified by one of the two following arguments:
``--expire_before_timestamp <DATETIME>``
Specifies the expiration cutoff to DATETIME. Requires a cluster file and will use version/timestamp metadata in the database to convert DATETIME to a database commit version. DATETIME must be in the form "YYYY-MM-DD.HH:MI:SS" in UTC.
Specifies the expiration cutoff to DATETIME. Requires a cluster file and will use version/timestamp metadata in the database to convert DATETIME to a database commit version. DATETIME must be in the form "YYYY/MM/DD.HH:MI:SS+hhmm", for example "2018/12/31.23:59:59-0800".
``--expire_before_version <VERSION>``
Specifies the cutoff by a database commit version.
@ -342,7 +342,7 @@ The expiration CUTOFF must be specified by one of the two following arguments:
Optionally, the user can specify a minimum RESTORABILITY guarantee with one of the following options.
``--restorable_after_timestamp <DATETIME>``
Specifies that the backup must be restorable to DATETIME and later. Requires a cluster file and will use version/timestamp metadata in the database to convert DATETIME to a database commit version. DATETIME must be in the form "YYYY-MM-DD.HH:MI:SS" in UTC.
Specifies that the backup must be restorable to DATETIME and later. Requires a cluster file and will use version/timestamp metadata in the database to convert DATETIME to a database commit version. DATETIME must be in the form "YYYY/MM/DD.HH:MI:SS+hhmm", for example "2018/12/31.23:59:59-0800".
``--restorable_after_version <VERSION>``
Specifies that the backup must be restorable as of VERSION and later.
@ -446,8 +446,8 @@ The ``start`` command will start a new restore on the specified (or default) tag
``-v <VERSION>``
Instead of the latest version the backup can be restored to, restore to VERSION.
``--timestamp <YYYY-MM-DD.HH:MI:SS>``
Instead of the latest version the backup can be restored to, restore to a version from approximately the given timestamp. Requires orig_cluster_file to be specified.
``--timestamp <DATETIME>``
Instead of the latest version the backup can be restored to, restore to a version from approximately the given timestamp. Requires orig_cluster_file to be specified. DATETIME must be in the form "YYYY/MM/DD.HH:MI:SS+hhmm", for example "2018/12/31.23:59:59-0800".
``--orig_cluster_file <CONNFILE>``
The cluster file for the original database from which the backup was created. The original database is only needed to convert a --timestamp argument to a database version.

View File

@ -80,7 +80,8 @@ The following format informally describes the JSON containing the status data. T
"connected_clients": [
{
"address": "127.0.0.1:1234",
"log_group": "default"
"log_group": "default",
"connected_coordinators": 2
}
],
"count": 1,

View File

@ -11,6 +11,7 @@ Features
* Added background actor to remove redundant teams from team collection so that the healthy team number is guaranteed not exceeding the desired number. `(PR #1139) <https://github.com/apple/foundationdb/pull/1139>`_
* Show the number of connected coordinators per client in JSON status `(PR #1222) <https://github.com/apple/foundationdb/pull/1222>`_
* Get read version, read, and commit requests are counted and aggregated by server-side latency in configurable latency bands and output in JSON status. `(PR #1084) <https://github.com/apple/foundationdb/pull/1084>`_
* Added configuration option to choose log spilling implementation `(PR #1160) <https://github.com/apple/foundationdb/pull/1160>`_
@ -18,6 +19,7 @@ Features
* Batch priority transactions are now limited separately by ratekeeper and will be throttled at lower levels of cluster saturation. This makes it possible to run a more intense background load at saturation without significantly affecting normal priority transactions. It is still recommended not to run excessive loads at batch priority. `(PR #1198) <https://github.com/apple/foundationdb/pull/1198>`_
* Restore now requires the destination cluster to be specified explicitly to avoid confusion. `(PR #1240) <https://github.com/apple/foundationdb/pull/1240>`_
* Restore target version can now be specified by timestamp if the original cluster is available. `(PR #1240) <https://github.com/apple/foundationdb/pull/1240>`_
* Backup status and describe commands now have a --json output option. `(PR #1248) <https://github.com/apple/foundationdb/pull/1248>`_
* Separate data distribution out from master as a new role. `(PR #1062) <https://github.com/apple/foundationdb/pull/1062>`_
* Separate rate keeper out from data distribution as a new role. `(PR #1176) <https://github.com/apple/foundationdb/pull/1176>`_
* Added a new atomic op `CompareAndClear`. `(PR #1105) <https://github.com/apple/foundationdb/pull/1105>`_
@ -33,6 +35,8 @@ Fixes
-----
* Python: Creating a ``SingleFloat`` for the tuple layer didn't work with integers. `(PR #1216) <https://github.com/apple/foundationdb/pull/1216>`_
* Standardized datetime string format across all backup and restore command options and outputs. `(PR #1248) <https://github.com/apple/foundationdb/pull/1248>`_
* Added `USE_EIO_FILE` knob to fallback to libeio instead of kernel async I/O (KAIO) for systems that do not support KAIO or O_DIRECT flag. `(PR #1283) https://github.com/apple/foundationdb/pull/1283`_
* Added `DISABLE_POSIX_KERNEL_AIO` knob to fallback to libeio instead of kernel async I/O (KAIO) for systems that do not support KAIO or the O_DIRECT flag. `(PR #1283) <https://github.com/apple/foundationdb/pull/1283>`_
Status

View File

@ -94,7 +94,7 @@ enum {
OPT_EXPIRE_BEFORE_VERSION, OPT_EXPIRE_BEFORE_DATETIME, OPT_EXPIRE_DELETE_BEFORE_DAYS,
OPT_EXPIRE_RESTORABLE_AFTER_VERSION, OPT_EXPIRE_RESTORABLE_AFTER_DATETIME, OPT_EXPIRE_MIN_RESTORABLE_DAYS,
OPT_BASEURL, OPT_BLOB_CREDENTIALS, OPT_DESCRIBE_DEEP, OPT_DESCRIBE_TIMESTAMPS,
OPT_DUMP_BEGIN, OPT_DUMP_END,
OPT_DUMP_BEGIN, OPT_DUMP_END, OPT_JSON,
// Backup and Restore constants
OPT_TAGNAME, OPT_BACKUPKEYS, OPT_WAITFORDONE,
@ -251,6 +251,7 @@ CSimpleOpt::SOption g_rgBackupStatusOptions[] = {
{ OPT_HELP, "-h", SO_NONE },
{ OPT_HELP, "--help", SO_NONE },
{ OPT_DEVHELP, "--dev-help", SO_NONE },
{ OPT_JSON, "--json", SO_NONE},
#ifndef TLS_DISABLED
TLS_OPTION_FLAGS
#endif
@ -470,6 +471,7 @@ CSimpleOpt::SOption g_rgBackupDescribeOptions[] = {
{ OPT_KNOB, "--knob_", SO_REQ_SEP },
{ OPT_DESCRIBE_DEEP, "--deep", SO_NONE },
{ OPT_DESCRIBE_TIMESTAMPS, "--version_timestamps", SO_NONE },
{ OPT_JSON, "--json", SO_NONE},
#ifndef TLS_DISABLED
TLS_OPTION_FLAGS
#endif
@ -875,7 +877,7 @@ static void printBackupUsage(bool devhelp) {
" File containing blob credentials in JSON format. Can be specified multiple times for multiple files. See below for more details.\n");
printf(" --expire_before_timestamp DATETIME\n"
" Datetime cutoff for expire operations. Requires a cluster file and will use version/timestamp metadata\n"
" in the database to obtain a cutoff version very close to the timestamp given in YYYY-MM-DD.HH:MI:SS format (UTC).\n");
" in the database to obtain a cutoff version very close to the timestamp given in %s.\n", BackupAgentBase::timeFormat().c_str());
printf(" --expire_before_version VERSION\n"
" Version cutoff for expire operations. Deletes data files containing no data at or after VERSION.\n");
printf(" --delete_before_days NUM_DAYS\n"
@ -953,7 +955,7 @@ static void printRestoreUsage(bool devhelp ) {
printf(TLS_HELP);
#endif
printf(" -v DBVERSION The version at which the database will be restored.\n");
printf(" --timestamp Instead of a numeric version, use this to specify a timestamp in YYYY-MM-DD.HH:MI:SS format (UTC)\n");
printf(" --timestamp Instead of a numeric version, use this to specify a timestamp in %s\n", BackupAgentBase::timeFormat().c_str());
printf(" and it will be converted to a version from that time using metadata in orig_cluster_file.\n");
printf(" --orig_cluster_file CONNFILE\n");
printf(" The cluster file for the original database from which the backup was created. The original database\n");
@ -1296,8 +1298,8 @@ ACTOR Future<std::string> getLayerStatus(Reference<ReadYourWritesTransaction> tr
tagRoot.create("current_status") = statusText;
tagRoot.create("last_restorable_version") = tagLastRestorableVersions[j].get();
tagRoot.create("last_restorable_seconds_behind") = last_restorable_seconds_behind;
tagRoot.create("running_backup") = (status == BackupAgentBase::STATE_DIFFERENTIAL || status == BackupAgentBase::STATE_BACKUP);
tagRoot.create("running_backup_is_restorable") = (status == BackupAgentBase::STATE_DIFFERENTIAL);
tagRoot.create("running_backup") = (status == BackupAgentBase::STATE_RUNNING_DIFFERENTIAL || status == BackupAgentBase::STATE_RUNNING);
tagRoot.create("running_backup_is_restorable") = (status == BackupAgentBase::STATE_RUNNING_DIFFERENTIAL);
tagRoot.create("range_bytes_written") = tagRangeBytes[j].get();
tagRoot.create("mutation_log_bytes_written") = tagLogBytes[j].get();
tagRoot.create("mutation_stream_id") = backupTagUids[j].toString();
@ -1340,8 +1342,8 @@ ACTOR Future<std::string> getLayerStatus(Reference<ReadYourWritesTransaction> tr
BackupAgentBase::enumState status = (BackupAgentBase::enumState)backupStatus[i].get();
JSONDoc tagRoot = tagsRoot.create(tagName);
tagRoot.create("running_backup") = (status == BackupAgentBase::STATE_DIFFERENTIAL || status == BackupAgentBase::STATE_BACKUP);
tagRoot.create("running_backup_is_restorable") = (status == BackupAgentBase::STATE_DIFFERENTIAL);
tagRoot.create("running_backup") = (status == BackupAgentBase::STATE_RUNNING_DIFFERENTIAL || status == BackupAgentBase::STATE_RUNNING);
tagRoot.create("running_backup_is_restorable") = (status == BackupAgentBase::STATE_RUNNING_DIFFERENTIAL);
tagRoot.create("range_bytes_written") = tagRangeBytesDR[i].get();
tagRoot.create("mutation_log_bytes_written") = tagLogBytesDR[i].get();
tagRoot.create("mutation_stream_id") = drTagUids[i].toString();
@ -1748,12 +1750,12 @@ ACTOR Future<Void> statusDBBackup(Database src, Database dest, std::string tagNa
return Void();
}
ACTOR Future<Void> statusBackup(Database db, std::string tagName, bool showErrors) {
ACTOR Future<Void> statusBackup(Database db, std::string tagName, bool showErrors, bool json) {
try
{
state FileBackupAgent backupAgent;
std::string statusText = wait(backupAgent.getStatus(db, showErrors, tagName));
std::string statusText = wait(json ? backupAgent.getStatusJSON(db, tagName) : backupAgent.getStatus(db, showErrors, tagName));
printf("%s\n", statusText.c_str());
}
catch (Error& e) {
@ -2163,13 +2165,13 @@ ACTOR Future<Void> deleteBackupContainer(const char *name, std::string destinati
return Void();
}
ACTOR Future<Void> describeBackup(const char *name, std::string destinationContainer, bool deep, Optional<Database> cx) {
ACTOR Future<Void> describeBackup(const char *name, std::string destinationContainer, bool deep, Optional<Database> cx, bool json) {
try {
Reference<IBackupContainer> c = openBackupContainer(name, destinationContainer);
state BackupDescription desc = wait(c->describeBackup(deep));
if(cx.present())
wait(desc.resolveVersionTimes(cx.get()));
printf("%s\n", desc.toString().c_str());
printf("%s\n", (json ? desc.toJSON() : desc.toString()).c_str());
}
catch (Error& e) {
if(e.code() == error_code_actor_cancelled)
@ -2685,6 +2687,7 @@ int main(int argc, char* argv[]) {
Version dumpEnd = std::numeric_limits<Version>::max();
std::string restoreClusterFileDest;
std::string restoreClusterFileOrig;
bool jsonOutput = false;
BackupModifyOptions modifyOptions;
@ -2998,6 +3001,9 @@ int main(int argc, char* argv[]) {
case OPT_DUMP_END:
dumpEnd = parseVersion(args->OptionArg());
break;
case OPT_JSON:
jsonOutput = true;
break;
}
}
@ -3308,7 +3314,7 @@ int main(int argc, char* argv[]) {
case BACKUP_STATUS:
if(!initCluster())
return FDB_EXIT_ERROR;
f = stopAfter( statusBackup(db, tagName, true) );
f = stopAfter( statusBackup(db, tagName, true, jsonOutput) );
break;
case BACKUP_ABORT:
@ -3363,7 +3369,7 @@ int main(int argc, char* argv[]) {
return FDB_EXIT_ERROR;
// Only pass a database if one was specified; describeBackup will look up version timestamps when a cluster file was given, but quietly skip them if not.
f = stopAfter( describeBackup(argv[0], destinationContainer, describeDeep, describeTimestamps ? Optional<Database>(db) : Optional<Database>()) );
f = stopAfter( describeBackup(argv[0], destinationContainer, describeDeep, describeTimestamps ? Optional<Database>(db) : Optional<Database>(), jsonOutput) );
break;
case BACKUP_LIST:

View File

@ -38,13 +38,35 @@
class BackupAgentBase : NonCopyable {
public:
// Render an epoch-seconds value as a local-time string in the standard
// backup/restore datetime format (see timeFormat()).
static std::string formatTime(int64_t epochs) {
	time_t rawTime = (time_t)epochs;
	struct tm timeVal;
	char buf[128];
	getLocalTime(&rawTime, &timeVal);
	strftime(buf, 128, "%Y/%m/%d.%H:%M:%S%z", &timeVal);
	return std::string(buf);
}
// Human-readable description of the datetime format produced by formatTime()
// and accepted by parseTime(); used in usage/help and error text.
static std::string timeFormat() {
return "YYYY/MM/DD.HH:MI:SS[+/-]HHMM";
}
// Parse a datetime string in the format described by timeFormat()
// ("YYYY/MM/DD.HH:MI:SS[+/-]HHMM") into epoch seconds.
// Returns -1 if the string does not match the expected format.
static int64_t parseTime(std::string timestamp) {
	// Zero-initialize: strptime() only sets the fields present in the format
	// string, and mktime() would otherwise read indeterminate values.
	struct tm out = {};
	// Let mktime() determine whether DST is in effect for the local zone.
	out.tm_isdst = -1;
	if (strptime(timestamp.c_str(), "%Y/%m/%d.%H:%M:%S%z", &out) == nullptr) {
		return -1;
	}
	// NOTE(review): mktime() interprets the fields as local time; verify the
	// parsed %z offset is applied as intended on all supported platforms.
	return (int64_t) mktime(&out);
}
// Type of program being executed
enum enumActionResult {
RESULT_SUCCESSFUL = 0, RESULT_ERRORED = 1, RESULT_DUPLICATE = 2, RESULT_UNNEEDED = 3
};
enum enumState {
STATE_ERRORED = 0, STATE_SUBMITTED = 1, STATE_BACKUP = 2, STATE_DIFFERENTIAL = 3, STATE_COMPLETED = 4, STATE_NEVERRAN = 5, STATE_ABORTED = 6, STATE_PARTIALLY_ABORTED = 7
STATE_ERRORED = 0, STATE_SUBMITTED = 1, STATE_RUNNING = 2, STATE_RUNNING_DIFFERENTIAL = 3, STATE_COMPLETED = 4, STATE_NEVERRAN = 5, STATE_ABORTED = 6, STATE_PARTIALLY_ABORTED = 7
};
static const Key keyFolderId;
@ -90,11 +112,11 @@ public:
}
else if (!stateText.compare("has been started")) {
enState = STATE_BACKUP;
enState = STATE_RUNNING;
}
else if (!stateText.compare("is differential")) {
enState = STATE_DIFFERENTIAL;
enState = STATE_RUNNING_DIFFERENTIAL;
}
else if (!stateText.compare("has been completed")) {
@ -112,7 +134,7 @@ public:
return enState;
}
// Convert the status text to an enumerated value
// Convert the status enum to a text description
static const char* getStateText(enumState enState)
{
const char* stateText;
@ -128,10 +150,10 @@ public:
case STATE_SUBMITTED:
stateText = "has been submitted";
break;
case STATE_BACKUP:
case STATE_RUNNING:
stateText = "has been started";
break;
case STATE_DIFFERENTIAL:
case STATE_RUNNING_DIFFERENTIAL:
stateText = "is differential";
break;
case STATE_COMPLETED:
@ -151,6 +173,45 @@ public:
return stateText;
}
// Convert the status enum to a short, single-word display name.
static const char* getStateName(enumState enState)
{
	switch (enState) {
	case STATE_ERRORED:
		return "Errored";
	case STATE_NEVERRAN:
		return "NeverRan";
	case STATE_SUBMITTED:
		return "Submitted";
	case STATE_RUNNING:
		return "Running";
	case STATE_RUNNING_DIFFERENTIAL:
		return "RunningDifferentially";
	case STATE_COMPLETED:
		return "Completed";
	case STATE_ABORTED:
		return "Aborted";
	case STATE_PARTIALLY_ABORTED:
		return "Aborting";
	default:
		return "<undefined>";
	}
}
// Determine if the specified state is runnable
static bool isRunnable(enumState enState)
{
@ -159,8 +220,8 @@ public:
switch (enState)
{
case STATE_SUBMITTED:
case STATE_BACKUP:
case STATE_DIFFERENTIAL:
case STATE_RUNNING:
case STATE_RUNNING_DIFFERENTIAL:
case STATE_PARTIALLY_ABORTED:
isRunnable = true;
break;
@ -179,6 +240,7 @@ public:
return defaultTagName;
}
// This is only used for automatic backup name generation
static Standalone<StringRef> getCurrentTime() {
double t = now();
time_t curTime = t;
@ -283,6 +345,7 @@ public:
}
Future<std::string> getStatus(Database cx, bool showErrors, std::string tagName);
Future<std::string> getStatusJSON(Database cx, std::string tagName);
Future<Version> getLastRestorable(Reference<ReadYourWritesTransaction> tr, Key tagName);
void setLastRestorable(Reference<ReadYourWritesTransaction> tr, Key tagName, Version version);
@ -679,6 +742,14 @@ public:
return configSpace.pack(LiteralStringRef(__FUNCTION__));
}
KeyBackedProperty<int64_t> snapshotDispatchLastShardsBehind() {
return configSpace.pack(LiteralStringRef(__FUNCTION__));
}
KeyBackedProperty<Version> snapshotDispatchLastVersion() {
return configSpace.pack(LiteralStringRef(__FUNCTION__));
}
Future<Void> initNewSnapshot(Reference<ReadYourWritesTransaction> tr, int64_t intervalSeconds = -1) {
BackupConfig &copy = *this; // Capture this by value instead of this ptr
@ -702,6 +773,8 @@ public:
copy.snapshotBeginVersion().set(tr, beginVersion.get());
copy.snapshotTargetEndVersion().set(tr, endVersion);
copy.snapshotRangeFileCount().set(tr, 0);
copy.snapshotDispatchLastVersion().clear(tr);
copy.snapshotDispatchLastShardsBehind().clear(tr);
return Void();
});

View File

@ -19,6 +19,8 @@
*/
#include "fdbclient/BackupContainer.h"
#include "fdbclient/BackupAgent.actor.h"
#include "fdbclient/JsonBuilder.h"
#include "flow/Trace.h"
#include "flow/UnitTest.h"
#include "flow/Hash3.h"
@ -68,15 +70,6 @@ void BackupFileList::toStream(FILE *fout) const {
}
}
std::string formatTime(int64_t t) {
time_t curTime = (time_t)t;
char buffer[128];
struct tm timeinfo;
getLocalTime(&curTime, &timeinfo);
strftime(buffer, 128, "%Y-%m-%d %H:%M:%S", &timeinfo);
return buffer;
}
Future<Void> fetchTimes(Reference<ReadYourWritesTransaction> tr, std::map<Version, int64_t> *pVersionTimeMap) {
std::vector<Future<Void>> futures;
@ -127,7 +120,7 @@ std::string BackupDescription::toString() const {
if(!versionTimeMap.empty()) {
auto i = versionTimeMap.find(v);
if(i != versionTimeMap.end())
s = format("%lld (%s)", v, formatTime(i->second).c_str());
s = format("%lld (%s)", v, BackupAgentBase::formatTime(i->second).c_str());
else
s = format("%lld (unknown)", v);
}
@ -142,8 +135,8 @@ std::string BackupDescription::toString() const {
};
for(const KeyspaceSnapshotFile &m : snapshots) {
info.append(format("Snapshot: startVersion=%s endVersion=%s totalBytes=%lld restorable=%s\n",
formatVersion(m.beginVersion).c_str(), formatVersion(m.endVersion).c_str(), m.totalSize, m.restorable.orDefault(false) ? "true" : "false"));
info.append(format("Snapshot: startVersion=%s endVersion=%s totalBytes=%lld restorable=%s expiredPct=%.2f\n",
formatVersion(m.beginVersion).c_str(), formatVersion(m.endVersion).c_str(), m.totalSize, m.restorable.orDefault(false) ? "true" : "false", m.expiredPct(expiredEndVersion)));
}
info.append(format("SnapshotBytes: %lld\n", snapshotBytes));
@ -169,6 +162,65 @@ std::string BackupDescription::toString() const {
return info;
}
std::string BackupDescription::toJSON() const {
JsonBuilderObject doc;
doc.setKey("SchemaVersion", "1.0.0");
doc.setKey("URL", url.c_str());
doc.setKey("Restorable", maxRestorableVersion.present());
auto formatVersion = [&](Version v) {
JsonBuilderObject doc;
doc.setKey("Version", v);
if(!versionTimeMap.empty()) {
auto i = versionTimeMap.find(v);
if(i != versionTimeMap.end()) {
doc.setKey("Timestamp", BackupAgentBase::formatTime(i->second));
doc.setKey("EpochSeconds", i->second);
}
}
else if(maxLogEnd.present()) {
double days = double(v - maxLogEnd.get()) / (CLIENT_KNOBS->CORE_VERSIONSPERSECOND * 24 * 60 * 60);
doc.setKey("RelativeDays", days);
}
return doc;
};
JsonBuilderArray snapshotsArray;
for(const KeyspaceSnapshotFile &m : snapshots) {
JsonBuilderObject snapshotDoc;
snapshotDoc.setKey("Start", formatVersion(m.beginVersion));
snapshotDoc.setKey("End", formatVersion(m.endVersion));
snapshotDoc.setKey("Restorable", m.restorable.orDefault(false));
snapshotDoc.setKey("TotalBytes", m.totalSize);
snapshotDoc.setKey("PercentageExpired", m.expiredPct(expiredEndVersion));
snapshotsArray.push_back(snapshotDoc);
}
doc.setKey("Snapshots", snapshotsArray);
doc.setKey("TotalSnapshotBytes", snapshotBytes);
if(expiredEndVersion.present())
doc.setKey("ExpiredEnd", formatVersion(expiredEndVersion.get()));
if(unreliableEndVersion.present())
doc.setKey("UnreliableEnd", formatVersion(unreliableEndVersion.get()));
if(minLogBegin.present())
doc.setKey("MinLogBegin", formatVersion(minLogBegin.get()));
if(contiguousLogEnd.present())
doc.setKey("ContiguousLogEnd", formatVersion(contiguousLogEnd.get()));
if(maxLogEnd.present())
doc.setKey("MaxLogEnd", formatVersion(maxLogEnd.get()));
if(minRestorableVersion.present())
doc.setKey("MinRestorablePoint", formatVersion(minRestorableVersion.get()));
if(maxRestorableVersion.present())
doc.setKey("MaxRestorablePoint", formatVersion(maxRestorableVersion.get()));
if(!extendedDetail.empty())
doc.setKey("ExtendedDetail", extendedDetail);
return doc.getJson();
}
/* BackupContainerFileSystem implements a backup container which stores files in a nested folder structure.
* Inheritors must only defined methods for writing, reading, deleting, sizing, and listing files.
*
@ -1578,20 +1630,11 @@ ACTOR Future<Version> timeKeeperVersionFromDatetime(std::string datetime, Databa
state KeyBackedMap<int64_t, Version> versionMap(timeKeeperPrefixRange.begin);
state Reference<ReadYourWritesTransaction> tr = Reference<ReadYourWritesTransaction>(new ReadYourWritesTransaction(db));
int year, month, day, hour, minute, second;
if (sscanf(datetime.c_str(), "%d-%d-%d.%d:%d:%d", &year, &month, &day, &hour, &minute, &second) != 6) {
fprintf(stderr, "ERROR: Incorrect date/time format.\n");
state int64_t time = BackupAgentBase::parseTime(datetime);
if(time < 0) {
fprintf(stderr, "ERROR: Incorrect date/time or format. Format is %s.\n", BackupAgentBase::timeFormat().c_str());
throw backup_error();
}
struct tm expDateTime = {0};
expDateTime.tm_year = year - 1900;
expDateTime.tm_mon = month - 1;
expDateTime.tm_mday = day;
expDateTime.tm_hour = hour;
expDateTime.tm_min = minute;
expDateTime.tm_sec = second;
expDateTime.tm_isdst = -1;
state int64_t time = (int64_t) mktime(&expDateTime);
loop {
try {

View File

@ -89,6 +89,21 @@ struct KeyspaceSnapshotFile {
std::string fileName;
int64_t totalSize;
Optional<bool> restorable; // Whether or not the snapshot can be used in a restore, if known
// True if this snapshot covers exactly one version (beginVersion == endVersion).
bool isSingleVersion() const {
return beginVersion == endVersion;
}
// Percentage (0 to 100) of this snapshot's version range at or before expiredEnd.
// A single-version snapshot is either fully expired or not expired at all.
double expiredPct(Optional<Version> expiredEnd) const {
	if(!expiredEnd.present() || expiredEnd.get() <= beginVersion)
		return 0;
	if(isSingleVersion())
		return 100;
	// Fraction of [beginVersion, endVersion] that falls at or before expiredEnd.
	double expiredFraction = double(std::min(endVersion, expiredEnd.get()) - beginVersion) / (endVersion - beginVersion);
	return expiredFraction * 100;
}
// Order by beginVersion, break ties with endVersion
bool operator< (const KeyspaceSnapshotFile &rhs) const {
@ -132,6 +147,7 @@ struct BackupDescription {
std::map<Version, int64_t> versionTimeMap;
std::string toString() const;
std::string toJSON() const;
};
struct RestorableFileSet {

View File

@ -119,13 +119,14 @@ struct OpenDatabaseRequest {
Arena arena;
StringRef issues, traceLogGroup;
VectorRef<ClientVersionRef> supportedVersions;
int connectedCoordinatorsNum; // Number of coordinators connected by the client
UID knownClientInfoID;
ReplyPromise< struct ClientDBInfo > reply;
template <class Ar>
void serialize(Ar& ar) {
ASSERT( ar.protocolVersion() >= 0x0FDB00A400040001LL );
serializer(ar, issues, supportedVersions, traceLogGroup, knownClientInfoID, reply, arena);
serializer(ar, issues, supportedVersions, connectedCoordinatorsNum, traceLogGroup, knownClientInfoID, reply, arena);
}
};

View File

@ -1376,7 +1376,7 @@ namespace dbBackup {
try {
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
tr.addReadConflictRange(singleKeyRange(sourceStates.pack(DatabaseBackupAgent::keyStateStatus)));
tr.set(sourceStates.pack(DatabaseBackupAgent::keyStateStatus), StringRef(BackupAgentBase::getStateText(BackupAgentBase::STATE_DIFFERENTIAL)));
tr.set(sourceStates.pack(DatabaseBackupAgent::keyStateStatus), StringRef(BackupAgentBase::getStateText(BackupAgentBase::STATE_RUNNING_DIFFERENTIAL)));
Key versionKey = task->params[DatabaseBackupAgent::keyConfigLogUid].withPrefix(task->params[BackupAgentBase::destUid]).withPrefix(backupLatestVersionsPrefix);
Optional<Key> prevBeginVersion = wait(tr.get(versionKey));
@ -1418,7 +1418,7 @@ namespace dbBackup {
wait(success(FinishedFullBackupTaskFunc::addTask(tr, taskBucket, task, TaskCompletionKey::noSignal())));
}
else { // Start the writing of logs, if differential
tr->set(states.pack(DatabaseBackupAgent::keyStateStatus), StringRef(BackupAgentBase::getStateText(BackupAgentBase::STATE_DIFFERENTIAL)));
tr->set(states.pack(DatabaseBackupAgent::keyStateStatus), StringRef(BackupAgentBase::getStateText(BackupAgentBase::STATE_RUNNING_DIFFERENTIAL)));
allPartsDone = futureBucket->future(tr);
@ -1544,7 +1544,7 @@ namespace dbBackup {
srcTr2->set( Subspace(databaseBackupPrefixRange.begin).get(BackupAgentBase::keySourceTagName).pack(task->params[BackupAgentBase::keyTagName]), logUidValue );
srcTr2->set( sourceStates.pack(DatabaseBackupAgent::keyFolderId), task->params[DatabaseBackupAgent::keyFolderId] );
srcTr2->set( sourceStates.pack(DatabaseBackupAgent::keyStateStatus), StringRef(BackupAgentBase::getStateText(BackupAgentBase::STATE_BACKUP)));
srcTr2->set( sourceStates.pack(DatabaseBackupAgent::keyStateStatus), StringRef(BackupAgentBase::getStateText(BackupAgentBase::STATE_RUNNING)));
state Key destPath = destUidValue.withPrefix(backupLogKeys.begin);
// Start logging the mutations for the specified ranges of the tag
@ -1587,7 +1587,7 @@ namespace dbBackup {
tr->set(logUidValue.withPrefix(applyMutationsBeginRange.begin), BinaryWriter::toValue(beginVersion, Unversioned()));
tr->set(logUidValue.withPrefix(applyMutationsEndRange.begin), BinaryWriter::toValue(beginVersion, Unversioned()));
tr->set(states.pack(DatabaseBackupAgent::keyStateStatus), StringRef(BackupAgentBase::getStateText(BackupAgentBase::STATE_BACKUP)));
tr->set(states.pack(DatabaseBackupAgent::keyStateStatus), StringRef(BackupAgentBase::getStateText(BackupAgentBase::STATE_RUNNING)));
state Reference<TaskFuture> kvBackupRangeComplete = futureBucket->future(tr);
state Reference<TaskFuture> kvBackupComplete = futureBucket->future(tr);
@ -1791,7 +1791,7 @@ public:
}
// Break, if in differential mode (restorable) and stopWhenDone is not enabled
if ((!stopWhenDone) && (BackupAgentBase::STATE_DIFFERENTIAL == status)) {
if ((!stopWhenDone) && (BackupAgentBase::STATE_RUNNING_DIFFERENTIAL == status)) {
return status;
}
@ -1954,7 +1954,7 @@ public:
state int status = wait(backupAgent->getStateValue(dest, destlogUid));
TraceEvent("DBA_SwitchoverStart").detail("Status", status);
if (status != BackupAgentBase::STATE_DIFFERENTIAL && status != BackupAgentBase::STATE_COMPLETED) {
if (status != BackupAgentBase::STATE_RUNNING_DIFFERENTIAL && status != BackupAgentBase::STATE_COMPLETED) {
throw backup_duplicate();
}
@ -2311,10 +2311,10 @@ public:
case BackupAgentBase::STATE_SUBMITTED:
statusText += "The DR on tag `" + tagNameDisplay + "' is NOT a complete copy of the primary database (just started).\n";
break;
case BackupAgentBase::STATE_BACKUP:
case BackupAgentBase::STATE_RUNNING:
statusText += "The DR on tag `" + tagNameDisplay + "' is NOT a complete copy of the primary database.\n";
break;
case BackupAgentBase::STATE_DIFFERENTIAL:
case BackupAgentBase::STATE_RUNNING_DIFFERENTIAL:
statusText += "The DR on tag `" + tagNameDisplay + "' is a complete copy of the primary database.\n";
break;
case BackupAgentBase::STATE_COMPLETED:

View File

@ -24,6 +24,7 @@
#include "fdbclient/ManagementAPI.actor.h"
#include "fdbclient/Status.h"
#include "fdbclient/KeyBackedTypes.h"
#include "fdbclient/JsonBuilder.h"
#include <ctime>
#include <climits>
@ -46,15 +47,10 @@ static std::string versionToString(Optional<Version> version) {
return "N/A";
}
static std::string timeStampToString(Optional<int64_t> ts) {
if (!ts.present())
static std::string timeStampToString(Optional<int64_t> epochs) {
if (!epochs.present())
return "N/A";
time_t curTs = ts.get();
char buffer[128];
struct tm* timeinfo;
timeinfo = localtime(&curTs);
strftime(buffer, 128, "%D %T", timeinfo);
return std::string(buffer);
return BackupAgentBase::formatTime(epochs.get());
}
static Future<Optional<int64_t>> getTimestampFromVersion(Optional<Version> ver, Reference<ReadYourWritesTransaction> tr) {
@ -1281,6 +1277,10 @@ namespace fileBackup {
static const uint32_t version;
static struct {
// Set by Execute, used by Finish
static TaskParam<int64_t> shardsBehind() {
return LiteralStringRef(__FUNCTION__);
}
// Set by Execute, used by Finish
static TaskParam<bool> snapshotFinished() {
return LiteralStringRef(__FUNCTION__);
@ -1378,8 +1378,11 @@ namespace fileBackup {
&& store(recentReadVersion, tr->getReadVersion())
&& taskBucket->keepRunning(tr, task));
// If the snapshot batch future key does not exist, create it, set it, and commit
// Also initialize the target snapshot end version if it is not yet set.
// If the snapshot batch future key does not exist, this is the first execution of this dispatch task so
// - create and set the snapshot batch future key
// - initialize the batch size to 0
// - initialize the target snapshot end version if it is not yet set
// - commit
if(!snapshotBatchFutureKey.present()) {
snapshotBatchFuture = futureBucket->future(tr);
config.snapshotBatchFuture().set(tr, snapshotBatchFuture->pack());
@ -1549,14 +1552,38 @@ namespace fileBackup {
// Calculate number of shards that should be done before the next interval end
// timeElapsed is between 0 and 1 and represents what portion of the shards we should have completed by now
double timeElapsed;
Version snapshotScheduledVersionInterval = snapshotTargetEndVersion - snapshotBeginVersion;
if(snapshotTargetEndVersion > snapshotBeginVersion)
timeElapsed = std::min(1.0, (double)(nextDispatchVersion - snapshotBeginVersion) / (snapshotTargetEndVersion - snapshotBeginVersion));
timeElapsed = std::min(1.0, (double)(nextDispatchVersion - snapshotBeginVersion) / (snapshotScheduledVersionInterval));
else
timeElapsed = 1.0;
state int countExpectedShardsDone = countAllShards * timeElapsed;
state int countShardsToDispatch = std::max<int>(0, countExpectedShardsDone - countShardsDone);
// Calculate the number of shards that would have been dispatched by a normal (on-schedule) BackupSnapshotDispatchTask given
// the dispatch window and the start and expected-end versions of the current snapshot.
int64_t dispatchWindow = nextDispatchVersion - recentReadVersion;
// If the scheduled snapshot interval is 0 (such as for initial, as-fast-as-possible snapshot) then all shards are considered late
int countShardsExpectedPerNormalWindow;
if(snapshotScheduledVersionInterval == 0) {
countShardsExpectedPerNormalWindow = 0;
}
else {
// A dispatchWindow of 0 means the target end version is <= now which also results in all shards being considered late
countShardsExpectedPerNormalWindow = (double(dispatchWindow) / snapshotScheduledVersionInterval) * countAllShards;
}
// countShardsThisDispatch is how many total shards are to be dispatched by this dispatch cycle.
// Since this dispatch cycle can span many incrementally progressing separate executions of the BackupSnapshotDispatchTask
// instance, this is calculated as the number of shards dispatched so far in the dispatch batch plus the number of shards
// the current execution is going to attempt to do.
int countShardsThisDispatch = countShardsToDispatch + snapshotBatchSize.get();
// The number of shards 'behind' the snapshot is the count of how may additional shards beyond normal are being dispatched, if any.
int countShardsBehind = std::max<int64_t>(0, countShardsToDispatch + snapshotBatchSize.get() - countShardsExpectedPerNormalWindow);
Params.shardsBehind().set(task, countShardsBehind);
TraceEvent("FileBackupSnapshotDispatchStats")
.detail("BackupUID", config.getUid())
.detail("AllShards", countAllShards)
@ -1564,6 +1591,7 @@ namespace fileBackup {
.detail("ShardsNotDone", countShardsNotDone)
.detail("ExpectedShardsDone", countExpectedShardsDone)
.detail("ShardsToDispatch", countShardsToDispatch)
.detail("ShardsBehind", countShardsBehind)
.detail("SnapshotBeginVersion", snapshotBeginVersion)
.detail("SnapshotTargetEndVersion", snapshotTargetEndVersion)
.detail("NextDispatchVersion", nextDispatchVersion)
@ -1636,6 +1664,8 @@ namespace fileBackup {
ASSERT(snapshotBatchSize.get() == oldBatchSize);
config.snapshotBatchSize().set(tr, newBatchSize);
snapshotBatchSize = newBatchSize;
config.snapshotDispatchLastShardsBehind().set(tr, Params.shardsBehind().get(task));
config.snapshotDispatchLastVersion().set(tr, tr->getReadVersion().get());
}
state std::vector<Future<Void>> addTaskFutures;
@ -1739,6 +1769,10 @@ namespace fileBackup {
config.snapshotBatchDispatchDoneKey().clear(tr);
config.snapshotBatchSize().clear(tr);
// Update shardsBehind here again in case the execute phase did not actually have to create any shard tasks
config.snapshotDispatchLastShardsBehind().set(tr, Params.shardsBehind().getOrDefault(task, 0));
config.snapshotDispatchLastVersion().set(tr, tr->getReadVersion().get());
state Reference<TaskFuture> snapshotFinishedFuture = task->getDoneFuture(futureBucket);
// If the snapshot is finished, the next task is to write a snapshot manifest, otherwise it's another snapshot dispatch task.
@ -2069,8 +2103,8 @@ namespace fileBackup {
}
// If the backup is restorable but the state is not differential then set state to differential
if(restorableVersion.present() && backupState != BackupAgentBase::STATE_DIFFERENTIAL)
config.stateEnum().set(tr, BackupAgentBase::STATE_DIFFERENTIAL);
if(restorableVersion.present() && backupState != BackupAgentBase::STATE_RUNNING_DIFFERENTIAL)
config.stateEnum().set(tr, BackupAgentBase::STATE_RUNNING_DIFFERENTIAL);
// If stopWhenDone is set and there is a restorable version, set the done future and do not create further tasks.
if(stopWhenDone && restorableVersion.present()) {
@ -2305,8 +2339,8 @@ namespace fileBackup {
}
// If the backup is restorable and the state isn't differential the set state to differential
if(restorableVersion.present() && backupState != BackupAgentBase::STATE_DIFFERENTIAL)
config.stateEnum().set(tr, BackupAgentBase::STATE_DIFFERENTIAL);
if(restorableVersion.present() && backupState != BackupAgentBase::STATE_RUNNING_DIFFERENTIAL)
config.stateEnum().set(tr, BackupAgentBase::STATE_RUNNING_DIFFERENTIAL);
// Unless we are to stop, start the next snapshot using the default interval
Reference<TaskFuture> snapshotDoneFuture = task->getDoneFuture(futureBucket);
@ -2386,7 +2420,7 @@ namespace fileBackup {
config.startMutationLogs(tr, backupRange, destUidValue);
}
config.stateEnum().set(tr, EBackupState::STATE_BACKUP);
config.stateEnum().set(tr, EBackupState::STATE_RUNNING);
state Reference<TaskFuture> backupFinished = futureBucket->future(tr);
@ -3504,7 +3538,7 @@ public:
// Break, if one of the following is true
// - no longer runnable
// - in differential mode (restorable) and stopWhenDone is not enabled
if( !FileBackupAgent::isRunnable(status) || ((!stopWhenDone) && (BackupAgentBase::STATE_DIFFERENTIAL == status) )) {
if( !FileBackupAgent::isRunnable(status) || ((!stopWhenDone) && (BackupAgentBase::STATE_RUNNING_DIFFERENTIAL == status) )) {
if(pContainer != nullptr) {
Reference<IBackupContainer> c = wait(config.backupContainer().getOrThrow(tr, false, backup_invalid_info()));
@ -3840,6 +3874,176 @@ public:
return Void();
}
struct TimestampedVersion {
Optional<Version> version;
Optional<int64_t> epochs;
bool present() const {
return version.present();
}
JsonBuilderObject toJSON() const {
JsonBuilderObject doc;
if(version.present()) {
doc.setKey("Version", version.get());
if(epochs.present()) {
doc.setKey("EpochSeconds", epochs.get());
doc.setKey("Timestamp", timeStampToString(epochs));
}
}
return doc;
}
};
// Helper actor for generating status
// If f is present, lookup epochs using timekeeper and tr, return TimestampedVersion
ACTOR static Future<TimestampedVersion> getTimestampedVersion(Reference<ReadYourWritesTransaction> tr, Future<Optional<Version>> f) {
state TimestampedVersion tv;
wait(store(tv.version, f));
if(tv.version.present()) {
wait(store(tv.epochs, timeKeeperEpochsFromVersion(tv.version.get(), tr)));
}
return tv;
}
ACTOR static Future<std::string> getStatusJSON(FileBackupAgent* backupAgent, Database cx, std::string tagName) {
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
loop {
try {
state JsonBuilderObject doc;
doc.setKey("SchemaVersion", "1.0.0");
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
state KeyBackedTag tag = makeBackupTag(tagName);
state Optional<UidAndAbortedFlagT> uidAndAbortedFlag;
state Optional<Value> paused;
state Version recentReadVersion;
wait( store(paused, tr->get(backupAgent->taskBucket->getPauseKey())) && store(uidAndAbortedFlag, tag.get(tr)) && store(recentReadVersion, tr->getReadVersion()) );
doc.setKey("BackupAgentsPaused", paused.present());
doc.setKey("Tag", tag.tagName);
if(uidAndAbortedFlag.present()) {
state BackupConfig config(uidAndAbortedFlag.get().first);
state EBackupState backupState = wait(config.stateEnum().getD(tr, false, EBackupState::STATE_NEVERRAN));
JsonBuilderObject statusDoc;
statusDoc.setKey("Name", BackupAgentBase::getStateName(backupState));
statusDoc.setKey("Description", BackupAgentBase::getStateText(backupState));
statusDoc.setKey("Completed", backupState == BackupAgentBase::STATE_COMPLETED);
statusDoc.setKey("Running", BackupAgentBase::isRunnable(backupState));
doc.setKey("Status", statusDoc);
state Future<Void> done = Void();
if(backupState != BackupAgentBase::STATE_NEVERRAN) {
state Reference<IBackupContainer> bc;
state TimestampedVersion latestRestorable;
wait( store(latestRestorable, getTimestampedVersion(tr, config.getLatestRestorableVersion(tr)))
&& store(bc, config.backupContainer().getOrThrow(tr))
);
doc.setKey("Restorable", latestRestorable.present());
if(latestRestorable.present()) {
JsonBuilderObject o = latestRestorable.toJSON();
if(backupState != BackupAgentBase::STATE_COMPLETED) {
o.setKey("LagSeconds", (recentReadVersion - latestRestorable.version.get()) / CLIENT_KNOBS->CORE_VERSIONSPERSECOND);
}
doc.setKey("LatestRestorablePoint", o);
}
doc.setKey("DestinationURL", bc->getURL());
}
if(backupState == BackupAgentBase::STATE_RUNNING_DIFFERENTIAL || backupState == BackupAgentBase::STATE_RUNNING) {
state int64_t snapshotInterval;
state int64_t logBytesWritten;
state int64_t rangeBytesWritten;
state bool stopWhenDone;
state TimestampedVersion snapshotBegin;
state TimestampedVersion snapshotTargetEnd;
state TimestampedVersion latestLogEnd;
state TimestampedVersion latestSnapshotEnd;
state TimestampedVersion snapshotLastDispatch;
state Optional<int64_t> snapshotLastDispatchShardsBehind;
wait( store(snapshotInterval, config.snapshotIntervalSeconds().getOrThrow(tr))
&& store(logBytesWritten, config.logBytesWritten().getD(tr))
&& store(rangeBytesWritten, config.rangeBytesWritten().getD(tr))
&& store(stopWhenDone, config.stopWhenDone().getOrThrow(tr))
&& store(snapshotBegin, getTimestampedVersion(tr, config.snapshotBeginVersion().get(tr)))
&& store(snapshotTargetEnd, getTimestampedVersion(tr, config.snapshotTargetEndVersion().get(tr)))
&& store(latestLogEnd, getTimestampedVersion(tr, config.latestLogEndVersion().get(tr)))
&& store(latestSnapshotEnd, getTimestampedVersion(tr, config.latestSnapshotEndVersion().get(tr)))
&& store(snapshotLastDispatch, getTimestampedVersion(tr, config.snapshotDispatchLastVersion().get(tr)))
&& store(snapshotLastDispatchShardsBehind, config.snapshotDispatchLastShardsBehind().get(tr))
);
doc.setKey("StopAfterSnapshot", stopWhenDone);
doc.setKey("SnapshotIntervalSeconds", snapshotInterval);
doc.setKey("LogBytesWritten", logBytesWritten);
doc.setKey("RangeBytesWritten", rangeBytesWritten);
if(latestLogEnd.present()) {
doc.setKey("LatestLogEnd", latestLogEnd.toJSON());
}
if(latestSnapshotEnd.present()) {
doc.setKey("LatestSnapshotEnd", latestSnapshotEnd.toJSON());
}
JsonBuilderObject snapshot;
if(snapshotBegin.present()) {
snapshot.setKey("Begin", snapshotBegin.toJSON());
if(snapshotTargetEnd.present()) {
snapshot.setKey("EndTarget", snapshotTargetEnd.toJSON());
Version interval = snapshotTargetEnd.version.get() - snapshotBegin.version.get();
snapshot.setKey("IntervalSeconds", interval / CLIENT_KNOBS->CORE_VERSIONSPERSECOND);
Version elapsed = recentReadVersion - snapshotBegin.version.get();
double progress = (interval > 0) ? (100.0 * elapsed / interval) : 100;
snapshot.setKey("ExpectedProgress", progress);
}
JsonBuilderObject dispatchDoc = snapshotLastDispatch.toJSON();
if(snapshotLastDispatchShardsBehind.present()) {
dispatchDoc.setKey("ShardsBehind", snapshotLastDispatchShardsBehind.get());
}
snapshot.setKey("LastDispatch", dispatchDoc);
}
doc.setKey("CurrentSnapshot", snapshot);
}
KeyBackedMap<int64_t, std::pair<std::string, Version>>::PairsType errors = wait(config.lastErrorPerType().getRange(tr, 0, std::numeric_limits<int>::max(), CLIENT_KNOBS->TOO_MANY));
JsonBuilderArray errorList;
for(auto &e : errors) {
std::string msg = e.second.first;
Version ver = e.second.second;
JsonBuilderObject errDoc;
errDoc.setKey("Message", msg.c_str());
errDoc.setKey("RelativeSeconds", (ver - recentReadVersion) / CLIENT_KNOBS->CORE_VERSIONSPERSECOND);
}
doc.setKey("Errors", errorList);
}
return doc.getJson();
}
catch (Error &e) {
wait(tr->onError(e));
}
}
}
ACTOR static Future<std::string> getStatus(FileBackupAgent* backupAgent, Database cx, bool showErrors, std::string tagName) {
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
state std::string statusText;
@ -3882,11 +4086,11 @@ public:
case BackupAgentBase::STATE_SUBMITTED:
statusText += "The backup on tag `" + tagName + "' is in progress (just started) to " + bc->getURL() + ".\n";
break;
case BackupAgentBase::STATE_BACKUP:
case BackupAgentBase::STATE_RUNNING:
statusText += "The backup on tag `" + tagName + "' is in progress to " + bc->getURL() + ".\n";
snapshotProgress = true;
break;
case BackupAgentBase::STATE_DIFFERENTIAL:
case BackupAgentBase::STATE_RUNNING_DIFFERENTIAL:
statusText += "The backup on tag `" + tagName + "' is restorable but continuing to " + bc->getURL() + ".\n";
snapshotProgress = true;
break;
@ -3931,7 +4135,7 @@ public:
);
statusText += format("Snapshot interval is %lld seconds. ", snapshotInterval);
if(backupState == BackupAgentBase::STATE_DIFFERENTIAL)
if(backupState == BackupAgentBase::STATE_RUNNING_DIFFERENTIAL)
statusText += format("Current snapshot progress target is %3.2f%% (>100%% means the snapshot is supposed to be done)\n", 100.0 * (recentReadVersion - snapshotBeginVersion) / (snapshotTargetEndVersion - snapshotBeginVersion)) ;
else
statusText += "The initial snapshot is still running.\n";
@ -4076,7 +4280,7 @@ public:
backupConfig = BackupConfig(uidFlag.first);
state EBackupState status = wait(backupConfig.stateEnum().getOrThrow(ryw_tr));
if (status != BackupAgentBase::STATE_DIFFERENTIAL ) {
if (status != BackupAgentBase::STATE_RUNNING_DIFFERENTIAL ) {
throw backup_duplicate();
}
@ -4208,6 +4412,10 @@ Future<std::string> FileBackupAgent::getStatus(Database cx, bool showErrors, std
return FileBackupAgentImpl::getStatus(this, cx, showErrors, tagName);
}
Future<std::string> FileBackupAgent::getStatusJSON(Database cx, std::string tagName) {
return FileBackupAgentImpl::getStatusJSON(this, cx, tagName);
}
Future<Version> FileBackupAgent::getLastRestorable(Reference<ReadYourWritesTransaction> tr, Key tagName) {
return FileBackupAgentImpl::getLastRestorable(this, tr, tagName);
}

View File

@ -192,4 +192,7 @@ ClientKnobs::ClientKnobs(bool randomize) {
init( CONSISTENCY_CHECK_RATE_LIMIT_MAX, 50e6 );
init( CONSISTENCY_CHECK_ONE_ROUND_TARGET_COMPLETION_TIME, 7 * 24 * 60 * 60 ); // 7 days
init( CONSISTENCY_CHECK_RATE_WINDOW, 1.0 );
// TLS related
init( CHECK_CONNECTED_COORDINATOR_NUM_DELAY, 1.0 ); if( randomize && BUGGIFY ) CHECK_CONNECTED_COORDINATOR_NUM_DELAY = g_random->random01() * 60.0; // In seconds
}

View File

@ -183,6 +183,9 @@ public:
int CONSISTENCY_CHECK_ONE_ROUND_TARGET_COMPLETION_TIME;
int CONSISTENCY_CHECK_RATE_WINDOW;
// TLS related
int CHECK_CONNECTED_COORDINATOR_NUM_DELAY;
ClientKnobs(bool randomize = false);
};

View File

@ -324,9 +324,17 @@ ClientLeaderRegInterface::ClientLeaderRegInterface( INetwork* local ) {
getLeader.makeWellKnownEndpoint( WLTOKEN_CLIENTLEADERREG_GETLEADER, TaskCoordination );
}
ACTOR Future<Void> monitorNominee( Key key, ClientLeaderRegInterface coord, AsyncTrigger* nomineeChange, Optional<LeaderInfo> *info, int generation ) {
// Nominee is the worker among all workers that are considered as leader by a coordinator
// This function contacts a coordinator coord to ask if the worker is considered as a leader (i.e., if the worker
// is a nominee)
ACTOR Future<Void> monitorNominee( Key key, ClientLeaderRegInterface coord, AsyncTrigger* nomineeChange, Optional<LeaderInfo> *info, int generation, Reference<AsyncVar<int>> connectedCoordinatorsNum ) {
state bool hasCounted = false;
loop {
state Optional<LeaderInfo> li = wait( retryBrokenPromise( coord.getLeader, GetLeaderRequest( key, info->present() ? info->get().changeID : UID() ), TaskCoordinationReply ) );
if (li.present() && !hasCounted && connectedCoordinatorsNum.isValid()) {
connectedCoordinatorsNum->set(connectedCoordinatorsNum->get() + 1);
hasCounted = true;
}
wait( Future<Void>(Void()) ); // Make sure we weren't cancelled
TraceEvent("GetLeaderReply").suppressFor(1.0).detail("Coordinator", coord.getLeader.getEndpoint().getPrimaryAddress()).detail("Nominee", li.present() ? li.get().changeID : UID()).detail("Generation", generation);
@ -401,7 +409,8 @@ struct MonitorLeaderInfo {
explicit MonitorLeaderInfo( Reference<ClusterConnectionFile> intermediateConnFile ) : intermediateConnFile(intermediateConnFile), hasConnected(false), generation(0) {}
};
ACTOR Future<MonitorLeaderInfo> monitorLeaderOneGeneration( Reference<ClusterConnectionFile> connFile, Reference<AsyncVar<Value>> outSerializedLeaderInfo, MonitorLeaderInfo info ) {
// Leader is the process that will be elected by coordinators as the cluster controller
ACTOR Future<MonitorLeaderInfo> monitorLeaderOneGeneration( Reference<ClusterConnectionFile> connFile, Reference<AsyncVar<Value>> outSerializedLeaderInfo, MonitorLeaderInfo info, Reference<AsyncVar<int>> connectedCoordinatorsNum) {
state ClientCoordinators coordinators( info.intermediateConnFile );
state AsyncTrigger nomineeChange;
state std::vector<Optional<LeaderInfo>> nominees;
@ -410,8 +419,9 @@ ACTOR Future<MonitorLeaderInfo> monitorLeaderOneGeneration( Reference<ClusterCon
nominees.resize(coordinators.clientLeaderServers.size());
std::vector<Future<Void>> actors;
// Ask all coordinators if the worker is considered as a leader (leader nominee) by the coordinator.
for(int i=0; i<coordinators.clientLeaderServers.size(); i++)
actors.push_back( monitorNominee( coordinators.clusterKey, coordinators.clientLeaderServers[i], &nomineeChange, &nominees[i], info.generation ) );
actors.push_back( monitorNominee( coordinators.clusterKey, coordinators.clientLeaderServers[i], &nomineeChange, &nominees[i], info.generation, connectedCoordinatorsNum) );
allActors = waitForAll(actors);
loop {
@ -442,11 +452,14 @@ ACTOR Future<MonitorLeaderInfo> monitorLeaderOneGeneration( Reference<ClusterCon
}
}
ACTOR Future<Void> monitorLeaderInternal( Reference<ClusterConnectionFile> connFile, Reference<AsyncVar<Value>> outSerializedLeaderInfo ) {
ACTOR Future<Void> monitorLeaderInternal( Reference<ClusterConnectionFile> connFile, Reference<AsyncVar<Value>> outSerializedLeaderInfo, Reference<AsyncVar<int>> connectedCoordinatorsNum ) {
state MonitorLeaderInfo info(connFile);
loop {
MonitorLeaderInfo _info = wait( monitorLeaderOneGeneration( connFile, outSerializedLeaderInfo, info) );
// set the AsyncVar to 0
if (connectedCoordinatorsNum.isValid()) connectedCoordinatorsNum->set(0);
MonitorLeaderInfo _info = wait( monitorLeaderOneGeneration( connFile, outSerializedLeaderInfo, info, connectedCoordinatorsNum) );
info = _info;
info.generation++;
}
}

View File

@ -30,19 +30,19 @@
class ClientCoordinators;
template <class LeaderInterface>
Future<Void> monitorLeader( Reference<ClusterConnectionFile> const& connFile, Reference<AsyncVar<Optional<LeaderInterface>>> const& outKnownLeader );
Future<Void> monitorLeader( Reference<ClusterConnectionFile> const& connFile, Reference<AsyncVar<Optional<LeaderInterface>>> const& outKnownLeader, Reference<AsyncVar<int>> connectedCoordinatorsNum = Reference<AsyncVar<int>>() );
// Monitors the given coordination group's leader election process and provides a best current guess
// of the current leader. If a leader is elected for long enough and communication with a quorum of
// coordinators is possible, eventually outKnownLeader will be that leader's interface.
#pragma region Implementation
Future<Void> monitorLeaderInternal( Reference<ClusterConnectionFile> const& connFile, Reference<AsyncVar<Value>> const& outSerializedLeaderInfo );
Future<Void> monitorLeaderInternal( Reference<ClusterConnectionFile> const& connFile, Reference<AsyncVar<Value>> const& outSerializedLeaderInfo, Reference<AsyncVar<int>> const& connectedCoordinatorsNum );
template <class LeaderInterface>
Future<Void> monitorLeader( Reference<ClusterConnectionFile> const& connFile, Reference<AsyncVar<Optional<LeaderInterface>>> const& outKnownLeader ) {
Future<Void> monitorLeader( Reference<ClusterConnectionFile> const& connFile, Reference<AsyncVar<Optional<LeaderInterface>>> const& outKnownLeader, Reference<AsyncVar<int>> connectedCoordinatorsNum ) {
Reference<AsyncVar<Value>> serializedInfo( new AsyncVar<Value> );
Future<Void> m = monitorLeaderInternal( connFile, serializedInfo );
Future<Void> m = monitorLeaderInternal( connFile, serializedInfo, connectedCoordinatorsNum );
return m || asyncDeserialize( serializedInfo, outKnownLeader );
}

View File

@ -534,13 +534,14 @@ DatabaseContext::DatabaseContext(
DatabaseContext::DatabaseContext( const Error &err ) : deferredError(err), latencies(1000), readLatencies(1000), commitLatencies(1000), GRVLatencies(1000), mutationsPerCommit(1000), bytesPerCommit(1000) {}
ACTOR static Future<Void> monitorClientInfo( Reference<AsyncVar<Optional<ClusterInterface>>> clusterInterface, Reference<ClusterConnectionFile> ccf, Reference<AsyncVar<ClientDBInfo>> outInfo ) {
ACTOR static Future<Void> monitorClientInfo( Reference<AsyncVar<Optional<ClusterInterface>>> clusterInterface, Reference<ClusterConnectionFile> ccf, Reference<AsyncVar<ClientDBInfo>> outInfo, Reference<AsyncVar<int>> connectedCoordinatorsNumDelayed ) {
try {
state Optional<double> incorrectTime;
loop {
OpenDatabaseRequest req;
req.knownClientInfoID = outInfo->get().id;
req.supportedVersions = VectorRef<ClientVersionRef>(req.arena, networkOptions.supportedVersions);
req.connectedCoordinatorsNum = connectedCoordinatorsNumDelayed->get();
req.traceLogGroup = StringRef(req.arena, networkOptions.traceLogGroup);
ClusterConnectionString fileConnectionString;
@ -571,6 +572,7 @@ ACTOR static Future<Void> monitorClientInfo( Reference<AsyncVar<Optional<Cluster
if(clusterInterface->get().present())
TraceEvent("ClientInfo_CCInterfaceChange").detail("CCID", clusterInterface->get().get().id());
}
when( wait( connectedCoordinatorsNumDelayed->onChange() ) ) {}
}
}
} catch( Error& e ) {
@ -583,10 +585,14 @@ ACTOR static Future<Void> monitorClientInfo( Reference<AsyncVar<Optional<Cluster
}
}
// Create database context and monitor the cluster status;
// Notify client when cluster info (e.g., cluster controller) changes
Database DatabaseContext::create(Reference<AsyncVar<Optional<ClusterInterface>>> clusterInterface, Reference<ClusterConnectionFile> connFile, LocalityData const& clientLocality) {
Reference<Cluster> cluster(new Cluster(connFile, clusterInterface));
Reference<AsyncVar<int>> connectedCoordinatorsNum(new AsyncVar<int>(0));
Reference<AsyncVar<int>> connectedCoordinatorsNumDelayed(new AsyncVar<int>(0));
Reference<Cluster> cluster(new Cluster(connFile, clusterInterface, connectedCoordinatorsNum));
Reference<AsyncVar<ClientDBInfo>> clientInfo(new AsyncVar<ClientDBInfo>());
Future<Void> clientInfoMonitor = monitorClientInfo(clusterInterface, connFile, clientInfo);
Future<Void> clientInfoMonitor = delayedAsyncVar(connectedCoordinatorsNum, connectedCoordinatorsNumDelayed, CLIENT_KNOBS->CHECK_CONNECTED_COORDINATOR_NUM_DELAY) || monitorClientInfo(clusterInterface, connFile, clientInfo, connectedCoordinatorsNumDelayed);
return Database(new DatabaseContext(cluster, clientInfo, clientInfoMonitor, LiteralStringRef(""), TaskDefaultEndpoint, clientLocality, true, false));
}
@ -751,9 +757,11 @@ Reference<ClusterConnectionFile> DatabaseContext::getConnectionFile() {
}
Database Database::createDatabase( Reference<ClusterConnectionFile> connFile, int apiVersion, LocalityData const& clientLocality, DatabaseContext *preallocatedDb ) {
Reference<Cluster> cluster(new Cluster(connFile, apiVersion));
Reference<AsyncVar<int>> connectedCoordinatorsNum(new AsyncVar<int>(0)); // Number of connected coordinators for the client
Reference<AsyncVar<int>> connectedCoordinatorsNumDelayed(new AsyncVar<int>(0));
Reference<Cluster> cluster(new Cluster(connFile, connectedCoordinatorsNum, apiVersion));
Reference<AsyncVar<ClientDBInfo>> clientInfo(new AsyncVar<ClientDBInfo>());
Future<Void> clientInfoMonitor = monitorClientInfo(cluster->getClusterInterface(), connFile, clientInfo);
Future<Void> clientInfoMonitor = delayedAsyncVar(connectedCoordinatorsNum, connectedCoordinatorsNumDelayed, CLIENT_KNOBS->CHECK_CONNECTED_COORDINATOR_NUM_DELAY) || monitorClientInfo(cluster->getClusterInterface(), connFile, clientInfo, connectedCoordinatorsNumDelayed);
DatabaseContext *db;
if(preallocatedDb) {
@ -773,19 +781,19 @@ Database Database::createDatabase( std::string connFileName, int apiVersion, Loc
extern IPAddress determinePublicIPAutomatically(ClusterConnectionString const& ccs);
Cluster::Cluster( Reference<ClusterConnectionFile> connFile, int apiVersion )
Cluster::Cluster( Reference<ClusterConnectionFile> connFile, Reference<AsyncVar<int>> connectedCoordinatorsNum, int apiVersion )
: clusterInterface(new AsyncVar<Optional<ClusterInterface>>())
{
init(connFile, true, apiVersion);
init(connFile, true, connectedCoordinatorsNum, apiVersion);
}
Cluster::Cluster( Reference<ClusterConnectionFile> connFile, Reference<AsyncVar<Optional<ClusterInterface>>> clusterInterface)
Cluster::Cluster( Reference<ClusterConnectionFile> connFile, Reference<AsyncVar<Optional<ClusterInterface>>> clusterInterface, Reference<AsyncVar<int>> connectedCoordinatorsNum)
: clusterInterface(clusterInterface)
{
init(connFile, true);
init(connFile, true, connectedCoordinatorsNum);
}
void Cluster::init( Reference<ClusterConnectionFile> connFile, bool startClientInfoMonitor, int apiVersion ) {
void Cluster::init( Reference<ClusterConnectionFile> connFile, bool startClientInfoMonitor, Reference<AsyncVar<int>> connectedCoordinatorsNum, int apiVersion ) {
connectionFile = connFile;
connected = clusterInterface->onChange();
@ -819,7 +827,7 @@ void Cluster::init( Reference<ClusterConnectionFile> connFile, bool startClientI
uncancellable( recurring( &systemMonitor, CLIENT_KNOBS->SYSTEM_MONITOR_INTERVAL, TaskFlushTrace ) );
}
leaderMon = monitorLeader( connFile, clusterInterface );
leaderMon = monitorLeader( connFile, clusterInterface, connectedCoordinatorsNum );
failMon = failureMonitorClient( clusterInterface, false );
}
}

View File

@ -115,8 +115,8 @@ void stopNetwork();
*/
class Cluster : public ReferenceCounted<Cluster>, NonCopyable {
public:
Cluster(Reference<ClusterConnectionFile> connFile, int apiVersion=Database::API_VERSION_LATEST);
Cluster(Reference<ClusterConnectionFile> connFile, Reference<AsyncVar<Optional<struct ClusterInterface>>> clusterInterface);
Cluster(Reference<ClusterConnectionFile> connFile, Reference<AsyncVar<int>> connectedCoordinatorsNum, int apiVersion=Database::API_VERSION_LATEST);
Cluster(Reference<ClusterConnectionFile> connFile, Reference<AsyncVar<Optional<struct ClusterInterface>>> clusterInterface, Reference<AsyncVar<int>> connectedCoordinatorsNum);
~Cluster();
@ -126,7 +126,7 @@ public:
Future<Void> onConnected();
private:
void init(Reference<ClusterConnectionFile> connFile, bool startClientInfoMonitor, int apiVersion=Database::API_VERSION_LATEST);
void init(Reference<ClusterConnectionFile> connFile, bool startClientInfoMonitor, Reference<AsyncVar<int>> connectedCoordinatorsNum, int apiVersion=Database::API_VERSION_LATEST);
Reference<AsyncVar<Optional<struct ClusterInterface>>> clusterInterface;
Reference<ClusterConnectionFile> connectionFile;

View File

@ -301,7 +301,8 @@ const KeyRef JSONSchemas::statusSchema = LiteralStringRef(R"statusSchema(
"connected_clients":[
{
"address":"127.0.0.1:9898",
"log_group":"default"
"log_group":"default",
"connected_coordinators":2
}
],
"count" : 1,

View File

@ -343,6 +343,7 @@ struct Peer : NonCopyable {
break;
}
}
if ( !destination.isPublic() || outgoingConnectionIdle || destination > compatibleAddr ) {
// Keep the new connection
TraceEvent("IncomingConnection", conn->getDebugID())

View File

@ -91,7 +91,7 @@ public:
ProcessIssuesMap clientsWithIssues, workersWithIssues;
std::map<NetworkAddress, double> incompatibleConnections;
ClientVersionMap clientVersionMap;
std::map<NetworkAddress, std::string> traceLogGroupMap;
std::map<NetworkAddress, ClientStatusInfo> clientStatusInfoMap;
AsyncTrigger forceMasterFailure;
int64_t masterRegistrationCount;
bool recoveryStalled;
@ -1253,6 +1253,7 @@ ACTOR Future<Void> clusterOpenDatabase(
UID knownClientInfoID,
std::string issues,
Standalone<VectorRef<ClientVersionRef>> supportedVersions,
int connectedCoordinatorsNum,
Standalone<StringRef> traceLogGroup,
ReplyPromise<ClientDBInfo> reply)
{
@ -1264,7 +1265,8 @@ ACTOR Future<Void> clusterOpenDatabase(
db->clientVersionMap[reply.getEndpoint().getPrimaryAddress()] = supportedVersions;
}
db->traceLogGroupMap[reply.getEndpoint().getPrimaryAddress()] = traceLogGroup.toString();
db->clientStatusInfoMap[reply.getEndpoint().getPrimaryAddress()] = ClientStatusInfo(traceLogGroup.toString(), connectedCoordinatorsNum);
while (db->clientInfo->get().id == knownClientInfoID) {
choose {
@ -1275,7 +1277,7 @@ ACTOR Future<Void> clusterOpenDatabase(
removeIssue( db->clientsWithIssues, reply.getEndpoint().getPrimaryAddress(), issues, issueID );
db->clientVersionMap.erase(reply.getEndpoint().getPrimaryAddress());
db->traceLogGroupMap.erase(reply.getEndpoint().getPrimaryAddress());
db->clientStatusInfoMap.erase(reply.getEndpoint().getPrimaryAddress());
reply.send( db->clientInfo->get() );
return Void();
@ -1945,7 +1947,8 @@ ACTOR Future<Void> statusServer(FutureStream< StatusRequest> requests,
}
}
state ErrorOr<StatusReply> result = wait(errorOr(clusterGetStatus(self->db.serverInfo, self->cx, workers, self->db.workersWithIssues, self->db.clientsWithIssues, self->db.clientVersionMap, self->db.traceLogGroupMap, coordinators, incompatibleConnections, self->datacenterVersionDifference)));
state ErrorOr<StatusReply> result = wait(errorOr(clusterGetStatus(self->db.serverInfo, self->cx, workers, self->db.workersWithIssues, self->db.clientsWithIssues, self->db.clientVersionMap, self->db.clientStatusInfoMap, coordinators, incompatibleConnections, self->datacenterVersionDifference)));
if (result.isError() && result.getError().code() == error_code_actor_cancelled)
throw result.getError();
@ -2495,7 +2498,7 @@ ACTOR Future<Void> clusterControllerCore( ClusterControllerFullInterface interf,
return Void();
}
when( OpenDatabaseRequest req = waitNext( interf.clientInterface.openDatabase.getFuture() ) ) {
self.addActor.send( clusterOpenDatabase( &self.db, req.knownClientInfoID, req.issues.toString(), req.supportedVersions, req.traceLogGroup, req.reply ) );
self.addActor.send( clusterOpenDatabase( &self.db, req.knownClientInfoID, req.issues.toString(), req.supportedVersions, req.connectedCoordinatorsNum, req.traceLogGroup, req.reply ) );
}
when( RecruitFromConfigurationRequest req = waitNext( interf.recruitFromConfiguration.getFuture() ) ) {
self.addActor.send( clusterRecruitFromConfiguration( &self, req ) );

View File

@ -381,6 +381,7 @@ public:
int64_t TIME_KEEPER_DELAY;
int64_t TIME_KEEPER_MAX_ENTRIES;
ServerKnobs(bool randomize = false, ClientKnobs* clientKnobs = NULL);
};

View File

@ -810,7 +810,8 @@ ACTOR static Future<JsonBuilderObject> processStatusFetcher(
return processMap;
}
static JsonBuilderObject clientStatusFetcher(ClientVersionMap clientVersionMap, std::map<NetworkAddress, std::string> traceLogGroupMap) {
static JsonBuilderObject clientStatusFetcher(ClientVersionMap clientVersionMap,
std::map<NetworkAddress, ClientStatusInfo> clientStatusInfoMap) {
JsonBuilderObject clientStatus;
clientStatus["count"] = (int64_t)clientVersionMap.size();
@ -834,7 +835,9 @@ static JsonBuilderObject clientStatusFetcher(ClientVersionMap clientVersionMap,
for(auto client : cv.second) {
JsonBuilderObject cli;
cli["address"] = client.toString();
cli["log_group"] = traceLogGroupMap[client];
ASSERT(clientStatusInfoMap.find(client) != clientStatusInfoMap.end());
cli["log_group"] = clientStatusInfoMap[client].traceLogGroup;
cli["connected_coordinators"] = (int) clientStatusInfoMap[client].connectedCoordinatorsNum;
clients.push_back(cli);
}
@ -1806,7 +1809,7 @@ ACTOR Future<StatusReply> clusterGetStatus(
ProcessIssuesMap workerIssues,
ProcessIssuesMap clientIssues,
ClientVersionMap clientVersionMap,
std::map<NetworkAddress, std::string> traceLogGroupMap,
std::map<NetworkAddress, ClientStatusInfo> clientStatusInfoMap,
ServerCoordinators coordinators,
std::vector<NetworkAddress> incompatibleConnections,
Version datacenterVersionDifference )
@ -2039,7 +2042,7 @@ ACTOR Future<StatusReply> clusterGetStatus(
JsonBuilderObject processStatus = wait(processStatusFetcher(db, workers, pMetrics, mMetrics, latestError, traceFileOpenErrors, programStarts, processIssues, storageServers, tLogs, proxies, cx, configuration, &status_incomplete_reasons));
statusObj["processes"] = processStatus;
statusObj["clients"] = clientStatusFetcher(clientVersionMap, traceLogGroupMap);
statusObj["clients"] = clientStatusFetcher(clientVersionMap, clientStatusInfoMap);
JsonBuilderArray incompatibleConnectionsArray;
for(auto it : incompatibleConnections) {

View File

@ -30,8 +30,16 @@
typedef std::map< NetworkAddress, std::pair<std::string,UID> > ProcessIssuesMap;
typedef std::map< NetworkAddress, Standalone<VectorRef<ClientVersionRef>> > ClientVersionMap;
struct ClientStatusInfo {
std::string traceLogGroup;
int connectedCoordinatorsNum;
ClientStatusInfo() : connectedCoordinatorsNum(0) {}
ClientStatusInfo(std::string const& traceLogGroup, int const connectedCoordinatorsNum) : traceLogGroup(traceLogGroup), connectedCoordinatorsNum(connectedCoordinatorsNum) {}
};
Future<StatusReply> clusterGetStatus( Reference<AsyncVar<struct ServerDBInfo>> const& db, Database const& cx, vector<WorkerDetails> const& workers,
ProcessIssuesMap const& workerIssues, ProcessIssuesMap const& clientIssues, ClientVersionMap const& clientVersionMap, std::map<NetworkAddress, std::string> const& traceLogGroupMap,
ProcessIssuesMap const& workerIssues, ProcessIssuesMap const& clientIssues, ClientVersionMap const& clientVersionMap, std::map<NetworkAddress, struct ClientStatusInfo> const& clientStatusInfoMap,
ServerCoordinators const& coordinators, std::vector<NetworkAddress> const& incompatibleConnections, Version const& datacenterVersionDifference );
#endif

View File

@ -192,6 +192,8 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload {
loop {
std::string status = wait(agent.getStatus(cx, true, tag));
puts(status.c_str());
std::string statusJSON = wait(agent.getStatusJSON(cx, tag));
puts(statusJSON.c_str());
wait(delay(2.0));
}
}

View File

@ -625,7 +625,7 @@ struct SendBuffer {
struct PacketBuffer : SendBuffer, FastAllocated<PacketBuffer> {
int reference_count;
enum { DATA_SIZE = 4096 - 28 };
enum { DATA_SIZE = 4096 - 28 }; //28 is the size of the PacketBuffer fields
uint8_t data[ DATA_SIZE ];
PacketBuffer() : reference_count(1) {