Three async patterns I used building a broken-link checker
A broken-link checker is the kind of project that begs for async — you're waiting on hundreds of HTTP responses, and doing that sequentially is painfully slow. I built link_checker, an async crawler that checks every link on a website, and along the way I leaned on three asyncio patterns that keep showing up in real-world async Python code.
This post walks through those three patterns using code from the project. The full source is on GitHub.
Pattern 1: Rate limiting with asyncio.Semaphore
The first instinct with async HTTP is to fire off all requests at once. That's a good way to get rate-limited, overwhelm the target server, or exhaust file descriptors. A semaphore puts a ceiling on how many requests are in flight at any moment.
Here's the core of fetcher.py:
```python
class LinkFetcher:
    def __init__(self, semaphore: asyncio.Semaphore, timeout: aiohttp.ClientTimeout):
        self.semaphore = semaphore
        self.timeout = timeout

    async def check_url(self, session, url, source_url=None, method="GET"):
        start = asyncio.get_running_loop().time()
        try:
            async with self.semaphore:
                async with session.request(method, url, timeout=self.timeout,
                                           allow_redirects=True) as response:
                    # ... process response
        except asyncio.TimeoutError:
            # ... handle timeout
        except aiohttp.ClientError as e:
            # ... handle connection errors
```
The key line is async with self.semaphore:. Every call to check_url blocks at the semaphore until a slot opens up. If you create Semaphore(10), at most 10 requests run concurrently — regardless of how many workers are pulling URLs off the queue.
Semaphore vs. connection pool
aiohttp's TCPConnector(limit=N) also caps connections, but that's a transport-level limit. The semaphore is an application-level throttle — you can set it independently to control politeness toward target servers without touching the connection pool.
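To see the throttling in isolation, here is a minimal standalone sketch (not the project's code) that replaces the HTTP request with asyncio.sleep and tracks how many "requests" are in flight at once. The fetch, main, counter, and urls names are made up for the demo:

```python
import asyncio

async def fetch(semaphore: asyncio.Semaphore, counter: dict, url: str) -> str:
    async with semaphore:
        # Inside the semaphore: count how many coroutines are here at once.
        counter["active"] += 1
        counter["peak"] = max(counter["peak"], counter["active"])
        await asyncio.sleep(0.01)  # stand-in for the HTTP round trip
        counter["active"] -= 1
    return url

async def main() -> int:
    semaphore = asyncio.Semaphore(3)          # at most 3 in flight
    counter = {"active": 0, "peak": 0}
    urls = [f"https://example.com/{i}" for i in range(20)]
    await asyncio.gather(*(fetch(semaphore, counter, u) for u in urls))
    return counter["peak"]

peak = asyncio.run(main())  # peak concurrency never exceeds 3
```

All 20 tasks start immediately, but only three get past the semaphore at a time; the rest park at `async with` until a slot frees up.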
Pattern 2: Queue + TaskGroup worker pool
The crawler needs to do a breadth-first crawl: start with one URL, fetch the page, discover new links, check those, discover more links, and so on. The work is dynamic — you don't know the full URL list upfront.
asyncio.Queue combined with asyncio.TaskGroup gives you a clean worker pool pattern. From crawler.py:
```python
async def run(self) -> CrawlReport:
    self.visited.add(self.start_url)
    await self.queue.put(CrawlURL(self.start_url, source_url=None, depth=0, is_internal=True))

    async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(limit=20)) as session:
        async with asyncio.TaskGroup() as task_group:
            workers = []
            for i in range(self.num_workers):
                task = task_group.create_task(self._worker(i, session))
                workers.append(task)
            await self.queue.join()
            for task in workers:
                task.cancel()
    return self.report
Each worker runs a simple loop — pull a URL, process it, mark it done:
```python
async def _worker(self, worker_id, session):
    while True:
        crawl_url = await self.queue.get()
        try:
            if crawl_url.is_internal:
                result = await self._process_internal(session, crawl_url)
            else:
                result = await self._process_external(session, crawl_url)
            self.report.add_result(result)
        except asyncio.CancelledError:
            raise
        except Exception as e:
            logger.error(f"worker {worker_id} error on {crawl_url.url}: {e}")
        finally:
            self.queue.task_done()
```
Here's how the pieces fit together:
```mermaid
flowchart LR
    Q[asyncio.Queue] -->|get| W1[Worker 1]
    Q -->|get| W2[Worker 2]
    Q -->|get| W3[Worker N]
    W1 -->|new links| Q
    W2 -->|new links| Q
    W3 -->|new links| Q
    W1 -->|results| R[CrawlReport]
    W2 -->|results| R
    W3 -->|results| R
```
queue.join() must be inside the TaskGroup
queue.join() blocks until every item has had task_done() called. If you put the join after the TaskGroup block, the TaskGroup would wait for workers to finish first — but workers loop forever with while True, so they never finish. The join must be inside the TaskGroup so it can cancel workers once the queue drains.
No lock needed for the visited set
The _enqueue_if_new method checks if url in self.visited and then does self.visited.add(url). This looks like a race condition, but it's not — asyncio is single-threaded, and there's no await between the check and the add. The coroutine can't be preempted in that window.
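Based on that description, the check-then-add might look like the sketch below. This is a reconstruction, not the project's actual code; only _enqueue_if_new and visited come from the post, and the synchronous put_nowait is an assumption:

```python
import asyncio

class Crawler:
    def __init__(self) -> None:
        self.visited: set[str] = set()
        self.queue: asyncio.Queue = asyncio.Queue()

    def _enqueue_if_new(self, url: str) -> bool:
        # No await between the membership test and the add, so no other
        # coroutine can run in between: atomic from asyncio's point of view.
        if url in self.visited:
            return False
        self.visited.add(url)
        self.queue.put_nowait(url)
        return True

crawler = Crawler()
first = crawler._enqueue_if_new("https://example.com/a")   # True: new URL
second = crawler._enqueue_if_new("https://example.com/a")  # False: already seen
```

The moment you add an await inside that window (say, an async dedup lookup), the guarantee evaporates and you'd need a lock after all.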
Pattern 3: Graceful shutdown with partial results
When you Ctrl+C a crawl that's checked 50 out of 300 URLs, you still want to see what was found so far. The signal handler and CancelledError catch in __main__.py make this work:
```python
async def async_main(args):
    loop = asyncio.get_running_loop()
    current_task = asyncio.current_task()
    if current_task:
        for sig in (signal.SIGINT, signal.SIGTERM):
            loop.add_signal_handler(sig, current_task.cancel)

    try:
        link_checker = LinkChecker(start_url=args.url, ...)
        report = await link_checker.run()
        print_report(report)
    except (asyncio.CancelledError, KeyboardInterrupt):
        logger.info("Operation was canceled")
        print_report(link_checker.report)
```
The signal handler calls current_task.cancel(), which raises CancelledError at whatever await is currently active inside link_checker.run(). The local variable report is never assigned — but that doesn't matter.
Results accumulate on the instance
As workers process URLs, they call self.report.add_result(result) one at a time. The CrawlReport instance on link_checker always has the latest partial results. In the except block, link_checker.report gives you everything collected before the cancellation. If you tried to use the local report variable, you'd get a NameError.
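A minimal standalone demo of the same idea (not the project's code; MiniChecker and its results list stand in for LinkChecker and CrawlReport): cancel a run partway through and read the partial results off the instance rather than the never-bound local.

```python
import asyncio

class MiniChecker:
    def __init__(self) -> None:
        self.results: list[str] = []      # accumulates on the instance

    async def run(self) -> list[str]:
        for i in range(100):
            self.results.append(f"url-{i}: ok")
            await asyncio.sleep(0.01)     # cancellation can only land at an await
        return self.results

async def main() -> int:
    checker = MiniChecker()
    task = asyncio.create_task(checker.run())
    await asyncio.sleep(0.05)             # let a few iterations happen
    task.cancel()
    try:
        report = await task               # never binds: CancelledError is raised here
    except asyncio.CancelledError:
        pass
    return len(checker.results)           # partial results survive on the instance

partial = asyncio.run(main())  # some results collected, but fewer than 100
```

The exact count depends on timing, but the point holds regardless: state hung off the long-lived object outlives the cancelled coroutine.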
Wrapping up
These three patterns — semaphore for rate limiting, queue + TaskGroup for dynamic work distribution, and signal-based graceful shutdown — cover a lot of ground for I/O-bound async Python. They're not specific to link checking; any project that fans out to many network calls can use the same building blocks.
The full source is at github.com/dsdesign-dev/link_checker_async.