Skip to content

Commit 906ef83

Browse files
committed
sqlite: create first/last subscriber callback
1 parent c40646d commit 906ef83

File tree

7 files changed

+187
-2
lines changed

7 files changed

+187
-2
lines changed

benchmark/sqlite/sqlite-trace.js

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
'use strict';
2+
const common = require('../common.js');
3+
const sqlite = require('node:sqlite');
4+
const dc = require('diagnostics_channel');
5+
const assert = require('assert');
6+
7+
const bench = common.createBenchmark(main, {
8+
n: [1e5],
9+
mode: ['none', 'subscribed', 'unsubscribed'],
10+
});
11+
12+
function main(conf) {
13+
const { n, mode } = conf;
14+
15+
const db = new sqlite.DatabaseSync(':memory:');
16+
db.exec('CREATE TABLE t (x INTEGER)');
17+
const insert = db.prepare('INSERT INTO t VALUES (?)');
18+
19+
let subscriber;
20+
if (mode === 'subscribed') {
21+
subscriber = () => {};
22+
dc.subscribe('sqlite.db.query', subscriber);
23+
} else if (mode === 'unsubscribed') {
24+
subscriber = () => {};
25+
dc.subscribe('sqlite.db.query', subscriber);
26+
dc.unsubscribe('sqlite.db.query', subscriber);
27+
}
28+
// mode === 'none': no subscription ever made
29+
30+
let result;
31+
bench.start();
32+
for (let i = 0; i < n; i++) {
33+
result = insert.run(i);
34+
}
35+
bench.end(n);
36+
37+
if (mode === 'subscribed') {
38+
dc.unsubscribe('sqlite.db.query', subscriber);
39+
}
40+
41+
assert.ok(result !== undefined);
42+
}

lib/diagnostics_channel.js

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,11 +72,19 @@ function markActive(channel) {
7272
ObjectSetPrototypeOf(channel, ActiveChannel.prototype);
7373
channel._subscribers = [];
7474
channel._stores = new SafeMap();
75+
76+
// Notify native modules that this channel just got its first subscriber.
77+
if (channel._index !== undefined)
78+
dc_binding.notifyChannelActive(channel._index);
7579
}
7680

