Commit cda7b31

feat: add Session binding capability via session_id in Request (#1086)
### Description

- Add strict binding of a `Request` to a specific `Session`. If the `Session` is not available in the `SessionPool`, an error is raised for the `Request`, which can be handled in the `failed_request_handler`.

### Issues

- Closes: #1076

### Testing

Added tests to verify the functionality:

- Binding to a valid session
- Binding to a non-existent session
- Catching the error in `failed_request_handler`
1 parent 0e863a4 commit cda7b31
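For orientation before the diff, here is a minimal, hedged sketch of the two halves described above: binding a request to a session via `session_id`, and catching the resulting `RequestCollisionError` in a `failed_request_handler`. It condenses the full example files further down; the URL and the session id are placeholders.

import asyncio

from crawlee import Request
from crawlee.crawlers import BasicCrawlingContext, HttpCrawler, HttpCrawlingContext
from crawlee.errors import RequestCollisionError


async def main() -> None:
    # Rotation is pointless for session-bound requests, as in the examples below.
    crawler = HttpCrawler(max_session_rotations=0)

    @crawler.router.default_handler
    async def handler(context: HttpCrawlingContext) -> None:
        context.log.info(f'Processing {context.request.url}')

    # If the bound session is not available in the SessionPool, the request fails
    # with RequestCollisionError and lands here instead of being retried.
    @crawler.failed_request_handler
    async def on_failed(context: BasicCrawlingContext, error: Exception) -> None:
        if isinstance(error, RequestCollisionError):
            context.log.error(f'Bound session unavailable for {context.request.url}')

    # 'missing-session' is a placeholder id that does not exist in the pool.
    await crawler.run([Request.from_url('https://example.org/', session_id='missing-session')])


if __name__ == '__main__':
    asyncio.run(main())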

File tree

9 files changed: +325 -8 lines changed
docs/guides/code_examples/session_management/multi_sessions_http.py

@@ -0,0 +1,85 @@
import asyncio
from datetime import timedelta
from itertools import count
from typing import Callable

from crawlee import ConcurrencySettings, Request
from crawlee.crawlers import BasicCrawlingContext, HttpCrawler, HttpCrawlingContext
from crawlee.errors import RequestCollisionError
from crawlee.sessions import Session, SessionPool


# Define a function for creating sessions with simple logic for unique `id` generation.
# This is necessary if you need to specify a particular session for the first request,
# for example during authentication
def create_session_function() -> Callable[[], Session]:
    counter = count()

    def create_session() -> Session:
        return Session(
            id=str(next(counter)),
            max_usage_count=999_999,
            max_age=timedelta(hours=999_999),
            max_error_score=100,
            blocked_status_codes=[403],
        )

    return create_session


async def main() -> None:
    crawler = HttpCrawler(
        # Adjust request limits according to your pool size
        concurrency_settings=ConcurrencySettings(max_tasks_per_minute=500),
        # Requests are bound to specific sessions, no rotation needed
        max_session_rotations=0,
        session_pool=SessionPool(
            max_pool_size=10, create_session_function=create_session_function()
        ),
    )

    @crawler.router.default_handler
    async def basic_handler(context: HttpCrawlingContext) -> None:
        context.log.info(f'Processing {context.request.url}')

    # Initialize the session and bind the next request to this session if needed
    @crawler.router.handler(label='session_init')
    async def session_init(context: HttpCrawlingContext) -> None:
        next_requests = []
        if context.session:
            context.log.info(f'Init session {context.session.id}')
            next_request = Request.from_url(
                'https://placeholder.dev', session_id=context.session.id
            )
            next_requests.append(next_request)

        await context.add_requests(next_requests)

    # Handle errors when a session is blocked and no longer available in the pool
    # when attempting to execute requests bound to it
    @crawler.failed_request_handler
    async def error_processing(context: BasicCrawlingContext, error: Exception) -> None:
        if isinstance(error, RequestCollisionError) and context.session:
            context.log.error(
                f'Request {context.request.url} failed, because the bound '
                'session is unavailable'
            )

    # Create a pool of requests bound to their respective sessions
    # Use `always_enqueue=True` if session initialization happens on a non-unique address,
    # such as the site's main page
    init_requests = [
        Request.from_url(
            'https://example.org/',
            label='session_init',
            session_id=str(session_id),
            use_extended_unique_key=True,
        )
        for session_id in range(1, 11)
    ]

    await crawler.run(init_requests)


if __name__ == '__main__':
    asyncio.run(main())
docs/guides/code_examples/session_management/one_session_http.py

@@ -0,0 +1,56 @@
import asyncio
from datetime import timedelta

from crawlee import ConcurrencySettings, Request
from crawlee.crawlers import BasicCrawlingContext, HttpCrawler, HttpCrawlingContext
from crawlee.errors import SessionError
from crawlee.sessions import SessionPool


async def main() -> None:
    crawler = HttpCrawler(
        # Limit requests per minute to reduce the chance of being blocked
        concurrency_settings=ConcurrencySettings(max_tasks_per_minute=50),
        # Disable session rotation
        max_session_rotations=0,
        session_pool=SessionPool(
            # Only one session in the pool
            max_pool_size=1,
            create_session_settings={
                # High value for session usage limit
                'max_usage_count': 999_999,
                # High value for session lifetime
                'max_age': timedelta(hours=999_999),
                # High score allows the session to encounter more errors
                # before crawlee decides the session is blocked
                # Make sure you know how to handle these errors
                'max_error_score': 100,
                # 403 status usually indicates you're already blocked
                'blocked_status_codes': [403],
            },
        ),
    )

    # Basic request handling logic
    @crawler.router.default_handler
    async def basic_handler(context: HttpCrawlingContext) -> None:
        context.log.info(f'Processing {context.request.url}')

    # Handler for session initialization (authentication, initial cookies, etc.)
    @crawler.router.handler(label='session_init')
    async def session_init(context: HttpCrawlingContext) -> None:
        if context.session:
            context.log.info(f'Init session {context.session.id}')

    # Monitor if our session gets blocked and explicitly stop the crawler
    @crawler.error_handler
    async def error_processing(context: BasicCrawlingContext, error: Exception) -> None:
        if isinstance(error, SessionError) and context.session:
            context.log.info(f'Session {context.session.id} blocked')
            crawler.stop()

    await crawler.run([Request.from_url('https://example.org/', label='session_init')])


if __name__ == '__main__':
    asyncio.run(main())

docs/guides/session_management.mdx

+24
@@ -15,6 +15,8 @@ import BeautifulSoupSource from '!!raw-loader!./code_examples/session_management
 import ParselSource from '!!raw-loader!./code_examples/session_management/session_management_parsel.py';
 import PlaywrightSource from '!!raw-loader!./code_examples/session_management/session_management_playwright.py';
 import StandaloneSource from '!!raw-loader!./code_examples/session_management/session_management_standalone.py';
+import OneSession from '!!raw-loader!./code_examples/session_management/one_session_http.py';
+import MultiSessions from '!!raw-loader!./code_examples/session_management/multi_sessions_http.py';

 The <ApiLink to="class/SessionPool">`SessionPool`</ApiLink> class provides a robust way to manage the rotation of proxy IP addresses, cookies, and other custom settings in Crawlee. Its primary advantage is the ability to filter out blocked or non-functional proxies, ensuring that your scraper avoids retrying requests through known problematic proxies.

@@ -68,3 +70,25 @@ Now, let's explore examples of how to use the <ApiLink to="class/SessionPool">`S
 These examples demonstrate the basics of configuring and using the <ApiLink to="class/SessionPool">`SessionPool`</ApiLink>.

 Please bear in mind that <ApiLink to="class/SessionPool">`SessionPool`</ApiLink> requires some time to establish a stable pool of working IPs. During the initial setup, you may encounter errors as the pool identifies and filters out blocked or non-functional IPs. This stabilization period is expected and will improve over time.
+
+## Configuring a single session
+
+In some cases, you need full control over session usage, for example when working with websites that require authentication or initialization of certain parameters such as cookies.
+
+When working with a site that requires authentication, we typically don't want multiple sessions with different browser fingerprints or client parameters accessing the site. In this case, configure the <ApiLink to="class/SessionPool">`SessionPool`</ApiLink> appropriately:
+
+<CodeBlock language="py">
+{OneSession}
+</CodeBlock>
+
+## Binding requests to specific sessions
+
+The previous example has one obvious limitation: you're restricted to a single session.
+
+In some cases, you need the same behavior with multiple sessions running in parallel, such as authenticating with different profiles or using different proxies.
+
+To do this, use the `session_id` parameter of the <ApiLink to="class/Request">`Request`</ApiLink> object to bind a request to a specific session:
+
+<CodeBlock language="py">
+{MultiSessions}
+</CodeBlock>

src/crawlee/_request.py

+20 -2
@@ -58,6 +58,9 @@ class CrawleeRequestData(BaseModel):
     crawl_depth: Annotated[int, Field(alias='crawlDepth')] = 0
     """The depth of the request in the crawl tree."""

+    session_id: Annotated[str | None, Field()] = None
+    """ID of a session to which the request is bound."""
+

 class UserData(BaseModel, MutableMapping[str, JsonSerializable]):
     """Represents the `user_data` part of a Request.
@@ -84,6 +87,7 @@ def __setitem__(self, key: str, value: JsonSerializable) -> None:
                 raise ValueError('`label` must be str or None')

             self.label = value
+
         self.__pydantic_extra__[key] = value

     def __delitem__(self, key: str) -> None:
@@ -119,6 +123,7 @@ class RequestOptions(TypedDict):
     headers: NotRequired[HttpHeaders | dict[str, str] | None]
     payload: NotRequired[HttpPayload | str | None]
     label: NotRequired[str | None]
+    session_id: NotRequired[str | None]
     unique_key: NotRequired[str | None]
     id: NotRequired[str | None]
     keep_url_fragment: NotRequired[bool]
@@ -227,6 +232,7 @@ def from_url(
         headers: HttpHeaders | dict[str, str] | None = None,
         payload: HttpPayload | str | None = None,
         label: str | None = None,
+        session_id: str | None = None,
         unique_key: str | None = None,
         id: str | None = None,
         keep_url_fragment: bool = False,
@@ -248,14 +254,17 @@ def from_url(
             payload: The data to be sent as the request body. Typically used with 'POST' or 'PUT' requests.
             label: A custom label to differentiate between request types. This is stored in `user_data`, and it is
                 used for request routing (different requests go to different handlers).
+            session_id: ID of a specific `Session` to which the request will be strictly bound.
+                If the session becomes unavailable when the request is processed, a `RequestCollisionError` will be
+                raised.
             unique_key: A unique key identifying the request. If not provided, it is automatically computed based on
                 the URL and other parameters. Requests with the same `unique_key` are treated as identical.
             id: A unique identifier for the request. If not provided, it is automatically generated from the
                 `unique_key`.
             keep_url_fragment: Determines whether the URL fragment (e.g., `#section`) should be included in
                 the `unique_key` computation. This is only relevant when `unique_key` is not provided.
-            use_extended_unique_key: Determines whether to include the HTTP method and payload in the `unique_key`
-                computation. This is only relevant when `unique_key` is not provided.
+            use_extended_unique_key: Determines whether to include the HTTP method, session ID, and payload in the
+                `unique_key` computation. This is only relevant when `unique_key` is not provided.
             always_enqueue: If set to `True`, the request will be enqueued even if it is already present in the queue.
                 Using this is not allowed when a custom `unique_key` is also provided and will result in a `ValueError`.
             **kwargs: Additional request properties.
@@ -274,6 +283,7 @@ def from_url(
             method=method,
             headers=headers,
             payload=payload,
+            session_id=session_id,
             keep_url_fragment=keep_url_fragment,
             use_extended_unique_key=use_extended_unique_key,
         )
@@ -296,6 +306,9 @@ def from_url(
         if label is not None:
             request.user_data['label'] = label

+        if session_id is not None:
+            request.crawlee_data.session_id = session_id
+
         return request

     def get_query_param_from_url(self, param: str, *, default: str | None = None) -> str | None:
def get_query_param_from_url(self, param: str, *, default: str | None = None) -> str | None:
@@ -308,6 +321,11 @@ def label(self) -> str | None:
308321
"""A string used to differentiate between arbitrary request types."""
309322
return cast('UserData', self.user_data).label
310323

324+
@property
325+
def session_id(self) -> str | None:
326+
"""The ID of the bound session, if there is any."""
327+
return self.crawlee_data.session_id
328+
311329
@property
312330
def crawlee_data(self) -> CrawleeRequestData:
313331
"""Crawlee-specific configuration stored in the `user_data`."""

src/crawlee/_utils/requests.py

+7 -1
@@ -78,6 +78,7 @@ def compute_unique_key(
     method: HttpMethod = 'GET',
     headers: HttpHeaders | None = None,
     payload: HttpPayload | None = None,
+    session_id: str | None = None,
     *,
     keep_url_fragment: bool = False,
     use_extended_unique_key: bool = False,
@@ -96,6 +97,7 @@ def compute_unique_key(
         payload: The data to be sent as the request body.
         keep_url_fragment: A flag indicating whether to keep the URL fragment.
         use_extended_unique_key: A flag indicating whether to include a hashed payload in the key.
+        session_id: The ID of a specific `Session` to which the request will be strictly bound.

     Returns:
         A string representing the unique key for the request.
@@ -114,9 +116,13 @@ def compute_unique_key(
     if use_extended_unique_key:
         payload_hash = _get_payload_hash(payload)
         headers_hash = _get_headers_hash(headers)
+        normalized_session = '' if session_id is None else session_id.lower()

         # Return the extended unique key. Use pipe as a separator of the different parts of the unique key.
-        return f'{normalized_method}|{headers_hash}|{payload_hash}|{normalized_url}'
+        extended_part = f'{normalized_method}|{headers_hash}|{payload_hash}'
+        if normalized_session:
+            extended_part = f'{extended_part}|{normalized_session}'
+        return f'{extended_part}|{normalized_url}'

     # Log information if there is a non-GET request with a payload.
     if normalized_method != 'GET' and payload:
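To make the key-composition change concrete, a hedged sketch that calls the helper directly. `compute_unique_key` lives in the private `crawlee._utils.requests` module, so the import path is an implementation detail; the expected behavior follows from the diff above:

from crawlee._utils.requests import compute_unique_key

# With use_extended_unique_key=True the pipe-separated prefix optionally carries the
# session id: '<method>|<headers_hash>|<payload_hash>|<session>|<url>'.
key_a = compute_unique_key('https://example.org/', session_id='1', use_extended_unique_key=True)
key_b = compute_unique_key('https://example.org/', session_id='2', use_extended_unique_key=True)
key_plain = compute_unique_key('https://example.org/', use_extended_unique_key=True)

assert key_a != key_b       # different sessions yield different unique keys
assert key_plain != key_a   # without session_id the previous four-part format is kept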

src/crawlee/crawlers/_basic/_basic_crawler.py

+40 -1
@@ -40,6 +40,7 @@
     ContextPipelineInterruptedError,
     HttpClientStatusCodeError,
     HttpStatusCodeError,
+    RequestCollisionError,
     RequestHandlerError,
     SessionError,
     UserDefinedErrorHandlerError,
@@ -449,6 +450,20 @@ async def _get_session(self) -> Session | None:
             logger=self._logger,
         )

+    async def _get_session_by_id(self, session_id: str | None) -> Session | None:
+        """If session pool is being used, try to take a session by id from it."""
+        if not self._use_session_pool or not session_id:
+            return None
+
+        return await wait_for(
+            partial(self._session_pool.get_session_by_id, session_id),
+            timeout=self._internal_timeout,
+            timeout_message='Fetching a session from the pool timed out after '
+            f'{self._internal_timeout.total_seconds()} seconds',
+            max_retries=3,
+            logger=self._logger,
+        )
+
     async def _get_proxy_info(self, request: Request, session: Session | None) -> ProxyInfo | None:
         """Retrieve a new ProxyInfo object based on crawler configuration and the current request and session."""
         if not self._proxy_configuration:
@@ -1065,7 +1080,10 @@ async def __run_task_function(self) -> None:
         if request is None:
             return

-        session = await self._get_session()
+        if request.session_id:
+            session = await self._get_session_by_id(request.session_id)
+        else:
+            session = await self._get_session()
         proxy_info = await self._get_proxy_info(request, session)
         result = RequestHandlerRunResult(key_value_store_getter=self.get_key_value_store)

@@ -1088,6 +1106,8 @@ async def __run_task_function(self) -> None:
         try:
             request.state = RequestState.REQUEST_HANDLER

+            self._check_request_collision(context.request, context.session)
+
             try:
                 await self._run_request_handler(context=context)
             except asyncio.TimeoutError as e:
@@ -1110,6 +1130,10 @@ async def __run_task_function(self) -> None:

             self._statistics.record_request_processing_finish(statistics_id)

+        except RequestCollisionError as request_error:
+            context.request.no_retry = True
+            await self._handle_request_error(context, request_error)
+
         except RequestHandlerError as primary_error:
             primary_error = cast(
                 'RequestHandlerError[TCrawlingContext]', primary_error
@@ -1226,3 +1250,18 @@ def _raise_for_session_blocked_status_code(self, session: Session | None, status
             ignore_http_error_status_codes=self._ignore_http_error_status_codes,
         ):
             raise SessionError(f'Assuming the session is blocked based on HTTP status code {status_code}')
+
+    def _check_request_collision(self, request: Request, session: Session | None) -> None:
+        """Raise an exception if a request cannot access required resources.
+
+        Args:
+            request: The `Request` that might require specific resources (like a session).
+            session: The `Session` that was retrieved for the request, or `None` if not available.
+
+        Raises:
+            RequestCollisionError: If the `Session` referenced by the `Request` is not available.
+        """
+        if self._use_session_pool and request.session_id and not session:
+            raise RequestCollisionError(
+                f'The Session (id: {request.session_id}) bound to the Request is no longer available in SessionPool'
+            )
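The lookup above goes through `SessionPool.get_session_by_id`, which this hunk calls but whose definition is in one of the changed files not shown here. A hedged sketch of the expected contract, assuming the pool is used as an async context manager as in the existing session-management examples:

import asyncio

from crawlee.sessions import SessionPool


async def main() -> None:
    async with SessionPool(max_pool_size=1) as pool:
        session = await pool.get_session()
        # Look the session up again by id; the crawler does the same (wrapped in
        # wait_for with retries) for any request that carries a session_id.
        found = await pool.get_session_by_id(session.id)
        print(found.id if found else 'session not found')


if __name__ == '__main__':
    asyncio.run(main())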

src/crawlee/errors.py

+6
@@ -14,6 +14,7 @@
     'HttpClientStatusCodeError',
     'HttpStatusCodeError',
     'ProxyError',
+    'RequestCollisionError',
     'RequestHandlerError',
     'ServiceConflictError',
     'SessionError',
@@ -106,3 +107,8 @@ def __init__(self, wrapped_exception: Exception, crawling_context: BasicCrawling
 @docs_group('Errors')
 class ContextPipelineInterruptedError(Exception):
     """May be thrown in the initialization phase of a middleware to signal that the request should not be processed."""
+
+
+@docs_group('Errors')
+class RequestCollisionError(Exception):
+    """Raised when a request cannot be processed due to a conflict with required resources."""
