feat: Add deduplication to add_batch_of_requests #534
Conversation
add_batch_of_requests and test
```python
await rq.add_request(request)
```
Maybe you could make two distinct Request instances with the same uniqueKey here?
Yes, good point. I also added one more test to make it explicit that deduplication works based on unique_key only: unless we use the use_extended_unique_key argument, some attributes of the request may be ignored. Another test makes this behavior clearly intentional, to avoid confusion in the future.
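The behavior under discussion can be sketched generically. This is an illustrative model of unique_key-based deduplication, not the library's actual implementation: the `Request` stand-in and `deduplicate` helper below are hypothetical, and they show that two distinct instances with the same unique_key collapse to one entry, with the later instance's other attributes ignored.

```python
from dataclasses import dataclass, field


@dataclass
class Request:
    """Simplified stand-in for a real Request object (illustrative only)."""
    url: str
    unique_key: str
    user_data: dict = field(default_factory=dict)


def deduplicate(requests: list[Request]) -> list[Request]:
    """Keep only the first request for each unique_key; other attributes are ignored."""
    seen: dict[str, Request] = {}
    for request in requests:
        if request.unique_key not in seen:
            seen[request.unique_key] = request
    return list(seen.values())


# Two distinct instances with the same unique_key but different user_data.
a = Request(url='http://example.com', unique_key='example', user_data={'label': 'A'})
b = Request(url='http://example.com', unique_key='example', user_data={'label': 'B'})

deduped = deduplicate([a, b])
assert len(deduped) == 1
assert deduped[0].user_data == {'label': 'A'}  # the second instance was dropped
```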
```python
await rq.add_requests(requests)

add_requests_workers = [asyncio.create_task(add_requests_worker()) for _ in range(10)]
await asyncio.gather(*add_requests_workers)
```
I guess you made sure that these do in fact run in parallel? To the naked eye, 100 requests doesn't seem like much, I'd expect that the event loop may run the tasks in sequence.
Maybe you could add the requests in each worker in smaller batches and add some random delays? Or just add a comment saying that you verified parallel execution empirically 😁
I wrote the test against an implementation that did not take parallel execution into account, and it was failing consistently. So from that perspective, I consider the test sufficient.
Anyway, I added some chunking to make the test slightly more challenging. The parallel execution can be verified in the logs, for example below. The logs show that the add_batch_of_requests call that started first did not finish first, as it was "taken over" during its await by another worker.
```
DEBUG Tried to add new requests: 10, succeeded to add new requests: 10, skipped already present requests: 10
DEBUG Tried to add new requests: 10, succeeded to add new requests: 10, skipped already present requests: 20
DEBUG Tried to add new requests: 10, succeeded to add new requests: 10, skipped already present requests: 90
DEBUG Tried to add new requests: 10, succeeded to add new requests: 10, skipped already present requests: 80
DEBUG Tried to add new requests: 10, succeeded to add new requests: 10, skipped already present requests: 0
DEBUG Tried to add new requests: 10, succeeded to add new requests: 10, skipped already present requests: 40
DEBUG Tried to add new requests: 10, succeeded to add new requests: 10, skipped already present requests: 50
DEBUG Tried to add new requests: 10, succeeded to add new requests: 10, skipped already present requests: 60
DEBUG Tried to add new requests: 10, succeeded to add new requests: 10, skipped already present requests: 30
DEBUG Tried to add new requests: 10, succeeded to add new requests: 10, skipped already present requests: 70
INFO {'readCount': 0, 'writeCount': 100, 'deleteCount': 0, 'headItemReadCount': 0, 'storageBytes': 7400}
```
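The test design described above can be sketched with plain asyncio, independent of the library. In this hypothetical model, ten workers all try to add the same requests in chunks of ten, a random delay forces the event loop to interleave them, and shared state dedupes the attempts, so the final counts are deterministic no matter the interleaving.

```python
import asyncio
import random


async def run_workers(num_workers: int = 10, total: int = 100, chunk: int = 10) -> tuple[int, int]:
    """All workers add the same requests in chunks; shared state dedupes them."""
    seen: set[str] = set()
    added = 0
    skipped = 0
    lock = asyncio.Lock()
    requests = [f'http://example.com/{i}' for i in range(total)]

    async def add_batch(batch: list[str]) -> None:
        nonlocal added, skipped
        # A small random delay makes the event loop interleave the workers.
        await asyncio.sleep(random.uniform(0, 0.01))
        async with lock:
            for url in batch:
                if url in seen:
                    skipped += 1
                else:
                    seen.add(url)
                    added += 1

    async def worker() -> None:
        for i in range(0, len(requests), chunk):
            await add_batch(requests[i:i + chunk])

    await asyncio.gather(*(asyncio.create_task(worker()) for _ in range(num_workers)))
    return added, skipped


added, skipped = asyncio.run(run_workers())
# Regardless of interleaving, each unique request is added exactly once.
assert added == 100
assert skipped == 900
```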
```python
with mock.patch(
    'apify_client.clients.resource_clients.request_queue.RequestQueueClientAsync.batch_add_requests',
    side_effect=return_unprocessed_requests,
):
    # Simulate failed API call for adding requests. Request was not processed and should not be cached.
    await apify_named_rq.add_requests(['http://example.com/1'])

# This will succeed.
await apify_named_rq.add_requests(['http://example.com/1'])
```
Any chance we could verify that the request was actually not cached between the two add_requests calls?
This is checked implicitly in the last line, where it is asserted that there is a writeCount difference of exactly 1. The first call is "hardcoded" to fail, even on all retries, so it never even sends the API request and thus has no chance of increasing the writeCount.
The second call can make the write only if the request is not cached, as cached requests do not make the call (tested in other tests). So this means the request was not cached in between.
I could assert the state of the cache in between those calls, but since it is kind of an implementation detail, I would prefer not to.
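The argument above can be made concrete with a toy model. The `MiniQueueClient` below is hypothetical, not the real Apify client; it only illustrates the invariant under discussion: a failed add must not populate the cache, so a retry is still able to reach the API and perform the single write.

```python
import asyncio


class MiniQueueClient:
    """Toy model of the caching behavior under discussion (hypothetical, not the real client)."""

    def __init__(self, api_call) -> None:
        self._api_call = api_call  # coroutine: url -> bool (True if processed)
        self._requests_cache: set[str] = set()
        self.write_count = 0

    async def add_request(self, url: str) -> None:
        if url in self._requests_cache:
            return  # cached requests never reach the API
        processed = await self._api_call(url)
        if processed:
            self.write_count += 1
            self._requests_cache.add(url)  # cache only successfully processed requests


async def main() -> tuple[int, int]:
    calls = {'n': 0}

    async def flaky_api(url: str) -> bool:
        calls['n'] += 1
        return calls['n'] > 1  # first call fails (unprocessed), later calls succeed

    client = MiniQueueClient(flaky_api)
    await client.add_request('http://example.com/1')  # fails -> not cached
    first = client.write_count
    await client.add_request('http://example.com/1')  # not cached, so it retries and writes
    return first, client.write_count


first, second = asyncio.run(main())
assert first == 0 and second == 1  # exactly one writeCount difference
```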
Fair enough, can you explain this in a comment then?
Yes, added to the test description.
```python
for request in requests:
    if self._requests_cache.get(request.id):
        # We are no sure if it was already handled at this point, and it is not worth calling API for it.
```
Did you mean "We are now sure that it was already handled..."? I'm not sure 😁
Yes, that was not very clear. Updated.
```python
]

# Send requests to API.
response = await self._api_client.batch_add_requests(requests=requests_dict, forefront=forefront)
```
It's probably out of the scope of the PR, but it might be worth it to validate the response with a Pydantic model.
That already happens in the original code a few lines down: api_response = AddRequestsResponse.model_validate(response)
I'm sorry, I meant validating the whole response object with the two lists, so that you wouldn't need to do response['unprocessedRequests']
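A sketch of what such a whole-response model might look like. The field names below are assumed from the JSON keys mentioned in this thread (`processedRequests`, `unprocessedRequests`); the model and field set are illustrative, not the actual client code.

```python
from pydantic import BaseModel, ConfigDict, Field


class ProcessedRequest(BaseModel):
    model_config = ConfigDict(populate_by_name=True)
    unique_key: str = Field(alias='uniqueKey')
    was_already_present: bool = Field(alias='wasAlreadyPresent')


class UnprocessedRequest(BaseModel):
    model_config = ConfigDict(populate_by_name=True)
    unique_key: str = Field(alias='uniqueKey')


class BatchAddResponse(BaseModel):
    """Validates the whole response, so callers never index response['unprocessedRequests']."""
    model_config = ConfigDict(populate_by_name=True)
    processed_requests: list[ProcessedRequest] = Field(alias='processedRequests')
    unprocessed_requests: list[UnprocessedRequest] = Field(alias='unprocessedRequests')


raw = {
    'processedRequests': [{'uniqueKey': 'a', 'wasAlreadyPresent': False}],
    'unprocessedRequests': [{'uniqueKey': 'b'}],
}
response = BatchAddResponse.model_validate(raw)
assert response.processed_requests[0].unique_key == 'a'
assert len(response.unprocessed_requests) == 1
```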
```python
already_present_requests: list[ProcessedRequest] = []

for request in requests:
    if self._requests_cache.get(request.id):
```
Judging by apify/crawlee#3120, a day may come when we try to limit the size of _requests_cache somehow. Perhaps we should think ahead and come up with a more space-efficient way of tracking already added requests?
EDIT: hollup a minute, do you use the ID here for deduplication instead of unique key?
Since there is the deterministic transformation function unique_key_to_request_id, which respects the Apify platform's way of creating IDs, this seems OK. If someone starts creating Requests with a custom id, then deduplication will most likely stop working.
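The property that makes id-based deduplication safe can be illustrated with a sketch. This is not the actual Apify algorithm, whose encoding details may differ; the point is only that the mapping is deterministic, so equal unique_keys always produce equal ids, while a custom id would bypass this mapping entirely.

```python
import hashlib


def unique_key_to_request_id(unique_key: str, *, length: int = 15) -> str:
    """Deterministically derive a short request id from a unique_key.

    Sketch only: the real platform algorithm may encode the digest differently,
    but the key property is that equal unique_keys always map to equal ids.
    """
    # Hash the unique_key and truncate the hex digest to a short, stable id.
    digest = hashlib.sha256(unique_key.encode('utf-8')).hexdigest()
    return digest[:length]


# Same unique_key -> same id, so cache lookups by id deduplicate correctly.
assert unique_key_to_request_id('http://example.com') == unique_key_to_request_id('http://example.com')
assert unique_key_to_request_id('a') != unique_key_to_request_id('b')
```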
There are two issues I created based on the discussion about this:
Description
Deduplicate requests before api_client.batch_add_requests calls to avoid expensive and pointless API calls to batch_add_requests.
Issues
Testing