
Parallel asynchronous GET requests with asyncio


The 3 types of async statement in Python 3.5+ to get to know here are

  • async with (asynchronous context manager)
  • async for (asynchronous iteration, e.g. over an asynchronous generator)
  • async generators (an async def function containing yield)

via Łukasz Langa

(we will also see async def, which defines an asynchronous function, i.e. a coroutine)
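
As a minimal sketch (not from the talk; the ticker generator and the filename here are made up for illustration), these statements can appear together like this:

import asyncio

import aiofiles  # async file IO, so `async with` has something to manage

async def ticker(n: int):
    # asynchronous generator: an async def function containing yield
    for i in range(n):
        await asyncio.sleep(0.1)
        yield i

async def main():
    # async with: asynchronous context manager
    async with aiofiles.open("ticks.txt", "w") as f:
        # async for: iterate over the asynchronous generator
        async for i in ticker(3):
            await f.write(f"{i}\n")

asyncio.run(main())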

Another example, via RealPython, uses aiohttp.

The high-level program structure will look like this:

  • Read a sequence of URLs from a local file, urls.txt.

  • Send GET requests for the URLs and decode the resulting content. If this fails, stop there for a URL.

  • Search for the URLs within href tags in the HTML of the responses.

  • Write the results to foundurls.txt.

Do all of the above as asynchronously and concurrently as possible. (Use aiohttp for the requests, and aiofiles for the file-appends. These are two primary examples of IO that are well-suited for the async IO model.)

Code example

#!/usr/bin/env python3
# areq.py

"""Asynchronously get links embedded in multiple pages' HTML."""

import asyncio
import logging
import re
import sys
from typing import IO
import urllib.error
import urllib.parse

import aiofiles
import aiohttp
from aiohttp import ClientSession

logging.basicConfig(
    format="%(asctime)s %(levelname)s:%(name)s: %(message)s",
    level=logging.DEBUG,
    datefmt="%H:%M:%S",
    stream=sys.stderr,
)
logger = logging.getLogger("areq")
logging.getLogger("chardet.charsetprober").disabled = True

HREF_RE = re.compile(r'href="(.*?)"')

async def fetch_html(url: str, session: ClientSession, **kwargs) -> str:
    """GET request wrapper to fetch page HTML.

    kwargs are passed to `session.request()`.
    """

    resp = await session.request(method="GET", url=url, **kwargs)
    resp.raise_for_status()
    logger.info("Got response [%s] for URL: %s", resp.status, url)
    html = await resp.text()
    return html

async def parse(url: str, session: ClientSession, **kwargs) -> set:
    """Find HREFs in the HTML of `url`."""
    found = set()
    try:
        html = await fetch_html(url=url, session=session, **kwargs)
    except (
        aiohttp.ClientError,
        aiohttp.http_exceptions.HttpProcessingError,
    ) as e:
        logger.error(
            "aiohttp exception for %s [%s]: %s",
            url,
            getattr(e, "status", None),
            getattr(e, "message", None),
        )
        return found
    except Exception as e:
        logger.exception(
            "Non-aiohttp exception occurred:  %s", getattr(e, "__dict__", {})
        )
        return found
    else:
        for link in HREF_RE.findall(html):
            try:
                abslink = urllib.parse.urljoin(url, link)
            except (urllib.error.URLError, ValueError):
                logger.exception("Error parsing URL: %s", link)
                pass
            else:
                found.add(abslink)
        logger.info("Found %d links for %s", len(found), url)
        return found

async def write_one(file: IO, url: str, **kwargs) -> None:
    """Write the found HREFs from `url` to `file`."""
    res = await parse(url=url, **kwargs)
    if not res:
        return None
    async with aiofiles.open(file, "a") as f:
        for p in res:
            await f.write(f"{url}\t{p}\n")
        logger.info("Wrote results for source URL: %s", url)

async def bulk_crawl_and_write(file: IO, urls: set, **kwargs) -> None:
    """Crawl & write concurrently to `file` for multiple `urls`."""
    async with ClientSession() as session:
        tasks = []
        for url in urls:
            tasks.append(
                write_one(file=file, url=url, session=session, **kwargs)
            )
        await asyncio.gather(*tasks)

if __name__ == "__main__":
    import pathlib
    import sys

    assert sys.version_info >= (3, 7), "Script requires Python 3.7+."
    here = pathlib.Path(__file__).parent

    with open(here.joinpath("urls.txt")) as infile:
        urls = set(map(str.strip, infile))

    outpath = here.joinpath("foundurls.txt")
    with open(outpath, "w") as outfile:
        outfile.write("source_url\tparsed_url\n")

    asyncio.run(bulk_crawl_and_write(file=outpath, urls=urls))

The call to asyncio.run at the end of this example means you don't have to handle the opening and closing of the event loop yourself: it's handled for you, as the Python docs for asyncio recommend:

Application developers should typically use the high-level asyncio functions, such as asyncio.run(), and should rarely need to reference the loop object or call its methods. This section is intended mostly for authors of lower-level code, libraries, and frameworks, who need finer control over the event loop behavior.
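
To make that concrete, here is a minimal sketch of the two styles (the main coroutine is just a placeholder):

import asyncio

async def main():
    await asyncio.sleep(0.1)

# High-level style (Python 3.7+): the loop is created and closed for you
asyncio.run(main())

# Lower-level style: manage the event loop yourself (roughly what asyncio.run does)
loop = asyncio.new_event_loop()
try:
    loop.run_until_complete(main())
finally:
    loop.close()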

aiostream

I quite liked this demo (via):

  • I removed the pipe part and added a generator function (TODO: try an async generator; a sketch of that follows the demo below)
import asyncio
import aiohttp
import random
from aiostream import stream

async def main(urls):
    async with aiohttp.ClientSession() as session:
        ws = stream.repeat(session)  # repeat the session object indefinitely
        xs = stream.zip(ws, stream.iterate(urls))  # pair it with each URL
        ys = stream.starmap(xs, fetch, ordered=False, task_limit=10)  # fetch up to 10 URLs concurrently
        zs = stream.map(ys, process)  # process each result as it arrives
        await zs  # run the whole pipeline

async def fetch(session, url):
    await asyncio.sleep(1 + 4*random.random())
    return url

async def process(data):
    print(data)

def get_urls():
    for s in ["qrx", "conf", "ocu", "pore", "cal", "poll"]:
        yield f"https://{s}.spin.systems"

foo = main(get_urls())
loop = asyncio.get_event_loop()
loop.run_until_complete(foo)
loop.close()

...but it doesn't actually download anything!
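
As for the TODO above, the plain generator could be swapped for an asynchronous generator with no change to main(), since aiostream's stream.iterate appears to accept async iterables too; a minimal sketch:

async def get_urls():
    # asynchronous generator: async def plus yield
    for s in ["qrx", "conf", "ocu", "pore", "cal", "poll"]:
        yield f"https://{s}.spin.systems"

loop = asyncio.get_event_loop()
loop.run_until_complete(main(get_urls()))
loop.close()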

It's clear, though, that in the demo the HTTP GET request would go where the asyncio.sleep is, and an example of that can be found here:

import aiohttp
import asyncio

async def fetch(session, url):
    with aiohttp.Timeout(10):
        async with session.get(url) as response:
            return await response.text()

async def fetch_all(session, urls, loop):
    results = await asyncio.wait([loop.create_task(fetch(session, url))
                                  for url in urls])
    return results

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    # breaks because of the first url
    urls = ['http://SDFKHSKHGKLHSKLJHGSDFKSJH.com',
            'http://google.com',
            'http://twitter.com']
    with aiohttp.ClientSession(loop=loop) as session:
        the_results = loop.run_until_complete(
            fetch_all(session, urls, loop))
        # do something with the_results

I found this example didn't actually run as intended any more, but the important part is to see the GET request taking place within the fetch (async) function:

async def fetch(session, url):
    with aiohttp.Timeout(10):
        async with session.get(url) as response:
            return await response.text()

rather than

async def fetch(session, url):
    await asyncio.sleep(1 + 4*random.random())
    return url

The aiohttp.Timeout(10) context manager seems (from the docs) to be an outdated way of setting a timeout: in current aiohttp the equivalent option is a ClientTimeout on the session or on the individual request (the default total timeout seems to be 5 minutes).
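
A minimal sketch of that, assuming aiohttp 3.x, either session-wide or per request:

import aiohttp

TIMEOUT = aiohttp.ClientTimeout(total=10)  # seconds; the default total is 300s (5 min)

async def fetch(session, url):
    # per-request override; omit the timeout argument to use the session's setting
    async with session.get(url, timeout=TIMEOUT) as response:
        return await response.text()

async def main(url):
    # session-wide timeout
    async with aiohttp.ClientSession(timeout=TIMEOUT) as session:
        return await fetch(session, url)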

So I'd suggest:

async def fetch(session, url):
    async with session.get(url) as response:
        return await response.text()

and to copy the client example from the aiohttp reference docs:

async def fetch(client):
    async with client.get('http://python.org') as resp:
        assert resp.status == 200
        return await resp.text()

async def main():
    async with aiohttp.ClientSession() as client:
        html = await fetch(client)
        print(html)

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
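
As noted above, on Python 3.7+ the manual loop handling at the end can be replaced by the high-level entry point:

asyncio.run(main())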