Let’s go on a trip down memory lane and look back at some threading stuff.

Compare and Exchange

At the end of the day, in order to do any kind of threading, you need an atomic operation offered to you by the CPU. The CPU offers several atomic operations; one of them is the compare and exchange operation. This operation is used to:

  1. See if a value in memory is equal to a certain value
  2. If it is, set it to a new value
  3. Return a value (notifying you whether the memory was updated or not)

All these steps are done atomically, in one go.

This allows a single thread, in 1 step, to know whether a shared resource is acquirable or not, and if it is, acquire it. Again, this entire thing happens in one go, including the acquiring.

This means that while a thread is trying to acquire a resource, no other thread can squeeze in and acquire it as well.

Ultimately, this is the basis of all multi-threading.
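
Python doesn’t expose the raw CPU instruction directly, but to make the idea concrete, here is a minimal sketch: a fake compare_exchange (simulated with a lock here, whereas the real thing is a single CPU instruction) and a tiny spinlock built on top of it. All the names are made up for the example.

import threading

class AtomicInt:
    # stand-in for a CPU word that supports compare and exchange
    def __init__(self, value=0):
        self._value = value
        self._guard = threading.Lock()  # simulates the atomicity the CPU gives you for free

    def compare_exchange(self, expected, new):
        # atomically: if the value equals `expected`, set it to `new`
        # returns True if the value was updated, False otherwise
        with self._guard:
            if self._value == expected:
                self._value = new
                return True
            return False

class SpinLock:
    UNLOCKED, LOCKED = 0, 1

    def __init__(self):
        self._state = AtomicInt(self.UNLOCKED)

    def acquire(self):
        # keep retrying until we are the thread that flips UNLOCKED -> LOCKED
        while not self._state.compare_exchange(self.UNLOCKED, self.LOCKED):
            pass  # spin (a real implementation would back off, or ask the OS to put us to sleep)

    def release(self):
        self._state.compare_exchange(self.LOCKED, self.UNLOCKED)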

Mutex

We are a bit too close to hardware, and as you know, when you are this close to hardware, the OS wants a say.

Actually, the OS provides quite a nice facility built on top of the compare and exchange operation. This facility is called a mutex.

Let’s start by assuming that some thread did the compare and exchange operation and acquired a resource (a lock). From now on, I’m gonna call a resource a lock.

Let’s say another thread wants a say in things and tries to do a compare and exchange operation to acquire the lock. Obviously, we know that the return value of its operation will tell it the value was not updated (aka the lock is taken). Now, the OS takes this poor thread and puts it into a wait queue. It is no longer running; it is waiting (for a mutex).

When the OG thread (the one that acquired the lock) is done with the lock, it releases it (via an OS API call). The OS then notices there is a thread waiting for the lock, wakes that thread up, and the woken thread acquires the lock.

That’s all there is to it really!

There is just one little refinement to add. The OS allows multiple threads to wait for a single lock. When the lock becomes available, the OS wakes up the first thread in the wait queue.
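
Python exposes the mutex facility as threading.Lock. Here’s a minimal sketch of a few threads bumping a shared counter under a lock; the counter and the loop count are made up for the example:

import threading

counter = 0
mutex = threading.Lock()

def bump():
    global counter
    for _ in range(100_000):
        with mutex:  # acquire; waits (in the OS queue) if another thread holds it
            counter += 1  # the critical section
        # leaving the with block releases the mutex

threads = [threading.Thread(target=bump) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(counter)  # 400000, every increment was protected by the mutex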

Semaphore

A semaphore is a generalization of a mutex. A mutex is a semaphore with a count of 1.

Each time a thread acquires a semaphore, the count is decremented. When the count reaches 0, the next thread that tries to acquire the semaphore is put into a wait queue. When a thread releases a semaphore, the count is incremented.

The end result: a certain max number of threads can acquire a semaphore at the same time.

Under the hood, a semaphore is built on atomic increment and decrement operations (which you can build out of compare and exchange). The decrement checks the counter: if it is not 0, it is decremented and the return value tells you the decrement happened; if it is 0, nothing changes and the return value tells you that too. The OS maintains the wait queue, and threads that fail to decrement the counter are put there.
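
Python exposes this as threading.Semaphore. A small sketch (the count of 3, the number of workers, and the sleep are all arbitrary, just to show that at most 3 threads are inside at once):

import threading
import time

sem = threading.Semaphore(3)  # at most 3 threads can hold it at the same time

def worker(i):
    with sem:  # decrements the count; blocks if it is already 0
        print(f"worker {i} is in")
        time.sleep(0.1)
    # leaving the block increments the count, letting a waiting thread in

threads = [threading.Thread(target=worker, args=(i,)) for i in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()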

Condition Variable

Ok, by now you get that for these synchronization primitives you need 1) hardware support and 2) OS support.

There is another synchronization primitive called a condition variable. When a thread is waiting for a certain condition to be true, the OS puts it in a wait queue. Multiple threads can wait for the same condition to be true, and they are all put into the same wait queue (there is one queue per condition variable).

A thread can “signal” a condition variable. When a thread does this, the OS wakes up the first thread in the wait queue. There is an alternative to signal called “broadcast”; this one wakes up all the threads waiting in the queue.
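
Python exposes this as threading.Condition. A tiny sketch where a few threads wait for a flag to become true (the flag is just an example of “a condition”) and one thread broadcasts:

import threading

ready = False
cond = threading.Condition()  # has its own internal lock

def waiter(i):
    with cond:
        while not ready:  # re-check the condition after every wakeup
            cond.wait()   # goes into the wait queue, releasing the lock while waiting
    print(f"thread {i} woke up")

def signaller():
    global ready
    with cond:
        ready = True
        cond.notify_all()  # "broadcast": wake up everyone in the wait queue (notify() would wake just one)

threads = [threading.Thread(target=waiter, args=(i,)) for i in range(3)]
for t in threads:
    t.start()
threading.Thread(target=signaller).start()
for t in threads:
    t.join()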

Producer/Consumer

This is a recurring problem in the multi-threading world. You have a producer thread that produces data and a consumer thread that consumes data. The producer thread puts data into a buffer and the consumer thread takes data out.

So the shared resource here is the buffer, thus we need a mutex to guard it!

The basic logic is:

  • producer should acquire mutex, then put data into the buffer
  • consumer should acquire mutex, then take data out of the buffer

We can get a little more advanced and handle the case when the buffer is full or empty. When it is full, the producer should wait until it is not full. When it is empty, the consumer should wait until it is not empty. So we have 2 conditions here, thus we need 2 condition variables (not_full and not_empty).

I think if I show you the code for the producer and consumer (real Python this time, using the threading module), you will get it yourself now.

import threading
from collections import deque

MAX_SIZE = 10  # arbitrary capacity for the example
buffer = deque()
mutex = threading.Lock()
not_full = threading.Condition(mutex)   # producers wait on this one
not_empty = threading.Condition(mutex)  # consumers wait on this one

def producer():
    data = 0
    while True:
        with mutex:  # with statement to ensure the mutex is always released, regardless of how the block exits
            while len(buffer) >= MAX_SIZE:  # while instead of if to handle spurious wakeups
                not_full.wait()  # releases the mutex before waiting and re-acquires it after waking up
            buffer.append(data)  # the "data" we produce; here just a counter
            data += 1
            not_empty.notify()  # wake up a consumer (but it still has to acquire the mutex, b/c we are holding it till we exit the block)

def consumer():
    while True:
        with mutex:
            while not buffer:
                not_empty.wait()
            data = buffer.popleft()
            not_full.notify()  # wake up a producer
        print(data)  # "consume" the data outside the lock

A note about spurious wakeups: sometimes the OS wakes up a thread that is waiting for a condition variable, for absolutely no reason at all. This is not a bug or anything; it is just an implementation limitation. So you just have to handle it lol. That’s why we use while instead of if in the code above.
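
To actually run these, you would spin up one thread per function, something like this (daemon threads, since the loops never end):

threading.Thread(target=producer, daemon=True).start()
threading.Thread(target=consumer, daemon=True).start()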

Readers/Writers

Another recurring threading problem. You have multiple reader threads and multiple writer threads, all operating on the same data. As you may know, you can have as many threads as you want viewing/reading the same data, and you won’t run into a data race. But if there is even 1 writer, then nothing else (no readers and no other writers) can access the data at the same time.

So you have 2 scenarios.

  1. Multiple readers can read, all at the same time.
  2. Only 1 writer can write, and no one else can read or write.

Here’s one way to code it up:

import threading

class ReaderWriter:
    def __init__(self):
        self.data = 0
        self.readers = 0
        self.mutex = threading.Lock()
        self.read_gate = threading.Condition(self.mutex)
        self.write_gate = threading.Condition(self.mutex)

    def read(self):
        with self.read_gate: # 'with' on a condition variable automatically acquires and releases the *associated lock*
            self.readers += 1

        # ... do some reading of self.data (no lock held here, but the readers count keeps writers out)

        with self.read_gate:
            self.readers -= 1
            if self.readers == 0: # if no readers, writers are allowed
                self.write_gate.notify_all() # wake up waiting writers (each still has to re-acquire the mutex, one at a time)

    def write(self, value):
        with self.write_gate:
            while self.readers > 0:
                self.write_gate.wait()

            # do some writing
            self.data = value

        # leaving the with block releases the lock (another writer can take it, or a reader)
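
A quick sketch of how you might exercise it (the thread counts and the values written are arbitrary, just for the example):

rw = ReaderWriter()

readers = [threading.Thread(target=rw.read) for _ in range(5)]
writers = [threading.Thread(target=rw.write, args=(i,)) for i in range(2)]

for t in readers + writers:
    t.start()
for t in readers + writers:
    t.join()

print(rw.data)  # whatever the last writer wrote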

Future

A future is a synchronization primitive that stands in for a value that isn’t ready yet; another thread will set it. Generally, the other thread does some computation and then sets the value of the future.

from concurrent.futures import ThreadPoolExecutor

def a_bg_task(v):
    return v * 2

with ThreadPoolExecutor() as executor:
    future_result = executor.submit(a_bg_task, 3) # a_bg_task(3), will run in a background thread (in this case one from a thread pool)

    result = future_result.result() # block until the result is available

Note, the computation happens in the background, in a separate thread. In my example above, this thread happens to be from a thread pool, but you don’t have to use a thread pool.
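
The executor hides the part where “another thread sets the value”. If you want to see that part explicitly, you can create a concurrent.futures.Future yourself and have a thread fill it in (just an illustration; normally the executor does this for you):

import threading
from concurrent.futures import Future

future_result = Future()

def compute():
    future_result.set_result(3 * 2)  # the background thread sets the value

threading.Thread(target=compute).start()
print(future_result.result())  # blocks until set_result is called, then prints 6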

Event Loop

If you’ve ever done GUI programming, you know what an event loop is. You are constantly looping, looking into a thread safe event queue, and calling associated handlers (functions).

There is a way to do asynchronous programming using an event loop, though you won’t get parallelism (everything still runs on a single thread).

The main idea is to do a little bit of your long running computation at a time. You do a little, and then return to the event loop and schedule the next bit to do. Every time a bit is done, you schedule the next bit. This way you never block the event loop for too long, you give other handlers a chance to run!

Let’s do pseudo code for this one :)

def part1():
    # do the first bit of computation
    if not done:
        schedule(part2) # put part2 in the event queue so it runs at some point in the near future

# part2 will schedule part3, part3 will schedule part4, and so on
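
And since that was pseudo code, here is a tiny runnable version of the same idea: a toy event loop that is just a queue of functions, plus a computation chopped into little steps that keep rescheduling themselves (all names made up for the example):

from collections import deque

event_queue = deque()  # in a real GUI framework this queue is also fed by mouse/keyboard events

def schedule(handler):
    event_queue.append(handler)  # thread safety skipped here; a real event queue would be thread safe

def run_event_loop():
    while event_queue:
        handler = event_queue.popleft()
        handler()  # call the associated handler

def count_to(n, i=0):
    print(i)  # do a little bit of the "long running computation"
    if i < n:
        schedule(lambda: count_to(n, i + 1))  # schedule the next bit and return to the loop

schedule(lambda: count_to(3))
schedule(lambda: print("another handler gets a chance to run in between"))
run_event_loop()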

Async IO

Speaking of asynchronous programming, IO operations take a long time, and you sometimes don’t want to block your thread when doing them.

The OS provides some underlying facilities for you here.

When a thread issues an IO request, it can tell the OS to associate the IO request with a specific Event or IOCP (I/O Completion Port) object. The thread can then continue doing other computation and eventually (maybe periodically) check the status of the Event/IOCP object.

Eventually, your thread can choose to wait on the Event/IOCP object. In this case it will block until the corresponding IO is done. But the idea is that you get quite a bit of other computation done before you decide to wait for the IO.

So you issue the IO request, and then be on your merry way, then later see if it’s done.
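
Python doesn’t expose those OS objects directly, but the shape of the pattern looks roughly like this; here the “IO” is faked with a sleep in a helper thread, and a threading.Event plays the role of the OS Event object:

import threading
import time

io_done = threading.Event()  # plays the role of the OS Event object
io_result = []

def fake_io_request():
    time.sleep(1)              # pretend this is the disk/network doing its thing
    io_result.append(b"data")
    io_done.set()              # "the IO completed"

threading.Thread(target=fake_io_request).start()  # issue the IO request...

for _ in range(5):             # ...and be on your merry way, doing other computation
    time.sleep(0.1)            # stand-in for useful work
    print("still working, io done?", io_done.is_set())  # periodically check the status

io_done.wait()                 # finally, block until the IO is actually done
print(io_result[0])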

Coroutines

Coroutines are a way to do asynchronous programming. They are like functions, but they can pause and resume. They are not threads, they are not processes, they are just functions that can pause and resume.

import asyncio
import time

def some_long_running_computation():
    time.sleep(1)  # stand-in for some heavy CPU-bound work

async def long_running_io(): # a function marked async returns a coroutine object when called
    await asyncio.sleep(1) # stand-in for a long running IO operation

async def long_running_computation():
    await asyncio.to_thread(some_long_running_computation) # run it in a separate thread; the event loop resumes this coroutine when it's done

asyncio.run(long_running_io()) # or asyncio.run(long_running_computation())

# or if you want to do both in the same event loop
async def main():
    await asyncio.gather(long_running_io(), long_running_computation())

asyncio.run(main())

This is exactly like what we talked about in the event loop section. It just provides a framework for you. Your function is split into parts, based on await statements.

Note that even though asyncio has “io” in its name, you can use it to asynchronously run long computations as well; you just need asyncio.to_thread to specify that they should run in a background thread. That background thread can come from a pool, etc. The implementation doesn’t matter.

But, if you are running some stuff in another thread, be watchful of data races / race conditions. Remember, anytime you have 2 or more threads going through a single function, be careful.
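
For example, two computations sent to asyncio.to_thread that touch the same counter need a lock, just like any other pair of threads (the counter is made up for the example):

import asyncio
import threading

counter = 0
counter_lock = threading.Lock()  # a plain threading lock, because the work really runs in separate threads

def bump_a_lot():
    global counter
    for _ in range(100_000):
        with counter_lock:
            counter += 1

async def main():
    await asyncio.gather(asyncio.to_thread(bump_a_lot),
                         asyncio.to_thread(bump_a_lot))

asyncio.run(main())
print(counter)  # 200000, because the lock prevents the data race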

Conclusion

Wow! That’s a lotta threading for one day! Have a good day!