Merge commit 'dd6ea70051aef215315e9eb3dea3b67a24778e32' into feature-remote-logs

# Conflicts:
#	flow/Net2.actor.cpp
This commit is contained in:
Evan Tschannen 2018-01-29 17:11:03 -08:00
commit 2e3b1d7ab8
4 changed files with 58 additions and 63 deletions

View File

@ -962,7 +962,7 @@ private:
// Backup files to under a single folder prefix with subfolders for each named backup
static const std::string DATAFOLDER;
// The metafolder contains keys for which user-named backups exist. Backup names can contain an arbitrary
// Indexfolder contains keys for which user-named backups exist. Backup names can contain an arbitrary
// number of slashes so the backup names are kept in a separate folder tree from their actual data.
static const std::string INDEXFOLDER;
@ -1080,36 +1080,8 @@ public:
}
ACTOR static Future<Void> deleteContainer_impl(Reference<BackupContainerBlobStore> bc, int *pNumDeleted) {
state PromiseStream<BlobStoreEndpoint::ListResult> resultStream;
state Future<Void> done = bc->m_bstore->listBucketStream(BUCKET, resultStream, bc->dataPath(""), '/', std::numeric_limits<int>::max());
state std::list<Future<Void>> deleteFutures;
loop {
choose {
when(Void _ = wait(done)) {
break;
}
when(BlobStoreEndpoint::ListResult list = waitNext(resultStream.getFuture())) {
for(auto &object : list.objects) {
int *pNumDeletedCopy = pNumDeleted; // avoid capture of this
deleteFutures.push_back(map(bc->m_bstore->deleteObject(BUCKET, object.name), [pNumDeletedCopy](Void) {
if(pNumDeletedCopy != nullptr)
++*pNumDeletedCopy;
return Void();
}));
}
while(deleteFutures.size() > CLIENT_KNOBS->BLOBSTORE_CONCURRENT_REQUESTS) {
Void _ = wait(deleteFutures.front());
deleteFutures.pop_front();
}
}
}
}
while(deleteFutures.size() > 0) {
Void _ = wait(deleteFutures.front());
deleteFutures.pop_front();
}
// First delete everything under the data prefix in the bucket
Void _ = wait(bc->m_bstore->deleteRecursively(BUCKET, bc->dataPath(""), pNumDeleted));
// Now that all files are deleted, delete the index entry
Void _ = wait(bc->m_bstore->deleteObject(BUCKET, bc->indexEntry()));

View File

@ -215,34 +215,50 @@ Future<Void> BlobStoreEndpoint::deleteObject(std::string const &bucket, std::str
return deleteObject_impl(Reference<BlobStoreEndpoint>::addRef(this), bucket, object);
}
ACTOR Future<Void> deleteBucket_impl(Reference<BlobStoreEndpoint> b, std::string bucket, int *pNumDeleted) {
ACTOR Future<Void> deleteRecursively_impl(Reference<BlobStoreEndpoint> b, std::string bucket, std::string prefix, int *pNumDeleted) {
state PromiseStream<BlobStoreEndpoint::ListResult> resultStream;
state Future<Void> done = b->listBucketStream(bucket, resultStream);
state std::vector<Future<Void>> deleteFutures;
loop {
choose {
when(Void _ = wait(done)) {
break;
// Start a recursive parallel listing which will send results to resultStream as they are received
state Future<Void> done = b->listBucketStream(bucket, resultStream, prefix, '/', std::numeric_limits<int>::max());
// Wrap done in an actor which will send end_of_stream since listBucketStream() does not (so that many calls can write to the same stream)
done = map(done, [=](Void) {
resultStream.sendError(end_of_stream());
return Void();
});
state std::list<Future<Void>> deleteFutures;
try {
loop {
BlobStoreEndpoint::ListResult list = waitNext(resultStream.getFuture());
for(auto &object : list.objects) {
int *pNumDeletedCopy = pNumDeleted; // avoid capture of this
deleteFutures.push_back(map(b->deleteObject(bucket, object.name), [pNumDeletedCopy](Void) -> Void {
if(pNumDeletedCopy != nullptr)
++*pNumDeletedCopy;
return Void();
}));
}
when(BlobStoreEndpoint::ListResult list = waitNext(resultStream.getFuture())) {
for(auto &object : list.objects) {
int *pNumDeletedCopy = pNumDeleted; // avoid capture of this
deleteFutures.push_back(map(b->deleteObject(bucket, object.name), [pNumDeletedCopy](Void) -> Void {
if(pNumDeletedCopy != nullptr)
++*pNumDeletedCopy;
return Void();
}));
}
// This is just a precaution to avoid having too many outstanding delete actors waiting to run
while(deleteFutures.size() > CLIENT_KNOBS->BLOBSTORE_CONCURRENT_REQUESTS) {
Void _ = wait(deleteFutures.front());
deleteFutures.pop_front();
}
}
} catch(Error &e) {
if(e.code() != error_code_end_of_stream)
throw;
}
while(deleteFutures.size() > 0) {
Void _ = wait(deleteFutures.front());
deleteFutures.pop_front();
}
Void _ = wait(waitForAll(deleteFutures));
return Void();
}
Future<Void> BlobStoreEndpoint::deleteBucket(std::string const &bucket, int *pNumDeleted) {
return deleteBucket_impl(Reference<BlobStoreEndpoint>::addRef(this), bucket, pNumDeleted);
Future<Void> BlobStoreEndpoint::deleteRecursively(std::string const &bucket, std::string prefix, int *pNumDeleted) {
return deleteRecursively_impl(Reference<BlobStoreEndpoint>::addRef(this), bucket, prefix, pNumDeleted);
}
ACTOR Future<Void> createBucket_impl(Reference<BlobStoreEndpoint> b, std::string bucket) {
@ -649,17 +665,23 @@ ACTOR Future<BlobStoreEndpoint::ListResult> listBucket_impl(Reference<BlobStoreE
state BlobStoreEndpoint::ListResult results;
state PromiseStream<BlobStoreEndpoint::ListResult> resultStream;
state Future<Void> done = bstore->listBucketStream(bucket, resultStream, prefix, delimiter, maxDepth, recurseFilter);
loop {
choose {
when(Void _ = wait(done)) {
break;
}
when(BlobStoreEndpoint::ListResult info = waitNext(resultStream.getFuture())) {
results.commonPrefixes.insert(results.commonPrefixes.end(), info.commonPrefixes.begin(), info.commonPrefixes.end());
results.objects.insert(results.objects.end(), info.objects.begin(), info.objects.end());
}
// Wrap done in an actor which sends end_of_stream because list does not so that many lists can write to the same stream
done = map(done, [=](Void) {
resultStream.sendError(end_of_stream());
return Void();
});
try {
loop {
BlobStoreEndpoint::ListResult info = waitNext(resultStream.getFuture());
results.commonPrefixes.insert(results.commonPrefixes.end(), info.commonPrefixes.begin(), info.commonPrefixes.end());
results.objects.insert(results.objects.end(), info.objects.begin(), info.objects.end());
}
} catch(Error &e) {
if(e.code() != error_code_end_of_stream)
throw;
}
return results;
}

View File

@ -185,11 +185,12 @@ public:
// Delete an object in a bucket
Future<Void> deleteObject(std::string const &bucket, std::string const &object);
// Delete a bucket - note this is not atomic as blob store does not support this operation directly.
// This method is just a convenience method that lists and deletes all of the objects in the bucket
// Delete all objects in a bucket under a prefix. Note this is not atomic as blob store does not
// support this operation directly. This method is just a convenience method that lists and deletes
// all of the objects in the bucket under the given prefix.
// Since it can take a while, if a pNumDeleted is provided then it will be incremented every time
// a deletion of an object completes.
Future<Void> deleteBucket(std::string const &bucket, int *pNumDeleted = NULL);
Future<Void> deleteRecursively(std::string const &bucket, std::string prefix = "", int *pNumDeleted = NULL);
// Create a bucket if it does not already exists.
Future<Void> createBucket(std::string const &bucket);

View File

@ -32,7 +32,7 @@
<Wix xmlns='http://schemas.microsoft.com/wix/2006/wi'>
<Product Name='$(var.Title)'
Id='{89BEB6D6-A3E8-4BE5-879B-AA3F0735EF91}'
Id='{62160341-2769-4104-9D74-4235618775EA}'
UpgradeCode='{A95EA002-686E-4164-8356-C715B7F8B1C8}'
Version='$(var.Version)'
Manufacturer='$(var.Manufacturer)'