-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathudf_agent.h
More file actions
84 lines (62 loc) · 2.25 KB
/
udf_agent.h
File metadata and controls
84 lines (62 loc) · 2.25 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
#pragma once
#include <functional>
#include <istream>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <uvw.hpp>
#include "request_handlers/request_handler.h"
#include "server/unix_socket_client.h"
#include "utils/rw_utils.h"
#include "udf.pb.h"
namespace stream_data_processor {
namespace kapacitor_udf {
// Abstract interface of a UDF agent: an object that can be started, stopped,
// and asked to write an agent::Response. All operations are pure virtual; the
// concrete behaviour is supplied by derived classes.
class IUDFAgent {
public:
// Starts the agent. What "start" means is defined by the implementation.
virtual void start() = 0;
// Stops the agent. What "stop" means is defined by the implementation.
virtual void stop() = 0;
// Writes `response` through the agent. Declared const, so implementations
// are expected to be callable on a const agent.
virtual void writeResponse(const agent::Response& response) const = 0;
// Pure virtual destructor: keeps the class abstract while still allowing
// deletion through an IUDFAgent pointer. A pure virtual destructor still
// needs a definition, which must be provided outside this header.
virtual ~IUDFAgent() = 0;
protected:
// Construction, copy and move are protected: only derived classes can use
// them, so an agent cannot be created, copied or moved via the base type.
IUDFAgent() = default;
IUDFAgent(const IUDFAgent& /* non-used */) = default;
IUDFAgent& operator=(const IUDFAgent& /* non-used */) = default;
IUDFAgent(IUDFAgent&& /* non-used */) = default;
IUDFAgent& operator=(IUDFAgent&& /* non-used */) = default;
};
// Forward declaration: UDFAgent only stores a std::shared_ptr<RequestHandler>,
// so the complete type is not required at this point.
class RequestHandler;

// IUDFAgent implementation built on a pair of uvw stream handles (one for
// input, one for output).
//
// Template parameters:
//   UVWHandleType   - uvw handle class, e.g. uvw::TTYHandle or uvw::PipeHandle.
//   LibuvHandleType - the matching libuv handle struct, e.g. uv_tty_t or
//                     uv_pipe_t.
//
// Only declarations live in this header; the member definitions are provided
// elsewhere.
template <typename UVWHandleType, typename LibuvHandleType>
class UDFAgent : public IUDFAgent {
 public:
  // Builds an agent from an event loop.
  // NOTE(review): presumably creates the in/out handles on `loop` -- the
  // definition is not visible here; confirm.
  explicit UDFAgent(uvw::Loop* loop);

  // Builds an agent from explicitly supplied input and output stream handles.
  UDFAgent(
      std::shared_ptr<uvw::StreamHandle<UVWHandleType, LibuvHandleType>> in,
      std::shared_ptr<uvw::StreamHandle<UVWHandleType, LibuvHandleType>> out);

  // Not copyable and not movable.
  //
  // handle_function_ captures `this`. Because that member is const, the
  // implicitly generated move constructor would *copy* the callback, leaving
  // the new object with a callback still bound to the moved-from agent (a
  // dangling `this` once the source is destroyed). Copy construction and both
  // assignments were already implicitly unavailable (unique_ptr member, const
  // member); deleting all four makes the intent explicit and closes the
  // remaining hole.
  UDFAgent(const UDFAgent&) = delete;
  UDFAgent& operator=(const UDFAgent&) = delete;
  UDFAgent(UDFAgent&&) = delete;
  UDFAgent& operator=(UDFAgent&&) = delete;

  // Sets the RequestHandler associated with this agent.
  void setHandler(std::shared_ptr<RequestHandler> request_handler);

  // IUDFAgent interface.
  void start() override;
  void stop() override;
  void writeResponse(const agent::Response& response) const override;

 private:
  // Reports an error described by `error_message`.
  // NOTE(review): how the error is reported is decided by the out-of-line
  // definition, which is not visible in this header.
  void reportError(const std::string& error_message);

  // Processes a single request; the target of handle_function_.
  void handleRequest(const agent::Request& request) const;

 private:
  // Callback that forwards a request to handleRequest() on this instance.
  // It captures `this`, so it must never outlive or be detached from the
  // agent that owns it (see the deleted copy/move operations above).
  const std::function<void(const agent::Request&)> handle_function_{
      [this](const agent::Request& request) { handleRequest(request); }};

  // Input and output stream handles.
  std::shared_ptr<uvw::StreamHandle<UVWHandleType, LibuvHandleType>> in_;
  std::shared_ptr<uvw::StreamHandle<UVWHandleType, LibuvHandleType>> out_;

  // Exclusively owned request reader.
  std::unique_ptr<rw_utils::IKapacitorRequestReader>
      kapacitor_request_reader_;

  // Handler installed through setHandler().
  std::shared_ptr<RequestHandler> request_handler_;
};
// UDFAgent instantiated over uvw TTY handles.
// NOTE(review): the name suggests this is the agent used when the UDF runs as
// a child process -- confirm against the callers.
using ChildProcessBasedUDFAgent = UDFAgent<uvw::TTYHandle, uv_tty_t>;
// UDFAgent instantiated over uvw pipe handles.
// NOTE(review): the name suggests this is the agent used for socket-based
// communication -- confirm against the callers.
using SocketBasedUDFAgent = UDFAgent<uvw::PipeHandle, uv_pipe_t>;
// UnixSocketClient that is associated with an IUDFAgent and overrides the
// client's start()/stop() hooks.
// NOTE(review): presumably start()/stop() forward to the wrapped agent -- the
// definitions are not in this header; confirm.
class AgentClient : public UnixSocketClient {
public:
// Creates a client for `agent`. Explicit, so a shared_ptr<IUDFAgent> is never
// implicitly converted into an AgentClient.
explicit AgentClient(std::shared_ptr<IUDFAgent> agent);
// UnixSocketClient overrides.
void start() override;
void stop() override;
private:
// Agent associated with this client, held via shared ownership.
std::shared_ptr<IUDFAgent> agent_;
};
} // namespace kapacitor_udf
} // namespace stream_data_processor