diff --git a/src/apify_client/_types.py b/src/apify_client/_types.py index af1c58f5..e13443af 100644 --- a/src/apify_client/_types.py +++ b/src/apify_client/_types.py @@ -31,7 +31,7 @@ class ListPage(Generic[T]): desc: bool """Whether the listing is descending or not.""" - def __init__(self: ListPage, data: dict) -> None: + def __init__(self, data: dict) -> None: """Initialize a ListPage instance from the API response data.""" self.items = data.get('items', []) self.offset = data.get('offset', 0) diff --git a/src/apify_client/clients/base/base_client.py b/src/apify_client/clients/base/base_client.py index c5aa744c..c3d99c04 100644 --- a/src/apify_client/clients/base/base_client.py +++ b/src/apify_client/clients/base/base_client.py @@ -1,14 +1,23 @@ from __future__ import annotations -from typing import TYPE_CHECKING, Any +from collections.abc import AsyncIterable, AsyncIterator, Awaitable, Callable, Generator, Iterable, Iterator +from typing import ( + TYPE_CHECKING, + Any, + Generic, + Protocol, + TypeVar, +) from apify_client._logging import WithLogDetailsClient +from apify_client._types import ListPage from apify_client._utils import to_safe_id # Conditional import only executed when type checking, otherwise we'd get circular dependency issues if TYPE_CHECKING: from apify_client import ApifyClient, ApifyClientAsync from apify_client._http_client import HTTPClient, HTTPClientAsync +T = TypeVar('T') class _BaseBaseClient(metaclass=WithLogDetailsClient): @@ -87,6 +96,40 @@ def __init__( self.safe_id = to_safe_id(self.resource_id) self.url = f'{self.url}/{self.safe_id}' + @staticmethod + def _list_iterable_from_callback(callback: Callable[..., ListPage[T]], **kwargs: Any) -> ListPageProtocol[T]: + """Return object can be awaited or iterated over. + + Not using total from the API response as it can change during iteration. + """ + chunk_size = kwargs.pop('chunk_size', 0) or 0 + offset = kwargs.get('offset') or 0 + limit = kwargs.get('limit') or 0 + + list_page = callback(**{**kwargs, 'limit': _min_for_limit_param(kwargs.get('limit'), chunk_size)}) + + def iterator() -> Iterator[T]: + current_page = list_page + for item in current_page.items: + yield item + + fetched_items = len(current_page.items) + while ( + current_page.items # If there are any items left to fetch + and (not limit or (limit > fetched_items)) # If there is limit to fetch, and it was not reached it yet. + ): + new_kwargs = { + **kwargs, + 'offset': offset + fetched_items, + 'limit': chunk_size if not limit else _min_for_limit_param(limit - fetched_items, chunk_size), + } + current_page = callback(**new_kwargs) + for item in current_page.items: + yield item + fetched_items += len(current_page.items) + + return IterableListPage[T](list_page, iterator()) + class BaseClientAsync(_BaseBaseClient): """Base class for async sub-clients.""" @@ -127,3 +170,112 @@ def __init__( if self.resource_id is not None: self.safe_id = to_safe_id(self.resource_id) self.url = f'{self.url}/{self.safe_id}' + + @staticmethod + def _list_iterable_from_callback( + callback: Callable[..., Awaitable[ListPage[T]]], **kwargs: Any + ) -> ListPageProtocolAsync[T]: + """Return object can be awaited or iterated over. + + Not using total from the API response as it can change during iteration. + """ + chunk_size = kwargs.pop('chunk_size', 0) or 0 + offset = kwargs.get('offset') or 0 + limit = kwargs.get('limit') or 0 + + list_page_awaitable = callback(**{**kwargs, 'limit': _min_for_limit_param(kwargs.get('limit'), chunk_size)}) + + async def async_iterator() -> AsyncIterator[T]: + current_page = await list_page_awaitable + for item in current_page.items: + yield item + + fetched_items = len(current_page.items) + while ( + current_page.items # If there are any items left to fetch + and (not limit or (limit > fetched_items)) # If there is limit to fetch, and it was not reached it yet. + ): + new_kwargs = { + **kwargs, + 'offset': offset + fetched_items, + 'limit': chunk_size if not limit else _min_for_limit_param(limit - fetched_items, chunk_size), + } + current_page = await callback(**new_kwargs) + for item in current_page.items: + yield item + fetched_items += len(current_page.items) + + return IterableListPageAsync[T](list_page_awaitable, async_iterator()) + + +def _min_for_limit_param(a: int | None, b: int | None) -> int | None: + """Return minimum of two limit parameters, treating None or 0 as infinity. Return None for infinity.""" + # API treats 0 as None for limit parameter, in this context API understands 0 as infinity. + if a == 0: + a = None + if b == 0: + b = None + if a is None: + return b + if b is None: + return a + return min(a, b) + + +class ListPageProtocol(Iterable[T], Protocol[T]): + """Protocol for an object that can be both awaited and asynchronously iterated over.""" + + items: list[T] + """List of returned objects on this page.""" + + count: int + """Count of the returned objects on this page.""" + + offset: int + """The limit on the number of returned objects offset specified in the API call.""" + + limit: int + """The offset of the first object specified in the API call""" + + total: int + """Total number of objects matching the API call criteria.""" + + desc: bool + """Whether the listing is descending or not.""" + + +class ListPageProtocolAsync(AsyncIterable[T], Awaitable[ListPage[T]], Protocol[T]): + """Protocol for an object that can be both awaited and asynchronously iterated over.""" + + +class IterableListPage(ListPage[T], Generic[T]): + """Can be called to get ListPage with items or iterated over to get individual items.""" + + def __init__(self, list_page: ListPage[T], iterator: Iterator[T]) -> None: + self.items = list_page.items + self.offset = list_page.offset + self.limit = list_page.limit + self.count = list_page.count + self.total = list_page.total + self.desc = list_page.desc + self._iterator = iterator + + def __iter__(self) -> Iterator[T]: + """Return an iterator over the items from API, possibly doing multiple API calls.""" + return self._iterator + + +class IterableListPageAsync(Generic[T]): + """Can be awaited to get ListPage with items or asynchronously iterated over to get individual items.""" + + def __init__(self, awaitable: Awaitable[ListPage[T]], async_iterator: AsyncIterator[T]) -> None: + self._awaitable = awaitable + self._async_iterator = async_iterator + + def __aiter__(self) -> AsyncIterator[T]: + """Return an asynchronous iterator over the items from API, possibly doing multiple API calls.""" + return self._async_iterator + + def __await__(self) -> Generator[Any, Any, ListPage[T]]: + """Return an awaitable that resolves to the ListPage doing exactly one API call.""" + return self._awaitable.__await__() diff --git a/src/apify_client/clients/base/resource_collection_client.py b/src/apify_client/clients/base/resource_collection_client.py index 2e9c6063..f312f8f4 100644 --- a/src/apify_client/clients/base/resource_collection_client.py +++ b/src/apify_client/clients/base/resource_collection_client.py @@ -1,44 +1,17 @@ from __future__ import annotations -from typing import Any, Generic, TypeVar +from typing import Any, TypeVar from apify_client._utils import parse_date_fields, pluck_data -from apify_client.clients.base.base_client import BaseClient, BaseClientAsync +from apify_client.clients.base.base_client import ( + BaseClient, + BaseClientAsync, + ListPage, +) T = TypeVar('T') -class ListPage(Generic[T]): - """A single page of items returned from a list() method.""" - - items: list[T] - """List of returned objects on this page""" - - count: int - """Count of the returned objects on this page""" - - offset: int - """The limit on the number of returned objects offset specified in the API call""" - - limit: int - """The offset of the first object specified in the API call""" - - total: int - """Total number of objects matching the API call criteria""" - - desc: bool - """Whether the listing is descending or not""" - - def __init__(self, data: dict) -> None: - """Initialize a ListPage instance from the API response data.""" - self.items = data.get('items', []) - self.offset = data.get('offset', 0) - self.limit = data.get('limit', 0) - self.count = data['count'] if 'count' in data else len(self.items) - self.total = data['total'] if 'total' in data else self.offset + self.count - self.desc = data.get('desc', False) - - class ResourceCollectionClient(BaseClient): """Base class for sub-clients manipulating a resource collection.""" diff --git a/src/apify_client/clients/resource_clients/actor_collection.py b/src/apify_client/clients/resource_clients/actor_collection.py index 0786b650..a5c7876a 100644 --- a/src/apify_client/clients/resource_clients/actor_collection.py +++ b/src/apify_client/clients/resource_clients/actor_collection.py @@ -7,7 +7,7 @@ from apify_client.clients.resource_clients.actor import get_actor_representation if TYPE_CHECKING: - from apify_client.clients.base.resource_collection_client import ListPage + from apify_client.clients.base.base_client import ListPageProtocol, ListPageProtocolAsync class ActorCollectionClient(ResourceCollectionClient): @@ -25,7 +25,7 @@ def list( offset: int | None = None, desc: bool | None = None, sort_by: Literal['createdAt', 'stats.lastRunStartedAt'] | None = 'createdAt', - ) -> ListPage[dict]: + ) -> ListPageProtocol[dict]: """List the Actors the user has created or used. https://docs.apify.com/api/v2#/reference/actors/actor-collection/get-list-of-actors @@ -40,7 +40,9 @@ def list( Returns: The list of available Actors matching the specified filters. """ - return self._list(my=my, limit=limit, offset=offset, desc=desc, sortBy=sort_by) + return self._list_iterable_from_callback( + self._list, my=my, limit=limit, offset=offset, desc=desc, sortBy=sort_by + ) def create( self, @@ -142,7 +144,7 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: resource_path = kwargs.pop('resource_path', 'acts') super().__init__(*args, resource_path=resource_path, **kwargs) - async def list( + def list( self, *, my: bool | None = None, @@ -150,7 +152,7 @@ async def list( offset: int | None = None, desc: bool | None = None, sort_by: Literal['createdAt', 'stats.lastRunStartedAt'] | None = 'createdAt', - ) -> ListPage[dict]: + ) -> ListPageProtocolAsync[dict]: """List the Actors the user has created or used. https://docs.apify.com/api/v2#/reference/actors/actor-collection/get-list-of-actors @@ -165,7 +167,9 @@ async def list( Returns: The list of available Actors matching the specified filters. """ - return await self._list(my=my, limit=limit, offset=offset, desc=desc, sortBy=sort_by) + return self._list_iterable_from_callback( + callback=self._list, my=my, limit=limit, offset=offset, desc=desc, sortBy=sort_by + ) async def create( self, diff --git a/src/apify_client/clients/resource_clients/actor_env_var_collection.py b/src/apify_client/clients/resource_clients/actor_env_var_collection.py index 217bdd22..4bd825f5 100644 --- a/src/apify_client/clients/resource_clients/actor_env_var_collection.py +++ b/src/apify_client/clients/resource_clients/actor_env_var_collection.py @@ -7,7 +7,7 @@ from apify_client.clients.resource_clients.actor_env_var import get_actor_env_var_representation if TYPE_CHECKING: - from apify_client.clients.base.resource_collection_client import ListPage + from apify_client.clients.base.base_client import ListPageProtocol, ListPageProtocolAsync class ActorEnvVarCollectionClient(ResourceCollectionClient): @@ -17,7 +17,7 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: resource_path = kwargs.pop('resource_path', 'env-vars') super().__init__(*args, resource_path=resource_path, **kwargs) - def list(self) -> ListPage[dict]: + def list(self) -> ListPageProtocol[dict]: """List the available actor environment variables. https://docs.apify.com/api/v2#/reference/actors/environment-variable-collection/get-list-of-environment-variables @@ -25,7 +25,7 @@ def list(self) -> ListPage[dict]: Returns: The list of available actor environment variables. """ - return self._list() + return self._list_iterable_from_callback(self._list) def create( self, @@ -62,7 +62,7 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: resource_path = kwargs.pop('resource_path', 'env-vars') super().__init__(*args, resource_path=resource_path, **kwargs) - async def list(self) -> ListPage[dict]: + def list(self) -> ListPageProtocolAsync[dict]: """List the available actor environment variables. https://docs.apify.com/api/v2#/reference/actors/environment-variable-collection/get-list-of-environment-variables @@ -70,7 +70,7 @@ async def list(self) -> ListPage[dict]: Returns: The list of available actor environment variables. """ - return await self._list() + return self._list_iterable_from_callback(callback=self._list) async def create( self, diff --git a/src/apify_client/clients/resource_clients/actor_version_collection.py b/src/apify_client/clients/resource_clients/actor_version_collection.py index 91e1e333..394d178c 100644 --- a/src/apify_client/clients/resource_clients/actor_version_collection.py +++ b/src/apify_client/clients/resource_clients/actor_version_collection.py @@ -9,7 +9,7 @@ if TYPE_CHECKING: from apify_shared.consts import ActorSourceType - from apify_client.clients.base.resource_collection_client import ListPage + from apify_client.clients.base.base_client import ListPageProtocol, ListPageProtocolAsync class ActorVersionCollectionClient(ResourceCollectionClient): @@ -19,7 +19,7 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: resource_path = kwargs.pop('resource_path', 'versions') super().__init__(*args, resource_path=resource_path, **kwargs) - def list(self) -> ListPage[dict]: + def list(self) -> ListPageProtocol[dict]: """List the available Actor versions. https://docs.apify.com/api/v2#/reference/actors/version-collection/get-list-of-versions @@ -27,7 +27,7 @@ def list(self) -> ListPage[dict]: Returns: The list of available Actor versions. """ - return self._list() + return self._list_iterable_from_callback(self._list) def create( self, @@ -88,7 +88,7 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: resource_path = kwargs.pop('resource_path', 'versions') super().__init__(*args, resource_path=resource_path, **kwargs) - async def list(self) -> ListPage[dict]: + def list(self) -> ListPageProtocolAsync[dict]: """List the available Actor versions. https://docs.apify.com/api/v2#/reference/actors/version-collection/get-list-of-versions @@ -96,7 +96,7 @@ async def list(self) -> ListPage[dict]: Returns: The list of available Actor versions. """ - return await self._list() + return self._list_iterable_from_callback(callback=self._list) async def create( self, diff --git a/src/apify_client/clients/resource_clients/build_collection.py b/src/apify_client/clients/resource_clients/build_collection.py index 4eada958..4350a078 100644 --- a/src/apify_client/clients/resource_clients/build_collection.py +++ b/src/apify_client/clients/resource_clients/build_collection.py @@ -5,7 +5,7 @@ from apify_client.clients.base import ResourceCollectionClient, ResourceCollectionClientAsync if TYPE_CHECKING: - from apify_client.clients.base.resource_collection_client import ListPage + from apify_client.clients.base.base_client import ListPageProtocol, ListPageProtocolAsync class BuildCollectionClient(ResourceCollectionClient): @@ -21,7 +21,7 @@ def list( limit: int | None = None, offset: int | None = None, desc: bool | None = None, - ) -> ListPage[dict]: + ) -> ListPageProtocol[dict]: """List all Actor builds. List all Actor builds, either of a single Actor, or all user's Actors, depending on where this client @@ -38,7 +38,7 @@ def list( Returns: The retrieved Actor builds. """ - return self._list(limit=limit, offset=offset, desc=desc) + return self._list_iterable_from_callback(self._list, limit=limit, offset=offset, desc=desc) class BuildCollectionClientAsync(ResourceCollectionClientAsync): @@ -48,13 +48,13 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: resource_path = kwargs.pop('resource_path', 'actor-builds') super().__init__(*args, resource_path=resource_path, **kwargs) - async def list( + def list( self, *, limit: int | None = None, offset: int | None = None, desc: bool | None = None, - ) -> ListPage[dict]: + ) -> ListPageProtocolAsync[dict]: """List all Actor builds. List all Actor builds, either of a single Actor, or all user's Actors, depending on where this client @@ -71,4 +71,4 @@ async def list( Returns: The retrieved Actor builds. """ - return await self._list(limit=limit, offset=offset, desc=desc) + return self._list_iterable_from_callback(callback=self._list, limit=limit, offset=offset, desc=desc) diff --git a/src/apify_client/clients/resource_clients/dataset.py b/src/apify_client/clients/resource_clients/dataset.py index 87d6aab5..28afb17d 100644 --- a/src/apify_client/clients/resource_clients/dataset.py +++ b/src/apify_client/clients/resource_clients/dataset.py @@ -2,7 +2,7 @@ import warnings from contextlib import asynccontextmanager, contextmanager -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING, Any, TypeVar from urllib.parse import urlencode, urlparse, urlunparse from apify_shared.utils import create_storage_content_signature @@ -23,9 +23,12 @@ from apify_shared.consts import StorageGeneralAccess from apify_client._types import JSONSerializable + from apify_client.clients.base.base_client import ListPageProtocol, ListPageProtocolAsync + _SMALL_TIMEOUT = 5 # For fast and common actions. Suitable for idempotent actions. _MEDIUM_TIMEOUT = 30 # For actions that may take longer. +T = TypeVar('T') class DatasetClient(ResourceClient): @@ -86,7 +89,8 @@ def list_items( flatten: list[str] | None = None, view: str | None = None, signature: str | None = None, - ) -> ListPage: + chunk_size: int | None = None, + ) -> ListPageProtocol[dict]: """List the items of the dataset. https://docs.apify.com/api/v2#/reference/datasets/item-collection/get-items @@ -118,11 +122,66 @@ def list_items( flatten: A list of fields that should be flattened. view: Name of the dataset view to be used. signature: Signature used to access the items. + chunk_size: Maximum number of items returned in one API response. Relevant when used as async iterator. Returns: A page of the list of dataset items according to the specified filters. """ - request_params = self._params( + + def _list( + *, + offset: int | None = None, + limit: int | None = None, + clean: bool | None = None, + desc: bool | None = None, + fields: list[str] | None = None, + omit: list[str] | None = None, + unwind: list[str] | None = None, + skip_empty: bool | None = None, + skip_hidden: bool | None = None, + flatten: list[str] | None = None, + view: str | None = None, + signature: str | None = None, + ) -> ListPage: + request_params = self._params( + offset=offset, + limit=limit, + desc=desc, + clean=clean, + fields=fields, + omit=omit, + unwind=unwind, + skipEmpty=skip_empty, + skipHidden=skip_hidden, + flatten=flatten, + view=view, + signature=signature, + ) + + response = self.http_client.call( + url=self._url('items'), + method='GET', + params=request_params, + ) + + data = response.json() + return ListPage( + { + 'items': data, + 'total': int(response.headers['x-apify-pagination-total']), + 'offset': int(response.headers['x-apify-pagination-offset']), + 'count': len( + data + ), # because x-apify-pagination-count returns invalid values when hidden/empty items are skipped + 'limit': int( + response.headers['x-apify-pagination-limit'] + ), # API returns 999999999999 when no limit is used + 'desc': bool(response.headers['x-apify-pagination-desc']), + } + ) + + return self._list_iterable_from_callback( + callback=_list, offset=offset, limit=limit, desc=desc, @@ -130,34 +189,12 @@ def list_items( fields=fields, omit=omit, unwind=unwind, - skipEmpty=skip_empty, - skipHidden=skip_hidden, + skip_empty=skip_empty, + skip_hidden=skip_hidden, flatten=flatten, view=view, signature=signature, - ) - - response = self.http_client.call( - url=self._url('items'), - method='GET', - params=request_params, - ) - - data = response.json() - - return ListPage( - { - 'items': data, - 'total': int(response.headers['x-apify-pagination-total']), - 'offset': int(response.headers['x-apify-pagination-offset']), - 'count': len( - data - ), # because x-apify-pagination-count returns invalid values when hidden/empty items are skipped - 'limit': int( - response.headers['x-apify-pagination-limit'] - ), # API returns 999999999999 when no limit is used - 'desc': bool(response.headers['x-apify-pagination-desc']), - } + chunk_size=chunk_size, ) def iterate_items( @@ -173,8 +210,8 @@ def iterate_items( skip_empty: bool | None = None, skip_hidden: bool | None = None, signature: str | None = None, - ) -> Iterator[dict]: - """Iterate over the items in the dataset. + ) -> ListPageProtocol[dict]: + """Iterate over the items in the dataset. Deprecated, It is possible to use `list_items` even for iteration. https://docs.apify.com/api/v2#/reference/datasets/item-collection/get-items @@ -207,41 +244,24 @@ def iterate_items( Yields: An item from the dataset. """ - cache_size = 1000 - - should_finish = False - read_items = 0 - - # We can't rely on ListPage.total because that is updated with a delay, - # so if you try to read the dataset items right after a run finishes, you could miss some. - # Instead, we just read and read until we reach the limit, or until there are no more items to read. - while not should_finish: - effective_limit = cache_size - if limit is not None: - if read_items == limit: - break - effective_limit = min(cache_size, limit - read_items) - - current_items_page = self.list_items( - offset=offset + read_items, - limit=effective_limit, - clean=clean, - desc=desc, - fields=fields, - omit=omit, - unwind=unwind, - skip_empty=skip_empty, - skip_hidden=skip_hidden, - signature=signature, - ) - - yield from current_items_page.items - - current_page_item_count = len(current_items_page.items) - read_items += current_page_item_count - - if current_page_item_count < cache_size: - should_finish = True + warnings.warn( + '`DatasetClient.iterate_items()` is deprecated, you can use return value of `DatasetClient.list_items()`.', + DeprecationWarning, + stacklevel=2, + ) + return self.list_items( + offset=offset, + limit=limit, + clean=clean, + desc=desc, + fields=fields, + omit=omit, + unwind=unwind, + skip_empty=skip_empty, + skip_hidden=skip_hidden, + signature=signature, + chunk_size=1000, + ) def download_items( self, @@ -684,7 +704,7 @@ async def delete(self) -> None: """ return await self._delete(timeout_secs=_SMALL_TIMEOUT) - async def list_items( + def list_items( self, *, offset: int | None = None, @@ -699,7 +719,8 @@ async def list_items( flatten: list[str] | None = None, view: str | None = None, signature: str | None = None, - ) -> ListPage: + chunk_size: int | None = None, + ) -> ListPageProtocolAsync[dict]: """List the items of the dataset. https://docs.apify.com/api/v2#/reference/datasets/item-collection/get-items @@ -731,11 +752,66 @@ async def list_items( flatten: A list of fields that should be flattened. view: Name of the dataset view to be used. signature: Signature used to access the items. + chunk_size: Maximum number of items returned in one API response. Relevant when used as async iterator. Returns: A page of the list of dataset items according to the specified filters. """ - request_params = self._params( + + async def _list( + *, + offset: int | None = None, + limit: int | None = None, + clean: bool | None = None, + desc: bool | None = None, + fields: list[str] | None = None, + omit: list[str] | None = None, + unwind: list[str] | None = None, + skip_empty: bool | None = None, + skip_hidden: bool | None = None, + flatten: list[str] | None = None, + view: str | None = None, + signature: str | None = None, + ) -> ListPage: + request_params = self._params( + offset=offset, + limit=limit, + desc=desc, + clean=clean, + fields=fields, + omit=omit, + unwind=unwind, + skipEmpty=skip_empty, + skipHidden=skip_hidden, + flatten=flatten, + view=view, + signature=signature, + ) + + response = await self.http_client.call( + url=self._url('items'), + method='GET', + params=request_params, + ) + + data = response.json() + return ListPage( + { + 'items': data, + 'total': int(response.headers['x-apify-pagination-total']), + 'offset': int(response.headers['x-apify-pagination-offset']), + 'count': len( + data + ), # because x-apify-pagination-count returns invalid values when hidden/empty items are skipped + 'limit': int( + response.headers['x-apify-pagination-limit'] + ), # API returns 999999999999 when no limit is used + 'desc': bool(response.headers['x-apify-pagination-desc']), + } + ) + + return self._list_iterable_from_callback( + callback=_list, offset=offset, limit=limit, desc=desc, @@ -743,37 +819,15 @@ async def list_items( fields=fields, omit=omit, unwind=unwind, - skipEmpty=skip_empty, - skipHidden=skip_hidden, + skip_empty=skip_empty, + skip_hidden=skip_hidden, flatten=flatten, view=view, signature=signature, + chunk_size=chunk_size, ) - response = await self.http_client.call( - url=self._url('items'), - method='GET', - params=request_params, - ) - - data = response.json() - - return ListPage( - { - 'items': data, - 'total': int(response.headers['x-apify-pagination-total']), - 'offset': int(response.headers['x-apify-pagination-offset']), - 'count': len( - data - ), # because x-apify-pagination-count returns invalid values when hidden/empty items are skipped - 'limit': int( - response.headers['x-apify-pagination-limit'] - ), # API returns 999999999999 when no limit is used - 'desc': bool(response.headers['x-apify-pagination-desc']), - } - ) - - async def iterate_items( + def iterate_items( self, *, offset: int = 0, @@ -786,8 +840,8 @@ async def iterate_items( skip_empty: bool | None = None, skip_hidden: bool | None = None, signature: str | None = None, - ) -> AsyncIterator[dict]: - """Iterate over the items in the dataset. + ) -> ListPageProtocolAsync[dict]: + """Iterate over the items in the dataset. Deprecated, It is possible to use `list_items` even for iteration. https://docs.apify.com/api/v2#/reference/datasets/item-collection/get-items @@ -820,42 +874,26 @@ async def iterate_items( Yields: An item from the dataset. """ - cache_size = 1000 - - should_finish = False - read_items = 0 - - # We can't rely on ListPage.total because that is updated with a delay, - # so if you try to read the dataset items right after a run finishes, you could miss some. - # Instead, we just read and read until we reach the limit, or until there are no more items to read. - while not should_finish: - effective_limit = cache_size - if limit is not None: - if read_items == limit: - break - effective_limit = min(cache_size, limit - read_items) - - current_items_page = await self.list_items( - offset=offset + read_items, - limit=effective_limit, - clean=clean, - desc=desc, - fields=fields, - omit=omit, - unwind=unwind, - skip_empty=skip_empty, - skip_hidden=skip_hidden, - signature=signature, - ) - - for item in current_items_page.items: - yield item - - current_page_item_count = len(current_items_page.items) - read_items += current_page_item_count + warnings.warn( + '`DatasetClient.iterate_items()` is deprecated,' + ' you can use return value of `DatasetClientAsync.list_items()`.', + DeprecationWarning, + stacklevel=2, + ) - if current_page_item_count < cache_size: - should_finish = True + return self.list_items( + offset=offset, + limit=limit, + clean=clean, + desc=desc, + fields=fields, + omit=omit, + unwind=unwind, + skip_empty=skip_empty, + skip_hidden=skip_hidden, + signature=signature, + chunk_size=1000, + ) async def get_items_as_bytes( self, diff --git a/src/apify_client/clients/resource_clients/dataset_collection.py b/src/apify_client/clients/resource_clients/dataset_collection.py index 602497ce..4c804e45 100644 --- a/src/apify_client/clients/resource_clients/dataset_collection.py +++ b/src/apify_client/clients/resource_clients/dataset_collection.py @@ -6,7 +6,7 @@ from apify_client.clients.base import ResourceCollectionClient, ResourceCollectionClientAsync if TYPE_CHECKING: - from apify_client.clients.base.resource_collection_client import ListPage + from apify_client.clients.base.base_client import ListPageProtocol, ListPageProtocolAsync class DatasetCollectionClient(ResourceCollectionClient): @@ -23,7 +23,7 @@ def list( limit: int | None = None, offset: int | None = None, desc: bool | None = None, - ) -> ListPage[dict]: + ) -> ListPageProtocol[dict]: """List the available datasets. https://docs.apify.com/api/v2#/reference/datasets/dataset-collection/get-list-of-datasets @@ -37,7 +37,7 @@ def list( Returns: The list of available datasets matching the specified filters. """ - return self._list(unnamed=unnamed, limit=limit, offset=offset, desc=desc) + return self._list_iterable_from_callback(self._list, unnamed=unnamed, limit=limit, offset=offset, desc=desc) def get_or_create(self, *, name: str | None = None, schema: dict | None = None) -> dict: """Retrieve a named dataset, or create a new one when it doesn't exist. @@ -61,14 +61,14 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: resource_path = kwargs.pop('resource_path', 'datasets') super().__init__(*args, resource_path=resource_path, **kwargs) - async def list( + def list( self, *, unnamed: bool | None = None, limit: int | None = None, offset: int | None = None, desc: bool | None = None, - ) -> ListPage[dict]: + ) -> ListPageProtocolAsync[dict]: """List the available datasets. https://docs.apify.com/api/v2#/reference/datasets/dataset-collection/get-list-of-datasets @@ -82,7 +82,9 @@ async def list( Returns: The list of available datasets matching the specified filters. """ - return await self._list(unnamed=unnamed, limit=limit, offset=offset, desc=desc) + return self._list_iterable_from_callback( + callback=self._list, unnamed=unnamed, limit=limit, offset=offset, desc=desc + ) async def get_or_create( self, diff --git a/src/apify_client/clients/resource_clients/key_value_store_collection.py b/src/apify_client/clients/resource_clients/key_value_store_collection.py index 8af38903..718b66b2 100644 --- a/src/apify_client/clients/resource_clients/key_value_store_collection.py +++ b/src/apify_client/clients/resource_clients/key_value_store_collection.py @@ -6,7 +6,7 @@ from apify_client.clients.base import ResourceCollectionClient, ResourceCollectionClientAsync if TYPE_CHECKING: - from apify_client.clients.base.resource_collection_client import ListPage + from apify_client.clients.base.base_client import ListPageProtocol, ListPageProtocolAsync class KeyValueStoreCollectionClient(ResourceCollectionClient): @@ -23,7 +23,7 @@ def list( limit: int | None = None, offset: int | None = None, desc: bool | None = None, - ) -> ListPage[dict]: + ) -> ListPageProtocol[dict]: """List the available key-value stores. https://docs.apify.com/api/v2#/reference/key-value-stores/store-collection/get-list-of-key-value-stores @@ -37,7 +37,7 @@ def list( Returns: The list of available key-value stores matching the specified filters. """ - return self._list(unnamed=unnamed, limit=limit, offset=offset, desc=desc) + return self._list_iterable_from_callback(self._list, unnamed=unnamed, limit=limit, offset=offset, desc=desc) def get_or_create( self, @@ -66,14 +66,14 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: resource_path = kwargs.pop('resource_path', 'key-value-stores') super().__init__(*args, resource_path=resource_path, **kwargs) - async def list( + def list( self, *, unnamed: bool | None = None, limit: int | None = None, offset: int | None = None, desc: bool | None = None, - ) -> ListPage[dict]: + ) -> ListPageProtocolAsync[dict]: """List the available key-value stores. https://docs.apify.com/api/v2#/reference/key-value-stores/store-collection/get-list-of-key-value-stores @@ -87,7 +87,9 @@ async def list( Returns: The list of available key-value stores matching the specified filters. """ - return await self._list(unnamed=unnamed, limit=limit, offset=offset, desc=desc) + return self._list_iterable_from_callback( + callback=self._list, unnamed=unnamed, limit=limit, offset=offset, desc=desc + ) async def get_or_create( self, diff --git a/src/apify_client/clients/resource_clients/request_queue_collection.py b/src/apify_client/clients/resource_clients/request_queue_collection.py index f2ee80bb..8194e8c9 100644 --- a/src/apify_client/clients/resource_clients/request_queue_collection.py +++ b/src/apify_client/clients/resource_clients/request_queue_collection.py @@ -5,7 +5,7 @@ from apify_client.clients.base import ResourceCollectionClient, ResourceCollectionClientAsync if TYPE_CHECKING: - from apify_client.clients.base.resource_collection_client import ListPage + from apify_client.clients.base.base_client import ListPageProtocol, ListPageProtocolAsync class RequestQueueCollectionClient(ResourceCollectionClient): @@ -22,7 +22,7 @@ def list( limit: int | None = None, offset: int | None = None, desc: bool | None = None, - ) -> ListPage[dict]: + ) -> ListPageProtocol[dict]: """List the available request queues. https://docs.apify.com/api/v2#/reference/request-queues/queue-collection/get-list-of-request-queues @@ -36,7 +36,7 @@ def list( Returns: The list of available request queues matching the specified filters. """ - return self._list(unnamed=unnamed, limit=limit, offset=offset, desc=desc) + return self._list_iterable_from_callback(self._list, unnamed=unnamed, limit=limit, offset=offset, desc=desc) def get_or_create(self, *, name: str | None = None) -> dict: """Retrieve a named request queue, or create a new one when it doesn't exist. @@ -59,14 +59,14 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: resource_path = kwargs.pop('resource_path', 'request-queues') super().__init__(*args, resource_path=resource_path, **kwargs) - async def list( + def list( self, *, unnamed: bool | None = None, limit: int | None = None, offset: int | None = None, desc: bool | None = None, - ) -> ListPage[dict]: + ) -> ListPageProtocolAsync[dict]: """List the available request queues. https://docs.apify.com/api/v2#/reference/request-queues/queue-collection/get-list-of-request-queues @@ -80,7 +80,9 @@ async def list( Returns: The list of available request queues matching the specified filters. """ - return await self._list(unnamed=unnamed, limit=limit, offset=offset, desc=desc) + return self._list_iterable_from_callback( + callback=self._list, unnamed=unnamed, limit=limit, offset=offset, desc=desc + ) async def get_or_create(self, *, name: str | None = None) -> dict: """Retrieve a named request queue, or create a new one when it doesn't exist. diff --git a/src/apify_client/clients/resource_clients/run_collection.py b/src/apify_client/clients/resource_clients/run_collection.py index b51d5c47..93be9a4b 100644 --- a/src/apify_client/clients/resource_clients/run_collection.py +++ b/src/apify_client/clients/resource_clients/run_collection.py @@ -10,7 +10,7 @@ from apify_shared.consts import ActorJobStatus - from apify_client.clients.base.resource_collection_client import ListPage + from apify_client.clients.base.base_client import ListPageProtocol, ListPageProtocolAsync class RunCollectionClient(ResourceCollectionClient): @@ -29,7 +29,7 @@ def list( status: ActorJobStatus | list[ActorJobStatus] | None = None, # ty: ignore[invalid-type-form] started_before: str | datetime | None = None, started_after: str | datetime | None = None, - ) -> ListPage[dict]: + ) -> ListPageProtocol[dict]: """List all Actor runs. List all Actor runs, either of a single Actor, or all user's Actors, depending on where this client @@ -54,7 +54,8 @@ def list( else: status_param = maybe_extract_enum_member_value(status) - return self._list( + return self._list_iterable_from_callback( + self._list, limit=limit, offset=offset, desc=desc, @@ -71,7 +72,7 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: resource_path = kwargs.pop('resource_path', 'actor-runs') super().__init__(*args, resource_path=resource_path, **kwargs) - async def list( + def list( self, *, limit: int | None = None, @@ -80,7 +81,7 @@ async def list( status: ActorJobStatus | list[ActorJobStatus] | None = None, # ty: ignore[invalid-type-form] started_before: str | datetime | None = None, started_after: str | datetime | None = None, - ) -> ListPage[dict]: + ) -> ListPageProtocolAsync[dict]: """List all Actor runs. List all Actor runs, either of a single Actor, or all user's Actors, depending on where this client @@ -105,7 +106,8 @@ async def list( else: status_param = maybe_extract_enum_member_value(status) - return await self._list( + return self._list_iterable_from_callback( + callback=self._list, limit=limit, offset=offset, desc=desc, diff --git a/src/apify_client/clients/resource_clients/schedule_collection.py b/src/apify_client/clients/resource_clients/schedule_collection.py index a4f23623..10948d20 100644 --- a/src/apify_client/clients/resource_clients/schedule_collection.py +++ b/src/apify_client/clients/resource_clients/schedule_collection.py @@ -7,7 +7,7 @@ from apify_client.clients.resource_clients.schedule import _get_schedule_representation if TYPE_CHECKING: - from apify_client.clients.base.resource_collection_client import ListPage + from apify_client.clients.base.base_client import ListPageProtocol, ListPageProtocolAsync class ScheduleCollectionClient(ResourceCollectionClient): @@ -23,7 +23,7 @@ def list( limit: int | None = None, offset: int | None = None, desc: bool | None = None, - ) -> ListPage[dict]: + ) -> ListPageProtocol[dict]: """List the available schedules. https://docs.apify.com/api/v2#/reference/schedules/schedules-collection/get-list-of-schedules @@ -36,7 +36,7 @@ def list( Returns: The list of available schedules matching the specified filters. """ - return self._list(limit=limit, offset=offset, desc=desc) + return self._list_iterable_from_callback(self._list, limit=limit, offset=offset, desc=desc) def create( self, @@ -93,13 +93,13 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: resource_path = kwargs.pop('resource_path', 'schedules') super().__init__(*args, resource_path=resource_path, **kwargs) - async def list( + def list( self, *, limit: int | None = None, offset: int | None = None, desc: bool | None = None, - ) -> ListPage[dict]: + ) -> ListPageProtocolAsync[dict]: """List the available schedules. https://docs.apify.com/api/v2#/reference/schedules/schedules-collection/get-list-of-schedules @@ -112,7 +112,7 @@ async def list( Returns: The list of available schedules matching the specified filters. """ - return await self._list(limit=limit, offset=offset, desc=desc) + return self._list_iterable_from_callback(callback=self._list, limit=limit, offset=offset, desc=desc) async def create( self, diff --git a/src/apify_client/clients/resource_clients/store_collection.py b/src/apify_client/clients/resource_clients/store_collection.py index f04200a0..e99a630d 100644 --- a/src/apify_client/clients/resource_clients/store_collection.py +++ b/src/apify_client/clients/resource_clients/store_collection.py @@ -5,7 +5,7 @@ from apify_client.clients.base import ResourceCollectionClient, ResourceCollectionClientAsync if TYPE_CHECKING: - from apify_client.clients.base.resource_collection_client import ListPage + from apify_client.clients.base.base_client import ListPageProtocol, ListPageProtocolAsync class StoreCollectionClient(ResourceCollectionClient): @@ -25,7 +25,7 @@ def list( category: str | None = None, username: str | None = None, pricing_model: str | None = None, - ) -> ListPage[dict]: + ) -> ListPageProtocol[dict]: """List Actors in Apify store. https://docs.apify.com/api/v2/#/reference/store/store-actors-collection/get-list-of-actors-in-store @@ -43,7 +43,8 @@ def list( Returns: The list of available tasks matching the specified filters. """ - return self._list( + return self._list_iterable_from_callback( + self._list, limit=limit, offset=offset, search=search, @@ -61,7 +62,7 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: resource_path = kwargs.pop('resource_path', 'store') super().__init__(*args, resource_path=resource_path, **kwargs) - async def list( + def list( self, *, limit: int | None = None, @@ -71,7 +72,7 @@ async def list( category: str | None = None, username: str | None = None, pricing_model: str | None = None, - ) -> ListPage[dict]: + ) -> ListPageProtocolAsync[dict]: """List Actors in Apify store. https://docs.apify.com/api/v2/#/reference/store/store-actors-collection/get-list-of-actors-in-store @@ -89,7 +90,8 @@ async def list( Returns: The list of available tasks matching the specified filters. """ - return await self._list( + return self._list_iterable_from_callback( + callback=self._list, limit=limit, offset=offset, search=search, diff --git a/src/apify_client/clients/resource_clients/task_collection.py b/src/apify_client/clients/resource_clients/task_collection.py index 0f8fe188..3ad25132 100644 --- a/src/apify_client/clients/resource_clients/task_collection.py +++ b/src/apify_client/clients/resource_clients/task_collection.py @@ -7,7 +7,7 @@ from apify_client.clients.resource_clients.task import get_task_representation if TYPE_CHECKING: - from apify_client.clients.base.resource_collection_client import ListPage + from apify_client.clients.base.base_client import ListPageProtocol, ListPageProtocolAsync class TaskCollectionClient(ResourceCollectionClient): @@ -23,7 +23,7 @@ def list( limit: int | None = None, offset: int | None = None, desc: bool | None = None, - ) -> ListPage[dict]: + ) -> ListPageProtocol[dict]: """List the available tasks. https://docs.apify.com/api/v2#/reference/actor-tasks/task-collection/get-list-of-tasks @@ -36,7 +36,7 @@ def list( Returns: The list of available tasks matching the specified filters. """ - return self._list(limit=limit, offset=offset, desc=desc) + return self._list_iterable_from_callback(self._list, limit=limit, offset=offset, desc=desc) def create( self, @@ -114,13 +114,13 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: resource_path = kwargs.pop('resource_path', 'actor-tasks') super().__init__(*args, resource_path=resource_path, **kwargs) - async def list( + def list( self, *, limit: int | None = None, offset: int | None = None, desc: bool | None = None, - ) -> ListPage[dict]: + ) -> ListPageProtocolAsync[dict]: """List the available tasks. https://docs.apify.com/api/v2#/reference/actor-tasks/task-collection/get-list-of-tasks @@ -133,7 +133,7 @@ async def list( Returns: The list of available tasks matching the specified filters. """ - return await self._list(limit=limit, offset=offset, desc=desc) + return self._list_iterable_from_callback(callback=self._list, limit=limit, offset=offset, desc=desc) async def create( self, diff --git a/src/apify_client/clients/resource_clients/webhook_collection.py b/src/apify_client/clients/resource_clients/webhook_collection.py index 7219eade..cd846508 100644 --- a/src/apify_client/clients/resource_clients/webhook_collection.py +++ b/src/apify_client/clients/resource_clients/webhook_collection.py @@ -9,7 +9,7 @@ if TYPE_CHECKING: from apify_shared.consts import WebhookEventType - from apify_client.clients.base.resource_collection_client import ListPage + from apify_client.clients.base.base_client import ListPageProtocol, ListPageProtocolAsync class WebhookCollectionClient(ResourceCollectionClient): @@ -25,7 +25,7 @@ def list( limit: int | None = None, offset: int | None = None, desc: bool | None = None, - ) -> ListPage[dict]: + ) -> ListPageProtocol[dict]: """List the available webhooks. https://docs.apify.com/api/v2#/reference/webhooks/webhook-collection/get-list-of-webhooks @@ -38,7 +38,7 @@ def list( Returns: The list of available webhooks matching the specified filters. """ - return self._list(limit=limit, offset=offset, desc=desc) + return self._list_iterable_from_callback(self._list, limit=limit, offset=offset, desc=desc) def create( self, @@ -103,13 +103,13 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: resource_path = kwargs.pop('resource_path', 'webhooks') super().__init__(*args, resource_path=resource_path, **kwargs) - async def list( + def list( self, *, limit: int | None = None, offset: int | None = None, desc: bool | None = None, - ) -> ListPage[dict]: + ) -> ListPageProtocolAsync[dict]: """List the available webhooks. https://docs.apify.com/api/v2#/reference/webhooks/webhook-collection/get-list-of-webhooks @@ -122,7 +122,7 @@ async def list( Returns: The list of available webhooks matching the specified filters. """ - return await self._list(limit=limit, offset=offset, desc=desc) + return self._list_iterable_from_callback(callback=self._list, limit=limit, offset=offset, desc=desc) async def create( self, diff --git a/src/apify_client/clients/resource_clients/webhook_dispatch_collection.py b/src/apify_client/clients/resource_clients/webhook_dispatch_collection.py index 60ac1df1..507a7c8f 100644 --- a/src/apify_client/clients/resource_clients/webhook_dispatch_collection.py +++ b/src/apify_client/clients/resource_clients/webhook_dispatch_collection.py @@ -5,7 +5,7 @@ from apify_client.clients.base import ResourceCollectionClient, ResourceCollectionClientAsync if TYPE_CHECKING: - from apify_client.clients.base.resource_collection_client import ListPage + from apify_client.clients.base.base_client import ListPageProtocol, ListPageProtocolAsync class WebhookDispatchCollectionClient(ResourceCollectionClient): @@ -21,7 +21,7 @@ def list( limit: int | None = None, offset: int | None = None, desc: bool | None = None, - ) -> ListPage[dict]: + ) -> ListPageProtocol[dict]: """List all webhook dispatches of a user. https://docs.apify.com/api/v2#/reference/webhook-dispatches/webhook-dispatches-collection/get-list-of-webhook-dispatches @@ -34,7 +34,7 @@ def list( Returns: The retrieved webhook dispatches of a user. """ - return self._list(limit=limit, offset=offset, desc=desc) + return self._list_iterable_from_callback(self._list, limit=limit, offset=offset, desc=desc) class WebhookDispatchCollectionClientAsync(ResourceCollectionClientAsync): @@ -44,13 +44,13 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: resource_path = kwargs.pop('resource_path', 'webhook-dispatches') super().__init__(*args, resource_path=resource_path, **kwargs) - async def list( + def list( self, *, limit: int | None = None, offset: int | None = None, desc: bool | None = None, - ) -> ListPage[dict]: + ) -> ListPageProtocolAsync[dict]: """List all webhook dispatches of a user. https://docs.apify.com/api/v2#/reference/webhook-dispatches/webhook-dispatches-collection/get-list-of-webhook-dispatches @@ -63,4 +63,4 @@ async def list( Returns: The retrieved webhook dispatches of a user. """ - return await self._list(limit=limit, offset=offset, desc=desc) + return self._list_iterable_from_callback(callback=self._list, limit=limit, offset=offset, desc=desc) diff --git a/tests/unit/test_client_pagination.py b/tests/unit/test_client_pagination.py new file mode 100644 index 00000000..ac5f40a0 --- /dev/null +++ b/tests/unit/test_client_pagination.py @@ -0,0 +1,327 @@ +import dataclasses +import math +from typing import Any, Literal, TypeAlias +from unittest import mock +from unittest.mock import Mock + +import pytest +from _pytest.mark import ParameterSet + +from apify_client import ApifyClient, ApifyClientAsync +from apify_client.clients import ( + ActorCollectionClient, + ActorCollectionClientAsync, + BaseClient, + BaseClientAsync, + BuildCollectionClient, + BuildCollectionClientAsync, + DatasetClient, + DatasetCollectionClient, + DatasetCollectionClientAsync, + KeyValueStoreCollectionClient, + KeyValueStoreCollectionClientAsync, + RequestQueueCollectionClient, + RequestQueueCollectionClientAsync, + RunCollectionClient, + RunCollectionClientAsync, + ScheduleCollectionClient, + ScheduleCollectionClientAsync, + StoreCollectionClient, + StoreCollectionClientAsync, + TaskCollectionClient, + TaskCollectionClientAsync, + WebhookCollectionClient, + WebhookCollectionClientAsync, + WebhookDispatchCollectionClient, + WebhookDispatchCollectionClientAsync, +) +from apify_client.clients.resource_clients import DatasetClientAsync + +CollectionClientAsync: TypeAlias = ( + ActorCollectionClientAsync + | BuildCollectionClientAsync + | RunCollectionClientAsync + | ScheduleCollectionClientAsync + | TaskCollectionClientAsync + | WebhookCollectionClientAsync + | WebhookDispatchCollectionClientAsync + | DatasetCollectionClientAsync + | KeyValueStoreCollectionClientAsync + | RequestQueueCollectionClientAsync + | StoreCollectionClientAsync +) + +CollectionClient: TypeAlias = ( + ActorCollectionClient + | BuildCollectionClient + | RunCollectionClient + | ScheduleCollectionClient + | TaskCollectionClient + | WebhookCollectionClient + | WebhookDispatchCollectionClient + | DatasetCollectionClient + | KeyValueStoreCollectionClient + | RequestQueueCollectionClient + | StoreCollectionClient +) + +ID_PLACEHOLDER = 'some-id' + + +def create_items(start: int, end: int) -> list[dict[str, int]]: + """Create list of test items of specified range.""" + step = -1 if end < start else 1 + return [{'id': i, 'key': i} for i in range(start, end, step)] + + +def mocked_api_pagination_logic(*_: Any, **kwargs: Any) -> dict: + """This function is a placeholder representing the mocked API pagination logic. + + It simulates paginated responses from an API only to a limited extend to test iteration logic in client. + Returned items are only placeholders that enable keeping track of their index on platform. + + There are 2500 normal items in the collection and additional 100 extra items. + Items are simple objects with incrementing attributes for easy verification. + """ + params = kwargs.get('params', {}) + normal_items = 2500 + extra_items = 100 # additional items, for example unnamed + max_items_per_page = 1000 + + total_items = (normal_items + extra_items) if params.get('unnamed') else normal_items + offset = params.get('offset') or 0 + limit = params.get('limit') or 0 + assert offset >= 0, 'Invalid offset send to API' + assert limit >= 0, 'Invalid limit send to API' + + # Ordered all items in the mocked platform. + items = create_items(total_items, 0) if params.get('desc', False) else create_items(0, total_items) + lower_index = min(offset, total_items) + upper_index = min(offset + (limit or total_items), total_items) + count = min(upper_index - lower_index, max_items_per_page) + + selected_items = items[lower_index : min(upper_index, lower_index + max_items_per_page)] + + response = Mock() + if kwargs['url'] == f'https://api.apify.com/v2/datasets/{ID_PLACEHOLDER}/items': + # Get dataset items endpoint returns the items directly + response_content = selected_items + else: + response_content = { + 'data': { + 'total': total_items, + 'count': count, + 'offset': offset, + 'limit': limit or count, + 'desc': params.get('desc', False), + 'items': selected_items, + } + } + + response.json = lambda: response_content + response.headers = { + 'x-apify-pagination-total': str(total_items), + 'x-apify-pagination-offset': str(offset), + 'x-apify-pagination-limit': str(limit or count), + 'x-apify-pagination-desc': str(params.get('desc', False)).lower(), + } + return response + + +@dataclasses.dataclass +class TestCase: + """Class representing a single test case for pagination tests.""" + + id: str + inputs: dict + expected_items: list[dict[str, int]] + supported_clients: set[str] + + def __hash__(self) -> int: + return hash(self.id) + + def supports(self, client: BaseClient | BaseClientAsync) -> bool: + """Check whether the given client implements functionality tested by this test.""" + return client.__class__.__name__.replace('Async', '') in self.supported_clients + + +# Prepare supported testcases for different clients +COLLECTION_CLIENTS = { + 'ActorCollectionClient', + 'BuildCollectionClient', + 'RunCollectionClient', + 'ScheduleCollectionClient', + 'TaskCollectionClient', + 'WebhookCollectionClient', + 'WebhookDispatchCollectionClient', + 'DatasetCollectionClient', + 'KeyValueStoreCollectionClient', + 'RequestQueueCollectionClient', + 'StoreCollectionClient', +} + +NO_OPTIONS_CLIENTS = { + 'ActorEnvVarCollectionClient', + 'ActorVersionCollectionClient', +} + +STORAGE_CLIENTS = { + 'DatasetClient', + 'KeyValueStoreClient', + 'RequestQueueClient', +} + +ALL_CLIENTS = COLLECTION_CLIENTS | NO_OPTIONS_CLIENTS | STORAGE_CLIENTS + +TEST_CASES = ( + TestCase('No options', {}, create_items(0, 2500), ALL_CLIENTS), + TestCase('Limit', {'limit': 1100}, create_items(0, 1100), ALL_CLIENTS - NO_OPTIONS_CLIENTS), + TestCase('Out of range limit', {'limit': 3000}, create_items(0, 2500), ALL_CLIENTS - NO_OPTIONS_CLIENTS), + TestCase('Offset', {'offset': 1000}, create_items(1000, 2500), ALL_CLIENTS - NO_OPTIONS_CLIENTS), + TestCase( + 'Offset and limit', {'offset': 1000, 'limit': 1100}, create_items(1000, 2100), ALL_CLIENTS - NO_OPTIONS_CLIENTS + ), + TestCase('Out of range offset', {'offset': 3000}, [], ALL_CLIENTS - NO_OPTIONS_CLIENTS), + TestCase( + 'Offset, limit, descending', + {'offset': 1000, 'limit': 1100, 'desc': True}, + create_items(1500, 400), + ALL_CLIENTS - NO_OPTIONS_CLIENTS - {'StoreCollectionClient'}, + ), + TestCase( + 'Offset, limit, descending, unnamed', + {'offset': 50, 'limit': 1100, 'desc': True, 'unnamed': True}, + create_items(2550, 1450), + {'DatasetCollectionClient', 'KeyValueStoreCollectionClient', 'RequestQueueCollectionClient'}, + ), + TestCase( + 'Offset, limit, descending, chunk_size', + {'offset': 50, 'limit': 1100, 'desc': True, 'chunk_size': 100}, + create_items(2450, 1350), + {'DatasetClient'}, + ), + TestCase('Exclusive start key', {'exclusive_start_key': 1000}, create_items(1001, 2500), {'KeyValueStoreClient'}), + TestCase('Exclusive start id', {'exclusive_start_id': 1000}, create_items(1001, 2500), {'RequestQueueClient'}), +) + + +def generate_test_params( + client_set: Literal['collection', 'kvs', 'rq', 'dataset'], *, async_clients: bool = False +) -> list[ParameterSet]: + """Generate list of ParameterSets for parametrized tests. + + Different clients support different options and thus different scenarios. + """ + + client = ApifyClientAsync(token='') if async_clients else ApifyClient(token='') + + # This is tuple instead of set because pytest-xdist + # https://pytest-xdist.readthedocs.io/en/stable/known-limitations.html#order-and-amount-of-test-must-be-consistent + clients: tuple[BaseClient | BaseClientAsync, ...] + + match client_set: + case 'collection': + clients = ( + client.actors(), + client.schedules(), + client.tasks(), + client.webhooks(), + client.webhook_dispatches(), + client.store(), + client.datasets(), + client.key_value_stores(), + client.request_queues(), + client.actor(ID_PLACEHOLDER).builds(), + client.actor(ID_PLACEHOLDER).runs(), + client.actor(ID_PLACEHOLDER).versions(), + client.actor(ID_PLACEHOLDER).version('some-version').env_vars(), + ) + case 'kvs': + clients = (client.key_value_store(ID_PLACEHOLDER),) + case 'rq': + clients = (client.request_queue(ID_PLACEHOLDER),) + case 'dataset': + clients = (client.dataset(ID_PLACEHOLDER),) + case _: + raise ValueError(f'Unknown client set: {client_set}') + + return [ + pytest.param( + test_case.inputs, test_case.expected_items, client, id=f'{client.__class__.__name__}:{test_case.id}' + ) + for test_case in TEST_CASES + for client in clients + if test_case.supports(client) + ] + + +@pytest.mark.parametrize( + ('inputs', 'expected_items', 'client'), generate_test_params(client_set='collection', async_clients=True) +) +async def test_client_list_iterable_async( + client: CollectionClientAsync, inputs: dict, expected_items: list[dict[str, int]] +) -> None: + with mock.patch.object(client.http_client, 'call', side_effect=mocked_api_pagination_logic): + returned_items = [item async for item in client.list(**inputs)] + + if inputs == {}: + list_response = await client.list(**inputs) + assert len(returned_items) == list_response.total + + assert returned_items == expected_items + + +@pytest.mark.parametrize( + ('inputs', 'expected_items', 'client'), generate_test_params(client_set='collection', async_clients=False) +) +def test_client_list_iterable(client: CollectionClient, inputs: dict, expected_items: list[dict[str, int]]) -> None: + with mock.patch.object(client.http_client, 'call', side_effect=mocked_api_pagination_logic): + returned_items = [item for item in client.list(**inputs)] # noqa: C416 list needed for assertion + + if inputs == {}: + list_response = client.list(**inputs) + assert len(returned_items) == list_response.total + + assert returned_items == expected_items + + +@pytest.mark.parametrize( + ('inputs', 'expected_items', 'client'), generate_test_params(client_set='dataset', async_clients=True) +) +async def test_dataset_items_list_iterable_async( + client: DatasetClientAsync, inputs: dict, expected_items: list[dict[str, int]] +) -> None: + with mock.patch.object(client.http_client, 'call', side_effect=mocked_api_pagination_logic) as mocked_client_call: + returned_items = [item async for item in client.list_items(**inputs)] + + if inputs == {}: + list_response = await client.list_items(**inputs) + assert len(returned_items) == list_response.total + + assert returned_items == expected_items + input_chunk_size = inputs.get('chunk_size', 1000) + assert mocked_client_call.call_count == math.ceil(len(expected_items) / input_chunk_size) or 1 + + # Until deprecated method `iterate_items` is removed + inputs.pop('chunk_size', None) + assert returned_items == [item async for item in client.iterate_items(**inputs)] + + +@pytest.mark.parametrize( + ('inputs', 'expected_items', 'client'), generate_test_params(client_set='dataset', async_clients=False) +) +def test_dataset_items_list_iterable(client: DatasetClient, inputs: dict, expected_items: list[dict[str, int]]) -> None: + with mock.patch.object(client.http_client, 'call', side_effect=mocked_api_pagination_logic) as mocked_client_call: + returned_items = list(client.list_items(**inputs)) + + if inputs == {}: + list_response = client.list_items(**inputs) + assert len(returned_items) == list_response.total + + assert returned_items == expected_items + input_chunk_size = inputs.get('chunk_size', 1000) + assert mocked_client_call.call_count == math.ceil(len(expected_items) / input_chunk_size) or 1 + + # Until deprecated method `iterate_items` is removed + inputs.pop('chunk_size', None) + assert returned_items == list(client.iterate_items(**inputs))