HTTP::doRequest() now reads responses in parallel with sending requests, so if the server responds before receiving all of the the request the client can stop sending the remainder of the request. For PUT requests which upload files, this prevents sending potentially several megabytes of unnecessary bytes if the server responds with an error (such as 429) before the request is completely sent. Updated the backup container unit test to use more parallelism in order to test this new behavior.

This commit is contained in:
Stephen Atherton 2018-02-07 10:38:31 -08:00
parent 7de40413d5
commit d8879dc3f3
2 changed files with 35 additions and 16 deletions

View File

@ -1329,23 +1329,23 @@ ACTOR Future<Void> testBackupContainer(std::string url) {
state int64_t versionShift = g_random->randomInt64(0, std::numeric_limits<Version>::max() - 500);
state Reference<IBackupFile> log1 = wait(c->writeLogFile(100 + versionShift, 150 + versionShift, 10));
Void _ = wait(writeAndVerifyFile(c, log1, 0));
state Reference<IBackupFile> log2 = wait(c->writeLogFile(150 + versionShift, 300 + versionShift, 10));
Void _ = wait(writeAndVerifyFile(c, log2, g_random->randomInt(0, 10000000)));
state Reference<IBackupFile> range1 = wait(c->writeRangeFile(160 + versionShift, 10));
Void _ = wait(writeAndVerifyFile(c, range1, g_random->randomInt(0, 1000)));
state Reference<IBackupFile> range2 = wait(c->writeRangeFile(300 + versionShift, 10));
Void _ = wait(writeAndVerifyFile(c, range2, g_random->randomInt(0, 100000)));
state Reference<IBackupFile> range3 = wait(c->writeRangeFile(310 + versionShift, 10));
Void _ = wait(writeAndVerifyFile(c, range3, g_random->randomInt(0, 3000000)));
Void _ = wait(c->writeKeyspaceSnapshotFile({range1->getFileName(), range2->getFileName()}, range1->size() + range2->size()));
Void _ = wait(
writeAndVerifyFile(c, log1, 0)
&& writeAndVerifyFile(c, log2, g_random->randomInt(0, 10000000))
&& writeAndVerifyFile(c, range1, g_random->randomInt(0, 1000))
&& writeAndVerifyFile(c, range2, g_random->randomInt(0, 100000))
&& writeAndVerifyFile(c, range3, g_random->randomInt(0, 3000000))
);
Void _ = wait(c->writeKeyspaceSnapshotFile({range3->getFileName()}, range3->size()));
Void _ = wait(
c->writeKeyspaceSnapshotFile({range1->getFileName(), range2->getFileName()}, range1->size() + range2->size())
&& c->writeKeyspaceSnapshotFile({range3->getFileName()}, range3->size())
);
printf("Checking file list dump\n");
FullBackupListing listing = wait(c->dumpFileList());

View File

@ -307,6 +307,9 @@ namespace HTTP {
if(pContent == NULL)
pContent = &empty;
state bool earlyResponse = false;
state int total_sent = 0;
try {
// Write headers to a packet buffer chain
PacketBuffer *pFirst = new PacketBuffer();
@ -321,11 +324,25 @@ namespace HTTP {
printf("Request Header: %s: %s\n", h.first.c_str(), h.second.c_str());
}
state Reference<HTTP::Response> r(new HTTP::Response());
state Future<Void> responseReading = r->read(conn, verb == "HEAD" || verb == "DELETE");
state double send_start = timer();
state double total_sent = 0;
loop {
Void _ = wait(conn->onWritable());
Void _ = wait( delay( 0, TaskWriteSocket ) );
// If we already got a response, before finishing sending the request, then close the connection,
// set the Connection header to "close" as a hint to the caller that this connection can't be used
// again, and break out of the send loop.
if(responseReading.isReady()) {
conn->close();
r->headers["Connection"] = "close";
earlyResponse = true;
break;
}
state int trySend = CLIENT_KNOBS->HTTP_SEND_SIZE;
Void _ = wait(sendRate->getAllowance(trySend));
int len = conn->write(pContent->getUnsent(), trySend);
@ -338,18 +355,20 @@ namespace HTTP {
break;
}
state Reference<HTTP::Response> r(new HTTP::Response());
Void _ = wait(r->read(conn, verb == "HEAD" || verb == "DELETE"));
Void _ = wait(responseReading);
double elapsed = timer() - send_start;
if(CLIENT_KNOBS->HTTP_VERBOSE_LEVEL > 0)
printf("[%s] HTTP code=%d, time=%fs %s %s [%u out, response content len %d]\n", conn->getDebugID().toString().c_str(), r->code, elapsed, verb.c_str(), resource.c_str(), (int)total_sent, (int)r->contentLen);
printf("[%s] HTTP code=%d early=%d, time=%fs %s %s contentLen=%d [%d out, response content len %d]\n",
conn->getDebugID().toString().c_str(), r->code, earlyResponse, elapsed, verb.c_str(), resource.c_str(), contentLen, total_sent, (int)r->contentLen);
if(CLIENT_KNOBS->HTTP_VERBOSE_LEVEL > 2)
printf("[%s] HTTP RESPONSE: %s %s\n%s\n", conn->getDebugID().toString().c_str(), verb.c_str(), resource.c_str(), r->toString().c_str());
return r;
} catch(Error &e) {
double elapsed = timer() - send_start;
if(CLIENT_KNOBS->HTTP_VERBOSE_LEVEL > 0)
printf("[%s] HTTP *ERROR*=%s, time=%fs %s %s [%u out]\n", conn->getDebugID().toString().c_str(), e.name(), elapsed, verb.c_str(), resource.c_str(), (int)total_sent);
printf("[%s] HTTP *ERROR*=%s early=%d, time=%fs %s %s contentLen=%d [%d out]\n",
conn->getDebugID().toString().c_str(), e.name(), earlyResponse, elapsed, verb.c_str(), resource.c_str(), contentLen, total_sent);
throw;
}
}