Skip to content

Commit ae1405c

Browse files
Magesh Chandramouliashishagg
authored andcommitted
adding gzip support in pitchfork agent (#66)
1 parent 786bf91 commit ae1405c

5 files changed

Lines changed: 109 additions & 41 deletions

File tree

agent-providers/span/src/main/java/com/expedia/www/haystack/agent/pitchfork/service/PitchforkService.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.eclipse.jetty.server.HttpConnectionFactory;
2626
import org.eclipse.jetty.server.Server;
2727
import org.eclipse.jetty.server.ServerConnector;
28+
import org.eclipse.jetty.server.handler.gzip.GzipHandler;
2829
import org.eclipse.jetty.servlet.ServletContextHandler;
2930
import org.eclipse.jetty.servlet.ServletHolder;
3031
import org.eclipse.jetty.util.thread.QueuedThreadPool;
@@ -34,11 +35,10 @@
3435
public class PitchforkService {
3536
private final static Logger logger = LoggerFactory.getLogger(PitchforkService.class);
3637
private final Server server;
37-
private final int port;
38+
private final HttpConfig cfg;
3839

3940
public PitchforkService(final Config config, final ZipkinSpanProcessorFactory processorFactory) {
40-
final HttpConfig cfg = HttpConfig.from(config);
41-
this.port = cfg.getPort();
41+
this.cfg = HttpConfig.from(config);
4242
final QueuedThreadPool threadPool = new QueuedThreadPool(cfg.getMaxThreads(), cfg.getMinThreads(), cfg.getIdleTimeout());
4343
server = new Server(threadPool);
4444

@@ -50,6 +50,12 @@ public PitchforkService(final Config config, final ZipkinSpanProcessorFactory pr
5050
final ServletContextHandler context = new ServletContextHandler(server, "/");
5151
addResources(context, processorFactory);
5252

53+
if (cfg.isGzipEnabled()) {
54+
final GzipHandler gzipHandler = new GzipHandler();
55+
gzipHandler.setInflateBufferSize(cfg.getGzipBufferSize());
56+
context.setGzipHandler(gzipHandler);
57+
}
58+
5359
server.setStopTimeout(cfg.getStopTimeout());
5460
logger.info("pitchfork has been initialized successfully !");
5561
}
@@ -66,7 +72,7 @@ private void addResources(final ServletContextHandler context, final ZipkinSpanP
6672

6773
public void start() throws Exception {
6874
server.start();
69-
logger.info("pitchfork has been started on port {} ....", port);
75+
logger.info("pitchfork has been started on port {} ....", cfg.getPort());
7076
}
7177

7278
public void stop() throws Exception {

agent-providers/span/src/main/java/com/expedia/www/haystack/agent/pitchfork/service/config/HttpConfig.java

Lines changed: 30 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -26,24 +26,32 @@ public class HttpConfig {
2626
private final static String MIN_THREADS_CONFIG_KEY = "http.threads.min";
2727
private final static String IDLE_TIMEOUT_MILLIS_CONFIG_KEY = "idle.timeout.ms";
2828
private final static String STOP_TIMEOUT_MILLIS_CONFIG_KEY = "stop.timeout.ms";
29+
private final static String GZIP_ENABLED_KEY = "gzip.enabled";
30+
private final static String GZIP_BUFFER_SIZE = "gzip.buffer.size";
2931

30-
private int port;
31-
private int maxThreads;
32-
private int minThreads;
33-
private int idleTimeout;
34-
private int stopTimeout;
3532

36-
HttpConfig(int port, int minThreads, int maxThreads, int idleTimeout, int stopTimeout) {
33+
private final int port;
34+
private final int maxThreads;
35+
private final int minThreads;
36+
private final int idleTimeout;
37+
private final int stopTimeout;
38+
private final boolean gzipEnabled;
39+
private final int gzipBufferSize;
40+
41+
HttpConfig(int port, int minThreads, int maxThreads, int idleTimeout, int stopTimeout, boolean gzipEnabled, int gzipBufferSize) {
3742
Validate.isTrue(minThreads <= maxThreads, "min threads has to be less than or equal to max threads count");
3843
Validate.isTrue(port > 0, "http port should be > 0");
3944
Validate.isTrue(idleTimeout > 0, "idle timeout should be > 0");
4045
Validate.isTrue(stopTimeout > 0, "stop timeout should be > 0");
46+
Validate.isTrue(gzipBufferSize > 0, "gzipbufferSize should be > 0");
4147

4248
this.port = port;
4349
this.maxThreads = maxThreads;
4450
this.minThreads = minThreads;
4551
this.idleTimeout = idleTimeout;
4652
this.stopTimeout = stopTimeout;
53+
this.gzipEnabled = gzipEnabled;
54+
this.gzipBufferSize = gzipBufferSize;
4755
}
4856

4957
public int getPort() {
@@ -66,13 +74,23 @@ public int getStopTimeout() {
6674
return stopTimeout;
6775
}
6876

77+
public boolean isGzipEnabled() {
78+
return gzipEnabled;
79+
}
80+
81+
public int getGzipBufferSize() {
82+
return gzipBufferSize;
83+
}
84+
6985
@SuppressWarnings("PMD.NPathComplexity")
7086
public static HttpConfig from(Config config) {
71-
int port = config.hasPath(PORT_CONFIG_KEY) ? config.getInt(PORT_CONFIG_KEY) : 9411;
72-
int maxThreads = config.hasPath(MAX_THREADS_CONFIG_KEY) ? config.getInt(MAX_THREADS_CONFIG_KEY) : 16;
73-
int minThreads = config.hasPath(MIN_THREADS_CONFIG_KEY) ? config.getInt(MIN_THREADS_CONFIG_KEY) : 2;
74-
int idleTimeout = config.hasPath(IDLE_TIMEOUT_MILLIS_CONFIG_KEY) ? config.getInt(IDLE_TIMEOUT_MILLIS_CONFIG_KEY) : 60000;
75-
int stopTimeout = config.hasPath(STOP_TIMEOUT_MILLIS_CONFIG_KEY) ? config.getInt(STOP_TIMEOUT_MILLIS_CONFIG_KEY) : 30000;
76-
return new HttpConfig(port, minThreads, maxThreads, idleTimeout, stopTimeout);
87+
final int port = config.hasPath(PORT_CONFIG_KEY) ? config.getInt(PORT_CONFIG_KEY) : 9411;
88+
final int maxThreads = config.hasPath(MAX_THREADS_CONFIG_KEY) ? config.getInt(MAX_THREADS_CONFIG_KEY) : 16;
89+
final int minThreads = config.hasPath(MIN_THREADS_CONFIG_KEY) ? config.getInt(MIN_THREADS_CONFIG_KEY) : 2;
90+
final int idleTimeout = config.hasPath(IDLE_TIMEOUT_MILLIS_CONFIG_KEY) ? config.getInt(IDLE_TIMEOUT_MILLIS_CONFIG_KEY) : 60000;
91+
final int stopTimeout = config.hasPath(STOP_TIMEOUT_MILLIS_CONFIG_KEY) ? config.getInt(STOP_TIMEOUT_MILLIS_CONFIG_KEY) : 30000;
92+
final int gzipBufferSize = config.hasPath(GZIP_BUFFER_SIZE) ? config.getInt(GZIP_BUFFER_SIZE) : 16384;
93+
final boolean gzipEnabled = config.hasPath(GZIP_ENABLED_KEY) && config.getBoolean(GZIP_ENABLED_KEY);
94+
return new HttpConfig(port, minThreads, maxThreads, idleTimeout, stopTimeout, gzipEnabled, gzipBufferSize);
7795
}
7896
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package com.expedia.www.haystack.agent.pitchfork
2+
3+
import java.io.IOException
4+
5+
import com.expedia.www.haystack.agent.pitchfork.service.config.HttpConfig
6+
import com.typesafe.config.ConfigFactory
7+
import org.scalatest.{FunSpec, Matchers}
8+
import org.scalatest.easymock.EasyMockSugar
9+
10+
import scala.collection.JavaConverters._
11+
12+
class HttpConfigSpec extends FunSpec with Matchers with EasyMockSugar {
13+
describe("Http configuration provider") {
14+
it("should return gzip enabled as true if provided and its value is 'true'") {
15+
val config = ConfigFactory.parseMap(Map("port" -> 9115, "http.threads.min" -> 2, "http.threads.max" -> 4, "gzip.enabled" -> true).asJava)
16+
val httpConfig = HttpConfig.from(config)
17+
httpConfig.isGzipEnabled should equal (true)
18+
}
19+
it("should return gzip enabled as false if not provided") {
20+
val config = ConfigFactory.parseMap(Map("port" -> 9115, "http.threads.min" -> 2, "http.threads.max" -> 4).asJava)
21+
val httpConfig = HttpConfig.from(config)
22+
httpConfig.isGzipEnabled should equal (false)
23+
}
24+
it("should return gzip buffer as 16Kb if not provided") {
25+
val config = ConfigFactory.parseMap(Map("port" -> 9115, "http.threads.min" -> 2, "http.threads.max" -> 4).asJava)
26+
val httpConfig = HttpConfig.from(config)
27+
httpConfig.getGzipBufferSize should equal (16 * 1024)
28+
}
29+
}
30+
}

agent-providers/span/src/test/scala/com/expedia/www/haystack/agent/pitchfork/PitchforkServiceSpec.scala

Lines changed: 38 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,11 @@
1717

1818
package com.expedia.www.haystack.agent.pitchfork
1919

20+
import java.io.ByteArrayOutputStream
2021
import java.util
2122
import java.util.Collections
23+
import java.util.zip.GZIPOutputStream
2224

23-
import com.expedia.open.tracing.Span
2425
import com.expedia.www.haystack.agent.core.Dispatcher
2526
import com.expedia.www.haystack.agent.pitchfork.processors.{SpanValidator, ZipkinSpanProcessorFactory}
2627
import com.expedia.www.haystack.agent.pitchfork.service.PitchforkService
@@ -54,8 +55,16 @@ class PitchforkServiceSpec extends FunSpec with Matchers with EasyMockSugar {
5455

5556
describe("Pitchfork Agent Http service") {
5657
it("should dispatch the zipkinv2 span successfully") {
58+
runZipkinV2SpanTest(false)
59+
}
60+
61+
it("should dispatch compressed zipkinv2 span successfully") {
62+
runZipkinV2SpanTest(true)
63+
}
64+
65+
def runZipkinV2SpanTest(compress: Boolean) : Unit = {
5766
val mockDispatcher = mock[Dispatcher]
58-
val config = ConfigFactory.parseMap(Map("port" -> 9115, "http.threads.min" -> 2, "http.threads.max" -> 4).asJava)
67+
val config = ConfigFactory.parseMap(Map("port" -> 9115, "http.threads.min" -> 2, "http.threads.max" -> 4, "gzip.enabled" -> compress).asJava)
5968

6069
val keyCapture = EasyMock.newCapture[Array[Byte]]()
6170
val haystackSpanCapture = EasyMock.newCapture[Array[Byte]]()
@@ -74,34 +83,39 @@ class PitchforkServiceSpec extends FunSpec with Matchers with EasyMockSugar {
7483
// let the server start
7584
Thread.sleep(5000)
7685

77-
val body = RequestBody.create(
78-
MediaType.parse("application/json"), SpanBytesEncoder.JSON_V2.encode(zipkinSpan("0000000000000064")))
79-
val request = new Request.Builder()
80-
.url("http://localhost:9115" + "/api/v2/spans")
81-
.post(body)
82-
.build()
86+
val request = newRequest(compress)
8387

8488
val response = client.newCall(request).execute()
85-
// response.code() shouldBe 200
86-
//
87-
// new String(keyCapture.getValue) shouldEqual "0000000000000064"
88-
// val haystackSpan = Span.parseFrom(haystackSpanCapture.getValue)
89-
// haystackSpan.getTraceId shouldEqual "0000000000000064"
90-
// haystackSpan.getSpanId shouldEqual "0000000000000001"
91-
// haystackSpan.getParentSpanId shouldEqual "0000000000000002"
92-
// haystackSpan.getOperationName shouldEqual "/foo"
93-
// haystackSpan.getServiceName shouldEqual "foo"
94-
// haystackSpan.getDuration shouldBe 100000l
95-
// haystackSpan.getStartTime should be >((System.currentTimeMillis() - 20000) * 1000)
96-
// haystackSpan.getTagsCount shouldBe 6
97-
// haystackSpan.getTagsList.asScala.find(t => t.getKey == "pos").get.getVStr shouldEqual "1"
98-
// haystackSpan.getTagsList.asScala.find(t => t.getKey == "error").get.getVBool shouldBe true
99-
// haystackSpan.getTagsList.asScala.find(t => t.getKey == "remote.service.name").get.getVStr shouldBe "bar"
100-
// haystackSpan.getTagsList.asScala.find(t => t.getKey == "remote.service.port").get.getVLong shouldBe 8080
89+
response.code() shouldBe 200
10190
service.stop()
10291
}
10392
}
10493

94+
def newRequest(compress: Boolean) : Request = {
95+
val requestBuilder = new Request.Builder()
96+
.url("http://localhost:9115" + "/api/v2/spans")
97+
98+
var data = SpanBytesEncoder.JSON_V2.encode(zipkinSpan("0000000000000064"))
99+
if (compress) {
100+
val bos = new ByteArrayOutputStream()
101+
val gzip = new GZIPOutputStream(bos)
102+
try {
103+
gzip.write(data)
104+
gzip.finish()
105+
} finally {
106+
gzip.close()
107+
bos.close()
108+
}
109+
data = bos.toByteArray
110+
requestBuilder.addHeader("Content-Encoding", "gzip")
111+
}
112+
113+
val body = RequestBody.create(MediaType.parse("application/json"), data)
114+
requestBuilder.post(body)
115+
116+
requestBuilder.build()
117+
}
118+
105119
it("should dispatch the proto spans successfully") {
106120
val mockDispatcher = mock[Dispatcher]
107121
val config = ConfigFactory.parseMap(Map("port" -> 9112, "http.threads.min" -> 2, "http.threads.max" -> 4).asJava)

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@
9494

9595
<aws-sdk.version>1.11.128</aws-sdk.version>
9696
<typesafe-config.version>1.3.1</typesafe-config.version>
97-
<commons-io.version>2.5</commons-io.version>
97+
<commons-io.version>2.6</commons-io.version>
9898

9999
<maven-pmd-plugin.version>3.0.1</maven-pmd-plugin.version>
100100
<pmd.version>5.1.3</pmd.version>

0 commit comments

Comments
 (0)