C++ async, future, promise, packaged_task and shared_future

When writing threaded programs it is useful to be able to kick off a calculation that goes off, does some work, and hands back a value once the work is completed. Once you create the piece of work, it can be executed in a separate thread of execution, in parallel with the tasks you are doing at present; hence the name asynchronous. When the value is ready it is stored in a future object (std::future) for you to collect when you are ready. However, if the calculation is long and you try to collect the result from the future before it is ready, the call blocks until the value is available.

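It should also be noted that a future manages the synchronization between the thread producing the value and the thread collecting it, so you do not need your own mutex or condition variable for the hand-off. A std::future is also a one-shot object: get() moves the value out of the shared state, so after calling get() the future is no longer valid (valid() returns false) and the value cannot be fetched a second time.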
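As a minimal sketch of that one-shot behaviour, the helper below records valid() before and after a single get(). The name validAroundGet is hypothetical, and std::async (covered next) is used only as a convenient way to obtain a future.

```cpp
#include <future>
#include <utility>

// Hypothetical helper: reports valid() before and after a single get().
std::pair<bool, bool> validAroundGet()
{
    std::future<int> f = std::async(std::launch::async, [] { return 42; });

    bool before = f.valid();  // true: the future refers to a shared state
    int value = f.get();      // blocks until ready, then takes the value out
    (void)value;              // the value itself is not needed in this sketch
    bool after = f.valid();   // false: the shared state has been released

    // Calling f.get() a second time here would be undefined behaviour.
    return {before, after};
}
```

If the same result has to be read more than once, copy the value returned by get() or use a std::shared_future instead.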

A simple future that returns the value 1 and takes an entire second to do it (simulating a lot of churning and thought in the background) is shown below.
As part of the call to std::async we provide a launch policy. If you do not specify one, the default is std::launch::async | std::launch::deferred and the implementation chooses between the two at run time, not at build time.
std::launch::async - Run the function asynchronously, as if on a new thread.
std::launch::deferred - Do the calculation only when get() or wait() is called, on the calling thread, i.e. defer execution until I ask for it.
The second parameter is the function that we call to do the actual work.

The return type we know is int because we created a std::future<int>.





#include <chrono>
#include <exception>
#include <future>
#include <iostream>
#include <thread>

template <typename T>
T doSomeWork() {
    // Sleep to simulate some chugging
    std::this_thread::sleep_for(std::chrono::milliseconds(1000));
    return T{1};
}

int async_main() {
    try {
        std::future<int> asyncFuture = std::async(std::launch::async, &doSomeWork<int>);
        // ... do some processing while the future is being set
        // Get the result from the future
        std::cout << "Value is (will block if result not ready): ";
        std::cout << asyncFuture.get() << std::endl;
        // Note: with std::launch::deferred the function would not run until get() is invoked.
        // valid() only says whether the future still holds a shared state, not whether the result is ready.
        // To check readiness without blocking, compare
        //   asyncFuture.wait_for(std::chrono::seconds(0)) with std::future_status::ready
    }
    catch (const std::exception &ex) {
        std::cout << "We threw an exception: " << ex.what() << std::endl;
    }

    return 0;
}
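To make the two launch policies and the readiness check more concrete, here is a small sketch. The helper names ranOnOtherThread and poll are hypothetical. The first compares the thread id seen inside the task with the caller's; the second uses wait_for with a zero timeout, which returns immediately with one of ready, timeout or deferred.

```cpp
#include <chrono>
#include <future>
#include <thread>

// Hypothetical helper: true if the task ran on a different thread than the caller.
bool ranOnOtherThread(std::launch policy)
{
    const std::thread::id caller = std::this_thread::get_id();
    std::future<std::thread::id> f =
        std::async(policy, [] { return std::this_thread::get_id(); });
    return f.get() != caller;
}

// Hypothetical helper: non-blocking status check using a zero timeout.
std::future_status poll(std::future<int>& f)
{
    return f.wait_for(std::chrono::seconds(0));
}
```

With std::launch::async the task should report a different thread id, while with std::launch::deferred it runs on whichever thread calls get(). Polling a deferred future yields std::future_status::deferred, because nothing will happen until get() or wait() is called.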




Futures can be used directly as above, or they can be paired with a std::promise. A promise allows you to store a value or an exception for future use: the promise is the writing end, and the future obtained from it with get_future() is the reading end. The idea is to hand the promise to the code that does the work and keep the future for yourself. Let me show you what I mean. Here I take a list of numbers and add them up, i.e. I accumulate the result. To do this I have a function that takes a promise and sets the result on it. Once I create my promise I get its future, and later call that future's get() for the result.




#include <exception>
#include <future>
#include <iostream>
#include <numeric>
#include <thread>
#include <vector>

void accumulate(std::vector<int>::iterator first,
                std::vector<int>::iterator last,
                std::promise<int> myPromise)
{
    try {
        int sum = std::accumulate(first, last, 0);
        myPromise.set_value(sum); // Notify future
    }
    catch (...) {
        // When you call get() on the future the exception gets rethrown
        myPromise.set_exception(std::current_exception());
    }
}

int main() {
    // A promise is a facility to store a value or exception so you can retrieve it later asynchronously.
    // Every promise has an associated future, obtained with get_future(), through which the result is read.

    // Demonstrate using a promise to transmit a result between threads.
    std::vector<int> vectorNumbers = { 1, 2, 3, 4, 5, 6 };
    std::promise<int> myPromise; // Create promise
    std::future<int> myFuture = myPromise.get_future(); // Get future from promise for later use

    // Run a thread and give it our promise
    std::thread work_thread(accumulate, vectorNumbers.begin(), vectorNumbers.end(),
                            std::move(myPromise));

    // future::get() waits until the future has a result and then retrieves it.
    // Calling wait() before get() is not needed
    // myFuture.wait(); // wait for result
    std::cout << "result from Future of the Promise=" << myFuture.get() << '\n';
    work_thread.join(); // wait for thread completion
    return 0;
}




Notice that the function catches any exception. If one is thrown we store it in the promise with set_exception(), and when we call get() on the future it is rethrown in the calling thread.
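The exception path can be sketched in isolation as follows. The names failingWork and messageFromFuture are hypothetical; the worker always throws, purely to show the exception travelling through the promise and coming out of get() on the other thread.

```cpp
#include <exception>
#include <future>
#include <stdexcept>
#include <string>
#include <thread>

// Hypothetical worker: always fails and reports the failure through the promise.
void failingWork(std::promise<int> p)
{
    try {
        throw std::runtime_error("work failed");
    } catch (...) {
        p.set_exception(std::current_exception());
    }
}

// Returns the message of the exception rethrown by get(), or "" if none was thrown.
std::string messageFromFuture()
{
    std::promise<int> p;
    std::future<int> f = p.get_future();
    std::thread t(failingWork, std::move(p));

    std::string message;
    try {
        f.get();  // rethrows the stored exception in this thread
    } catch (const std::runtime_error& ex) {
        message = ex.what();
    }
    t.join();
    return message;
}
```

The caller handles the failure with an ordinary try/catch around get(), just as if the work had been done on its own thread.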

Now that we understand the basics of a promise we can look at other ways of using futures. For example, you could create a set of tasks, put them in a queue, and then take them off the queue later and process them at your leisure. To do this we need a queue of tasks, with each task created as a std::packaged_task. A packaged task can wrap any type of callable, be it a function, a lambda, a bound function made with std::bind, or a function object. Once we create a task we put it in the queue. Then later we can drain the queue and process all the tasks that are in there. Let us look at this sample.
Here we create a task with a signature of int(), so the return type is int and it takes no parameters. We then create the tasks and put them on a queue. Note that the queue implementation, std::deque, is not thread safe, so we must protect it with a mutex. Once all the tasks are created we can run processTasks to loop through the queue and run each task. This is a practical way to use a sequence of futures.



#include <cmath>
#include <deque>
#include <future>
#include <iostream>
#include <mutex>

std::mutex m;
std::deque<std::packaged_task<int()> > tasks;

void processTasks()
{
    while (true) {
        std::packaged_task<int()> task;
        {
            // Hold the lock only while touching the queue
            std::lock_guard<std::mutex> lk(m);
            if (tasks.empty()) {
                break;
            }
            task = std::move(tasks.front());
            tasks.pop_front();
        }
        task(); // Run the task outside the lock; this makes its future ready
    }
}

template<typename Func>
std::future<int> createTasks(Func f)
{
    std::packaged_task<int()> task(std::move(f));
    std::future<int> res = task.get_future();
    std::lock_guard<std::mutex> lk(m);
    tasks.push_back(std::move(task));
    return res;
}

int main() {
    // Create futures from a callable, here with signature int(), but it can be any callable
    // (function, lambda, std::bind expression or function object).
    // The callable is given to a packaged_task and a future is obtained from it.
    // This allows you to build a queue of tasks to be executed later.

    int numTasks = 10;
    // Create tasks
    for (int i = 0; i < numTasks; ++i) {
        createTasks([i]() {
            int r = static_cast<int>(std::pow(2, i));
            std::cout << "task result= " << r << std::endl;
            return r;
        });
    }

    // Run tasks
    processTasks();

    return 0;
}
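Because each packaged_task hands out a future, the results can also be collected after the queue has been drained. The sketch below shows this in a single thread, so it needs no mutex. The function name runQueuedPowers is hypothetical, and powers of two are computed with a shift purely for illustration.

```cpp
#include <deque>
#include <future>
#include <vector>

// Hypothetical helper: queue `count` tasks, run them, then collect the results.
std::vector<int> runQueuedPowers(int count)
{
    std::deque<std::packaged_task<int()>> queue;
    std::vector<std::future<int>> futures;

    for (int i = 0; i < count; ++i) {
        std::packaged_task<int()> task([i] { return 1 << i; });  // 2 to the power i
        futures.push_back(task.get_future());  // keep the reading end
        queue.push_back(std::move(task));      // queue the work itself
    }

    // Nothing has run yet; invoking each task makes its future ready.
    while (!queue.empty()) {
        std::packaged_task<int()> task = std::move(queue.front());
        queue.pop_front();
        task();
    }

    std::vector<int> results;
    for (auto& f : futures) {
        results.push_back(f.get());
    }
    return results;
}
```

Calling runQueuedPowers(4) runs four queued tasks and returns their results in creation order.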




However, our discussion of futures is not yet finished, as there is another variant: std::shared_future. A std::future can only be moved, and its result can only be collected once. A shared_future can be copied, so multiple threads can each hold their own copy and wait on the same shared state. In the example below the shared_future is used to signal the threads that they can start; each thread returns its own result through the separate future returned by std::async.





#include <chrono>
#include <future>
#include <iostream>

// A shared_future gives several threads access to the same shared state.
// A plain future is move-only and its result can be collected only once.
// Here all the threads are prepared and ready but cannot proceed until
// the promise behind sharedFuture is set; then they execute and
// return their results via get() on the futures from std::async.
int main()
{
    std::promise<void> myPromise, t1Promise, t2Promise;
    std::shared_future<void> sharedFuture(myPromise.get_future());

    std::chrono::time_point<std::chrono::high_resolution_clock> start;

    auto fun1 = [&, sharedFuture]() -> std::chrono::duration<double, std::milli>
    {
        t1Promise.set_value(); // tell main() this thread is ready
        sharedFuture.wait();   // waits for the signal from main()
        return std::chrono::high_resolution_clock::now() - start;
    };

    auto fun2 = [&, sharedFuture]() -> std::chrono::duration<double, std::milli>
    {
        t2Promise.set_value(); // tell main() this thread is ready
        sharedFuture.wait();   // waits for the signal from main()
        return std::chrono::high_resolution_clock::now() - start;
    };

    auto fut1 = t1Promise.get_future();
    auto fut2 = t2Promise.get_future();

    auto result1 = std::async(std::launch::async, fun1);
    auto result2 = std::async(std::launch::async, fun2);

    // wait for the threads to become ready
    fut1.wait();
    fut2.wait();

    // the threads are ready, start the clock
    start = std::chrono::high_resolution_clock::now();

    // signal the threads to go
    myPromise.set_value();

    std::cout << "Thread 1 received the signal "
              << result1.get().count() << " ms after start\n"
              << "Thread 2 received the signal "
              << result2.get().count() << " ms after start\n";
}




As you can see, both threads hold a copy of the same std::shared_future. This shared_future provides the signal to start the processing. Each thread also has its own promise (t1Promise, t2Promise), which it uses to tell main() that it is ready and waiting, and each thread returns its result through the future that std::async gave us (result1, result2).
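A shared_future can carry a value as well as a signal. In the sketch below several reader tasks each call get() on their own copy of one shared_future<int>, which is something a plain std::future does not allow. The function name sumSeenByReaders is hypothetical.

```cpp
#include <future>
#include <vector>

// Hypothetical helper: `readers` tasks each read the same value
// through their own copy of one shared_future.
int sumSeenByReaders(int readers, int value)
{
    std::promise<int> p;
    std::shared_future<int> shared = p.get_future().share();

    std::vector<std::future<int>> results;
    for (int i = 0; i < readers; ++i) {
        // Each lambda captures its own copy of the shared_future.
        results.push_back(std::async(std::launch::async,
                                     [shared] { return shared.get(); }));
    }

    p.set_value(value);  // releases every waiting reader

    int sum = 0;
    for (auto& r : results) {
        sum += r.get();
    }
    return sum;
}
```

Every reader sees the same value, so with 3 readers and a value of 7 the function returns 21.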



