diff --git a/flow/Net2.actor.cpp b/flow/Net2.actor.cpp index 282363844a..ddbfb7ae08 100644 --- a/flow/Net2.actor.cpp +++ b/flow/Net2.actor.cpp @@ -982,3 +982,122 @@ void startThreadF( F && func ) { Thing* t = new Thing(std::move(func)); startThread(Thing::start, t); } + +void net2_test() { + /*printf("ThreadSafeQueue test\n"); + printf(" Interface: "); + ThreadSafeQueue tq; + ASSERT( tq.canSleep() == true ); + + ASSERT( tq.push( 1 ) == true ) ; + ASSERT( tq.push( 2 ) == false ); + ASSERT( tq.push( 3 ) == false ); + + ASSERT( tq.pop().get() == 1 ); + ASSERT( tq.pop().get() == 2 ); + ASSERT( tq.push( 4 ) == false ); + ASSERT( tq.pop().get() == 3 ); + ASSERT( tq.pop().get() == 4 ); + ASSERT( !tq.pop().present() ); + printf("OK\n"); + + printf("Threaded: "); + Event finished, finished2; + int thread1Iterations = 1000000, thread2Iterations = 100000; + + if (thread1Iterations) + startThreadF([&](){ + printf("Thread1\n"); + for(int i=0; i i = tq.pop(); + if (i.present()) { + int v = i.get(); + ++c; + if (mx[v>>20] != v) + printf("Wrong value dequeued!\n"); + ASSERT( mx[v>>20] == v ); + mx[v>>20] = v + 1; + } else { + ++p; + _mm_pause(); + } + if ((c&3)==0) tq.canSleep(); + } + printf("%d %d %x %x %s\n", c, p, mx[0], mx[1], mx[0]==thread1Iterations && mx[1]==(1<<20)+thread2Iterations ? "OK" : "FAIL"); + + finished.block(); + finished2.block(); + + + g_network = newNet2(NetworkAddress::parse("127.0.0.1:12345")); // for promise serialization below + + Endpoint destination; + + printf(" Used: %lld\n", FastAllocator<4096>::getMemoryUsed()); + + char junk[100]; + + double before = timer(); + + vector reqs; + reqs.reserve( 10000 ); + + int totalBytes = 0; + for(int j=0; j<1000; j++) { + UnsentPacketQueue unsent; + ReliablePacketList reliable; + + reqs.resize(10000); + for(int i=0; i<10000; i++) { + TestGVR &req = reqs[i]; + req.key = LiteralStringRef("Foobar"); + + SerializeSource what(req); + + SendBuffer* pb = unsent.getWriteBuffer(); + ReliablePacket* rp = new ReliablePacket; // 0 + + PacketWriter wr(pb,rp,AssumeVersion(currentProtocolVersion)); + //BinaryWriter wr; + SplitBuffer packetLen; + uint32_t len = 0; + wr.writeAhead(sizeof(len), &packetLen); + wr << destination.token; + //req.reply.getEndpoint(); + what.serializePacketWriter(wr); + //wr.serializeBytes(junk, 43); + + unsent.setWriteBuffer(wr.finish()); + len = wr.size() - sizeof(len); + packetLen.write(&len, sizeof(len)); + + //totalBytes += wr.getLength(); + totalBytes += wr.size(); + + if (rp) reliable.insert(rp); + } + reqs.clear(); + unsent.discardAll(); + reliable.discardAll(); + } + + printf("SimSend x 1Kx10K: %0.2f sec\n", timer()-before); + printf(" Bytes: %d\n", totalBytes); + printf(" Used: %lld\n", FastAllocator<4096>::getMemoryUsed()); + */ +};