Skip to content

Commit 619df02

Browse files
authored
Adjust concurrency and sleep constants in BroadcastManager
1 parent 92b2613 commit 619df02

1 file changed

Lines changed: 44 additions & 10 deletions

File tree

src/BroadcastManager.php

Lines changed: 44 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,15 @@
2323
class BroadcastManager
2424
{
2525
private const MAX_ATTEMPTS = 3;
26-
private const DEFAULT_CONCURRENCY = 20;
27-
private const MAX_CONCURRENCY = 50;
26+
private const DEFAULT_CONCURRENCY = 10;
27+
private const MAX_CONCURRENCY = 30;
28+
private const PROGRESS_UPDATE_INTERVAL = 5.0;
29+
private const WORKER_IDLE_SLEEP = 1.0;
30+
private const WORKER_RETRY_SLEEP = 1.0;
31+
private const WORKER_PAUSED_SLEEP = 1.5;
32+
private const WORKER_AFTER_JOB_SLEEP = 0.75;
33+
private const SEND_MESSAGE_SLEEP = 0.35;
34+
private const SEND_CHUNK_SLEEP = 0.75;
2835

2936
private const SEND_HARD_FAIL_RPCS = [
3037
'INPUT_USER_DEACTIVATED',
@@ -919,25 +926,25 @@ private function startQueueWorkers(
919926
\Amp\async(function () use (&$state, $handler, $hardFailRpcs, $rpcHandler, $retryThrowable): void {
920927
while (!$state['cancel'] && !$state['done']) {
921928
if ($state['queue']->isEmpty()) {
922-
$this->api->sleep(0.5);
929+
$this->api->sleep(self::WORKER_IDLE_SLEEP);
923930
continue;
924931
}
925932

926933
if ($state['paused']) {
927-
$this->api->sleep(1);
934+
$this->api->sleep(self::WORKER_PAUSED_SLEEP);
928935
continue;
929936
}
930937

931938
$job = $state['queue']->dequeue();
932939

933940
if (($job['availableAt'] ?? 0) > microtime(true)) {
934941
$state['queue']->enqueue($job);
935-
$this->api->sleep(0.5);
942+
$this->api->sleep(self::WORKER_RETRY_SLEEP);
936943
continue;
937944
}
938945

939946
while ($state['paused'] && !$state['cancel'] && !$state['done']) {
940-
$this->api->sleep(1);
947+
$this->api->sleep(self::WORKER_PAUSED_SLEEP);
941948
}
942949

943950
if ($state['cancel'] || $state['done']) {
@@ -981,7 +988,7 @@ private function startQueueWorkers(
981988
}
982989
}
983990

984-
$this->api->sleep(0.25);
991+
$this->api->sleep(self::WORKER_AFTER_JOB_SLEEP);
985992
}
986993
});
987994
}
@@ -1093,6 +1100,8 @@ private function sendMessagesToPeer(string $peer, array $messages): array
10931100
$messageIds[] = $messageId;
10941101
}
10951102
}
1103+
1104+
$this->api->sleep(self::SEND_CHUNK_SLEEP);
10961105
}
10971106

10981107
return $messageIds;
@@ -1115,6 +1124,8 @@ private function sendMessagesToPeer(string $peer, array $messages): array
11151124
if ($messageId > 0) {
11161125
$messageIds[] = $messageId;
11171126
}
1127+
1128+
$this->api->sleep(self::SEND_MESSAGE_SLEEP);
11181129
}
11191130

11201131
return $messageIds;
@@ -1642,13 +1653,13 @@ private function buildStatusControls(array $state): ?array
16421653

16431654
$id = (string) $state['id'];
16441655
$toggleAction = !empty($state['paused']) ? 'resume' : 'pause';
1645-
$toggleText = !empty($state['paused']) ? '▶️ המשך' : '⏸ השהייה';
1656+
$toggleText = !empty($state['paused']) ? 'Resume' : 'Pause';
16461657

16471658
return [
16481659
'inline_keyboard' => [
16491660
[
16501661
['text' => $toggleText, 'callback_data' => 'bm:' . $toggleAction . ':' . $id],
1651-
['text' => '🛑 ביטול', 'callback_data' => 'bm:cancel:' . $id],
1662+
['text' => 'Cancel', 'callback_data' => 'bm:cancel:' . $id],
16521663
],
16531664
],
16541665
];
@@ -1710,6 +1721,20 @@ private function startProgressLoop($chatId, ?int $statusId, array &$state, strin
17101721

17111722
$this->api->messages->editMessage($payload);
17121723
$last = $fingerprint;
1724+
} catch (RPCErrorException $e) {
1725+
if (
1726+
($e->rpc ?? '') === 'MESSAGE_NOT_MODIFIED'
1727+
|| str_contains($e->getMessage(), 'MESSAGE_NOT_MODIFIED')
1728+
) {
1729+
$last = $fingerprint;
1730+
$this->api->sleep(self::PROGRESS_UPDATE_INTERVAL);
1731+
continue;
1732+
}
1733+
1734+
if ($loggedFailures < 3) {
1735+
$loggedFailures++;
1736+
$this->logError('Failed to update status message.', $e);
1737+
}
17131738
} catch (Throwable $e) {
17141739
if ($loggedFailures < 3) {
17151740
$loggedFailures++;
@@ -1718,7 +1743,7 @@ private function startProgressLoop($chatId, ?int $statusId, array &$state, strin
17181743
}
17191744
}
17201745

1721-
$this->api->sleep(1);
1746+
$this->api->sleep(self::PROGRESS_UPDATE_INTERVAL);
17221747
}
17231748
});
17241749
}
@@ -1742,6 +1767,15 @@ private function editStatusMessage($chatId, ?int $statusId, string $text, ?array
17421767
}
17431768

17441769
$this->api->messages->editMessage($payload);
1770+
} catch (RPCErrorException $e) {
1771+
if (
1772+
($e->rpc ?? '') === 'MESSAGE_NOT_MODIFIED'
1773+
|| str_contains($e->getMessage(), 'MESSAGE_NOT_MODIFIED')
1774+
) {
1775+
return;
1776+
}
1777+
1778+
$this->logError('Failed to edit final status message.', $e);
17451779
} catch (Throwable $e) {
17461780
$this->logError('Failed to edit final status message.', $e);
17471781
}

0 commit comments

Comments
 (0)