Multithreading in C++0x part 8: Futures, Promises and Asynchronous Function Calls
Thursday, 11 February 2010
This is the eighth in a series of blog posts introducing the new C++0x thread library. See the end of this article for a full set of links to the rest of the series.
In this installment we'll take a look at the "futures" mechanism from C++0x. Futures are a high-level mechanism for passing a value between threads, and allow a thread to wait for a result to be available without having to manage the locks directly.
Futures and asynchronous function calls
The most basic use of a future is to hold the result of a call to the new std::async function for running some code asynchronously:
    #include <future>
    #include <iostream>

    int calculate_the_answer_to_LtUaE();
    void do_stuff();

    int main()
    {
        std::future<int> the_answer=std::async(calculate_the_answer_to_LtUaE);
        do_stuff();
        std::cout<<"The answer to life, the universe and everything is "
                 <<the_answer.get()<<std::endl;
    }
The call to std::async takes care of creating a thread, and invoking calculate_the_answer_to_LtUaE on that thread. The main thread can then get on with calling do_stuff() whilst the immensely time-consuming process of calculating the ultimate answer is done in the background. Finally, the call to the get() member function of the std::future<int> object waits for the function to complete and ensures that the necessary synchronization is applied to transfer the value over so the main thread can print "42".
Sometimes asynchronous functions aren't really asynchronous
Though I said that std::async takes care of creating a thread, that's not necessarily true. As well as the function being called, std::async takes a launch policy which specifies whether to start a new thread or create a "deferred function" which is only run when you wait for it. The default launch policy for std::async is std::launch::any, which means that the implementation gets to choose for you. (That is the name in the C++0x draft current at the time of writing; the published C++11 standard spells the default policy std::launch::async | std::launch::deferred.) If you really want to ensure that your function is run on its own thread then you need to specify the std::launch::async policy:
    std::future<int> the_answer=std::async(std::launch::async,calculate_the_answer_to_LtUaE);
Likewise, if you really want the function to be executed in the get() call then you can specify the std::launch::sync policy (renamed std::launch::deferred in the published C++11 standard):
    std::future<int> the_answer=std::async(std::launch::sync,calculate_the_answer_to_LtUaE);
In most cases it makes sense to let the library choose. That way you'll avoid creating too many threads and overloading the machine, whilst taking advantage of the available hardware threads. If you need fine control, you're probably better off managing your own threads.
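As a rough illustration of the difference between the two explicit policies, the sketch below reports whether a task ran on the thread that called get(). It uses the names from the published C++11 standard, where the deferred policy is spelled std::launch::deferred rather than std::launch::sync. The helper name runs_on_calling_thread is invented for this example.

```cpp
#include <future>
#include <thread>

// Hypothetical helper (not part of the standard library): returns true if a
// task launched with the given policy executed on the thread that calls get().
bool runs_on_calling_thread(std::launch policy)
{
    std::thread::id const caller = std::this_thread::get_id();
    std::future<std::thread::id> task_thread =
        std::async(policy, [] { return std::this_thread::get_id(); });
    // For a deferred task, the lambda only runs inside this call to get().
    return task_thread.get() == caller;
}
```

With std::launch::deferred the helper should return true, because the task is executed by the get() call itself. With std::launch::async it should return false, because the task runs on a thread of its own.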
Divide and Conquer
std::async can be used to easily parallelize simple algorithms. For example, you can write a parallel version of for_each as follows:
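    #include <cstddef>
    #include <future>

    template<typename Iterator,typename Func>
    void parallel_for_each(Iterator first,Iterator last,Func f)
    {
        std::ptrdiff_t const range_length=last-first;
        if(!range_length)
            return;
        if(range_length==1)
        {
            f(*first);
            return;
        }

        Iterator const mid=first+(range_length/2);

        // Process the first half asynchronously...
        std::future<void> bgtask=std::async(&parallel_for_each<Iterator,Func>,
                                            first,mid,f);
        try
        {
            // ...and the second half on this thread.
            parallel_for_each(mid,last,f);
        }
        catch(...)
        {
            // Wait for the background task before the exception propagates.
            bgtask.wait();
            throw;
        }
        // Wait for the background task and rethrow any exception it stored.
        bgtask.get();
    }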
This simple bit of code recursively divides up the range into smaller and smaller pieces. Obviously an empty range doesn't require anything to happen, and a single-point range just requires calling f on the one and only value. For bigger ranges, an asynchronous task is spawned to handle the first half, and the second half is handled by a recursive call.
The try-catch block just ensures that the asynchronous task is finished before we leave the function even if an exception is thrown, in order to avoid the background tasks potentially accessing the range after it has been destroyed. Finally, the get() call waits for the background task, and propagates any exception thrown from the background task. That way if an exception is thrown during any of the processing then the calling code will see an exception. Of course if more than one exception is thrown then some will get swallowed, but C++ can only handle one exception at a time, so that's the best that can be done without using a custom composite_exception class to collect them all.
Many algorithms can be readily parallelized this way, though you may want to have more than one element as the minimum range in order to avoid the overhead of spawning the asynchronous tasks.
Promises
An alternative to using std::async to spawn the task and return the future is to manage the threads yourself and use the std::promise class template to provide the future. Promises provide a basic mechanism for transferring values between threads: each std::promise object is associated with a single std::future object. A thread with access to the std::future object can wait for the result to be set, whilst another thread that has access to the corresponding std::promise object can call set_value() to store the value and make the future ready. This works well if the thread has more than one task to do, as information can be made ready to other threads as it becomes available rather than all of them having to wait until the thread doing the work has completed. It also allows for situations where multiple threads could produce the answer: from the point of view of the waiting thread it doesn't matter where the answer came from, just that it is there, so it makes sense to have a single future to represent that availability.
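The basic hand-off can be sketched in a few lines. The function names produce_answer and fetch_answer_from_thread are invented for this illustration; only std::promise, std::future and std::thread come from the library.

```cpp
#include <future>
#include <thread>
#include <utility>

// Hypothetical worker: stores its result in the promise it is given.
void produce_answer(std::promise<int> result)
{
    result.set_value(42); // makes the associated future ready
}

// Hypothetical helper: hands a promise to a worker thread and waits for
// the value through the associated future.
int fetch_answer_from_thread()
{
    std::promise<int> p;
    std::future<int> f = p.get_future(); // take the future before moving the promise
    std::thread worker(produce_answer, std::move(p));
    int const answer = f.get();          // blocks until set_value() has been called
    worker.join();
    return answer;
}
```

Because std::promise is movable but not copyable, the promise is moved into the worker thread, which then holds the only means of setting the value.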
For example, asynchronous I/O could be modelled on a promise/future basis: when you submit an I/O request then the async I/O handler creates a promise/future pair. The future is returned to the caller, which can then wait on the future when it needs the data, and the promise is stored alongside the details of the request. When the request has been fulfilled then the I/O thread can set the value on the promise to pass the value back to the waiting thread before moving on to process additional requests. The following code shows a sample implementation of this pattern.
    #include <atomic>
    #include <exception>
    #include <fstream>
    #include <future>
    #include <iostream>
    #include <streambuf>
    #include <string>
    #include <thread>
    #include <utility>
    #include <vector>

    // thread_safe_queue is not defined in this post: it is assumed to be a
    // queue with a push() that accepts an rvalue and a blocking pop().

    class aio
    {
        class io_request
        {
            std::streambuf* is;
            unsigned read_count;
            std::promise<std::vector<char> > p;
        public:
            explicit io_request(std::streambuf& is_,unsigned count_):
                is(&is_),read_count(count_)
            {}

            io_request(io_request&& other):
                is(other.is),
                read_count(other.read_count),
                p(std::move(other.p))
            {}

            io_request():
                is(0),read_count(0)
            {}

            std::future<std::vector<char> > get_future()
            {
                return p.get_future();
            }

            void process()
            {
                try
                {
                    std::vector<char> buffer(read_count);

                    unsigned amount_read=0;
                    while((amount_read != read_count) &&
                          (is->sgetc()!=std::char_traits<char>::eof()))
                    {
                        amount_read+=is->sgetn(&buffer[amount_read],
                                               read_count-amount_read);
                    }

                    buffer.resize(amount_read);

                    // Pass the data back to the waiting thread
                    p.set_value(std::move(buffer));
                }
                catch(...)
                {
                    // Pass any exception back to the waiting thread instead
                    p.set_exception(std::current_exception());
                }
            }
        };

        thread_safe_queue<io_request> request_queue;
        std::atomic_bool done;

        void io_thread()
        {
            while(!done)
            {
                io_request req=request_queue.pop();
                req.process();
            }
        }

        std::thread iot;

    public:
        aio():
            done(false),
            iot(&aio::io_thread,this)
        {}

        std::future<std::vector<char> > queue_read(std::streambuf& is,unsigned count)
        {
            io_request req(is,count);
            std::future<std::vector<char> > f(req.get_future());
            request_queue.push(std::move(req));
            return f;
        }

        ~aio()
        {
            done=true;
            // Push an empty request to wake the I/O thread so it can exit
            request_queue.push(io_request());
            iot.join();
        }
    };

    void do_stuff()
    {}

    void process_data(std::vector<char> v)
    {
        for(unsigned i=0;i<v.size();++i)
        {
            std::cout<<v[i];
        }
        std::cout<<std::endl;
    }

    int main()
    {
        aio async_io;

        std::filebuf f;
        f.open("my_file.dat",std::ios::in | std::ios::binary);

        std::future<std::vector<char> > fv=async_io.queue_read(f,1048576);

        do_stuff();
        process_data(fv.get());

        return 0;
    }
Next Time
The sample code above also demonstrates passing exceptions between threads using the set_exception() member function of std::promise. I'll go into more detail about exceptions in multithreaded code next time.
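In the meantime, here is a stripped-down sketch of that mechanism on its own. The function names failing_task and message_from_failed_task, and the error text, are made up for this illustration.

```cpp
#include <exception>
#include <future>
#include <stdexcept>
#include <string>
#include <thread>
#include <utility>

// Hypothetical worker that fails and reports the failure through the promise.
void failing_task(std::promise<int> result)
{
    try
    {
        throw std::runtime_error("calculation failed");
    }
    catch(...)
    {
        // Store the in-flight exception in place of a value.
        result.set_exception(std::current_exception());
    }
}

// Hypothetical helper: returns the message of the exception rethrown by get().
std::string message_from_failed_task()
{
    std::promise<int> p;
    std::future<int> f = p.get_future();
    std::thread worker(failing_task, std::move(p));
    std::string message;
    try
    {
        f.get(); // rethrows the stored exception on this thread
    }
    catch(std::runtime_error const& e)
    {
        message = e.what();
    }
    worker.join();
    return message;
}
```

The exception is thrown and caught on the worker thread, stored in the shared state, and thrown again on whichever thread calls get() on the future.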
Subscribe to the RSS feed or email newsletter for this blog to be sure you don't miss the rest of the series.
Try it out
If you're using Microsoft Visual Studio 2008 or g++ 4.3 or 4.4 on Ubuntu Linux you can try out the examples from this series using our just::thread implementation of the new C++0x thread library. Get your copy today.
Multithreading in C++0x Series
Here are the posts in this series so far:
- Multithreading in C++0x Part 1: Starting Threads
- Multithreading in C++0x Part 2: Starting Threads with Function Objects and Arguments
- Multithreading in C++0x Part 3: Starting Threads with Member Functions and Reference Arguments
- Multithreading in C++0x Part 4: Protecting Shared Data
- Multithreading in C++0x Part 5: Flexible locking with std::unique_lock<>
- Multithreading in C++0x part 6: Lazy initialization and double-checked locking with atomics
- Multithreading in C++0x part 7: Locking multiple mutexes without deadlock
- Multithreading in C++0x part 8: Futures, Promises and Asynchronous Function Calls
Posted by Anthony Williams
Tags: concurrency, multithreading, C++0x, thread, future, promise, async
7 Comments
any chance you have a link to the `thread_safe_queue` - I could not find an example of a thread safe queue, which fits your solution
Great tutorial !
Just to be sure: it is the library that decides whether a function will be "asynchronized" or not, which means at compilation time, is that correct? So in the Divide and Conquer example, will there be one thread for each element of the container (as the compiler can't infer the number of cores on the machine)? Or will the number of threads be decided at runtime depending on the availability of the cores of the machine (with multiple elements of the container handled on the same shared thread)?
RE: Sometimes asynchronous functions aren't really asynchronous
You are confused. Asynchronous is just that. Not to be confused with in parallel. Asynchronous means it gets done when it gets done. Period.
While your examples may be great for learning, they are a terrible practice to get into.
You should consider that creating a new thread consumes some stack, on windows 32 the default is about 5 megs. The stack of course must undergo a stack wipe before it can be used. This makes a simple operation take longer to perform than just doing it all synchronously. You should avoid creating more threads than you have the hardware to run.
Cheers,
Dan -
This is a great series. Thank you. I have one suggestion: could you place the code comments in the code itself? That would be very handy.
I think this article is my favorite in your series ;) Thanks! Future concept is _very_ useful.
One question though regarding your futures implementation recently added into boost. How would I find out the return result of the future in a generic way? Why not having 'result_type' typedef? There is a 'reference' typedef both in unique_future and shared_future classes but it's not public...
You don't need 'explicit' in ctors that have more than one arg, do you? I thought that was just to prevent conversions.