Skip to content

Commit 98dfdaf

Browse files
committed
Add tests for RQ
1 parent 84f9f12 commit 98dfdaf

File tree

8 files changed

+378
-673
lines changed

8 files changed

+378
-673
lines changed

src/crawlee/storage_clients/_file_system/_request_queue_client.py

+6-4
Original file line numberDiff line numberDiff line change
@@ -104,10 +104,10 @@ async def open(
104104
) -> FileSystemRequestQueueClient:
105105
if id:
106106
raise ValueError(
107-
'Opening a dataset by "id" is not supported for file system storage client, use "name" instead.'
107+
'Opening a request queue by "id" is not supported for file system storage client, use "name" instead.'
108108
)
109109

110-
name = name or configuration.default_dataset_id
110+
name = name or configuration.default_request_queue_id
111111

112112
# Check if the client is already cached by name.
113113
if name in cls._cache_by_name:
@@ -123,7 +123,7 @@ async def open(
123123
if rq_path.exists():
124124
# If metadata file is missing, raise an error.
125125
if not metadata_path.exists():
126-
raise ValueError(f'Metadata file not found for RQ "{name}"')
126+
raise ValueError(f'Metadata file not found for request queue "{name}"')
127127

128128
file = await asyncio.to_thread(open, metadata_path)
129129
try:
@@ -133,7 +133,7 @@ async def open(
133133
try:
134134
metadata = RequestQueueMetadata(**file_content)
135135
except ValidationError as exc:
136-
raise ValueError(f'Invalid metadata file for RQ "{name}"') from exc
136+
raise ValueError(f'Invalid metadata file for request queue "{name}"') from exc
137137

138138
client = cls(
139139
id=metadata.id,
@@ -185,6 +185,8 @@ async def drop(self) -> None:
185185
if self.metadata.name in self.__class__._cache_by_name: # noqa: SLF001
186186
del self.__class__._cache_by_name[self.metadata.name] # noqa: SLF001
187187

188+
# TODO: continue
189+
188190
@override
189191
async def add_batch_of_requests(
190192
self,

src/crawlee/storages/_key_value_store.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ async def open(
9999
if id and name:
100100
raise ValueError('Only one of "id" or "name" can be specified, not both.')
101101

102-
# Check if key value store is already cached by id or name
102+
# Check if key-value store is already cached by id or name
103103
if id and id in cls._cache_by_id:
104104
return cls._cache_by_id[id]
105105
if name and name in cls._cache_by_name:
@@ -116,7 +116,7 @@ async def open(
116116

117117
kvs = cls(client)
118118

119-
# Cache the key value store by id and name if available
119+
# Cache the key-value store by id and name if available
120120
if kvs.id:
121121
cls._cache_by_id[kvs.id] = kvs
122122
if kvs.name:

src/crawlee/storages/_request_queue.py

+56-74
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ def __init__(self, client: RequestQueueClient) -> None:
7676
Preferably use the `RequestQueue.open` constructor to create a new instance.
7777
7878
Args:
79-
client: An instance of a key-value store client.
79+
client: An instance of a request queue client.
8080
"""
8181
self._client = client
8282

@@ -111,7 +111,7 @@ async def open(
111111
if id and name:
112112
raise ValueError('Only one of "id" or "name" can be specified, not both.')
113113

114-
# Check if key value store is already cached by id or name
114+
# Check if request queue is already cached by id or name
115115
if id and id in cls._cache_by_id:
116116
return cls._cache_by_id[id]
117117
if name and name in cls._cache_by_name:
@@ -126,7 +126,15 @@ async def open(
126126
configuration=configuration,
127127
)
128128

129-
return cls(client)
129+
rq = cls(client)
130+
131+
# Cache the request queue by id and name if available
132+
if rq.id:
133+
cls._cache_by_id[rq.id] = rq
134+
if rq.name:
135+
cls._cache_by_name[rq.name] = rq
136+
137+
return rq
130138

131139
@override
132140
async def drop(self) -> None:
@@ -163,27 +171,32 @@ async def add_requests(
163171
transformed_requests = self._transform_requests(requests)
164172
wait_time_secs = wait_time_between_batches.total_seconds()
165173

166-
async def _process_batch(batch: Sequence[Request]) -> None:
167-
request_count = len(batch)
168-
response = await self._client.add_batch_of_requests(batch, forefront=forefront)
169-
logger.debug(f'Added {request_count} requests to the queue, response: {response}')
170-
171174
# Wait for the first batch to be added
172175
first_batch = transformed_requests[:batch_size]
173176
if first_batch:
174-
await _process_batch(first_batch)
177+
await self._process_batch(
178+
first_batch,
179+
base_retry_wait=wait_time_between_batches,
180+
forefront=forefront,
181+
)
175182

176183
async def _process_remaining_batches() -> None:
177184
for i in range(batch_size, len(transformed_requests), batch_size):
178185
batch = transformed_requests[i : i + batch_size]
179-
await _process_batch(batch)
186+
await self._process_batch(
187+
batch,
188+
base_retry_wait=wait_time_between_batches,
189+
forefront=forefront,
190+
)
180191
if i + batch_size < len(transformed_requests):
181192
await asyncio.sleep(wait_time_secs)
182193

183194
# Create and start the task to process remaining batches in the background
184195
remaining_batches_task = asyncio.create_task(
185-
_process_remaining_batches(), name='request_queue_process_remaining_batches_task'
196+
_process_remaining_batches(),
197+
name='request_queue_process_remaining_batches_task',
186198
)
199+
187200
self._add_requests_tasks.append(remaining_batches_task)
188201
remaining_batches_task.add_done_callback(lambda _: self._add_requests_tasks.remove(remaining_batches_task))
189202

@@ -195,69 +208,6 @@ async def _process_remaining_batches() -> None:
195208
timeout=wait_for_all_requests_to_be_added_timeout,
196209
)
197210

198-
# Wait for the first batch to be added
199-
first_batch = transformed_requests[:batch_size]
200-
if first_batch:
201-
await self._process_batch(first_batch, base_retry_wait=wait_time_between_batches)
202-
203-
async def _process_remaining_batches() -> None:
204-
for i in range(batch_size, len(transformed_requests), batch_size):
205-
batch = transformed_requests[i : i + batch_size]
206-
await self._process_batch(batch, base_retry_wait=wait_time_between_batches)
207-
if i + batch_size < len(transformed_requests):
208-
await asyncio.sleep(wait_time_secs)
209-
210-
# Create and start the task to process remaining batches in the background
211-
remaining_batches_task = asyncio.create_task(
212-
_process_remaining_batches(), name='request_queue_process_remaining_batches_task'
213-
)
214-
self._tasks.append(remaining_batches_task)
215-
remaining_batches_task.add_done_callback(lambda _: self._tasks.remove(remaining_batches_task))
216-
217-
# Wait for all tasks to finish if requested
218-
if wait_for_all_requests_to_be_added:
219-
await wait_for_all_tasks_for_finish(
220-
(remaining_batches_task,),
221-
logger=logger,
222-
timeout=wait_for_all_requests_to_be_added_timeout,
223-
)
224-
225-
async def _process_batch(self, batch: Sequence[Request], base_retry_wait: timedelta, attempt: int = 1) -> None:
226-
max_attempts = 5
227-
response = await self._resource_client.batch_add_requests(batch)
228-
229-
if response.unprocessed_requests:
230-
logger.debug(f'Following requests were not processed: {response.unprocessed_requests}.')
231-
if attempt > max_attempts:
232-
logger.warning(
233-
f'Following requests were not processed even after {max_attempts} attempts:\n'
234-
f'{response.unprocessed_requests}'
235-
)
236-
else:
237-
logger.debug('Retry to add requests.')
238-
unprocessed_requests_unique_keys = {request.unique_key for request in response.unprocessed_requests}
239-
retry_batch = [request for request in batch if request.unique_key in unprocessed_requests_unique_keys]
240-
await asyncio.sleep((base_retry_wait * attempt).total_seconds())
241-
await self._process_batch(retry_batch, base_retry_wait=base_retry_wait, attempt=attempt + 1)
242-
243-
request_count = len(batch) - len(response.unprocessed_requests)
244-
self._assumed_total_count += request_count
245-
if request_count:
246-
logger.debug(
247-
f'Added {request_count} requests to the queue. Processed requests: {response.processed_requests}'
248-
)
249-
250-
async def get_request(self, request_id: str) -> Request | None:
251-
"""Retrieve a request from the queue.
252-
253-
Args:
254-
request_id: ID of the request to retrieve.
255-
256-
Returns:
257-
The retrieved request, or `None`, if it does not exist.
258-
"""
259-
# TODO: implement
260-
261211
async def fetch_next_request(self) -> Request | None:
262212
"""Return the next request in the queue to be processed.
263213
@@ -346,3 +296,35 @@ async def is_finished(self) -> bool:
346296
return True
347297

348298
return False
299+
300+
async def _process_batch(
301+
self,
302+
batch: Sequence[Request],
303+
*,
304+
base_retry_wait: timedelta,
305+
attempt: int = 1,
306+
forefront: bool = False,
307+
) -> None:
308+
max_attempts = 5
309+
response = await self._client.add_batch_of_requests(batch, forefront=forefront)
310+
311+
if response.unprocessed_requests:
312+
logger.debug(f'Following requests were not processed: {response.unprocessed_requests}.')
313+
if attempt > max_attempts:
314+
logger.warning(
315+
f'Following requests were not processed even after {max_attempts} attempts:\n'
316+
f'{response.unprocessed_requests}'
317+
)
318+
else:
319+
logger.debug('Retry to add requests.')
320+
unprocessed_requests_unique_keys = {request.unique_key for request in response.unprocessed_requests}
321+
retry_batch = [request for request in batch if request.unique_key in unprocessed_requests_unique_keys]
322+
await asyncio.sleep((base_retry_wait * attempt).total_seconds())
323+
await self._process_batch(retry_batch, base_retry_wait=base_retry_wait, attempt=attempt + 1)
324+
325+
request_count = len(batch) - len(response.unprocessed_requests)
326+
327+
if request_count:
328+
logger.debug(
329+
f'Added {request_count} requests to the queue. Processed requests: {response.processed_requests}'
330+
)

0 commit comments

Comments
 (0)