# Asyncio Demystified: Rebuilding it From Scratch One Yield at a Time

I don't know about you, but each time I await something in Python, I get a tiny itch in the back of my brain.

Not because I don’t know what it does or how to use it, that part’s fine. I write my async defs, slap some await in there, and it just works™️. No, what's bothering me is: why does it work?

If we’re not blocking the main process... who’s doing the actual work I’m awaiting? And more importantly, how does the result get back into my Python function?

Take a typical FastAPI route. Maybe we’re calling an external API:

```python

import httpx
from fastapi import APIRouter

router = APIRouter()

@router.get("/user/{id}")
async def get_user(id: str):
    async with httpx.AsyncClient() as client:
        response = await client.get(f"https://api.example.com/users/{id}")
    return response.json()
```

The request comes in, the interpreter hits `await client.get(...)`, and somehow... the server keeps handling other requests while the HTTP call is doing its thing.

Doing its thing *where*, exactly?

And once it's done, who taps the interpreter on the shoulder to say: "Hey, here’s your response. You can resume now."?

It all feels a bit too magical, and I *hate* magic. That's why I always end up playing a sneaky archer in Skyrim.

This post is about removing the magic and understanding how it all works under the hood. We'll build our own version of asyncio step by step, from a simple generator all the way to running a fully working echo server using only our custom event loop.

Here is what the end result will look like:

```python
from socket import SO_REUSEADDR, SOL_SOCKET, socket
from scheduler.future import AcceptSocket, ReadSocket, Sleep
from scheduler.scheduler import Scheduler


async def read(conn):
    return await ReadSocket(conn)


async def accept(sock):
    return await AcceptSocket(sock)


async def echo(sock):
    while True:
        data = await read(sock)
        if not data:
            # An empty read means the client closed the connection
            sock.close()
            return

        print(f"received {data}")
        # assume non-blocking
        sock.send(data)


async def echo_server(scheduler: Scheduler, port: int):
    print("creating socket...")
    sock = socket()
    # SO_REUSEADDR has to be set before bind() to take effect
    sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    sock.bind(("localhost", port))

    sock.listen(100)
    sock.setblocking(False)
    print("socket created waiting for connection")
    while True:
        conn = await accept(sock)
        # Schedule new concurrent connection
        scheduler.create_task(echo(conn))


if __name__ == "__main__":
    scheduler = Scheduler()
    scheduler.create_task(echo_server(scheduler, 1235))
    scheduler.run_forever()
```

To do that, we’ll need three things:

* A way to pause a function and resume where we left off.
    
* A way to switch between those paused/resumable functions.
    
* A way to start the work somewhere else, and get notified when it’s done.
    

Sound familiar? That first point should ring a bell: *generators*! Generators let us pause in the middle of a function and resume it later. And fun fact: that’s very close to how asyncio actually works under the hood.

(Want to skip ahead? Check out the [full repo here](https://github.com/jbrocher/asyncio-model) with all the code we'll write in this post.)

## Primer on Generators and Coroutines

First, a small primer on generators and their twins, coroutines.

In Python, generators are created (among other ways) using `yield`. A `yield` statement pauses the callee and returns something to the caller. This lets us do cool stuff like maintaining state across multiple function executions.

Here's an example generator that returns the Fibonacci sequence when iterated over:

```python

from time import sleep


def fibonnaci_generator():
    n_1 = 0
    n_2 = 1
    while True:
        n = n_1 + n_2
        yield n

        n_2 = n_1
        n_1 = n


if __name__ == "__main__":
    generator = fibonnaci_generator()
    for x in generator:
        print(x)
        sleep(1)
```

![fibo.gif](https://res.cloudinary.com/dllztfozz/image/upload/c_crop,w_500,h_300,g_north_west/v1746179004/blog/kata-asyncio/jfdt9snkapccjtah3qlk.gif align="center")

Calling `fibonnaci_generator()` gives us a generator object. Iterating over it calls `next()` on it repeatedly: each call resumes the function, runs it up to the next `yield`, and pauses it again.

But yield has a hidden superpower: it doesn’t just let a generator produce values, it also allows it to receive values using `my_generator.send(value)`. This turns the generator into a data consumer, not just a producer.

A generator that can receive data like this is often referred to as a [coroutine](https://en.wikipedia.org/wiki/Coroutine). Why **co**routine? Because unlike regular subroutines (which run top-to-bottom and return once) they can pause, yield control, and be resumed later with external data, possibly by some external scheduler. That makes them perfect for cooperative multitasking, where functions can run concurrently by explicitly handing control back and forth, and sending values to each other, rather than stacking up on the call stack like traditional functions.

> ⚠️ Quick note on terminology:
> 
> In this post, I’m using the term **coroutine** in its broader, general sense. In Python, this includes generator functions that use `yield` and `.send()` to cooperate with a scheduler.
> 
> These generator-based coroutines were the first way to write asynchronous code in Python. Version 3.5 introduced **native coroutines** using the `async def` and `await` keywords. Native coroutines are more ergonomic, but conceptually, they do the same thing: pause, wait for something, then resume.
> 
> We’ll explore those native coroutines, and how our library supports them, later in the post.

`send` will come in handy when we want to send the result of an asynchronous operation back into the paused coroutine.

Let’s see this in action. We’ll write a coroutine that filters even numbers from the Fibonacci sequence (yes, it’s very useful):

```python

from time import sleep


def fibonnaci_generator():
    n_1 = 0
    n_2 = 1
    while True:
        n = n_1 + n_2
        yield n
        sleep(1)

        n_2 = n_1
        n_1 = n


def filter_even():
    while True:
        x = yield
        if x > 200:
            return x
        if x % 2 == 0:
            print(x)


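if __name__ == "__main__":
    even_filter = filter_even()
    generator = fibonnaci_generator()
    # Prime the coroutine: advance it to its first `yield`
    even_filter.send(None)
    for x in generator:
        try:
            even_filter.send(x)
        except StopIteration as stop:
            # filter_even returned; its return value is in stop.value
            print(f"stopped at {stop.value}")
            break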
```

![filter_even.gif](https://res.cloudinary.com/dllztfozz/image/upload/c_crop,w_500,h_300,g_north_west/v1746187915/blog/kata-asyncio/hgbhby99fmav3qjxfukp.gif align="center")

Notice how `filter_even` consumes the values produced by `fibonnaci_generator`? That's the difference between generators and coroutines: a generator produces values, a coroutine consumes them. Of course, a single function can do both, but that gets messy quickly, and I won’t get into why here. (You can find a good explanation on [p. 16 of A Curious Course on Coroutines and Concurrency](https://www.dabeaz.com/coroutines/Coroutines.pdf#page=16).)

That's enough theory on coroutines for now. But look, we've already completed step 1: we now have a way to pause and resume execution. Next up: running coroutines concurrently with a scheduler.

## Building a Scheduler

We'll build a Scheduler that will be responsible for, that's right, scheduling our coroutines. First, let's write a `Task` wrapper to encapsulate all the coroutine stuff:

```python
from typing import Generator
import uuid


class Task:
    def __init__(self, coro: Generator):
        self.id = uuid.uuid4()
        self._coro = coro
        self.val = None

    def run(self):
        return self._coro.send(self.val)
```

The `val` attribute represents the next value we want to send to the coroutine. We’ll use it to give back some result to the wrapped coroutine later on. After initialization, it’s `None`. Sending `None` to a coroutine is equivalent to calling `next` on it, and will advance it to the next `yield` statement.
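
To make the `send(None)` detail concrete, here is a small sketch. The `echo_twice` generator is a made-up example, not part of the library. It shows that the first `send` has to carry `None`, and that later values become the result of the paused `yield` expression:

```python
def echo_twice():
    # Hypothetical example generator, only for illustration
    first = yield "ready"
    second = yield f"got {first}"
    return first, second


gen = echo_twice()

# A just-started generator has no paused `yield` to receive a value,
# so the first send must be None (equivalent to next(gen)).
print(gen.send(None))  # ready

# The sent value becomes the result of the paused `yield` expression.
print(gen.send("a"))  # got a

fresh = echo_twice()
try:
    fresh.send("too early")
except TypeError as err:
    # Sending a real value before the first yield is rejected
    print(f"TypeError: {err}")
```

This is why `Task.val` starts out as `None`: the first call to `run()` only starts the coroutine.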

Next, we write a simple scheduler to manage the tasks queue:

```python
from collections import deque
from typing import Generator

from scheduler.task import Task


class Scheduler:
    def __init__(self):
        self._queue = deque([])

    def create_task(self, coro: Generator):
        task = Task(coro)
        # run_forever pops from the right, so new tasks join at the left (the back)
        self._queue.appendleft(task)

    def run_forever(self):
        while self._queue:
            task = self._queue.pop()
            try:
                task.run()
                self._queue.appendleft(task)
            except StopIteration as err:
                print(err.value)
```

This is a simple `while` loop that takes the next task in the queue and runs it once. Then either:

* We reschedule the task at the back of the queue
    
* StopIteration is raised, meaning the coroutine is over, and the task isn't rescheduled
    

> 💡 `StopIteration` is raised when a generator (or generator-style coroutine) finishes execution.  
> This happens either:
> 
> * When it reaches the end of the function, or
>     
> * When it hits a `return` statement (in which case the `return` value is stored in the exception’s `.value` attribute)
>     
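
As a quick illustration of that second case, here is a minimal sketch (the `countdown` generator is invented for the example) that reads a generator's return value from the exception:

```python
def countdown(n: int):
    # Hypothetical generator used only for this example
    while n > 0:
        yield n
        n -= 1
    return "liftoff"


gen = countdown(2)
print(next(gen))  # 2
print(next(gen))  # 1
try:
    next(gen)
except StopIteration as err:
    # The generator's return value travels inside the exception
    print(err.value)  # liftoff
```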

Let's see this in action. We'll build two simple coroutines, `ping` and `pong`, that print their own name and yield. We can use our brand new `Scheduler` to run them.

```python

import time

from scheduler.scheduler import Scheduler


def ping():
    while True:
        print("ping")
        time.sleep(1)
        (yield)


def pong():
    while True:
        print("pong")
        time.sleep(1)
        (yield)


if __name__ == "__main__":
    scheduler = Scheduler()
    scheduler.create_task(ping())
    scheduler.create_task(pong())
    scheduler.run_forever()
```

![ping_pong.gif](https://res.cloudinary.com/dllztfozz/image/upload/c_crop,w_500,h_300,g_north_west/v1746188103/blog/kata-asyncio/opwtar3s50nhi0hfnjxm.gif align="center")

Great! As you can see, our scheduler alternates between the `ping` and `pong` tasks. That’s step 2 done.

However, for now, we still can’t really “await” anything. If we modify `ping` to use a blocking `sleep(30)`, it will pause for 30 seconds before the scheduler can switch to `pong`.

Right now, our scheduler behaves similarly to running both tasks in separate threads, except we are the ones deciding when the switch happens. This model is called [**cooperative multitasking**](https://en.wikipedia.org/wiki/Cooperative_multitasking#:~:text=Cooperative%20multitasking%2C%20also%20known%20as,running%20process%20to%20another%20process.), but we are still missing our third ingredient: a way to delegate the work and be notified when it's done.

This is where the `Future` class comes into play.

## Adding a Future

A Future is similar to a Promise in TypeScript. It represents a value that’s not available yet, but will be eventually. That’s exactly what we need: we want our coroutine to yield a Future, and then have our scheduler “pause” it until that future is resolved.

Here’s a base Future class we can work with:

```python
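from abc import ABCMeta, abstractmethod
from typing import Any, Callable

from scheduler.task_protocol import TaskProtocol


class Future(TaskProtocol, metaclass=ABCMeta):
    def __init__(self):
        self._done_callbacks = []
        self.is_done = False
        self._result = None
        self._coro = self._run()

    def add_done_callback(self, callback: Callable[[Any], None]):
        self._done_callbacks.append(callback)

    def run(self):
        self._coro.send(None)

    def _run(self):
        # Poll until the subclass reports a result, yielding between checks
        while True:
            result = self._check_result()
            # Compare against None so falsy results (e.g. b"") still resolve
            if result is not None:
                self._done(result)
                return result
            yield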

    def _done(self, result: Any):
        self.is_done = True
        self._result = result
        for callback in self._done_callbacks:
            callback(self._result)

    @abstractmethod
    def _check_result(self) -> Any:
        pass
```

A few things to notice here:

The Future has a `run` method, just like our Task. The Future will replace the task in the queue, so giving it a task-like interface lets the scheduler treat it like a normal task.

> 💡 The `TaskProtocol` is defined elsewhere. It simply enforces this shared interface between `Future` and `Task`.
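
The post doesn't show `TaskProtocol`, so the following is only a guess at what such an interface could look like, using `typing.Protocol`. The `run` method comes from `Task` and `Future` above; everything else (the `@runtime_checkable` decorator, the docstrings, the `Dummy` class) is an assumption made for illustration, and the version in the repo may well differ:

```python
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class TaskProtocol(Protocol):
    """Assumed shape: anything the scheduler can put in its queue."""

    def run(self) -> Any:
        """Advance by one step; raise StopIteration when finished."""
        ...


class Dummy:
    # Hypothetical class: it never inherits from TaskProtocol,
    # yet it satisfies it structurally because it has a run() method.
    def run(self) -> Any:
        return None


print(isinstance(Dummy(), TaskProtocol))  # True
print(isinstance(object(), TaskProtocol))  # False
```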

The Future wraps a coroutine in \_run, which polls for the result. When it’s ready, we mark the future as done and trigger any callbacks waiting on it.

When a Task yields a Future, that future replaces the task in the queue, and holds a reference to the task. That’s how we “pause” the coroutine. Once the future is complete, it will call its registered callback to reschedule the waiting task with the result, resuming it. In our case, the scheduler will be responsible for setting this callback.

```python

from collections import deque
from typing import Generator

from scheduler.future import Future
from scheduler.task import Task
from scheduler.task_protocol import TaskProtocol


class Scheduler:
    def __init__(self):
        self._queue = deque([])

    def create_task(self, coro: Generator):
        task = Task(coro)
        self._schedule(task)

    def _schedule(self, task: TaskProtocol):
        self._queue.appendleft(task)

    def _continue(self, task):
        def callback(result):
            task.val = result
            self._schedule(task)

        return callback

    def run_forever(self):
        while self._queue:
            task = self._queue.pop()
            try:
                future = task.run()
                if isinstance(future, Future):
                    future.add_done_callback(self._continue(task))
                    self._schedule(future)
                else:
                    self._schedule(task)
            except StopIteration:
                pass
```

Now when a task yields a Future, the scheduler sets a callback that:

* Assigns the result back to the task (task.val)
    
* Reschedules the task so it can resume execution
    

This is not very easy to visualize, so here is a sequence diagram showing what happens:

![Handling a Future in the Event Loop](https://www.mermaidchart.com/raw/dfe16b2d-172b-4342-b126-5d7d8eeb9488?theme=light&version=v0.1&format=svg align="center")
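
The same handoff can also be traced in code. Below is a compact, single-file sketch of the scheduling logic that records what happens in a list. To keep the run deterministic, it swaps the polling future for a hypothetical `Countdown` future that resolves after a fixed number of polls; `Countdown`, `run_all`, `waiter` and `worker` are names invented for this example:

```python
from collections import deque


class Task:
    def __init__(self, coro):
        self._coro = coro
        self.val = None

    def run(self):
        return self._coro.send(self.val)


class Countdown:
    """Hypothetical future: resolves after `polls` unsuccessful polls."""

    def __init__(self, polls, result):
        self._remaining = polls
        self._result = result
        self._callbacks = []
        self._coro = self._run()

    def add_done_callback(self, callback):
        self._callbacks.append(callback)

    def run(self):
        return self._coro.send(None)

    def _run(self):
        while self._remaining > 0:
            self._remaining -= 1
            yield  # not ready yet, let other tasks run
        for callback in self._callbacks:
            callback(self._result)


def run_all(*coros):
    queue = deque()
    for coro in coros:
        queue.appendleft(Task(coro))

    def resume(task):
        def callback(result):
            task.val = result  # value the paused `yield` will receive
            queue.appendleft(task)  # put the waiting task back in the queue

        return callback

    while queue:
        item = queue.pop()
        try:
            yielded = item.run()
        except StopIteration:
            continue  # finished: do not reschedule
        if isinstance(yielded, Countdown):
            yielded.add_done_callback(resume(item))
            queue.appendleft(yielded)  # the future takes the task's place
        else:
            queue.appendleft(item)


log = []


def waiter():
    log.append("waiter: start")
    result = yield Countdown(polls=2, result="done")
    log.append(f"waiter: got {result}")


def worker():
    for step in range(3):
        log.append(f"worker: step {step}")
        yield


run_all(waiter(), worker())
print(log)
```

In the printed log, `waiter` starts, `worker` runs its three steps while the future is being polled, and only then does `waiter` resume with the result.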

---

Let’s put this to the test. Here’s an implementation of a non-blocking sleep using this future logic:

```python
from datetime import datetime, timedelta, timezone


class Sleep(Future):
    def __init__(self, wait_for: int, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._wait_for = timedelta(seconds=wait_for)
        self._ready_at = datetime.now(tz=timezone.utc) + self._wait_for

    def _check_result(self):
        # Implicitly returning None means "not ready yet"
        if datetime.now(tz=timezone.utc) >= self._ready_at:
            return True
```

And now let's do some non-blocking awaiting:

```python

import time

from scheduler.future import Sleep
from scheduler.scheduler import Scheduler


def sleep(n: int):
    return (yield Sleep(n))


def ping():
    print("waiting for result before ping...")
    result = yield from sleep(5)

    print(f"result: {result}")
    while True:
        print("ping")
        time.sleep(1)
        yield from sleep(0)


def pong():
    while True:
        print("pong")
        time.sleep(1)
        yield from sleep(0)


if __name__ == "__main__":
    scheduler = Scheduler()
    scheduler.create_task(ping())
    scheduler.create_task(pong())
    scheduler.run_forever()
```

![non_blocking_ping_pong.gif](https://res.cloudinary.com/dllztfozz/image/upload/c_crop,w_500,h_300,g_north_west/v1746188175/blog/kata-asyncio/jklx673pvmkomrwcchen.gif align="center")

We did it! ping pauses for 5 seconds, and during that time, pong can continue doing its thing. Just like with the real `asyncio.sleep`.

Also, note the use of `yield from` when nesting coroutines. If we had just written `yield sleep(n)`, we’d hand the scheduler a generator object instead of the `Sleep` future, and we would never get the result we wanted. If this seems too magical, [check out the branch step/nested-coroutines](https://github.com/jbrocher/asyncio-model/tree/step/nested-coroutines) in the linked repo. In that branch, I use a stack in the `Task` class to handle nested coroutines manually.
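
Here is a small standalone sketch of that difference. The three generator functions are invented for the example and don't depend on the scheduler:

```python
import inspect


def inner():
    # Hypothetical nested coroutine used only for this example
    received = yield "paused in inner"
    return f"inner got {received}"


def with_yield():
    # Hands the generator object itself to whoever drives us
    result = yield inner()
    return result


def with_yield_from():
    # Delegates: inner's yields pass through, its return value comes back
    result = yield from inner()
    return result


plain = with_yield()
print(inspect.isgenerator(plain.send(None)))  # True

delegating = with_yield_from()
print(delegating.send(None))  # paused in inner
try:
    delegating.send("hello")
except StopIteration as stop:
    print(stop.value)  # inner got hello
```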

Now we could stop here for our scheduler and skip to doing some I/O. But the real `asyncio` lib works with the `async/await` keywords. Wouldn’t it be nice if we could do that as well?

## Adhering to async/await

The keywords `async` and `await` don't introduce new features to Python's execution model. They simply organize and improve what we've been doing with `yield from` and generators.

There's no magic involved. `await` works like `yield from`, and `async` alters our function type from `Generator` to `Coroutine`. This helps keep asynchronous generators/coroutines separate from regular generators.

To convert our generator-based coroutines into true coroutines, we just need to add `async` before them. Then we can use `await` with them (which is a drop-in replacement for `yield from`).

To integrate our custom `Future` with Python's `await` system, we need to make it *awaitable*. This means implementing the `__await__` method, which Python uses under the hood when you write `await my_future`.

```python
class Future(TaskProtocol, metaclass=ABCMeta):
    def __init__(self):
        self._done_callbacks = []
        self.is_done = False
        self._result = None
        self._coro = self._run()

    def __await__(self):
        # Hand the future to the scheduler, then resume with the value it sends back
        return (yield self)

    # ... rest of the class
```

And that's it. Now we can change `sleep` like so:

```python
async def sleep(n: int):
    return await Sleep(n)
```

which is functionally the same as writing `yield from Sleep(n).__await__()` in the generator-based version.

Let me say that again because it is important: `async` and `await` do **no** magic whatsoever. They are just part of a protocol, much like the `Iterator` protocol. Their purpose is to keep us from mixing up regular generators with coroutines that are meant to be managed by a scheduler or an event loop.
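
To see this for yourself, here is a minimal sketch that drives a native coroutine by hand with `.send()`, exactly as we did with generators, and with no event loop at all. The `Ready` awaitable and the `double` coroutine are made up for this example:

```python
class Ready:
    """Hypothetical awaitable used only for this example."""

    def __await__(self):
        # Pause here; whatever the driver sends becomes `value`
        value = yield "paused"
        return value


async def double():
    x = await Ready()
    return x * 2


coro = double()
print(coro.send(None))  # paused
try:
    coro.send(21)
except StopIteration as stop:
    # Like a generator, a finished coroutine reports its return value here
    print(stop.value)  # 42
```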

And now that our futures are awaitable, we can use `async` and `await` everywhere. In fact, we must use them, because once a function is declared with `async def`, the interpreter expects `await` inside, not `yield`.

```python

from scheduler.future import Sleep
import time
from scheduler.scheduler import Scheduler


async def sleep(n: int):
    return await Sleep(n)


async def ping():
    print("waiting for result before ping...")
    result = await sleep(5)
    print(f"result: {result}")
    while True:
        print("ping")
        time.sleep(1)
        # Yield control to the event loop. We cannot use a bare (yield) in a "true" coroutine.
        await sleep(0)


async def pong():
    while True:
        time.sleep(1)
        await sleep(0)
        print("pong")


if __name__ == "__main__":
    scheduler = Scheduler()
    scheduler.create_task(ping())
    scheduler.create_task(pong())
    scheduler.run_forever()
```

> You can still use `yield` inside an async function in the context of [Asynchronous Generators](https://peps.python.org/pep-0525/). But this is outside the scope of this post.

Look at that! Pretty much exactly what this would look like using the regular `asyncio`, except it comes from our own library. Pretty neat, huh?
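
For comparison, here is roughly how the same ping/pong could be written against the real `asyncio`. This is a sketch, not a line-by-line port: the loops are bounded, the delay is shortened, and a `log` list replaces `print` so the script terminates and its order is easy to inspect.

```python
import asyncio


async def ping(log):
    log.append("waiting before ping...")
    await asyncio.sleep(0.2)
    for _ in range(2):
        log.append("ping")
        await asyncio.sleep(0)  # hand control back to the event loop


async def pong(log):
    for _ in range(2):
        log.append("pong")
        await asyncio.sleep(0)


async def main():
    log = []
    # Run both coroutines concurrently on the same event loop
    await asyncio.gather(ping(log), pong(log))
    return log


print(asyncio.run(main()))
```

Since `pong` never waits on anything slow, it should finish both rounds while `ping` is still sleeping.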

## Adding Real I/O to Our Async Framework

So far so good, but we still haven't handled any real I/O. What we have, though, is a framework that lets us build custom `Future` subclasses. Each of them can poll for a result and resume a task once that result is available.

At the beginning of the post, in the final code example, you may have noticed these two lines:

```python
async def read(conn):
    return await ReadSocket(conn)

async def accept(sock):
    return await AcceptSocket(sock)
```

Well guess what? Both `ReadSocket` and `AcceptSocket` are subclasses of our `Future` class.

`AcceptSocket` allows us to suspend execution while waiting for new incoming connections on a socket. `ReadSocket` does the same for receiving data from a connection. Now that all the async plumbing is in place, their implementation is surprisingly simple:

```python
import selectors
from socket import socket

class AcceptSocket(Future):
    def __init__(self, sock: socket, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._select = selectors.DefaultSelector()
        self._sock = sock
        self._select.register(sock, selectors.EVENT_READ)

    def _check_result(self):
        events = self._select.select(0)
        if events:
            conn, addr = self._sock.accept()
            print("accepted", conn, "from", addr)
            conn.setblocking(False)
            return conn

class ReadSocket(Future):
    def __init__(self, sock: socket, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._select = selectors.DefaultSelector()
        self._sock = sock
        self._select.register(sock, selectors.EVENT_READ)

    def _check_result(self):
        events = self._select.select(0)
        if events:
            return self._sock.recv(1000)
```

I won't go into the details of how sockets work in Python. If you need a refresher (I did!), the [socket module docs](https://docs.python.org/3/library/socket.html) are a great place to start.

The key thing here is the [`selectors`](https://docs.python.org/3/library/selectors.html) module. It lets us monitor socket readiness without blocking the main thread. And that, finally, answers the question I posed at the beginning of this article:

> **"Who's doing the work while we await?"**

The answer is simple. **The operating system is**. When we call `register()`, we ask the OS to monitor the socket for readiness. Each time we call `select(0)`, we check for any ready events without blocking. This makes implementing the `_check_result` overrides a piece of cake. We simply check if something is ready in the socket, and if so we return it. If not, we’ll check again during the next loop iteration. Our scheduler handles resuming the appropriate task.
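
To watch `register()` and `select()` at work outside of our scheduler, here is a small standalone sketch. It assumes nothing beyond the standard library: `socket.socketpair()` stands in for a real client and server, and the variable names are made up for the example.

```python
import selectors
import socket

# A connected pair of sockets stands in for a client and a server
client, server = socket.socketpair()

selector = selectors.DefaultSelector()
selector.register(server, selectors.EVENT_READ)

# Nothing has been sent yet: a zero timeout returns immediately
before = selector.select(0)
print(before)  # []

client.send(b"hello")

# Wait up to one second for the OS to report that data is readable
events = selector.select(1)
print(len(events))  # 1

data = server.recv(1000)
print(data)  # b'hello'

selector.close()
client.close()
server.close()
```

The second call uses a one-second timeout only to keep the demo robust; our futures use `select(0)` because the scheduler loop does the waiting for us.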

---

Now that everything is in place, we can finally build a working echo server:

```python


from socket import SO_REUSEADDR, SOL_SOCKET, socket
from scheduler.future import AcceptSocket, ReadSocket, Sleep
from scheduler.scheduler import Scheduler


async def sleep(n: int):
    return await Sleep(n)


async def read(conn):
    return await ReadSocket(conn)


async def accept(sock):
    return await AcceptSocket(sock)


async def echo(sock):
    while True:
        data = await read(sock)
        if not data:
            # An empty read means the client closed the connection
            sock.close()
            return

        print(f"received {data}")
        # assume non-blocking
        sock.send(data)


async def echo_server(scheduler: Scheduler, port: int):
    print("creating socket...")
    sock = socket()
    # SO_REUSEADDR has to be set before bind() to take effect
    sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    sock.bind(("localhost", port))

    sock.listen(100)
    sock.setblocking(False)
    print("socket created waiting for connection")
    while True:
        conn = await accept(sock)
        # Schedule new concurrent connection
        scheduler.create_task(echo(conn))


if __name__ == "__main__":
    scheduler = Scheduler()
    scheduler.create_task(echo_server(scheduler, 1235))
    scheduler.run_forever()
```

![echo_server.gif](https://res.cloudinary.com/dllztfozz/image/upload/c_crop,g_north_west/v1746273388/blog/kata-asyncio/fi0dyjap69smswi4y2ir.gif align="center")

The `echo` coroutine handles a single client connection. The `echo_server` coroutine waits for new clients and launches a new concurrent instance of `echo` for each one. You can run the script and connect multiple clients to port `1235` to see it in action. The [echo\_server is available in the `scheduler` package of the repository](https://github.com/jbrocher/asyncio-model).
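
If you don't have a tool like `nc` at hand, a tiny client is enough to try it. The `echo_once` helper below is a hypothetical sketch, not part of the repo. It assumes the server is already running on `localhost:1235` and that the message is short enough to come back in a single `recv`:

```python
import socket


def echo_once(payload: bytes, host: str = "localhost", port: int = 1235) -> bytes:
    """Send `payload` to a running echo server and return its reply."""
    with socket.create_connection((host, port), timeout=5) as conn:
        conn.sendall(payload)
        # A single recv is enough for short messages like the ones here
        return conn.recv(1000)


# With the echo server running in another terminal:
#     print(echo_once(b"hello"))
```

Running it from several terminals at once is an easy way to check that connections are served concurrently.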

## 🎉 Conclusion

If you've made it this far, congratulations! Our implementation is pretty close to the real thing, and asyncio should feel way less mysterious.

Of course, it gets more complex when you want to support timeouts, cancellation, priorities, etc., but the foundation remains the same. If you want to dig deeper, check out the [asyncio source in the CPython repository](https://github.com/python/cpython/blob/main/Lib/asyncio/futures.py), it should look pretty familiar now.

Turns out, the real magic isn’t the `await`. It's the yield we wrote along the way.

## References

1. [https://www.dabeaz.com/coroutines/](https://www.dabeaz.com/coroutines/)
    
2. [https://docs.python.org/3/library/asyncio-task.html](https://docs.python.org/3/library/asyncio-task.html)
    
3. [https://snarky.ca/how-the-heck-does-async-await-work-in-python-3-5/](https://snarky.ca/how-the-heck-does-async-await-work-in-python-3-5/)
