renamed BinaryWriter.toStringRef() to .toValue(), because the function now returns a Standalone<StringRef>

eliminated an unnecessary copy from the proxy commit path
eliminated an unnecessary copy from buffered peek cursor
This commit is contained in:
Evan Tschannen 2019-03-28 11:52:50 -07:00
parent 795ce9f137
commit b6008558d3
27 changed files with 98 additions and 98 deletions

View File

@ -568,7 +568,7 @@ ACTOR Future<int> dumpData(Database cx, PromiseStream<RCGroup> results, Referenc
for(int i = 0; i < group.items.size(); ++i) {
bw.serializeBytes(group.items[i].value);
}
decodeBackupLogValue(req.arena, req.transaction.mutations, mutationSize, bw.toStringRef(), addPrefix, removePrefix, group.groupKey, keyVersion);
decodeBackupLogValue(req.arena, req.transaction.mutations, mutationSize, bw.toValue(), addPrefix, removePrefix, group.groupKey, keyVersion);
newBeginVersion = group.groupKey + 1;
if(mutationSize >= CLIENT_KNOBS->BACKUP_LOG_WRITE_BATCH_MAX_SIZE) {
break;

View File

@ -154,11 +154,11 @@ std::map<std::string, std::string> configForToken( std::string const& mode ) {
BinaryWriter policyWriter(IncludeVersion());
serializeReplicationPolicy(policyWriter, storagePolicy);
out[p+"storage_replication_policy"] = policyWriter.toStringRef().toString();
out[p+"storage_replication_policy"] = policyWriter.toValue().toString();
policyWriter = BinaryWriter(IncludeVersion());
serializeReplicationPolicy(policyWriter, tLogPolicy);
out[p+"log_replication_policy"] = policyWriter.toStringRef().toString();
out[p+"log_replication_policy"] = policyWriter.toValue().toString();
return out;
}
@ -194,7 +194,7 @@ std::map<std::string, std::string> configForToken( std::string const& mode ) {
BinaryWriter policyWriter(IncludeVersion());
serializeReplicationPolicy(policyWriter, remoteTLogPolicy);
out[p+"remote_log_policy"] = policyWriter.toStringRef().toString();
out[p+"remote_log_policy"] = policyWriter.toValue().toString();
return out;
}
@ -224,7 +224,7 @@ ConfigurationResult::Type buildConfiguration( std::vector<StringRef> const& mode
Reference<IReplicationPolicy> storagePolicy = Reference<IReplicationPolicy>(new PolicyAcross(storageCount, "zoneid", Reference<IReplicationPolicy>(new PolicyOne())));
BinaryWriter policyWriter(IncludeVersion());
serializeReplicationPolicy(policyWriter, storagePolicy);
outConf[p+"storage_replication_policy"] = policyWriter.toStringRef().toString();
outConf[p+"storage_replication_policy"] = policyWriter.toValue().toString();
}
if(!outConf.count(p + "log_replication_policy") && outConf.count(p + "log_replicas")) {
@ -232,7 +232,7 @@ ConfigurationResult::Type buildConfiguration( std::vector<StringRef> const& mode
Reference<IReplicationPolicy> logPolicy = Reference<IReplicationPolicy>(new PolicyAcross(logCount, "zoneid", Reference<IReplicationPolicy>(new PolicyOne())));
BinaryWriter policyWriter(IncludeVersion());
serializeReplicationPolicy(policyWriter, logPolicy);
outConf[p+"log_replication_policy"] = policyWriter.toStringRef().toString();
outConf[p+"log_replication_policy"] = policyWriter.toValue().toString();
}
return ConfigurationResult::SUCCESS;
}
@ -1308,10 +1308,10 @@ ACTOR Future<int> setDDMode( Database cx, int mode ) {
if (!mode) {
BinaryWriter wrMyOwner(Unversioned());
wrMyOwner << dataDistributionModeLock;
tr.set( moveKeysLockOwnerKey, wrMyOwner.toStringRef() );
tr.set( moveKeysLockOwnerKey, wrMyOwner.toValue() );
}
tr.set( dataDistributionModeKey, wr.toStringRef() );
tr.set( dataDistributionModeKey, wr.toValue() );
wait( tr.commit() );
return oldMode;

View File

@ -268,7 +268,7 @@ struct GetHealthMetricsReply
this->healthMetrics.update(healthMetrics, detailedInput, detailedOutput);
BinaryWriter bw(IncludeVersion());
bw << this->healthMetrics;
serialized = Standalone<StringRef>(bw.toStringRef());
serialized = bw.toValue();
}
template <class Ar>

View File

@ -372,7 +372,7 @@ ACTOR static Future<Void> clientStatusUpdateActor(DatabaseContext *cx) {
TrInfoChunk chunk;
BinaryWriter chunkBW(Unversioned());
chunkBW << bigEndian32(i+1) << bigEndian32(num_chunks);
chunk.key = KeyRef(clientLatencyName + std::string(10, '\x00') + "/" + random_id + "/" + chunkBW.toStringRef().toString() + "/" + user_provided_id + std::string(4, '\x00'));
chunk.key = KeyRef(clientLatencyName + std::string(10, '\x00') + "/" + random_id + "/" + chunkBW.toValue().toString() + "/" + user_provided_id + std::string(4, '\x00'));
int32_t pos = littleEndian32(clientLatencyName.size());
memcpy(mutateString(chunk.key) + chunk.key.size() - sizeof(int32_t), &pos, sizeof(int32_t));
if (i == num_chunks - 1) {
@ -2236,7 +2236,7 @@ void Transaction::makeSelfConflicting() {
BinaryWriter wr(Unversioned());
wr.serializeBytes(LiteralStringRef("\xFF/SC/"));
wr << g_random->randomUniqueID();
auto r = singleKeyRange( wr.toStringRef(), tr.arena );
auto r = singleKeyRange( wr.toValue(), tr.arena );
tr.transaction.read_conflict_ranges.push_back( tr.arena, r );
tr.transaction.write_conflict_ranges.push_back( tr.arena, r );
}

View File

@ -46,7 +46,7 @@ const Value keyServersValue( const vector<UID>& src, const vector<UID>& dest ) {
// src and dest are expected to be sorted
ASSERT( std::is_sorted(src.begin(), src.end()) && std::is_sorted(dest.begin(), dest.end()) );
BinaryWriter wr((IncludeVersion())); wr << src << dest;
return wr.toStringRef();
return wr.toValue();
}
void decodeKeyServersValue( const ValueRef& value, vector<UID>& src, vector<UID>& dest ) {
if (value.size()) {
@ -62,7 +62,7 @@ const Value logsValue( const vector<std::pair<UID, NetworkAddress>>& logs, const
BinaryWriter wr(IncludeVersion());
wr << logs;
wr << oldLogs;
return wr.toStringRef();
return wr.toValue();
}
std::pair<vector<std::pair<UID, NetworkAddress>>,vector<std::pair<UID, NetworkAddress>>> decodeLogsValue( const ValueRef& value ) {
vector<std::pair<UID, NetworkAddress>> logs;
@ -84,14 +84,14 @@ const Key serverKeysKey( UID serverID, const KeyRef& key ) {
wr << serverID;
wr.serializeBytes( LiteralStringRef("/") );
wr.serializeBytes( key );
return wr.toStringRef();
return wr.toValue();
}
const Key serverKeysPrefixFor( UID serverID ) {
BinaryWriter wr(Unversioned());
wr.serializeBytes( serverKeysPrefix );
wr << serverID;
wr.serializeBytes( LiteralStringRef("/") );
return wr.toStringRef();
return wr.toValue();
}
UID serverKeysDecodeServer( const KeyRef& key ) {
UID server_id;
@ -120,21 +120,21 @@ const Key serverTagKeyFor( UID serverID ) {
BinaryWriter wr(Unversioned());
wr.serializeBytes( serverTagKeys.begin );
wr << serverID;
return wr.toStringRef();
return wr.toValue();
}
const Key serverTagHistoryKeyFor( UID serverID ) {
BinaryWriter wr(Unversioned());
wr.serializeBytes( serverTagHistoryKeys.begin );
wr << serverID;
return addVersionStampAtEnd(wr.toStringRef());
return addVersionStampAtEnd(wr.toValue());
}
const KeyRange serverTagHistoryRangeFor( UID serverID ) {
BinaryWriter wr(Unversioned());
wr.serializeBytes( serverTagHistoryKeys.begin );
wr << serverID;
return prefixRange(wr.toStringRef());
return prefixRange(wr.toValue());
}
const KeyRange serverTagHistoryRangeBefore( UID serverID, Version version ) {
@ -147,13 +147,13 @@ const KeyRange serverTagHistoryRangeBefore( UID serverID, Version version ) {
uint8_t* data = mutateString( versionStr );
memcpy(data, &version, 8);
return KeyRangeRef( wr.toStringRef(), versionStr.withPrefix(wr.toStringRef()) );
return KeyRangeRef( wr.toValue(), versionStr.withPrefix(wr.toValue()) );
}
const Value serverTagValue( Tag tag ) {
BinaryWriter wr(IncludeVersion());
wr << tag;
return wr.toStringRef();
return wr.toValue();
}
UID decodeServerTagKey( KeyRef const& key ) {
@ -195,7 +195,7 @@ const Key serverTagConflictKeyFor( Tag tag ) {
BinaryWriter wr(Unversioned());
wr.serializeBytes( serverTagConflictKeys.begin );
wr << tag;
return wr.toStringRef();
return wr.toValue();
}
const KeyRangeRef tagLocalityListKeys(
@ -207,13 +207,13 @@ const Key tagLocalityListKeyFor( Optional<Value> dcID ) {
BinaryWriter wr(AssumeVersion(currentProtocolVersion));
wr.serializeBytes( tagLocalityListKeys.begin );
wr << dcID;
return wr.toStringRef();
return wr.toValue();
}
const Value tagLocalityListValue( int8_t const& tagLocality ) {
BinaryWriter wr(IncludeVersion());
wr << tagLocality;
return wr.toStringRef();
return wr.toValue();
}
Optional<Value> decodeTagLocalityListKey( KeyRef const& key ) {
Optional<Value> dcID;
@ -237,13 +237,13 @@ const Key datacenterReplicasKeyFor( Optional<Value> dcID ) {
BinaryWriter wr(AssumeVersion(currentProtocolVersion));
wr.serializeBytes( datacenterReplicasKeys.begin );
wr << dcID;
return wr.toStringRef();
return wr.toValue();
}
const Value datacenterReplicasValue( int const& replicas ) {
BinaryWriter wr(IncludeVersion());
wr << replicas;
return wr.toStringRef();
return wr.toValue();
}
Optional<Value> decodeDatacenterReplicasKey( KeyRef const& key ) {
Optional<Value> dcID;
@ -272,7 +272,7 @@ const Key tLogDatacentersKeyFor( Optional<Value> dcID ) {
BinaryWriter wr(AssumeVersion(currentProtocolVersion));
wr.serializeBytes( tLogDatacentersKeys.begin );
wr << dcID;
return wr.toStringRef();
return wr.toValue();
}
Optional<Value> decodeTLogDatacentersKey( KeyRef const& key ) {
Optional<Value> dcID;
@ -293,13 +293,13 @@ const Key serverListKeyFor( UID serverID ) {
BinaryWriter wr(Unversioned());
wr.serializeBytes( serverListKeys.begin );
wr << serverID;
return wr.toStringRef();
return wr.toValue();
}
const Value serverListValue( StorageServerInterface const& server ) {
BinaryWriter wr(IncludeVersion());
wr << server;
return wr.toStringRef();
return wr.toValue();
}
UID decodeServerListKey( KeyRef const& key ) {
UID serverID;
@ -327,13 +327,13 @@ const Key processClassKeyFor(StringRef processID ) {
BinaryWriter wr(Unversioned());
wr.serializeBytes( processClassKeys.begin );
wr << processID;
return wr.toStringRef();
return wr.toValue();
}
const Value processClassValue( ProcessClass const& processClass ) {
BinaryWriter wr(IncludeVersion());
wr << processClass;
return wr.toStringRef();
return wr.toValue();
}
Key decodeProcessClassKey( KeyRef const& key ) {
@ -384,13 +384,13 @@ const Key workerListKeyFor( StringRef processID ) {
BinaryWriter wr(Unversioned());
wr.serializeBytes( workerListKeys.begin );
wr << processID;
return wr.toStringRef();
return wr.toValue();
}
const Value workerListValue( ProcessData const& processData ) {
BinaryWriter wr(IncludeVersion());
wr << processData;
return wr.toStringRef();
return wr.toValue();
}
Key decodeWorkerListKey(KeyRef const& key) {
@ -498,7 +498,7 @@ KeyRef logRangesDecodeKey(KeyRef key, UID* logUid) {
Key logRangesEncodeValue(KeyRef keyEnd, KeyRef destPath) {
BinaryWriter wr(IncludeVersion());
wr << std::make_pair(keyEnd, destPath);
return wr.toStringRef();
return wr.toValue();
}
// \xff/logRanges/[16-byte UID][begin key] := serialize( make_pair([end key], [destination key prefix]), IncludeVersion() )
@ -521,7 +521,7 @@ Key uidPrefixKey(KeyRef keyPrefix, UID logUid) {
BinaryWriter bw(Unversioned());
bw.serializeBytes(keyPrefix);
bw << logUid;
return bw.toStringRef();
return bw.toValue();
}
// Apply mutations constant variables
@ -561,7 +561,7 @@ const Key metricConfKey( KeyRef const& prefix, MetricNameRef const& name, KeyRef
wr.serializeBytes( LiteralStringRef("\x00\x01") );
wr.serializeBytes( key );
wr.serializeBytes( LiteralStringRef("\x00") );
return wr.toStringRef();
return wr.toValue();
}
std::pair<MetricNameRef, KeyRef> decodeMetricConfKey( KeyRef const& prefix, KeyRef const& key ) {
@ -605,5 +605,5 @@ const Key restoreWorkerKeyFor( UID const& agentID ) {
BinaryWriter wr(Unversioned());
wr.serializeBytes( restoreWorkersKeys.begin );
wr << agentID;
return wr.toStringRef();
return wr.toValue();
}

View File

@ -311,7 +311,7 @@ TEST_CASE("/flow/flow/networked futures")
ASSERT(locInt.getEndpoint().isValid() && locInt.getEndpoint().isLocal() && locInt.getEndpoint().getPrimaryAddress() == FlowTransport::transport().getLocalAddress());
BinaryReader rd(wr.toStringRef(), IncludeVersion());
BinaryReader rd(wr.toValue(), IncludeVersion());
RequestStream<int> remoteInt;
rd >> remoteInt;
@ -328,7 +328,7 @@ TEST_CASE("/flow/flow/networked futures")
ASSERT(locInt.getEndpoint().isValid() && locInt.getEndpoint().isLocal());
BinaryReader rd(wr.toStringRef(), IncludeVersion());
BinaryReader rd(wr.toValue(), IncludeVersion());
ReplyPromise<int> remoteInt;
rd >> remoteInt;

View File

@ -976,7 +976,7 @@ static PacketID sendPacket( TransportData* self, ISerializeSource const& what, c
BinaryWriter wr( AssumeVersion(currentProtocolVersion) );
what.serializeBinaryWriter(wr);
Standalone<StringRef> copy = wr.toStringRef();
Standalone<StringRef> copy = wr.toValue();
#if VALGRIND
VALGRIND_CHECK_MEM_IS_DEFINED(copy.begin(), copy.size());
#endif

View File

@ -698,14 +698,14 @@ void IndirectShadowPager::logVersion(StringRef versionKey, Version version) {
BinaryWriter v(Unversioned());
v << version;
pageTableLog->set(KeyValueRef(versionKey, v.toStringRef()));
pageTableLog->set(KeyValueRef(versionKey, v.toValue()));
}
void IndirectShadowPager::logPagesAllocated() {
BinaryWriter v(Unversioned());
v << pagerFile.getPagesAllocated();
pageTableLog->set(KeyValueRef(PAGES_ALLOCATED_KEY, v.toStringRef()));
pageTableLog->set(KeyValueRef(PAGES_ALLOCATED_KEY, v.toValue()));
}
void IndirectShadowPager::logPageTableUpdate(LogicalPageID logicalPageID, Version version, PhysicalPageID physicalPageID) {
@ -715,7 +715,7 @@ void IndirectShadowPager::logPageTableUpdate(LogicalPageID logicalPageID, Versio
BinaryWriter v(Unversioned());
v << physicalPageID;
pageTableLog->set(KeyValueRef(k.toStringRef(), v.toStringRef()));
pageTableLog->set(KeyValueRef(k.toValue(), v.toValue()));
}
void IndirectShadowPager::logPageTableClearToEnd(LogicalPageID logicalPageID, Version start) {
@ -725,7 +725,7 @@ void IndirectShadowPager::logPageTableClearToEnd(LogicalPageID logicalPageID, Ve
BinaryWriter e(Unversioned());
e << TABLE_ENTRY_PREFIX.begin()[0] << bigEndian(logicalPageID);
pageTableLog->clear(KeyRangeRef(b.toStringRef(), strinc(e.toStringRef())));
pageTableLog->clear(KeyRangeRef(b.toValue(), strinc(e.toValue())));
}
void IndirectShadowPager::logPageTableClear(LogicalPageID logicalPageID, Version start, Version end) {
@ -735,7 +735,7 @@ void IndirectShadowPager::logPageTableClear(LogicalPageID logicalPageID, Version
BinaryWriter e(Unversioned());
e << TABLE_ENTRY_PREFIX.begin()[0] << bigEndian(logicalPageID) << bigEndian(end);
pageTableLog->clear(KeyRangeRef(b.toStringRef(), e.toStringRef()));
pageTableLog->clear(KeyRangeRef(b.toValue(), e.toValue()));
}
const StringRef IndirectShadowPager::LATEST_VERSION_KEY = LiteralStringRef("\xff/LatestVersion");

View File

@ -344,7 +344,7 @@ ACTOR Future<Void> logRouterPeekMessages( LogRouterData* self, TLogPeekRequest r
TLogPeekReply reply;
reply.maxKnownVersion = self->version.get();
reply.minKnownCommittedVersion = self->poppedVersion;
reply.messages = messages.toStringRef();
reply.messages = messages.toValue();
reply.popped = self->minPopped.get() >= self->startVersion ? self->minPopped.get() : 0;
reply.end = endVersion;

View File

@ -773,14 +773,12 @@ struct LogPushData : NonCopyable {
next_message_tags.clear();
}
Arena getArena() { return arena; }
StringRef getMessages(int loc) {
return StringRef( arena, messagesWriter[loc].toStringRef() ); // FIXME: Unnecessary copy!
Standalone<StringRef> getMessages(int loc) {
return messagesWriter[loc].toValue();
}
private:
Reference<ILogSystem> logSystem;
Arena arena;
vector<Tag> next_message_tags;
vector<Tag> prev_tags;
vector<BinaryWriter> messagesWriter;

View File

@ -908,9 +908,10 @@ void ILogSystem::BufferedCursor::combineMessages() {
messageWriter << t;
}
messageWriter.serializeBytes(msg.message);
msg.arena = Arena();
Standalone<StringRef> val = messageWriter.toValue();
msg.arena = val.arena();
msg.tags = tags;
msg.message = StringRef(msg.arena, messageWriter.toStringRef());
msg.message = val;
}
Reference<ILogSystem::IPeekCursor> ILogSystem::BufferedCursor::cloneNoMore() {

View File

@ -832,7 +832,7 @@ ACTOR Future<Void> commitBatch(
}
// Define the mutation type and and location
backupMutation.param1 = wr.toStringRef();
backupMutation.param1 = wr.toValue();
ASSERT( backupMutation.param1.startsWith(logRangeMutation.first) ); // We are writing into the configured destination
auto& tags = self->tagsForKey(backupMutation.param1);

View File

@ -73,9 +73,9 @@ ACTOR Future<Void> checkMoveKeysLock( Transaction* tr, MoveKeysLock lock, bool i
// Take the lock
if(isWrite) {
BinaryWriter wrMyOwner(Unversioned()); wrMyOwner << lock.myOwner;
tr->set( moveKeysLockOwnerKey, wrMyOwner.toStringRef() );
tr->set( moveKeysLockOwnerKey, wrMyOwner.toValue() );
BinaryWriter wrLastWrite(Unversioned()); wrLastWrite << g_random->randomUniqueID();
tr->set( moveKeysLockWriteKey, wrLastWrite.toStringRef() );
tr->set( moveKeysLockWriteKey, wrLastWrite.toValue() );
}
return Void();
@ -83,7 +83,7 @@ ACTOR Future<Void> checkMoveKeysLock( Transaction* tr, MoveKeysLock lock, bool i
if(isWrite) {
// Touch the lock, preventing overlapping attempts to take it
BinaryWriter wrLastWrite(Unversioned()); wrLastWrite << g_random->randomUniqueID();
tr->set( moveKeysLockWriteKey, wrLastWrite.toStringRef() );
tr->set( moveKeysLockWriteKey, wrLastWrite.toValue() );
// Make this transaction self-conflicting so the database will not execute it twice with the same write key
tr->makeSelfConflicting();
}

View File

@ -135,7 +135,7 @@ namespace oldTLog_4_6 {
wr << qe;
wr << uint8_t(1);
*(uint32_t*)wr.getData() = wr.getLength() - sizeof(uint32_t) - sizeof(uint8_t);
auto loc = queue->push( wr.toStringRef() );
auto loc = queue->push( wr.toValue() );
//TraceEvent("TLogQueueVersionWritten", dbgid).detail("Size", wr.getLength() - sizeof(uint32_t) - sizeof(uint8_t)).detail("Loc", loc);
version_location[qe.version] = loc;
}
@ -230,7 +230,7 @@ namespace oldTLog_4_6 {
wr << id;
wr << tag;
wr << bigEndian64( version );
return wr.toStringRef();
return wr.toValue();
}
static Key persistTagPoppedKey( UID id, OldTag tag ) {
@ -238,7 +238,7 @@ namespace oldTLog_4_6 {
wr.serializeBytes( persistTagPoppedKeys.begin );
wr << id;
wr << tag;
return wr.toStringRef();
return wr.toValue();
}
static Value persistTagPoppedValue( Version popped ) {
@ -524,7 +524,7 @@ namespace oldTLog_4_6 {
for(; msg != tag->value.version_messages.end() && msg->first == currentVersion; ++msg)
wr << msg->second.toStringRef();
self->persistentData->set( KeyValueRef( persistTagMessagesKey( logData->logId, tag->key, currentVersion ), wr.toStringRef() ) );
self->persistentData->set( KeyValueRef( persistTagMessagesKey( logData->logId, tag->key, currentVersion ), wr.toValue() ) );
Future<Void> f = yield(TaskUpdateStorage);
if(!f.isReady()) {
@ -942,7 +942,7 @@ namespace oldTLog_4_6 {
if (kvs.expectedSize() >= SERVER_KNOBS->DESIRED_TOTAL_BYTES)
endVersion = decodeTagMessagesKey(kvs.end()[-1].key) + 1;
else
messages.serializeBytes( messages2.toStringRef() );
messages.serializeBytes( messages2.toValue() );
} else {
peekMessagesFromMemory( logData, req, messages, endVersion );
//TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", req.reply.getEndpoint().getPrimaryAddress()).detail("MessageBytes", messages.getLength()).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowSeq", self->sequence.getNextSequence());
@ -957,7 +957,7 @@ namespace oldTLog_4_6 {
reply.popped = poppedVer;
reply.end = poppedVer;
} else {
reply.messages = messages.toStringRef();
reply.messages = messages.toValue();
reply.end = endVersion;
}
//TraceEvent("TlogPeek", self->dbgid).detail("LogId", logData->logId).detail("EndVer", reply.end).detail("MsgBytes", reply.messages.expectedSize()).detail("ForAddress", req.reply.getEndpoint().getPrimaryAddress());

View File

@ -202,7 +202,7 @@ static Key persistTagMessagesKey( UID id, Tag tag, Version version ) {
wr << id;
wr << tag;
wr << bigEndian64( version );
return wr.toStringRef();
return wr.toValue();
}
static Key persistTagPoppedKey( UID id, Tag tag ) {
@ -210,7 +210,7 @@ static Key persistTagPoppedKey( UID id, Tag tag ) {
wr.serializeBytes( persistTagPoppedKeys.begin );
wr << id;
wr << tag;
return wr.toStringRef();
return wr.toValue();
}
static Value persistTagPoppedValue( Version popped ) {
@ -480,7 +480,7 @@ void TLogQueue::push( T const& qe, Reference<LogData> logData ) {
wr << qe;
wr << uint8_t(1);
*(uint32_t*)wr.getData() = wr.getLength() - sizeof(uint32_t) - sizeof(uint8_t);
auto loc = queue->push( wr.toStringRef() );
auto loc = queue->push( wr.toValue() );
//TraceEvent("TLogQueueVersionWritten", dbgid).detail("Size", wr.getLength() - sizeof(uint32_t) - sizeof(uint8_t)).detail("Loc", loc);
logData->versionLocation[qe.version] = loc;
}
@ -583,7 +583,7 @@ ACTOR Future<Void> updatePersistentData( TLogData* self, Reference<LogData> logD
for(; msg != tagData->versionMessages.end() && msg->first == currentVersion; ++msg)
wr << msg->second.toStringRef();
self->persistentData->set( KeyValueRef( persistTagMessagesKey( logData->logId, tagData->tag, currentVersion ), wr.toStringRef() ) );
self->persistentData->set( KeyValueRef( persistTagMessagesKey( logData->logId, tagData->tag, currentVersion ), wr.toValue() ) );
Future<Void> f = yield(TaskUpdateStorage);
if(!f.isReady()) {
@ -1052,7 +1052,7 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
if (kvs.expectedSize() >= SERVER_KNOBS->DESIRED_TOTAL_BYTES)
endVersion = decodeTagMessagesKey(kvs.end()[-1].key) + 1;
else
messages.serializeBytes( messages2.toStringRef() );
messages.serializeBytes( messages2.toValue() );
} else {
peekMessagesFromMemory( logData, req, messages, endVersion );
//TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", req.reply.getEndpoint().address).detail("MessageBytes", messages.getLength()).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowSeq", self->sequence.getNextSequence());
@ -1061,7 +1061,7 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
TLogPeekReply reply;
reply.maxKnownVersion = logData->version.get();
reply.minKnownCommittedVersion = logData->minKnownCommittedVersion;
reply.messages = messages.toStringRef();
reply.messages = messages.toValue();
reply.end = endVersion;
//TraceEvent("TlogPeek", self->dbgid).detail("LogId", logData->logId).detail("EndVer", reply.end).detail("MsgBytes", reply.messages.expectedSize()).detail("ForAddress", req.reply.getEndpoint().address);

View File

@ -213,7 +213,7 @@ static Key persistTagMessagesKey( UID id, Tag tag, Version version ) {
wr << id;
wr << tag;
wr << bigEndian64( version );
return wr.toStringRef();
return wr.toValue();
}
static Key persistTagMessageRefsKey( UID id, Tag tag, Version version ) {
@ -222,7 +222,7 @@ static Key persistTagMessageRefsKey( UID id, Tag tag, Version version ) {
wr << id;
wr << tag;
wr << bigEndian64( version );
return wr.toStringRef();
return wr.toValue();
}
static Key persistTagPoppedKey( UID id, Tag tag ) {
@ -230,7 +230,7 @@ static Key persistTagPoppedKey( UID id, Tag tag ) {
wr.serializeBytes( persistTagPoppedKeys.begin );
wr << id;
wr << tag;
return wr.toStringRef();
return wr.toValue();
}
static Value persistTagPoppedValue( Version popped ) {
@ -549,7 +549,7 @@ void TLogQueue::push( T const& qe, Reference<LogData> logData ) {
*(uint32_t*)wr.getData() = wr.getLength() - sizeof(uint32_t) - sizeof(uint8_t);
const IDiskQueue::location startloc = queue->getNextPushLocation();
// FIXME: push shouldn't return anything. We should call getNextPushLocation() again.
const IDiskQueue::location endloc = queue->push( wr.toStringRef() );
const IDiskQueue::location endloc = queue->push( wr.toValue() );
//TraceEvent("TLogQueueVersionWritten", dbgid).detail("Size", wr.getLength() - sizeof(uint32_t) - sizeof(uint8_t)).detail("Loc", loc);
logData->versionLocation[qe.version] = std::make_pair(startloc, endloc);
}
@ -770,7 +770,7 @@ ACTOR Future<Void> updatePersistentData( TLogData* self, Reference<LogData> logD
for(; msg != tagData->versionMessages.end() && msg->first == currentVersion; ++msg) {
wr << msg->second.toStringRef();
}
self->persistentData->set( KeyValueRef( persistTagMessagesKey( logData->logId, tagData->tag, currentVersion ), wr.toStringRef() ) );
self->persistentData->set( KeyValueRef( persistTagMessagesKey( logData->logId, tagData->tag, currentVersion ), wr.toValue() ) );
} else {
// spill everything else by reference
const IDiskQueue::location begin = logData->versionLocation[currentVersion].first;
@ -800,7 +800,7 @@ ACTOR Future<Void> updatePersistentData( TLogData* self, Reference<LogData> logD
}
if (refSpilledTagCount > 0) {
*(uint32_t*)wr.getData() = refSpilledTagCount;
self->persistentData->set( KeyValueRef( persistTagMessageRefsKey( logData->logId, tagData->tag, lastVersion ), wr.toStringRef() ) );
self->persistentData->set( KeyValueRef( persistTagMessageRefsKey( logData->logId, tagData->tag, lastVersion ), wr.toValue() ) );
tagData->poppedLocation = std::min(tagData->poppedLocation, firstLocation);
}
@ -1332,7 +1332,7 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
if (kvs.expectedSize() >= SERVER_KNOBS->DESIRED_TOTAL_BYTES)
endVersion = decodeTagMessagesKey(kvs.end()[-1].key) + 1;
else
messages.serializeBytes( messages2.toStringRef() );
messages.serializeBytes( messages2.toValue() );
} else {
// Calculating checksums of read pages is potentially expensive, and storage servers with
// spilled data are likely behind and not contributing usefully to the cluster anyway.
@ -1414,7 +1414,7 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
if (earlyEnd)
endVersion = lastRefMessageVersion + 1;
else
messages.serializeBytes( messages2.toStringRef() );
messages.serializeBytes( messages2.toValue() );
}
} else {
peekMessagesFromMemory( logData, req, messages, endVersion );
@ -1424,7 +1424,7 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
TLogPeekReply reply;
reply.maxKnownVersion = logData->version.get();
reply.minKnownCommittedVersion = logData->minKnownCommittedVersion;
reply.messages = messages.toStringRef();
reply.messages = messages.toValue();
reply.end = endVersion;
//TraceEvent("TlogPeek", self->dbgid).detail("LogId", logData->logId).detail("EndVer", reply.end).detail("MsgBytes", reply.messages.expectedSize()).detail("ForAddress", req.reply.getEndpoint().getPrimaryAddress());

View File

@ -410,7 +410,8 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
if(it->isLocal && it->logServers.size()) {
vector<Future<Void>> tLogCommitResults;
for(int loc=0; loc< it->logServers.size(); loc++) {
allReplies.push_back( it->logServers[loc]->get().interf().commit.getReply( TLogCommitRequest( data.getArena(), prevVersion, version, knownCommittedVersion, minKnownCommittedVersion, data.getMessages(location), debugID ), TaskTLogCommitReply ) );
Standalone<StringRef> msg = data.getMessages(location);
allReplies.push_back( it->logServers[loc]->get().interf().commit.getReply( TLogCommitRequest( msg.arena(), prevVersion, version, knownCommittedVersion, minKnownCommittedVersion, msg, debugID ), TaskTLogCommitReply ) );
Future<Void> commitSuccess = success(allReplies.back());
addActor.get().send(commitSuccess);
tLogCommitResults.push_back(commitSuccess);

View File

@ -1295,7 +1295,7 @@ ACTOR Future<Void> masterCore( Reference<MasterData> self ) {
if (self->lastEpochEnd != 0) {
if(self->forceRecovery) {
BinaryWriter bw(Unversioned());
tr.set(recoveryCommitRequest.arena, killStorageKey, (bw << self->safeLocality).toStringRef());
tr.set(recoveryCommitRequest.arena, killStorageKey, (bw << self->safeLocality).toValue());
}
// This transaction sets \xff/lastEpochEnd, which the shard servers can use to roll back speculatively
@ -1305,7 +1305,7 @@ ACTOR Future<Void> masterCore( Reference<MasterData> self ) {
// This transaction is by itself in a batch (has its own version number), which simplifies storage servers slightly (they assume there are no modifications to serverKeys in the same batch)
// The proxy also expects the lastEpochEndKey mutation to be first in the transaction
BinaryWriter bw(Unversioned());
tr.set(recoveryCommitRequest.arena, lastEpochEndKey, (bw << self->lastEpochEnd).toStringRef());
tr.set(recoveryCommitRequest.arena, lastEpochEndKey, (bw << self->lastEpochEnd).toValue());
if(self->forceRecovery) {
tr.set(recoveryCommitRequest.arena, rebootWhenDurableKey, StringRef());

View File

@ -288,7 +288,7 @@ ACTOR Future<Void> testKVStoreMain( KVStoreTestWorkload* workload, KVTest* ptest
state double setupNow = now();
state Future<Void> lastCommit = Void();
for(i=0; i<workload->nodeCount; i++) {
test.store->set( KeyValueRef( test.makeKey( i ), wr.toStringRef() ) );
test.store->set( KeyValueRef( test.makeKey( i ), wr.toValue() ) );
if (!((i+1) % 10000) || i+1==workload->nodeCount) {
wait( lastCommit );
lastCommit = test.store->commit();
@ -310,7 +310,7 @@ ACTOR Future<Void> testKVStoreMain( KVStoreTestWorkload* workload, KVTest* ptest
++test.lastSet;
BinaryWriter wr(Unversioned()); wr << test.lastSet;
wr.serializeBytes(extraValue, extraBytes);
test.set( KeyValueRef( test.randomKey(), wr.toStringRef() ) );
test.set( KeyValueRef( test.randomKey(), wr.toValue() ) );
++workload->sets;
}
++commitsStarted;
@ -339,7 +339,7 @@ ACTOR Future<Void> testKVStoreMain( KVStoreTestWorkload* workload, KVTest* ptest
++test.lastSet;
BinaryWriter wr(Unversioned()); wr << test.lastSet;
wr.serializeBytes(extraValue, extraBytes);
test.set( KeyValueRef( test.randomKey(), wr.toStringRef() ) );
test.set( KeyValueRef( test.randomKey(), wr.toValue() ) );
++workload->sets;
} else {
// Read

View File

@ -68,7 +68,7 @@ struct LogMetricsWorkload : TestWorkload {
state Transaction tr(cx);
try {
Version v = wait( tr.getReadVersion() );
tr.set(fastLoggingEnabled, br.toStringRef());
tr.set(fastLoggingEnabled, br.toValue());
tr.makeSelfConflicting();
wait( tr.commit() );
break;

View File

@ -103,7 +103,7 @@ struct PingWorkload : TestWorkload {
ACTOR Future<Void> persistInterface( PingWorkload *self, Database cx ) {
state Transaction tr(cx);
BinaryWriter wr(IncludeVersion()); wr << self->interf;
state Standalone<StringRef> serializedInterface = wr.toStringRef();
state Standalone<StringRef> serializedInterface = wr.toValue();
loop {
try {
Optional<Value> val = wait( tr.get( StringRef( format("Ping/Client/%d", self->clientId) ) ) );

View File

@ -93,7 +93,7 @@ struct SidebandWorkload : TestWorkload {
ACTOR Future<Void> persistInterface( SidebandWorkload *self, Database cx ) {
state Transaction tr(cx);
BinaryWriter wr(IncludeVersion()); wr << self->interf;
state Standalone<StringRef> serializedInterface = wr.toStringRef();
state Standalone<StringRef> serializedInterface = wr.toValue();
loop {
try {
Optional<Value> val = wait( tr.get( StringRef( format("Sideband/Client/%d", self->clientId) ) ) );

View File

@ -159,7 +159,7 @@ struct StorefrontWorkload : TestWorkload {
// set value for the order
BinaryWriter wr(AssumeVersion(currentProtocolVersion)); wr << itemList;
tr.set( orderKey, wr.toStringRef() );
tr.set( orderKey, wr.toValue() );
wait( tr.commit() );
self->orders[id] = items; // save this in a local list to test durability
@ -250,7 +250,7 @@ struct StorefrontWorkload : TestWorkload {
itemList.push_back( it->first );
}
BinaryWriter wr(AssumeVersion(currentProtocolVersion)); wr << itemList;
if( wr.toStringRef() != val.get().toString() ) {
if( wr.toValue() != val.get().toString() ) {
TraceEvent( SevError, "TestFailure").detail("Reason", "OrderContentsMismatch").detail("OrderID", id);
return false;
}

View File

@ -63,15 +63,15 @@ void testCompressedInt(IntType n, StringRef rep = StringRef()) {
CompressedInt<IntType> cn(n);
w << cn;
if(rep.size() != 0 && w.toStringRef() != rep) {
if(rep.size() != 0 && w.toValue() != rep) {
printf("WRONG ENCODING:\n");
printf(" test value (BigE): "); printBitsLittle(sizeof(IntType), &n);
printf(" encoded: "); printBitsBig(w.toStringRef().size(), w.toStringRef().begin());
printf(" encoded: "); printBitsBig(w.toValue().size(), w.toValue().begin());
printf(" expected: "); printBitsBig(rep.size(), rep.begin());
puts("");
}
else
rep = w.toStringRef();
rep = w.toValue();
cn.value = 0;
BinaryReader r(rep, AssumeVersion(currentProtocolVersion));

View File

@ -87,7 +87,7 @@ struct KeyWithWriter {
void operator=( KeyWithWriter&& r ) { key = std::move(r.key); writer = std::move(r.writer); writerOffset = r.writerOffset; }
StringRef value() {
return StringRef(writer.toStringRef().substr(writerOffset));
return StringRef(writer.toValue().substr(writerOffset));
}
};
@ -540,7 +540,7 @@ struct FieldLevel {
// Otherwise, insert but first, patch the header if this block is old enough
if(data.rollTime <= lastTimeRequiringHeaderPatch) {
ASSERT(previousHeader.present());
FieldLevel<T>::updateSerializedHeader(data.writer.toStringRef(), previousHeader.get());
FieldLevel<T>::updateSerializedHeader(data.writer.toValue(), previousHeader.get());
}
batch.inserts.push_back(KeyWithWriter(mk.packDataKey(data.start), data.writer));
@ -1243,7 +1243,7 @@ public:
// TOOD: If it is useful, this could be the current header value of the most recently logged level.
wr << FieldHeader<TimeAndValue<T>>();
enc.write(wr, tv);
return wr.toStringRef();
return wr.toValue();
}
void onEnable() {

View File

@ -79,7 +79,7 @@ const Standalone<StringRef> MetricKeyRef::packLatestKey() const {
wr.serializeBytes( prefix );
wr.serializeBytes( LiteralStringRef("\x01TDMetricsLastValue\x00") );
writeMetricName(wr);
return wr.toStringRef();
return wr.toValue();
}
const Standalone<StringRef> MetricKeyRef::packDataKey(int64_t time) const {
@ -95,7 +95,7 @@ const Standalone<StringRef> MetricKeyRef::packDataKey(int64_t time) const {
wr.serializeAsTuple(level);
if(time >= 0)
wr.serializeAsTuple(time);
return wr.toStringRef();
return wr.toValue();
}
const Standalone<StringRef> MetricKeyRef::packFieldRegKey() const {
@ -111,7 +111,7 @@ const Standalone<StringRef> MetricKeyRef::packFieldRegKey() const {
wr.serializeBytes( LiteralStringRef("\x00\x01") );
wr.serializeBytes( fieldType );
wr.serializeBytes( LiteralStringRef("\x00") );
return wr.toStringRef();
return wr.toValue();
}
bool TDMetricCollection::canLog(int level) {

View File

@ -298,7 +298,7 @@ public:
}
void* getData() { return data; }
int getLength() { return size; }
Standalone<StringRef> toStringRef() { return Standalone<StringRef>( StringRef(data,size), arena ); }
Standalone<StringRef> toValue() { return Standalone<StringRef>( StringRef(data,size), arena ); }
template <class VersionOptions>
explicit BinaryWriter( VersionOptions vo ) : data(NULL), size(0), allocated(0) { vo.write(*this); }
BinaryWriter( BinaryWriter&& rhs ) : arena(std::move(rhs.arena)), data(rhs.data), size(rhs.size), allocated(rhs.allocated), m_protocolVersion(rhs.m_protocolVersion) {
@ -321,7 +321,7 @@ public:
static Standalone<StringRef> toValue( T const& t, VersionOptions vo ) {
BinaryWriter wr(vo);
wr << t;
return wr.toStringRef();
return wr.toValue();
}
static int bytesNeeded( uint64_t val ) {