Skip to content

Commit 6b2f2be

Browse files
authored
feat(tasker): 添加 MaaTaskerOverridePipeline 接口支持通过 task_id 动态修改 pipeline (#1079)
1 parent 96f5e3e commit 6b2f2be

22 files changed

Lines changed: 294 additions & 28 deletions

File tree

docs/en_us/2.2-IntegratedInterfaceOverview.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -645,6 +645,13 @@ Get bound controller
645645

646646
Clear all queryable information
647647

648+
### MaaTaskerOverridePipeline
649+
650+
- `task_id`: Task ID
651+
- `pipeline_override`: JSON for overriding
652+
653+
Override the pipeline of a specified task, dynamically modify pipeline configuration during task execution.
654+
648655
### MaaTaskerGetRecognitionDetail
649656

650657
- `reco_id`: Recognition id

docs/zh_cn/2.2-集成接口一览.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -645,6 +645,13 @@
645645

646646
清理所有可查询的信息
647647

648+
### MaaTaskerOverridePipeline
649+
650+
- `task_id`: 任务 ID
651+
- `pipeline_override`: 用于覆盖的 JSON
652+
653+
覆盖指定任务的 pipeline,在任务执行期间动态修改 pipeline 配置。
654+
648655
### MaaTaskerGetRecognitionDetail
649656

650657
- `reco_id`: 识别号

include/MaaFramework/Instance/MaaTasker.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,8 @@ extern "C"
8888

8989
MAA_FRAMEWORK_API MaaBool MaaTaskerClearCache(MaaTasker* tasker);
9090

91+
MAA_FRAMEWORK_API MaaBool MaaTaskerOverridePipeline(MaaTasker* tasker, MaaTaskId task_id, const char* pipeline_override);
92+
9193
/**
9294
* @param[out] hit
9395
*/

source/Common/MaaTasker.cpp

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -309,6 +309,29 @@ MaaBool MaaTaskerClearCache(MaaTasker* tasker)
309309
return true;
310310
}
311311

312+
MaaBool MaaTaskerOverridePipeline(MaaTasker* tasker, MaaTaskId task_id, const char* pipeline_override)
313+
{
314+
LogFunc << VAR_VOIDP(tasker) << VAR(task_id) << VAR(pipeline_override);
315+
316+
if (!tasker) {
317+
LogError << "handle is null";
318+
return false;
319+
}
320+
321+
if (!pipeline_override) {
322+
LogError << "pipeline_override is null";
323+
return false;
324+
}
325+
326+
auto ov_opt = json::parse(pipeline_override);
327+
if (!ov_opt) {
328+
LogError << "failed to parse" << VAR(pipeline_override);
329+
return false;
330+
}
331+
332+
return tasker->override_pipeline(task_id, *ov_opt);
333+
}
334+
312335
#define CheckNullAndWarn(var) \
313336
if (!var) { \
314337
LogWarn << #var << "is null, no assignment"; \

source/MaaAgentClient/Client/AgentClient.cpp

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -298,6 +298,9 @@ bool AgentClient::handle_inserted_request(const json::value& j)
298298
else if (handle_tasker_clear_cache(j)) {
299299
return true;
300300
}
301+
else if (handle_tasker_override_pipeline(j)) {
302+
return true;
303+
}
301304
else if (handle_tasker_get_task_detail(j)) {
302305
return true;
303306
}
@@ -1106,6 +1109,29 @@ bool AgentClient::handle_tasker_clear_cache(const json::value& j)
11061109
return true;
11071110
}
11081111

