For a lot of us, when we want to execute multiple tasks in Python, we write a loop and run them one at a time. The more tasks we add, the longer the program takes.
There's another option besides executing everything sequentially: we can execute these tasks asynchronously, meaning we can run more than one task at a time.
In Python there are different methods for executing multiple tasks at the same time:
Multiprocessing
Threading
Asyncio
I won't go into much detail about these methods in this article; for this particular exercise, I'm going to use Asyncio. If you'd like to know more, feel free to refer to the resources provided at the bottom of this post.
Even though the benefits are pretty apparent, there are several issues I've run into in the past with asynchronous programming in Python:
The code can be nebulous and not very intuitive
Implementations tend to be idiosyncratic and use-case specific
So, to address these issues, I wanted to build a solution generic enough to take a list of tasks and an asynchronous function as arguments. I also wanted it to throttle the number of tasks executing at the same time.
The Use Case
Let's say I have a function that takes some_arg, does some work, and returns some_arg back.
import random
import time

def example_func(some_arg: str) -> str:
    wait_time = random.randint(1, 5)
    print(f"fetching some_arg: {some_arg}, will take {wait_time} seconds")
    if wait_time == 5:
        print("Something happened")
        raise Exception('Something Happened')
    time.sleep(wait_time)
    return some_arg
I have 10 items in a list, each with a different argument I need to run through this function. To accomplish this, I just create a loop and iterate through them.
example_list_of_tasks = [
    {"some_arg": "Some Parameter 1"},
    {"some_arg": "Some Parameter 2"},
    {"some_arg": "Some Parameter 3"},
    {"some_arg": "Some Parameter 4"},
    {"some_arg": "Some Parameter 5"},
    {"some_arg": "Some Parameter 6"},
    {"some_arg": "Some Parameter 7"},
    {"some_arg": "Some Parameter 8"},
    {"some_arg": "Some Parameter 9"},
    {"some_arg": "Some Parameter 10"},
]

for task in example_list_of_tasks:
    try:
        print(example_func(**task))
    except Exception as ex:
        print(ex)
While this works, it would be faster if we executed multiple tasks at the same time. And while we're at it, let's make something generic enough to take a function and a list of tasks as arguments, so we can reuse it in multiple situations.
Coding the Generic Class
We're going to use the Semaphore from the asyncio module to manage our queue of tasks and execute them. I chose a Semaphore because it's relatively easy to implement and makes it easy to limit the number of tasks that run at the same time.
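If you haven't used a Semaphore before, here's a minimal standalone sketch of how it throttles concurrency. The names limited_task and demo are just for illustration, not part of the class we're about to build:

```python
import asyncio

async def limited_task(sem: asyncio.Semaphore, n: int) -> int:
    # only two tasks can hold the semaphore at any moment;
    # the rest wait here until a slot frees up
    async with sem:
        await asyncio.sleep(0.01)
        return n

async def demo() -> list:
    sem = asyncio.Semaphore(2)  # at most 2 tasks run concurrently
    return await asyncio.gather(*(limited_task(sem, i) for i in range(5)))

results = asyncio.run(demo())
print(results)  # [0, 1, 2, 3, 4]
```

Even though only two tasks run at once, gather() still returns the results in the order the tasks were submitted.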
So let's go ahead and create our class; we'll call it AsyncSemaphorePool. We'll also set the size of the pool (i.e., the number of tasks that can be executed at the same time), defaulting it to 2.
import asyncio
import random

class AsyncSemaphorePool:
    def __init__(self, pool_size=2):
        self._pool_size = pool_size
Next, let's create the main method for this class. This is where the asyncio event loop is created, and it's what returns the results from the function we want to run.
Our method takes the asynchronous function and the list of tasks we want to run.
In our main method, we must do the following:
Define the loop
Run it until complete
Close out the loop
Return the results.
def main(self, func, list_of_tasks):
    # create the loop
    loop = asyncio.get_event_loop()
    try:
        # run the loop until it's done
        # (still missing an argument; we'll pass in the task pool next)
        results = loop.run_until_complete()
    finally:
        # close the loop
        loop.close()
    # return the results
    return results
While this looks good, we still need to pass loop.run_until_complete() something that creates the actual pool of tasks. So let's build that piece; we'll call it a container.
We must define this container method as async because it uses asynchronous functionality internally. Like our main method, it takes our asynchronous function and the list of tasks we want to run.
Within this method, we must:
Create our Semaphore object to manage the loop
- The Semaphore will take our object's pool size and this will limit the number of tasks running at the same time
Create a list of futures* for our tasks
Await the results of those tasks
Return all the results.
*A future is simply a task we can expect a result from in the future.
Additionally, in our use case, if a task fails, I want it to return the exception but keep executing the other tasks. To do this, I add the argument return_exceptions=True.
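Here's a quick illustration of what return_exceptions=True does in isolation (maybe_fail is a made-up example function): a failing task shows up in the results list as its exception object instead of crashing the whole batch.

```python
import asyncio

async def maybe_fail(n: int) -> int:
    if n == 2:
        raise ValueError(f"task {n} failed")
    return n

async def demo() -> list:
    # with return_exceptions=True, exceptions are returned in place
    # instead of propagating and cancelling the other tasks
    return await asyncio.gather(
        *(maybe_fail(i) for i in range(4)),
        return_exceptions=True,
    )

results = asyncio.run(demo())
print(results)  # [0, 1, ValueError('task 2 failed'), 3]
```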
We'll also redefine our main() method to use this container.
async def _container(self, func, list_of_tasks):
    # create the Semaphore object with the pool size
    sem = asyncio.Semaphore(self._pool_size)
    # create a list of task futures
    # (still missing the coroutine to wrap; we'll add a worker next)
    futures = [asyncio.ensure_future() for t in list_of_tasks]
    # await the tasks
    results = await asyncio.gather(*futures, return_exceptions=True)
    # return the results
    return results

def main(self, func, list_of_tasks):
    # create the loop
    loop = asyncio.get_event_loop()
    try:
        # run the loop until it's done
        results = loop.run_until_complete(self._container(func, list_of_tasks))
    finally:
        # close the loop
        loop.close()
    # return the results
    return results
Finally, we need a component to run the actual task. We'll just call this a worker.
The worker will be super simple. It takes the Semaphore object we created in the container, the asynchronous function, and the individual task as its arguments.
Within this method, we'll:
Utilize the Semaphore to manage the task queue
Await our function and capture the corresponding result
- The function takes our task as a set of arguments
Return our result as a dictionary with the corresponding task and result
As well, we'll need to redefine the container to use the worker. We'll need to use the worker to create the list of futures.
async def _worker(self, sem: asyncio.Semaphore, func, task):
    # use the semaphore object to limit concurrency
    async with sem:
        # await results from the asynchronous function
        result = await func(**task)
        # return the result paired with its task
        return {"task": task, "result": result}

async def _container(self, func, list_of_tasks):
    # create the Semaphore object with the pool size
    sem = asyncio.Semaphore(self._pool_size)
    # create a list of task futures, each wrapped in a worker
    futures = [asyncio.ensure_future(self._worker(sem, func, t)) for t in list_of_tasks]
    # await the futures to return
    results = await asyncio.gather(*futures, return_exceptions=True)
    # return the results
    return results
Our final class will look something like this:
import asyncio
import random

class AsyncSemaphorePool:
    def __init__(self, pool_size=2):
        self._pool_size = pool_size

    async def _worker(self, sem: asyncio.Semaphore, func, task):
        async with sem:
            result = await func(**task)
            return {"task": task, "result": result}

    async def _container(self, func, list_of_tasks):
        sem = asyncio.Semaphore(self._pool_size)
        futures = [asyncio.ensure_future(self._worker(sem, func, t)) for t in list_of_tasks]
        results = await asyncio.gather(*futures, return_exceptions=True)
        return results

    def main(self, func, list_of_tasks):
        loop = asyncio.get_event_loop()
        try:
            results = loop.run_until_complete(self._container(func, list_of_tasks))
        finally:
            loop.close()
        return results
Running Our Use Case Using Our New Class
The only thing we need to do is redefine our function as asynchronous, which just means putting async in front of it. For our example, I'll also have to swap time.sleep for an async-friendly version, asyncio.sleep.
async def example_func(some_arg: str) -> str:
    wait_time = random.randint(1, 5)
    print(f"fetching some_arg: {some_arg}, will take {wait_time} seconds")
    if wait_time == 5:
        print("Something happened")
        raise Exception('Something Happened')
    # async-friendly sleep
    await asyncio.sleep(wait_time)
    return some_arg
So we'll run the same use case with our new class:
example_list_of_tasks = [
    {"some_arg": "Some Parameter 1"},
    {"some_arg": "Some Parameter 2"},
    {"some_arg": "Some Parameter 3"},
    {"some_arg": "Some Parameter 4"},
    {"some_arg": "Some Parameter 5"},
    {"some_arg": "Some Parameter 6"},
    {"some_arg": "Some Parameter 7"},
    {"some_arg": "Some Parameter 8"},
    {"some_arg": "Some Parameter 9"},
    {"some_arg": "Some Parameter 10"},
]

asyncpool = AsyncSemaphorePool(pool_size=5)
print(asyncpool.main(example_func, example_list_of_tasks))
Full Solution
import asyncio
import random

class AsyncSemaphorePool:
    def __init__(self, pool_size=2):
        self._pool_size = pool_size

    async def _worker(self, sem: asyncio.Semaphore, func, task):
        async with sem:
            result = await func(**task)
            return {"task": task, "result": result}

    async def _container(self, func, list_of_tasks):
        sem = asyncio.Semaphore(self._pool_size)
        futures = [asyncio.ensure_future(self._worker(sem, func, t)) for t in list_of_tasks]
        results = await asyncio.gather(*futures, return_exceptions=True)
        return results

    def main(self, func, list_of_tasks):
        loop = asyncio.get_event_loop()
        try:
            results = loop.run_until_complete(self._container(func, list_of_tasks))
        finally:
            loop.close()
        return results

if __name__ == '__main__':
    async def example_func(some_arg: str) -> str:
        wait_time = random.randint(1, 5)
        print(f"fetching some_arg: {some_arg}, will take {wait_time} seconds")
        if wait_time == 5:
            print("Something happened")
            raise Exception('Something Happened')
        await asyncio.sleep(wait_time)
        return some_arg

    # list of tasks to loop through
    example_list_of_tasks = [
        {"some_arg": "Some Parameter 1"},
        {"some_arg": "Some Parameter 2"},
        {"some_arg": "Some Parameter 3"},
        {"some_arg": "Some Parameter 4"},
        {"some_arg": "Some Parameter 5"},
        {"some_arg": "Some Parameter 6"},
        {"some_arg": "Some Parameter 7"},
        {"some_arg": "Some Parameter 8"},
        {"some_arg": "Some Parameter 9"},
        {"some_arg": "Some Parameter 10"},
    ]

    asyncpool = AsyncSemaphorePool(pool_size=5)
    print(asyncpool.main(example_func, example_list_of_tasks))
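One caveat: in newer Python versions (3.10+), calling asyncio.get_event_loop() when no loop is running emits a DeprecationWarning, and a closed loop can't be reused. If that matters for your environment, here's a sketch (not the original class above) of an equivalent entry point using asyncio.run(), which creates and closes the loop for you. Note that gather() accepts coroutines directly, so ensure_future() isn't strictly needed here:

```python
import asyncio

class AsyncSemaphorePool:
    def __init__(self, pool_size: int = 2):
        self._pool_size = pool_size

    async def _worker(self, sem: asyncio.Semaphore, func, task):
        async with sem:
            result = await func(**task)
            return {"task": task, "result": result}

    async def _container(self, func, list_of_tasks):
        sem = asyncio.Semaphore(self._pool_size)
        # gather() wraps each coroutine in a task for us
        workers = [self._worker(sem, func, t) for t in list_of_tasks]
        return await asyncio.gather(*workers, return_exceptions=True)

    def main(self, func, list_of_tasks):
        # asyncio.run() creates a fresh loop, runs the coroutine
        # to completion, and closes the loop when it's done
        return asyncio.run(self._container(func, list_of_tasks))
```

A side benefit of this variant is that main() can be called repeatedly, since each call gets a fresh event loop instead of closing a shared one.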
Summary
So, as you can see, we've created a generic class to run multiple tasks at the same time through a corresponding function. While this particular use case is simple, it shows the power of asynchronous programming.
Even so, I acknowledge there are many ways to solve this same problem, and the solution I've provided here won't fit every use case. Keep in mind there are other options for executing multiple tasks asynchronously. I encourage you to explore them and code a solution that makes the most sense for your task!
Thanks for taking the time to read this post! I hope you found it helpful!
Resources
Real Python Tutorials - Speed Up Your Python Program With Concurrency
masnun.rocks - Async Python: The Different Forms of Concurrency