MVC2.0: Use page-aligned buffers and offsets, enable async IO for reading & writing client lib files
This commit is contained in:
parent
5360abb238
commit
39017c3b41
|
@ -39,6 +39,8 @@ namespace ClientLibUtils {
|
|||
struct ClientLibBinaryInfo {
|
||||
size_t totalBytes;
|
||||
size_t chunkCnt;
|
||||
size_t chunkSize;
|
||||
ClientLibBinaryInfo() : totalBytes(0), chunkCnt(0), chunkSize(0) {}
|
||||
};
|
||||
|
||||
static const char* g_statusNames[] = { "disabled", "available", "uploading", "deleting" };
|
||||
|
@ -200,15 +202,20 @@ ACTOR Future<Void> uploadClientLibBinary(Database db,
|
|||
|
||||
state size_t fileOffset = 0;
|
||||
state size_t chunkNo = 0;
|
||||
state int transactionSize = CLIENT_KNOBS->MVC_CLIENTLIB_TRANSACTION_SIZE;
|
||||
state int chunkSize = CLIENT_KNOBS->MVC_CLIENTLIB_CHUNK_SIZE;
|
||||
state int transactionSize = getAlignedUpperBound(CLIENT_KNOBS->MVC_CLIENTLIB_TRANSACTION_SIZE, _PAGE_SIZE);
|
||||
state int chunkSize = getAlignedUpperBound(CLIENT_KNOBS->MVC_CLIENTLIB_CHUNK_SIZE, 1024);
|
||||
if (transactionSize % chunkSize != 0) {
|
||||
TraceEvent(SevError, "ClientLibraryInvalidConfig")
|
||||
.detail("Reason", format("Invalid chunk size configuration, falling back to %d", _PAGE_SIZE));
|
||||
chunkSize = _PAGE_SIZE;
|
||||
}
|
||||
|
||||
state Reference<IAsyncFile> fClientLib = wait(IAsyncFileSystem::filesystem()->open(
|
||||
libFilePath.toString(), IAsyncFile::OPEN_READONLY | IAsyncFile::OPEN_UNCACHED | IAsyncFile::OPEN_NO_AIO, 0));
|
||||
libFilePath.toString(), IAsyncFile::OPEN_READONLY | IAsyncFile::OPEN_UNCACHED, 0));
|
||||
|
||||
loop {
|
||||
state Arena arena;
|
||||
state StringRef buf = makeString(transactionSize, arena);
|
||||
state StringRef buf = makeAlignedString(_PAGE_SIZE, transactionSize, arena);
|
||||
state int bytesRead = wait(fClientLib->read(mutateString(buf), transactionSize, fileOffset));
|
||||
fileOffset += bytesRead;
|
||||
if (bytesRead <= 0) {
|
||||
|
@ -242,6 +249,7 @@ ACTOR Future<Void> uploadClientLibBinary(Database db,
|
|||
}
|
||||
binInfo->totalBytes = fileOffset;
|
||||
binInfo->chunkCnt = chunkNo;
|
||||
binInfo->chunkSize = chunkSize;
|
||||
return Void();
|
||||
}
|
||||
|
||||
|
@ -316,7 +324,7 @@ ACTOR Future<Void> uploadClientLibrary(Database db, StringRef metadataString, St
|
|||
/*
|
||||
* Upload the binary of the client library in chunks
|
||||
*/
|
||||
state ClientLibBinaryInfo binInfo = {};
|
||||
state ClientLibBinaryInfo binInfo;
|
||||
wait(uploadClientLibBinary(db, libFilePath, clientLibBinPrefix, &binInfo));
|
||||
|
||||
/*
|
||||
|
@ -325,6 +333,7 @@ ACTOR Future<Void> uploadClientLibrary(Database db, StringRef metadataString, St
|
|||
*/
|
||||
metadataJson[CLIENTLIB_ATTR_SIZE] = static_cast<int64_t>(binInfo.totalBytes);
|
||||
metadataJson[CLIENTLIB_ATTR_CHUNK_COUNT] = static_cast<int64_t>(binInfo.chunkCnt);
|
||||
metadataJson[CLIENTLIB_ATTR_CHUNK_SIZE] = static_cast<int64_t>(binInfo.chunkSize);
|
||||
metadataJson[CLIENTLIB_ATTR_FILENAME] = basename(libFilePath.toString());
|
||||
metadataJson[CLIENTLIB_ATTR_STATUS] = getStatusName(targetStatus);
|
||||
jsStr = json_spirit::write_string(json_spirit::mValue(metadataJson));
|
||||
|
@ -382,22 +391,22 @@ ACTOR Future<Void> downloadClientLibrary(Database db, StringRef clientLibId, Str
|
|||
}
|
||||
|
||||
int64_t flags = IAsyncFile::OPEN_ATOMIC_WRITE_AND_CREATE | IAsyncFile::OPEN_READWRITE | IAsyncFile::OPEN_CREATE |
|
||||
IAsyncFile::OPEN_UNCACHED | IAsyncFile::OPEN_NO_AIO;
|
||||
IAsyncFile::OPEN_UNCACHED;
|
||||
state Reference<IAsyncFile> fClientLib =
|
||||
wait(IAsyncFileSystem::filesystem()->open(libFilePath.toString(), flags, 0666));
|
||||
|
||||
state size_t fileOffset = 0;
|
||||
state int transactionSize = CLIENT_KNOBS->MVC_CLIENTLIB_TRANSACTION_SIZE;
|
||||
state int transactionSize = getAlignedUpperBound(CLIENT_KNOBS->MVC_CLIENTLIB_TRANSACTION_SIZE, _PAGE_SIZE);
|
||||
state size_t chunkCount = getMetadataIntAttr(metadataJson, CLIENTLIB_ATTR_CHUNK_COUNT);
|
||||
state size_t binarySize = getMetadataIntAttr(metadataJson, CLIENTLIB_ATTR_SIZE);
|
||||
state size_t expectedChunkSize =
|
||||
(binarySize % chunkCount == 0) ? (binarySize / chunkCount) : (binarySize / chunkCount + 1);
|
||||
state size_t expectedChunkSize = getMetadataIntAttr(metadataJson, CLIENTLIB_ATTR_CHUNK_SIZE);
|
||||
ASSERT(transactionSize % chunkCount == 0);
|
||||
state size_t chunkNo = 0;
|
||||
|
||||
loop {
|
||||
state Arena arena;
|
||||
state Transaction tr1(db);
|
||||
state StringRef buf = makeString(transactionSize, arena);
|
||||
state StringRef buf = makeAlignedString(_PAGE_SIZE, transactionSize, arena);
|
||||
state size_t bufferOffset = 0;
|
||||
state size_t firstChunkNo = chunkNo;
|
||||
|
||||
|
@ -414,7 +423,10 @@ ACTOR Future<Void> downloadClientLibrary(Database db, StringRef clientLibId, Str
|
|||
throw client_lib_invalid_binary();
|
||||
}
|
||||
StringRef chunkVal = chunkValOpt.get();
|
||||
if (chunkVal.size() > expectedChunkSize) {
|
||||
// All chunks exept for the last one must be of the expected size to guarantee
|
||||
// alignment when writing to file
|
||||
if ((chunkNo != chunkCount && chunkVal.size() != expectedChunkSize) ||
|
||||
chunkVal.size() > expectedChunkSize) {
|
||||
TraceEvent(SevWarnAlways, "ClientLibraryInvalidChunkSize")
|
||||
.detail("Key", chunkKey)
|
||||
.detail("MaxSize", expectedChunkSize)
|
||||
|
|
|
@ -57,6 +57,7 @@ enum ClientLibPlatform {
|
|||
#define CLIENTLIB_ATTR_FILENAME "filename"
|
||||
#define CLIENTLIB_ATTR_SIZE "size"
|
||||
#define CLIENTLIB_ATTR_CHUNK_COUNT "chunkcount"
|
||||
#define CLIENTLIB_ATTR_CHUNK_SIZE "chunksize"
|
||||
|
||||
struct ClientLibFilter {
|
||||
bool matchAvailableOnly;
|
||||
|
|
12
flow/Arena.h
12
flow/Arena.h
|
@ -677,6 +677,10 @@ inline StringRef operator"" _sr(const char* str, size_t size) {
|
|||
return StringRef(reinterpret_cast<const uint8_t*>(str), size);
|
||||
}
|
||||
|
||||
inline static uintptr_t getAlignedUpperBound(uintptr_t value, uintptr_t alignment) {
|
||||
return ((value + alignment - 1) / alignment) * alignment;
|
||||
}
|
||||
|
||||
// makeString is used to allocate a Standalone<StringRef> of a known length for later
|
||||
// mutation (via mutateString). If you need to append to a string of unknown length,
|
||||
// consider factoring StringBuffer from DiskQueue.actor.cpp.
|
||||
|
@ -690,7 +694,7 @@ inline static Standalone<StringRef> makeString(int length) {
|
|||
inline static Standalone<StringRef> makeAlignedString(int alignment, int length) {
|
||||
Standalone<StringRef> returnString;
|
||||
uint8_t* outData = new (returnString.arena()) uint8_t[alignment + length];
|
||||
outData = (uint8_t*)((((uintptr_t)outData + (alignment - 1)) / alignment) * alignment);
|
||||
outData = (uint8_t*)getAlignedUpperBound((uintptr_t)outData, alignment);
|
||||
((StringRef&)returnString) = StringRef(outData, length);
|
||||
return returnString;
|
||||
}
|
||||
|
@ -700,6 +704,12 @@ inline static StringRef makeString(int length, Arena& arena) {
|
|||
return StringRef(outData, length);
|
||||
}
|
||||
|
||||
inline static StringRef makeAlignedString(int alignment, int length, Arena& arena) {
|
||||
uint8_t* outData = new (arena) uint8_t[alignment + length];
|
||||
outData = (uint8_t*)getAlignedUpperBound((uintptr_t)outData, alignment);
|
||||
return StringRef(outData, length);
|
||||
}
|
||||
|
||||
// mutateString() simply casts away const and returns a pointer that can be used to mutate the
|
||||
// contents of the given StringRef (it will also accept Standalone<StringRef>). Obviously this
|
||||
// is only legitimate if you know where the StringRef's memory came from and that it is not shared!
|
||||
|
|
Loading…
Reference in New Issue