Skip to content

Commit d71c071

Browse files
authored
impl(pubsub): inject trace context into message (#12868)
1 parent 2c7599a commit d71c071

5 files changed

Lines changed: 51 additions & 6 deletions

File tree

google/cloud/pubsub/internal/message_propagator.cc

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,12 @@ GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
2727

2828
void InjectTraceContext(
2929
pubsub::Message& message,
30-
opentelemetry::context::propagation::TextMapPropagator& propagator) {
30+
std::shared_ptr<
31+
opentelemetry::context::propagation::TextMapPropagator> const&
32+
propagator) {
3133
auto current = opentelemetry::context::RuntimeContext::GetCurrent();
3234
MessageCarrier carrier(message);
33-
propagator.Inject(carrier, current);
35+
propagator->Inject(carrier, current);
3436
}
3537

3638
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END

google/cloud/pubsub/internal/message_propagator.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,9 @@ GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
3232
*/
3333
void InjectTraceContext(
3434
pubsub::Message& message,
35-
opentelemetry::context::propagation::TextMapPropagator& propagator);
35+
std::shared_ptr<
36+
opentelemetry::context::propagation::TextMapPropagator> const&
37+
propagator);
3638

3739
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
3840
} // namespace pubsub_internal

google/cloud/pubsub/internal/message_propagator_test.cc

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
#include "opentelemetry/context/propagation/text_map_propagator.h"
2424
#include "opentelemetry/trace/scope.h"
2525
#include <gmock/gmock.h>
26+
#include <opentelemetry/trace/propagation/http_trace_context.h>
2627

2728
namespace google {
2829
namespace cloud {
@@ -32,6 +33,8 @@ namespace {
3233

3334
using ::google::cloud::testing_util::InstallSpanCatcher;
3435
using ::testing::_;
36+
using ::testing::Contains;
37+
using ::testing::Pair;
3538
using ::testing::StartsWith;
3639

3740
opentelemetry::nostd::shared_ptr<opentelemetry::trace::Span> MakeTestSpan() {
@@ -43,9 +46,10 @@ TEST(MessagePropagatorTest, InjectTraceContext) {
4346
auto span_catcher = InstallSpanCatcher();
4447
opentelemetry::trace::Scope scope(MakeTestSpan());
4548
auto message = pubsub::MessageBuilder().Build();
46-
auto propagator = internal::MakePropagator();
49+
auto propagator =
50+
std::make_shared<opentelemetry::trace::propagation::HttpTraceContext>();
4751

48-
InjectTraceContext(message, *propagator);
52+
InjectTraceContext(message, propagator);
4953

5054
EXPECT_THAT(message.attributes(),
5155
Contains(Pair(StartsWith("googclient_"), _)));

google/cloud/pubsub/internal/publisher_tracing_connection.cc

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,12 @@
2222
#include <string>
2323
#include <utility>
2424
#ifdef GOOGLE_CLOUD_CPP_HAVE_OPENTELEMETRY
25+
#include "google/cloud/pubsub/internal/message_propagator.h"
2526
#include "google/cloud/internal/opentelemetry.h"
2627
#include "google/cloud/status_or.h"
28+
#include <opentelemetry/context/propagation/text_map_propagator.h>
2729
#include <opentelemetry/nostd/shared_ptr.h>
30+
#include <opentelemetry/trace/propagation/http_trace_context.h>
2831
#include <opentelemetry/trace/scope.h>
2932
#include <opentelemetry/trace/semantic_conventions.h>
3033
#include <opentelemetry/trace/span_metadata.h>
@@ -78,13 +81,19 @@ class PublisherTracingConnection : public pubsub::PublisherConnection {
7881
public:
7982
explicit PublisherTracingConnection(
8083
pubsub::Topic topic, std::shared_ptr<pubsub::PublisherConnection> child)
81-
: topic_(std::move(topic)), child_(std::move(child)) {}
84+
: topic_(std::move(topic)),
85+
child_(std::move(child)),
86+
propagator_(std::make_shared<
87+
opentelemetry::trace::propagation::HttpTraceContext>()) {}
8288

8389
~PublisherTracingConnection() override = default;
8490

8591
future<StatusOr<std::string>> Publish(PublishParams p) override {
8692
auto span = StartPublishSpan(topic_.FullName(), p.message);
8793
auto scope = opentelemetry::trace::Scope(span);
94+
95+
InjectTraceContext(p.message, propagator_);
96+
8897
return EndPublishSpan(std::move(span), child_->Publish(std::move(p)));
8998
};
9099

@@ -105,6 +114,8 @@ class PublisherTracingConnection : public pubsub::PublisherConnection {
105114
private:
106115
pubsub::Topic const topic_;
107116
std::shared_ptr<pubsub::PublisherConnection> child_;
117+
std::shared_ptr<opentelemetry::context::propagation::TextMapPropagator>
118+
propagator_;
108119
};
109120

110121
} // namespace

google/cloud/pubsub/internal/publisher_tracing_connection_test.cc

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,11 @@ using ::google::cloud::testing_util::StatusIs;
5151
using ::google::cloud::testing_util::ThereIsAnActiveSpan;
5252
using ::testing::_;
5353
using ::testing::AllOf;
54+
using ::testing::Contains;
5455
using ::testing::ElementsAre;
5556
using ::testing::Not;
57+
using ::testing::Pair;
58+
using ::testing::StartsWith;
5659

5760
TEST(PublisherTracingConnectionTest, PublishSpanOnSuccess) {
5861
namespace sc = ::opentelemetry::trace::SemanticConventions;
@@ -139,6 +142,29 @@ TEST(PublisherTracingConnectionTest, PublishSpanOnError) {
139142
"messaging.message.total_size_bytes", 45)))));
140143
}
141144

145+
TEST(PublisherTracingConnectionTest, PublishInjectsTraceContext) {
146+
auto mock = std::make_shared<MockPublisherConnection>();
147+
// Need to install the span catcher so ThereIsAnActiveSpan() detects a span.
148+
auto span_catcher = InstallSpanCatcher();
149+
EXPECT_CALL(*mock, Publish)
150+
.WillOnce([&](pubsub::PublisherConnection::PublishParams const& p) {
151+
EXPECT_TRUE(ThereIsAnActiveSpan());
152+
// We need to test the trace context has been injected here, since the
153+
// connection moves the message to the child connection.
154+
EXPECT_THAT(p.message.attributes(),
155+
Contains(Pair(StartsWith("googclient_"), _)));
156+
return make_ready_future(StatusOr<std::string>("test-id-0"));
157+
});
158+
auto connection = MakePublisherTracingConnection(
159+
Topic("test-project", "test-topic"), std::move(mock));
160+
161+
auto message = pubsub::MessageBuilder{}
162+
.SetData("test-data-0")
163+
.SetOrderingKey("ordering-key-0")
164+
.Build();
165+
auto response = connection->Publish({message}).get();
166+
}
167+
142168
TEST(PublisherTracingConnectionTest, PublishSpanOmitsOrderingKey) {
143169
auto span_catcher = InstallSpanCatcher();
144170
auto mock = std::make_shared<MockPublisherConnection>();

0 commit comments

Comments
 (0)