serving RESTful: optimize for performance

xuyongfei 2020-08-26 16:35:32 +08:00
parent ee7d9bf4ac
commit 78f88cde1b
3 changed files with 219 additions and 123 deletions

View File

@@ -24,6 +24,7 @@
#include <memory>
#include <iostream>
#include <chrono>
+#include <vector>
#ifndef ENABLE_ACL
#include "mindspore/core/utils/log_adapter.h"
@@ -44,6 +45,19 @@ class LogStream {
return *this;
}
+  template <typename T>
+  LogStream &operator<<(const std::vector<T> &val) noexcept {
+    (*sstream_) << "[";
+    for (size_t i = 0; i < val.size(); i++) {
+      (*this) << val[i];
+      if (i + 1 < val.size()) {
+        (*sstream_) << ", ";
+      }
+    }
+    (*sstream_) << "]";
+    return *this;
+  }
LogStream &operator<<(std::ostream &func(std::ostream &os)) noexcept {
(*sstream_) << func;
return *this;
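Note: the operator<< overload added above lets callers stream a std::vector straight into the log, rendering it as "[a, b, c]"; nested vectors recurse through the element overload. A self-contained sketch of the idiom (LogStreamDemo is a stand-in for serving's LogStream, not the real class):

// Sketch of the vector-printing idiom added in this hunk.
#include <cstdint>
#include <iostream>
#include <sstream>
#include <string>
#include <vector>

class LogStreamDemo {
 public:
  template <typename T>
  LogStreamDemo &operator<<(const T &val) {
    sstream_ << val;
    return *this;
  }
  template <typename T>
  LogStreamDemo &operator<<(const std::vector<T> &val) {
    sstream_ << "[";
    for (size_t i = 0; i < val.size(); i++) {
      (*this) << val[i];  // recurses for nested vectors
      if (i + 1 < val.size()) {
        sstream_ << ", ";
      }
    }
    sstream_ << "]";
    return *this;
  }
  std::string str() const { return sstream_.str(); }

 private:
  std::ostringstream sstream_;
};

int main() {
  LogStreamDemo log;
  log << "required shape " << std::vector<int64_t>{2, 3, 4};
  std::cout << log.str() << std::endl;  // prints: required shape [2, 3, 4]
  return 0;
}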

View File

@@ -21,6 +21,7 @@
#include "util/status.h"
#include "core/session.h"
#include "core/http_process.h"
#include "core/serving_tensor.h"
using ms_serving::MSService;
using ms_serving::PredictReply;
@@ -35,10 +36,9 @@ static constexpr char HTTP_DATA[] = "data";
static constexpr char HTTP_TENSOR[] = "tensor";
enum HTTP_TYPE { TYPE_DATA = 0, TYPE_TENSOR };
enum HTTP_DATA_TYPE { HTTP_DATA_NONE, HTTP_DATA_INT, HTTP_DATA_FLOAT };
-static const std::map<HTTP_DATA_TYPE, ms_serving::DataType> http_to_infer_map{
-  {HTTP_DATA_NONE, ms_serving::MS_UNKNOWN},
-  {HTTP_DATA_INT, ms_serving::MS_INT32},
-  {HTTP_DATA_FLOAT, ms_serving::MS_FLOAT32}};
+static const std::map<inference::DataType, HTTP_DATA_TYPE> infer_type2_http_type{
+  {inference::DataType::kMSI_Int32, HTTP_DATA_INT}, {inference::DataType::kMSI_Float32, HTTP_DATA_FLOAT}};
Status GetPostMessage(struct evhttp_request *req, std::string *buf) {
Status status(SUCCESS);
@@ -93,69 +93,96 @@ Status CheckMessageValid(const json &message_info, HTTP_TYPE *type) {
return status;
}
-Status GetDataFromJson(const json &json_data, std::string *data, HTTP_DATA_TYPE *type) {
-  Status status(SUCCESS);
-  if (json_data.is_number_integer()) {
-    if (*type == HTTP_DATA_NONE) {
-      *type = HTTP_DATA_INT;
-    } else if (*type != HTTP_DATA_INT) {
-      ERROR_INFER_STATUS(status, INVALID_INPUTS, "the input data type should be consistent");
-      return status;
-    }
-    auto s_data = json_data.get<int32_t>();
-    data->append(reinterpret_cast<char *>(&s_data), sizeof(int32_t));
-  } else if (json_data.is_number_float()) {
-    if (*type == HTTP_DATA_NONE) {
-      *type = HTTP_DATA_FLOAT;
-    } else if (*type != HTTP_DATA_FLOAT) {
-      ERROR_INFER_STATUS(status, INVALID_INPUTS, "the input data type should be consistent");
-      return status;
-    }
-    auto s_data = json_data.get<float>();
-    data->append(reinterpret_cast<char *>(&s_data), sizeof(float));
-  } else {
-    ERROR_INFER_STATUS(status, INVALID_INPUTS, "the input data type should be int or float");
-    return status;
-  }
-  return SUCCESS;
-}
+Status GetDataFromJson(const json &json_data_array, ServingTensor *request_tensor, size_t data_index,
+                       HTTP_DATA_TYPE type) {
+  Status status(SUCCESS);
+  auto type_name = [](const json &json_data) -> std::string {
+    if (json_data.is_number_integer()) {
+      return "integer";
+    } else if (json_data.is_number_float()) {
+      return "float";
+    }
+    return json_data.type_name();
+  };
+  size_t array_size = json_data_array.size();
+  if (type == HTTP_DATA_INT) {
+    auto data = reinterpret_cast<int32_t *>(request_tensor->mutable_data()) + data_index;
+    for (size_t k = 0; k < array_size; k++) {
+      auto &json_data = json_data_array[k];
+      if (!json_data.is_number_integer()) {
+        status = INFER_STATUS(INVALID_INPUTS) << "get data failed, expected integer, given " << type_name(json_data);
+        MSI_LOG_ERROR << status.StatusMessage();
+        return status;
+      }
+      data[k] = json_data.get<int32_t>();
+    }
+  } else if (type == HTTP_DATA_FLOAT) {
+    auto data = reinterpret_cast<float *>(request_tensor->mutable_data()) + data_index;
+    for (size_t k = 0; k < array_size; k++) {
+      auto &json_data = json_data_array[k];
+      if (!json_data.is_number_float()) {
+        status = INFER_STATUS(INVALID_INPUTS) << "get data failed, expected float, given " << type_name(json_data);
+        MSI_LOG_ERROR << status.StatusMessage();
+        return status;
+      }
+      data[k] = json_data.get<float>();
+    }
+  }
+  return SUCCESS;
+}
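Note: this rewrite is the core of the performance change. The old code appended each decoded number byte-by-byte to a growing std::string (repeated reallocation, plus a final copy into the request tensor); the new code writes each value directly into the tensor buffer that was sized up front. A minimal sketch contrasting the two, where `values` stands in for a parsed JSON number array:

// Old style: accumulate bytes, copy into the tensor later.
#include <cstring>
#include <string>
#include <vector>

std::string DecodeToString(const std::vector<float> &values) {
  std::string data;
  for (float v : values) {
    data.append(reinterpret_cast<char *>(&v), sizeof(float));  // may reallocate repeatedly
  }
  return data;  // still has to be copied into the tensor afterwards
}

// New style: the tensor buffer is preallocated, values land in place.
void DecodeInPlace(const std::vector<float> &values, float *tensor_data, size_t data_index) {
  for (size_t k = 0; k < values.size(); k++) {
    tensor_data[data_index + k] = values[k];  // single write, no reallocation
  }
}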
-Status RecusiveGetTensor(const json &json_data, size_t depth, std::vector<int> *shape, std::string *data,
-                         HTTP_DATA_TYPE *type) {
-  Status status(SUCCESS);
-  if (depth >= 10) {
-    ERROR_INFER_STATUS(status, INVALID_INPUTS, "the tensor shape dims is larger than 10");
-    return status;
-  }
-  if (!json_data.is_array()) {
-    ERROR_INFER_STATUS(status, INVALID_INPUTS, "the tensor is constructed illegally");
-    return status;
-  }
-  int cur_dim = json_data.size();
-  if (shape->size() <= depth) {
-    shape->push_back(cur_dim);
-  } else if ((*shape)[depth] != cur_dim) {
-    return INFER_STATUS(INVALID_INPUTS) << "the tensor shape is constructed illegally";
-  }
-  if (json_data.at(0).is_array()) {
-    for (const auto &item : json_data) {
-      status = RecusiveGetTensor(item, depth + 1, shape, data, type);
-      if (status != SUCCESS) {
-        return status;
-      }
-    }
-  } else {
-    // last dim, read the data
-    for (auto item : json_data) {
-      status = GetDataFromJson(item, data, type);
-      if (status != SUCCESS) {
-        return status;
-      }
-    }
-  }
-  return status;
-}
+Status RecusiveGetTensor(const json &json_data, size_t depth, ServingTensor *request_tensor, size_t data_index,
+                         HTTP_DATA_TYPE type) {
+  Status status(SUCCESS);
+  std::vector<int64_t> required_shape = request_tensor->shape();
+  if (depth >= required_shape.size()) {
+    status = INFER_STATUS(INVALID_INPUTS)
+             << "input tensor shape dims is more than required dims " << required_shape.size();
+    MSI_LOG_ERROR << status.StatusMessage();
+    return status;
+  }
+  if (!json_data.is_array()) {
+    ERROR_INFER_STATUS(status, INVALID_INPUTS, "the tensor is constructed illegally");
+    return status;
+  }
+  if (json_data.size() != static_cast<size_t>(required_shape[depth])) {
+    status = INFER_STATUS(INVALID_INPUTS)
+             << "tensor format request is constructed illegally, input tensor shape dim " << depth
+             << " not match, required " << required_shape[depth] << ", given " << json_data.size();
+    MSI_LOG_ERROR << status.StatusMessage();
+    return status;
+  }
+  if (depth + 1 < required_shape.size()) {
+    size_t sub_element_cnt =
+      std::accumulate(required_shape.begin() + depth + 1, required_shape.end(), 1LL, std::multiplies<size_t>());
+    for (size_t k = 0; k < json_data.size(); k++) {
+      status = RecusiveGetTensor(json_data[k], depth + 1, request_tensor, data_index + sub_element_cnt * k, type);
+      if (status != SUCCESS) {
+        return status;
+      }
+    }
+  } else {
+    // last dim, read the data
+    status = GetDataFromJson(json_data, request_tensor, data_index, type);
+    if (status != SUCCESS) {
+      return status;
+    }
+  }
+  return status;
+}
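Note: because the required shape is known before parsing, the recursion can compute where each sub-array lands in the flat row-major buffer: the stride at depth d is the product of the dimensions after d. A standalone sketch of that stride arithmetic, mirroring the std::accumulate call above (SubElementCount is an illustrative helper, not serving code):

// Row-major stride arithmetic: for shape [2, 3, 4], the k-th sub-array at
// depth 0 starts at offset k * (3 * 4) = k * 12 in the flat buffer.
#include <cstdint>
#include <functional>
#include <numeric>
#include <vector>

size_t SubElementCount(const std::vector<int64_t> &shape, size_t depth) {
  return std::accumulate(shape.begin() + depth + 1, shape.end(),
                         static_cast<size_t>(1), std::multiplies<size_t>());
}

// SubElementCount({2, 3, 4}, 0) == 12, SubElementCount({2, 3, 4}, 1) == 4.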
+std::vector<int64_t> GetJsonArrayShape(const json &json_array) {
+  std::vector<int64_t> json_shape;
+  const json *tmp_json = &json_array;
+  while (tmp_json->is_array()) {
+    if (tmp_json->empty()) {
+      break;
+    }
+    json_shape.push_back(tmp_json->size());
+    tmp_json = &tmp_json->at(0);
+  }
+  return json_shape;
+}
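Note: GetJsonArrayShape walks only the first child at each nesting level, so it runs in O(depth) rather than traversing the whole payload; ragged siblings are caught later by the per-dimension size check in RecusiveGetTensor. A usage sketch with nlohmann::json (assumed available, as in this file):

#include <nlohmann/json.hpp>
using json = nlohmann::json;

void Example() {
  json t = json::parse("[[1, 2, 3], [4, 5, 6]]");
  // GetJsonArrayShape(t) returns {2, 3}. A ragged payload such as
  // [[1, 2, 3], [4, 5]] would also report {2, 3} here, and only be
  // rejected by the per-dimension size check during the recursion.
}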
Status TransDataToPredictRequest(const json &message_info, PredictRequest *request) {
Status status = SUCCESS;
auto tensors = message_info.find(HTTP_DATA);
@@ -163,54 +190,52 @@ Status TransDataToPredictRequest(const json &message_info, PredictRequest *request) {
ERROR_INFER_STATUS(status, INVALID_INPUTS, "http message do not have data type");
return status;
}
-  if (tensors->size() == 0) {
-    ERROR_INFER_STATUS(status, INVALID_INPUTS, "the input tensor list is null");
-    return status;
-  }
-  for (const auto &tensor : *tensors) {
-    std::string msg_data;
-    HTTP_DATA_TYPE type{HTTP_DATA_NONE};
-    if (!tensor.is_array()) {
-      ERROR_INFER_STATUS(status, INVALID_INPUTS, "the tensor is constructed illegally");
-      return status;
-    }
-    if (tensor.size() == 0) {
-      ERROR_INFER_STATUS(status, INVALID_INPUTS, "the input tensor is null");
-      return status;
-    }
-    for (const auto &tensor_data : tensor) {
-      status = GetDataFromJson(tensor_data, &msg_data, &type);
-      if (status != SUCCESS) {
-        return status;
-      }
-    }
-    auto iter = http_to_infer_map.find(type);
-    if (iter == http_to_infer_map.end()) {
-      ERROR_INFER_STATUS(status, INVALID_INPUTS, "the input type is not supported right now");
-      return status;
-    }
-    auto infer_tensor = request->add_data();
-    infer_tensor->set_tensor_type(iter->second);
-    infer_tensor->set_data(msg_data.data(), msg_data.size());
-  }
-  // get model required shape
-  std::vector<inference::InferTensor> tensor_list;
-  status = Session::Instance().GetModelInputsInfo(tensor_list);
-  if (status != SUCCESS) {
-    ERROR_INFER_STATUS(status, FAILED, "get model inputs info failed");
-    return status;
-  }
-  if (request->data_size() != static_cast<int64_t>(tensor_list.size())) {
-    ERROR_INFER_STATUS(status, INVALID_INPUTS, "the inputs number is not equal to model required");
-    return status;
-  }
-  for (int i = 0; i < request->data_size(); i++) {
-    for (size_t j = 0; j < tensor_list[i].shape().size(); ++j) {
-      request->mutable_data(i)->mutable_tensor_shape()->add_dims(tensor_list[i].shape()[j]);
-    }
-  }
+  if (!tensors->is_array()) {
+    ERROR_INFER_STATUS(status, INVALID_INPUTS, "the input tensor list is not array");
+    return status;
+  }
+  auto const &json_shape = GetJsonArrayShape(*tensors);
+  if (json_shape.size() != 2) {  // 2 is data format list deep
+    status = INFER_STATUS(INVALID_INPUTS)
+             << "the data format request is constructed illegally, expected list nesting depth 2, given "
+             << json_shape.size();
+    MSI_LOG_ERROR << status.StatusMessage();
+    return status;
+  }
+  if (tensors->size() != static_cast<size_t>(request->data_size())) {
+    status = INFER_STATUS(INVALID_INPUTS)
+             << "model input count not match, model required " << request->data_size() << ", given " << tensors->size();
+    MSI_LOG_ERROR << status.StatusMessage();
+    return status;
+  }
+  for (size_t i = 0; i < tensors->size(); i++) {
+    const auto &tensor = tensors->at(i);
+    ServingTensor request_tensor(*(request->mutable_data(i)));
+    auto iter = infer_type2_http_type.find(request_tensor.data_type());
+    if (iter == infer_type2_http_type.end()) {
+      ERROR_INFER_STATUS(status, FAILED, "the model input type is not supported right now");
+      return status;
+    }
+    HTTP_DATA_TYPE type = iter->second;
+    if (!tensor.is_array()) {
+      ERROR_INFER_STATUS(status, INVALID_INPUTS, "the tensor is constructed illegally");
+      return status;
+    }
+    if (tensor.empty()) {
+      ERROR_INFER_STATUS(status, INVALID_INPUTS, "the input tensor is null");
+      return status;
+    }
+    if (tensor.size() != static_cast<size_t>(request_tensor.ElementNum())) {
+      status = INFER_STATUS(INVALID_INPUTS) << "input " << i << " element count not match, model required "
+                                            << request_tensor.ElementNum() << ", given " << tensor.size();
+      MSI_LOG_ERROR << status.StatusMessage();
+      return status;
+    }
+    status = GetDataFromJson(tensor, &request_tensor, 0, type);
+    if (status != SUCCESS) {
+      return status;
+    }
+  }
return SUCCESS;
}
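Note: in the "data" format each model input is a flat list, so the expected nesting depth is exactly 2: input index, then element index. An illustrative payload (hypothetical model with two inputs of 4 and 2 elements; not taken from the commit):

// Illustrative "data" request; element counts must equal each input's
// ElementNum, and element types must match the model input types.
const char *kDataRequest = R"({
  "data": [
    [1.1, 2.2, 3.3, 4.4],
    [5, 6]
  ]
})";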
@@ -221,22 +246,44 @@ Status TransTensorToPredictRequest(const json &message_info, PredictRequest *request) {
ERROR_INFER_STATUS(status, INVALID_INPUTS, "http message do not have tensor type");
return status;
}
-  for (const auto &tensor : *tensors) {
-    std::vector<int> shape;
-    std::string msg_data;
-    HTTP_DATA_TYPE type{HTTP_DATA_NONE};
-    RecusiveGetTensor(tensor, 0, &shape, &msg_data, &type);
-    auto iter = http_to_infer_map.find(type);
-    if (iter == http_to_infer_map.end()) {
-      ERROR_INFER_STATUS(status, INVALID_INPUTS, "the input type is not supported right now");
-      return status;
-    }
-    auto infer_tensor = request->add_data();
-    infer_tensor->set_tensor_type(iter->second);
-    infer_tensor->set_data(msg_data.data(), msg_data.size());
-    for (const auto dim : shape) {
-      infer_tensor->mutable_tensor_shape()->add_dims(dim);
-    }
-  }
+  if (!tensors->is_array()) {
+    ERROR_INFER_STATUS(status, INVALID_INPUTS, "the input tensor list is not array");
+    return status;
+  }
+  if (tensors->size() != static_cast<size_t>(request->data_size())) {
+    status =
+      INFER_STATUS(INVALID_INPUTS)
+      << "model input count not match or json tensor request is constructed illegally, model input count required "
+      << request->data_size() << ", given " << tensors->size();
+    MSI_LOG_ERROR << status.StatusMessage();
+    return status;
+  }
+  for (size_t i = 0; i < tensors->size(); i++) {
+    const auto &tensor = tensors->at(i);
+    ServingTensor request_tensor(*(request->mutable_data(i)));
+    // check data shape
+    auto const &json_shape = GetJsonArrayShape(tensor);
+    if (json_shape != request_tensor.shape()) {  // data shape not match
+      status = INFER_STATUS(INVALID_INPUTS)
+               << "input " << i << " shape is invalid, expected " << request_tensor.shape() << ", given " << json_shape;
+      MSI_LOG_ERROR << status.StatusMessage();
+      return status;
+    }
+    auto iter = infer_type2_http_type.find(request_tensor.data_type());
+    if (iter == infer_type2_http_type.end()) {
+      ERROR_INFER_STATUS(status, FAILED, "the model input type is not supported right now");
+      return status;
+    }
+    HTTP_DATA_TYPE type = iter->second;
+    size_t depth = 0;
+    size_t data_index = 0;
+    status = RecusiveGetTensor(tensor, depth, &request_tensor, data_index, type);
+    if (status != SUCCESS) {
+      MSI_LOG_ERROR << "Transfer tensor to predict request failed";
+      return status;
+    }
+  }
return status;
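Note: the "tensor" format carries the full nesting, and the inferred shape must equal the model input shape exactly before any element is read. An illustrative payload (hypothetical model with a single input of shape [2, 2]; not taken from the commit):

// Illustrative "tensor" request; the nested list shape must match the
// model input shape exactly, e.g. [2, 2] here.
const char *kTensorRequest = R"({
  "tensor": [
    [[1.0, 2.0],
     [3.0, 4.0]]
  ]
})";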
@@ -253,6 +300,27 @@ Status TransHTTPMsgToPredictRequest(struct evhttp_request *http_request, PredictRequest *request, HTTP_TYPE *type) {
return status;
}
+  // get model required shape
+  std::vector<inference::InferTensor> tensor_list;
+  status = Session::Instance().GetModelInputsInfo(tensor_list);
+  if (status != SUCCESS) {
+    ERROR_INFER_STATUS(status, FAILED, "get model inputs info failed");
+    return status;
+  }
+  for (auto &item : tensor_list) {
+    auto input = request->add_data();
+    ServingTensor tensor(*input);
+    tensor.set_shape(item.shape());
+    tensor.set_data_type(item.data_type());
+    int64_t element_num = tensor.ElementNum();
+    int64_t data_type_size = tensor.GetTypeSize(tensor.data_type());
+    if (element_num <= 0 || INT64_MAX / element_num < data_type_size) {
+      ERROR_INFER_STATUS(status, FAILED, "model shape invalid");
+      return status;
+    }
+    tensor.resize_data(element_num * data_type_size);
+  }
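Note: this loop sizes every request tensor before JSON parsing begins, which is what allows GetDataFromJson to write in place. The division-based check guards the byte-count multiplication against int64 overflow: element_num * data_type_size could wrap if multiplied blindly. A standalone sketch of the guard (SafeByteCount is an illustrative helper, not serving code):

// Checking with division avoids performing a multiplication that could wrap.
#include <cstdint>

bool SafeByteCount(int64_t element_num, int64_t type_size, int64_t *bytes) {
  if (element_num <= 0 || type_size <= 0 || INT64_MAX / element_num < type_size) {
    return false;  // shape invalid, or element_num * type_size would overflow
  }
  *bytes = element_num * type_size;
  return true;
}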
+  MSI_TIME_STAMP_START(ParseJson)
json message_info;
try {
message_info = nlohmann::json::parse(post_message);
@@ -262,6 +330,7 @@
ERROR_INFER_STATUS(status, INVALID_INPUTS, error_message);
return status;
}
+  MSI_TIME_STAMP_END(ParseJson)
status = CheckMessageValid(message_info, type);
if (status != SUCCESS) {
@@ -285,24 +354,18 @@ Status GetJsonFromTensor(const ms_serving::Tensor &tensor, int len, int *pos, json *out_json) {
Status status(SUCCESS);
switch (tensor.tensor_type()) {
case ms_serving::MS_INT32: {
-      std::vector<int> result_tensor;
-      for (int j = 0; j < len; j++) {
-        int val;
-        memcpy(&val, reinterpret_cast<const int *>(tensor.data().data()) + *pos + j, sizeof(int));
-        result_tensor.push_back(val);
-      }
-      *out_json = result_tensor;
+      auto data = reinterpret_cast<const int *>(tensor.data().data()) + *pos;
+      std::vector<int32_t> result_tensor(len);
+      memcpy_s(result_tensor.data(), result_tensor.size() * sizeof(int32_t), data, len * sizeof(int32_t));
+      *out_json = std::move(result_tensor);
*pos += len;
break;
}
case ms_serving::MS_FLOAT32: {
-      std::vector<float> result_tensor;
-      for (int j = 0; j < len; j++) {
-        float val;
-        memcpy(&val, reinterpret_cast<const float *>(tensor.data().data()) + *pos + j, sizeof(float));
-        result_tensor.push_back(val);
-      }
-      *out_json = result_tensor;
+      auto data = reinterpret_cast<const float *>(tensor.data().data()) + *pos;
+      std::vector<float> result_tensor(len);
+      memcpy_s(result_tensor.data(), result_tensor.size() * sizeof(float), data, len * sizeof(float));
+      *out_json = std::move(result_tensor);
*pos += len;
break;
}
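Note: replacing the per-element memcpy loop with one sized allocation and one bulk copy removes len separate copies and push_backs; memcpy_s (from the bounds-checked securec library used in MindSpore) additionally takes the destination capacity so the copy cannot overrun. A sketch of the same idea with std::memcpy for portability (memcpy_s availability is toolchain-dependent):

// Bulk copy sketch: size the vector once, copy once.
// The serving code uses memcpy_s(dest, dest_max, src, count) instead.
#include <cstring>
#include <vector>

std::vector<float> BulkCopy(const float *data, int len) {
  std::vector<float> result(len);                         // one allocation
  std::memcpy(result.data(), data, len * sizeof(float));  // one copy
  return result;
}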
@@ -316,7 +379,8 @@
Status TransPredictReplyToData(const PredictReply &reply, json *out_json) {
Status status(SUCCESS);
for (int i = 0; i < reply.result_size(); i++) {
-    json tensor_json;
+    (*out_json)["data"].push_back(json());
+    json &tensor_json = (*out_json)["data"].back();
int num = 1;
for (auto j = 0; j < reply.result(i).tensor_shape().dims_size(); j++) {
num *= reply.result(i).tensor_shape().dims(j);
@@ -326,7 +390,6 @@ Status TransPredictReplyToData(const PredictReply &reply, json *out_json) {
if (status != SUCCESS) {
return status;
}
(*out_json)["data"].push_back(tensor_json);
}
return status;
}
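Note: the json changes here apply the same allocation-avoidance idea on the reply path: append an empty node first and fill it through the back() reference, instead of building a local json and deep-copying it on push_back (push_back of an lvalue copies the whole subtree). A minimal sketch of the idiom with nlohmann::json (AppendTensor is illustrative, not serving code):

#include <nlohmann/json.hpp>
#include <vector>
using json = nlohmann::json;

void AppendTensor(json *out_json, const std::vector<float> &values) {
  (*out_json)["data"].push_back(json());              // append an empty node
  json &tensor_json = (*out_json)["data"].back();     // fill it in place
  tensor_json = values;  // writes directly into the stored node, no extra deep copy
}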
@@ -344,12 +407,12 @@ Status RecusiveGetJson(const ms_serving::Tensor &tensor, int depth, int *pos, json *out_json) {
}
} else {
for (int i = 0; i < tensor.tensor_shape().dims(depth); i++) {
-      json tensor_json;
+      out_json->push_back(json());
+      json &tensor_json = out_json->back();
status = RecusiveGetJson(tensor, depth + 1, pos, &tensor_json);
if (status != SUCCESS) {
return status;
}
-      out_json->push_back(tensor_json);
}
}
return status;
@@ -358,13 +421,13 @@ Status RecusiveGetJson(const ms_serving::Tensor &tensor, int depth, int *pos, json *out_json) {
Status TransPredictReplyToTensor(const PredictReply &reply, json *out_json) {
Status status(SUCCESS);
for (int i = 0; i < reply.result_size(); i++) {
-    json tensor_json;
+    (*out_json)["tensor"].push_back(json());
+    json &tensor_json = (*out_json)["tensor"].back();
int pos = 0;
status = RecusiveGetJson(reply.result(i), 0, &pos, &tensor_json);
if (status != SUCCESS) {
return status;
}
(*out_json)["tensor"].push_back(tensor_json);
}
return status;
}
@@ -384,38 +447,57 @@ Status TransPredictReplyToHTTPMsg(const PredictReply &reply, const HTTP_TYPE &type, struct evbuffer *buf) {
return status;
}
-  std::string out_str = out_json.dump();
+  const std::string &out_str = out_json.dump();
evbuffer_add(buf, out_str.data(), out_str.size());
return status;
}
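Note: evbuffer_add copies the bytes into libevent's buffer, so the dumped string may be a temporary. Binding dump()'s return value to a const reference extends the temporary's lifetime but does not by itself remove a copy; returning by value is already a move, so the change is stylistic rather than a measurable win. A minimal sketch of the call (WriteJson is illustrative):

#include <event2/buffer.h>
#include <string>

void WriteJson(evbuffer *buf, const std::string &out_str) {
  evbuffer_add(buf, out_str.data(), out_str.size());  // copies into the buffer
}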
-void http_handler_msg(struct evhttp_request *req, void *arg) {
-  std::cout << "in handle" << std::endl;
+Status HttpHandleMsgDetail(struct evhttp_request *req, void *arg, struct evbuffer *retbuff) {
   PredictRequest request;
   PredictReply reply;
   HTTP_TYPE type;
+  MSI_TIME_STAMP_START(ParseRequest)
   auto status = TransHTTPMsgToPredictRequest(req, &request, &type);
+  MSI_TIME_STAMP_END(ParseRequest)
   if (status != SUCCESS) {
-    ErrorMessage(req, status);
     MSI_LOG(ERROR) << "restful trans to request failed";
-    return;
+    return status;
   }
   MSI_TIME_STAMP_START(Predict)
   status = Session::Instance().Predict(request, reply);
   MSI_TIME_STAMP_END(Predict)
   if (status != SUCCESS) {
-    ErrorMessage(req, status);
     MSI_LOG(ERROR) << "restful predict failed";
+    return status;
   }
-  struct evbuffer *retbuff = evbuffer_new();
-  status = TransPredictReplyToHTTPMsg(reply, type, retbuff);
-  if (status != SUCCESS) {
-    ErrorMessage(req, status);
-    MSI_LOG(ERROR) << "restful predict failed";
-  }
-  evhttp_send_reply(req, HTTP_OK, "Client", retbuff);
-  evbuffer_free(retbuff);
-}
+  MSI_TIME_STAMP_START(CreateReplyJson)
+  status = TransPredictReplyToHTTPMsg(reply, type, retbuff);
+  MSI_TIME_STAMP_END(CreateReplyJson)
+  if (status != SUCCESS) {
+    MSI_LOG(ERROR) << "restful trans to reply failed";
+    return status;
+  }
+  return SUCCESS;
+}
+void http_handler_msg(struct evhttp_request *req, void *arg) {
+  MSI_TIME_STAMP_START(TotalRestfulPredict)
+  struct evbuffer *retbuff = evbuffer_new();
+  if (retbuff == nullptr) {
+    MSI_LOG_ERROR << "Create event buffer failed";
+    return;
+  }
+  auto status = HttpHandleMsgDetail(req, arg, retbuff);
+  if (status != SUCCESS) {
+    ErrorMessage(req, status);
+    evbuffer_free(retbuff);
+    return;
+  }
+  MSI_TIME_STAMP_START(ReplyJson)
+  evhttp_send_reply(req, HTTP_OK, "Client", retbuff);
+  MSI_TIME_STAMP_END(ReplyJson)
+  evbuffer_free(retbuff);
+  MSI_TIME_STAMP_END(TotalRestfulPredict)
+}
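Note: splitting the handler into HttpHandleMsgDetail plus a thin wrapper leaves a single place where the evbuffer is created, the reply is sent, and the buffer is freed on every path (the old code could send two replies and leaked nothing only by luck). An RAII wrapper is an alternative way to get the same guarantee; a sketch only, the serving code frees the buffer explicitly:

#include <event2/buffer.h>
#include <memory>

// unique_ptr with evbuffer_free as the deleter: the buffer is released on
// every return path automatically.
using EvBufferPtr = std::unique_ptr<evbuffer, decltype(&evbuffer_free)>;

EvBufferPtr MakeEvBuffer() { return EvBufferPtr(evbuffer_new(), evbuffer_free); }
// usage: auto retbuff = MakeEvBuffer(); if (!retbuff) { /* error */ }
//        ... pass retbuff.get() to evhttp_send_reply ...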
} // namespace serving

View File

@@ -185,7 +185,7 @@ Status Server::BuildAndStart() {
int32_t http_port = option_args->rest_api_port;
std::string http_addr = "0.0.0.0";
-  evhttp_set_timeout(http_server, 5);
+  evhttp_set_timeout(http_server, 60);
evhttp_set_gencb(http_server, http_handler_msg, nullptr);
// grpc server
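Note: the timeout bump from 5 to 60 seconds matches the larger JSON payloads this commit is meant to handle; a 5-second limit could cut off a big RESTful predict request while it is still uploading or being parsed. A named constant would make that intent explicit (suggestion only, not in the commit):

constexpr int kRestfulTimeoutSeconds = 60;  // large JSON bodies need time to upload and parse
evhttp_set_timeout(http_server, kRestfulTimeoutSeconds);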