Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
175 commits
Select commit Hold shift + click to select a range
625cd3b
Update changelog
widatama Feb 2, 2026
425a234
Merge pull request #7246 from Countly/symb-fs
widatama Feb 2, 2026
08f2428
Bump nodemailer from 7.0.12 to 7.0.13
dependabot[bot] Jan 28, 2026
bc01c1b
Merge pull request #7231 from Countly/dependabot/npm_and_yarn/nodemai…
ar2rsawseen Feb 3, 2026
b20b1df
Bump puppeteer from 24.36.0 to 24.36.1
dependabot[bot] Jan 28, 2026
ac38929
Merge pull request #7232 from Countly/dependabot/npm_and_yarn/puppete…
ar2rsawseen Feb 3, 2026
f8bcca5
Update cypress versiyon to 15.10
can-angun Feb 4, 2026
e1049c5
Update cypress version to 15.10 in correct package.json
can-angun Feb 4, 2026
82191c0
Revert accidental changes in root package-lock.json
can-angun Feb 4, 2026
01f437e
Merge pull request #7258 from Countly/update-cypress-15.10
can-angun Feb 4, 2026
8c6af4a
Bump pdfjs-dist from 5.4.530 to 5.4.624 in /ui-tests
dependabot[bot] Feb 4, 2026
b0a4343
Update changelog
widatama Feb 4, 2026
1fc35f4
Merge pull request #7259 from Countly/funnel-tooltip
widatama Feb 4, 2026
3fcb32f
Merge branch 'master' into dependabot/npm_and_yarn/ui-tests/pdfjs-dis…
can-angun Feb 4, 2026
fcc0d41
Merge pull request #7244 from Countly/dependabot/npm_and_yarn/ui-test…
can-angun Feb 4, 2026
f6b206e
fix: dashboards import undefined check
tryshank Feb 3, 2026
8577847
fix: password reset null check
tryshank Feb 4, 2026
8414799
Merge branch 'master' into fix/password-reset-null-check
tryshank Feb 5, 2026
8b795d0
Merge pull request #7263 from Countly/fix/password-reset-null-check
tryshank Feb 5, 2026
b903953
Merge branch 'master' into fix/dashboards-import-undefined-check
tryshank Feb 5, 2026
400a96f
Merge pull request #7254 from Countly/fix/dashboards-import-undefined…
tryshank Feb 5, 2026
140e6e4
Update CHANGELOG with group permission fix
Cookiezaurs Feb 5, 2026
a98c840
chore: update CHANGELOG
gabrieloliveirapinto Feb 5, 2026
d017ad1
[sdk] Journey trigger events (#7208)
turtledreams Feb 5, 2026
cdb36b9
Merge pull request #7268 from Countly/Cookiezaurs-patch-2
Cookiezaurs Feb 5, 2026
a731e46
Update changelog
widatama Feb 6, 2026
f7e2308
Merge remote-tracking branch 'origin/master' into feat-add-resize-to-…
gabrieloliveirapinto Feb 6, 2026
9b41712
Merge pull request #7274 from Countly/location-target
Cookiezaurs Feb 6, 2026
b3abc1e
Merge remote-tracking branch 'origin/master' into feat-add-resize-to-…
gabrieloliveirapinto Feb 6, 2026
5a2e63e
Merge pull request #7270 from Countly/feat-add-resize-to-feedback-wid…
gabrieloliveirapinto Feb 6, 2026
0d2d212
[dashboards] Update dashboard delete dialog button color
widatama Feb 6, 2026
4b0330b
Update changelog
widatama Feb 6, 2026
a5b3ce1
[dashboards] Update button selector in ui test
widatama Feb 9, 2026
e9074ee
Merge pull request #7284 from Countly/dashboard-delete
widatama Feb 9, 2026
a1b1bbe
Update changelog
widatama Feb 10, 2026
42eb34f
Merge pull request #7287 from Countly/user-profile
widatama Feb 10, 2026
db1e2c0
fix: public dashboards disabled message
tryshank Feb 11, 2026
58fb768
Merge pull request #7292 from Countly/fix/dashboards-public-disabled-…
tryshank Feb 11, 2026
9861925
[star-rating] Fix rating number in rating table export
widatama Feb 9, 2026
fd01c7b
Update changelog
widatama Feb 9, 2026
ea3aba9
[star-rating] Update rating color
widatama Feb 11, 2026
dc7ab50
Merge pull request #7291 from Countly/rating-export
widatama Feb 11, 2026
7bc63f3
update changelog
Cookiezaurs Feb 12, 2026
634cebf
Merge pull request #7297 from Countly/Cookiezaurs-patch-3
Cookiezaurs Feb 12, 2026
2428a19
install plugins on countly upgrade
ar2rsawseen Feb 12, 2026
fa86b1f
Merge pull request #7296 from Countly/ar2rsawseen/master2
ar2rsawseen Feb 12, 2026
766e465
Update changelog
Cookiezaurs Feb 15, 2026
321cd93
Merge pull request #7301 from Countly/Cookiezaurs-patch-4
Cookiezaurs Feb 16, 2026
359564f
fix: cohort search by escaping regex metachars
tryshank Feb 16, 2026
d8ec6d5
Merge pull request #7303 from Countly/fix/cohorts-datatable-search-re…
tryshank Feb 17, 2026
3f0f96c
Update CHANGELOG for version 25.03.35
Cookiezaurs Feb 17, 2026
b644ed2
Merge pull request #7305 from Countly/Cookiezaurs-patch-5
Cookiezaurs Feb 17, 2026
df53a64
Update Nginx config to include content block paths
Cookiezaurs Feb 17, 2026
16e5f81
Merge pull request #7306 from Countly/Cookiezaurs-patch-6
Cookiezaurs Feb 17, 2026
bff6448
feat: hooks internal events pretty names
tryshank Feb 19, 2026
e520df8
[views] Use date strings as selected period in views.
Feb 16, 2026
367bd9b
fix(auth): prevent infinite refresh loop on invalid auth token 🔄
davidecavaliere Feb 20, 2026
8ec49a7
Merge pull request #7304 from Countly/anna/master
Cookiezaurs Feb 23, 2026
c0cb8f2
[core] Allow adding multiple digits after comma when formatting.
Feb 23, 2026
ee53199
lint
Feb 23, 2026
70dec1c
Merge pull request #7312 from Countly/anna/master
Cookiezaurs Feb 23, 2026
8846539
fix(test-api-core): wait for CSRF instead
davidecavaliere Feb 23, 2026
6ca5e1e
fix(test-api-core): try avoiding wait for CSRF
davidecavaliere Feb 23, 2026
a7ccc0e
[views] use period as date strings when fetching table
Feb 24, 2026
a706353
Merge pull request #7313 from Countly/anna/master
Cookiezaurs Feb 24, 2026
8ada0fa
Add changelog entry for version 25.03.36
Cookiezaurs Feb 24, 2026
51237c4
Merge pull request #7314 from Countly/Cookiezaurs-patch-3
Cookiezaurs Feb 24, 2026
3091a04
[exports] Make transformValuesInObject function available outside
widatama Feb 23, 2026
816e2d1
Update changelog
widatama Feb 23, 2026
aac08bb
Merge pull request #7311 from Countly/user-export
widatama Feb 25, 2026
f605d71
Merge branch 'master' into next
ar2rsawseen Feb 25, 2026
f35abc7
test(test-api-core): trying proof test problems
davidecavaliere Feb 25, 2026
9343bed
Merge branch 'next' into newarchitecture
ar2rsawseen Feb 25, 2026
bef53f6
Merge branch 'newarchitecture' into fix/SER-2791_infinite_refreshes
davidecavaliere Feb 25, 2026
e9d237a
Merge pull request #7310 from Countly/fix/SER-2791_infinite_refreshes
davidecavaliere Feb 25, 2026
e04c0bc
fix: add null checks for arrays in ingestion and aggregator views to …
kanwarujjaval Mar 2, 2026
b457cb7
fix: add parameterized queries for ClickHouse to improve security and…
kanwarujjaval Mar 2, 2026
96c79b1
Merge branch 'newarchitecture' of github.com:Countly/countly-server i…
kanwarujjaval Mar 2, 2026
6b4106a
Merge pull request #7336 from Countly/uj/query-fixes
kanwarujjaval Mar 2, 2026
fcb9c64
fix: update @clickhouse/client to v1.17.0 for compatibility and impro…
kanwarujjaval Mar 2, 2026
c14706b
fix: bump lz4-napi to v2.9.0 for dependency update in Kafka plugin
kanwarujjaval Mar 2, 2026
9b9aac2
Merge branch 'newarchitecture' into uj/query-fixes
kanwarujjaval Mar 2, 2026
dfc1140
Merge pull request #7337 from Countly/uj/query-fixes
kanwarujjaval Mar 2, 2026
4880915
[core] Stream export cursor
widatama Mar 3, 2026
b98a8b7
[core] Rename stream close event to end
widatama Mar 3, 2026
2e3add3
Merge pull request #7338 from Countly/stream-export
widatama Mar 3, 2026
bbc4012
Changes for app_users export download
Mar 5, 2026
c978149
Merge pull request #7350 from Countly/feature/ingestion
Cookiezaurs Mar 5, 2026
f20cce3
Replaced deprecated url.parse() with URL
Mar 6, 2026
cf2fa2b
[jobs] Add cronstrue to production bundle
widatama Mar 9, 2026
e2683ba
Merge branch 'newarchitecture' into feat/hooks-internal-events-pretty…
tryshank Mar 9, 2026
d0291c7
fix: integrate OpenTelemetry metrics for ClickHouse, Kafka, and HTTP …
kanwarujjaval Mar 10, 2026
49ca8eb
fix: extract shared observability metrics module to eliminate duplica…
kanwarujjaval Mar 10, 2026
395691a
Merge pull request #7364 from Countly/otel-improvements
kanwarujjaval Mar 10, 2026
d288c5e
Merge branch 'newarchitecture' into job-cron
kanwarujjaval Mar 10, 2026
50b1460
Merge pull request #7361 from Countly/job-cron
kanwarujjaval Mar 10, 2026
15555e0
fix: copy OpenTelemetry metrics.js to required locations and add inst…
kanwarujjaval Mar 10, 2026
cb18965
Merge branch 'newarchitecture' into otel-improvements
kanwarujjaval Mar 10, 2026
f1d71cf
Merge pull request #7365 from Countly/otel-improvements
kanwarujjaval Mar 10, 2026
dd7472e
fix: update dependencies and CI for improved compatibility and perfor…
kanwarujjaval Mar 10, 2026
57a2670
Merge branch 'otel-improvements' of github.com:Countly/countly-server…
kanwarujjaval Mar 10, 2026
bad1afd
Merge branch 'newarchitecture' into otel-improvements
kanwarujjaval Mar 10, 2026
d797562
Merge pull request #7366 from Countly/otel-improvements
kanwarujjaval Mar 10, 2026
1b2bf0f
fix: set NODE_OPTIONS to empty and unref intervals for improved resou…
kanwarujjaval Mar 10, 2026
3b6f185
Merge remote-tracking branch 'origin/otel-improvements' into otel-imp…
kanwarujjaval Mar 10, 2026
4e4c7f6
Merge branch 'newarchitecture' into otel-improvements
kanwarujjaval Mar 10, 2026
620fdcd
Merge pull request #7367 from Countly/otel-improvements
kanwarujjaval Mar 10, 2026
f76d7b4
Merge branch 'newarchitecture' into feat/hooks-internal-events-pretty…
tryshank Mar 10, 2026
d7dffe1
Merge branch 'newarchitecture' into deprecate-url-parse
cihadtekin Mar 10, 2026
9b4ab6c
[crashes] Fix os_version field
widatama Mar 10, 2026
4c7ad18
[crashes] Fix lastTs and startTs fields
widatama Mar 11, 2026
11c7fd7
Merge pull request #7373 from Countly/crash-agg
widatama Mar 11, 2026
f295e91
Merge branch 'newarchitecture' into feat/hooks-internal-events-pretty…
tryshank Mar 11, 2026
e920529
Merge branch 'newarchitecture' into deprecate-url-parse
cihadtekin Mar 11, 2026
d0c0325
Merge pull request #7356 from Countly/deprecate-url-parse
cihadtekin Mar 11, 2026
481756a
fixes for nginx otel propagation
kanwarujjaval Mar 12, 2026
53f8881
Merge branch 'newarchitecture' into otel-improvements
kanwarujjaval Mar 12, 2026
4b516de
Merge pull request #7376 from Countly/otel-improvements
kanwarujjaval Mar 12, 2026
30fe637
[crashes] Add query to crashgroup detail page
widatama Feb 27, 2026
2674c6a
[crashes] Pass crash query from crashgroup detail page to backend
widatama Mar 5, 2026
b4fba69
[crashes] Apply crashes query to crashgroup page
widatama Mar 5, 2026
fe70449
[crashes] Update occurrence table filter
widatama Mar 6, 2026
d913180
[crashes] Update metric calculations in crashgroup page
widatama Mar 9, 2026
0d8bfb0
[crashes] Filter and modify crash query for drill
widatama Mar 10, 2026
c147b0c
[crashes] Fetch bar data based on crashgroup last occurrence
widatama Mar 10, 2026
36535c9
[crashes] Apply crashes query to breakdown data
widatama Mar 11, 2026
1fe81f5
[crashes] Fix session count
widatama Mar 12, 2026
0479c30
[crashes] Compare last occurence with latest crash timestamp
widatama Mar 12, 2026
6243b88
[crashes] Add clear crashes query button
widatama Mar 13, 2026
3372682
[crashes] Update crashesQuery computed prop
widatama Mar 16, 2026
dee2c1f
[crashes] Init filter properties on created
widatama Mar 16, 2026
73dab24
Merge branch 'newarchitecture' into feat/hooks-internal-events-pretty…
tryshank Mar 16, 2026
6776d4a
Merge pull request #7308 from Countly/feat/hooks-internal-events-pret…
tryshank Mar 16, 2026
9e51194
Merge branch 'newarchitecture' into crash-query
can-angun Mar 16, 2026
c531aa5
Fix getperiodrange test
widatama Mar 17, 2026
dfe4602
Merge pull request #7380 from Countly/crash-query
widatama Mar 17, 2026
c77850e
Update version to 26.01
kanwarujjaval Mar 18, 2026
d59eb53
Update package-lock.json version to 26.01.0
kanwarujjaval Mar 18, 2026
29c86ab
Update version in version.info.js to 26.01
kanwarujjaval Mar 18, 2026
28f159a
Merge pull request #7385 from Countly/newarch-release-fixes
kanwarujjaval Mar 18, 2026
465580c
feat: add native ClickHouse mutation support and event deduplication job
kanwarujjaval Mar 18, 2026
aa38a09
Merge branch 'newarchitecture' into newarch-release-fixes
kanwarujjaval Mar 18, 2026
451e1c6
enhance ClickHouse mutation validation and event deduplication
kanwarujjaval Mar 19, 2026
d01c9f4
Merge remote-tracking branch 'origin/newarch-release-fixes' into newa…
kanwarujjaval Mar 19, 2026
4b412c5
enhance mutation validation and deduplication logic in ClickHouse jobs
kanwarujjaval Mar 19, 2026
d6143bb
update: switch `EventDeduplicationJob` to epoch milliseconds for impr…
kanwarujjaval Mar 19, 2026
0056f9c
[26.01] upgrade
ar2rsawseen Mar 19, 2026
7381c9d
Merge pull request #7390 from Countly/ar2rsawseen/newarchictecture2
ar2rsawseen Mar 19, 2026
af01b0a
Merge branch 'newarchitecture' into newarch-release-fixes
kanwarujjaval Mar 25, 2026
32f2ce8
enhance: add checkpointing and extended lookback for `EventDeduplicat…
kanwarujjaval Mar 25, 2026
9b7bb2a
Merge remote-tracking branch 'origin/newarch-release-fixes' into newa…
kanwarujjaval Mar 25, 2026
af5ede4
Merge pull request #7387 from Countly/newarch-release-fixes
kanwarujjaval Mar 27, 2026
02e1873
Merge branch 'newarchitecture' of github.com:Countly/countly-server i…
kanwarujjaval Mar 27, 2026
4ad2b57
fixes for small batches dedupplication
kanwarujjaval Mar 27, 2026
6e1de78
review fixes for checking remaining batches
kanwarujjaval Mar 28, 2026
f52cf27
Merge pull request #7418 from Countly/dedupe-fixes
kanwarujjaval Mar 28, 2026
bd208ab
Merge branch 'newarchitecture' of github.com:Countly/countly-server i…
kanwarujjaval Apr 1, 2026
5f8634a
update: modify partitioning strategy in drill_events to use weekly in…
kanwarujjaval Apr 7, 2026
655182f
refactor: update Dockerfile with improved caching, conditional model …
kanwarujjaval Apr 7, 2026
b2b000b
update: adapt ColdPartitionMerging logic for flexible partition ID fo…
kanwarujjaval Apr 7, 2026
9c01551
Merge pull request #7442 from Countly/newarch-release-fixes
kanwarujjaval Apr 8, 2026
da188ad
Revert "Merge pull request #7442 from Countly/newarch-release-fixes"
Hakandede Apr 8, 2026
771a343
Guard feedback uploads against missing files
ar2rsawseen Apr 9, 2026
63eaa3e
test: cover star rating upload validation
ar2rsawseen Apr 9, 2026
6f4cb7f
Merge pull request #7452 from Countly/codex/upload-validation-guards
ar2rsawseen Apr 9, 2026
733f51c
fix: support callback shorthand for task rename
ar2rsawseen Apr 9, 2026
5187172
Fix notes save false success path
ar2rsawseen Apr 9, 2026
88243f0
Fix own-account password verification
ar2rsawseen Apr 9, 2026
76c1595
Stabilize notes API test timestamp
ar2rsawseen Apr 9, 2026
4a2c351
Fix self-delete session cleanup query
ar2rsawseen Apr 9, 2026
fbfa127
Stabilize notes persistence test
ar2rsawseen Apr 9, 2026
cb4210d
Add notes API debug output
ar2rsawseen Apr 9, 2026
c14f925
Add notes persistence debug
ar2rsawseen Apr 9, 2026
d9db6ad
Fix notes period filtering
ar2rsawseen Apr 9, 2026
d672454
Merge pull request #7455 from Countly/codex/upload-validation-guards
ar2rsawseen Apr 9, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 33 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,38 @@
## Version 25.03.X
## Version 25.03.36
Enterprise fixes:
- [journey] Workflow fixes
- [users] UI events table fixes

## Version 25.03.35
Fixes:
- [core] Fixes for search bar in standart table component

Enterprise fixes:
- [journeys] Fixes for journey data updates on incoming data.
- [surveys] Return error message if invalid widget_id passed on template loading
- [users] Show content and journey events in user profile
- [users] Display profile group name in table column
- [users] When exporting user profiles, replace user name with device id if user name does not exist
- [users] Use user profile endpoint for exporting data instead of the generic export endpoint

## Version 25.03.34
Fixes:
- [core] Fix period calculation
- [dashboards] Update dialog button color when deleting dashboard/widget
- [star-rating] Fix rating number when exporting data

Enterprise Fixes:
- [content] Uniform journey and content block actions
- [content] Fix overflow and missing translations in content blocks
- [content] Fix button management when creating fullscreen content blocks
- [crash_symbolication] Use countlyfs for JavaScript symbolication
- [funnels] Fix funnel name tooltip content
- [surveys] Allow surveys to resize and reposition when user rotates devices or adjust browser window
- [nps] Allow nps to resize and reposition when user rotates devices or adjust browser window
- [groups] Dealing with invalid values for group permission
- [geo] Update table row cursor to indicate that it's clickable
- [users] Change table column min-width to width so it can be resized even smaller
- [users] Display filtered user count instead of all user count in the table summary

## Version 25.03.33
Fixes:
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile-core
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ RUN useradd -r -M -U -d /opt/countly -s /bin/false countly && \
\
# npm dependencies
./bin/docker/modify.sh && \
HOME=/tmp npm install --unsafe-perm && \
HOME=/tmp npm ci --unsafe-perm && \
./bin/docker/preinstall.sh && \
\
# web sdk
Expand Down
4 changes: 4 additions & 0 deletions Dockerfile-unified
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,14 @@ WORKDIR /opt/countly
COPY . .

# 3.1.2 copy OpenTelemetry files from build context
# otel.js and otelexpress.js both require('./metrics'), so metrics.js
# must be copied alongside each entry point.
RUN if [ -f observability/otel.js ]; then \
echo "Copying OpenTelemetry files..."; \
cp observability/otel.js api/utils/otel.js; \
cp observability/metrics.js api/utils/metrics.js; \
cp observability/otelexpress.js frontend/express/otel.js; \
cp observability/metrics.js frontend/express/metrics.js; \
else \
echo "Observability files not found, skipping..."; \
fi
Expand Down
1 change: 1 addition & 0 deletions Gruntfile.js
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ module.exports = function(grunt) {
'frontend/express/public/javascripts/utils/Sortable.min.js',
'frontend/express/public/javascripts/utils/vue/vuedraggable.umd.min.js',
'frontend/express/public/javascripts/utils/lodash.mergeWith.js',
'frontend/express/public/javascripts/utils/cronstrue.min.js',
'frontend/express/public/javascripts/utils/element-tiptap.umd.min.js'
],
dest: 'frontend/express/public/javascripts/min/countly.utils.concat.js'
Expand Down
26 changes: 26 additions & 0 deletions api/eventSink/KafkaEventSink.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,28 @@ const EventSinkInterface = require('./EventSinkInterface.ts').default;
const { transformToKafkaEventFormat } = require('../utils/eventTransformer');
const Log = require('../utils/log.js');

// OTel metrics — opt-in, zero overhead when OTEL_ENABLED is not set
const _otelEnabled = /^(1|true|yes)$/i.test(process.env.OTEL_ENABLED || '');
let _sinkMetrics: { duration: any; eventsTotal: any } | null = null;
function getSinkMetrics() {
if (_sinkMetrics) {
return _sinkMetrics;
}
if (!_otelEnabled) {
return null;
}
try {
const { metrics } = require('@opentelemetry/api');
const meter = metrics.getMeter('countly-event-sink');
_sinkMetrics = {
duration: meter.createHistogram('countly_event_sink_duration_seconds', { unit: 's' }),
eventsTotal: meter.createCounter('countly_event_sink_events_total'),
};
}
catch (_e) { /* no-op */ }
return _sinkMetrics;
}

/**
* Logger interface for type safety
*/
Expand Down Expand Up @@ -192,6 +214,8 @@ class KafkaEventSink extends EventSinkInterface {

if (result.success) {
this.#log.d(`Successfully sent ${result.sent} events to Kafka in ${duration}ms`);
getSinkMetrics()?.duration.record(duration / 1000, { sink: 'kafka' });
getSinkMetrics()?.eventsTotal.add(result.sent ?? 0, { sink: 'kafka', result: 'success' });

return this._createResult(true, result.sent ?? 0, 'Events sent to Kafka successfully', {
duration,
Expand All @@ -205,6 +229,8 @@ class KafkaEventSink extends EventSinkInterface {
}
catch (error) {
const duration = Date.now() - startTime;
getSinkMetrics()?.duration.record(duration / 1000, { sink: 'kafka' });
getSinkMetrics()?.eventsTotal.add(transformedEvents, { sink: 'kafka', result: 'error' });
this.#log.e(`Failed to write ${transformedEvents} events to Kafka in ${duration}ms:`);
this.#log.e('Error writing events to Kafka:', error);
throw error;
Expand Down
33 changes: 33 additions & 0 deletions api/eventSource/KafkaEventSource.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,33 @@
const EventSourceInterface = require('./EventSourceInterface');
const Log = require('../utils/log.js');

// OTel metrics — opt-in, zero overhead when OTEL_ENABLED is not set
const _otelEnabled = /^(1|true|yes)$/i.test(process.env.OTEL_ENABLED || '');
let _sourceMetrics = null;

/**
* Initialize OTel metrics for event source
* @returns {Object|null} OTel metrics object or null if OTel is not enabled
* @private
*/
function getSourceMetrics() {
if (_sourceMetrics) {
return _sourceMetrics;
}
if (!_otelEnabled) {
return null;
}
try {
const { metrics } = require('@opentelemetry/api');
const meter = metrics.getMeter('countly-event-source');
_sourceMetrics = {
batchWait: meter.createHistogram('countly_event_source_batch_wait_seconds', { unit: 's' }),
};
}
catch (_e) { /* no-op */ }
return _sourceMetrics;
}

/**
* Kafka implementation of EventSourceInterface
* Supports async iteration with auto-acknowledgment and proper at-least-once delivery
Expand Down Expand Up @@ -563,6 +590,7 @@ class KafkaEventSource extends EventSourceInterface {
return checkAndReturnBatch(batch);
}
// Wait for next batch from Kafka
const waitStart = Date.now();
await new Promise((resolve) => {
// Race condition check: batch might have arrived while creating promise
if (this.#currentBatch) {
Expand All @@ -571,6 +599,11 @@ class KafkaEventSource extends EventSourceInterface {
}
this.#batchAvailable = resolve;
});
const waitDuration = (Date.now() - waitStart) / 1000;
getSourceMetrics()?.batchWait.record(waitDuration, {
group_id: this.#effectiveGroupId || this.#name,
topic: this.#kafkaOptions?.topics?.[0] || 'unknown'
});

// Return the batch that arrived
if (this.#currentBatch) {
Expand Down
14 changes: 7 additions & 7 deletions api/ingestor/requestProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import { createRequire } from 'module';

import usage from './usage.js';
import common from '../utils/common.js';
import url from 'url';
import logModule from '../utils/log.js';
import crypto from 'crypto';
import { ignorePossibleDevices, checksumSaltVerification, validateRedirect } from '../utils/requestProcessorCommon.js';
Expand Down Expand Up @@ -378,7 +377,7 @@ interface RequestParams {
/** Href */
href?: string;
/** URL parts */
urlParts?: ReturnType<typeof url.parse>;
urlParts?: { pathname: string; path: string };
/** Path segments */
paths?: string[];
/** API path */
Expand Down Expand Up @@ -1246,14 +1245,15 @@ const processRequest = (params: RequestParams): boolean | void => {
}

params.tt = Date.now().valueOf();
const urlParts = url.parse(params.req.url, true);
const queryString = urlParts.query;
const paths = urlParts.pathname.split('/');
// base URL is required by WHATWG URL API for relative paths, only pathname and query are used
const parsedUrl = new URL(params.req.url, 'http://localhost');
const queryString = Object.fromEntries(parsedUrl.searchParams);
const paths = parsedUrl.pathname.split('/');

params.href = urlParts.href;
params.href = parsedUrl.pathname + parsedUrl.search;
params.qstring = params.qstring || {};
params.res = params.res || {} as ServerResponse;
params.urlParts = urlParts;
params.urlParts = { pathname: parsedUrl.pathname, path: parsedUrl.pathname + parsedUrl.search };
params.paths = paths;

params.req.headers = params.req.headers || {};
Expand Down
113 changes: 106 additions & 7 deletions api/jobs/mutationManagerJob.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,12 @@ interface JobConfig {

interface MutationTask extends Document {
_id: ObjectId | string;
type: 'delete' | 'update';
type: 'delete' | 'update' | 'native_ch';
db: string;
collection: string;
query: Record<string, unknown>;
update?: Record<string, unknown>;
native_sql?: string;
running: boolean;
status: string;
hb?: number;
Expand Down Expand Up @@ -272,7 +273,7 @@ class MutationManagerJob extends Job {
*/
async processTask(task: MutationTask, summary: SummaryEntry[], jobConfig: JobConfig = jobConfigState || DEFAULT_JOB_CONFIG): Promise<void> {
const type = task.type;
if (type !== 'delete' && type !== 'update') {
if (type !== 'delete' && type !== 'update' && type !== 'native_ch') {
await common.db.collection('mutation_manager').updateOne(
{ _id: task._id },
{
Expand All @@ -288,7 +289,7 @@ class MutationManagerJob extends Job {
const clickhouseEnabled = mutationManager.isClickhouseEnabled();
const hasClickhouseDelete = clickhouseEnabled && !!(clickHouseRunner && clickHouseRunner.deleteGranularDataByQuery);
const hasClickhouseUpdate = clickhouseEnabled && !!(clickHouseRunner && clickHouseRunner.updateGranularDataByQuery);
const hasClickhouse = (type === 'update' ? hasClickhouseUpdate : hasClickhouseDelete);
const hasClickhouse = type === 'native_ch' ? clickhouseEnabled : (type === 'update' ? hasClickhouseUpdate : hasClickhouseDelete);

if (!mongoDb && !hasClickhouse) {
const reason = `mongo_db_unavailable:${task.db || 'missing'}`;
Expand Down Expand Up @@ -317,7 +318,11 @@ class MutationManagerJob extends Job {
}

let mongoOk = true;
if (mongoDb) {
if (type === 'native_ch') {
// Native CH mutations skip MongoDB entirely
log.d('Native CH mutation - skipping MongoDB', { taskId: task._id });
}
else if (mongoDb) {
if (type === 'update') {
mongoOk = await this.updateMongo(task, mongoDb);
}
Expand All @@ -330,7 +335,10 @@ class MutationManagerJob extends Job {
}

let chScheduledOk = true;
if (type === 'update' && hasClickhouseUpdate) {
if (type === 'native_ch' && clickhouseEnabled) {
chScheduledOk = await this.executeNativeClickhouse(task);
}
else if (type === 'update' && hasClickhouseUpdate) {
chScheduledOk = await this.updateClickhouse(task);
}
else if (type === 'delete' && hasClickhouseDelete) {
Expand Down Expand Up @@ -422,8 +430,10 @@ class MutationManagerJob extends Job {
for (const task of awaiting) {
try {
if (chHealth && typeof chHealth.getMutationStatus === 'function') {
// In cluster mode, mutations target _local tables, so validation must check _local
const validationTable = isClusterMode ? task.collection + '_local' : task.collection;
// In cluster mode, mutations target _local tables, so validation must check _local.
// native_ch tasks may already have _local in collection name — avoid doubling.
const needsLocalSuffix = isClusterMode && !task.collection.endsWith('_local');
const validationTable = needsLocalSuffix ? task.collection + '_local' : task.collection;
const status = await chHealth.getMutationStatus({ validation_command_id: task.validation_command_id, table: validationTable, database: task.db });
if (status && status.is_done) {
await common.db.collection('mutation_manager').updateOne(
Expand Down Expand Up @@ -677,6 +687,95 @@ class MutationManagerJob extends Job {
}
}

/**
* Build validated ClickHouse mutation SQL with an embedded command-id for tracking.
* - Strips trailing semicolon
* - Validates ALTER TABLE ... DELETE/UPDATE ... WHERE ... shape
* - Injects tautological AND before any SETTINGS clause
* @returns Final SQL string, or null if the shape is invalid
*/
private buildValidatedNativeClickhouseSql(baseSql: string, commandId: string): string | null {
if (!baseSql || typeof baseSql !== 'string') {
return null;
}
let sql = baseSql.trim();
if (sql.endsWith(';')) {
sql = sql.slice(0, -1).trimEnd();
}
const upper = sql.toUpperCase();
if (!upper.startsWith('ALTER TABLE ')) {
return null;
}
if (!/\b(DELETE|UPDATE)\b/.test(upper)) {
return null;
}
if (!/\bWHERE\b/.test(upper)) {
return null;
}
// Find SETTINGS clause (if any) — inject command-id BEFORE it
const settingsMatch = upper.match(/\bSETTINGS\b/);
const settingsIdx = settingsMatch?.index ?? -1;
const injection = ` AND '${commandId}' = '${commandId}'`;
if (settingsIdx !== -1) {
return sql.slice(0, settingsIdx) + injection + sql.slice(settingsIdx);
}
return sql + injection;
}

/**
* Executes a native ClickHouse SQL mutation directly.
* Used for complex mutations (e.g., deduplication) that cannot be expressed as Mongo-style queries.
* Embeds validation_command_id for tracking via system.mutations.
* @param task - The mutation task with native_sql field
*/
async executeNativeClickhouse(task: MutationTask): Promise<boolean> {
if (!task.native_sql || typeof task.native_sql !== 'string') {
log.e('Skipping native CH mutation (empty sql)', { taskId: task._id });
await this.markFailedOrRetry(task, 'empty_native_sql');
return false;
}

if (!common.clickhouseQueryService) {
log.e('ClickHouse query service not available for native mutation', { taskId: task._id });
await this.markFailedOrRetry(task, 'ch_query_service_unavailable');
return false;
}

try {
const retryIndex = Number(task.fail_count || 0);
const commandId = `nm_${String(task._id)}_${retryIndex}`;

const sql = this.buildValidatedNativeClickhouseSql(task.native_sql, commandId);
if (!sql) {
log.e('Skipping native CH mutation (invalid SQL shape)', {
taskId: task._id,
native_sql: task.native_sql
});
await this.markFailedOrRetry(task, 'invalid_native_sql_shape');
return false;
}

// Persist command_id BEFORE executing mutation (crash safety: if we crash
// between execution and this update, validation can still find the command_id)
await common.db.collection('mutation_manager').updateOne(
{ _id: task._id },
{ $set: { validation_command_id: commandId } }
);

await common.clickhouseQueryService.executeMutation({ query: sql });
log.d('Native CH mutation scheduled', { taskId: task._id, commandId });
return true;
}
catch (err) {
log.e('Native CH mutation failed', {
taskId: task._id,
error: (err as Error)?.message || String(err)
});
await this.markFailedOrRetry(task, 'native_ch_error: ' + ((err as Error)?.message || err + ''));
return false;
}
}

/**
* Marks a task as failed or schedules it for a retry based on the number of previous failures.
* @param task - The task object to update.
Expand Down
Loading
Loading