From 75d116b5dbfeb0b03896cba65fcf6d9deaa24eb7 Mon Sep 17 00:00:00 2001 From: hexia Date: Tue, 18 Aug 2020 19:31:57 +0800 Subject: [PATCH] serving http init --- cmake/external_libs/libevent.cmake | 11 + cmake/mind_expression.cmake | 2 + cmake/package.cmake | 14 + include/infer_log.h | 12 + include/inference.h | 4 + .../session/ascend_inference_session.cc | 26 ++ .../session/ascend_inference_session.h | 1 + .../ccsrc/backend/session/infer_session.cc | 16 +- .../ccsrc/backend/session/infer_session.h | 1 + .../ccsrc/backend/session/session_basic.h | 1 + serving/CMakeLists.txt | 3 + serving/core/http_process.cc | 423 ++++++++++++++++++ serving/core/http_process.h | 29 ++ serving/core/server.cc | 139 ++---- serving/core/server.h | 36 -- serving/core/session.cc | 136 ++++++ serving/core/session.h | 62 +++ serving/core/util/option_parser.cc | 10 + serving/core/util/option_parser.h | 1 + .../version_control/version_controller.cc | 2 +- serving/example/export_model/add_model.py | 5 +- serving/example/python_client/ms_client.py | 8 +- tests/st/serving/serving.sh | 4 +- tests/ut/cpp/CMakeLists.txt | 2 +- tests/ut/cpp/serving/CMakeLists.txt | 1 - .../ut/cpp/serving/acl_session_test_common.h | 1 + 26 files changed, 804 insertions(+), 146 deletions(-) create mode 100644 cmake/external_libs/libevent.cmake create mode 100644 serving/core/http_process.cc create mode 100644 serving/core/http_process.h create mode 100644 serving/core/session.cc create mode 100644 serving/core/session.h diff --git a/cmake/external_libs/libevent.cmake b/cmake/external_libs/libevent.cmake new file mode 100644 index 00000000000..553e1c65c40 --- /dev/null +++ b/cmake/external_libs/libevent.cmake @@ -0,0 +1,11 @@ +mindspore_add_pkg(libevent + VER 2.1.12 + LIBS event event_pthreads + URL https://github.com/libevent/libevent/releases/download/release-2.1.12-stable/libevent-2.1.12-stable.tar.gz + MD5 b5333f021f880fe76490d8a799cd79f4 + CMAKE_OPTION -DCMAKE_BUILD_TYPE:STRING=Release -DBUILD_TESTING=OFF) + +include_directories(${libevent_INC}) + +add_library(mindspore::event ALIAS libevent::event) +add_library(mindspore::event_pthreads ALIAS libevent::event_pthreads) diff --git a/cmake/mind_expression.cmake b/cmake/mind_expression.cmake index 8e1e9ce553c..f033e14e393 100644 --- a/cmake/mind_expression.cmake +++ b/cmake/mind_expression.cmake @@ -22,6 +22,8 @@ if (ENABLE_DEBUGGER OR ENABLE_SERVING OR ENABLE_TESTCASES) include(${CMAKE_SOURCE_DIR}/cmake/external_libs/zlib.cmake) # build gRPC include(${CMAKE_SOURCE_DIR}/cmake/external_libs/grpc.cmake) + # build event + include(${CMAKE_SOURCE_DIR}/cmake/external_libs/libevent.cmake) endif() include(${CMAKE_SOURCE_DIR}/cmake/external_libs/pybind11.cmake) diff --git a/cmake/package.cmake b/cmake/package.cmake index ed31483f2f6..9f7a2410113 100644 --- a/cmake/package.cmake +++ b/cmake/package.cmake @@ -163,6 +163,13 @@ if (ENABLE_GPU) ) endif () +if (ENABLE_SERVING OR ENABLE_TESTCASES) + file(GLOB_RECURSE LIBEVENT_LIB_LIST + ${libevent_LIBPATH}/libevent* + ${libevent_LIBPATH}/libevent_pthreads* + ) +endif () + if (NOT ENABLE_GE) if (ENABLE_D) if (DEFINED ENV{ASCEND_CUSTOM_PATH}) @@ -191,6 +198,7 @@ if (NOT ENABLE_GE) ${CMAKE_SOURCE_DIR}/graphengine/third_party/prebuild/${CMAKE_HOST_SYSTEM_PROCESSOR}/libslog.so ${CMAKE_SOURCE_DIR}/graphengine/third_party/prebuild/${CMAKE_HOST_SYSTEM_PROCESSOR}/liberror_manager.so ${CMAKE_SOURCE_DIR}/build/graphengine/libc_sec.so + ${LIBEVENT_LIB_LIST} DESTINATION ${INSTALL_LIB_DIR} COMPONENT mindspore ) @@ -273,4 +281,10 @@ if (ENABLE_SERVING) DESTINATION 
${INSTALL_LIB_DIR} COMPONENT mindspore ) + + install( + FILES ${LIBEVENT_LIB_LIST} + DESTINATION ${INSTALL_LIB_DIR} + COMPONENT mindspore + ) endif () diff --git a/include/infer_log.h b/include/infer_log.h index 869588bda3a..f08fefde68d 100644 --- a/include/infer_log.h +++ b/include/infer_log.h @@ -23,6 +23,7 @@ #include #include #include +#include #ifndef ENABLE_ACL #include "mindspore/core/utils/log_adapter.h" @@ -101,7 +102,18 @@ class LogWriter { #endif // ENABLE_ACL +#define MSI_TIME_STAMP_START(name) auto time_start_##name = std::chrono::steady_clock::now(); +#define MSI_TIME_STAMP_END(name) \ + { \ + auto time_end_##name = std::chrono::steady_clock::now(); \ + auto time_cost = std::chrono::duration(time_end_##name - time_start_##name).count(); \ + MSI_LOG_INFO << #name " Time Cost # " << time_cost << " ms ---------------------"; \ + } + #define INFER_STATUS(code) inference::Status(code) < inference::LogStream() +#define ERROR_INFER_STATUS(status, type, msg) \ + MSI_LOG_ERROR << msg; \ + status = inference::Status(type, msg) } // namespace mindspore::inference diff --git a/include/inference.h b/include/inference.h index 8598401b757..c66d6555709 100644 --- a/include/inference.h +++ b/include/inference.h @@ -74,6 +74,10 @@ class MS_API InferSession { const RequestBase & /*request*/, ReplyBase & /*reply*/) { return FAILED; } + virtual Status GetModelInputsInfo(uint32_t graph_id, std::vector *tensor_list) const { + Status status(SUCCESS); + return status; + } static std::shared_ptr CreateSession(const std::string &device, uint32_t device_id); }; diff --git a/mindspore/ccsrc/backend/session/ascend_inference_session.cc b/mindspore/ccsrc/backend/session/ascend_inference_session.cc index fd0bec6d5e0..bbf790c7b01 100644 --- a/mindspore/ccsrc/backend/session/ascend_inference_session.cc +++ b/mindspore/ccsrc/backend/session/ascend_inference_session.cc @@ -212,5 +212,31 @@ std::string AscendInferenceSession::InputsInfo(const std::vector & return graph + " " + actual; } +void AscendInferenceSession::GetModelInputsInfo(uint32_t graph_id, std::vector *inputs) const { + MS_LOG(INFO) << "Start get model inputs, graph id : " << graph_id; + auto kernel_graph = GetGraph(graph_id); + MS_EXCEPTION_IF_NULL(kernel_graph); + auto kernel_graph_inputs = kernel_graph->inputs(); + vector paras; + // find parameters of graph inputs + for (size_t i = 0; i < kernel_graph_inputs.size(); ++i) { + if (!kernel_graph_inputs[i]->isa()) { + MS_LOG(ERROR) << "Kernel graph inputs have anfnode which is not Parameter."; + continue; + } + auto parameter = kernel_graph_inputs[i]->cast(); + if (!AnfAlgo::IsParameterWeight(parameter)) { + vector input_shape; + auto parameter_shape = AnfAlgo::GetOutputDeviceShape(parameter, 0); + (void)std::transform(parameter_shape.begin(), parameter_shape.end(), std::back_inserter(input_shape), + [](const size_t dim) { return static_cast(dim); }); + auto kernel_build_info = AnfAlgo::GetSelectKernelBuildInfo(parameter); + auto data_type = kernel_build_info->GetOutputDeviceType(0); + auto ms_tensor = std::make_shared(data_type, input_shape); + inputs->push_back(ms_tensor); + } + } +} + } // namespace session } // namespace mindspore diff --git a/mindspore/ccsrc/backend/session/ascend_inference_session.h b/mindspore/ccsrc/backend/session/ascend_inference_session.h index d092b3ccb3f..772c9a21ff3 100644 --- a/mindspore/ccsrc/backend/session/ascend_inference_session.h +++ b/mindspore/ccsrc/backend/session/ascend_inference_session.h @@ -45,6 +45,7 @@ class AscendInferenceSession : public AscendSession 
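Note on the new GetModelInputsInfo interface added above (inference.h, SessionBasic, AscendInferenceSession, MSInferSession): it lets the serving layer ask the loaded graph for the shapes and element types of its non-weight inputs before a request is built. A minimal caller-side sketch, assuming the usual CreateSession/LoadModelFromFile flow; the device string, device id and model path are placeholders and error handling is trimmed:

    #include <iostream>
    #include <memory>
    #include <vector>
    #include "include/inference.h"

    int main() {
      // Placeholder device and model path, for illustration only.
      auto session = mindspore::inference::InferSession::CreateSession("Ascend", 0);
      if (session == nullptr) {
        return 1;
      }
      uint32_t graph_id = 0;
      if (session->LoadModelFromFile("/path/to/model", graph_id) != mindspore::inference::SUCCESS) {
        return 1;
      }
      // Query the input tensors the model expects (weight parameters are filtered out on the session side).
      std::vector<mindspore::inference::InferTensor> inputs;
      if (session->GetModelInputsInfo(graph_id, &inputs) != mindspore::inference::SUCCESS) {
        return 1;
      }
      for (const auto &tensor : inputs) {
        std::cout << "input dims:";
        for (auto dim : tensor.shape()) {
          std::cout << ' ' << dim;
        }
        std::cout << std::endl;
      }
      return 0;
    }
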
{ template std::string PrintInputShape(std::vector shape) const; std::string InputsInfo(const std::vector ¶s, const std::vector &inputs) const; + void GetModelInputsInfo(uint32_t graph_id, std::vector *inputs) const override; }; MS_REG_SESSION(kDavinciInferenceDevice, AscendInferenceSession); } // namespace session diff --git a/mindspore/ccsrc/backend/session/infer_session.cc b/mindspore/ccsrc/backend/session/infer_session.cc index b7829795b20..78c4ddeeeee 100644 --- a/mindspore/ccsrc/backend/session/infer_session.cc +++ b/mindspore/ccsrc/backend/session/infer_session.cc @@ -220,7 +220,7 @@ Status MSInferSession::ExecuteModel(uint32_t model_id, const RequestBase &reques for (const auto &tensor : outputs) { auto out_tensor = reply.add(); if (out_tensor == nullptr) { - MS_LOG(ERROR) << "Execute Model " << model_id << " Failed, add output tensor failed"; + MS_LOG(ERROR) << "Execute Model " << model_id << " Failed add output tensor failed"; return FAILED; } MSTensor2ServingTensor(tensor, *out_tensor); @@ -374,4 +374,18 @@ Status MSInferSession::CheckModelInputs(uint32_t graph_id, const std::vector *tensor_list) const { + vector inputs; + session_impl_->GetModelInputsInfo(model_id, &inputs); + if (inputs.size() == 0) { + MS_LOG(ERROR) << "The model inputs is NULL"; + return FAILED; + } + for (const auto &tensor : inputs) { + InferTensor infer_tensor = InferTensor(); + MSTensor2ServingTensor(tensor, infer_tensor); + tensor_list->push_back(infer_tensor); + } + return SUCCESS; +} } // namespace mindspore::inference diff --git a/mindspore/ccsrc/backend/session/infer_session.h b/mindspore/ccsrc/backend/session/infer_session.h index c58e16e3821..b7434ecb35b 100644 --- a/mindspore/ccsrc/backend/session/infer_session.h +++ b/mindspore/ccsrc/backend/session/infer_session.h @@ -43,6 +43,7 @@ class MSInferSession : public InferSession { Status LoadModelFromFile(const std::string &file_name, uint32_t &model_id) override; Status UnloadModel(uint32_t model_id) override; Status ExecuteModel(uint32_t model_id, const RequestBase &inputs, ReplyBase &outputs) override; + Status GetModelInputsInfo(uint32_t graph_id, std::vector *tensor_list) const override; private: std::shared_ptr session_impl_ = nullptr; diff --git a/mindspore/ccsrc/backend/session/session_basic.h b/mindspore/ccsrc/backend/session/session_basic.h index d3e94473a41..ddd15c6d359 100644 --- a/mindspore/ccsrc/backend/session/session_basic.h +++ b/mindspore/ccsrc/backend/session/session_basic.h @@ -97,6 +97,7 @@ class SessionBasic { std::string *error_msg) const { return true; } + virtual void GetModelInputsInfo(uint32_t graph_id, std::vector *inputs) const {} #ifdef ENABLE_DEBUGGER // set debugger diff --git a/serving/CMakeLists.txt b/serving/CMakeLists.txt index 8b60168228a..8da0288bacb 100644 --- a/serving/CMakeLists.txt +++ b/serving/CMakeLists.txt @@ -93,7 +93,10 @@ if (ENABLE_ACL) endif () include_directories(${CMAKE_BINARY_DIR}) + add_executable(ms_serving ${SERVING_SRC}) +#libevent +target_link_libraries(ms_serving mindspore::event mindspore::event_pthreads) target_link_libraries(ms_serving ${_REFLECTION} ${_GRPC_GRPCPP} ${_PROTOBUF_LIBPROTOBUF} pthread) if (ENABLE_D) diff --git a/serving/core/http_process.cc b/serving/core/http_process.cc new file mode 100644 index 00000000000..cf989779001 --- /dev/null +++ b/serving/core/http_process.cc @@ -0,0 +1,423 @@ +/** + * Copyright 2020 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the 
License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include +#include +#include +#include +#include "serving/ms_service.pb.h" +#include "util/status.h" +#include "core/session.h" +#include "core/http_process.h" + +using ms_serving::MSService; +using ms_serving::PredictReply; +using ms_serving::PredictRequest; +using nlohmann::json; + +namespace mindspore { +namespace serving { + +const int BUF_MAX = 0x1FFFFF; +static constexpr char HTTP_DATA[] = "data"; +static constexpr char HTTP_TENSOR[] = "tensor"; +enum HTTP_TYPE { TYPE_DATA = 0, TYPE_TENSOR }; +enum HTTP_DATA_TYPE { HTTP_DATA_NONE, HTTP_DATA_INT, HTTP_DATA_FLOAT }; +static const std::map http_to_infer_map{ + {HTTP_DATA_NONE, ms_serving::MS_UNKNOWN}, + {HTTP_DATA_INT, ms_serving::MS_INT32}, + {HTTP_DATA_FLOAT, ms_serving::MS_FLOAT32}}; + +Status GetPostMessage(struct evhttp_request *req, std::string *buf) { + Status status(SUCCESS); + size_t post_size = evbuffer_get_length(req->input_buffer); + if (post_size == 0) { + ERROR_INFER_STATUS(status, INVALID_INPUTS, "http message invalid"); + return status; + } else { + size_t copy_len = post_size > BUF_MAX ? BUF_MAX : post_size; + buf->resize(copy_len); + memcpy(buf->data(), evbuffer_pullup(req->input_buffer, -1), copy_len); + return status; + } +} +Status CheckRequestValid(struct evhttp_request *http_request) { + Status status(SUCCESS); + switch (evhttp_request_get_command(http_request)) { + case EVHTTP_REQ_POST: + return status; + default: + ERROR_INFER_STATUS(status, INVALID_INPUTS, "http message only support POST right now"); + return status; + } +} + +void ErrorMessage(struct evhttp_request *req, Status status) { + json error_json = {{"error_message", status.StatusMessage()}}; + std::string out_error_str = error_json.dump(); + struct evbuffer *retbuff = evbuffer_new(); + evbuffer_add(retbuff, out_error_str.data(), out_error_str.size()); + evhttp_send_reply(req, HTTP_OK, "Client", retbuff); + evbuffer_free(retbuff); +} + +Status CheckMessageValid(const json &message_info, HTTP_TYPE *type) { + Status status(SUCCESS); + int count = 0; + if (message_info.find(HTTP_DATA) != message_info.end()) { + *type = TYPE_DATA; + count++; + } + if (message_info.find(HTTP_TENSOR) != message_info.end()) { + *type = TYPE_TENSOR; + count++; + } + if (count != 1) { + ERROR_INFER_STATUS(status, INVALID_INPUTS, "http message must have only one type of (data, tensor, text)"); + return status; + } + return status; +} + +Status GetDataFromJson(const json &json_data, std::string *data, HTTP_DATA_TYPE *type) { + Status status(SUCCESS); + if (json_data.is_number_integer()) { + if (*type == HTTP_DATA_NONE) { + *type = HTTP_DATA_INT; + } else if (*type != HTTP_DATA_INT) { + ERROR_INFER_STATUS(status, INVALID_INPUTS, "the input data type should be consistent"); + return status; + } + auto s_data = json_data.get(); + data->append(reinterpret_cast(&s_data), sizeof(int32_t)); + MSI_LOG(INFO) << "data size " << data->size(); + } else if (json_data.is_number_float()) { + if (*type == HTTP_DATA_NONE) { + *type = HTTP_DATA_FLOAT; + } else if (*type != HTTP_DATA_FLOAT) { + ERROR_INFER_STATUS(status, INVALID_INPUTS, "the input data type should 
be consistent"); + return status; + } + auto s_data = json_data.get(); + data->append(reinterpret_cast(&s_data), sizeof(float)); + MSI_LOG(INFO) << "data size " << data->size(); + } else { + ERROR_INFER_STATUS(status, INVALID_INPUTS, "the input data type should be int or float"); + return status; + } + return SUCCESS; +} + +Status RecusiveGetTensor(const json &json_data, size_t depth, std::vector *shape, std::string *data, + HTTP_DATA_TYPE *type) { + Status status(SUCCESS); + if (depth >= 10) { + ERROR_INFER_STATUS(status, INVALID_INPUTS, "the tensor shape dims is larger than 10"); + return status; + } + if (!json_data.is_array()) { + ERROR_INFER_STATUS(status, INVALID_INPUTS, "the tensor is constructed illegally"); + return status; + } + int cur_dim = json_data.size(); + if (shape->size() <= depth) { + shape->push_back(cur_dim); + } else if ((*shape)[depth] != cur_dim) { + return INFER_STATUS(INVALID_INPUTS) << "the tensor shape is constructed illegally"; + } + if (json_data.at(0).is_array()) { + for (const auto &item : json_data) { + status = RecusiveGetTensor(item, depth + 1, shape, data, type); + if (status != SUCCESS) { + return status; + } + } + } else { + // last dim, read the data + for (auto item : json_data) { + status = GetDataFromJson(item, data, type); + if (status != SUCCESS) { + return status; + } + } + } + return status; +} + +Status TransDataToPredictRequest(const json &message_info, PredictRequest *request) { + Status status = SUCCESS; + auto tensors = message_info.find(HTTP_DATA); + if (tensors == message_info.end()) { + ERROR_INFER_STATUS(status, INVALID_INPUTS, "http message do not have data type"); + return status; + } + + if (tensors->size() == 0) { + ERROR_INFER_STATUS(status, INVALID_INPUTS, "the input tensor list is null"); + return status; + } + for (const auto &tensor : *tensors) { + std::string msg_data; + HTTP_DATA_TYPE type{HTTP_DATA_NONE}; + if (!tensor.is_array()) { + ERROR_INFER_STATUS(status, INVALID_INPUTS, "the tensor is constructed illegally"); + return status; + } + if (tensor.size() == 0) { + ERROR_INFER_STATUS(status, INVALID_INPUTS, "the input tensor is null"); + return status; + } + for (const auto &tensor_data : tensor) { + status = GetDataFromJson(tensor_data, &msg_data, &type); + if (status != SUCCESS) { + return status; + } + } + auto iter = http_to_infer_map.find(type); + if (iter == http_to_infer_map.end()) { + ERROR_INFER_STATUS(status, INVALID_INPUTS, "the input type is not supported right now"); + return status; + } + + auto infer_tensor = request->add_data(); + infer_tensor->set_tensor_type(iter->second); + infer_tensor->set_data(msg_data.data(), msg_data.size()); + } + // get model required shape + std::vector tensor_list; + status = Session::Instance().GetModelInputsInfo(tensor_list); + if (status != SUCCESS) { + ERROR_INFER_STATUS(status, FAILED, "get model inputs info failed"); + return status; + } + if (request->data_size() != static_cast(tensor_list.size())) { + ERROR_INFER_STATUS(status, INVALID_INPUTS, "the inputs number is not equal to model required"); + return status; + } + for (int i = 0; i < request->data_size(); i++) { + for (size_t j = 0; j < tensor_list[i].shape().size(); ++j) { + request->mutable_data(i)->mutable_tensor_shape()->add_dims(tensor_list[i].shape()[i]); + } + } + return SUCCESS; +} + +Status TransTensorToPredictRequest(const json &message_info, PredictRequest *request) { + Status status(SUCCESS); + auto tensors = message_info.find(HTTP_TENSOR); + if (tensors == message_info.end()) { + 
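RecusiveGetTensor above walks the nested JSON arrays once, recording the length of each nesting level as a dimension, appending the leaf values to a flat byte buffer, and rejecting ragged siblings (the real code also caps nesting at 10 levels and supports int32 as well as float32). A stripped-down, float-only sketch of the same idea, standalone against nlohmann::json:

    #include <cstdint>
    #include <string>
    #include <vector>
    #include <nlohmann/json.hpp>

    // Returns false if the nested arrays are ragged; otherwise fills shape and the flat buffer.
    bool FlattenTensor(const nlohmann::json &node, size_t depth,
                       std::vector<int64_t> *shape, std::string *data) {
      if (!node.is_array()) {
        return false;
      }
      auto dim = static_cast<int64_t>(node.size());
      if (shape->size() <= depth) {
        shape->push_back(dim);           // first time this depth is reached
      } else if ((*shape)[depth] != dim) {
        return false;                    // sibling with a different length
      }
      for (const auto &item : node) {
        if (item.is_array()) {
          if (!FlattenTensor(item, depth + 1, shape, data)) {
            return false;
          }
        } else {
          float value = item.get<float>();
          data->append(reinterpret_cast<const char *>(&value), sizeof(float));
        }
      }
      return true;
    }

For [[1.0, 2.0], [3.0, 4.0]] this yields shape {2, 2} and 16 bytes of data, which is what ends up in the tensor_shape and data fields of the PredictRequest.
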
ERROR_INFER_STATUS(status, INVALID_INPUTS, "http message do not have tensor type"); + return status; + } + + for (const auto &tensor : *tensors) { + std::vector shape; + std::string msg_data; + HTTP_DATA_TYPE type{HTTP_DATA_NONE}; + RecusiveGetTensor(tensor, 0, &shape, &msg_data, &type); + MSI_LOG(INFO) << shape << ", data = " << msg_data.size(); + auto iter = http_to_infer_map.find(type); + if (iter == http_to_infer_map.end()) { + ERROR_INFER_STATUS(status, INVALID_INPUTS, "the input type is not supported right now"); + return status; + } + auto infer_tensor = request->add_data(); + infer_tensor->set_tensor_type(iter->second); + infer_tensor->set_data(msg_data.data(), msg_data.size()); + for (const auto dim : shape) { + infer_tensor->mutable_tensor_shape()->add_dims(dim); + } + } + return status; +} + +Status TransHTTPMsgToPredictRequest(struct evhttp_request *http_request, PredictRequest *request, HTTP_TYPE *type) { + Status status = CheckRequestValid(http_request); + if (status != SUCCESS) { + return status; + } + std::string post_message; + status = GetPostMessage(http_request, &post_message); + if (status != SUCCESS) { + return status; + } + + json message_info; + try { + message_info = nlohmann::json::parse(post_message); + } catch (nlohmann::json::exception &e) { + std::string json_exception = e.what(); + std::string error_message = "Illegal JSON format." + json_exception; + ERROR_INFER_STATUS(status, INVALID_INPUTS, error_message); + return status; + } + + status = CheckMessageValid(message_info, type); + if (status != SUCCESS) { + return status; + } + switch (*type) { + case TYPE_DATA: + status = TransDataToPredictRequest(message_info, request); + break; + case TYPE_TENSOR: + status = TransTensorToPredictRequest(message_info, request); + break; + default: + ERROR_INFER_STATUS(status, INVALID_INPUTS, "http message must have only one type of (data, tensor)"); + return status; + } + return status; +} + +Status GetJsonFromTensor(const ms_serving::Tensor &tensor, int len, int *pos, json *out_json) { + Status status(SUCCESS); + switch (tensor.tensor_type()) { + case ms_serving::MS_INT32: { + std::vector result_tensor; + for (int j = 0; j < len; j++) { + int val; + memcpy(&val, reinterpret_cast(tensor.data().data()) + *pos + j, sizeof(int)); + result_tensor.push_back(val); + } + *out_json = result_tensor; + *pos += len; + break; + } + case ms_serving::MS_FLOAT32: { + std::vector result_tensor; + for (int j = 0; j < len; j++) { + float val; + memcpy(&val, reinterpret_cast(tensor.data().data()) + *pos + j, sizeof(float)); + result_tensor.push_back(val); + } + *out_json = result_tensor; + *pos += len; + break; + } + default: + MSI_LOG(ERROR) << "the result type is not supported in restful api, type is " << tensor.tensor_type(); + ERROR_INFER_STATUS(status, FAILED, "reply have unsupported type"); + } + return status; +} + +Status TransPredictReplyToData(const PredictReply &reply, json *out_json) { + Status status(SUCCESS); + for (int i = 0; i < reply.result_size(); i++) { + json tensor_json; + int num = 1; + for (auto j = 0; j < reply.result(i).tensor_shape().dims_size(); j++) { + num *= reply.result(i).tensor_shape().dims(j); + } + int pos = 0; + status = GetJsonFromTensor(reply.result(i), num, &pos, &tensor_json); + if (status != SUCCESS) { + return status; + } + (*out_json)["data"].push_back(tensor_json); + } + return status; +} + +Status RecusiveGetJson(const ms_serving::Tensor &tensor, int depth, int *pos, json *out_json) { + Status status(SUCCESS); + if (depth >= 10) { + 
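On the reply side the same two shapes exist: TransPredictReplyToData dumps each result as one flat array, while RecusiveGetJson (its body continues right after this note) re-nests the flat buffer according to the dims reported in the reply. A minimal sketch of that re-nesting for a float result, assuming the values and dims were already read out of the reply tensor:

    #include <cstddef>
    #include <vector>
    #include <nlohmann/json.hpp>

    // Rebuilds nested JSON arrays from a flat, row-major float buffer.
    nlohmann::json NestValues(const std::vector<float> &values, const std::vector<int> &dims,
                              size_t depth, size_t *pos) {
      nlohmann::json node = nlohmann::json::array();
      if (dims.empty()) {
        return node;
      }
      if (depth == dims.size() - 1) {
        // innermost dimension: copy the scalars
        for (int i = 0; i < dims[depth]; ++i) {
          node.push_back(values[(*pos)++]);
        }
      } else {
        for (int i = 0; i < dims[depth]; ++i) {
          node.push_back(NestValues(values, dims, depth + 1, pos));
        }
      }
      return node;
    }

With values {1, 2, 3, 4, 5, 6}, dims {2, 3} and pos starting at 0, this produces [[1,2,3],[4,5,6]].
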
ERROR_INFER_STATUS(status, FAILED, "result tensor shape dims is larger than 10"); + return status; + } + if (depth == tensor.tensor_shape().dims_size() - 1) { + status = GetJsonFromTensor(tensor, tensor.tensor_shape().dims(depth), pos, out_json); + if (status != SUCCESS) { + return status; + } + } else { + for (int i = 0; i < tensor.tensor_shape().dims(depth); i++) { + json tensor_json; + status = RecusiveGetJson(tensor, depth + 1, pos, &tensor_json); + if (status != SUCCESS) { + return status; + } + out_json->push_back(tensor_json); + } + } + return status; +} + +Status TransPredictReplyToTensor(const PredictReply &reply, json *out_json) { + Status status(SUCCESS); + for (int i = 0; i < reply.result_size(); i++) { + json tensor_json; + int pos = 0; + status = RecusiveGetJson(reply.result(i), 0, &pos, &tensor_json); + if (status != SUCCESS) { + return status; + } + (*out_json)["tensor"].push_back(tensor_json); + } + return status; +} + +Status TransPredictReplyToHTTPMsg(const PredictReply &reply, const HTTP_TYPE &type, struct evbuffer *buf) { + Status status(SUCCESS); + json out_json; + switch (type) { + case TYPE_DATA: + status = TransPredictReplyToData(reply, &out_json); + break; + case TYPE_TENSOR: + status = TransPredictReplyToTensor(reply, &out_json); + break; + default: + ERROR_INFER_STATUS(status, FAILED, "http message must have only one type of (data, tensor)"); + return status; + } + + std::string out_str = out_json.dump(); + evbuffer_add(buf, out_str.data(), out_str.size()); + return status; +} + +void http_handler_msg(struct evhttp_request *req, void *arg) { + std::cout << "in handle" << std::endl; + PredictRequest request; + PredictReply reply; + HTTP_TYPE type; + auto status = TransHTTPMsgToPredictRequest(req, &request, &type); + if (status != SUCCESS) { + ErrorMessage(req, status); + MSI_LOG(ERROR) << "restful trans to request failed"; + return; + } + MSI_TIME_STAMP_START(Predict) + status = Session::Instance().Predict(request, reply); + if (status != SUCCESS) { + ErrorMessage(req, status); + MSI_LOG(ERROR) << "restful predict failed"; + } + MSI_TIME_STAMP_END(Predict) + struct evbuffer *retbuff = evbuffer_new(); + status = TransPredictReplyToHTTPMsg(reply, type, retbuff); + if (status != SUCCESS) { + ErrorMessage(req, status); + MSI_LOG(ERROR) << "restful trans to reply failed"; + return; + } + evhttp_send_reply(req, HTTP_OK, "Client", retbuff); + evbuffer_free(retbuff); +} + +} // namespace serving +} // namespace mindspore diff --git a/serving/core/http_process.h b/serving/core/http_process.h new file mode 100644 index 00000000000..a4dfc374a7b --- /dev/null +++ b/serving/core/http_process.h @@ -0,0 +1,29 @@ +/** + * Copyright 2020 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#ifndef MINDSPORE_SERVING_HTTP_PROCESS_H +#define MINDSPORE_SERVING_HTTP_PROCESS_H + +#include +#include +#include +#include + +namespace mindspore { +namespace serving { +void http_handler_msg(struct evhttp_request *req, void *arg); +} // namespace serving +} // namespace mindspore +#endif // MINDSPORE_SERVER_H diff --git a/serving/core/server.cc b/serving/core/server.cc index 61a3f1558a2..797793a3b44 100644 --- a/serving/core/server.cc +++ b/serving/core/server.cc @@ -14,23 +14,25 @@ * limitations under the License. */ #include "core/server.h" +#include +#include +#include #include #include #include +#include +#include #include -#include #include #include -#include -#include -#include - #include "include/infer_log.h" #include "serving/ms_service.grpc.pb.h" #include "core/util/option_parser.h" #include "core/version_control/version_controller.h" -#include "core/util/file_system_operation.h" +#include "core/session.h" #include "core/serving_tensor.h" +#include "core/http_process.h" + using ms_serving::MSService; using ms_serving::PredictReply; @@ -39,93 +41,6 @@ using ms_serving::PredictRequest; namespace mindspore { namespace serving { -#define MSI_TIME_STAMP_START(name) auto time_start_##name = std::chrono::steady_clock::now(); -#define MSI_TIME_STAMP_END(name) \ - { \ - auto time_end_##name = std::chrono::steady_clock::now(); \ - auto time_cost = std::chrono::duration(time_end_##name - time_start_##name).count(); \ - MSI_LOG_INFO << #name " Time Cost # " << time_cost << " ms ---------------------"; \ - } - -Status Session::CreatDeviceSession(const std::string &device, uint32_t device_id) { - session_ = inference::InferSession::CreateSession(device, device_id); - if (session_ == nullptr) { - MSI_LOG(ERROR) << "Creat Session Failed"; - return FAILED; - } - device_type_ = device; - return SUCCESS; -} - -Session &Session::Instance() { - static Session instance; - return instance; -} - -Status Session::Predict(const PredictRequest &request, PredictReply &reply) { - if (!model_loaded_) { - MSI_LOG(ERROR) << "the model has not loaded"; - return FAILED; - } - if (session_ == nullptr) { - MSI_LOG(ERROR) << "the inference session has not be initialized"; - return FAILED; - } - std::lock_guard lock(mutex_); - MSI_LOG(INFO) << "run Predict"; - - if (request.images_size() > 0) { - ServingImagesRequest serving_images(request); - ServingRequest serving_request(request); - ServingReply serving_reply(reply); - Status ret = session_->ExecuteModel(graph_id_, serving_images, serving_request, serving_reply); - if (ret != SUCCESS) { - MSI_LOG(ERROR) << "execute model with images return failed"; - return ret; - } - } else if (request.data_size() > 0) { - ServingRequest serving_request(request); - ServingReply serving_reply(reply); - Status ret = session_->ExecuteModel(graph_id_, serving_request, serving_reply); - if (ret != SUCCESS) { - MSI_LOG(ERROR) << "execute model with datas return failed"; - return ret; - } - } - - MSI_LOG(INFO) << "run Predict finished"; - return SUCCESS; -} - -Status Session::Warmup(const MindSporeModelPtr model) { - if (session_ == nullptr) { - MSI_LOG(ERROR) << "The CreatDeviceSession should be called, before warmup"; - return FAILED; - } - std::lock_guard lock(mutex_); - std::string file_name = model->GetModelPath() + '/' + model->GetModelName(); - model_loaded_ = false; - MSI_TIME_STAMP_START(LoadModelFromFile) - auto ret = session_->LoadModelFromFile(file_name, graph_id_); - MSI_TIME_STAMP_END(LoadModelFromFile) - if (ret != SUCCESS) { - MSI_LOG(ERROR) << "Load graph model 
failed, file name is " << file_name.c_str(); - return ret; - } - model_loaded_ = true; - MSI_LOG(INFO) << "Session Warmup finished"; - return SUCCESS; -} - -Status Session::Clear() { - if (session_ != nullptr) { - session_->UnloadModel(graph_id_); - session_->FinalizeEnv(); - session_ = nullptr; - } - return SUCCESS; -} - namespace { static const uint32_t uint32max = 0x7FFFFFFF; std::promise exit_requested; @@ -179,6 +94,7 @@ Status Server::BuildAndStart() { signal(SIGINT, HandleSignal); signal(SIGTERM, HandleSignal); Status res; + auto option_args = Options::Instance().GetArgs(); std::string server_address = "0.0.0.0:" + std::to_string(option_args->grpc_port); std::string model_path = option_args->model_path; @@ -198,6 +114,26 @@ Status Server::BuildAndStart() { ClearEnv(); return res; } + + // init http server + struct evhttp *http_server = NULL; + struct event_base *eb = NULL; + int32_t http_port = option_args->rest_api_port; + std::string http_addr = "0.0.0.0"; + event_init(); + evthread_use_pthreads(); + eb = event_base_new(); + http_server = evhttp_new(eb); + evhttp_bind_socket_with_handle(http_server, http_addr.c_str(), http_port); + // http_server = evhttp_start(http_addr.c_str(), http_port); + if (http_server == NULL) { + MSI_LOG(ERROR) << "http server start failed."; + return res; + } + evhttp_set_timeout(http_server, 5); + evhttp_set_gencb(http_server, http_handler_msg, NULL); + + // grpc server MSServiceImpl ms_service; grpc::EnableDefaultHealthCheckService(true); grpc::reflection::InitProtoReflectionServerBuilderPlugin(); @@ -214,14 +150,23 @@ Status Server::BuildAndStart() { ClearEnv(); return FAILED; } - auto grpc_server_run = [&server]() { server->Wait(); }; - std::thread serving_thread(grpc_server_run); - MSI_LOG(INFO) << "MS Serving listening on " << server_address; + auto grpc_server_run = [&server, &server_address]() { + MSI_LOG(INFO) << "MS Serving grpc listening on " << server_address; + server->Wait(); + }; + auto http_server_run = [&eb, &http_addr, &http_port]() { + MSI_LOG(INFO) << "MS Serving restful listening on " << http_addr << ":" << http_port; + event_base_dispatch(eb); + }; + std::thread grpc_thread(grpc_server_run); + std::thread restful_thread(http_server_run); auto exit_future = exit_requested.get_future(); exit_future.wait(); ClearEnv(); server->Shutdown(); - serving_thread.join(); + event_base_loopexit(eb, NULL); + grpc_thread.join(); + restful_thread.join(); return SUCCESS; } } // namespace serving diff --git a/serving/core/server.h b/serving/core/server.h index f97db84fce3..a7795403e8b 100644 --- a/serving/core/server.h +++ b/serving/core/server.h @@ -16,46 +16,10 @@ #ifndef MINDSPORE_SERVER_H #define MINDSPORE_SERVER_H -#include -#include -#include -#include #include "util/status.h" -#include "version_control/model.h" -#include "include/inference.h" -#include "serving/ms_service.pb.h" -#include "serving/ms_service.grpc.pb.h" - namespace mindspore { namespace serving { -using ms_serving::PredictReply; -using ms_serving::PredictRequest; -using inference::Status; -using inference::SUCCESS; -using inference::FAILED; -using inference::INVALID_INPUTS; - -class Session { - public: - static Session &Instance(); - Status CreatDeviceSession(const std::string &device, uint32_t device_id); - // Status Predict(const inference::MultiTensor &inputs, inference::MultiTensor &output); - Status Predict(const PredictRequest &request, PredictReply &reply); - Status Warmup(const MindSporeModelPtr model); - Status Clear(); - - private: - Session() = default; - ~Session() = 
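The RESTful entry point above reuses libevent's built-in HTTP server: one event_base owns the listener on rest_api_port, http_handler_msg is installed as the generic callback, and event_base_dispatch runs on its own thread next to the gRPC server. A standalone sketch of that wiring with a trivial handler; the fixed port 8080 stands in for the rest_api_port option:

    #include <event2/buffer.h>
    #include <event2/event.h>
    #include <event2/http.h>

    // Generic handler: answer every request with a small JSON body.
    static void handle_request(struct evhttp_request *req, void *arg) {
      struct evbuffer *reply = evbuffer_new();
      evbuffer_add_printf(reply, "{\"status\": \"ok\"}");
      evhttp_send_reply(req, HTTP_OK, "OK", reply);
      evbuffer_free(reply);
    }

    int main() {
      struct event_base *base = event_base_new();
      struct evhttp *http = evhttp_new(base);
      if (evhttp_bind_socket_with_handle(http, "0.0.0.0", 8080) == nullptr) {
        return 1;
      }
      evhttp_set_gencb(http, handle_request, nullptr);
      event_base_dispatch(base);  // blocks; serving runs this loop on a dedicated thread
      evhttp_free(http);
      event_base_free(base);
      return 0;
    }
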
default; - int sesseion_id_{0}; - std::shared_ptr session_{nullptr}; - bool model_loaded_ = false; - uint32_t graph_id_{0}; - std::mutex mutex_; - std::string device_type_; -}; - class Server { public: Server() = default; diff --git a/serving/core/session.cc b/serving/core/session.cc new file mode 100644 index 00000000000..9c0ba32f85b --- /dev/null +++ b/serving/core/session.cc @@ -0,0 +1,136 @@ +/** + * Copyright 2020 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "core/session.h" +#include +#include +#include +#include +#include +#include +#include + +#include "include/infer_log.h" +#include "serving/ms_service.grpc.pb.h" +#include "core/util/option_parser.h" +#include "core/version_control/version_controller.h" +#include "core/util/file_system_operation.h" +#include "core/serving_tensor.h" + +using ms_serving::MSService; +using ms_serving::PredictReply; +using ms_serving::PredictRequest; + +namespace mindspore { +namespace serving { + +Status Session::CreatDeviceSession(const std::string &device, uint32_t device_id) { + session_ = inference::InferSession::CreateSession(device, device_id); + if (session_ == nullptr) { + MSI_LOG(ERROR) << "Creat Session Failed"; + return FAILED; + } + device_type_ = device; + return SUCCESS; +} + +Session &Session::Instance() { + static Session instance; + return instance; +} + +Status Session::Predict(const PredictRequest &request, PredictReply &reply) { + if (!model_loaded_) { + MSI_LOG(ERROR) << "the model has not loaded"; + return FAILED; + } + if (session_ == nullptr) { + MSI_LOG(ERROR) << "the inference session has not be initialized"; + return FAILED; + } + std::lock_guard lock(mutex_); + MSI_LOG(INFO) << "run Predict"; + + if (request.images_size() > 0) { + ServingImagesRequest serving_images(request); + ServingRequest serving_request(request); + ServingReply serving_reply(reply); + Status ret = session_->ExecuteModel(graph_id_, serving_images, serving_request, serving_reply); + if (ret != SUCCESS) { + MSI_LOG(ERROR) << "execute model with images return failed"; + return ret; + } + } else if (request.data_size() > 0) { + ServingRequest serving_request(request); + ServingReply serving_reply(reply); + Status ret = session_->ExecuteModel(graph_id_, serving_request, serving_reply); + if (ret != SUCCESS) { + MSI_LOG(ERROR) << "execute model with datas return failed"; + return ret; + } + } + + MSI_LOG(INFO) << "run Predict finished"; + return SUCCESS; +} + +Status Session::Warmup(const MindSporeModelPtr model) { + if (session_ == nullptr) { + MSI_LOG(ERROR) << "The CreatDeviceSession should be called, before warmup"; + return FAILED; + } + std::lock_guard lock(mutex_); + std::string file_name = model->GetModelPath() + '/' + model->GetModelName(); + model_loaded_ = false; + MSI_TIME_STAMP_START(LoadModelFromFile) + auto ret = session_->LoadModelFromFile(file_name, graph_id_); + MSI_TIME_STAMP_END(LoadModelFromFile) + if (ret != SUCCESS) { + MSI_LOG(ERROR) << "Load graph model failed, file name is " << 
file_name.c_str(); + return ret; + } + model_loaded_ = true; + MSI_LOG(INFO) << "Session Warmup finished"; + return SUCCESS; +} + +Status Session::Clear() { + if (session_ != nullptr) { + session_->UnloadModel(graph_id_); + session_->FinalizeEnv(); + session_ = nullptr; + } + return SUCCESS; +} + +Status Session::GetModelInputsInfo(std::vector &tensor_list) { + if (!model_loaded_) { + MSI_LOG(ERROR) << "the model has not loaded"; + return FAILED; + } + if (session_ == nullptr) { + MSI_LOG(ERROR) << "the inference session has not be initialized"; + return FAILED; + } + std::lock_guard lock(mutex_); + Status ret = session_->GetModelInputsInfo(graph_id_, &tensor_list); + if (ret != SUCCESS) { + MSI_LOG(ERROR) << "get model inputs info failed"; + } + return ret; +} + +} // namespace serving +} // namespace mindspore diff --git a/serving/core/session.h b/serving/core/session.h new file mode 100644 index 00000000000..ae50f6f78ff --- /dev/null +++ b/serving/core/session.h @@ -0,0 +1,62 @@ +/** + * Copyright 2020 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef MINDSPORE_SERVING_SESSION_H +#define MINDSPORE_SERVING_SESSION_H + +#include +#include +#include +#include +#include "util/status.h" +#include "version_control/model.h" +#include "include/inference.h" +#include "serving/ms_service.pb.h" +#include "serving/ms_service.grpc.pb.h" + +namespace mindspore { +namespace serving { + +using inference::FAILED; +using inference::INVALID_INPUTS; +using inference::Status; +using inference::SUCCESS; +using ms_serving::PredictReply; +using ms_serving::PredictRequest; + +class Session { + public: + static Session &Instance(); + Status CreatDeviceSession(const std::string &device, uint32_t device_id); + // Status Predict(const inference::MultiTensor &inputs, inference::MultiTensor &output); + Status Predict(const PredictRequest &request, PredictReply &reply); + Status Warmup(const MindSporeModelPtr model); + Status Clear(); + Status GetModelInputsInfo(std::vector &tensor_list); + + private: + Session() = default; + ~Session() = default; + int sesseion_id_{0}; + std::shared_ptr session_{nullptr}; + bool model_loaded_ = false; + uint32_t graph_id_{0}; + std::mutex mutex_; + std::string device_type_; +}; + +} // namespace serving +} // namespace mindspore +#endif // MINDSPORE_SERVER_H diff --git a/serving/core/util/option_parser.cc b/serving/core/util/option_parser.cc index df2047c30f6..4a71176a321 100644 --- a/serving/core/util/option_parser.cc +++ b/serving/core/util/option_parser.cc @@ -160,6 +160,8 @@ void Options::CreateOptions() { std::vector
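For reference, the two request bodies the new REST handler accepts differ only in where the shape comes from: with "data" each input is a flat list and the dims are filled in from GetModelInputsInfo, with "tensor" the nesting of the lists carries the shape. A sketch that builds both payloads with nlohmann::json; the values and the single 2x2 input are made up for illustration:

    #include <iostream>
    #include <nlohmann/json.hpp>

    int main() {
      // "data": a list of flat per-input value lists; shapes come from the model.
      nlohmann::json data_request;
      data_request["data"] = {{1.1, 2.2, 3.3, 4.4}};

      // "tensor": a list of nested per-input arrays; the nesting spells out a 2x2 shape.
      nlohmann::json tensor_request;
      tensor_request["tensor"] = {{{1.1, 2.2}, {3.3, 4.4}}};

      std::cout << data_request.dump() << std::endl;    // {"data":[[1.1,2.2,3.3,4.4]]}
      std::cout << tensor_request.dump() << std::endl;  // {"tensor":[[[1.1,2.2],[3.3,4.4]]]}
      return 0;
    }
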