!4629 Support sending large graph and large tensors for mindspore debugger
Merge pull request !4629 from lichen_101010/sendgraph_sendtensor_issue
commit 3449abd741
@@ -23,7 +23,7 @@ import "debug_graph.proto";
 service EventListener {
   rpc WaitCMD (Metadata) returns (EventReply) {};
   rpc SendMetadata (Metadata) returns (EventReply) {};
-  rpc SendGraph (GraphProto) returns (EventReply) {};
+  rpc SendGraph (stream Chunk) returns (EventReply) {};
   rpc SendTensors (stream TensorProto) returns (EventReply) {};
   rpc SendWatchpointHits (stream WatchpointHit) returns (EventReply) {};
 }

@@ -37,6 +37,10 @@ message Metadata {
   string cur_node = 4;
 }

+message Chunk {
+  bytes buffer = 1;
+}
+
 message EventReply {
   enum Status {
     OK = 0;
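The proto change above turns SendGraph from a unary call carrying the whole GraphProto into a client-streaming call of opaque Chunk messages, so a graph of any size can cross gRPC's per-message size limit. The server side is not part of this diff; a minimal sketch of a receiver, assuming the generated EventListener::Service base class (the class name DebuggerServicer is hypothetical, and populating the EventReply is omitted), might look like:

  // Not part of this commit: server-side reassembly of the streamed graph.
  class DebuggerServicer final : public debugger::EventListener::Service {
    grpc::Status SendGraph(grpc::ServerContext *context, grpc::ServerReader<debugger::Chunk> *reader,
                           debugger::EventReply *reply) override {
      std::string serialized;
      debugger::Chunk chunk;
      while (reader->Read(&chunk)) {
        serialized += chunk.buffer();  // concatenate payloads in arrival order
      }
      debugger::GraphProto graph;
      if (!graph.ParseFromString(serialized)) {
        return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, "graph parse failed");
      }
      return grpc::Status::OK;
    }
  };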
@@ -40,6 +40,8 @@ using debugger::WatchCondition_Condition_nan;
 using debugger::WatchNode;
 using debugger::WatchpointHit;

+#define CHUNK_SIZE 1024 * 1024 * 3
+
 namespace mindspore {

 DebuggerPtr Debugger::debugger_ = nullptr;
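A side note on the constant: the macro body is unparenthesized, so an expression such as n / CHUNK_SIZE would expand to n / 1024 * 1024 * 3 and evaluate left-to-right to the wrong value. The uses in this commit (assignment, comparison, +=) are all safe, but a hypothetical hardened form would be:

  // Not part of this commit: a typed, parenthesized constant avoids
  // operator-precedence surprises inside larger expressions.
  constexpr int kChunkSize = 3 * 1024 * 1024;  // 3 MiB per gRPC message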
@@ -458,6 +460,16 @@ void Debugger::CommandLoop() {
   }
 }

+void AddTensorProtoInfo(TensorProto *tensor_item, TensorProto tensor) {
+  tensor_item->set_node_name(tensor.node_name());
+  tensor_item->set_slot(tensor.slot());
+  tensor_item->set_iter(tensor.iter());
+  tensor_item->set_truncate(tensor.truncate());
+  tensor_item->clear_tensor_content();
+  tensor_item->clear_data_type();
+  tensor_item->clear_dims();
+}
+
 void Debugger::SetWatchpoint(const ProtoVector<WatchNode> &nodes, const WatchCondition &condition, const int32_t id) {
   std::vector<std::tuple<std::string, bool>> check_node_list;
   std::transform(nodes.begin(), nodes.end(), std::back_inserter(check_node_list),
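AddTensorProtoInfo copies a tensor's identifying metadata into a fresh proto and clears the payload fields so every chunk starts empty. Note the source TensorProto is taken by value, which copies the whole message on each call; a hypothetical variant taking a const reference would avoid that:

  // Not part of this commit: pass the source proto by const reference.
  void AddTensorProtoInfo(TensorProto *tensor_item, const TensorProto &tensor);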
@@ -482,35 +494,40 @@ std::list<TensorProto> Debugger::LoadTensors(const ProtoVector<TensorProto> &ten
   // ret_name will contain tensor names that are found in TensorLoader
   // items in ret_name will be in the same order with tensors if found
   debug_services_->ReadNodesTensors(name, &ret_name, &data_ptr, &data_size, &dtype, &shape);

   std::list<TensorProto> tensor_list;
   unsigned int result_index = 0;
   for (auto tensor : tensors) {
-    TensorProto tensor_item;
-    tensor_item.set_node_name(tensor.node_name());
-    tensor_item.set_slot(tensor.slot());
-    tensor_item.set_iter(tensor.iter());
-    tensor_item.set_truncate(tensor.truncate());
-    tensor_item.clear_tensor_content();
-    tensor_item.clear_data_type();
-    tensor_item.clear_dims();
-    // always set finished to true before big tensor splitting is supported
-    tensor_item.set_finished(true);
-
+    int size_iter = 0;
     // return empty tensor if didn't find the requested tensor
     if (result_index >= ret_name.size() || ret_name[result_index] != GetTensorFullName(tensor)) {
+      TensorProto tensor_item;
+      tensor_item.set_finished(true);
+      AddTensorProtoInfo(&tensor_item, tensor);
       tensor_list.push_back(tensor_item);
       continue;
     }
-    tensor_item.set_tensor_content(data_ptr[result_index], data_size[result_index]);
-    tensor_item.set_data_type(GetDebuggerNumberDataType(dtype[result_index]));
-    for (auto &elem : shape[result_index]) {
-      tensor_item.add_dims(elem);
-    }
-    // add tensor to result list and increment result_index to check next item in ret_name
-    tensor_list.push_back(tensor_item);
+    int tensor_size = data_size[result_index];
+    while (size_iter < tensor_size) {
+      int chunk_size = CHUNK_SIZE;
+      TensorProto tensor_item;
+      tensor_item.set_finished(false);
+      if (tensor_size - size_iter <= CHUNK_SIZE) {
+        chunk_size = tensor_size - size_iter;
+        tensor_item.set_finished(true);
+      }
+      AddTensorProtoInfo(&tensor_item, tensor);
+      tensor_item.set_tensor_content(data_ptr[result_index] + size_iter, chunk_size);
+      tensor_item.set_data_type(GetDebuggerNumberDataType(dtype[result_index]));
+      for (auto &elem : shape[result_index]) {
+        tensor_item.add_dims(elem);
+      }
+      // add tensor to result list and increment result_index to check next item in ret_name
+      tensor_list.push_back(tensor_item);
+      size_iter += CHUNK_SIZE;
+    }
     result_index++;
   }
   return tensor_list;
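The rewritten loop emits one TensorProto per CHUNK_SIZE-byte slice and marks only the final slice finished, so the client can detect the end of each tensor. A tensor of n bytes therefore produces ceil(n / CHUNK_SIZE) messages; with the 3 MiB chunk size, a 10 MiB tensor becomes four messages of 3, 3, 3, and 1 MiB. A quick sketch of that arithmetic (using a local constant, since the bare CHUNK_SIZE macro would mis-expand in this division):

  constexpr int kChunkSize = 3 * 1024 * 1024;
  // Number of stream messages a tensor of byte size n produces.
  int NumChunks(int n) { return (n + kChunkSize - 1) / kChunkSize; }
  // NumChunks(10 * 1024 * 1024) == 4: slices of 3 MiB, 3 MiB, 3 MiB, 1 MiB.

One edge case worth noting: for a zero-byte tensor the while loop never runs, so no message at all is pushed for that tensor, not even an empty one with finished set.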
@@ -26,6 +26,7 @@
 #include "debug/debugger/grpc_client.h"
 #include "debug/debug_services.h"

+using debugger::Chunk;
 using debugger::DataType;
 using debugger::EventReply;
 using debugger::GraphProto;
@@ -15,9 +15,11 @@
  */

+#include <thread>
 #include <vector>
 #include "debug/debugger/grpc_client.h"
 #include "utils/log_adapter.h"

+using debugger::Chunk;
 using debugger::EventListener;
 using debugger::EventReply;
 using debugger::EventReply_Status_FAILED;

@@ -26,6 +28,8 @@ using debugger::Metadata;
 using debugger::TensorProto;
 using debugger::WatchpointHit;

+#define CHUNK_SIZE 1024 * 1024 * 3
+
 namespace mindspore {
 GrpcClient::GrpcClient(const std::string &host, const std::string &port) : stub_(nullptr) { Init(host, port); }
@@ -65,10 +69,43 @@ EventReply GrpcClient::SendMetadata(const Metadata &metadata) {
   return reply;
 }

+std::vector<std::string> ChunkString(std::string str, int graph_size) {
+  std::vector<std::string> buf;
+  int size_iter = 0;
+  while (size_iter < graph_size) {
+    int chunk_size = CHUNK_SIZE;
+    if (graph_size - size_iter < CHUNK_SIZE) {
+      chunk_size = graph_size - size_iter;
+    }
+    std::string buffer;
+    buffer.resize(chunk_size);
+    memcpy(reinterpret_cast<char *>(buffer.data()), str.data() + size_iter, chunk_size);
+    buf.push_back(buffer);
+    size_iter += CHUNK_SIZE;
+  }
+  return buf;
+}
+
 EventReply GrpcClient::SendGraph(const GraphProto &graph) {
   EventReply reply;
   grpc::ClientContext context;
-  grpc::Status status = stub_->SendGraph(&context, graph, &reply);
+  Chunk chunk;
+
+  std::unique_ptr<grpc::ClientWriter<Chunk> > writer(stub_->SendGraph(&context, &reply));
+  std::string str = graph.SerializeAsString();
+  int graph_size = graph.ByteSize();
+  auto buf = ChunkString(str, graph_size);
+
+  for (unsigned int i = 0; i < buf.size(); i++) {
+    MS_LOG(INFO) << "RPC:sending the " << i << "chunk in graph";
+    chunk.set_buffer(buf[i]);
+    if (!writer->Write(chunk)) {
+      break;
+    }
+    std::this_thread::sleep_for(std::chrono::milliseconds(1));
+  }
+  writer->WritesDone();
+  grpc::Status status = writer->Finish();

   if (!status.ok()) {
     MS_LOG(ERROR) << "RPC failed: SendGraph";
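ChunkString copies the serialized graph into CHUNK_SIZE-sized pieces, and the 1 ms sleep between writes presumably paces the stream for the receiver. Since str.size() already equals graph.ByteSize() after SerializeAsString, the explicit size parameter is redundant, and std::string::substr clamps the final piece automatically; a hypothetical equivalent:

  // Not part of this commit: substr-based chunking; the last piece is
  // automatically shorter when the string ends mid-chunk.
  std::vector<std::string> ChunkStringAlt(const std::string &str) {
    std::vector<std::string> buf;
    for (size_t pos = 0; pos < str.size(); pos += CHUNK_SIZE) {
      buf.push_back(str.substr(pos, CHUNK_SIZE));
    }
    return buf;
  }

Also note the INFO log concatenates without spaces and will print, e.g., "sending the 0chunk in graph".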