diff --git a/src/Illuminate/Foundation/Bus/PendingDispatch.php b/src/Illuminate/Foundation/Bus/PendingDispatch.php index 13c11ab1b2fe..c7d2eb687912 100644 --- a/src/Illuminate/Foundation/Bus/PendingDispatch.php +++ b/src/Illuminate/Foundation/Bus/PendingDispatch.php @@ -2,7 +2,9 @@ namespace Illuminate\Foundation\Bus; +use Closure; use Illuminate\Bus\UniqueLock; +use Illuminate\Cache\RateLimiting\Limit; use Illuminate\Container\Container; use Illuminate\Contracts\Bus\Dispatcher; use Illuminate\Contracts\Cache\Repository as Cache; @@ -129,6 +131,24 @@ public function withoutDelay() return $this; } + /** + * Spread the job out by delaying dispatches respecting the rate limit. + * + * @param \Illuminate\Cache\RateLimiting\Limit|(\Closure(mixed $job): \Illuminate\Cache\RateLimiting\Limit) $limit + * @param int $index + * @return $this + */ + public function spreadWithDelay($limit, $index) + { + if ($limit instanceof Closure) { + $limit = $limit($this->job); + } + + $delay = (int) (($index / $limit->maxAttempts) * $limit->decaySeconds); + + return $this->delay($delay); + } + /** * Indicate that the job should be dispatched after all database transactions have committed. * diff --git a/tests/Bus/BusPendingDispatchTest.php b/tests/Bus/BusPendingDispatchTest.php index 99c4065cb5c5..34a39d39a4d0 100644 --- a/tests/Bus/BusPendingDispatchTest.php +++ b/tests/Bus/BusPendingDispatchTest.php @@ -2,7 +2,10 @@ namespace Illuminate\Tests\Bus; +use Illuminate\Cache\RateLimiting\Limit; use Illuminate\Foundation\Bus\PendingDispatch; +use Illuminate\Support\Facades\RateLimiter; +use Illuminate\Support\Fluent; use Mockery as m; use PHPUnit\Framework\TestCase; use ReflectionClass; @@ -76,6 +79,24 @@ public function testWithoutDelay() $this->pendingDispatch->withoutDelay(); } + public function testSpreadWithDelay() + { + $job = new Fluent; + $pendingDispatch = new PendingDispatchWithoutDestructor($job); + $limit = Limit::perSecond(1, 15); + + $pendingDispatch->spreadWithDelay($limit, 0); + $this->assertSame(0, $job->delay); + $pendingDispatch->spreadWithDelay($limit, 1); + $this->assertSame(15, $job->delay); + $pendingDispatch->spreadWithDelay($limit, 2); + $this->assertSame(30, $job->delay); + $pendingDispatch->spreadWithDelay($limit, 3); + $this->assertSame(45, $job->delay); + $pendingDispatch->spreadWithDelay($limit, 4); + $this->assertSame(60, $job->delay); + } + public function testAfterCommit() { $this->job->shouldReceive('afterCommit')->once();