From 80f33f19844879211aa5d18f7e7cdee03b9245fe Mon Sep 17 00:00:00 2001 From: fangxiang Date: Fri, 13 Dec 2024 10:42:44 +0800 Subject: [PATCH] add StreamReader cancel read steam --- .../async_stream/RequestStreamExampleCtrl.cc | 6 +++-- lib/src/MultipartStreamParser.cc | 26 ++++++++++++++----- lib/src/MultipartStreamParser.h | 13 ++++++++++ lib/src/RequestStream.cc | 12 +++++++-- .../server/RequestStreamTestCtrl.cc | 1 + 5 files changed, 48 insertions(+), 10 deletions(-) diff --git a/examples/async_stream/RequestStreamExampleCtrl.cc b/examples/async_stream/RequestStreamExampleCtrl.cc index d997dc5ac6..9788dcea6c 100644 --- a/examples/async_stream/RequestStreamExampleCtrl.cc +++ b/examples/async_stream/RequestStreamExampleCtrl.cc @@ -98,7 +98,7 @@ class RequestStreamExampleCtrl : public HttpController [files](const char *data, size_t length) { if (files->back().tmpName.empty()) { - return; + return false; } auto ¤tFile = files->back().file; if (length == 0) @@ -109,18 +109,20 @@ class RequestStreamExampleCtrl : public HttpController currentFile.flush(); currentFile.close(); } - return; + return true; } LOG_INFO << "data[" << length << "]: "; if (currentFile.is_open()) { LOG_INFO << "write file"; currentFile.write(data, length); + return true; } else { LOG_ERROR << "file not open"; } + return false; }, [files, callback = std::move(callback)](std::exception_ptr ex) { if (ex) diff --git a/lib/src/MultipartStreamParser.cc b/lib/src/MultipartStreamParser.cc index b54b54b6ce..060ba526b6 100644 --- a/lib/src/MultipartStreamParser.cc +++ b/lib/src/MultipartStreamParser.cc @@ -252,23 +252,37 @@ void drogon::MultipartStreamParser::parse( } std::string_view v = buffer_.view(); auto pos = v.find(crlfDashBoundary_); + + const auto callDataCB = [&dataCb, this](const char * _data, std::size_t _len) { + if(!dataCb(_data, _len)) { + isValid_ = false; + isFinished_ = true; + exception_type_ = ExceptionType::kServerCancel; + } + }; + if (pos == std::string::npos) { // boundary not found, leave potential partial boundary - size_t len = v.size() - crlfDashBoundary_.size(); + const size_t len = v.size() - crlfDashBoundary_.size(); if (len > 0) { - dataCb(v.data(), len); + callDataCB(v.data(), len); buffer_.eraseFront(len); } return; } // found boundary - dataCb(v.data(), pos); - if (pos > 0) - { - dataCb(v.data() + pos, 0); // notify end of file + if(!isFinished() && isValid()) { + callDataCB(v.data(), pos); + + if (pos > 0) + { + // notify end of file + callDataCB(v.data() + pos, 0); + } } + buffer_.eraseFront(pos + crlfDashBoundary_.size()); status_ = Status::kExpectEndOrNewEntry; continue; diff --git a/lib/src/MultipartStreamParser.h b/lib/src/MultipartStreamParser.h index 44f647410b..972477dc4c 100644 --- a/lib/src/MultipartStreamParser.h +++ b/lib/src/MultipartStreamParser.h @@ -21,6 +21,13 @@ namespace drogon { class DROGON_EXPORT MultipartStreamParser { + public: + enum class ExceptionType + { + kNoException = 0, + kServerCancel = 1, + }; + public: MultipartStreamParser(const std::string &contentType); @@ -39,6 +46,10 @@ class DROGON_EXPORT MultipartStreamParser return isValid_; } + ExceptionType exceptionType() const { + return exception_type_; + } + private: const std::string dash_ = "--"; const std::string crlf_ = "\r\n"; @@ -70,6 +81,8 @@ class DROGON_EXPORT MultipartStreamParser kExpectEndOrNewEntry = 4, } status_{Status::kExpectFirstBoundary}; + ExceptionType exception_type_{ExceptionType::kNoException}; + MultipartHeader currentHeader_; bool isValid_{true}; bool isFinished_{false}; diff --git a/lib/src/RequestStream.cc b/lib/src/RequestStream.cc index 7a6afbc614..aae68ca6e1 100644 --- a/lib/src/RequestStream.cc +++ b/lib/src/RequestStream.cc @@ -161,9 +161,17 @@ class MultipartStreamReader : public RequestStreamReader parser_.parse(data, length, headerCb_, dataCb_); if (!parser_.isValid()) { + std::exception_ptr exception = nullptr; // TODO: should we mix stream error and user error? - finishCb_(std::make_exception_ptr( - std::runtime_error("invalid multipart data"))); + switch (parser_.exceptionType()) { + case MultipartStreamParser::ExceptionType::kServerCancel: + exception = std::make_exception_ptr(std::runtime_error("server cancelled")); + break; + default: + exception = std::make_exception_ptr(std::runtime_error("invalid multipart data")); + break; + } + finishCb_(exception); } else if (parser_.isFinished()) { diff --git a/lib/tests/integration_test/server/RequestStreamTestCtrl.cc b/lib/tests/integration_test/server/RequestStreamTestCtrl.cc index 2bdf34a4ae..4addb40f1c 100644 --- a/lib/tests/integration_test/server/RequestStreamTestCtrl.cc +++ b/lib/tests/integration_test/server/RequestStreamTestCtrl.cc @@ -118,6 +118,7 @@ class RequestStreamTestCtrl : public HttpController { ctx->firstFileContent.append(data, length); } + return true; }, [ctx, callback = std::move(callback)](std::exception_ptr ex) { auto resp = HttpResponse::newHttpResponse();