mindspore/serving/core/http_process.cc

562 lines
20 KiB
C++

/**
* Copyright 2020 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <map>
#include <vector>
#include <string>
#include <functional>
#include <utility>
#include <nlohmann/json.hpp>
#include "serving/ms_service.pb.h"
#include "util/status.h"
#include "core/session.h"
#include "core/http_process.h"
#include "core/serving_tensor.h"
using ms_serving::MSService;
using ms_serving::PredictReply;
using ms_serving::PredictRequest;
using nlohmann::json;
namespace mindspore {
namespace serving {
const int BUF_MAX = 0x7FFFFFFF;
static constexpr char HTTP_DATA[] = "data";
static constexpr char HTTP_TENSOR[] = "tensor";
enum HTTP_TYPE { TYPE_DATA = 0, TYPE_TENSOR };
enum HTTP_DATA_TYPE { HTTP_DATA_NONE, HTTP_DATA_INT, HTTP_DATA_FLOAT };
static const std::map<inference::DataType, HTTP_DATA_TYPE> infer_type2_http_type{
{inference::DataType::kMSI_Int32, HTTP_DATA_INT}, {inference::DataType::kMSI_Float32, HTTP_DATA_FLOAT}};
Status GetPostMessage(struct evhttp_request *const req, std::string *const buf) {
Status status(SUCCESS);
size_t post_size = evbuffer_get_length(req->input_buffer);
if (post_size == 0) {
ERROR_INFER_STATUS(status, INVALID_INPUTS, "http message invalid");
return status;
} else if (post_size > BUF_MAX) {
ERROR_INFER_STATUS(status, INVALID_INPUTS, "http message is bigger than 0x7FFFFFFF.");
return status;
} else {
buf->resize(post_size);
auto src_data = evbuffer_pullup(req->input_buffer, -1);
if (src_data == nullptr) {
ERROR_INFER_STATUS(status, FAILED, "get http message failed.");
return status;
}
if (memcpy_s(buf->data(), post_size, src_data, post_size) != EOK) {
ERROR_INFER_STATUS(status, FAILED, "copy http message failed.");
return status;
}
return status;
}
}
Status CheckRequestValid(const struct evhttp_request *const http_request) {
Status status(SUCCESS);
switch (evhttp_request_get_command(http_request)) {
case EVHTTP_REQ_POST:
return status;
default:
ERROR_INFER_STATUS(status, INVALID_INPUTS, "http message only support POST right now");
return status;
}
}
void ErrorMessage(struct evhttp_request *const req, Status status) {
json error_json = {{"error_message", status.StatusMessage()}};
std::string out_error_str = error_json.dump();
struct evbuffer *retbuff = evbuffer_new();
evbuffer_add(retbuff, out_error_str.data(), out_error_str.size());
evhttp_send_reply(req, HTTP_OK, "Client", retbuff);
evbuffer_free(retbuff);
}
Status CheckMessageValid(const json &message_info, HTTP_TYPE *const type) {
Status status(SUCCESS);
int count = 0;
if (message_info.find(HTTP_DATA) != message_info.end()) {
*type = TYPE_DATA;
count++;
}
if (message_info.find(HTTP_TENSOR) != message_info.end()) {
*type = TYPE_TENSOR;
count++;
}
if (count != 1) {
ERROR_INFER_STATUS(status, INVALID_INPUTS, "http message must have only one type of (data, tensor)");
return status;
}
return status;
}
std::vector<int64_t> GetJsonArrayShape(const json &json_array) {
std::vector<int64_t> json_shape;
const json *tmp_json = &json_array;
while (tmp_json->is_array()) {
if (tmp_json->empty()) {
break;
}
json_shape.push_back(tmp_json->size());
tmp_json = &tmp_json->at(0);
}
return json_shape;
}
Status GetScalarDataFromJson(const json &json_data_array, ServingTensor *const request_tensor, HTTP_DATA_TYPE type) {
Status status(SUCCESS);
auto type_name = [](const json &json_data) -> std::string {
if (json_data.is_number_integer()) {
return "integer";
} else if (json_data.is_number_float()) {
return "float";
}
return json_data.type_name();
};
const json *json_data = &json_data_array;
if (json_data_array.is_array()) {
if (json_data_array.size() != 1 || json_data_array[0].is_array()) {
status = INFER_STATUS(INVALID_INPUTS) << "get data failed, expected scalar data is scalar or shape(1) array, "
"now array shape is "
<< GetJsonArrayShape(json_data_array);
MSI_LOG_ERROR << status.StatusMessage();
return status;
}
json_data = &json_data_array.at(0);
}
if (type == HTTP_DATA_INT) {
auto data = reinterpret_cast<int32_t *>(request_tensor->mutable_data());
if (!json_data->is_number_integer()) {
status = INFER_STATUS(INVALID_INPUTS) << "get data failed, expected integer, given " << type_name(*json_data);
MSI_LOG_ERROR << status.StatusMessage();
return status;
}
data[0] = json_data->get<int32_t>();
} else if (type == HTTP_DATA_FLOAT) {
auto data = reinterpret_cast<float *>(request_tensor->mutable_data());
if (!json_data->is_number_float()) {
status = INFER_STATUS(INVALID_INPUTS) << "get data failed, expected float, given " << type_name(*json_data);
MSI_LOG_ERROR << status.StatusMessage();
return status;
}
data[0] = json_data->get<float>();
}
return SUCCESS;
}
Status GetDataFromJson(const json &json_data_array, ServingTensor *const request_tensor, size_t data_index,
HTTP_DATA_TYPE type) {
Status status(SUCCESS);
auto type_name = [](const json &json_data) -> std::string {
if (json_data.is_number_integer()) {
return "integer";
} else if (json_data.is_number_float()) {
return "float";
}
return json_data.type_name();
};
size_t array_size = json_data_array.size();
if (type == HTTP_DATA_INT) {
auto data = reinterpret_cast<int32_t *>(request_tensor->mutable_data()) + data_index;
for (size_t k = 0; k < array_size; k++) {
auto &json_data = json_data_array[k];
if (!json_data.is_number_integer()) {
status = INFER_STATUS(INVALID_INPUTS) << "get data failed, expected integer, given " << type_name(json_data);
MSI_LOG_ERROR << status.StatusMessage();
return status;
}
data[k] = json_data.get<int32_t>();
}
} else if (type == HTTP_DATA_FLOAT) {
auto data = reinterpret_cast<float *>(request_tensor->mutable_data()) + data_index;
for (size_t k = 0; k < array_size; k++) {
auto &json_data = json_data_array[k];
if (!json_data.is_number_float()) {
status = INFER_STATUS(INVALID_INPUTS) << "get data failed, expected float, given " << type_name(json_data);
MSI_LOG_ERROR << status.StatusMessage();
return status;
}
data[k] = json_data.get<float>();
}
}
return SUCCESS;
}
Status RecusiveGetTensor(const json &json_data, size_t depth, ServingTensor *const request_tensor, size_t data_index,
HTTP_DATA_TYPE type) {
Status status(SUCCESS);
std::vector<int64_t> required_shape = request_tensor->shape();
if (depth >= required_shape.size()) {
status = INFER_STATUS(INVALID_INPUTS)
<< "input tensor shape dims is more than required dims " << required_shape.size();
MSI_LOG_ERROR << status.StatusMessage();
return status;
}
if (!json_data.is_array()) {
ERROR_INFER_STATUS(status, INVALID_INPUTS, "the tensor is constructed illegally");
return status;
}
if (json_data.size() != static_cast<size_t>(required_shape[depth])) {
status = INFER_STATUS(INVALID_INPUTS)
<< "tensor format request is constructed illegally, input tensor shape dim " << depth
<< " not match, required " << required_shape[depth] << ", given " << json_data.size();
MSI_LOG_ERROR << status.StatusMessage();
return status;
}
if (depth + 1 < required_shape.size()) {
size_t sub_element_cnt =
std::accumulate(required_shape.begin() + depth + 1, required_shape.end(), 1LL, std::multiplies<size_t>());
for (size_t k = 0; k < json_data.size(); k++) {
status = RecusiveGetTensor(json_data[k], depth + 1, request_tensor, data_index + sub_element_cnt * k, type);
if (status != SUCCESS) {
return status;
}
}
} else {
status = GetDataFromJson(json_data, request_tensor, data_index, type);
if (status != SUCCESS) {
return status;
}
}
return status;
}
Status TransDataToPredictRequest(const json &message_info, PredictRequest *const request) {
Status status = SUCCESS;
auto tensors = message_info.find(HTTP_DATA);
if (tensors == message_info.end()) {
ERROR_INFER_STATUS(status, INVALID_INPUTS, "http message do not have data type");
return status;
}
if (!tensors->is_array()) {
ERROR_INFER_STATUS(status, INVALID_INPUTS, "the input tensor list is not array");
return status;
}
auto const &json_shape = GetJsonArrayShape(*tensors);
if (json_shape.size() != 2) { // 2 is data format list deep
status = INFER_STATUS(INVALID_INPUTS)
<< "the data format request is constructed illegally, expected list nesting depth 2, given "
<< json_shape.size();
MSI_LOG_ERROR << status.StatusMessage();
return status;
}
if (tensors->size() != static_cast<size_t>(request->data_size())) {
status = INFER_STATUS(INVALID_INPUTS)
<< "model input count not match, model required " << request->data_size() << ", given " << tensors->size();
MSI_LOG_ERROR << status.StatusMessage();
return status;
}
for (size_t i = 0; i < tensors->size(); i++) {
const auto &tensor = tensors->at(i);
ServingTensor request_tensor(*(request->mutable_data(i)));
auto iter = infer_type2_http_type.find(request_tensor.data_type());
if (iter == infer_type2_http_type.end()) {
ERROR_INFER_STATUS(status, FAILED, "the model input type is not supported right now");
return status;
}
HTTP_DATA_TYPE type = iter->second;
if (!tensor.is_array()) {
ERROR_INFER_STATUS(status, INVALID_INPUTS, "the tensor is constructed illegally");
return status;
}
if (tensor.empty()) {
ERROR_INFER_STATUS(status, INVALID_INPUTS, "the input tensor is null");
return status;
}
if (tensor.size() != static_cast<size_t>(request_tensor.ElementNum())) {
status = INFER_STATUS(INVALID_INPUTS) << "input " << i << " element count not match, model required "
<< request_tensor.ElementNum() << ", given " << tensor.size();
MSI_LOG_ERROR << status.StatusMessage();
return status;
}
status = GetDataFromJson(tensor, &request_tensor, 0, type);
if (status != SUCCESS) {
return status;
}
}
return SUCCESS;
}
Status TransTensorToPredictRequest(const json &message_info, PredictRequest *const request) {
Status status(SUCCESS);
auto tensors = message_info.find(HTTP_TENSOR);
if (tensors == message_info.end()) {
ERROR_INFER_STATUS(status, INVALID_INPUTS, "http message do not have tensor type");
return status;
}
if (!tensors->is_array()) {
ERROR_INFER_STATUS(status, INVALID_INPUTS, "the input tensor list is not array");
return status;
}
if (tensors->size() != static_cast<size_t>(request->data_size())) {
status =
INFER_STATUS(INVALID_INPUTS)
<< "model input count not match or json tensor request is constructed illegally, model input count required "
<< request->data_size() << ", given " << tensors->size();
MSI_LOG_ERROR << status.StatusMessage();
return status;
}
for (size_t i = 0; i < tensors->size(); i++) {
const auto &tensor = tensors->at(i);
ServingTensor request_tensor(*(request->mutable_data(i)));
auto iter = infer_type2_http_type.find(request_tensor.data_type());
if (iter == infer_type2_http_type.end()) {
ERROR_INFER_STATUS(status, FAILED, "the model input type is not supported right now");
return status;
}
HTTP_DATA_TYPE type = iter->second;
// check data shape
auto const &json_shape = GetJsonArrayShape(tensor);
auto is_scalar_shape = [](const std::vector<int64_t> &shape) {
return shape.empty() || (shape.size() == 1 && shape[0] == 1);
};
if (is_scalar_shape(request_tensor.shape())) {
return GetScalarDataFromJson(tensor, &request_tensor, type);
} else {
if (json_shape != request_tensor.shape()) { // data shape not match
status = INFER_STATUS(INVALID_INPUTS) << "input " << i << " shape is invalid, expected "
<< request_tensor.shape() << ", given " << json_shape;
MSI_LOG_ERROR << status.StatusMessage();
return status;
}
size_t depth = 0;
size_t data_index = 0;
status = RecusiveGetTensor(tensor, depth, &request_tensor, data_index, type);
if (status != SUCCESS) {
MSI_LOG_ERROR << "Transfer tensor to predict request failed";
return status;
}
}
}
return status;
}
Status TransHTTPMsgToPredictRequest(struct evhttp_request *const http_request, PredictRequest *const request,
HTTP_TYPE *const type) {
Status status = CheckRequestValid(http_request);
if (status != SUCCESS) {
return status;
}
std::string post_message;
status = GetPostMessage(http_request, &post_message);
if (status != SUCCESS) {
return status;
}
// get model required shape
std::vector<inference::InferTensor> tensor_list;
status = Session::Instance().GetModelInputsInfo(tensor_list);
if (status != SUCCESS) {
ERROR_INFER_STATUS(status, FAILED, "get model inputs info failed");
return status;
}
for (auto &item : tensor_list) {
auto input = request->add_data();
ServingTensor tensor(*input);
tensor.set_shape(item.shape());
tensor.set_data_type(item.data_type());
int64_t element_num = tensor.ElementNum();
int64_t data_type_size = tensor.GetTypeSize(tensor.data_type());
if (element_num <= 0 || INT64_MAX / element_num < data_type_size) {
ERROR_INFER_STATUS(status, FAILED, "model shape invalid");
return status;
}
tensor.resize_data(element_num * data_type_size);
}
MSI_TIME_STAMP_START(ParseJson)
json message_info;
try {
message_info = nlohmann::json::parse(post_message);
} catch (nlohmann::json::exception &e) {
std::string json_exception = e.what();
std::string error_message = "Illegal JSON format." + json_exception;
ERROR_INFER_STATUS(status, INVALID_INPUTS, error_message);
return status;
}
MSI_TIME_STAMP_END(ParseJson)
status = CheckMessageValid(message_info, type);
if (status != SUCCESS) {
return status;
}
switch (*type) {
case TYPE_DATA:
status = TransDataToPredictRequest(message_info, request);
break;
case TYPE_TENSOR:
status = TransTensorToPredictRequest(message_info, request);
break;
default:
ERROR_INFER_STATUS(status, INVALID_INPUTS, "http message must have only one type of (data, tensor)");
return status;
}
return status;
}
Status GetJsonFromTensor(const ms_serving::Tensor &tensor, int len, int *const pos, json *const out_json) {
Status status(SUCCESS);
switch (tensor.tensor_type()) {
case ms_serving::MS_INT32: {
auto data = reinterpret_cast<const int *>(tensor.data().data()) + *pos;
std::vector<int32_t> result_tensor(len);
memcpy_s(result_tensor.data(), result_tensor.size() * sizeof(int32_t), data, len * sizeof(int32_t));
*out_json = std::move(result_tensor);
*pos += len;
break;
}
case ms_serving::MS_FLOAT32: {
auto data = reinterpret_cast<const float *>(tensor.data().data()) + *pos;
std::vector<float> result_tensor(len);
(void)memcpy_s(result_tensor.data(), result_tensor.size() * sizeof(float), data, len * sizeof(float));
*out_json = std::move(result_tensor);
*pos += len;
break;
}
default:
MSI_LOG(ERROR) << "the result type is not supported in restful api, type is " << tensor.tensor_type();
ERROR_INFER_STATUS(status, FAILED, "reply have unsupported type");
}
return status;
}
Status TransPredictReplyToData(const PredictReply &reply, json *const out_json) {
Status status(SUCCESS);
for (int i = 0; i < reply.result_size(); i++) {
(*out_json)["data"].push_back(json());
json &tensor_json = (*out_json)["data"].back();
int num = 1;
for (auto j = 0; j < reply.result(i).tensor_shape().dims_size(); j++) {
num *= reply.result(i).tensor_shape().dims(j);
}
int pos = 0;
status = GetJsonFromTensor(reply.result(i), num, &pos, &tensor_json);
if (status != SUCCESS) {
return status;
}
}
return status;
}
Status RecusiveGetJson(const ms_serving::Tensor &tensor, int depth, int *const pos, json *const out_json) {
Status status(SUCCESS);
if (depth >= 10) {
ERROR_INFER_STATUS(status, FAILED, "result tensor shape dims is larger than 10");
return status;
}
if (depth == tensor.tensor_shape().dims_size() - 1) {
status = GetJsonFromTensor(tensor, tensor.tensor_shape().dims(depth), pos, out_json);
if (status != SUCCESS) {
return status;
}
} else {
for (int i = 0; i < tensor.tensor_shape().dims(depth); i++) {
out_json->push_back(json());
json &tensor_json = out_json->back();
status = RecusiveGetJson(tensor, depth + 1, pos, &tensor_json);
if (status != SUCCESS) {
return status;
}
}
}
return status;
}
Status TransPredictReplyToTensor(const PredictReply &reply, json *const out_json) {
Status status(SUCCESS);
for (int i = 0; i < reply.result_size(); i++) {
(*out_json)["tensor"].push_back(json());
json &tensor_json = (*out_json)["tensor"].back();
int pos = 0;
status = RecusiveGetJson(reply.result(i), 0, &pos, &tensor_json);
if (status != SUCCESS) {
return status;
}
}
return status;
}
Status TransPredictReplyToHTTPMsg(const PredictReply &reply, const HTTP_TYPE &type, struct evbuffer *const buf) {
Status status(SUCCESS);
json out_json;
switch (type) {
case TYPE_DATA:
status = TransPredictReplyToData(reply, &out_json);
break;
case TYPE_TENSOR:
status = TransPredictReplyToTensor(reply, &out_json);
break;
default:
ERROR_INFER_STATUS(status, FAILED, "http message must have only one type of (data, tensor)");
return status;
}
const std::string &out_str = out_json.dump();
evbuffer_add(buf, out_str.data(), out_str.size());
return status;
}
Status HttpHandleMsgDetail(struct evhttp_request *const req, void *const arg, struct evbuffer *const retbuff) {
PredictRequest request;
PredictReply reply;
HTTP_TYPE type;
MSI_TIME_STAMP_START(ParseRequest)
auto status = TransHTTPMsgToPredictRequest(req, &request, &type);
MSI_TIME_STAMP_END(ParseRequest)
if (status != SUCCESS) {
MSI_LOG(ERROR) << "restful trans to request failed";
return status;
}
MSI_TIME_STAMP_START(Predict)
status = Session::Instance().Predict(request, reply);
MSI_TIME_STAMP_END(Predict)
if (status != SUCCESS) {
MSI_LOG(ERROR) << "restful predict failed";
return status;
}
MSI_TIME_STAMP_START(CreateReplyJson)
status = TransPredictReplyToHTTPMsg(reply, type, retbuff);
MSI_TIME_STAMP_END(CreateReplyJson)
if (status != SUCCESS) {
MSI_LOG(ERROR) << "restful trans to reply failed";
return status;
}
return SUCCESS;
}
void http_handler_msg(struct evhttp_request *const req, void *const arg) {
MSI_TIME_STAMP_START(TotalRestfulPredict)
struct evbuffer *retbuff = evbuffer_new();
if (retbuff == nullptr) {
MSI_LOG_ERROR << "Create event buffer failed";
return;
}
auto status = HttpHandleMsgDetail(req, arg, retbuff);
if (status != SUCCESS) {
ErrorMessage(req, status);
evbuffer_free(retbuff);
return;
}
MSI_TIME_STAMP_START(ReplyJson)
evhttp_send_reply(req, HTTP_OK, "Client", retbuff);
MSI_TIME_STAMP_END(ReplyJson)
evbuffer_free(retbuff);
MSI_TIME_STAMP_END(TotalRestfulPredict)
}
} // namespace serving
} // namespace mindspore