-
Notifications
You must be signed in to change notification settings - Fork 42
Expand file tree
/
Copy pathOutgoingPackageCreator.php
More file actions
99 lines (82 loc) · 2.85 KB
/
OutgoingPackageCreator.php
File metadata and controls
99 lines (82 loc) · 2.85 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
<?php
namespace React\Stomp\Client;
use React\Stomp\Protocol\Frame;
class OutgoingPackageCreator
{
private $state;
public function __construct(State $state)
{
$this->state = $state;
}
public function connect($host, $login = null, $passcode = null, $heartbeat = null)
{
$this->state->startConnecting();
$headers = array('accept-version' => '1.1', 'host' => $host);
if (null !== $login || null !== $passcode) {
$headers = array_merge($headers, array(
'login' => (string) $login,
'passcode' => (string) $passcode,
));
}
if (null !== $heartbeat) {
$headers = array_merge($headers, array(
'heart-beat' => $heartbeat,
));
}
return new Frame('CONNECT', $headers);
}
public function send($destination, $body, array $headers = array())
{
$headers['destination'] = $destination;
$headers['content-length'] = strlen($body);
if (!isset($headers['content-type'])) {
$headers['content-type'] = 'text/plain';
}
return new Frame('SEND', $headers, $body);
}
public function subscribe($destination, $ack = 'auto', array $headers = array())
{
$headers['id'] = $this->state->subscriptions->add($destination, $ack);
$headers['destination'] = $destination;
$headers['ack'] = $ack;
return new Frame('SUBSCRIBE', $headers);
}
public function unsubscribe($subscriptionId, array $headers = array())
{
$headers['id'] = $subscriptionId;
return new Frame('UNSUBSCRIBE', $headers);
}
public function ack($subscriptionId, $messageId, array $headers = array())
{
$headers['subscription'] = $subscriptionId;
$headers['message-id'] = $messageId;
return new Frame('ACK', $headers);
}
public function nack($subscriptionId, $messageId, array $headers = array())
{
$headers['subscription'] = $subscriptionId;
$headers['message-id'] = $messageId;
return new Frame('NACK', $headers);
}
public function begin($transactionId, array $headers = array())
{
$headers['transaction'] = $transactionId;
return new Frame('BEGIN', $headers);
}
public function commit($transactionId, array $headers = array())
{
$headers['transaction'] = $transactionId;
return new Frame('COMMIT', $headers);
}
public function abort($transactionId, array $headers = array())
{
$headers['transaction'] = $transactionId;
return new Frame('ABORT', $headers);
}
public function disconnect($receipt, array $headers = array())
{
$this->state->startDisconnecting($receipt);
$headers['receipt'] = $receipt;
return new Frame('DISCONNECT', $headers);
}
}