Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
211 changes: 105 additions & 106 deletions app/lib/methods/subscriptions/room.ts
Original file line number Diff line number Diff line change
Expand Up @@ -242,134 +242,133 @@ export default class RoomSubscription {
readMessages(this.rid, new Date());
}, 300);

updateMessage = (message: IMessage): Promise<void> =>
new Promise(async resolve => {
if (this.rid !== message.rid) {
return resolve();
}
updateMessage = async (message: IMessage): Promise<void> => {
if (this.rid !== message.rid) {
return;
}

const batch: TMessageModel[] | TThreadModel[] | TThreadMessageModel[] = [];
const db = database.active;
const msgCollection = db.get('messages');
const threadsCollection = db.get('threads');
const threadMessagesCollection = db.get('thread_messages');
const batch: TMessageModel[] | TThreadModel[] | TThreadMessageModel[] = [];
const db = database.active;
const msgCollection = db.get('messages');
const threadsCollection = db.get('threads');
const threadMessagesCollection = db.get('thread_messages');

// Decrypt the message if necessary
message = (await Encryption.decryptMessage(message)) as IMessage;
// Decrypt the message if necessary
message = (await Encryption.decryptMessage(message)) as IMessage;

// Create or update message
try {
const messageRecord = await getMessageById(message._id);
if (messageRecord) {
if (messageRecord.t === 'e2e' && message.attachments) {
message.attachments = message.attachments?.map(att => {
const existing = messageRecord.attachments?.find(
a =>
(a.image_url && a.image_url === att.image_url) ||
(a.video_url && a.video_url === att.video_url) ||
(a.audio_url && a.audio_url === att.audio_url) ||
(a.thumb_url && a.thumb_url === att.thumb_url)
);

// Create or update message
return {
...att,
e2e: existing?.e2e,
title_link: existing?.e2e === 'done' ? existing?.title_link : att.title_link
};
});
}
batch.push(
messageRecord.prepareUpdate(
protectedFunction((m: TMessageModel) => {
Object.assign(m, message);
})
)
);
} else {
batch.push(
msgCollection.prepareCreate(
protectedFunction((m: TMessageModel) => {
m._raw = sanitizedRaw({ id: message._id }, msgCollection.schema);
if (m.subscription) m.subscription.id = this.rid;
Object.assign(m, message);
})
)
);
}
} catch (e) {
log(e);
}

// Create or update thread
if (message.tlm) {
try {
const messageRecord = await getMessageById(message._id);
if (messageRecord) {
if (messageRecord.t === 'e2e' && message.attachments) {
message.attachments = message.attachments?.map(att => {
const existing = messageRecord.attachments?.find(
a =>
(a.image_url && a.image_url === att.image_url) ||
(a.video_url && a.video_url === att.video_url) ||
(a.audio_url && a.audio_url === att.audio_url) ||
(a.thumb_url && a.thumb_url === att.thumb_url)
);

return {
...att,
e2e: existing?.e2e,
title_link: existing?.e2e === 'done' ? existing?.title_link : att.title_link
};
});
}
const threadRecord = await getThreadById(message._id);
if (threadRecord) {
batch.push(
messageRecord.prepareUpdate(
protectedFunction((m: TMessageModel) => {
Object.assign(m, message);
threadRecord.prepareUpdate(
protectedFunction((t: TThreadModel) => {
Object.assign(t, message);
})
)
);
} else {
batch.push(
msgCollection.prepareCreate(
protectedFunction((m: TMessageModel) => {
m._raw = sanitizedRaw({ id: message._id }, msgCollection.schema);
if (m.subscription) m.subscription.id = this.rid;
Object.assign(m, message);
threadsCollection.prepareCreate(
protectedFunction((t: TThreadModel) => {
t._raw = sanitizedRaw({ id: message._id }, threadsCollection.schema);
if (t.subscription) t.subscription.id = this.rid;
Object.assign(t, message);
})
)
);
}
} catch (e) {
log(e);
}
}

// Create or update thread
if (message.tlm) {
try {
const threadRecord = await getThreadById(message._id);
if (threadRecord) {
batch.push(
threadRecord.prepareUpdate(
protectedFunction((t: TThreadModel) => {
Object.assign(t, message);
})
)
);
} else {
batch.push(
threadsCollection.prepareCreate(
protectedFunction((t: TThreadModel) => {
t._raw = sanitizedRaw({ id: message._id }, threadsCollection.schema);
if (t.subscription) t.subscription.id = this.rid;
Object.assign(t, message);
})
)
);
}
} catch (e) {
log(e);
}
}

// Create or update thread message
if (message.tmid) {
try {
const threadMessageRecord = await getThreadMessageById(message._id);
if (threadMessageRecord) {
batch.push(
threadMessageRecord.prepareUpdate(
protectedFunction((tm: TThreadMessageModel) => {
Object.assign(tm, message);
if (message.tmid) {
tm.rid = message.tmid;
delete tm.tmid;
}
})
)
);
} else {
batch.push(
threadMessagesCollection.prepareCreate(
protectedFunction((tm: TThreadMessageModel) => {
tm._raw = sanitizedRaw({ id: message._id }, threadMessagesCollection.schema);
Object.assign(tm, message);
if (tm.subscription) {
tm.subscription.id = this.rid;
}
if (message.tmid) {
tm.rid = message.tmid;
delete tm.tmid;
}
})
)
);
}
} catch (e) {
log(e);
// Create or update thread message
if (message.tmid) {
try {
const threadMessageRecord = await getThreadMessageById(message._id);
if (threadMessageRecord) {
batch.push(
threadMessageRecord.prepareUpdate(
protectedFunction((tm: TThreadMessageModel) => {
Object.assign(tm, message);
if (message.tmid) {
tm.rid = message.tmid;
delete tm.tmid;
}
})
)
);
} else {
batch.push(
threadMessagesCollection.prepareCreate(
protectedFunction((tm: TThreadMessageModel) => {
tm._raw = sanitizedRaw({ id: message._id }, threadMessagesCollection.schema);
Object.assign(tm, message);
if (tm.subscription) {
tm.subscription.id = this.rid;
}
if (message.tmid) {
tm.rid = message.tmid;
delete tm.tmid;
}
})
)
);
}
} catch (e) {
log(e);
}
}

await db.write(async () => {
await db.batch(...batch);
});
await db.write(async () => {
await db.batch(...batch);
});
};

handleMessageReceived = async (ddpMessage: IDDPMessage) => {
try {
Expand Down