Moved BackupContainerAzureBlobStore into its own files
This commit is contained in:
parent
e5338d213b
commit
961aeaecac
|
@ -36,6 +36,7 @@
|
|||
#include "fdbrpc/simulator.h"
|
||||
#include "flow/Platform.h"
|
||||
#include "fdbclient/AsyncFileBlobStore.actor.h"
|
||||
#include "fdbclient/BackupContainerAzureBlobStore.h"
|
||||
#include "fdbclient/BackupContainerFileSystem.actor.h"
|
||||
#include "fdbclient/BackupContainerLocalDirectory.h"
|
||||
#include "fdbclient/Status.h"
|
||||
|
|
|
@ -0,0 +1,281 @@
|
|||
/*
|
||||
* BackupContainerAzureBlobStore.actor.cpp
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include "fdbclient/BackupContainerAzureBlobStore.h"
|
||||
|
||||
#include "flow/actorcompiler.h" // This must be the last #include.
|
||||
|
||||
namespace {
|
||||
|
||||
using AzureClient = azure::storage_lite::blob_client;
|
||||
|
||||
class ReadFile final : public IAsyncFile, ReferenceCounted<ReadFile> {
|
||||
AsyncTaskThread& asyncTaskThread;
|
||||
std::string containerName;
|
||||
std::string blobName;
|
||||
AzureClient* client;
|
||||
|
||||
public:
|
||||
ReadFile(AsyncTaskThread& asyncTaskThread, const std::string& containerName, const std::string& blobName,
|
||||
AzureClient* client)
|
||||
: asyncTaskThread(asyncTaskThread), containerName(containerName), blobName(blobName), client(client) {}
|
||||
|
||||
void addref() override { ReferenceCounted<ReadFile>::addref(); }
|
||||
void delref() override { ReferenceCounted<ReadFile>::delref(); }
|
||||
Future<int> read(void* data, int length, int64_t offset) {
|
||||
return asyncTaskThread.execAsync([client = this->client, containerName = this->containerName,
|
||||
blobName = this->blobName, data, length, offset] {
|
||||
std::ostringstream oss(std::ios::out | std::ios::binary);
|
||||
client->download_blob_to_stream(containerName, blobName, offset, length, oss);
|
||||
auto str = oss.str();
|
||||
memcpy(data, str.c_str(), str.size());
|
||||
return static_cast<int>(str.size());
|
||||
});
|
||||
}
|
||||
Future<Void> zeroRange(int64_t offset, int64_t length) override { throw file_not_writable(); }
|
||||
Future<Void> write(void const* data, int length, int64_t offset) override { throw file_not_writable(); }
|
||||
Future<Void> truncate(int64_t size) override { throw file_not_writable(); }
|
||||
Future<Void> sync() override { throw file_not_writable(); }
|
||||
Future<int64_t> size() const override {
|
||||
return asyncTaskThread.execAsync(
|
||||
[client = this->client, containerName = this->containerName, blobName = this->blobName] {
|
||||
return static_cast<int64_t>(client->get_blob_properties(containerName, blobName).get().response().size);
|
||||
});
|
||||
}
|
||||
std::string getFilename() const override { return blobName; }
|
||||
int64_t debugFD() const override { return 0; }
|
||||
};
|
||||
|
||||
class WriteFile final : public IAsyncFile, ReferenceCounted<WriteFile> {
|
||||
AsyncTaskThread& asyncTaskThread;
|
||||
AzureClient* client;
|
||||
std::string containerName;
|
||||
std::string blobName;
|
||||
int64_t m_cursor{ 0 };
|
||||
std::string buffer;
|
||||
|
||||
static constexpr size_t bufferLimit = 1 << 20;
|
||||
|
||||
// From https://tuttlem.github.io/2014/08/18/getting-istream-to-work-off-a-byte-array.html:
|
||||
class MemStream : public std::istream {
|
||||
class MemBuf : public std::basic_streambuf<char> {
|
||||
public:
|
||||
MemBuf(const uint8_t* p, size_t l) { setg((char*)p, (char*)p, (char*)p + l); }
|
||||
} buffer;
|
||||
|
||||
public:
|
||||
MemStream(const uint8_t* p, size_t l) : std::istream(&buffer), buffer(p, l) { rdbuf(&buffer); }
|
||||
};
|
||||
|
||||
public:
|
||||
WriteFile(AsyncTaskThread& asyncTaskThread, const std::string& containerName, const std::string& blobName,
|
||||
AzureClient* client)
|
||||
: asyncTaskThread(asyncTaskThread), containerName(containerName), blobName(blobName), client(client) {}
|
||||
|
||||
void addref() override { ReferenceCounted<WriteFile>::addref(); }
|
||||
void delref() override { ReferenceCounted<WriteFile>::delref(); }
|
||||
Future<int> read(void* data, int length, int64_t offset) override { throw file_not_readable(); }
|
||||
Future<Void> write(void const* data, int length, int64_t offset) override {
|
||||
if (offset != m_cursor) {
|
||||
throw non_sequential_op();
|
||||
}
|
||||
m_cursor += length;
|
||||
auto p = static_cast<char const*>(data);
|
||||
buffer.insert(buffer.cend(), p, p + length);
|
||||
if (buffer.size() > bufferLimit) {
|
||||
return sync();
|
||||
} else {
|
||||
return Void();
|
||||
}
|
||||
}
|
||||
Future<Void> truncate(int64_t size) override {
|
||||
if (size != m_cursor) {
|
||||
throw non_sequential_op();
|
||||
}
|
||||
return Void();
|
||||
}
|
||||
Future<Void> sync() override {
|
||||
return asyncTaskThread.execAsync([client = this->client, containerName = this->containerName,
|
||||
blobName = this->blobName, buffer = std::move(this->buffer)] {
|
||||
// MemStream memStream(buffer.data(), buffer.size());
|
||||
std::istringstream iss(buffer);
|
||||
auto resp = client->append_block_from_stream(containerName, blobName, iss).get();
|
||||
return Void();
|
||||
});
|
||||
}
|
||||
Future<int64_t> size() const override {
|
||||
return asyncTaskThread.execAsync(
|
||||
[client = this->client, containerName = this->containerName, blobName = this->blobName] {
|
||||
auto resp = client->get_blob_properties(containerName, blobName).get().response();
|
||||
ASSERT(resp.valid()); // TODO: Should instead throw here
|
||||
return static_cast<int64_t>(resp.size);
|
||||
});
|
||||
}
|
||||
std::string getFilename() const override { return blobName; }
|
||||
int64_t debugFD() const override { return -1; }
|
||||
};
|
||||
|
||||
class BackupFile final : public IBackupFile, ReferenceCounted<BackupFile> {
|
||||
Reference<IAsyncFile> m_file;
|
||||
|
||||
public:
|
||||
BackupFile(const std::string& fileName, Reference<IAsyncFile> file) : IBackupFile(fileName), m_file(file) {}
|
||||
Future<Void> append(const void* data, int len) override {
|
||||
Future<Void> r = m_file->write(data, len, m_offset);
|
||||
m_offset += len;
|
||||
return r;
|
||||
}
|
||||
Future<Void> finish() override {
|
||||
Reference<BackupFile> self = Reference<BackupFile>::addRef(this);
|
||||
return map(m_file->sync(), [=](Void _) {
|
||||
self->m_file.clear();
|
||||
return Void();
|
||||
});
|
||||
}
|
||||
void addref() override { ReferenceCounted<BackupFile>::addref(); }
|
||||
void delref() override { ReferenceCounted<BackupFile>::delref(); }
|
||||
};
|
||||
|
||||
bool isDirectory(const std::string& blobName) {
|
||||
return blobName.size() && blobName.back() == '/';
|
||||
}
|
||||
|
||||
ACTOR Future<Reference<IAsyncFile>> readFileImpl(BackupContainerAzureBlobStore* self, std::string fileName) {
|
||||
bool exists = wait(self->blobExists(fileName));
|
||||
if (!exists) {
|
||||
throw file_not_found();
|
||||
}
|
||||
return Reference<IAsyncFile>(
|
||||
new ReadFile(self->asyncTaskThread, self->containerName, fileName, self->client.get()));
|
||||
}
|
||||
|
||||
ACTOR Future<Reference<IBackupFile>> writeFileImpl(BackupContainerAzureBlobStore* self, std::string fileName) {
|
||||
wait(self->asyncTaskThread.execAsync(
|
||||
[client = self->client.get(), containerName = self->containerName, fileName = fileName] {
|
||||
auto outcome = client->create_append_blob(containerName, fileName).get();
|
||||
return Void();
|
||||
}));
|
||||
return Reference<IBackupFile>(
|
||||
new BackupFile(fileName, Reference<IAsyncFile>(new WriteFile(self->asyncTaskThread, self->containerName,
|
||||
fileName, self->client.get()))));
|
||||
}
|
||||
|
||||
void listFilesImpl(AzureClient* client, const std::string& containerName, const std::string& path,
|
||||
std::function<bool(std::string const&)> folderPathFilter,
|
||||
BackupContainerFileSystem::FilesAndSizesT& result) {
|
||||
auto resp = client->list_blobs_segmented(containerName, "/", "", path).get().response();
|
||||
for (const auto& blob : resp.blobs) {
|
||||
if (isDirectory(blob.name) && folderPathFilter(blob.name)) {
|
||||
listFilesImpl(client, containerName, blob.name, folderPathFilter, result);
|
||||
} else {
|
||||
result.emplace_back(blob.name, blob.content_length);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> deleteContainerImpl(BackupContainerAzureBlobStore* self, int* pNumDeleted) {
|
||||
state int filesToDelete = 0;
|
||||
if (pNumDeleted) {
|
||||
BackupContainerFileSystem::FilesAndSizesT files = wait(self->listFiles());
|
||||
filesToDelete = files.size();
|
||||
}
|
||||
wait(self->asyncTaskThread.execAsync([containerName = self->containerName, client = self->client.get()] {
|
||||
client->delete_container(containerName).wait();
|
||||
return Void();
|
||||
}));
|
||||
if (pNumDeleted) {
|
||||
*pNumDeleted += filesToDelete;
|
||||
}
|
||||
return Void();
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
Future<bool> BackupContainerAzureBlobStore::blobExists(const std::string& fileName) {
|
||||
return asyncTaskThread.execAsync(
|
||||
[client = this->client.get(), containerName = this->containerName, fileName = fileName] {
|
||||
auto resp = client->get_blob_properties(containerName, fileName).get().response();
|
||||
return resp.valid();
|
||||
});
|
||||
}
|
||||
|
||||
BackupContainerAzureBlobStore::BackupContainerAzureBlobStore() : containerName("test_container") {
|
||||
// std::string account_name = std::getenv("AZURE_TESTACCOUNT");
|
||||
// std::string account_key = std::getenv("AZURE_TESTKEY");
|
||||
// bool use_https = true;
|
||||
|
||||
// auto credential = std::make_shared<azure::storage_lite::shared_key_credential>(account_name, account_key);
|
||||
// auto storage_account =
|
||||
// std::make_shared<azure::storage_lite::storage_account>(account_name, credential, use_https);
|
||||
|
||||
auto storage_account = azure::storage_lite::storage_account::development_storage_account();
|
||||
|
||||
client = std::make_unique<AzureClient>(storage_account, 1);
|
||||
}
|
||||
|
||||
void BackupContainerAzureBlobStore::addref() {
|
||||
return ReferenceCounted<BackupContainerAzureBlobStore>::addref();
|
||||
}
|
||||
void BackupContainerAzureBlobStore::delref() {
|
||||
return ReferenceCounted<BackupContainerAzureBlobStore>::delref();
|
||||
}
|
||||
|
||||
Future<Void> BackupContainerAzureBlobStore::create() {
|
||||
return asyncTaskThread.execAsync([containerName = this->containerName, client = this->client.get()] {
|
||||
client->create_container(containerName).wait();
|
||||
return Void();
|
||||
});
|
||||
}
|
||||
Future<bool> BackupContainerAzureBlobStore::exists() {
|
||||
return asyncTaskThread.execAsync([containerName = this->containerName, client = this->client.get()] {
|
||||
auto resp = client->get_container_properties(containerName).get().response();
|
||||
return resp.valid();
|
||||
});
|
||||
}
|
||||
|
||||
Future<Reference<IAsyncFile>> BackupContainerAzureBlobStore::readFile(std::string fileName) {
|
||||
return readFileImpl(this, fileName);
|
||||
}
|
||||
|
||||
Future<Reference<IBackupFile>> BackupContainerAzureBlobStore::writeFile(const std::string& fileName) {
|
||||
return writeFileImpl(this, fileName);
|
||||
}
|
||||
|
||||
Future<BackupContainerFileSystem::FilesAndSizesT> BackupContainerAzureBlobStore::listFiles(
|
||||
std::string path, std::function<bool(std::string const&)> folderPathFilter) {
|
||||
return asyncTaskThread.execAsync([client = this->client.get(), containerName = this->containerName, path = path,
|
||||
folderPathFilter = folderPathFilter] {
|
||||
FilesAndSizesT result;
|
||||
listFilesImpl(client, containerName, path, folderPathFilter, result);
|
||||
return result;
|
||||
});
|
||||
}
|
||||
|
||||
Future<Void> BackupContainerAzureBlobStore::deleteFile(std::string fileName) {
|
||||
return asyncTaskThread.execAsync(
|
||||
[containerName = this->containerName, fileName = fileName, client = client.get()]() {
|
||||
client->delete_blob(containerName, fileName).wait();
|
||||
return Void();
|
||||
});
|
||||
}
|
||||
|
||||
Future<Void> BackupContainerAzureBlobStore::deleteContainer(int* pNumDeleted) {
|
||||
return deleteContainerImpl(this, pNumDeleted);
|
||||
}
|
|
@ -0,0 +1,64 @@
|
|||
/*
|
||||
* BackupContainerAzureBlobStore.h
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#ifndef FDBCLIENT_BACKUP_CONTAINER_AZURE_BLOBSTORE_H
|
||||
#define FDBCLIENT_BACKUP_CONTAINER_AZURE_BLOBSTORE_H
|
||||
#pragma once
|
||||
|
||||
#include "fdbclient/BackupContainerFileSystem.actor.h"
|
||||
|
||||
#include "storage_credential.h"
|
||||
#include "storage_account.h"
|
||||
#include "blob/blob_client.h"
|
||||
|
||||
class BackupContainerAzureBlobStore final : public BackupContainerFileSystem,
|
||||
ReferenceCounted<BackupContainerAzureBlobStore> {
|
||||
using AzureClient = azure::storage_lite::blob_client;
|
||||
|
||||
public:
|
||||
// TODO: Encapsulate these?
|
||||
std::unique_ptr<AzureClient> client;
|
||||
std::string containerName;
|
||||
AsyncTaskThread asyncTaskThread;
|
||||
|
||||
Future<bool> blobExists(const std::string& fileName);
|
||||
|
||||
BackupContainerAzureBlobStore();
|
||||
|
||||
void addref() override;
|
||||
void delref() override;
|
||||
|
||||
Future<Void> create() override;
|
||||
|
||||
Future<bool> exists() override;
|
||||
|
||||
Future<Reference<IAsyncFile>> readFile(std::string fileName) override;
|
||||
|
||||
Future<Reference<IBackupFile>> writeFile(const std::string& fileName) override;
|
||||
|
||||
Future<FilesAndSizesT> listFiles(std::string path = "",
|
||||
std::function<bool(std::string const&)> folderPathFilter = nullptr) override;
|
||||
|
||||
Future<Void> deleteFile(std::string fileName) override;
|
||||
|
||||
Future<Void> deleteContainer(int* pNumDeleted) override;
|
||||
};
|
||||
|
||||
#endif
|
|
@ -18,6 +18,7 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include "fdbclient/BackupContainerAzureBlobStore.h"
|
||||
#include "fdbclient/BackupContainerFileSystem.actor.h"
|
||||
#include "fdbclient/BackupContainerLocalDirectory.h"
|
||||
#include "fdbclient/FDBTypes.h"
|
||||
|
|
|
@ -25,10 +25,6 @@
|
|||
#elif !defined(FDBCLIENT_BACKUP_CONTAINER_FILESYSTEM_H)
|
||||
#define FDBCLIENT_BACKUP_CONTAINER_FILESYSTEM_H
|
||||
|
||||
#include "storage_credential.h"
|
||||
#include "storage_account.h"
|
||||
#include "blob/blob_client.h"
|
||||
|
||||
// FIXME: Trim this down
|
||||
#include "flow/Platform.actor.h"
|
||||
#include "fdbclient/AsyncTaskThread.h"
|
||||
|
@ -440,259 +436,5 @@ public:
|
|||
std::string getBucket() const { return m_bucket; }
|
||||
};
|
||||
|
||||
class BackupContainerAzureBlobStore final : public BackupContainerFileSystem,
|
||||
ReferenceCounted<BackupContainerAzureBlobStore> {
|
||||
|
||||
using AzureClient = azure::storage_lite::blob_client;
|
||||
|
||||
std::unique_ptr<AzureClient> client;
|
||||
std::string containerName;
|
||||
AsyncTaskThread asyncTaskThread;
|
||||
|
||||
class ReadFile final : public IAsyncFile, ReferenceCounted<ReadFile> {
|
||||
AsyncTaskThread& asyncTaskThread;
|
||||
std::string containerName;
|
||||
std::string blobName;
|
||||
AzureClient* client;
|
||||
|
||||
public:
|
||||
ReadFile(AsyncTaskThread& asyncTaskThread, const std::string& containerName, const std::string& blobName,
|
||||
AzureClient* client)
|
||||
: asyncTaskThread(asyncTaskThread), containerName(containerName), blobName(blobName), client(client) {}
|
||||
|
||||
void addref() override { ReferenceCounted<ReadFile>::addref(); }
|
||||
void delref() override { ReferenceCounted<ReadFile>::delref(); }
|
||||
Future<int> read(void* data, int length, int64_t offset) {
|
||||
return asyncTaskThread.execAsync([client = this->client, containerName = this->containerName,
|
||||
blobName = this->blobName, data, length, offset] {
|
||||
std::ostringstream oss(std::ios::out | std::ios::binary);
|
||||
client->download_blob_to_stream(containerName, blobName, offset, length, oss);
|
||||
auto str = oss.str();
|
||||
memcpy(data, str.c_str(), str.size());
|
||||
return static_cast<int>(str.size());
|
||||
});
|
||||
}
|
||||
Future<Void> zeroRange(int64_t offset, int64_t length) override { throw file_not_writable(); }
|
||||
Future<Void> write(void const* data, int length, int64_t offset) override { throw file_not_writable(); }
|
||||
Future<Void> truncate(int64_t size) override { throw file_not_writable(); }
|
||||
Future<Void> sync() override { throw file_not_writable(); }
|
||||
Future<int64_t> size() const override {
|
||||
return asyncTaskThread.execAsync([client = this->client, containerName = this->containerName,
|
||||
blobName = this->blobName] {
|
||||
return static_cast<int64_t>(client->get_blob_properties(containerName, blobName).get().response().size);
|
||||
});
|
||||
}
|
||||
std::string getFilename() const override { return blobName; }
|
||||
int64_t debugFD() const override { return 0; }
|
||||
};
|
||||
|
||||
class WriteFile final : public IAsyncFile, ReferenceCounted<WriteFile> {
|
||||
AsyncTaskThread& asyncTaskThread;
|
||||
AzureClient* client;
|
||||
std::string containerName;
|
||||
std::string blobName;
|
||||
int64_t m_cursor{ 0 };
|
||||
std::string buffer;
|
||||
|
||||
static constexpr size_t bufferLimit = 1 << 20;
|
||||
|
||||
// From https://tuttlem.github.io/2014/08/18/getting-istream-to-work-off-a-byte-array.html:
|
||||
class MemStream : public std::istream {
|
||||
class MemBuf : public std::basic_streambuf<char> {
|
||||
public:
|
||||
MemBuf(const uint8_t* p, size_t l) { setg((char*)p, (char*)p, (char*)p + l); }
|
||||
} buffer;
|
||||
|
||||
public:
|
||||
MemStream(const uint8_t* p, size_t l) : std::istream(&buffer), buffer(p, l) { rdbuf(&buffer); }
|
||||
};
|
||||
|
||||
public:
|
||||
WriteFile(AsyncTaskThread& asyncTaskThread, const std::string& containerName, const std::string& blobName,
|
||||
AzureClient* client)
|
||||
: asyncTaskThread(asyncTaskThread), containerName(containerName), blobName(blobName), client(client) {}
|
||||
|
||||
void addref() override { ReferenceCounted<WriteFile>::addref(); }
|
||||
void delref() override { ReferenceCounted<WriteFile>::delref(); }
|
||||
Future<int> read(void* data, int length, int64_t offset) override { throw file_not_readable(); }
|
||||
Future<Void> write(void const* data, int length, int64_t offset) override {
|
||||
if (offset != m_cursor) {
|
||||
throw non_sequential_op();
|
||||
}
|
||||
m_cursor += length;
|
||||
auto p = static_cast<char const*>(data);
|
||||
buffer.insert(buffer.cend(), p, p + length);
|
||||
if (buffer.size() > bufferLimit) {
|
||||
return sync();
|
||||
} else {
|
||||
return Void();
|
||||
}
|
||||
}
|
||||
Future<Void> truncate(int64_t size) override {
|
||||
if (size != m_cursor) {
|
||||
throw non_sequential_op();
|
||||
}
|
||||
return Void();
|
||||
}
|
||||
Future<Void> sync() override {
|
||||
return asyncTaskThread.execAsync([client = this->client, containerName = this->containerName,
|
||||
blobName = this->blobName, buffer = std::move(this->buffer)] {
|
||||
// MemStream memStream(buffer.data(), buffer.size());
|
||||
std::istringstream iss(buffer);
|
||||
auto resp = client->append_block_from_stream(containerName, blobName, iss).get();
|
||||
return Void();
|
||||
});
|
||||
}
|
||||
Future<int64_t> size() const override {
|
||||
return asyncTaskThread.execAsync(
|
||||
[client = this->client, containerName = this->containerName, blobName = this->blobName] {
|
||||
auto resp = client->get_blob_properties(containerName, blobName).get().response();
|
||||
ASSERT(resp.valid()); // TODO: Should instead throw here
|
||||
return static_cast<int64_t>(resp.size);
|
||||
});
|
||||
}
|
||||
std::string getFilename() const override { return blobName; }
|
||||
int64_t debugFD() const override { return -1; }
|
||||
};
|
||||
|
||||
class BackupFile final : public IBackupFile, ReferenceCounted<BackupFile> {
|
||||
Reference<IAsyncFile> m_file;
|
||||
|
||||
public:
|
||||
BackupFile(const std::string& fileName, Reference<IAsyncFile> file) : IBackupFile(fileName), m_file(file) {}
|
||||
Future<Void> append(const void* data, int len) override {
|
||||
Future<Void> r = m_file->write(data, len, m_offset);
|
||||
m_offset += len;
|
||||
return r;
|
||||
}
|
||||
Future<Void> finish() override {
|
||||
Reference<BackupFile> self = Reference<BackupFile>::addRef(this);
|
||||
return map(m_file->sync(), [=](Void _) {
|
||||
self->m_file.clear();
|
||||
return Void();
|
||||
});
|
||||
}
|
||||
void addref() override { ReferenceCounted<BackupFile>::addref(); }
|
||||
void delref() override { ReferenceCounted<BackupFile>::delref(); }
|
||||
};
|
||||
|
||||
Future<bool> blobExists(const std::string& fileName) {
|
||||
return asyncTaskThread.execAsync(
|
||||
[client = this->client.get(), containerName = this->containerName, fileName = fileName] {
|
||||
auto resp = client->get_blob_properties(containerName, fileName).get().response();
|
||||
return resp.valid();
|
||||
});
|
||||
}
|
||||
|
||||
static bool isDirectory(const std::string& blobName) { return blobName.size() && blobName.back() == '/'; }
|
||||
|
||||
ACTOR static Future<Reference<IAsyncFile>> readFile_impl(BackupContainerAzureBlobStore* self,
|
||||
std::string fileName) {
|
||||
bool exists = wait(self->blobExists(fileName));
|
||||
if (!exists) {
|
||||
throw file_not_found();
|
||||
}
|
||||
return Reference<IAsyncFile>(
|
||||
new ReadFile(self->asyncTaskThread, self->containerName, fileName, self->client.get()));
|
||||
}
|
||||
|
||||
ACTOR static Future<Reference<IBackupFile>> writeFile_impl(BackupContainerAzureBlobStore* self,
|
||||
std::string fileName) {
|
||||
wait(self->asyncTaskThread.execAsync(
|
||||
[client = self->client.get(), containerName = self->containerName, fileName = fileName] {
|
||||
auto outcome = client->create_append_blob(containerName, fileName).get();
|
||||
return Void();
|
||||
}));
|
||||
return Reference<IBackupFile>(
|
||||
new BackupFile(fileName, Reference<IAsyncFile>(new WriteFile(self->asyncTaskThread, self->containerName,
|
||||
fileName, self->client.get()))));
|
||||
}
|
||||
|
||||
static void listFilesImpl(AzureClient* client, const std::string& containerName, const std::string& path,
|
||||
std::function<bool(std::string const&)> folderPathFilter, FilesAndSizesT& result) {
|
||||
auto resp = client->list_blobs_segmented(containerName, "/", "", path).get().response();
|
||||
for (const auto& blob : resp.blobs) {
|
||||
if (isDirectory(blob.name) && folderPathFilter(blob.name)) {
|
||||
listFilesImpl(client, containerName, blob.name, folderPathFilter, result);
|
||||
} else {
|
||||
result.emplace_back(blob.name, blob.content_length);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> deleteContainerImpl(BackupContainerAzureBlobStore* self, int* pNumDeleted) {
|
||||
state int filesToDelete = 0;
|
||||
if (pNumDeleted) {
|
||||
FilesAndSizesT files = wait(self->listFiles());
|
||||
filesToDelete = files.size();
|
||||
}
|
||||
wait(self->asyncTaskThread.execAsync([containerName = self->containerName, client = self->client.get()] {
|
||||
client->delete_container(containerName).wait();
|
||||
return Void();
|
||||
}));
|
||||
if (pNumDeleted) {
|
||||
*pNumDeleted += filesToDelete;
|
||||
}
|
||||
return Void();
|
||||
}
|
||||
|
||||
public:
|
||||
BackupContainerAzureBlobStore() : containerName("test_container") {
|
||||
// std::string account_name = std::getenv("AZURE_TESTACCOUNT");
|
||||
// std::string account_key = std::getenv("AZURE_TESTKEY");
|
||||
// bool use_https = true;
|
||||
|
||||
// auto credential = std::make_shared<azure::storage_lite::shared_key_credential>(account_name, account_key);
|
||||
// auto storage_account =
|
||||
// std::make_shared<azure::storage_lite::storage_account>(account_name, credential, use_https);
|
||||
|
||||
auto storage_account = azure::storage_lite::storage_account::development_storage_account();
|
||||
|
||||
client = std::make_unique<AzureClient>(storage_account, 1);
|
||||
}
|
||||
|
||||
void addref() override { return ReferenceCounted<BackupContainerAzureBlobStore>::addref(); }
|
||||
void delref() override { return ReferenceCounted<BackupContainerAzureBlobStore>::delref(); }
|
||||
|
||||
Future<Void> create() override {
|
||||
return asyncTaskThread.execAsync([containerName = this->containerName, client = this->client.get()] {
|
||||
client->create_container(containerName).wait();
|
||||
return Void();
|
||||
});
|
||||
}
|
||||
Future<bool> exists() override {
|
||||
return asyncTaskThread.execAsync([containerName = this->containerName, client = this->client.get()] {
|
||||
auto resp = client->get_container_properties(containerName).get().response();
|
||||
return resp.valid();
|
||||
});
|
||||
}
|
||||
|
||||
Future<Reference<IAsyncFile>> readFile(std::string fileName) override { return readFile_impl(this, fileName); }
|
||||
|
||||
Future<Reference<IBackupFile>> writeFile(const std::string& fileName) override {
|
||||
return writeFile_impl(this, fileName);
|
||||
}
|
||||
|
||||
Future<FilesAndSizesT> listFiles(std::string path = "",
|
||||
std::function<bool(std::string const&)> folderPathFilter = nullptr) {
|
||||
return asyncTaskThread.execAsync([client = this->client.get(), containerName = this->containerName, path = path,
|
||||
folderPathFilter = folderPathFilter] {
|
||||
FilesAndSizesT result;
|
||||
listFilesImpl(client, containerName, path, folderPathFilter, result);
|
||||
return result;
|
||||
});
|
||||
}
|
||||
|
||||
Future<Void> deleteFile(std::string fileName) override {
|
||||
return asyncTaskThread.execAsync(
|
||||
[containerName = this->containerName, fileName = fileName, client = client.get()]() {
|
||||
client->delete_blob(containerName, fileName).wait();
|
||||
return Void();
|
||||
});
|
||||
}
|
||||
|
||||
Future<Void> deleteContainer(int* pNumDeleted) override { return deleteContainerImpl(this, pNumDeleted); }
|
||||
};
|
||||
|
||||
#include "flow/unactorcompiler.h"
|
||||
#endif
|
||||
|
|
|
@ -8,6 +8,8 @@ set(FDBCLIENT_SRCS
|
|||
BackupAgentBase.actor.cpp
|
||||
BackupContainer.actor.cpp
|
||||
BackupContainer.h
|
||||
BackupContainerAzureBlobStore.actor.cpp
|
||||
BackupContainerAzureBlobStore.h
|
||||
BackupContainerFileSystem.actor.cpp
|
||||
BackupContainerFileSystem.actor.h
|
||||
BackupContainerLocalDirectory.actor.cpp
|
||||
|
|
Loading…
Reference in New Issue