Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 98 additions & 43 deletions bin/scripts/fix-data/user-merge.js
Original file line number Diff line number Diff line change
@@ -1,103 +1,146 @@
/**
* Description: This script is used to merge users based on username.
* Description: This script merges users if they match on any fields configured in the script.
* configure - processAllFields function to add or remove fields for merging.
* Server: countly
* Path: $(countly dir)/bin/scripts/fix-data
* Command: node user-merge.js
* Command: node user-merge.js --no-dry-run [--record-overload-sleep 1000]
* --record-overload-sleep: Cooldown period when record count exceeds RECORD_COUNT_LIMIT
*/
var pluginManager = require("../../../plugins/pluginManager.js");
var appUsers = require("../../../api/parts/mgmt/app_users.js");
var common = require("../../../api/utils/common.js");

console.log("Merging app users");

var APP_ID = "";
Comment thread
John-Weak marked this conversation as resolved.
var COLLECTION_NAME = "app_users" + APP_ID;

if (!APP_ID) {
console.error("Please set APP_ID variable to the ID of the app you want to merge users for.");
process.exit(1);
}

var RETRY_LIMIT = 3;
var UPDATE_COUNTER = 0;

//Number of requests to be made before checking record count in app_user_merges
var UPDATE_LIMIT = 100;
//Number of records in app_user_merges after which script will sleep
var RECORD_COUNT_LIMIT = 10;
//Cooldown period if record count exceeds limit
var RECORD_OVERLOAD_SLEEP = 2000;
for (let i = 2; i < process.argv.length; i++) {
if (process.argv[i] === '--record-overload-sleep' && process.argv[i + 1]) {
RECORD_OVERLOAD_SLEEP = parseInt(process.argv[i + 1]);
break;
}
}
//Cooldown period between requests
var COOLDOWN_PERIOD = 1000;

// Check for dry run flag
let DRY_RUN = true;
if (process.argv.includes('--no-dry-run')) {
DRY_RUN = false;
}
console.log(DRY_RUN ? "Running in DRY RUN mode - no actual merges will be performed" : "Running in LIVE mode - merges will be performed");

console.log("Merging app users");

const sleep = m => new Promise((r) => {
//console.log("Cooling period for " + m + " seconds!");
setTimeout(r, m);
});

pluginManager.dbConnection("countly").then(async(countlyDb) => {
try {

common.db = countlyDb;

await cursor();

console.log("Total updates on the server - ", UPDATE_COUNTER);
console.log("Script ran successfully!");
await processAllFields();
console.log("Total potential merges found - ", UPDATE_COUNTER);
if (DRY_RUN) {
console.log("Dry run completed - no actual merges were performed");
}
else {
console.log("All merges completed successfully!");
}
common.db.close();
process.exit(1);
process.exit(0);
}
catch (e) {
console.log("Error while running script ", e);
common.db.close();
process.exit(1);
}

async function cursor() {
async function processAllFields() {
//await processDuplicates('email'); we can also run multiple merges one after the other based on different fields
await processDuplicates('name');
}

async function processDuplicates(field) {
console.log(`\nProcessing duplicates by ${field}`);

const duplicates = await common.db.collection(COLLECTION_NAME).aggregate([
{
$match: {
[field]: { $nin: [null, ""], $exists: true } // Only match non-null, non-empty values
}
},
{
$group: {
_id: "$username",
_id: `$${field}`,
count: { $sum: 1 }
}
},
{
$match: {
count: { $gt: 1 },
_id: { $ne: null }
count: { $gt: 1 }
}
}
]).toArray();

console.log("Found", duplicates.length, "duplicate username groups.");
console.log(`Found ${duplicates.length} duplicate groups for ${field}`);

for (var i = 0; i < duplicates.length; i++) {
for (const duplicate of duplicates) {
const query = { [field]: duplicate._id };

var mainUser = null;
var mergedUsersUIDs = [];
const cursor = common.db.collection(COLLECTION_NAME)
.find(query)
.sort({ lac: -1 });

var query = {
username: duplicates[i]._id
};
let mainUser = null;
let mergedUIDs = 0;

var projections = {};
console.log(`\n${DRY_RUN ? '[DRY RUN] Would merge' : 'Merging'} users matching ${field}: "${duplicate._id}"`);

var sort = { ls: -1 };

var cursor = common.db.collection(COLLECTION_NAME).find(query).project(projections).sort(sort);

while (await cursor.hasNext()) {
var doc = await cursor.next();
for await (const user of cursor) {
if (!mainUser) {
mainUser = user;
console.log('Main user would be:', {
uid: mainUser.uid,
email: mainUser.email || "null",
phone: mainUser.phone || "null",
name: mainUser.name || "null",
last_action: formatLac(mainUser.lac)
});
continue;
}

if (doc.uid && doc.uid !== "") {
if (!mainUser) {
mainUser = doc;
}
else {
await mergeUsers(mainUser, doc);
mergedUsersUIDs.push(doc.uid);
if (user.uid && user.uid !== "") {
console.log('Would merge user:', {
uid: user.uid,
email: user.email || "null",
phone: user.phone || "null",
name: user.name || "null",
last_action: formatLac(user.lac)
});

if (!DRY_RUN) {
await mergeUsers(mainUser, user);
}
mergedUIDs++;
UPDATE_COUNTER++;
}
}

if (mergedUsersUIDs.length > 0) {
console.log("Total", mergedUsersUIDs.length, "users merged into user", mainUser.uid, ": (", mergedUsersUIDs.join(", "), ")");
if (mergedUIDs > 0) {
console.log(`${DRY_RUN ? '[DRY RUN] Would merge' : 'Merged'} ${mergedUIDs} users into ${mainUser.uid}`);
}
}
}
Expand All @@ -118,7 +161,6 @@ pluginManager.dbConnection("countly").then(async(countlyDb) => {
else {
success = true;
}

resolve();
});
});
Expand All @@ -129,7 +171,6 @@ pluginManager.dbConnection("countly").then(async(countlyDb) => {
if (retryCounter > 1) {
console.log("User ", user.uid, " merged successfully after ", retryCounter, " retries.");
}
UPDATE_COUNTER += 1;
if (UPDATE_COUNTER % UPDATE_LIMIT === 0) {
await checkRecordCount();
}
Expand All @@ -140,13 +181,27 @@ pluginManager.dbConnection("countly").then(async(countlyDb) => {
}

async function checkRecordCount() {
if (DRY_RUN) {
return;
}

var recordCount = await common.db.collection("app_user_merges").countDocuments();
console.log("Record count in app_user_merges: ", recordCount);

while (recordCount > RECORD_COUNT_LIMIT) {
console.log("Record count exceeds limit. Sleeping for " + RECORD_OVERLOAD_SLEEP / 1000 + "seconds.");
console.log("Record count exceeds limit. Sleeping for " + RECORD_OVERLOAD_SLEEP / 1000 + " seconds.");
await sleep(RECORD_OVERLOAD_SLEEP);
recordCount = await common.db.collection("app_user_merges").countDocuments();
}
}

function formatLac(timestamp) {
if (!timestamp) {
return null;
}
if (Math.round(timestamp).toString().length === 10) {
timestamp *= 1000;
}
return new Date(timestamp);
}
});
Loading