Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion ext/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ add_library(
rcb_users.cxx
rcb_utils.cxx
rcb_version.cxx
rcb_views.cxx)
rcb_views.cxx
rcb_observability.cxx)
target_include_directories(couchbase PRIVATE ${PROJECT_BINARY_DIR}/generated)
target_include_directories(
couchbase
Expand Down
2 changes: 1 addition & 1 deletion ext/couchbase
56 changes: 3 additions & 53 deletions ext/rcb_backend.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

#include <core/cluster.hxx>
#include <core/logger/logger.hxx>
#include <core/tracing/wrapper_sdk_tracer.hxx>
#include <core/utils/connection_string.hxx>

#include <asio/io_context.hpp>
Expand Down Expand Up @@ -422,59 +423,8 @@ initialize_cluster_options(const core::utils::connection_string& connstr,
cluster_options.network().max_http_connections(param.value());
}

static const auto sym_enable_tracing = rb_id2sym(rb_intern("enable_tracing"));
if (auto param = options::get_bool(options, sym_enable_tracing); param) {
cluster_options.tracing().enable(param.value());
}
static const auto sym_orphaned_emit_interval = rb_id2sym(rb_intern("orphaned_emit_interval"));
if (auto param = options::get_milliseconds(options, sym_orphaned_emit_interval); param) {
cluster_options.tracing().orphaned_emit_interval(param.value());
}

static const auto sym_orphaned_sample_size = rb_id2sym(rb_intern("orphaned_sample_size"));
if (auto param = options::get_size_t(options, sym_orphaned_sample_size); param) {
cluster_options.tracing().orphaned_sample_size(param.value());
}

static const auto sym_threshold_emit_interval = rb_id2sym(rb_intern("threshold_emit_interval"));
if (auto param = options::get_milliseconds(options, sym_threshold_emit_interval); param) {
cluster_options.tracing().threshold_emit_interval(param.value());
}

static const auto sym_threshold_sample_size = rb_id2sym(rb_intern("threshold_sample_size"));
if (auto param = options::get_size_t(options, sym_threshold_sample_size); param) {
cluster_options.tracing().threshold_sample_size(param.value());
}

static const auto sym_key_value_threshold = rb_id2sym(rb_intern("key_value_threshold"));
if (auto param = options::get_milliseconds(options, sym_key_value_threshold); param) {
cluster_options.tracing().key_value_threshold(param.value());
}

static const auto sym_query_threshold = rb_id2sym(rb_intern("query_threshold"));
if (auto param = options::get_milliseconds(options, sym_query_threshold); param) {
cluster_options.tracing().query_threshold(param.value());
}

static const auto sym_view_threshold = rb_id2sym(rb_intern("view_threshold"));
if (auto param = options::get_milliseconds(options, sym_view_threshold); param) {
cluster_options.tracing().view_threshold(param.value());
}

static const auto sym_search_threshold = rb_id2sym(rb_intern("search_threshold"));
if (auto param = options::get_milliseconds(options, sym_search_threshold); param) {
cluster_options.tracing().search_threshold(param.value());
}

static const auto sym_analytics_threshold = rb_id2sym(rb_intern("analytics_threshold"));
if (auto param = options::get_milliseconds(options, sym_analytics_threshold); param) {
cluster_options.tracing().analytics_threshold(param.value());
}

static const auto sym_management_threshold = rb_id2sym(rb_intern("management_threshold"));
if (auto param = options::get_milliseconds(options, sym_management_threshold); param) {
cluster_options.tracing().management_threshold(param.value());
}
cluster_options.tracing().tracer(
std::make_shared<couchbase::core::tracing::wrapper_sdk_tracer>());

static const auto sym_enable_metrics = rb_id2sym(rb_intern("enable_metrics"));
if (auto param = options::get_bool(options, sym_enable_metrics); param) {
Expand Down
15 changes: 11 additions & 4 deletions ext/rcb_crud.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
#include <ruby.h>

#include "rcb_backend.hxx"
#include "rcb_observability.hxx"
#include "rcb_utils.hxx"

namespace couchbase
Expand Down Expand Up @@ -77,7 +78,8 @@ cb_Backend_document_get(VALUE self,
VALUE scope,
VALUE collection,
VALUE id,
VALUE options)
VALUE options,
VALUE observability_handler)
{
auto cluster = cb_backend_to_core_api_cluster(self);

Expand All @@ -96,12 +98,14 @@ cb_Backend_document_get(VALUE self,

core::operations::get_request req{ doc_id };
cb_extract_timeout(req, options);
auto parent_span = cb_create_parent_span(req, self);
std::promise<core::operations::get_response> promise;
auto f = promise.get_future();
cluster.execute(req, [promise = std::move(promise)](auto&& resp) mutable {
promise.set_value(std::forward<decltype(resp)>(resp));
});
auto resp = cb_wait_for_future(f);
cb_add_core_spans(observability_handler, std::move(parent_span), resp.ctx.retry_attempts());
if (resp.ctx.ec()) {
cb_throw_error(resp.ctx, "unable to fetch document");
}
Expand Down Expand Up @@ -235,7 +239,8 @@ cb_Backend_document_get_projected(VALUE self,
VALUE scope,
VALUE collection,
VALUE id,
VALUE options)
VALUE options,
VALUE observability_handler)
{
auto cluster = cb_backend_to_core_api_cluster(self);

Expand All @@ -257,6 +262,7 @@ cb_Backend_document_get_projected(VALUE self,

core::operations::get_projected_request req{ doc_id };
cb_extract_timeout(req, options);
auto parent_span = cb_create_parent_span(req, self);
cb_extract_option_bool(req.with_expiry, options, "with_expiry");
cb_extract_option_bool(req.preserve_array_indexes, options, "preserve_array_indexes");
VALUE projections = Qnil;
Expand All @@ -280,6 +286,7 @@ cb_Backend_document_get_projected(VALUE self,
promise.set_value(std::forward<decltype(resp)>(resp));
});
auto resp = cb_wait_for_future(f);
cb_add_core_spans(observability_handler, std::move(parent_span), resp.ctx.retry_attempts());
if (resp.ctx.ec()) {
cb_throw_error(resp.ctx, "unable fetch with projections");
}
Expand Down Expand Up @@ -1540,10 +1547,10 @@ cb_Backend_document_mutate_in(VALUE self,
void
init_crud(VALUE cBackend)
{
rb_define_method(cBackend, "document_get", cb_Backend_document_get, 5);
rb_define_method(cBackend, "document_get", cb_Backend_document_get, 6);
rb_define_method(cBackend, "document_get_any_replica", cb_Backend_document_get_any_replica, 5);
rb_define_method(cBackend, "document_get_all_replicas", cb_Backend_document_get_all_replicas, 5);
rb_define_method(cBackend, "document_get_projected", cb_Backend_document_get_projected, 5);
rb_define_method(cBackend, "document_get_projected", cb_Backend_document_get_projected, 6);
rb_define_method(cBackend, "document_get_and_lock", cb_Backend_document_get_and_lock, 6);
rb_define_method(cBackend, "document_get_and_touch", cb_Backend_document_get_and_touch, 6);
rb_define_method(cBackend, "document_insert", cb_Backend_document_insert, 7);
Expand Down
79 changes: 79 additions & 0 deletions ext/rcb_observability.cxx
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/*
* Copyright 2025-Present Couchbase, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "rcb_utils.hxx"

#include <core/tracing/wrapper_sdk_tracer.hxx>

#include <ruby.h>

#include <chrono>
#include <memory>

namespace couchbase::ruby
{
void
cb_add_core_spans(VALUE observability_handler,
std::shared_ptr<couchbase::core::tracing::wrapper_sdk_span> parent_span,
std::size_t retry_attempts)
{
const auto children = parent_span->children();
VALUE spans = rb_ary_new_capa(static_cast<long>(children.size()));

for (const auto& child : parent_span->children()) {
VALUE span = rb_hash_new();

static VALUE sym_name = rb_id2sym(rb_intern("name"));
static VALUE sym_attributes = rb_id2sym(rb_intern("attributes"));
static VALUE sym_start_timestamp = rb_id2sym(rb_intern("start_timestamp"));
static VALUE sym_end_timestamp = rb_id2sym(rb_intern("end_timestamp"));

VALUE attributes = rb_hash_new();

for (const auto& [key, value] : child->uint_tags()) {
rb_hash_aset(attributes, cb_str_new(key), ULL2NUM(value));
}

for (const auto& [key, value] : child->string_tags()) {
rb_hash_aset(attributes, cb_str_new(key), cb_str_new(value));
}

rb_hash_aset(span, sym_name, cb_str_new(child->name()));
rb_hash_aset(span, sym_attributes, attributes);
rb_hash_aset(span,
sym_start_timestamp,
LL2NUM(std::chrono::duration_cast<std::chrono::microseconds>(
child->start_time().time_since_epoch())
.count()));
rb_hash_aset(span,
sym_end_timestamp,
LL2NUM(std::chrono::duration_cast<std::chrono::microseconds>(
child->end_time().time_since_epoch())
.count()));

rb_ary_push(spans, span);
}

static ID add_backend_spans_func = rb_intern("add_spans_from_backend");
rb_funcall(observability_handler, add_backend_spans_func, 1, spans);

if (retry_attempts > 0) {
static ID add_retries_func = rb_intern("add_retries");
rb_funcall(observability_handler, add_retries_func, ULONG2NUM(retry_attempts));
}
}
} // namespace couchbase::ruby
39 changes: 39 additions & 0 deletions ext/rcb_observability.hxx
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/*
* Copyright 2025-Present Couchbase, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <core/tracing/wrapper_sdk_tracer.hxx>

#include <memory>

namespace couchbase::ruby
{
template<typename Request>
inline auto
cb_create_parent_span(Request& req, VALUE backend)
-> std::shared_ptr<couchbase::core::tracing::wrapper_sdk_span>
{
// TODO(Tracing): Conditionally set the parent span only if tracing is enabled
auto span = std::make_shared<couchbase::core::tracing::wrapper_sdk_span>();
req.parent_span = span;
return span;
}

void
cb_add_core_spans(VALUE observability_handler,
std::shared_ptr<couchbase::core::tracing::wrapper_sdk_span> parent_span,
std::size_t retry_attempts);
} // namespace couchbase::ruby
69 changes: 38 additions & 31 deletions lib/couchbase/bucket.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,20 @@ class Bucket
alias inspect to_s

# @param [Couchbase::Backend] backend
def initialize(backend, name)
#
# @api private
def initialize(backend, name, observability)
backend.open_bucket(name, true)
@backend = backend
@name = name
@observability = observability
end

# Get default scope
#
# @return [Scope]
def default_scope
Scope.new(@backend, @name, "_default")
Scope.new(@backend, @name, "_default", @observability)
end

# Get a named scope
Expand All @@ -49,7 +52,7 @@ def default_scope
#
# @return [Scope]
def scope(scope_name)
Scope.new(@backend, @name, scope_name)
Scope.new(@backend, @name, scope_name, @observability)
end

# Opens the named collection in the default scope of the bucket
Expand All @@ -65,7 +68,7 @@ def collection(collection_name)
#
# @return [Collection]
def default_collection
Collection.new(@backend, @name, "_default", "_default")
Collection.new(@backend, @name, "_default", "_default", @observability)
end

# Performs query to view index.
Expand All @@ -83,30 +86,32 @@ def default_collection
#
# @return [ViewResult]
def view_query(design_document_name, view_name, options = Options::View::DEFAULT)
resp = @backend.document_view(@name, design_document_name, view_name, options.namespace, options.to_backend)
ViewResult.new do |res|
res.meta_data = ViewMetaData.new do |meta|
meta.total_rows = resp[:meta][:total_rows]
meta.debug_info = resp[:meta][:debug_info]
end
res.rows = resp[:rows].map do |entry|
ViewRow.new do |row|
row.id = entry[:id] if entry.key?(:id)
row.key = JSON.parse(entry[:key])
row.value = JSON.parse(entry[:value])
@observability.record_operation(Observability::OP_VIEW_QUERY, opts.parent_span, self, :views) do |_obs_handler|
resp = @backend.document_view(@name, design_document_name, view_name, options.namespace, options.to_backend)
ViewResult.new do |res|
res.meta_data = ViewMetaData.new do |meta|
meta.total_rows = resp[:meta][:total_rows]
meta.debug_info = resp[:meta][:debug_info]
end
res.rows = resp[:rows].map do |entry|
ViewRow.new do |row|
row.id = entry[:id] if entry.key?(:id)
row.key = JSON.parse(entry[:key])
row.value = JSON.parse(entry[:value])
end
end
end
end
end

# @return [Management::CollectionManager]
def collections
Management::CollectionManager.new(@backend, @name)
Management::CollectionManager.new(@backend, @name, @observability)
end

# @return [Management::ViewIndexManager]
def view_indexes
Management::ViewIndexManager.new(@backend, @name)
Management::ViewIndexManager.new(@backend, @name, @observability)
end

# Performs application-level ping requests against services in the couchbase cluster
Expand All @@ -115,20 +120,22 @@ def view_indexes
#
# @return [PingResult]
def ping(options = Options::Ping::DEFAULT)
resp = @backend.ping(@name, options.to_backend)
PingResult.new do |res|
res.version = resp[:version]
res.id = resp[:id]
res.sdk = resp[:sdk]
resp[:services].each do |type, svcs|
res.services[type] = svcs.map do |svc|
PingResult::ServiceInfo.new do |info|
info.id = svc[:id]
info.state = svc[:state]
info.latency = svc[:latency]
info.remote = svc[:remote]
info.local = svc[:local]
info.error = svc[:error]
@observability.record_operation(Observability::OP_PING, options.parent_span, self) do |_obs_handler|
resp = @backend.ping(@name, options.to_backend)
PingResult.new do |res|
res.version = resp[:version]
res.id = resp[:id]
res.sdk = resp[:sdk]
resp[:services].each do |type, svcs|
res.services[type] = svcs.map do |svc|
PingResult::ServiceInfo.new do |info|
info.id = svc[:id]
info.state = svc[:state]
info.latency = svc[:latency]
info.remote = svc[:remote]
info.local = svc[:local]
info.error = svc[:error]
end
end
end
end
Expand Down
Loading
Loading