1112+
bool AgentClient::handle_tasker_override_pipeline(const json::value& j)
1113+
{
1114+
if (!j.is<TaskerOverridePipelineReverseRequest>()) {
1115+
return false;
1116+
}
1117+
const TaskerOverridePipelineReverseRequest& req = j.as<TaskerOverridePipelineReverseRequest>();
1118+
LogFunc << VAR(req) << VAR(ipc_addr_);
1119+
1120+
MaaTasker* tasker = query_tasker(req.tasker_id);
1121+
if (!tasker) {
1122+
LogError << "tasker not found" << VAR(req.tasker_id);
1123+
return false;
1124+
}
1125+
1126+
bool ret = tasker->override_pipeline(req.task_id, req.pipeline_override);
1127+
1128+
TaskerOverridePipelineReverseResponse resp {
1129+
.ret = ret,
1130+
};
1131+
send(resp);
1132+
return true;
1133+
}
1134+
11091135
bool AgentClient::handle_tasker_get_task_detail(const json::value& j)
11101136
{
11111137
if (!j.is<TaskerGetTaskDetailReverseRequest>()) {

source/MaaAgentClient/Client/AgentClient.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ class AgentClient
6767
bool handle_tasker_resource(const json::value& j);
6868
bool handle_tasker_controller(const json::value& j);
6969
bool handle_tasker_clear_cache(const json::value& j);
70+
bool handle_tasker_override_pipeline(const json::value& j);
7071
bool handle_tasker_get_task_detail(const json::value& j);
7172
bool handle_tasker_get_node_detail(const json::value& j);
7273
bool handle_tasker_get_reco_result(const json::value& j);

source/MaaAgentServer/RemoteInstance/RemoteTasker.cpp

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,22 @@ void RemoteTasker::clear_cache()
204204
server_.send_and_recv<TaskerClearCacheReverseResponse>(req);
205205
}
206206

207+
bool RemoteTasker::override_pipeline(MaaTaskId task_id, const json::value& pipeline_override)
208+
{
209+
TaskerOverridePipelineReverseRequest req {
210+
.tasker_id = tasker_id_,
211+
.task_id = task_id,
212+
.pipeline_override = pipeline_override,
213+
};
214+
215+
auto resp_opt = server_.send_and_recv<TaskerOverridePipelineReverseResponse>(req);
216+
if (!resp_opt) {
217+
return false;
218+
}
219+
220+
return resp_opt->ret;
221+
}
222+
207223
std::optional<MAA_TASK_NS::TaskDetail> RemoteTasker::get_task_detail(MaaTaskId task_id) const
208224
{
209225
TaskerGetTaskDetailReverseRequest req {

source/MaaAgentServer/RemoteInstance/RemoteTasker.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ class RemoteTasker : public MaaTasker
2929
post_action(const std::string& action_type, const json::value& action_param, const cv::Rect& box, const std::string& reco_detail)
3030
override;
3131

32+
virtual bool override_pipeline(MaaTaskId task_id, const json::value& pipeline_override) override;
33+
3234
virtual MaaStatus status(MaaTaskId task_id) const override;
3335
virtual MaaStatus wait(MaaTaskId task_id) const override;
3436

source/MaaFramework/Base/AsyncRunner.hpp

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
#include <functional>
55
#include <list>
66
#include <mutex>
7+
#include <optional>
78
#include <shared_mutex>
89
#include <thread>
910

@@ -39,6 +40,8 @@ class AsyncRunner : public NonCopyable
3940
void wait(Id id) const;
4041
void wait_all() const;
4142
Status status(Id id) const;
43+
std::optional<Item> get(Id id) const;
44+
std::optional<Item> get_running() const;
4245

4346
void clear();
4447
bool running() const;
@@ -49,10 +52,13 @@ class AsyncRunner : public NonCopyable
4952
ProcessFunc process_;
5053

5154
std::list<std::pair<Id, Item>> queue_;
52-
std::mutex queue_mutex_;
55+
mutable std::mutex queue_mutex_;
5356
std::condition_variable queue_cond_;
5457
std::atomic_bool running_ = false;
5558

59+
Id running_id_ = 0;
60+
Item running_item_ {};
61+
5662
mutable std::shared_mutex status_mutex_;
5763
std::map<Id, Status> status_map_;
5864

@@ -128,6 +134,9 @@ inline void AsyncRunner<Item>::working()
128134

129135
auto [id, item] = std::move(queue_.front());
130136
queue_.pop_front();
137+
138+
running_id_ = id;
139+
running_item_ = item;
131140
queue_lock.unlock();
132141

133142
std::unique_lock status_lock(status_mutex_);
@@ -136,6 +145,12 @@ inline void AsyncRunner<Item>::working()
136145

137146
bool ret = process_(id, std::move(item));
138147

148+
{
149+
std::unique_lock clear_lock(queue_mutex_);
150+
running_id_ = 0;
151+
running_item_ = {};
152+
}
153+
139154
status_lock.lock();
140155
status_map_[id] = ret ? Status::succeeded : Status::failed;
141156
status_lock.unlock();
@@ -219,6 +234,36 @@ inline AsyncRunner<Item>::Status AsyncRunner<Item>::status(Id id) const
219234
return iter->second;
220235
}
221236

237+
template <typename Item>
238+
inline std::optional<Item> AsyncRunner<Item>::get(Id id) const
239+
{
240+
std::unique_lock queue_lock(queue_mutex_);
241+
242+
if (running_id_ == id) {
243+
return running_item_;
244+
}
245+
246+
for (const auto& [item_id, item] : queue_) {
247+
if (item_id == id) {
248+
return item;
249+
}
250+
}
251+
252+
return std::nullopt;
253+
}
254+
255+
template <typename Item>
256+
inline std::optional<Item> AsyncRunner<Item>::get_running() const
257+
{
258+
std::unique_lock queue_lock(queue_mutex_);
259+
260+
if (running_id_ != 0) {
261+
return running_item_;
262+
}
263+
264+
return std::nullopt;
265+
}
266+
222267
template <typename Item>
223268
inline void AsyncRunner<Item>::clear()
224269
{

source/MaaFramework/Tasker/Tasker.cpp

Lines changed: 26 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -167,11 +167,13 @@ MaaTaskId Tasker::post_stop()
167167

168168
need_to_stop_ = true;
169169

170-
if (task_runner_ && task_runner_->running()) {
171-
task_runner_->clear();
172-
}
173-
if (running_task_) {
174-
running_task_->post_stop();
170+
if (task_runner_) {
171+
if (task_runner_->running()) {
172+
task_runner_->clear();
173+
}
174+
if (auto running_task = task_runner_->get_running()) {
175+
(*running_task)->post_stop();
176+
}
175177
}
176178
if (resource_) {
177179
resource_->post_stop();
@@ -323,11 +325,8 @@ bool Tasker::run_task(RunnerId runner_id, TaskPtr task_ptr)
323325
return false;
324326
}
325327

326-
running_task_ = task_ptr;
327-
OnScopeLeave([&] { running_task_ = nullptr; });
328-
329328
if (need_to_stop_) {
330-
running_task_->post_stop();
329+
task_ptr->post_stop();
331330
}
332331

333332
MaaTaskId task_id = task_ptr->task_id();
@@ -360,11 +359,27 @@ bool Tasker::run_task(RunnerId runner_id, TaskPtr task_ptr)
360359
}
361360
notifier_.notify(this, ret ? MaaMsg_Tasker_Task_Succeeded : MaaMsg_Tasker_Task_Failed, cb_detail);
362361

363-
running_task_ = nullptr;
364-
365362
return ret;
366363
}
367364

365+
bool Tasker::override_pipeline(MaaTaskId task_id, const json::value& pipeline_override)
366+
{
367+
LogFunc << VAR(task_id) << VAR(pipeline_override);
368+
369+
RunnerId runner_id = task_id_to_runner_id(task_id);
370+
if (runner_id == MaaInvalidId) {
371+
return false;
372+
}
373+
374+
auto task_ptr_opt = task_runner_->get(runner_id);
375+
if (!task_ptr_opt) {
376+
LogError << "task not found or already completed" << VAR(task_id) << VAR(runner_id);
377+
return false;
378+
}
379+
380+
return (*task_ptr_opt)->override_pipeline(pipeline_override);
381+
}
382+
368383
bool Tasker::check_stop()
369384
{
370385
if (!need_to_stop_) {

0 commit comments

Comments
 (0)