
Fix storages metadata & caching #106


Merged · 5 commits · Apr 12, 2024

2 changes: 1 addition & 1 deletion pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api"

[tool.poetry]
name = "crawlee"
version = "0.0.2"
version = "0.0.3"
description = "Crawlee for Python"
authors = ["Apify Technologies s.r.o. <[email protected]>"]
license = "Apache-2.0"
4 changes: 0 additions & 4 deletions src/crawlee/_utils/recurring_task.py
@@ -27,21 +27,17 @@ def __init__(self, func: Callable, delay: timedelta) -> None:

async def _wrapper(self) -> None:
"""Internal method that repeatedly executes the provided function with the specified delay."""
logger.debug('Calling RecurringTask._wrapper()...')
sleep_time_secs = self.delay.total_seconds()
while True:
logger.debug('RecurringTask._wrapper(): calling self.func()...')
await self.func() if asyncio.iscoroutinefunction(self.func) else self.func()
await asyncio.sleep(sleep_time_secs)

def start(self) -> None:
"""Start the recurring task execution."""
logger.debug('Calling RecurringTask.start()...')
self.task = asyncio.create_task(self._wrapper(), name=f'Task-recurring-{self.func.__name__}')

async def stop(self) -> None:
"""Stop the recurring task execution."""
logger.debug('Calling RecurringTask.stop()...')
if self.task:
self.task.cancel()
# Ensure the task has a chance to properly handle the cancellation and any potential exceptions.
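Note on the helper touched above: only the debug log calls are dropped; the public surface (constructor, `start()`, `stop()`) is unchanged. A minimal usage sketch, assuming crawlee is installed and the module path from the file header is importable — the periodic function below is purely illustrative:

```python
import asyncio
from datetime import timedelta

from crawlee._utils.recurring_task import RecurringTask


async def persist_state() -> None:
    # Illustrative periodic job; stands in for e.g. metadata persistence.
    print('persisting state...')


async def main() -> None:
    task = RecurringTask(persist_state, delay=timedelta(seconds=1))
    task.start()            # schedules the internal _wrapper() loop on the running event loop
    await asyncio.sleep(3)  # let the job fire a few times
    await task.stop()       # cancels the task and waits for the cancellation to settle


asyncio.run(main())
```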
8 changes: 6 additions & 2 deletions src/crawlee/http_crawler/types.py
@@ -1,9 +1,13 @@
from dataclasses import dataclass
from __future__ import annotations

from httpx import Response
from dataclasses import dataclass
from typing import TYPE_CHECKING

from crawlee.basic_crawler.types import BasicCrawlingContext

if TYPE_CHECKING:
from httpx import Response


@dataclass(frozen=True)
class HttpCrawlingContext(BasicCrawlingContext):
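The change above is the standard pattern for keeping `httpx` out of runtime imports: with `from __future__ import annotations`, annotations are no longer evaluated, so types used only in annotations can live behind `TYPE_CHECKING`. A self-contained sketch of the same pattern (the field name is illustrative, not taken from `HttpCrawlingContext`):

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Only imported for the type checker; httpx is never imported at runtime.
    from httpx import Response


@dataclass(frozen=True)
class ExampleHttpContext:
    # With postponed evaluation of annotations, this annotation is stored as a
    # string, so the class can be defined even when httpx is not installed.
    http_response: Response
```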
40 changes: 23 additions & 17 deletions src/crawlee/resource_clients/dataset_client.py
@@ -4,6 +4,7 @@
import json
import os
from datetime import datetime, timezone
from logging import getLogger
from typing import TYPE_CHECKING, Any, AsyncIterator

import aiofiles
@@ -21,6 +22,8 @@
from crawlee.storage_clients import MemoryStorageClient
from crawlee.storages.types import JSONSerializable

logger = getLogger(__name__)


class DatasetClient(BaseResourceClient):
"""Sub-client for manipulating a single dataset."""
@@ -95,37 +98,42 @@ def _create_from_directory(
created_at = datetime.now(timezone.utc)
accessed_at = datetime.now(timezone.utc)
modified_at = datetime.now(timezone.utc)
entries: dict[str, dict] = {}

# Load metadata if it exists
metadata_filepath = os.path.join(storage_directory, '__metadata__.json')

if os.path.exists(metadata_filepath):
with open(metadata_filepath, encoding='utf-8') as f:
json_content = json.load(f)
resource_info = DatasetResourceInfo(**json_content)

id_ = resource_info.id
name = resource_info.name
item_count = resource_info.item_count
created_at = resource_info.created_at
accessed_at = resource_info.accessed_at
modified_at = resource_info.modified_at

# Load dataset entries
entries: dict[str, dict] = {}
has_seen_metadata_file = False

# Access the dataset folder
for entry in os.scandir(storage_directory):
if entry.is_file():
if entry.name == '__metadata__.json':
has_seen_metadata_file = True

# We have found the dataset's metadata file, build out information based on it
with open(os.path.join(storage_directory, entry.name), encoding='utf-8') as f:
metadata = json.load(f)
id_ = metadata['id']
name = metadata['name']
item_count = metadata['itemCount']
created_at = datetime.fromisoformat(metadata['createdAt'])
accessed_at = datetime.fromisoformat(metadata['accessedAt'])
modified_at = datetime.fromisoformat(metadata['modifiedAt'])

continue

with open(os.path.join(storage_directory, entry.name), encoding='utf-8') as f:
entry_content = json.load(f)
entry_name = entry.name.split('.')[0]

entry_name = entry.name.split('.')[0]
entries[entry_name] = entry_content

if not has_seen_metadata_file:
item_count += 1

# Create new dataset client
new_client = DatasetClient(
base_storage_directory=memory_storage_client.datasets_directory,
memory_storage_client=memory_storage_client,
@@ -137,9 +145,7 @@ def _create_from_directory(
item_count=item_count,
)

for entry_id, content in entries.items():
new_client.dataset_entries[entry_id] = content

new_client.dataset_entries.update(entries)
return new_client

@override
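For context, the refactored loader splits into two steps: read `__metadata__.json` into a `DatasetResourceInfo` (whose `__post_init__` parses the timestamps, see `storages/types.py` below), then scan the directory for item files. A standalone sketch of the scan step, assuming each item lives in a `<key>.json` file as the tests suggest (e.g. `000000001.json`):

```python
import json
import os


def load_dataset_entries(storage_directory: str) -> dict[str, dict]:
    """Collect every *.json item file except __metadata__.json, keyed by file stem."""
    entries: dict[str, dict] = {}
    for entry in os.scandir(storage_directory):
        if not entry.is_file() or entry.name == '__metadata__.json':
            continue
        with open(os.path.join(storage_directory, entry.name), encoding='utf-8') as f:
            entry_content = json.load(f)
        entry_name = entry.name.split('.')[0]  # '000000001.json' -> '000000001'
        entries[entry_name] = entry_content
    return entries
```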
23 changes: 14 additions & 9 deletions src/crawlee/resource_clients/key_value_store_client.py
@@ -105,16 +105,21 @@ def _create_from_directory(
accessed_at = datetime.now(timezone.utc)
modified_at = datetime.now(timezone.utc)

store_metadata_path = os.path.join(storage_directory, '__metadata__.json')
if os.path.exists(store_metadata_path):
with open(store_metadata_path, encoding='utf-8') as f:
metadata = json.load(f)
id_ = metadata['id']
name = metadata['name']
created_at = datetime.fromisoformat(metadata['createdAt'])
accessed_at = datetime.fromisoformat(metadata['accessedAt'])
modified_at = datetime.fromisoformat(metadata['modifiedAt'])
# Load metadata if it exists
metadata_filepath = os.path.join(storage_directory, '__metadata__.json')

if os.path.exists(metadata_filepath):
with open(metadata_filepath, encoding='utf-8') as f:
json_content = json.load(f)
resource_info = KeyValueStoreResourceInfo(**json_content)

id_ = resource_info.id
name = resource_info.name
created_at = resource_info.created_at
accessed_at = resource_info.accessed_at
modified_at = resource_info.modified_at

# Create new KVS client
new_client = KeyValueStoreClient(
base_storage_directory=memory_storage_client.key_value_stores_directory,
memory_storage_client=memory_storage_client,
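The key-value store loader now goes through `KeyValueStoreResourceInfo(**json_content)` as well, which implies the on-disk metadata uses snake_case keys with ISO-8601 timestamps (the adjusted test further down writes that shape). A hypothetical example of writing such a file; all values here are illustrative:

```python
import json
import tempfile
from datetime import datetime, timezone
from pathlib import Path

now = datetime.now(timezone.utc).isoformat()
store_metadata = {
    'id': 'illustrative-id',  # a crypto_random_object_id() in the real tests
    'name': None,
    'created_at': now,
    'accessed_at': now,
    'modified_at': now,
}

# Write the metadata file into a throwaway store directory.
storage_directory = Path(tempfile.mkdtemp())
(storage_directory / '__metadata__.json').write_text(json.dumps(store_metadata), encoding='utf-8')
```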
53 changes: 31 additions & 22 deletions src/crawlee/resource_clients/request_queue_client.py
@@ -113,32 +113,43 @@ def _create_from_directory(
modified_at = datetime.now(timezone.utc)
handled_request_count = 0
pending_request_count = 0
entries: list[dict] = []

# Access the request queue folder
# Load metadata if it exists
metadata_filepath = os.path.join(storage_directory, '__metadata__.json')

if os.path.exists(metadata_filepath):
with open(metadata_filepath, encoding='utf-8') as f:
json_content = json.load(f)
resource_info = RequestQueueResourceInfo(**json_content)

id_ = resource_info.id
name = resource_info.name
created_at = resource_info.created_at
accessed_at = resource_info.accessed_at
modified_at = resource_info.modified_at
handled_request_count = resource_info.handled_request_count
pending_request_count = resource_info.pending_request_count

# Load request entries
entries: dict[str, Request] = {}

for entry in os.scandir(storage_directory):
if entry.is_file():
if entry.name == '__metadata__.json':
# We have found the queue's metadata file, build out information based on it
with open(os.path.join(storage_directory, entry.name), encoding='utf-8') as f:
metadata = json.load(f)

id_ = metadata['id']
name = metadata['name']
created_at = datetime.fromisoformat(metadata['createdAt'])
accessed_at = datetime.fromisoformat(metadata['accessedAt'])
modified_at = datetime.fromisoformat(metadata['modifiedAt'])
handled_request_count = metadata['handledRequestCount']
pending_request_count = metadata['pendingRequestCount']
continue

with open(os.path.join(storage_directory, entry.name), encoding='utf-8') as f:
request = json.load(f)
if request.order_no:
request.order_no = Decimal(request.order_no)
entries.append(request)
content = json.load(f)

new_client = cls(
request = Request(**content)
order_no = request.order_no
if order_no:
request.order_no = Decimal(order_no)

entries[request.id] = request

# Create new RQ client
new_client = RequestQueueClient(
base_storage_directory=memory_storage_client.request_queues_directory,
memory_storage_client=memory_storage_client,
id_=id_,
@@ -150,9 +161,7 @@
pending_request_count=pending_request_count,
)

for request in entries:
new_client.requests[request['id']] = request

new_client.requests.update(entries)
return new_client

@override
@@ -492,7 +501,7 @@ async def _persist_single_request_to_storage(
# Write the request to the file
file_path = os.path.join(entity_directory, f'{request.id}.json')
async with aiofiles.open(file_path, mode='wb') as f:
s = await json_dumps(request)
s = await json_dumps(request.model_dump())
await f.write(s.encode('utf-8'))

async def _delete_request_file_from_storage(self, *, request_id: str, entity_directory: str) -> None:
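Two related details in this file: the loader now rebuilds `Request(**content)` and restores `order_no` to a `Decimal`, and the persister serialises via `request.model_dump()`. Both suggest `Request` is a Pydantic model, which cannot be handed to a JSON dumper directly. A minimal sketch of that round trip with a hypothetical stand-in model (fields beyond `id`/`url`/`order_no` omitted):

```python
from typing import Optional

from pydantic import BaseModel


class RequestSketch(BaseModel):
    """Hypothetical, trimmed stand-in for crawlee.request.Request."""

    id: str
    url: str
    order_no: Optional[str] = None


# Persisting: dump the model to a plain dict first; the real code then feeds
# that dict to crawlee's async json_dumps helper before writing it to disk.
request = RequestSketch(id='fvwscO2UJLdr10B', url='https://example.com')
as_dict = request.model_dump()

# Loading: the stored dict validates straight back into a model instance.
restored = RequestSketch(**as_dict)
assert restored == request
```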
13 changes: 11 additions & 2 deletions src/crawlee/storages/base_storage.py
@@ -2,6 +2,7 @@

import asyncio
from abc import ABC, abstractmethod
from logging import getLogger
from typing import TYPE_CHECKING, Generic, TypeVar, cast

from typing_extensions import Self
@@ -19,6 +20,8 @@
BaseResourceClientType = TypeVar('BaseResourceClientType')
BaseResourceCollectionClientType = TypeVar('BaseResourceCollectionClientType')

logger = getLogger(__name__)


class BaseStorage(ABC, Generic[BaseResourceClientType, BaseResourceCollectionClientType]):
"""A class for managing storages."""
@@ -178,7 +181,13 @@ def _ensure_class_initialized(cls) -> None:

def _remove_from_cache(self) -> None:
if self.__class__.cache_by_id is not None:
del self.__class__.cache_by_id[self.id]
try:
del self.__class__.cache_by_id[self.id]
except KeyError as exc:
raise RuntimeError(f'Storage with provided ID was not found ({self.id}).') from exc

if self.name and self.__class__.cache_by_name is not None:
del self.__class__.cache_by_name[self.name]
try:
del self.__class__.cache_by_name[self.name]
except KeyError as exc:
raise RuntimeError(f'Storage with provided name was not found ({self.name}).') from exc
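
The change above swaps a bare `del` (which would surface as an unexplained `KeyError`) for an explicit, chained `RuntimeError`. A toy illustration of the same pattern; the class and names here are illustrative, not the real `BaseStorage`:

```python
class StorageCacheSketch:
    """Toy cache showing the KeyError -> RuntimeError translation used above."""

    def __init__(self) -> None:
        self._cache_by_id: dict[str, object] = {}

    def add(self, id_: str, storage: object) -> None:
        self._cache_by_id[id_] = storage

    def remove(self, id_: str) -> None:
        try:
            del self._cache_by_id[id_]
        except KeyError as exc:
            # 'from exc' keeps the original KeyError in the traceback chain.
            raise RuntimeError(f'Storage with provided ID was not found ({id_}).') from exc


cache = StorageCacheSketch()
cache.add('abc', object())
cache.remove('abc')        # succeeds
try:
    cache.remove('abc')    # removing twice now fails with a descriptive message
except RuntimeError as err:
    print(err)
```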
12 changes: 10 additions & 2 deletions src/crawlee/storages/types.py
@@ -1,12 +1,11 @@
from __future__ import annotations

from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import TYPE_CHECKING, Any, Generic, TypeVar, Union

if TYPE_CHECKING:
from datetime import datetime

from crawlee.request import Request

T = TypeVar('T')
@@ -97,6 +96,15 @@ class BaseResourceInfo:
created_at: datetime
modified_at: datetime

def __post_init__(self) -> None:
"""Convert string dates to datetime objects."""
if isinstance(self.accessed_at, str): # type: ignore
self.accessed_at = datetime.fromisoformat(self.accessed_at) # type: ignore
if isinstance(self.created_at, str): # type: ignore
self.created_at = datetime.fromisoformat(self.created_at) # type: ignore
if isinstance(self.modified_at, str): # type: ignore
self.modified_at = datetime.fromisoformat(self.modified_at) # type: ignore


@dataclass
class DatasetResourceInfo(BaseResourceInfo):
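The `__post_init__` hook above is what lets `DatasetResourceInfo(**json.load(f))` and friends accept either ISO-8601 strings (as stored in `__metadata__.json`) or real `datetime` objects. A trimmed, self-contained sketch of the same idea; the real `BaseResourceInfo` carries additional fields:

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass
class ResourceInfoSketch:
    created_at: datetime
    accessed_at: datetime
    modified_at: datetime

    def __post_init__(self) -> None:
        # Metadata files store ISO-8601 strings; coerce them so callers always
        # see datetime objects regardless of where the values came from.
        for attr in ('created_at', 'accessed_at', 'modified_at'):
            value = getattr(self, attr)
            if isinstance(value, str):
                setattr(self, attr, datetime.fromisoformat(value))


info = ResourceInfoSketch(
    created_at='2024-04-12T00:00:00+00:00',
    accessed_at=datetime.now(timezone.utc),
    modified_at='2024-04-12T00:00:00+00:00',
)
assert isinstance(info.created_at, datetime)
```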
2 changes: 2 additions & 0 deletions tests/unit/_utils/test_byte_size.py
@@ -1,3 +1,5 @@
from __future__ import annotations

import pytest

from crawlee._utils.byte_size import ByteSize
2 changes: 2 additions & 0 deletions tests/unit/_utils/test_crypto.py
@@ -1,3 +1,5 @@
from __future__ import annotations

from crawlee._utils.crypto import compute_short_hash, crypto_random_object_id


2 changes: 2 additions & 0 deletions tests/unit/_utils/test_data_processing.py
@@ -1,3 +1,5 @@
from __future__ import annotations

from datetime import datetime, timezone
from enum import Enum

2 changes: 2 additions & 0 deletions tests/unit/_utils/test_measure_time.py
@@ -1,3 +1,5 @@
from __future__ import annotations

import asyncio
import time

3 changes: 3 additions & 0 deletions tests/unit/basic_crawler/test_basic_crawler.py
@@ -1,4 +1,7 @@
# ruff: noqa: ARG001

from __future__ import annotations

import pytest

from crawlee.basic_crawler.basic_crawler import BasicCrawler, UserDefinedErrorHandlerError
2 changes: 2 additions & 0 deletions tests/unit/basic_crawler/test_context_pipeline.py
@@ -1,3 +1,5 @@
from __future__ import annotations

from dataclasses import dataclass
from typing import AsyncGenerator
from unittest.mock import AsyncMock
2 changes: 1 addition & 1 deletion tests/unit/resource_clients/test_dataset_client.py
@@ -64,7 +64,7 @@ async def test_update(dataset_client: DatasetClient) -> None:
updated_dataset_info = await dataset_client.update(name=new_dataset_name)
assert os.path.exists(os.path.join(old_dataset_directory, '000000001.json')) is False
assert os.path.exists(os.path.join(new_dataset_directory, '000000001.json')) is True
# Only modifiedAt and accessedAt should be different
# Only modified_at and accessed_at should be different
assert old_dataset_info.created_at == updated_dataset_info.created_at
assert old_dataset_info.modified_at != updated_dataset_info.modified_at
assert old_dataset_info.accessed_at != updated_dataset_info.accessed_at
10 changes: 5 additions & 5 deletions tests/unit/resource_clients/test_key_value_store_client.py
@@ -90,7 +90,7 @@ async def test_update(key_value_store_client: KeyValueStoreClient) -> None:
updated_kvs_info = await key_value_store_client.update(name=new_kvs_name)
assert os.path.exists(os.path.join(old_kvs_directory, 'test.json')) is False
assert os.path.exists(os.path.join(new_kvs_directory, 'test.json')) is True
# Only modifiedAt and accessedAt should be different
# Only modified_at and accessed_at should be different
assert old_kvs_info.created_at == updated_kvs_info.created_at
assert old_kvs_info.modified_at != updated_kvs_info.modified_at
assert old_kvs_info.accessed_at != updated_kvs_info.accessed_at
@@ -401,10 +401,10 @@ async def test_reads_correct_metadata(memory_storage_client: MemoryStorageClient
store_metadata = {
'id': crypto_random_object_id(),
'name': None,
'accessedAt': datetime.now(timezone.utc),
'createdAt': datetime.now(timezone.utc),
'modifiedAt': datetime.now(timezone.utc),
'userId': '1',
'accessed_at': datetime.now(timezone.utc),
'created_at': datetime.now(timezone.utc),
'modified_at': datetime.now(timezone.utc),
'user_id': '1',
}

# Write the store metadata to disk
2 changes: 1 addition & 1 deletion tests/unit/resource_clients/test_request_queue_client.py
@@ -56,7 +56,7 @@ async def test_update(request_queue_client: RequestQueueClient) -> None:
updated_rq_info = await request_queue_client.update(name=new_rq_name)
assert os.path.exists(os.path.join(old_rq_directory, 'fvwscO2UJLdr10B.json')) is False
assert os.path.exists(os.path.join(new_rq_directory, 'fvwscO2UJLdr10B.json')) is True
# Only modifiedAt and accessedAt should be different
# Only modified_at and accessed_at should be different
assert old_rq_info.created_at == updated_rq_info.created_at
assert old_rq_info.modified_at != updated_rq_info.modified_at
assert old_rq_info.accessed_at != updated_rq_info.accessed_at