-
-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathPsrClickHouseAsyncClient.php
More file actions
123 lines (107 loc) · 3.57 KB
/
PsrClickHouseAsyncClient.php
File metadata and controls
123 lines (107 loc) · 3.57 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
<?php
declare(strict_types=1);
namespace SimPod\ClickHouseClient\Client;
use Exception;
use GuzzleHttp\Promise\Create;
use GuzzleHttp\Promise\PromiseInterface;
use Http\Client\HttpAsyncClient;
use Psr\Http\Message\ResponseInterface;
use SimPod\ClickHouseClient\Client\Http\RequestFactory;
use SimPod\ClickHouseClient\Client\Http\RequestOptions;
use SimPod\ClickHouseClient\Client\Http\RequestSettings;
use SimPod\ClickHouseClient\Exception\ServerError;
use SimPod\ClickHouseClient\Format\Format;
use SimPod\ClickHouseClient\Logger\SqlLogger;
use SimPod\ClickHouseClient\Output\Output;
use SimPod\ClickHouseClient\Sql\SqlFactory;
use SimPod\ClickHouseClient\Sql\ValueFormatter;
use function uniqid;
readonly class PsrClickHouseAsyncClient implements ClickHouseAsyncClient
{
private SqlFactory $sqlFactory;
/** @param array<string, float|int|string> $defaultSettings */
public function __construct(
private HttpAsyncClient $asyncClient,
private RequestFactory $requestFactory,
private SqlLogger|null $sqlLogger = null,
private array $defaultSettings = [],
) {
$this->sqlFactory = new SqlFactory(new ValueFormatter());
}
/**
* {@inheritDoc}
*
* @throws Exception
*/
public function select(string $query, Format $outputFormat, array $settings = []): PromiseInterface
{
return $this->selectWithParams($query, [], $outputFormat, $settings);
}
/**
* {@inheritDoc}
*
* @throws Exception
*/
public function selectWithParams(
string $query,
array $params,
Format $outputFormat,
array $settings = [],
): PromiseInterface {
$formatClause = $outputFormat::toSql();
$sql = $this->sqlFactory->createWithParameters($query, $params);
return $this->executeRequest(
<<<CLICKHOUSE
$sql
$formatClause
CLICKHOUSE,
params: $params,
settings: $settings,
processResponse: static fn (ResponseInterface $response): Output => $outputFormat::output(
$response->getBody()->__toString(),
),
);
}
/**
* @param array<string, mixed> $params
* @param array<string, float|int|string> $settings
* @param (callable(ResponseInterface):mixed)|null $processResponse
*
* @throws Exception
*/
private function executeRequest(
string $sql,
array $params,
array $settings = [],
callable|null $processResponse = null,
): PromiseInterface {
$request = $this->requestFactory->prepareSqlRequest(
$sql,
new RequestSettings(
$this->defaultSettings,
$settings,
),
new RequestOptions(
$params,
),
);
$id = uniqid('', true);
$this->sqlLogger?->startQuery($id, $sql);
return Create::promiseFor(
$this->asyncClient->sendAsyncRequest($request),
)
->then(
function (ResponseInterface $response) use ($id, $processResponse) {
$this->sqlLogger?->stopQuery($id);
if ($response->getStatusCode() !== 200) {
throw ServerError::fromResponse($response);
}
if ($processResponse === null) {
return $response;
}
return $processResponse($response);
},
fn () => $this->sqlLogger?->stopQuery($id),
);
}
}