forked from rabbitmq/rabbitmq-java-client
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathCommandAssembler.java
More file actions
166 lines (143 loc) · 5.89 KB
/
CommandAssembler.java
File metadata and controls
166 lines (143 loc) · 5.89 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
// Copyright (c) 2007-Present Pivotal Software, Inc. All rights reserved.
//
// This software, the RabbitMQ Java client library, is triple-licensed under the
// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2
// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
// please see LICENSE-APACHE2.
//
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
// either express or implied. See the LICENSE file for specific language governing
// rights and limitations of this software.
//
// If you have any questions regarding licensing, please contact us at
// info@rabbitmq.com.
package com.rabbitmq.client.impl;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.UnexpectedFrameError;
/**
* Class responsible for piecing together a command from a series of {@link Frame}s.
* <p/><b>Concurrency</b><br/>
* This class is thread-safe, since all methods are synchronised. Callers should not
* synchronise on objects of this class unless they are sole owners.
* @see AMQCommand
*/
final class CommandAssembler {
private static final byte[] EMPTY_BYTE_ARRAY = new byte[0];
/** Current state, used to decide how to handle each incoming frame. */
private enum CAState {
EXPECTING_METHOD, EXPECTING_CONTENT_HEADER, EXPECTING_CONTENT_BODY, COMPLETE
}
private CAState state;
/** The method for this command */
private Method method;
/** The content header for this command */
private AMQContentHeader contentHeader;
/** The fragments of this command's content body - a list of byte[] */
private final List<byte[]> bodyN;
/** sum of the lengths of all fragments */
private int bodyLength;
/** No bytes of content body not yet accumulated */
private long remainingBodyBytes;
public CommandAssembler(Method method, AMQContentHeader contentHeader, byte[] body) {
this.method = method;
this.contentHeader = contentHeader;
this.bodyN = new ArrayList<byte[]>(2);
this.bodyLength = 0;
this.remainingBodyBytes = 0;
appendBodyFragment(body);
if (method == null) {
this.state = CAState.EXPECTING_METHOD;
} else if (contentHeader == null) {
this.state = method.hasContent() ? CAState.EXPECTING_CONTENT_HEADER : CAState.COMPLETE;
} else {
this.remainingBodyBytes = contentHeader.getBodySize() - this.bodyLength;
updateContentBodyState();
}
}
public synchronized Method getMethod() {
return this.method;
}
public synchronized AMQContentHeader getContentHeader() {
return this.contentHeader;
}
/** @return true if the command is complete */
public synchronized boolean isComplete() {
return (this.state == CAState.COMPLETE);
}
/** Decides whether more body frames are expected */
private void updateContentBodyState() {
this.state = (this.remainingBodyBytes > 0) ? CAState.EXPECTING_CONTENT_BODY : CAState.COMPLETE;
}
private void consumeMethodFrame(Frame f) throws IOException {
if (f.getType() == AMQP.FRAME_METHOD) {
this.method = AMQImpl.readMethodFrom(f.getInputStream());
this.state = this.method.hasContent() ? CAState.EXPECTING_CONTENT_HEADER : CAState.COMPLETE;
} else {
throw new UnexpectedFrameError(f, AMQP.FRAME_METHOD);
}
}
private void consumeHeaderFrame(Frame f) throws IOException {
if (f.getType() == AMQP.FRAME_HEADER) {
this.contentHeader = AMQImpl.readContentHeaderFrom(f.getInputStream());
this.remainingBodyBytes = this.contentHeader.getBodySize();
updateContentBodyState();
} else {
throw new UnexpectedFrameError(f, AMQP.FRAME_HEADER);
}
}
private void consumeBodyFrame(Frame f) {
if (f.getType() == AMQP.FRAME_BODY) {
byte[] fragment = f.getPayload();
this.remainingBodyBytes -= fragment.length;
updateContentBodyState();
if (this.remainingBodyBytes < 0) {
throw new UnsupportedOperationException("%%%%%% FIXME unimplemented");
}
appendBodyFragment(fragment);
} else {
throw new UnexpectedFrameError(f, AMQP.FRAME_BODY);
}
}
/** Stitches together a fragmented content body into a single byte array */
private byte[] coalesceContentBody() {
if (this.bodyLength == 0) return EMPTY_BYTE_ARRAY;
if (this.bodyN.size() == 1) return this.bodyN.get(0);
byte[] body = new byte[bodyLength];
int offset = 0;
for (byte[] fragment : this.bodyN) {
System.arraycopy(fragment, 0, body, offset, fragment.length);
offset += fragment.length;
}
this.bodyN.clear();
this.bodyN.add(body);
return body;
}
public synchronized byte[] getContentBody() {
return coalesceContentBody();
}
private void appendBodyFragment(byte[] fragment) {
if (fragment == null || fragment.length == 0) return;
bodyN.add(fragment);
bodyLength += fragment.length;
}
/**
* @param f frame to be incorporated
* @return true if command becomes complete
* @throws IOException if error reading frame
*/
public synchronized boolean handleFrame(Frame f) throws IOException
{
switch (this.state) {
case EXPECTING_METHOD: consumeMethodFrame(f); break;
case EXPECTING_CONTENT_HEADER: consumeHeaderFrame(f); break;
case EXPECTING_CONTENT_BODY: consumeBodyFrame(f); break;
default:
throw new IllegalStateException("Bad Command State " + this.state);
}
return isComplete();
}
}