Posted December 23, 2012 by Hunter in Python Programming
 
 

Python Threading, Events, Queues and Locks Explained in Detail: What They Are and How to Use Them Properly


What?

A long time ago, in a galaxy an indeterminate distance from here, a race of hyper-dimensional mice built a supercomputer – a computer for whom to “calculate the trajectory of every single dust particle throughout a five-week Aldebaran sand blizzard” hardly requires thought, since within seconds of its booting up it had “contemplated the very vectors of the atoms in the Big Bang itself.” And with this majestic machine at their disposal, the mice set it to the most enlightened task of determining the Answer to Life, the Universe, and Everything.

And for seven and a half million years Deep Thought was silent, as it was, presumably, deep in thought. At the end of this time it delivered the Answer, which we all know so well: 42.

But what if, in the course of those millions of years, they had decided that they’d also like to know if prime numbers have a pattern? Or if there are more sensible axioms upon which to build mathematics? Or if someone had a homework question they just couldn’t get?

Even though these questions would have taken Deep Thought a fraction of a trillionth of a second, they would have had to wait, for millions of years, until the original task of determining the Answer was complete before they could get anything else out of the computer. This wouldn’t have been the case if the original programmers had simply coded the computer such that each question asked of it ran in a separate thread. Then it could spend the majority of its time thinking of the Answer, but when asked any other question, any other mundane question, it could spawn a new thread for that question, one that runs simultaneously with the Answer-seeking thread, and the asker could get his mundane answer.

For that is what threading does: Threading allows for a single processor to simultaneously run multiple bits of code.

(Threads are not technically simultaneous; they seem to be, though, because the processor switches between them so quickly.)

Why?

So that is the benefit of threading. You can have your code perform one or more time-consuming tasks, such as heavy computation or searching through long strings, each in its own thread, so that you don’t have to wait for one task to complete before another begins, all while still accepting user input.
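To make that benefit concrete, here is a minimal sketch. It assumes the slow tasks spend their time waiting (simulated with time.sleep), because in standard CPython only one thread executes Python bytecode at a time, so the gain comes mostly from overlapping the waiting. The names slow_task and finished are made up for this illustration.

```python
import threading
import time

finished = []

def slow_task(name, seconds):
    # Stand-in for a task that mostly waits (disk, network, a slow user)
    time.sleep(seconds)
    finished.append(name)

start = time.perf_counter()
threads = [threading.Thread(target=slow_task, args=("task-%d" % n, 0.2))
           for n in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()  # wait for each thread to finish
elapsed = time.perf_counter() - start

# The three 0.2-second waits overlap instead of adding up to 0.6 seconds
print("Finished %d tasks in %.2f seconds" % (len(finished), elapsed))
```

Run the three calls one after another without threads and the same work takes roughly three times as long.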

How?

Let’s jump right into an example of threading in Python:

    import threading

    def teamThread():
        j = 1
        while j < 6:
            print("Team Thread is at: " + str(j))
            j += 1
        print("Team Thread is done!")

    t = threading.Thread(target = teamThread)
    t.start()
    i = 1
    while i < 6:
        print("Team Main is at: " + str(i))
        i += 1
    print("Team Main is done!")

OUTPUT:

Team Main is at: 1Team Thread is at: 1

Team Main is at: 2Team Thread is at: 2

Team Main is at: 3Team Thread is at: 3

Team Main is at: 4Team Thread is at: 4

Team Main is at: 5Team Thread is at: 5

Team Main is done!Team Thread is done!

Alright, so this example is a bit bland, but there are a few things I’d like to point out about the output. In Python, as you would know, the print() function adds an automatic newline at the end, so that the next output on the screen will be on the next line down. Yet here you see that “Team Thread is at: 1” prints directly after “Team Main is at: 1”, without a newline between. That is because the two print() calls are in separate threads, and though they share memory and resources, they run independently of each other. And this can lead to issues. For example, if you used:

    print("Team Thread is at:", j)

and

    print("Team Main is at:", i)

then you get this output instead:

Team Main is at: Team Thread is at: 11

Team Main is at: Team Thread is at: 22

Team Main is at: Team Thread is at: 33

Team Main is at: Team Thread is at: 44

Team Main is at: Team Thread is at: 55

Team Main is done!Team Thread is done!

See how the spawned thread prints in-between the “Team Main is at: ” and the i? This example only shows text output, but I’m sure it isn’t much of a challenge for you to imagine the complications that can arise if, in the middle of one thread performing an operation on a resource, another thread modifies that same resource. In fact, we’ll see an example of that later.

For now, I’d like to show you an example of what I was talking about regarding Deep Thought. Here:

    import threading
    import time


    # Find the factors of the number
    def factorize(n):
        factors = []
        i = 2
        # Only test up to the square root; each hit yields a pair of factors
        while i * i < n:
            if n % i == 0:
                factors.append(i)
                factors.append(n // i)
            i += 1
        # A perfect square has its root as an unpaired factor
        if i * i == n:
            factors.append(i)
        factors.append(1)
        factors.append(n)
        factors.sort()
        n = str(n)
        factors = str(factors)
        print("The factors of " + n + " are: " + factors + "\n")

    outtahere = False

    while not outtahere:
        # Give time for thread to print
        time.sleep(0.5)
        n = int(input("\nInput a number to factorize (0 to exit): "))
        if n == 0:
            outtahere = True
        else:
            # Spawn thread that finds the factors of the number
            t = threading.Thread(target=factorize, args=(n,))
            # To make the thread a daemon thread, uncomment the next line
            # (the flag must be set before start() is called)
            # t.daemon = True
            t.start()

Alright, so first of all, try putting in a large number, like, 14 digits. You won’t get an answer – instead, you’ll be allowed to input more numbers. But the thread that was spawned when you input that large number is still running, and it will spit out the answer when it has it, but in the meantime you can still find the factors of other numbers. Without threading, you would have to wait for each answer before you could ask another.

A few things to explain about the code, though. Notice that I make the main thread sleep for half a second before the input() line. That’s because input() blocks – that is, the main thread stops what it’s doing and watches you intensely, awaiting your word, the Word of the User. The spawned threads keep working in the meantime, but if one of them prints its answer just as the prompt appears, the two get jumbled together. So we give a freshly spawned thread half a second to print a quick answer before the next prompt goes up.

See also that you can create multiple threads from the same function. The threads are given default names, “Thread-1”, “Thread-2”, and so on, but you can easily give a thread a unique name with:

    t = threading.Thread(name="My_Thread!", target=factorize)

You see, too, that you can pass arguments to a thread with

args=(arg1, arg2, …,)
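Here is a small sketch that puts the name and the arguments together. The function greet and the list seen are invented for the example; threading.current_thread() is the standard call for asking which thread is running the current code.

```python
import threading

seen = []

def greet(greeting, target):
    # current_thread() returns the Thread object that is running this code
    name = threading.current_thread().name
    seen.append((name, greeting + ", " + target + "!"))

t = threading.Thread(name="My_Thread!", target=greet, args=("Hello", "world"))
t.start()
t.join()  # wait for the thread to finish before looking at the result

print(seen)
```

Remember that args must be a tuple, so a single argument needs a trailing comma, as in args=(n,).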

Notice, also, that if you exit the program while the thread is still thinking, it will continue thinking and still give you its output. That’s because the thread is, by default, not a daemon. But if you set t.daemon = True (or call the older t.setDaemon(True)) before you call t.start(), you have a daemon thread. A daemon thread is a thread that gets shut down when only daemon threads are left running in the program. In this example, the thread was not marked as daemon, so it continues on, blindly, mechanically, to the completion of its task, whereas if we had set it as daemon, it would have the consciousness to pack up once it sees that it’s been left all alone.
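The order matters when marking a thread as daemon. The sketch below, with a made-up think function, sets the flag before start() and then shows what happens if you try to change it once the thread has been started.

```python
import threading
import time

def think():
    time.sleep(0.2)

t = threading.Thread(target=think)
t.daemon = True       # must happen before start()
t.start()
print("Daemon?", t.daemon)

try:
    t.daemon = False  # too late: the thread has already been started
    too_late = False
except RuntimeError as error:
    too_late = True
    print("Could not change the flag:", error)

t.join()
```

You can also pass daemon=True straight to the threading.Thread() constructor, which makes it hard to get the order wrong.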

Synchronization

Okay, so now we’ve actually seen a use for threads, but what about those first two examples, and their ominous portents of complications? Well, for that, we have thread synchronization! There are various ways to synchronize your threads, to work only when you want them to, to not trample over each other and stick all their hands into the cookie jar of the same resource at the same time – among them, events, locks, RLocks, and queues.

Events

First, events.

    import threading
    import time
    import random

    def you(love):
        age = 18
        while not love.is_set():
            # wait for one second to see if the event 'love' has been set
            finding_love = love.wait(1)
            # if the event has been set
            if finding_love:
                f_age = age
                age = str(age)
                if f_age < 30:
                    print("Lucky you!  You found love at age " + age + ".")
                elif f_age < 45:
                    print("Took you long enough.  You found love at age " + age + ".")
                elif f_age < 60:
                    print("We thought you'd never find love.  But you did, at age " + age + ".")
                else:
                    print("Just in time for the end.  I hope your golden years are beautiful, and I'm sure they will be, because you found love, even though you were " + age + " years old when you did.")
                break
            age += 1
            c_age = str(age)
            print("\nAnother year alone... you're " + c_age + " years old now.")
            if age > 80:
                print("You die alone.  Tough luck.")
                break
            

    # create an event
    love = threading.Event()

    print("You are a bright-eyed young person of 18 years, ready to set out into the world and find that most elusive of treasures, love.")
    standard = int(input("Enter the standard you hold for potential companions in love and life, on a scale of 1 to 100, with 1 being that you'd be glad for anyone who'll look at you, and 100 being you will wait forever for The One: "))

    # create a thread, passing the event 'love'
    y = threading.Thread(target = you, args=(love,))

    y.start()

    while y.is_alive():
        current = random.randint(1,100)
        print("\nYou find someone who meets the standards of", current)
        if current >= standard:
            # set the event
            love.set()
            break
        time.sleep(1)

OUTPUT:

You are a bright-eyed young person of 18 years, ready to set out into the world and find that most elusive of treasures, love.
Enter the standard you hold for potential companions in love and life, on a scale of 1 to 100, with 1 being that you’d be glad for anyone who’ll look at you, and 100 being you will wait forever for The One: 75

You find someone who meets the standards of 72

Another year alone… you’re 19 years old now.

You find someone who meets the standards of 70

Another year alone… you’re 20 years old now.

You find someone who meets the standards of 55

Another year alone… you’re 21 years old now.

You find someone who meets the standards of 95
Lucky you! You found love at age 21.

Here you see an example of an event in use. Events are a way to either pause a thread until some particular thing has occurred that trips the event, or to give the thread a fork of directions, so that if the event is set, path 1 is taken, and if the event is not set, path 2 is taken. It is the latter that you see here, where the thread waits for one second to see if love has been set:

finding_love = love.wait(1)

and if during the second it is, then finding_love gains the value of True. To completely halt the thread until the event has been set, simply write:

finding_love = love.wait()

In this case, the thread will wait until love is set, and then finding_love will become True.

Events can also be cleared, with event_name.clear(), which puts them back in the unset state.

Events are a great way of ensuring that a thread performs a certain operation only if certain conditions outside the thread are met. It is a means by which threads can communicate with each other, as here we see the main thread communicating with the thread “you”.
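To see the whole life cycle of an event in one place, here is a stripped-down sketch. The names go, timed_out and log are mine, and the second event is only there so the main thread knows the first wait() has already given up.

```python
import threading

go = threading.Event()
timed_out = threading.Event()
log = []

def waiter():
    log.append(go.wait(0.05))  # nothing set yet: returns False after 0.05 s
    timed_out.set()            # tell the main thread we got this far
    log.append(go.wait())      # no timeout: blocks until go is set

w = threading.Thread(target=waiter)
w.start()

timed_out.wait()   # hold off until the first wait() has timed out
go.set()           # release the waiter
w.join()

log.append(go.is_set())   # still set after the waiter is done
go.clear()
log.append(go.is_set())   # back to unset

print(log)  # [False, True, True, False]
```

Note that set() does not reset itself. Every later wait() returns True right away until somebody calls clear().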

Locks

    import threading
    import time
    import random

    def door(lock):
        while True:
            # Acquire outside the try, so we only release a lock we really hold
            lock.acquire()
            try:
                time.sleep(random.random())
            finally:
                lock.release()
            time.sleep(random.random())

    def thief(lock):
        picked = 0
        while picked < 3:
            time.sleep(random.random())
            # Tries to acquire the lock, pick becomes True if lock is acquired
            pick = lock.acquire(False)
            if pick:
                print("\nPicked a lock!\n")
                picked += 1
                lock.release()
            else:
                print("\nThis looked easier in the correspondent course...\n")
        print("\nAh ha!  Got it!  Let's see what's on the other side...")

    lock = threading.Lock()

    d = threading.Thread(target = door, args=(lock,))
    d.daemon = True
    t = threading.Thread(target = thief, args=(lock,))

    print("You're walking through a dark alley, as is usual, and you notice a door at the end, protected by three locks.  Filled with an inexorable drive to know what is beyond the door, you pull out your Acme Toolkit and set to work picking the locks...\n")
    time.sleep(1.5)
    d.start()
    t.start()

OUTPUT:

You’re walking through a dark alley, as is usual, and you notice a door at the end, protected by three locks. Filled with an inexorable drive to know what is beyond the door, you pull out your Acme Toolkit and set to work picking the locks…

This looked easier in the correspondent course…

Picked a lock!

This looked easier in the correspondent course…

This looked easier in the correspondent course…

This looked easier in the correspondent course…

This looked easier in the correspondent course…

Picked a lock!

This looked easier in the correspondent course…

Picked a lock!

Ah ha! Got it! Let’s see what’s on the other side…

Locks are a way of protecting resources from simultaneous access, by permitting only the thread that holds the lock to run the code that the lock guards. A thread comes to be ‘holding’ a lock by lock_name.acquire(), and, not surprisingly, comes to be no longer holding a lock by lock_name.release().

When a thread reaches lock.acquire(), it will be unable to continue until it has acquired the lock. This can cause issues if, for instance, the thread that currently holds the lock encounters an error and is unable to release the lock – in which case the other thread that is waiting at lock.acquire() will wait forever. It is therefore good practice to use try…finally when you want a thread to acquire() a lock.

In this example, we have two threads, one (the door) which holds and releases the lock at random intervals, and another (the thief) which tries to acquire the lock at random intervals. The program ends when the thief thread has acquired the lock three times.

We see in the thief thread, though, the line

pick = lock.acquire(False)

By passing False to acquire(), the thread no longer stops to wait until the lock is acquired, but simply tries to get ahold of it, and if it cannot, moves on with its life. If we had lock.acquire() in that line instead, then the thread would lie in wait for the moment when the lock was released, and immediately grab it for itself. With lock.acquire(False), the thread can still perform other functions for which the lock is not required.
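Here is a tiny sketch of the difference, kept to a single thread so the outcome is predictable. Besides the False argument it also uses the timeout parameter of acquire(), a middle ground that waits a little while and then gives up. The variable names are made up for the example.

```python
import threading

lock = threading.Lock()

lock.acquire()                          # the lock is now held
first_try = lock.acquire(False)         # non-blocking: returns False at once
second_try = lock.acquire(timeout=0.1)  # waits a tenth of a second, then gives up
lock.release()

third_try = lock.acquire(False)         # the lock is free now, so this succeeds
if third_try:
    lock.release()

print(first_try, second_try, third_try)  # False False True
```

The return value is the whole point here: always check it, and only release() the lock if you actually got it.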

RLocks

But if acquire() halts the thread until the lock is acquired, what about nested locks? That is, you have the following:


    lock = threading.Lock()

    def Outer_Thread(lock):
        lock.acquire()
        Inner_Thread(lock)
        Other_Inner_Thread(lock)
        lock.release()

    def Inner_Thread(lock):
        lock.acquire()
        # Do stuff with resource1
        lock.release()

    def Other_Inner_Thread(lock):
        lock.acquire()
        # Do other stuff with resource1
        lock.release()

You don’t want a separate thread accessing resource1 in-between the calls to the inner functions, so you lock the outer function.

What happens here? Well, nothing would happen, is what, because Outer_Thread holds the lock, so when Inner_Thread tries to acquire it, it cannot. Because Inner_Thread wants the same lock. Just as if you wrote

lock.acquire()
lock.acquire()

which would halt all progress, nested locks cause the same problem. So what do we do?

We use a reentrant lock!

Reentrant locks, or RLocks, are locks designed for nested functions. Here are a few examples to illustrate the difference:


    import threading
    import time

    def add_flour(lock, flour):
        lock.acquire()
        try:
            flour.append("flour")
        finally:
            lock.release()
        return flour[len(flour)-1]

    def add_milk(lock, milk):
        with lock:
            if len(milk) < 4:
                milk.append("milk")
        return milk[len(milk)-1]

    def add_sugar(lock, sugar):
        with lock:
            if len(sugar) < 4:
                sugar.append("sugar")
        return sugar[len(sugar)-1]

    def add_egg(lock, egg):
        with lock:
            if len(egg) < 4:
                egg.append("egg")
        return egg[len(egg)-1]

    # Jumps in and modifies the resource 'ingredients,' so that our final data isn't what we intended
    def kiddie(beetle):
        time.sleep(2.5)
        with lock:
            if len(beetle) < 4:
                beetle.append("beetle")
        return beetle[len(beetle)-1]

    # Call functions to add ingredients to mix
    def make_cookie(lock, ingre):
        step_1 = add_flour(lock, ingre)
        print("We added " + step_1 + " to our mix.")
        time.sleep(1)
        step_2 = add_milk(lock, ingre)
        print("We added " + step_2 + " to our mix.")
        time.sleep(1)
        step_3 = add_sugar(lock, ingre)
        print("We added " + step_3 + " to our mix.")
        time.sleep(1)
        step_4 = add_egg(lock, ingre)
        print("We added " + step_4 + " to our mix.")

    ingredients = []
    lock = threading.Lock()

    m_c = threading.Thread(target = make_cookie, args=(lock, ingredients,))
    k = threading.Thread(target = kiddie, args=(ingredients,))

    print("Let's make a cookie!  Let's see, we seem to have all the ingredients here... flour, milk, sugar, egg.  Yup.\n")
    time.sleep(2)

    m_c.start()
    k.start()

    # Wait for the threads to finish before continuing
    m_c.join()
    k.join()

    print("\nSo here is everything we have in our final mix:")
    print(ingredients)

OUTPUT:

Let’s make a cookie! Let’s see, we seem to have all the ingredients here… flour, milk, sugar, egg. Yup.

We added flour to our mix.
We added milk to our mix.
We added sugar to our mix.
We added beetle to our mix.

So here is everything we have in our final mix:
[‘flour’, ‘milk’, ‘sugar’, ‘beetle’]

Here we see an example of what happens with nested functions when the outer function isn’t locked. A second thread managed to jump in between the calls that the outer function of the first thread is making to the inner functions, modified the resource, and left us with beetle cookies. So try locking the make_cookie thread, and this is what you get:

Let’s make a cookie! Let’s see, we seem to have all the ingredients here… flour, milk, sugar, egg. Yup.

So here is everything we have in our final mix:
[]

The inner functions try to acquire the lock, but the outer one is already holding it, so they wait forever and nothing gets added. But what if we keep the outer function locked, but change one character in the code? Could that solve everything? Yes! Simply change the line

lock = threading.Lock()

to

lock = threading.RLock()

and all our problems disappear. We have delicious cookies, completely beetle-free.

An RLock keeps track of which thread holds it and how many times that thread has acquired it, so it recognizes a nested attempt by the same thread and permits it. When the nested code releases the lock, the outer function is still holding it. Pretty neat, huh? Just be sure to call release() as many times as you called acquire(), because the lock only becomes free for other threads once every acquire() has been matched by a release() call.
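The release-counting is easy to check for yourself. In this sketch (outsider, try_from_other_thread and attempts are names I made up) the main thread acquires an RLock twice, and a second thread tries to grab it after one release and again after two.

```python
import threading

rlock = threading.RLock()
attempts = []

def outsider():
    # A different thread tries to grab the lock without waiting
    got_it = rlock.acquire(False)
    attempts.append(got_it)
    if got_it:
        rlock.release()

def try_from_other_thread():
    t = threading.Thread(target=outsider)
    t.start()
    t.join()

rlock.acquire()           # outer acquire
rlock.acquire()           # nested acquire by the same thread: succeeds at once
rlock.release()           # one release: this thread still owns the lock
try_from_other_thread()   # the outsider is turned away
rlock.release()           # second release: now the lock is really free
try_from_other_thread()   # the outsider gets in

print(attempts)  # [False, True]
```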

A couple bits of new code to point out from this example. First,

    with lock:
        # Do stuff

This works exactly as does

    lock.acquire()
    try:
        # Stuff
    finally:
        lock.release()

and is simply a slightly simpler and more easily readable way of writing the same code. Second,

my_thread.join()

What join() does is cause whatever thread you’re in to wait for the join()-ed thread to finish before continuing. Without join()-ing the m_c and k threads in this example, the final two lines of printing would run before all the ingredients had been added.
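A quick sketch of join() on its own, with a made-up slow_worker function. It also shows that join() accepts an optional timeout in seconds, after which it returns whether or not the thread has finished, so you check is_alive() to find out which it was.

```python
import threading
import time

steps = []

def slow_worker():
    for n in range(3):
        time.sleep(0.1)
        steps.append(n)

t = threading.Thread(target=slow_worker)
t.start()

t.join(0.01)                  # wait at most 0.01 s; the worker needs about 0.3 s
still_running = t.is_alive()  # almost certainly True at this point
t.join()                      # no timeout: wait until the worker is done

print(still_running, steps)
```

After the second join() the list is guaranteed to hold all three steps.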

Queues

The final topic for today is queues. Queues are fun because they present one of the simplest ways to synchronize thread operations, to prevent threads from tying knots in each other. Here’s an example:

    import queue
    import threading

    works_of_genius = ['/users/nchunterhayden/desktop/text1thru50', '/users/nchunterhayden/desktop/text51thru100', '/users/nchunterhayden/desktop/text101thru154']

    queue_reading = queue.Queue()
    queue_mining = queue.Queue()
    abundance_of_love = []

    def reading_beauty(queue_reading, queue_mining):
        while True:
            # Retrieve a filename from the queue and read the whole file
            with open(queue_reading.get(), 'r') as f:
                text = f.read()
            # Put the text of the file into the other queue
            queue_mining.put(text)
            # Mark this filename as handled
            queue_reading.task_done()

    # Searches for love (like the rest of us)
    def mining_for_love(queue_mining, abundance_of_love):
        while True:
            # Retrieve a text from the queue
            text = queue_mining.get()
            # Count every 'love', upper or lower case, even inside other words
            count = text.lower().count("love")
            abundance_of_love.extend(['love'] * count)
            # Mark this text as handled
            queue_mining.task_done()

    def main():

        # Put elements into queue
        for section in works_of_genius:
            queue_reading.put(section)

        # Spawn threads
        for i in range(len(works_of_genius)):
            r = threading.Thread(target = reading_beauty, args=(queue_reading, queue_mining,))
            r.daemon = True
            r.start()

            m = threading.Thread(target = mining_for_love, args=(queue_mining, abundance_of_love))
            m.daemon = True
            m.start()

        # Wait until every element of both queues has been handled
        queue_reading.join()
        queue_mining.join()

    main()
    print("There are", len(abundance_of_love), "instances of 'love' in all of Shakespeare's sonnets.")

OUTPUT:

There are 233 instances of ‘love’ in all of Shakespeare’s sonnets.

Note: By the way, that works out to about one and a half times per sonnet. So, on average, you can expect to run into the word ‘love’, or a word that contains ‘love’, while reading any particular one of his Sonnets.

So! Check it out. All you do is queue.put() the items you want your threads to handle into a queue, and then queue.get() them where you want them. You modify them, handle them, play with them, do whatever you want to them, and then you queue.task_done() them, sending a signal that you’ve handled an element of the queue, so that your code knows when you’ve handled every element of the queue.
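Boiled down to the bare put(), get(), task_done() cycle, the pattern looks something like this. It is only a sketch: the jobs queue, the worker function and the squaring are stand-ins for whatever your threads really do.

```python
import queue
import threading

jobs = queue.Queue()
results = []

def worker():
    while True:
        number = jobs.get()              # blocks until an item is available
        results.append(number * number)
        jobs.task_done()                 # one get() has been handled

# Daemon threads, so the endless loops don't keep the program alive
for _ in range(2):
    threading.Thread(target=worker, daemon=True).start()

for number in range(5):
    jobs.put(number)

jobs.join()   # returns once task_done() has been called for every put()
print(sorted(results))  # [0, 1, 4, 9, 16]
```

The results are sorted before printing because the two workers may finish their items in any order.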

The way I think of queues is as a stream flowing through filters, possibly branching off into separate streams. Here we have one stream and two filters. The data of the filenames flow, via the queue, through the first filter, the first manipulation, and now our data consists of the text of the sonnets, and then we send that data in a stream, via another queue, through another filter that leaves us with the data of how many instances of ‘love’ there are.

Queues can be used in this way for useful things, not just silly things like in my example. For instance, data mining. This example was actually a kind of data mining, but something more useful might be code that looked for keywords on websites. If you had a list of URLs, you could pass these URLs to threads in a queue, open them, read them, pass this information to another thread via another queue, and search for the keyword, returning the instances.
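A rough sketch of that idea follows. To keep it runnable offline, the pages are a hypothetical dictionary called fake_pages with made-up example.com addresses; a real version would download each URL where the comment says so. The function names and the keyword are also just for illustration.

```python
import queue
import threading

# Hypothetical stand-in for downloaded pages, so the sketch runs offline
fake_pages = {
    "http://example.com/a": "Threads and queues. Queues are fun.",
    "http://example.com/b": "No match here.",
    "http://example.com/c": "A queue of queues.",
}
keyword = "queue"

url_queue = queue.Queue()
text_queue = queue.Queue()
hits = {}

def fetcher():
    while True:
        url = url_queue.get()
        # A real version would download the page here
        text_queue.put((url, fake_pages[url]))
        url_queue.task_done()

def searcher():
    while True:
        url, text = text_queue.get()
        hits[url] = text.lower().count(keyword)
        text_queue.task_done()

for target in (fetcher, fetcher, searcher):
    threading.Thread(target=target, daemon=True).start()

for url in fake_pages:
    url_queue.put(url)

url_queue.join()    # every URL has been fetched and passed along
text_queue.join()   # every page has been searched
print(hits)
```

Two fetchers and one searcher is an arbitrary split. In practice you would give more threads to whichever stage spends the most time waiting.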

Summary

We’ve learned what a thread is, imagined instances where such things can be useful (such as Deep Thought), and seen several examples of threads in action. These examples showed us how to create a thread and how to coordinate threads with such tools as events and locks. We have also seen an example of queues, learning how to use them and in what situations they can be useful.

We’ve also picked locks and found out what’s beyond the door, we’ve made a cookie without the interference of a kiddie, we’ve looked at some great English literature, and even found love for ourselves.


Hunter