13
13
14
14
if TYPE_CHECKING :
15
15
from datetime import timedelta
16
+ from types import TracebackType
16
17
17
18
from crawlee .events .types import Event , EventData , Listener , WrappedListener
18
19
@@ -26,8 +27,13 @@ class EventManager:
26
27
their execution. It is built on top of the `pyee.asyncio.AsyncIOEventEmitter` class.
27
28
"""
28
29
29
- def __init__ (self ) -> None :
30
- logger .debug ('Calling EventManager.__init__()...' )
30
+ def __init__ (self , close_timeout : timedelta | None = None ) -> None :
31
+ """Create a new instance.
32
+
33
+ Args:
34
+ close_timeout: Optional timeout after which the pending event listeners are canceled.
35
+ """
36
+ self ._close_timeout = close_timeout
31
37
32
38
# Asynchronous event emitter for handle events and invoke the event listeners.
33
39
self ._event_emitter = AsyncIOEventEmitter ()
@@ -41,19 +47,35 @@ def __init__(self) -> None:
41
47
lambda : defaultdict (list ),
42
48
)
43
49
50
+ async def __aenter__ (self ) -> EventManager :
51
+ """Initializes the event manager upon entering the async context."""
52
+ return self
53
+
54
+ async def __aexit__ (
55
+ self ,
56
+ exc_type : type [BaseException ] | None ,
57
+ exc_value : BaseException | None ,
58
+ exc_traceback : TracebackType | None ,
59
+ ) -> None :
60
+ """Closes the local event manager upon exiting the async context.
61
+
62
+ This will stop listening for the events, and it will wait for all the event listeners to finish.
63
+ """
64
+ await self .wait_for_all_listeners_to_complete (timeout = self ._close_timeout )
65
+ self ._event_emitter .remove_all_listeners ()
66
+ self ._listener_tasks .clear ()
67
+ self ._listeners_to_wrappers .clear ()
68
+
44
69
def on (self , * , event : Event , listener : Listener ) -> None :
45
70
"""Add an event listener to the event manager.
46
71
47
72
Args:
48
73
event: The Actor event for which to listen to.
49
74
listener: The function (sync or async) which is to be called when the event is emitted.
50
75
"""
51
- logger .debug ('Calling EventManager.on()...' )
52
76
53
77
@wraps (listener )
54
78
async def listener_wrapper (event_data : EventData ) -> None :
55
- logger .debug ('Calling EventManager.on.listener_wrapper()...' )
56
-
57
79
# If the listener is a coroutine function, just call it, otherwise, run it in a separate thread
58
80
# to avoid blocking the event loop
59
81
coro = (
@@ -92,8 +114,6 @@ def off(self, *, event: Event, listener: Listener | None = None) -> None:
92
114
listener: The listener which is supposed to be removed. If not passed, all listeners of this event
93
115
are removed.
94
116
"""
95
- logger .debug ('Calling EventManager.off()...' )
96
-
97
117
if listener :
98
118
for listener_wrapper in self ._listeners_to_wrappers [event ][listener ]:
99
119
self ._event_emitter .remove_listener (event .value , listener_wrapper )
@@ -109,31 +129,15 @@ def emit(self, *, event: Event, event_data: EventData) -> None:
109
129
event: The event which will be emitted.
110
130
event_data: The data which will be passed to the event listeners.
111
131
"""
112
- logger .debug ('Calling EventManager.emit()...' )
113
132
self ._event_emitter .emit (event .value , event_data )
114
133
115
- async def close (self , * , timeout : timedelta | None = None ) -> None :
116
- """Close the event manager.
117
-
118
- This will stop listening for the events, and it will wait for all the event listeners to finish.
119
-
120
- Args:
121
- timeout: Optional timeout after which the pending event listeners are canceled.
122
- """
123
- logger .debug ('Calling EventManager.close()...' )
124
- await self ._wait_for_all_listeners_to_complete (timeout = timeout )
125
- self ._event_emitter .remove_all_listeners ()
126
- self ._listener_tasks .clear ()
127
- self ._listeners_to_wrappers .clear ()
128
-
129
- async def _wait_for_all_listeners_to_complete (self , * , timeout : timedelta | None = None ) -> None :
134
+ async def wait_for_all_listeners_to_complete (self , * , timeout : timedelta | None = None ) -> None :
130
135
"""Wait for all currently executing event listeners to complete.
131
136
132
137
Args:
133
138
timeout: The maximum time to wait for the event listeners to finish. If they do not complete within
134
139
the specified timeout, they will be canceled.
135
140
"""
136
- logger .debug ('Calling EventManager.wait_for_all_listeners_to_complete()...' )
137
141
138
142
async def wait_for_listeners () -> None :
139
143
"""Gathers all listener tasks and awaits their completion, logging any exceptions encountered."""
0 commit comments