7781
function maybeMarkInactive(channel) {
7882
// When there are no more active subscribers or bound, restore to fast prototype.
7983
if (!channel._subscribers.length && !channel._stores.size) {
84+
// Notify native modules that this channel just lost its last subscriber.
85+
if (channel._index !== undefined)
86+
dc_binding.notifyChannelInactive(channel._index);
87+
8088
// eslint-disable-next-line no-use-before-define
8189
ObjectSetPrototypeOf(channel, Channel.prototype);
8290
channel._subscribers = undefined;

src/base_object_types.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,8 @@ namespace node {
2424
#define UNSERIALIZABLE_BINDING_TYPES(V) \
2525
V(http2_binding_data, http2::BindingData) \
2626
V(http_parser_binding_data, http_parser::BindingData) \
27-
V(quic_binding_data, quic::BindingData)
27+
V(quic_binding_data, quic::BindingData) \
28+
V(sqlite_binding_data, sqlite::BindingData)
2829

2930
// List of (non-binding) BaseObjects that are serializable in the snapshot.
3031
// The first argument should match what the type passes to

src/node_diagnostics_channel.cc

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,12 +127,38 @@ void BindingData::Deserialize(Local<Context> context,
127127
CHECK_NOT_NULL(binding);
128128
}
129129

130+
void BindingData::SetChannelStatusCallback(uint32_t index,
131+
ChannelStatusCallback cb) {
132+
channel_status_callbacks_[index] = std::move(cb);
133+
}
134+
135+
void BindingData::NotifyChannelActive(const FunctionCallbackInfo<Value>& args) {
136+
Realm* realm = Realm::GetCurrent(args);
137+
BindingData* binding = realm->GetBindingData<BindingData>();
138+
if (binding == nullptr) return;
139+
uint32_t index = args[0].As<v8::Uint32>()->Value();
140+
auto it = binding->channel_status_callbacks_.find(index);
141+
if (it != binding->channel_status_callbacks_.end()) it->second(true);
142+
}
143+
144+
void BindingData::NotifyChannelInactive(
145+
const FunctionCallbackInfo<Value>& args) {
146+
Realm* realm = Realm::GetCurrent(args);
147+
BindingData* binding = realm->GetBindingData<BindingData>();
148+
if (binding == nullptr) return;
149+
uint32_t index = args[0].As<v8::Uint32>()->Value();
150+
auto it = binding->channel_status_callbacks_.find(index);
151+
if (it != binding->channel_status_callbacks_.end()) it->second(false);
152+
}
153+
130154
void BindingData::CreatePerIsolateProperties(IsolateData* isolate_data,
131155
Local<ObjectTemplate> target) {
132156
Isolate* isolate = isolate_data->isolate();
133157
SetMethod(
134158
isolate, target, "getOrCreateChannelIndex", GetOrCreateChannelIndex);
135159
SetMethod(isolate, target, "linkNativeChannel", LinkNativeChannel);
160+
SetMethod(isolate, target, "notifyChannelActive", NotifyChannelActive);
161+
SetMethod(isolate, target, "notifyChannelInactive", NotifyChannelInactive);
136162
}
137163

138164
void BindingData::CreatePerContextProperties(Local<Object> target,
@@ -148,6 +174,8 @@ void BindingData::RegisterExternalReferences(
148174
ExternalReferenceRegistry* registry) {
149175
registry->Register(GetOrCreateChannelIndex);
150176
registry->Register(LinkNativeChannel);
177+
registry->Register(NotifyChannelActive);
178+
registry->Register(NotifyChannelInactive);
151179
}
152180

153181
Channel::Channel(Environment* env,

src/node_diagnostics_channel.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
55

66
#include <cinttypes>
7+
#include <functional>
78
#include <string>
89
#include <unordered_map>
910
#include <vector>
@@ -53,6 +54,14 @@ class BindingData : public SnapshotableObject {
5354
static void LinkNativeChannel(
5455
const v8::FunctionCallbackInfo<v8::Value>& args);
5556

57+
using ChannelStatusCallback = std::function<void(bool is_active)>;
58+
void SetChannelStatusCallback(uint32_t index, ChannelStatusCallback cb);
59+
60+
static void NotifyChannelActive(
61+
const v8::FunctionCallbackInfo<v8::Value>& args);
62+
static void NotifyChannelInactive(
63+
const v8::FunctionCallbackInfo<v8::Value>& args);
64+
5665
static void CreatePerIsolateProperties(IsolateData* isolate_data,
5766
v8::Local<v8::ObjectTemplate> target);
5867
static void CreatePerContextProperties(v8::Local<v8::Object> target,
@@ -63,6 +72,7 @@ class BindingData : public SnapshotableObject {
6372

6473
private:
6574
InternalFieldInfo* internal_field_info_ = nullptr;
75+
std::unordered_map<uint32_t, ChannelStatusCallback> channel_status_callbacks_;
6676
};
6777

6878
class Channel : public BaseObject {

src/node_sqlite.cc

Lines changed: 71 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
#include "node.h"
88
#include "node_diagnostics_channel.h"
99
#include "node_errors.h"
10+
#include "node_external_reference.h"
1011
#include "node_mem-inl.h"
1112
#include "node_url.h"
1213
#include "sqlite3.h"
@@ -64,6 +65,28 @@ using v8::TryCatch;
6465
using v8::Uint8Array;
6566
using v8::Value;
6667

68+
BindingData::BindingData(Realm* realm, Local<Object> wrap)
69+
: BaseObject(realm, wrap) {
70+
MakeWeak();
71+
}
72+
73+
void BindingData::MemoryInfo(MemoryTracker* tracker) const {
74+
tracker->TrackFieldWithSize("open_databases",
75+
open_databases.size() * sizeof(DatabaseSync*),
76+
"open_databases");
77+
}
78+
79+
void BindingData::CreatePerContextProperties(Local<Object> target,
80+
Local<Value> unused,
81+
Local<Context> context,
82+
void* priv) {
83+
Realm* realm = Realm::GetCurrent(context);
84+
realm->AddBindingData<BindingData>(target);
85+
}
86+
87+
void BindingData::RegisterExternalReferences(
88+
ExternalReferenceRegistry* registry) {}
89+
6790
#define CHECK_ERROR_OR_THROW(isolate, db, expr, expected, ret) \
6891
do { \
6992
int r_ = (expr); \
@@ -868,6 +891,9 @@ DatabaseSync::DatabaseSync(Environment* env,
868891
enable_load_extension_ = allow_load_extension;
869892
ignore_next_sqlite_error_ = false;
870893

894+
BindingData* binding = env->principal_realm()->GetBindingData<BindingData>();
895+
if (binding != nullptr) binding->open_databases.insert(this);
896+
871897
if (open) {
872898
Open();
873899
}
@@ -891,6 +917,10 @@ void DatabaseSync::DeleteSessions() {
891917
}
892918

893919
DatabaseSync::~DatabaseSync() {
920+
BindingData* binding =
921+
env()->principal_realm()->GetBindingData<BindingData>();
922+
if (binding != nullptr) binding->open_databases.erase(this);
923+
894924
FinalizeBackups();
895925

896926
if (IsOpen()) {
@@ -975,11 +1005,25 @@ bool DatabaseSync::Open() {
9751005
env()->isolate(), this, load_extension_ret, SQLITE_OK, false);
9761006
}
9771007

978-
sqlite3_trace_v2(connection_, SQLITE_TRACE_STMT, TraceCallback, this);
1008+
diagnostics_channel::Channel* ch =
1009+
diagnostics_channel::Channel::Get(env(), "sqlite.db.query");
1010+
if (ch != nullptr && ch->HasSubscribers()) {
1011+
sqlite3_trace_v2(connection_, SQLITE_TRACE_STMT, TraceCallback, this);
1012+
}
9791013

9801014
return true;
9811015
}
9821016

1017+
void DatabaseSync::EnableTracing() {
1018+
if (!IsOpen()) return;
1019+
sqlite3_trace_v2(connection_, SQLITE_TRACE_STMT, TraceCallback, this);
1020+
}
1021+
1022+
void DatabaseSync::DisableTracing() {
1023+
if (!IsOpen()) return;
1024+
sqlite3_trace_v2(connection_, 0, nullptr, nullptr);
1025+
}
1026+
9831027
void DatabaseSync::FinalizeBackups() {
9841028
for (auto backup : backups_) {
9851029
backup->Cleanup();
@@ -3778,7 +3822,31 @@ static void Initialize(Local<Object> target,
37783822
Local<Context> context,
37793823
void* priv) {
37803824
Environment* env = Environment::GetCurrent(context);
3825+
Realm* realm = env->principal_realm();
37813826
Isolate* isolate = env->isolate();
3827+
3828+
// Set up the per-Environment database registry.
3829+
BindingData::CreatePerContextProperties(target, unused, context, priv);
3830+
3831+
// Register a native callback on the sqlite.db.query diagnostic channel so
3832+
// that SQLite tracing is enabled/disabled as subscribers come and go.
3833+
auto* diag_binding =
3834+
realm->GetBindingData<diagnostics_channel::BindingData>();
3835+
auto* sqlite_bd = realm->GetBindingData<BindingData>();
3836+
if (diag_binding != nullptr && sqlite_bd != nullptr) {
3837+
uint32_t idx = diag_binding->GetOrCreateChannelIndex("sqlite.db.query");
3838+
BaseObjectPtr<BindingData> bd_ptr(sqlite_bd);
3839+
diag_binding->SetChannelStatusCallback(idx, [bd_ptr](bool is_active) {
3840+
BindingData* bd = bd_ptr.get();
3841+
if (bd == nullptr) return;
3842+
for (DatabaseSync* db : bd->open_databases) {
3843+
if (is_active)
3844+
db->EnableTracing();
3845+
else
3846+
db->DisableTracing();
3847+
}
3848+
});
3849+
}
37823850
Local<FunctionTemplate> db_tmpl =
37833851
NewFunctionTemplate(isolate, DatabaseSync::New);
37843852
db_tmpl->InstanceTemplate()->SetInternalFieldCount(
@@ -3857,3 +3925,5 @@ static void Initialize(Local<Object> target,
38573925
} // namespace node
38583926

38593927
NODE_BINDING_CONTEXT_AWARE_INTERNAL(sqlite, node::sqlite::Initialize)
3928+
NODE_BINDING_EXTERNAL_REFERENCE(
3929+
sqlite, node::sqlite::BindingData::RegisterExternalReferences)

src/node_sqlite.h

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@
1717
#include <unordered_set>
1818

1919
namespace node {
20+
21+
class ExternalReferenceRegistry;
22+
2023
namespace sqlite {
2124

2225
// Mapping from JavaScript property names to SQLite limit constants
@@ -160,6 +163,27 @@ class StatementExecutionHelper {
160163
bool use_big_ints);
161164
};
162165

166+
class DatabaseSync;
167+
168+
class BindingData : public BaseObject {
169+
public:
170+
SET_BINDING_ID(sqlite_binding_data)
171+
172+
BindingData(Realm* realm, v8::Local<v8::Object> wrap);
173+
174+
void MemoryInfo(MemoryTracker* tracker) const override;
175+
SET_MEMORY_INFO_NAME(BindingData)
176+
SET_SELF_SIZE(BindingData)
177+
178+
std::unordered_set<DatabaseSync*> open_databases;
179+
180+
static void CreatePerContextProperties(v8::Local<v8::Object> target,
181+
v8::Local<v8::Value> unused,
182+
v8::Local<v8::Context> context,
183+
void* priv);
184+
static void RegisterExternalReferences(ExternalReferenceRegistry* registry);
185+
};
186+
163187
class DatabaseSync : public BaseObject {
164188
public:
165189
enum InternalFields {
@@ -228,6 +252,8 @@ class DatabaseSync : public BaseObject {
228252
// enable that use case.
229253
void SetIgnoreNextSQLiteError(bool ignore);
230254
bool ShouldIgnoreSQLiteError();
255+
void EnableTracing();
256+
void DisableTracing();
231257

232258
SET_MEMORY_INFO_NAME(DatabaseSync)
233259
SET_SELF_SIZE(DatabaseSync)

0 commit comments

Comments
 (0)