API Reference

astel.agent

User agent for processing domain rules, allowing the crawler to fetch pages without getting blocked.

UserAgent

A user agent for processing domain rules so that the crawler can respect them.

Parameters:

    name (str): The name of the user agent. Required.

Source code in astel/agent.py
class UserAgent:
    """A user agent for processing domain rules so that the crawler can respect them.

    Args:
        name (str): The name of the user agent
    """

    __slots__ = ("name", "_acknowledged_domains")

    def __init__(self, name: str) -> None:
        self.name = name
        self._acknowledged_domains: dict[str, RobotFileParser] = {}

    def respect(self, domain: str, robots_txt: str) -> None:
        """Process the rules in the robots.txt file in the URL and associates
        them to the given domain, if the domain has not already been acknowledged.

        Args:
            domain (str): A string representing the domain to be acknowledged.
            robots_txt (str): A string representing the content of the robots.txt file.
        """
        if domain in self._acknowledged_domains:
            return
        parser = RobotFileParser()
        parser.parse(robots_txt.splitlines())
        self._acknowledged_domains[domain] = parser

    def can_access(self, domain: str, url: str) -> bool:
        """Determines whether the given URL can be accessed by the user agent for the specified domain.

        Args:
            domain (str): A string representing the domain of the URL.
            url (str): A string representing the URL to access.

        Returns:
            bool: A boolean indicating whether the URL can be accessed for the specified domain.
        """  # noqa: E501
        return self._acknowledged_domains[domain].can_fetch(self.name, url)

    def get_request_rate(self, domain: str) -> RequestRate | None:
        """Return the request rate of that domain if it is acknowledged.

        Args:
            domain (str): A string representing the domain whose request rate is sought.

        Returns:
            Union[RequestRate, None]: An instance of `RequestRate` representing the domain's request rate if the domain is acknowledged, else `None`.
        """  # noqa: E501
        if domain not in self._acknowledged_domains:
            return None
        return self._acknowledged_domains[domain].request_rate(self.name)

    def get_crawl_delay(self, domain: str) -> str | None:
        """Return the crawl delay for the given domain if it has been acknowledged, and `None` otherwise.

        Args:
            domain (str): A string representing the domain to check the crawl delay for.

        Returns:
            Union[str, None]: A string representing the crawl delay for the given domain if it has been acknowledged, `None` otherwise.
        """  # noqa: E501
        if domain not in self._acknowledged_domains:
            return None

        crawl_delay = self._acknowledged_domains[domain].crawl_delay(self.name)
        return str(crawl_delay) if crawl_delay is not None else None

    def get_site_maps(self, domain: str) -> list[str] | None:
        """Return the site maps associated with the given domain if the domain is acknowledged, otherwise returns `None`.

        Args:
            domain (str): A string representing the domain to retrieve site maps for.

        Returns:
            Union[list[str], None]: A list of strings representing the site maps associated with the domain, or `None` if the domain is not acknowledged.
        """  # noqa: E501
        if domain not in self._acknowledged_domains:
            return None
        return self._acknowledged_domains[domain].site_maps()

    @property
    def acknowledged_domains(self) -> List[str]:
        """The domains that have been acknowledged by the user agent."""
        return list(self._acknowledged_domains.keys())
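
A minimal usage sketch (the domain and robots.txt content below are made up for illustration):

>>> agent = UserAgent("astel")
>>> agent.respect("example.com", "User-agent: *\nDisallow: /private/\nCrawl-delay: 2")
>>> agent.can_access("example.com", "https://example.com/index.html")
True
>>> agent.can_access("example.com", "https://example.com/private/data")
False
>>> agent.get_crawl_delay("example.com")
'2'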

acknowledged_domains: List[str] property

The domains that have been acknowledged by the user agent.

can_access(domain, url)

Determines whether the given URL can be accessed by the user agent for the specified domain.

Parameters:

    domain (str): A string representing the domain of the URL. Required.
    url (str): A string representing the URL to access. Required.

Returns:

    bool: A boolean indicating whether the URL can be accessed for the specified domain.

Source code in astel/agent.py
def can_access(self, domain: str, url: str) -> bool:
    """Determines whether the given URL can be accessed by the user agent for the specified domain.

    Args:
        domain (str): A string representing the domain of the URL.
        url (str): A string representing the URL to access.

    Returns:
        bool: A boolean indicating whether the URL can be accessed for the specified domain.
    """  # noqa: E501
    return self._acknowledged_domains[domain].can_fetch(self.name, url)

get_crawl_delay(domain)

Return the crawl delay for the given domain if it has been acknowledged, and None otherwise.

Parameters:

    domain (str): A string representing the domain to check the crawl delay for. Required.

Returns:

    str | None: A string representing the crawl delay for the given domain if it has been acknowledged, None otherwise.

Source code in astel/agent.py
def get_crawl_delay(self, domain: str) -> str | None:
    """Return the crawl delay for the given domain if it has been acknowledged, and `None` otherwise.

    Args:
        domain (str): A string representing the domain to check the crawl delay for.

    Returns:
        Union[str, None]: A string representing the crawl delay for the given domain if it has been acknowledged, `None` otherwise.
    """  # noqa: E501
    if domain not in self._acknowledged_domains:
        return None

    crawl_delay = self._acknowledged_domains[domain].crawl_delay(self.name)
    return str(crawl_delay) if crawl_delay is not None else None

get_request_rate(domain)

Return the request rate of that domain if it is acknowledged.

Parameters:

    domain (str): A string representing the domain whose request rate is sought. Required.

Returns:

    RequestRate | None: An instance of RequestRate representing the domain's request rate if the domain is acknowledged, else None.

Source code in astel/agent.py
def get_request_rate(self, domain: str) -> RequestRate | None:
    """Return the request rate of that domain if it is acknowledged.

    Args:
        domain (str): A string representing the domain whose request rate is sought.

    Returns:
        Union[RequestRate, None]: An instance of `RequestRate` representing the domain's request rate if the domain is acknowledged, else `None`.
    """  # noqa: E501
    if domain not in self._acknowledged_domains:
        return None
    return self._acknowledged_domains[domain].request_rate(self.name)

get_site_maps(domain)

Return the site maps associated with the given domain if the domain is acknowledged, otherwise returns None.

Parameters:

    domain (str): A string representing the domain to retrieve site maps for. Required.

Returns:

    list[str] | None: A list of strings representing the site maps associated with the domain, or None if the domain is not acknowledged.

Source code in astel/agent.py
def get_site_maps(self, domain: str) -> list[str] | None:
    """Return the site maps associated with the given domain if the domain is acknowledged, otherwise returns `None`.

    Args:
        domain (str): A string representing the domain to retrieve site maps for.

    Returns:
        Union[list[str], None]: A list of strings representing the site maps associated with the domain, or `None` if the domain is not acknowledged.
    """  # noqa: E501
    if domain not in self._acknowledged_domains:
        return None
    return self._acknowledged_domains[domain].site_maps()

respect(domain, robots_txt)

Process the rules in the given robots.txt content and associate them with the given domain, if the domain has not already been acknowledged.

Parameters:

    domain (str): A string representing the domain to be acknowledged. Required.
    robots_txt (str): A string representing the content of the robots.txt file. Required.

Source code in astel/agent.py
def respect(self, domain: str, robots_txt: str) -> None:
    """Process the rules in the robots.txt file in the URL and associates
    them to the given domain, if the domain has not already been acknowledged.

    Args:
        domain (str): A string representing the domain to be acknowledged.
        robots_txt (str): A string representing the content of the robots.txt file.
    """
    if domain in self._acknowledged_domains:
        return
    parser = RobotFileParser()
    parser.parse(robots_txt.splitlines())
    self._acknowledged_domains[domain] = parser

astel.crawler

Crawler module.

This module defines the Crawler class that can be used to crawl websites asynchronously.

Crawler

An asynchronous web crawler that can be used to extract, process and follow links in webpages.

Parameters:

    urls (Iterable[str]): The URLs to start the crawler with. Required.
    options (CrawlerOptions, optional): The options to use for the crawler. Defaults to None.

Source code in astel/crawler.py
class Crawler:
    """An asynchronous web crawler that can be used to extract, process and follow links in webpages.

    Args:
        urls (Iterable[str]): The URLs to start the crawler with.
        options (CrawlerOptions, optional): The options to use for the crawler.
    """  # noqa: E501

    _todo: asyncio.Queue[asyncio.Task]
    _client: httpx.AsyncClient
    _start_urls: Set[str]
    _urls_seen: Set[parsers.Url]
    _done: Set[str]
    _parser_factory: ParserFactory
    _agent: agent.UserAgent
    _rate_limiter: limiters.RateLimiter
    _num_workers: int
    _limit: int
    _total_pages: int
    _filters: List[filters.CallableFilter]
    _event_emitter: events.EventEmitter
    _workers: List[asyncio.Task]
    _options: CrawlerOptions
    _must_retry: RetryHandler | None

    def __init__(
        self, urls: Iterable[str], options: CrawlerOptions | None = None
    ) -> None:
        self._todo: asyncio.Queue[asyncio.Task] = asyncio.Queue()
        self._start_urls = set(urls)
        self._urls_seen: set[parsers.Url] = set()
        self._done: set[str] = set()
        self._filters: List[filters.Filter] = []
        self._options = merge_with_default_options(options)
        self._client = self._options["client"]
        self._parser_factory = self._options["parser_factory"]
        self._agent = agent.UserAgent(self._options["user_agent"])
        self._rate_limiter = self._options["rate_limiter"]
        self._num_workers = self._options["workers"]
        self._limit = self._options["limit"]
        self._total_pages = 0
        self._event_emitter = self._options["event_emitter_factory"]()

        def _must_retry(
            url: parsers.Url, response: Union[httpx.Response, None], _: Crawler
        ) -> bool:
            return bool(
                (
                    response
                    and response.status_code in self._options["retry_for_status_codes"]
                )
                and url
            )

        self._must_retry = (
            cast(RetryHandler, _must_retry)
            if self._options["retry_for_status_codes"]
            else None
        )

    async def run(self) -> None:
        """Run the crawler."""
        await self._on_found_links({parsers.parse_url(url) for url in self._start_urls})

        self._workers = [
            asyncio.create_task(self._worker()) for _ in range(self._num_workers)
        ]
        await self._todo.join()

        for worker in self._workers:
            worker.cancel()

    async def _worker(self) -> None:
        while True:
            try:
                await self._process_one()
            except asyncio.CancelledError:
                return

    async def _process_one(self) -> None:
        task = await self._todo.get()
        try:
            await task
        except httpx.HTTPError as e:
            self._emit_event(events.Event.ERROR, e)
            if self._must_retry and self._must_retry(
                parsers.parse_url(str(e.request.url)),
                getattr(e, "response", None),
                self,
            ):
                await self._put_todo(parsers.parse_url(str(e.request.url)))
        finally:
            self._todo.task_done()

    async def _crawl(self, url: parsers.Url) -> None:
        await self._rate_limiter.limit(url.raw)

        if self._agent.can_access(url.domain, url.raw):
            response = await self._send_request(url)
            self._emit_event(events.Event.RESPONSE, response)
            await self._on_found_links(
                await self._parse_links(
                    base=str(response.url),
                    text=response.text,
                )
            )

        self._done.add(url.raw)
        self._emit_event(events.Event.DONE, url)

    async def _send_request(self, url: parsers.Url) -> httpx.Response:
        request = httpx.Request(
            "GET", url.raw, headers={"User-Agent": self._agent.name}
        )
        self._emit_event(events.Event.REQUEST, request)
        return (
            await self._client.send(request, follow_redirects=True)
        ).raise_for_status()

    async def _parse_links(self, base: str, text: str) -> set[parsers.Url]:
        parser = self._parser_factory(base=base)
        parser.feed(text)
        return {link for link in parser.found_links if self._apply_filters(link)}

    def _apply_filters(self, url: parsers.Url) -> bool:
        return all(f(url) for f in self._filters)

    async def _acknowledge_domains(
        self, parsed_urls: set[parsers.Url]
    ) -> set[parsers.Url]:
        new = parsed_urls - self._urls_seen
        for result in new:
            robots_txt = (
                (
                    await self._client.get(
                        f"{result.scheme}://{result.domain}/robots.txt",
                        timeout=5,
                        follow_redirects=False,
                        headers={
                            "User-Agent": self._agent.name,
                            "Accept": "text/plain",
                        },
                    )
                )
                .raise_for_status()
                .text
            )
            self._agent.respect(result.domain, robots_txt)

            tasks = [
                asyncio.create_task(
                    self._acknowledge_domains(await self.parse_site_map(site_map_path))
                )
                for site_map_path in self._agent.get_site_maps(result.domain) or []
            ]
            if len(tasks) > 0:
                done, _ = await asyncio.wait(tasks)
                for future in done:
                    task_result = future.result()
                    if isinstance(task_result, set):
                        new.update(future.result())
                    else:
                        raise cast(BaseException, task_result)

            self._rate_limiter.configure(
                {
                    "domain": result.domain,
                    "crawl_delay": self._agent.get_crawl_delay(result.domain),
                    "request_rate": self._agent.get_request_rate(result.domain),
                }
            )

        self._urls_seen.update(new)

        return new

    async def parse_site_map(self, site_map_path: str) -> Set[parsers.Url]:
        """Parse a sitemap.xml file and return the URLs found in it.

        Args:
            site_map_path (str): The URL of the sitemap.xml file.

        Returns:
            Set[parsers.Url]: The URLs found in the sitemap.xml file.
        """
        parser = parsers.SiteMapParser(site_map_path)
        response = (await self._client.get(site_map_path)).raise_for_status()
        parser.feed(response.text)
        return parser.found_links

    def filter(self, *args: filters.CallableFilter, **kwargs) -> Self:
        """Add URL filters to the crawler.

        Filters can be used to determine which URLs should be ignored.

        Args:
            *args (Filter): A list of `Filter` objects to add to the crawler.
            **kwargs (Any): A list of keyword arguments to create `Filter` objects from.

        Returns:
            Crawler: The `Crawler` object with the added filters.

        Raises:
            ValueError: If a filter could not be created from the given keyword arguments.

        Examples:
            >>> crawler.filter(filters.StartsWith("scheme", "http"))
            >>> crawler.filter(filters.Matches("https://example.com"))
            >>> crawler.filter(domain__in=["example.com"])
        """  # noqa: E501
        self._filters.extend(
            [
                *args,
                *[
                    f
                    for f in (
                        filters.create_from_kwarg(key, value)
                        for key, value in kwargs.items()
                    )
                    if f is not None
                ],
            ],
        )
        return self

    async def _on_found_links(self, urls: set[parsers.Url]) -> None:
        for url in urls:
            self._emit_event(events.Event.URL_FOUND, url)
        for url in await self._acknowledge_domains(urls):
            await self._put_todo(url)

    async def _put_todo(self, url: parsers.Url) -> None:
        if self._total_pages > self._limit:
            return
        self._total_pages += 1
        await self._todo.put(asyncio.create_task(self._crawl(url)))

    def on(self, event: events.Event, handler: events.Handler) -> Self:
        """Add an event handler to the crawler.

        An event is emitted when
        - a request is ready to be sent (`Event.REQUEST`): the `httpx.Request` object is
        passed to the handler.
        - a response is received (`Event.RESPONSE`): the `httpx.Response` object is
        passed to the handler.
        - an error occurs (`Event.ERROR`): the `Error` object is passed to the handler.
        - a URL is done being processed (`Event.DONE`): the `astel.parsers.Url` object
        is passed to the handler.
        - a URL is found in a page (`Event.URL_FOUND`): the `astel.parsers.Url` object is passed to the handler.

        Args:
            event (str): The event to add the handler to.
            handler (Callable): The handler to add to the event.
        """  # noqa: E501
        self._event_emitter.on(event, handler)
        return self

    def _emit_event(self, event: events.Event, *data) -> None:
        self._event_emitter.emit(event, *data, crawler=self)

    def stop(self, *, reset: bool = False) -> None:
        """Stop the crawler current execution.

        Args:
            reset (bool, optional: Optionally, reset the crawler on the same call. Defaults to `False`.
        """  # noqa: E501
        for worker in self._workers:
            worker.cancel()
        if reset:
            self.reset()

    def reset(self) -> None:
        """Reset the crawler."""
        self._done.clear()
        self._urls_seen.clear()
        self._total_pages = 0

    def retry(self, handler: RetryHandler) -> Self:
        """Set a handler to determine whether a request should be retried.

        Args:
            handler (Callable): A function that takes an `astel.parsers.Url`, an `httpx.Response` (or `None`), and the `Crawler`, and returns a boolean indicating whether the request should be retried.

        Returns:
            Crawler: The `Crawler` object with the retry handler set.
        """  # noqa: E501
        self._must_retry = handler
        return self

    @property
    def total_pages(self) -> int:
        """The total number of pages queued by the crawler."""
        return self._total_pages

    @property
    def done(self) -> set[str]:
        """The URLs that have been crawled by the crawler."""
        return self._done

    @property
    def urls_seen(self) -> set[parsers.Url]:
        """The URLs that have been seen by the crawler."""
        return self._urls_seen

    @property
    def rate_limiter(self) -> limiters.RateLimiter:
        """The rate limiter used by the crawler."""
        return self._rate_limiter

    @property
    def num_workers(self) -> int:
        """The number of worker tasks used by the crawler."""
        return self._num_workers

    @property
    def limit(self) -> int:
        """The maximum number of pages to crawl.

        It is used as a fail-safe to prevent the crawler from running indefinitely.
        """
        return self._limit

    @property
    def parser_factory(self) -> ParserFactory:
        """The parser factory object used by the crawler to parse HTML responses."""
        return self._parser_factory

    @property
    def start_urls(self) -> Set[str]:
        """The URLs that the crawler was started with."""
        return self._start_urls

    @property
    def agent(self) -> str:
        """The user agent used by the crawler."""
        return self._agent.name

    @property
    def options(self) -> CrawlerOptions:
        """The options used by the crawler."""
        return self._options

    @options.setter
    def options(self, options: Optional[CrawlerOptions] = None) -> None:
        """Set the options used by the crawler."""
        self._options = merge_with_default_options(options)
        self._client = self._options["client"]
        self._agent = agent.UserAgent(self._options["user_agent"])
        self._rate_limiter = self._options["rate_limiter"]
        self._num_workers = self._options["workers"]
        self._limit = self._options["limit"]
        self._parser_factory = self._options["parser_factory"]
        self._event_emitter = self._options["event_emitter_factory"]()
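
A minimal end-to-end sketch (the start URL is hypothetical and the options are left at their defaults):

>>> import asyncio
>>> from astel.crawler import Crawler
>>> crawler = Crawler(["https://example.com"])
>>> asyncio.run(crawler.run())
>>> crawler.done  # URLs crawled so far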

agent: str property

The user agent used by the crawler.

done: set[str] property

The URLs that have been crawled by the crawler.

limit: int property

The maximum number of pages to crawl.

It is used as a fail-safe to prevent the crawler from running indefinitely.

num_workers: int property

The number of worker tasks used by the crawler.

options: CrawlerOptions property writable

The options used by the crawler.

parser_factory: ParserFactory property

The parser factory object used by the crawler to parse HTML responses.

rate_limiter: limiters.RateLimiter property

The rate limiter used by the crawler.

start_urls: Set[str] property

The URLs that the crawler was started with.

total_pages: int property

The total number of pages queued by the crawler.

urls_seen: set[parsers.Url] property

The URLs that have been seen by the crawler.

filter(*args, **kwargs)

Add URL filters to the crawler.

Filters can be used to determine which URLs should be ignored.

Parameters:

    *args (Filter): A list of Filter objects to add to the crawler.
    **kwargs (Any): A list of keyword arguments to create Filter objects from.

Returns:

    Crawler: The Crawler object with the added filters.

Raises:

    ValueError: If a filter could not be created from the given keyword arguments.

Examples:

>>> crawler.filter(filters.StartsWith("scheme", "http"))
>>> crawler.filter(filters.Matches("https://example.com"))
>>> crawler.filter(domain__in=["example.com"])
Source code in astel/crawler.py
def filter(self, *args: filters.CallableFilter, **kwargs) -> Self:
    """Add URL filters to the crawler.

    Filters can be used to determine which URLs should be ignored.

    Args:
        *args (Filter): A list of `Filter` objects to add to the crawler.
        **kwargs (Any): A list of keyword arguments to create `Filter` objects from.

    Returns:
        Crawler: The `Crawler` object with the added filters.

    Raises:
        ValueError: If a filter could not be created from the given keyword arguments.

    Examples:
        >>> crawler.filter(filters.StartsWith("scheme", "http"))
        >>> crawler.filter(filters.Matches("https://example.com"))
        >>> crawler.filter(domain__in=["example.com"])
    """  # noqa: E501
    self._filters.extend(
        [
            *args,
            *[
                f
                for f in (
                    filters.create_from_kwarg(key, value)
                    for key, value in kwargs.items()
                )
                if f is not None
            ],
        ],
    )
    return self

on(event, handler)

Add an event handler to the crawler.

An event is emitted when:

- a request is ready to be sent (Event.REQUEST): the httpx.Request object is passed to the handler.
- a response is received (Event.RESPONSE): the httpx.Response object is passed to the handler.
- an error occurs (Event.ERROR): the Error object is passed to the handler.
- a URL is done being processed (Event.DONE): the astel.parsers.Url object is passed to the handler.
- a URL is found in a page (Event.URL_FOUND): the astel.parsers.Url object is passed to the handler.

Parameters:

    event (str): The event to add the handler to. Required.
    handler (Callable): The handler to add to the event. Required.

Source code in astel/crawler.py
def on(self, event: events.Event, handler: events.Handler) -> Self:
    """Add an event handler to the crawler.

    An event is emitted when
    - a request is ready to be sent (`Event.REQUEST`): the `httpx.Request` object is
    passed to the handler.
    - a response is received (`Event.RESPONSE`): the `httpx.Response` object is
    passed to the handler.
    - an error occurs (`Event.ERROR`): the `Error` object is passed to the handler.
    - a URL is done being processed (`Event.DONE`): the `astel.parsers.Url` object
    is passed to the handler.
    - a URL is found in a page (`Event.URL_FOUND`): the `astel.parsers.Url` object is passed to the handler.

    Args:
        event (str): The event to add the handler to.
        handler (Callable): The handler to add to the event.
    """  # noqa: E501
    self._event_emitter.on(event, handler)
    return self
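
For example, a handler conforming to the ResponseHandler protocol can be registered like this (a sketch, assuming crawler is an existing Crawler instance):

>>> def log_response(response, crawler):
...     print(response.status_code, str(response.url))
>>> crawler.on(events.Event.RESPONSE, log_response)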

parse_site_map(site_map_path) async

Parse a sitemap.xml file and return the URLs found in it.

Parameters:

    site_map_path (str): The URL of the sitemap.xml file. Required.

Returns:

    Set[parsers.Url]: The URLs found in the sitemap.xml file.

Source code in astel/crawler.py
async def parse_site_map(self, site_map_path: str) -> Set[parsers.Url]:
    """Parse a sitemap.xml file and return the URLs found in it.

    Args:
        site_map_path (str): The URL of the sitemap.xml file.

    Returns:
        Set[parsers.Url]: The URLs found in the sitemap.xml file.
    """
    parser = parsers.SiteMapParser(site_map_path)
    response = (await self._client.get(site_map_path)).raise_for_status()
    parser.feed(response.text)
    return parser.found_links

reset()

Reset the crawler.

Source code in astel/crawler.py
def reset(self) -> None:
    """Reset the crawler."""
    self._done.clear()
    self._urls_seen.clear()
    self._total_pages = 0

retry(handler)

Set a handler to determine whether a request should be retried.

Parameters:

    handler (Callable): A function that takes an astel.parsers.Url, an httpx.Response (or None), and the Crawler, and returns a boolean indicating whether the request should be retried. Required.

Returns:

    Crawler: The Crawler object with the retry handler set.

Source code in astel/crawler.py
def retry(self, handler: RetryHandler) -> Self:
    """Set a handler to determine whether a request should be retried.

    Args:
        handler (Callable): A function that takes an `astel.parsers.Url`, an `httpx.Response` (or `None`), and the `Crawler`, and returns a boolean indicating whether the request should be retried.

    Returns:
        Crawler: The `Crawler` object with the retry handler set.
    """  # noqa: E501
    self._must_retry = handler
    return self
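
A sketch of a retry handler matching the internally used signature (url, response, crawler), assuming only server errors should be retried:

>>> def retry_on_server_error(url, response, crawler):
...     return response is not None and response.status_code >= 500
>>> crawler.retry(retry_on_server_error)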

run() async

Run the crawler.

Source code in astel/crawler.py
async def run(self) -> None:
    """Run the crawler."""
    await self._on_found_links({parsers.parse_url(url) for url in self._start_urls})

    self._workers = [
        asyncio.create_task(self._worker()) for _ in range(self._num_workers)
    ]
    await self._todo.join()

    for worker in self._workers:
        worker.cancel()

stop(*, reset=False)

Stop the crawler's current execution.

Parameters:

    reset (bool, optional): Optionally, reset the crawler on the same call. Defaults to False.

Source code in astel/crawler.py
def stop(self, *, reset: bool = False) -> None:
    """Stop the crawler current execution.

    Args:
        reset (bool, optional: Optionally, reset the crawler on the same call. Defaults to `False`.
    """  # noqa: E501
    for worker in self._workers:
        worker.cancel()
    if reset:
        self.reset()

astel.errors

Error

Bases: Exception

Base class for exceptions in this package

Source code in astel/errors.py
class Error(Exception):
    """
    Base class for exceptions in this package
    """

    default_message: str | None = None

    def __init__(self, message: str = "") -> None:
        super().__init__(message)
        self.message = message or self.default_message

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}({self.message})"

InvalidConfigurationError

Bases: Error

Raised when a rate limiter configure call is invalid

Source code in astel/errors.py
class InvalidConfigurationError(Error):
    """
    Raised when a rate limiter configure call is invalid
    """

    default_message = (
        "Invalid configuration. A crawl delay or a request rate must be given."
    )

InvalidUrlError

Bases: Error

Raised when a URL is invalid

Source code in astel/errors.py
class InvalidUrlError(Error):
    """
    Raised when a URL is invalid
    """

    def __init__(self, url: str) -> None:
        super().__init__(f'The URL "{url}" is invalid.')
        self.url = url
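
All exceptions in the package share the Error base class, so they can be caught together. A small sketch:

>>> from astel import errors
>>> try:
...     raise errors.InvalidUrlError("not-a-url")
... except errors.Error as exc:
...     print(exc.message)
The URL "not-a-url" is invalid.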

astel.events

Event handlers for the crawler.

This module defines the event handlers that can be used to do some action when a specific event occurs, like storing information about the pages crawled, logging errors, or stopping the execution. The handlers are called with the current Crawler instance (passed through the crawler kwarg) and the event data.
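
A handler is any callable matching the corresponding protocol below. A sketch of a DONE handler, assuming crawler is an existing Crawler instance:

>>> def on_done(url, crawler):
...     print(f"finished {url.raw}, {len(crawler.done)} pages done")
>>> crawler.on(events.Event.DONE, on_done)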

DoneHandler

Bases: Protocol

Handler for when a crawler finishes processing a URL.

Source code in astel/events.py
class DoneHandler(Protocol):
    """Handler for when a crawler finishes processing a URL."""

    def __call__(self, url: parsers.Url, crawler: "Crawler") -> None: ...

ErrorHandler

Bases: Protocol

Handler for errors that occur during a crawler execution.

Source code in astel/events.py
class ErrorHandler(Protocol):
    """Handler for errors occurred during a crawler execution."""

    def __call__(
        self, error: errors.Error, crawler: "Crawler", *, reraise: bool = False
    ) -> None: ...

EventEmitter

Bases: Protocol

Protocol for an event emitter.

Source code in astel/events.py
class EventEmitter(Protocol):
    """Protocol for an event emitter."""

    def emit(
        self,
        event: Event,
        *data: Union[httpx.Request, httpx.Response, errors.Error, parsers.Url],
        crawler: "Crawler",
    ) -> Self: ...

    def on(self, event: Event, handler: Handler) -> Self: ...

RequestHandler

Bases: Protocol

Handler for requests made by a crawler.

Source code in astel/events.py
class RequestHandler(Protocol):
    """Handler for requests made by a crawler."""

    def __call__(self, request: httpx.Request, crawler: "Crawler") -> None: ...

ResponseHandler

Bases: Protocol

Handler for responses received by a crawler.

Source code in astel/events.py
class ResponseHandler(Protocol):
    """Handler for responses received by a crawler."""

    def __call__(self, response: httpx.Response, crawler: "Crawler") -> None: ...

UrlFoundHandler

Bases: Protocol

Handler for when a URL is found in a page.

Source code in astel/events.py
class UrlFoundHandler(Protocol):
    """Handler for when a URL is found in a page."""

    def __call__(self, url: parsers.Url, crawler: "Crawler") -> None: ...

astel.filters

Filters for URLs.

Some URLs in a webpage may not be relevant to your use cases.

This module defines the filters that can be used to filter out URLs from the crawler's execution based on their properties.

CallableFilter

Bases: Protocol

Callable filter interface.

Source code in astel/filters.py
class CallableFilter(Protocol):
    """Callable filter interface."""

    def __call__(self, url: Url) -> bool: ...

Contains

Bases: TextFilter

Filter URLs based on a text substring.

Examples:

>>> from astel.filterers.filters import Contains
>>> domain_contains = Contains("domain", "example")
>>> domain_contains.filter(ParsedUrl(domain="https://example.com", ...))  # True
Source code in astel/filters.py
class Contains(TextFilter):
    """Filter URLs based on a text substring.

    Examples:
        >>> from astel.filterers.filters import Contains
        >>> domain_contains = Contains("domain", "example")
        >>> domain_contains.filter(ParsedUrl(domain="https://example.com", ...))  # True
    """

    def _apply(self, url: Url) -> bool:
        return self.text in self._get_url_property(url)

EndsWith

Bases: TextFilter

Filter URLs based on a text suffix.

Examples:

>>> from astel.filterers.filters import EndsWith
>>> domain_ends_with = EndsWith("domain", ".com")
>>> domain_ends_with.filter(ParsedUrl(domain="https://example.com", ...))  # True
Source code in astel/filters.py
class EndsWith(TextFilter):
    """Filter URLs based on a text suffix.

    Examples:
        >>> from astel.filterers.filters import EndsWith
        >>> domain_ends_with = EndsWith("domain", ".com")
        >>> domain_ends_with.filter(ParsedUrl(domain="https://example.com", ...))  # True
    """  # noqa: E501

    def _apply(self, url: Url) -> bool:
        return self._get_url_property(url).endswith(self.text)

Filter

Bases: ABC, Generic[T]

Base class for filters.

Filters are used to determine if a URL should be processed or not. They can be combined using the bitwise operator &: filter1 & filter2 will return a new filter that will pass only if both filter1 and filter2 pass.

New filters can be created by subclassing this class and implementing the _apply method.

Generic:

    T: The type of the filter parameter.

Examples:

>>> from astel.filterers.filters import In
>>> domain_in_list = In("domain", ["example.com"])
>>> html_or_php = In(lambda url: url.path.split(".")[-1], ["html", "php"])
>>> my_filter = domain_in_list & html_or_php
Source code in astel/filters.py
class Filter(ABC, Generic[T]):
    """
    Base class for filters.

    Filters are used to determine if a URL should be processed or not. They can be combined using the bitwise operator `&`: `filter1` & `filter2` will return a new filter that will pass only if both `filter1` and `filter2` pass.

    New filters can be created by subclassing this class and implementing the `_apply` method.

    Generic:
        T: The type of the filter parameter.

    Examples:
        >>> from astel.filterers.filters import In
        >>> domain_in_list = In("domain", ["example.com"])
        >>> html_or_php = In(lambda url: url.path.split(".")[-1], ["html", "php"])
        >>> my_filter = domain_in_list & html_or_php
    """  # noqa: E501

    url_prop: UrlProperty
    __inverted: bool
    _chained: list[Filter]
    param: T | None

    def __init__(
        self,
        url_prop: UrlProperty,
        param: T | None = None,
        *,
        _inverted: bool = False,
        _chained: list[Filter] | None = None,
    ) -> None:
        """Initializes the filter with the given URL property."""
        self.param = param
        self.url_prop = url_prop
        self.__inverted = _inverted
        self._chained = _chained or []

    @abstractmethod
    def _apply(self, url: Url) -> bool:
        """Test the filter rule on the given URL.

        Args:
            url (Url): The URL to test the filter on.

        Returns:
            bool: True if the URL passes the filter, False otherwise.
        """
        ...

    def _get_url_property(self, url: Url) -> str:
        """Return the URL property value for the given URL.

        Args:
            url (Url): The URL to get the property from.

        Returns:
            str: The URL property value.
        """
        return getattr(url, self.url_prop)

    def filter(self, url: Url) -> bool:
        """Applies the filter to the given URL.

        Args:
            url (Url): The URL to filter.

        Returns:
            bool: True if the URL passes the filter, False otherwise.
        """
        return all(
            (
                *(f.filter(url) for f in self._chained),
                bool(self._apply(url) - self.__inverted),
            )
        )

    def __call__(self, url: Url) -> bool:
        return self.filter(url)

    def __invert__(self) -> Filter:
        new = copy.deepcopy(self)
        new.__inverted = not self.__inverted  # noqa: SLF001
        return new

    def __and__(self, other: Filter) -> Filter:
        if not isinstance(other, Filter):
            raise NotImplementedError
        new = copy.deepcopy(self)
        new._chained.append(other)
        return new
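
Filters can also be negated with ~, which inverts the result of the wrapped filter. A small sketch combining both operators:

>>> from astel import filters
>>> https_only = filters.StartsWith("scheme", "https")
>>> not_php = ~filters.EndsWith("path", ".php")
>>> my_filter = https_only & not_php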

__init__(url_prop, param=None, *, _inverted=False, _chained=None)

Initializes the filter with the given URL property.

Source code in astel/filters.py
def __init__(
    self,
    url_prop: UrlProperty,
    param: T | None = None,
    *,
    _inverted: bool = False,
    _chained: list[Filter] | None = None,
) -> None:
    """Initializes the filter with the given URL property."""
    self.param = param
    self.url_prop = url_prop
    self.__inverted = _inverted
    self._chained = _chained or []

filter(url)

Applies the filter to the given URL.

Parameters:

    url (Url): The URL to filter. Required.

Returns:

    bool: True if the URL passes the filter, False otherwise.

Source code in astel/filters.py
def filter(self, url: Url) -> bool:
    """Applies the filter to the given URL.

    Args:
        url (Url): The URL to filter.

    Returns:
        bool: True if the URL passes the filter, False otherwise.
    """
    return all(
        (
            *(f.filter(url) for f in self._chained),
            bool(self._apply(url) - self.__inverted),
        )
    )

In

Bases: Filter[Sequence[str]]

Filter URLs based on a group of values.

Examples:

>>> from astel.filterers.filters import In
>>> domain_in_list = In("domain", ["example.com"])
>>> domain_in_list.filter(ParsedUrl(domain="https://example.com", ...))  # True
Source code in astel/filters.py
class In(Filter[Sequence[str]]):
    """Filter URLs based on a group of values.

    Examples:
        >>> from astel.filterers.filters import In
        >>> domain_in_list = In("domain", ["example.com"])
        >>> domain_in_list.filter(ParsedUrl(domain="https://example.com", ...))  # True
    """

    def __init__(self, url_prop: UrlProperty, group: Sequence[str], **kwargs) -> None:
        super().__init__(url_prop, **kwargs)
        self.set = set(group)

    def _apply(self, url: Url) -> bool:
        return self._get_url_property(url) in self.set

Matches

Bases: Filter[Union[Pattern, str]]

Filter URLs based on a regular expression.

Examples:

>>> from astel.filterers.filters import Matches
>>> domain_matches = Matches("domain", r"example\..+")
>>> domain_matches.filter(ParsedUrl(domain="https://example.com", ...))  # True
Source code in astel/filters.py
class Matches(Filter[Union[re.Pattern, str]]):
    r"""Filter URLs based on a regular expression.

    Examples:
        >>> from astel.filterers.filters import Matches
        >>> domain_matches = Matches("domain", r"example\..+")
        >>> domain_matches.filter(ParsedUrl(domain="https://example.com", ...))  # True
    """

    def __init__(
        self, url_prop: UrlProperty, regex: re.Pattern | str, **kwargs
    ) -> None:
        super().__init__(url_prop, regex, **kwargs)
        self.regex = re.compile(regex) if isinstance(regex, str) else regex

    def _apply(self, url: Url) -> bool:
        return re.match(self.regex, self._get_url_property(url)) is not None

StartsWith

Bases: TextFilter

Filter URLs based on a text prefix.

Examples:

>>> from astel.filterers.filters import StartsWith
>>> domain_starts_with = StartsWith("domain", "example")
>>> domain_starts_with.filter(ParsedUrl(domain="https://example.com", ...))  # True
Source code in astel/filters.py
class StartsWith(TextFilter):
    """Filter URLs based on a text prefix.

    Examples:
        >>> from astel.filterers.filters import StartsWith
        >>> domain_starts_with = StartsWith("domain", "example")
        >>> domain_starts_with.filter(ParsedUrl(domain="https://example.com", ...))  # True
    """  # noqa: E501

    def _apply(self, url: Url) -> bool:
        return self._get_url_property(url).startswith(self.text)

TextFilter

Bases: Filter[str], ABC

Base class for text filters.

Filters URLs based on a text value.

Source code in astel/filters.py
class TextFilter(Filter[str], ABC):
    """Base class for text filters.

    Filters URLs based on a text value.
    """

    def __init__(
        self, url_prop: UrlProperty, text: str, *, case_sensitive: bool = True, **kwargs
    ) -> None:
        super().__init__(url_prop, **kwargs)
        self.case_sensitive = case_sensitive
        if not self.case_sensitive:
            text = text.lower()
        self.text = text

    def _get_url_property(self, url: Url) -> str:
        return (
            super()._get_url_property(url)
            if self.case_sensitive
            else super()._get_url_property(url).lower()
        )

create_from_kwarg(key, value)

Create a filter from a key-value pair.

Parameters:

    key (str): The key to create the filter from. Required.
    value (FilterParameter): The filter parameter. Required.

Returns:

    Filter | None: The created filter or None if the key is invalid.

Source code in astel/filters.py
def create_from_kwarg(key: str, value: T) -> Filter | None:
    """Create a filter from a key-value pair.

    Args:
        key (str): The key to create the filter from.
        value (FilterParameter): The filter parameter.

    Returns:
        Filter | None: The created filter or None if the key is invalid.
    """
    url_prop, filter_key = key.split("__")
    filter_key = _validate_filter_key(filter_key)
    url_prop = _validate_url_property(url_prop)

    for klass in _get_filter_subclasses():
        if klass.__name__.lower() == filter_key:
            return klass(url_prop, value)
        if klass.__name__.lower() == filter_key[1:]:
            klass = cast(Type[TextFilter], klass)
            if not isinstance(value, str):
                msg = f"Expected a string value for {klass.__name__} filter."
                raise ValueError(msg)
            modifier = filter_key[0]
            return klass(url_prop, value, case_sensitive=modifier != "i")
    return None
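
This is what powers the keyword syntax of Crawler.filter. A sketch, assuming "domain" and "path" are valid URL properties:

>>> from astel import filters
>>> only_example = filters.create_from_kwarg("domain__in", ["example.com"])
>>> html_insensitive = filters.create_from_kwarg("path__iendswith", ".html")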

astel.limiters

Rate limiting module.

Most websites have rate limits to prevent abuse and to keep their servers responsive.

This module defines the rate limiters that can be used to limit the amount of requests sent to a website.

NoLimitRateLimiter

Bases: RateLimiter

A limiter that does not limit the requests. Keep in mind that sending a lot of requests per second can result in throttling or even bans.

Source code in astel/limiters.py
class NoLimitRateLimiter(RateLimiter):
    """
    A limiter that does not limit the requests. Keep in mind that sending a
    lot of requests per second can result in throttling or even bans.
    """

    async def limit(self) -> None:  # type: ignore[override]
        """
        Asynchronously sleeps for 0 seconds.
        """
        await asyncio.sleep(0)

    def configure(self, *args, **kwargs) -> None:
        """
        Does nothing
        """

configure(*args, **kwargs)

Does nothing

Source code in astel/limiters.py
def configure(self, *args, **kwargs) -> None:
    """
    Does nothing
    """

limit() async

Asynchronously sleeps for 0 seconds.

Source code in astel/limiters.py
async def limit(self) -> None:  # type: ignore[override]
    """
    Asynchronously sleeps for 0 seconds.
    """
    await asyncio.sleep(0)

PerDomainRateLimiter

Bases: RateLimiter

Limit the number of requests per domain using its specified limiter instance if given, otherwise uses the default limiter.

Source code in astel/limiters.py
class PerDomainRateLimiter(RateLimiter):
    """Limit the number of requests per domain using its especified
    limiter instance if given, otherwise uses the default limiter
    """

    default_limiter: RateLimiter | None = None
    _domain_to_limiter: dict[str, RateLimiter]

    def __init__(
        self,
        default_limiter: RateLimiter | None = None,
    ) -> None:
        self.default_limiter = default_limiter
        self._domain_to_limiter = {}

    async def limit(self, url: str) -> None:  # type: ignore[override]
        """Limit the requests to the given URL by its domain.

        Args:
            url (str): The URL to limit

        Raises:
            errors.InvalidConfigurationError: If no limiter is found for the domain.
        """
        limiter = self._domain_to_limiter.get(
            self.extract_domain(url), self.default_limiter
        )
        if limiter is None:
            msg = "No limiter found for the domain."
            raise errors.InvalidConfigurationError(msg)

        await limiter.limit()

    def add_domain(self, domain: str, limiter: RateLimiter | None = None) -> None:
        """Adds a new domain to the limited domains with an optional rate limiter.

        Args:
            domain (str): A string representing the domain name to add.
            limiter (protocols.RateLimiter, optional): An optional `RateLimiter` instance used to limit the rate of requests to the domain. Defaults to None.

        Raises:
            errors.InvalidConfigurationError: If no limiter is given and no default limiter is set.
        """  # noqa: E501
        if limiter is None and self.default_limiter is None:
            msg = "No limiter was provided and no default limiter was set."
            raise errors.InvalidConfigurationError(msg)

        self._domain_to_limiter[domain] = cast(
            RateLimiter, limiter or self.default_limiter
        )

    @staticmethod
    def extract_domain(url: str) -> str:
        """Extracts the domain from a given URL.

        Returns:
            str: A string representing the domain name extracted from the URL.
        """
        return tldextract.extract(url).domain

    def configure(self, config: RateLimiterConfig) -> None:
        """Configures the rate at which requests are made to a domain by defining its
        corresponding limiter.

        Args:
            config (RateLimiterConfig): The configuration to apply.

        Raises:
            errors.InvalidConfigurationError: If the new computed token rate is less than or equal to 0.
        """  # noqa: E501
        if (
            config["domain"] is not None
            and config["domain"] not in self._domain_to_limiter
        ):
            self.add_domain(config["domain"])
            self._domain_to_limiter[config["domain"]].configure(config)

    @property
    def domain_to_limiter(self) -> dict[str, RateLimiter]:
        return self._domain_to_limiter
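
A small sketch. Note that domains are keyed by the name extracted with tldextract, so "https://example.com/..." maps to "example":

>>> import asyncio
>>> limiter = PerDomainRateLimiter(default_limiter=StaticRateLimiter(1.0))
>>> limiter.add_domain("example", StaticRateLimiter(0.5))
>>> asyncio.run(limiter.limit("https://example.com/page"))  # waits 0.5 seconds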

add_domain(domain, limiter=None)

Adds a new domain to the limited domains with an optional rate limiter.

Parameters:

    domain (str): A string representing the domain name to add. Required.
    limiter (RateLimiter, optional): An optional RateLimiter instance used to limit the rate of requests to the domain. Defaults to None.

Raises:

    InvalidConfigurationError: If no limiter is given and no default limiter is set.

Source code in astel/limiters.py
def add_domain(self, domain: str, limiter: RateLimiter | None = None) -> None:
    """Adds a new domain to the limited domains with an optional rate limiter.

    Args:
        domain (str): A string representing the domain name to add.
        limiter (protocols.RateLimiter, optional): An optional `RateLimiter` instance used to limit the rate of requests to the domain. Defaults to None.

    Raises:
        errors.InvalidConfigurationError: If no limiter is given and no default limiter is set.
    """  # noqa: E501
    if limiter is None and self.default_limiter is None:
        msg = "No limiter was provided and no default limiter was set."
        raise errors.InvalidConfigurationError(msg)

    self._domain_to_limiter[domain] = cast(
        RateLimiter, limiter or self.default_limiter
    )

configure(config)

Configures the rate at which requests are made to a domain by defining its corresponding limiter.

Parameters:

    config (RateLimiterConfig): The configuration to apply. Required.

Raises:

    InvalidConfigurationError: If the new computed token rate is less than or equal to 0.

Source code in astel/limiters.py
def configure(self, config: RateLimiterConfig) -> None:
    """Configures the rate at which requests are made to a domain by defining its
    corresponding limiter.

    Args:
        config (RateLimiterConfig): The configuration to apply.

    Raises:
        errors.InvalidConfigurationError: If the new computed token rate is less than or equal to 0.
    """  # noqa: E501
    if (
        config["domain"] is not None
        and config["domain"] not in self._domain_to_limiter
    ):
        self.add_domain(config["domain"])
        self._domain_to_limiter[config["domain"]].configure(config)

extract_domain(url) staticmethod

Extracts the domain from a given URL.

Returns:

    str: A string representing the domain name extracted from the URL.

Source code in astel/limiters.py
@staticmethod
def extract_domain(url: str) -> str:
    """Extracts the domain from a given URL.

    Returns:
        str: A string representing the domain name extracted from the URL.
    """
    return tldextract.extract(url).domain

limit(url) async

Limit the requests to the given URL by its domain.

Parameters:

    url (str): The URL to limit. Required.

Raises:

    InvalidConfigurationError: If no limiter is found for the domain.

Source code in astel/limiters.py
async def limit(self, url: str) -> None:  # type: ignore[override]
    """Limit the requests to the given URL by its domain.

    Args:
        url (str): The URL to limit

    Raises:
        errors.InvalidConfigurationError: If no limiter is found for the domain.
    """
    limiter = self._domain_to_limiter.get(
        self.extract_domain(url), self.default_limiter
    )
    if limiter is None:
        msg = "No limiter found for the domain."
        raise errors.InvalidConfigurationError(msg)

    await limiter.limit()

RateLimiter

Bases: ABC

Base class for rate limiters.

Source code in astel/limiters.py
class RateLimiter(ABC):
    """Base class for rate limiters."""

    @abstractmethod
    def configure(
        self,
        config: RateLimiterConfig,
    ) -> None:
        """Configures the rate limiter to respect the rules defined by the domain with the given parameters.

        In the case of a crawl delay, the crawl delay is ignored.

        Args:
            config (RateLimiterConfig): The configuration to apply.
        """  # noqa: E501
        ...

    @abstractmethod
    async def limit(self, *args, **kwargs) -> None:
        """Asynchronously limits the specified URL."""
        ...

configure(config) abstractmethod

Configures the rate limiter to respect the rules defined by the domain with the given parameters.

In the case of a crawl delay, the crawl delay is ignored.

Parameters:

    config (RateLimiterConfig): The configuration to apply. Required.

Source code in astel/limiters.py
@abstractmethod
def configure(
    self,
    config: RateLimiterConfig,
) -> None:
    """Configures the rate limiter to respect the rules defined by the domain with the given parameters.

    In the case of a crawl delay, the crawl delay is ignored.

    Args:
        config (RateLimiterConfig): The configuration to apply.
    """  # noqa: E501
    ...

limit(*args, **kwargs) abstractmethod async

Asynchronously limits the specified URL.

Source code in astel/limiters.py
@abstractmethod
async def limit(self, *args, **kwargs) -> None:
    """Asynchronously limits the specified URL."""
    ...
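
To sketch how a new limiter can plug into this interface, the illustrative subclass below waits a fixed delay with random jitter; the jitter behaviour is an assumption for this example and is not part of astel.

import asyncio
import random

from astel.limiters import RateLimiter, RateLimiterConfig


class JitteredRateLimiter(RateLimiter):
    """Illustrative limiter: sleeps a random delay around a base value."""

    def __init__(self, base_delay: float) -> None:
        self.base_delay = base_delay

    def configure(self, config: RateLimiterConfig) -> None:
        # Respect an explicit crawl delay if one is given; the request_rate
        # key is ignored in this sketch.
        crawl_delay = config.get("crawl_delay")
        if crawl_delay is not None:
            self.base_delay = max(self.base_delay, float(crawl_delay))

    async def limit(self) -> None:  # type: ignore[override]
        await asyncio.sleep(self.base_delay * random.uniform(0.5, 1.5))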

RateLimiterConfig

Bases: TypedDict

Rate limiting configuration.

Attributes:

Name Type Description
domain str

The domain to crawl.

crawl_delay str

A string representing the delay between each crawl in the format "<number><unit>".

request_rate RequestRate

The rate at which to make requests.

Source code in astel/limiters.py
class RateLimiterConfig(TypedDict, total=False):
    """Rate limiting configuration.

    Attributes:
        domain (str): The domain to crawl.
        crawl_delay (str, optional): A string representing the delay between each crawl in the format "<number><unit>".
        request_rate (RequestRate, optional): The rate at which to make requests.
    """  # noqa: E501

    domain: Optional[str]
    crawl_delay: Optional[str]
    request_rate: Optional[RequestRate]
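
For example, a configuration can pair a domain with either a crawl delay or a request rate. The RequestRate named tuple below is assumed to be the one from urllib.robotparser, which matches the requests/seconds fields the limiters read.

from urllib.robotparser import RequestRate

from astel.limiters import RateLimiterConfig

# At most two requests every five seconds for this domain.
config: RateLimiterConfig = {
    "domain": "example.com",
    "request_rate": RequestRate(requests=2, seconds=5),
}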

StaticRateLimiter

Bases: RateLimiter

Limit the number of requests per second by waiting for a specified amount of time between requests

Parameters:

Name Type Description Default
time_in_seconds float

The amount of time to wait between requests

required
Source code in astel/limiters.py
class StaticRateLimiter(RateLimiter):
    """Limit the number of requests per second by waiting for a
    specified amount of time between requests

    Args:
        time_in_seconds (float): The amount of time to wait between requests
    """

    def __init__(self, time_in_seconds: float) -> None:
        self.time = time_in_seconds

    async def limit(self) -> None:  # type: ignore[override]
        """Limit by wainting for the specified amount of time"""
        await asyncio.sleep(self.time)

    def configure(
        self,
        config: RateLimiterConfig,
    ) -> None:
        new_request_delay: Optional[float] = None
        if crawl_delay := config.get("crawl_delay", None):
            new_request_delay = float(crawl_delay)
        elif request_rate := config.get("request_rate", None):
            new_request_delay = request_rate.seconds / request_rate.requests

        if new_request_delay and new_request_delay < 0:
            msg = (
                "The new request delay must be greater than 0 "
                f"(got {new_request_delay})."
            )
            raise errors.InvalidConfigurationError(msg)

        # Use the greater of the two in order to respect all the domains
        if new_request_delay and new_request_delay > self.time:
            self.time = new_request_delay

limit() async

Limit by waiting for the specified amount of time

Source code in astel/limiters.py
async def limit(self) -> None:  # type: ignore[override]
    """Limit by wainting for the specified amount of time"""
    await asyncio.sleep(self.time)
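
A minimal usage sketch, assuming httpx is available (it is also used by the crawler options): one request is sent roughly every two seconds.

import asyncio

import httpx

from astel.limiters import StaticRateLimiter


async def fetch_all(urls: list[str]) -> None:
    limiter = StaticRateLimiter(time_in_seconds=2.0)
    async with httpx.AsyncClient() as client:
        for url in urls:
            await limiter.limit()  # sleeps for the configured delay
            await client.get(url)


asyncio.run(fetch_all(["https://example.com/a", "https://example.com/b"]))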

TokenBucketRateLimiter

Bases: RateLimiter

Limit the requests by using the token bucket algorithm

Parameters:

Name Type Description Default
tokens_per_second float

The amount of tokens to add to the bucket per second.

required
Source code in astel/limiters.py
class TokenBucketRateLimiter(RateLimiter):
    """Limit the requests by using the token bucket algorithm

    Args:
        tokens_per_second (float): The amount of tokens to add to the bucket per second.
    """

    __slots__ = ("_tokens_per_second", "_tokens", "_last_refresh_time")

    def __init__(self, tokens_per_second: float) -> None:
        if tokens_per_second <= 0:
            msg = "tokens_per_second must be greater than 0"
            raise ValueError(msg)

        self._tokens_per_second = tokens_per_second
        self._tokens = 0.0
        self._last_refresh_time = self.utcnow()

    @staticmethod
    def utcnow() -> datetime:
        return datetime.now(timezone.utc)

    def _refresh_tokens(self) -> None:
        """Refreshes the tokens in the bucket based on the time elapsed since
        the last refresh
        """
        current_time = self.utcnow()
        time_elapsed = current_time - self._last_refresh_time
        new_tokens = time_elapsed.seconds * self._tokens_per_second
        self._tokens = float(min(self._tokens + new_tokens, self._tokens_per_second))
        self._last_refresh_time = current_time

    def consume(self, tokens: int = 1) -> bool:
        """Check if the given number of tokens can be consumed and decrease the
        number of available tokens if possible.

        Args:
            tokens (int, optional): The number of tokens to consume. Default is 1.

        Returns:
            bool: `True` if the tokens were consumed, `False` otherwise
        """
        self._refresh_tokens()
        if self._tokens >= tokens:
            self._tokens -= tokens
            return True
        return False

    async def limit(self) -> None:  # type: ignore[override]
        while not self.consume(1):
            pass

    @property
    def tokens(self) -> float:
        self._refresh_tokens()
        return self._tokens

    @property
    def tokens_per_second(self) -> float:
        return self._tokens_per_second

    @property
    def last_refresh_time(self) -> datetime:
        return self._last_refresh_time

    def configure(
        self,
        config: RateLimiterConfig,
    ) -> None:
        """Configures the rate at which requests are made to a domain by setting the
        tokens per second.
        """
        if config["crawl_delay"] is not None:
            new_token_rate = 1 / int(config["crawl_delay"])
        elif config["request_rate"] is not None:
            new_token_rate = (
                config["request_rate"].requests / config["request_rate"].seconds
            )
        else:
            return

        if new_token_rate < 0:
            msg = f"The new token rate must be greater than 0 (got {new_token_rate})."
            raise errors.InvalidConfigurationError(msg)

        if new_token_rate < self._tokens_per_second:
            self._tokens_per_second = new_token_rate

configure(config)

Configures the rate at which requests are made to a domain by setting the tokens per second.

Source code in astel/limiters.py
def configure(
    self,
    config: RateLimiterConfig,
) -> None:
    """Configures the rate at which requests are made to a domain by setting the
    tokens per second.
    """
    if config["crawl_delay"] is not None:
        new_token_rate = 1 / int(config["crawl_delay"])
    elif config["request_rate"] is not None:
        new_token_rate = (
            config["request_rate"].requests / config["request_rate"].seconds
        )
    else:
        return

    if new_token_rate < 0:
        msg = f"The new token rate must be greater than 0 (got {new_token_rate})."
        raise errors.InvalidConfigurationError(msg)

    if new_token_rate < self._tokens_per_second:
        self._tokens_per_second = new_token_rate

consume(tokens=1)

Check if the given number of tokens can be consumed and decrease the number of available tokens if possible.

Parameters:

Name Type Description Default
tokens int

The number of tokens to consume. Default is 1.

1

Returns:

Name Type Description
bool bool

True if the tokens were consumed, False otherwise

Source code in astel/limiters.py
def consume(self, tokens: int = 1) -> bool:
    """Check if the given number of tokens can be consumed and decrease the
    number of available tokens if possible.

    Args:
        tokens (int, optional): The number of tokens to consume. Default is 1.

    Returns:
        bool: `True` if the tokens were consumed, `False` otherwise
    """
    self._refresh_tokens()
    if self._tokens >= tokens:
        self._tokens -= tokens
        return True
    return False
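
To illustrate the bucket mechanics, the sketch below reads the public properties and consumes tokens directly; note that limit() spins until a token becomes available.

import asyncio

from astel.limiters import TokenBucketRateLimiter


async def main() -> None:
    limiter = TokenBucketRateLimiter(tokens_per_second=2.0)

    print(limiter.tokens)      # 0.0 right after creation
    print(limiter.consume(1))  # False until tokens have accumulated

    await limiter.limit()      # spins until one token can be consumed
    print(limiter.tokens_per_second)


asyncio.run(main())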

astel.options

Options module.

This module defines the options that can be used to configure the crawler's behavior.

CrawlerOptions

Bases: TypedDict

Crawler options.

Attributes:

Name Type Description
client AsyncClient

An instance of httpx.AsyncClient to use for network requests.

workers int

The number of worker tasks to run in parallel.

limit int

The maximum number of pages to crawl.

user_agent str

The user agent to use for the requests.

parser_factory ParserFactory

A factory function to create a parser instance.

rate_limiter RateLimiter

The rate limiter to limit the number of requests sent per second.

event_emitter_factory Callable[[], EventEmitter]

A factory function to create an event emitter for the crawler.

retry_for_status_codes list[int]

A list of status codes for which the crawler should retry the request.

Source code in astel/options.py
class CrawlerOptions(TypedDict, total=False):
    """Crawler options.

    Attributes:
        client (httpx.AsyncClient): An instance of `httpx.AsyncClient` to use for network requests.
        workers (int): The number of worker tasks to run in parallel.
        limit (int): The maximum number of pages to crawl.
        user_agent (str): The user agent to use for the requests.
        parser_factory (ParserFactory): A factory function to create a parser instance.
        rate_limiter (limiters.RateLimiter): The rate limiter to limit the number of requests sent per second.
        event_emitter_factory (Callable[[], events.EventEmitter]): A factory function to create an event emitter for the crawler.
        retry_for_status_codes (list[int]): A list of status codes for which the crawler should retry the request.
    """  # noqa: E501

    client: httpx.AsyncClient
    workers: int
    limit: int
    user_agent: str
    parser_factory: ParserFactory
    rate_limiter: limiters.RateLimiter
    event_emitter_factory: Callable[[], events.EventEmitter]
    retry_for_status_codes: list[int]
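
A sketch of a partial options dictionary; because the TypedDict is declared with total=False, only the keys to override need to be supplied.

import httpx

from astel.limiters import StaticRateLimiter
from astel.options import CrawlerOptions

options: CrawlerOptions = {
    "client": httpx.AsyncClient(timeout=10.0),
    "workers": 5,
    "limit": 100,
    "user_agent": "my-crawler",
    "rate_limiter": StaticRateLimiter(1.0),
    "retry_for_status_codes": [429, 503],
}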

ParserFactory

Bases: Protocol

Callable that creates a parser instance.

Source code in astel/options.py
class ParserFactory(Protocol):
    """Callable that creates a parser instance."""

    def __call__(self, base: str | None = None) -> parsers.Parser: ...

RetryHandler

Bases: Protocol

Callable that determines whether the crawler should retry the request.

Source code in astel/options.py
class RetryHandler(Protocol):
    """Callable that determines whether the crawler should retry the request."""

    def __call__(
        self, url: parsers.Url, response: Union[httpx.Response, None], crawler: Crawler
    ) -> bool: ...
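
A sketch of a handler satisfying this protocol: it retries only on HTTP 429 responses. The crawler argument is typed loosely here because the Crawler import path is not shown in this excerpt.

from typing import Union

import httpx

from astel import parsers


def retry_on_too_many_requests(
    url: parsers.Url, response: Union[httpx.Response, None], crawler: object
) -> bool:
    # Retry only when the server explicitly asked us to slow down.
    return response is not None and response.status_code == 429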

merge_with_default_options(options=None)

Merge the given options with the default options.

Parameters:

Name Type Description Default
options CrawlerOptions

The options to merge.

None

Returns:

Name Type Description
CrawlerOptions CrawlerOptions

The merged options.

Source code in astel/options.py
def merge_with_default_options(options: CrawlerOptions | None = None) -> CrawlerOptions:
    """Merge the given options with the default options.

    Args:
        options (CrawlerOptions): The options to merge.

    Returns:
        CrawlerOptions: The merged options.
    """
    return {**DEFAULT_OPTIONS, **(options or {})}  # type: ignore   # noqa: PGH003
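
For instance, overriding only the number of workers keeps every other default (the exact defaults live in DEFAULT_OPTIONS, which is not shown in this excerpt).

from astel.options import merge_with_default_options

options = merge_with_default_options({"workers": 2})
print(options["workers"])  # 2; everything else comes from DEFAULT_OPTIONS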

astel.parsers

Parsers for extracting links from webpages and sitemaps.

This module defines the parsers that can be used to extract the links from the content of a webpage or a sitemap.

BaseParser

Bases: InitParserMixin, ABC

Base class to be used for implementing new parser classes.

Source code in astel/parsers.py
class BaseParser(InitParserMixin, ABC):
    """Base class to be used for implementing new parser classes."""

HTMLAnchorsParser

Bases: InitParserMixin, HTMLParser

A parser that extracts the URLs from a webpage and filters them with the given filterer.

Parameters:

Name Type Description Default
base str

The base URL to use to resolve relative URLs

None
Source code in astel/parsers.py
class HTMLAnchorsParser(InitParserMixin, HTMLParser):
    """A parser that extracts the urls from a webpage and filter them out with the
    given filterer.

    Args:
        base (str): The base URL to use to resolve relative URLs
    """

    @override
    def handle_starttag(self, tag: str, attrs: list[tuple[str, str | None]]) -> None:
        if tag != "a":
            return

        for attr, value in attrs:
            if attr == "href" and isinstance(value, str):
                self.found_links.add(parse_url(value, self.base))
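
A short sketch of the anchor parser on an HTML snippet; relative hrefs are resolved against the base URL, and the domain/path attributes read below are those of the Url protocol.

from astel.parsers import HTMLAnchorsParser

parser = HTMLAnchorsParser(base="https://example.com")
parser.feed('<a href="/docs">Docs</a> <a href="https://other.org/">Other</a>')

# Each entry is a parsed Url; "/docs" is resolved against the base URL.
for link in parser.found_links:
    print(link.domain, link.path)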

InitParserMixin

Helper mixin to initialize the parser with a base URL.

Source code in astel/parsers.py
class InitParserMixin:
    """Helper mixin to initialize the parser with a base URL."""

    def __init__(self, base: str | None = None) -> None:
        self.base = base
        self.found_links: Set[Url] = set()
        super().__init__()

    def reset(self, base: str | None = None) -> None:
        if base is not None:
            self.base = base
        self.found_links.clear()
        getattr(super(), "reset", lambda: ...)()

Parser

Bases: Protocol

Parses the content of a file (webpages, or sitemaps, for example) to extract the links of interest.

Parameters:

Name Type Description Default
base Union[str, None]

The base URL to use to resolve relative URLs. Defaults to None.

None
Source code in astel/parsers.py
class Parser(Protocol):
    """Parses the content of a file (webpages, or sitemaps, for example) to extract the links of interest.

    Args:
        base (Union[str, None]): The base URL to use to resolve relative URLs. Defaults to `None`.
    """  # noqa: E501

    def __init__(self, base: str | None = None) -> None: ...

    def feed(self, text: str) -> None:
        """Process the content of a website and update the `found_links` attribute

        Args:
            text (str): The content of the website
        """
        ...

    def reset(self, base: str | None = None) -> None:
        """Reset the parser to its initial state.

        Args:
            base (Union[str, None], optional): The base URL to use to resolve relative URLs. Defaults to `None`.
        """  # noqa: E501

    @property
    def base(self) -> str | None: ...

    @property
    def found_links(self) -> Set[Url]: ...

feed(text)

Process the content of a website and update the found_links attribute

Parameters:

Name Type Description Default
text str

The content of the website

required
Source code in astel/parsers.py
def feed(self, text: str) -> None:
    """Process the content of a website and update the `found_links` attribute

    Args:
        text (str): The content of the website
    """
    ...

reset(base=None)

Reset the parser to its initial state.

Parameters:

Name Type Description Default
base Union[str, None]

The base URL to use to resolve relative URLs. Defaults to None.

None
Source code in astel/parsers.py
def reset(self, base: str | None = None) -> None:
    """Reset the parser to its initial state.

    Args:
        base (Union[str, None], optional): The base URL to use to resolve relative URLs. Defaults to `None`.
    """  # noqa: E501

SiteMapParser

Bases: InitParserMixin

Parses a sitemap file to extract the links of interest.

Parameters:

Name Type Description Default
base str

The base URL to use to resolve relative URLs

None
Source code in astel/parsers.py
class SiteMapParser(InitParserMixin):
    """Parses a sitemap file to extract the links of interest.

    Args:
        base (str): The base URL to use to resolve relative URLs
    """

    def feed(self, text: str) -> None:
        root = ElementTree.fromstring(text)

        for url_element in root.iter(
            "{http://www.sitemaps.org/schemas/sitemap/0.9}url"
        ):
            loc_element = url_element.find(
                "{http://www.sitemaps.org/schemas/sitemap/0.9}loc"
            )
            if loc_element is not None and loc_element.text:
                self.found_links.add(parse_url(loc_element.text))
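
A sketch with a tiny in-memory sitemap document:

from astel.parsers import SiteMapParser

SITEMAP = """
<urlset xmlns="http://www.sitemaps.org/schemas/sitemap/0.9">
  <url><loc>https://example.com/</loc></url>
  <url><loc>https://example.com/about</loc></url>
</urlset>"""

parser = SiteMapParser()
parser.feed(SITEMAP)
print(len(parser.found_links))  # 2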

Url

Bases: Protocol

Model of a URL for the library to work with.

Source code in astel/parsers.py
class Url(Protocol):
    """
    Model of a URL for the library to work with.
    """

    @property
    def domain(self) -> str: ...

    @property
    def path(self) -> str: ...

    @property
    def params(self) -> str: ...

    @property
    def scheme(self) -> str: ...

    @property
    def query(self) -> str: ...

    @property
    def fragment(self) -> str: ...

    @property
    def raw(self) -> str: ...

    @property
    def filetype(self) -> str: ...

parse_url(url, base=None)

Parse a URL into its components.

Parameters:

Name Type Description Default
url str

The URL to parse

required
base str

The base URL to use to resolve relative URLs. Defaults to None.

None

Returns:

Name Type Description
Url Url

The parsed URL

Source code in astel/parsers.py
def parse_url(url: str, base: str | None = None) -> Url:
    """Parse a URL into its components.

    Args:
        url (str): The URL to parse
        base (str, optional): The base URL to use to resolve relative URLs. Defaults to `None`.

    Returns:
        Url: The parsed URL
    """  # noqa: E501
    result = parse.urlparse(url if base is None else parse.urljoin(base, url))
    return ParsedUrl(
        result.scheme,
        result.netloc,
        result.path,
        result.params,
        result.query,
        result.fragment,
    )
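
For example, resolving a relative link against a base URL (the scheme and path attributes are those of the Url protocol above):

from astel.parsers import parse_url

url = parse_url("../docs/index.html", base="https://example.com/guides/intro")
print(url.scheme)  # https
print(url.path)    # /docs/index.html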