added the basic structure for parallel restore

This commit is contained in:
Evan Tschannen 2018-10-09 18:47:28 -07:00
parent 1314bcec9e
commit 4c95a5ee0f
6 changed files with 221 additions and 2 deletions

104
fdbclient/Restore.actor.cpp Normal file
View File

@ -0,0 +1,104 @@
/*
* Restore.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "RestoreInterface.h"
#include "NativeAPI.h"
#include "SystemData.h"
#include "flow/actorcompiler.h" // This must be the last #include.
ACTOR Future<Void> restoreAgent(Reference<ClusterConnectionFile> ccf, LocalityData locality) {
Reference<Cluster> cluster = Cluster::createCluster(ccf->getFilename(), -1);
state Database cx = wait(cluster->createDatabase(locality));
state RestoreInterface interf;
interf.initEndpoints();
state Optional<RestoreInterface> leaderInterf;
state Transaction tr(cx);
loop {
try {
Optional<Value> leader = wait(tr.get(restoreLeaderKey));
if(leader.present()) {
leaderInterf = decodeRestoreAgentValue(leader.get());
break;
}
tr.set(restoreLeaderKey, restoreAgentValue(interf));
wait(tr.commit());
break;
} catch( Error &e ) {
wait( tr.onError(e) );
}
}
//we are not the leader, so put our interface in the agent list
if(leaderInterf.present()) {
loop {
try {
tr.set(restoreAgentKeyFor(interf.id()), restoreAgentValue(interf));
wait(tr.commit());
break;
} catch( Error &e ) {
wait( tr.onError(e) );
}
}
loop {
choose {
when(TestRequest req = waitNext(interf.test.getFuture())) {
printf("Got Request: %d\n", req.testData);
req.reply.send(TestReply(req.testData + 1));
}
}
}
}
//we are the leader
wait( delay(5.0) );
state vector<RestoreInterface> agents;
loop {
try {
Standalone<RangeResultRef> agentValues = wait(tr.getRange(restoreAgentsKeys, CLIENT_KNOBS->TOO_MANY));
ASSERT(!agentValues.more);
if(agentValues.size()) {
for(auto& it : agentValues) {
agents.push_back(decodeRestoreAgentValue(it.value));
}
break;
}
wait( delay(5.0) );
} catch( Error &e ) {
wait( tr.onError(e) );
}
}
ASSERT(agents.size() > 0);
state int testData = 0;
loop {
wait(delay(1.0));
printf("Sending Request: %d\n", testData);
std::vector<Future<TestReply>> replies;
for(auto& it : agents) {
replies.push_back( it.test.getReply(TestRequest(testData)) );
}
std::vector<TestReply> reps = wait( getAll(replies ));
testData = reps[0].replyData;
}
}

View File

@ -0,0 +1,73 @@
/*
* RestoreInterface.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef FDBCLIENT_RestoreInterface_H
#define FDBCLIENT_RestoreInterface_H
#pragma once
#include "FDBTypes.h"
#include "fdbrpc/fdbrpc.h"
struct RestoreInterface {
RequestStream< struct TestRequest > test;
bool operator == (RestoreInterface const& r) const { return id() == r.id(); }
bool operator != (RestoreInterface const& r) const { return id() != r.id(); }
UID id() const { return test.getEndpoint().token; }
NetworkAddress address() const { return test.getEndpoint().address; }
void initEndpoints() {
test.getEndpoint( TaskClusterController );
}
template <class Ar>
void serialize( Ar& ar ) {
ar & test;
}
};
struct TestRequest {
int testData;
ReplyPromise< struct TestReply > reply;
TestRequest() : testData(0) {}
explicit TestRequest(int testData) : testData(testData) {}
template <class Ar>
void serialize(Ar& ar) {
ar & testData & reply;
}
};
struct TestReply {
int replyData;
TestReply() : replyData(0) {}
explicit TestReply(int replyData) : replyData(replyData) {}
template <class Ar>
void serialize(Ar& ar) {
ar & replyData;
}
};
Future<Void> restoreAgent(Reference<struct ClusterConnectionFile> const& ccf, struct LocalityData const& locality);
#endif

View File

@ -564,3 +564,29 @@ const KeyRangeRef monitorConfKeys(
LiteralStringRef("\xff\x02/monitorConf/"),
LiteralStringRef("\xff\x02/monitorConf0")
);
const KeyRef restoreLeaderKey = LiteralStringRef("\xff\x02/restoreLeader");
const KeyRangeRef restoreAgentsKeys(
LiteralStringRef("\xff\x02/restoreAgents/"),
LiteralStringRef("\xff\x02/restoreAgents0")
);
const Key restoreAgentKeyFor( UID const& agentID ) {
BinaryWriter wr(Unversioned());
wr.serializeBytes( restoreAgentsKeys.begin );
wr << agentID;
return wr.toStringRef();
}
const Value restoreAgentValue( RestoreInterface const& server ) {
BinaryWriter wr(IncludeVersion());
wr << server;
return wr.toStringRef();
}
RestoreInterface decodeRestoreAgentValue( ValueRef const& value ) {
RestoreInterface s;
BinaryReader reader( value, IncludeVersion() );
reader >> s;
return s;
}

View File

@ -26,6 +26,7 @@
#include "FDBTypes.h"
#include "StorageServerInterface.h"
#include "RestoreInterface.h"
extern const KeyRangeRef normalKeys; // '' to systemKeys.begin
extern const KeyRangeRef systemKeys; // [FF] to [FF][FF]
@ -255,4 +256,11 @@ extern const KeyRef mustContainSystemMutationsKey;
// Key range reserved for storing changes to monitor conf files
extern const KeyRangeRef monitorConfKeys;
extern const KeyRef restoreLeaderKey;
extern const KeyRangeRef restoreAgentsKeys;
const Key restoreAgentKeyFor( UID const& agentID );
const Value restoreAgentValue( RestoreInterface const& server );
RestoreInterface decodeRestoreAgentValue( ValueRef const& value );
#endif

View File

@ -96,6 +96,8 @@
<ClCompile Include="Knobs.cpp" />
<ActorCompiler Include="MonitorLeader.actor.cpp" />
<ActorCompiler Include="ManagementAPI.actor.cpp" />
<ActorCompiler Include="Restore.actor.cpp" />
<ClCompile Include="RestoreInterface.h" />
<ActorCompiler Include="MultiVersionTransaction.actor.cpp" />
<ClCompile Include="RYWIterator.cpp" />
<ActorCompiler Include="StatusClient.actor.cpp" />

View File

@ -26,6 +26,7 @@
#include "fdbclient/NativeAPI.h"
#include "fdbclient/SystemData.h"
#include "fdbclient/FailureMonitorClient.h"
#include "fdbclient/RestoreInterface.h"
#include "CoordinationInterface.h"
#include "WorkerInterface.h"
#include "ClusterRecruitmentInterface.h"
@ -568,7 +569,7 @@ static void printUsage( const char *name, bool devhelp ) {
if( devhelp ) {
printf(" -r ROLE, --role ROLE\n"
" Server role (valid options are fdbd, test, multitest,\n");
printf(" simulation, networktestclient, networktestserver,\n");
printf(" simulation, networktestclient, networktestserver, restore\n");
printf(" consistencycheck, kvfileintegritycheck, kvfilegeneratesums). The default is `fdbd'.\n");
#ifdef _WIN32
printf(" -n, --newconsole\n"
@ -769,6 +770,7 @@ int main(int argc, char* argv[]) {
CreateTemplateDatabase,
NetworkTestClient,
NetworkTestServer,
Restore,
KVFileIntegrityCheck,
KVFileGenerateIOLogChecksums,
ConsistencyCheck
@ -902,6 +904,7 @@ int main(int argc, char* argv[]) {
else if (!strcmp(sRole, "createtemplatedb")) role = CreateTemplateDatabase;
else if (!strcmp(sRole, "networktestclient")) role = NetworkTestClient;
else if (!strcmp(sRole, "networktestserver")) role = NetworkTestServer;
else if (!strcmp(sRole, "restore")) role = Restore;
else if (!strcmp(sRole, "kvfileintegritycheck")) role = KVFileIntegrityCheck;
else if (!strcmp(sRole, "kvfilegeneratesums")) role = KVFileGenerateIOLogChecksums;
else if (!strcmp(sRole, "consistencycheck")) role = ConsistencyCheck;
@ -1421,7 +1424,7 @@ int main(int argc, char* argv[]) {
tlsOptions->register_network();
#endif
if (role == FDBD || role == NetworkTestServer) {
if (role == FDBD || role == NetworkTestServer || role == Restore) {
try {
listenError = FlowTransport::transport().bind(publicAddress, listenAddress);
if (listenError.isReady()) listenError.get();
@ -1572,6 +1575,9 @@ int main(int argc, char* argv[]) {
} else if (role == NetworkTestServer) {
f = stopAfter( networkTestServer() );
g_network->run();
} else if (role == Restore) {
f = stopAfter( restoreAgent(connectionFile, localities) );
g_network->run();
} else if (role == KVFileIntegrityCheck) {
f = stopAfter( KVFileCheck(kvFile, true) );
g_network->run();