I’ve put the source to my Async::Worker system, documentation and examples here.
From the examples, how to offload batches of work for processing in parallel:
    int main(int argc, const char* const argv[])
    {
        static const size_t NumberOfElements = 20000000 ;
        static const size_t GroupSize = 8192 ;

        Numbers numbers ;
        numbers.resize(NumberOfElements) ;
        for ( size_t i = 0 ; i < NumberOfElements ; ++i )
        {
            numbers[i] = (rand() & 65535) + 1 ;
        }

        uint64_t parallelResult = 0 ;

        // Dispatch groups of numbers to workers.
        Numbers::iterator it = numbers.begin() ;
        do
        {
            Numbers::iterator end = std::min(it + GroupSize, numbers.end()) ;
            Async::Queue(new CrunchNumbersRange(it, end, &parallelResult)) ;
            it = end ;
        }
        while ( it != numbers.end() ) ;

        // Wait for all the results, calling Result() on each
        // returned object to produce a total.
        Async::GetResults() ;

        printf("Done. Calculated sum as %lu.\n", (unsigned long int)parallelResult) ;

        return 0 ;
    }
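For readers without the library to hand, the same batching pattern can be sketched with only the C++11 standard library. This is a stand-in, not the Async::Worker API: `SumRange` and `ParallelSum` are hypothetical names, `std::async` takes the place of `Async::Queue`, and collecting the futures takes the place of `Async::GetResults()`. With `std::launch::async` each group typically runs on its own new thread, so this shows the shape of the technique rather than the behaviour of a worker pool.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <future>
#include <numeric>
#include <vector>

typedef std::vector<uint32_t> Numbers;

// Hypothetical stand-in for a worker: sums one [begin, end) range.
uint64_t SumRange(Numbers::const_iterator begin, Numbers::const_iterator end)
{
    return std::accumulate(begin, end, uint64_t(0));
}

// Splits 'numbers' into groups of at most 'groupSize' elements, sums each
// group asynchronously, then adds the partial sums on the calling thread.
uint64_t ParallelSum(const Numbers& numbers, size_t groupSize)
{
    assert(groupSize > 0);

    std::vector<std::future<uint64_t>> partials;

    // Dispatch groups of numbers.
    Numbers::const_iterator it = numbers.begin();
    while (it != numbers.end()) {
        const size_t remaining = static_cast<size_t>(numbers.end() - it);
        Numbers::const_iterator end = it + std::min(groupSize, remaining);
        partials.push_back(std::async(std::launch::async, SumRange, it, end));
        it = end;
    }

    // Wait for every group and accumulate the total.
    uint64_t total = 0;
    for (std::future<uint64_t>& partial : partials) {
        total += partial.get();
    }
    return total;
}
```

Because the partial sums are only combined on the calling thread, the groups never write to shared state and no locking is needed.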
The examples also include simulated offloading of asynchronous commits to a database:
    #include "async-worker.h"       // The Async:: classes.
    #include <OpenThreads/Thread>   // For thread sleep.
    #include <string>               // For std::string.
    #include <stdio.h>              // For printf.

    class DBOffload : public Async::FireAndForget
    {
    public:
        DBOffload(const std::string& query) : m_query(query) {}

    public:
        virtual void Work() const
        {
            /*
                In practice you would open the database connection
                here and connect to it, something like:
                    MYSQL* conn = mysql_connect(database);
                    mysql_real_query(conn, m_query.c_str(), m_query.length());
                For demonstration purposes, I'm just going to
                print the query while the caller sleeps.
            */

            // Work-simulation.
            OpenThreads::Thread::microSleep(5000) ;
            printf("query being done while main thread does something else (sleeps in this case)\n") ;
            printf("%s\n", m_query.c_str()) ;
            OpenThreads::Thread::microSleep(5000) ;
            printf("Done, this worker will now self-destruct\n") ;
        }

    private:
        std::string m_query ;
    } ;

    int main(int argc, const char* const argv[])
    {
        // Method 1.
        Async::Queue(new DBOffload("INSERT INTO `user` (`beverage`, `quantity`) VALUES ('coffee', 'significant')")) ;

        // Method 2.
        DBOffload* work = new DBOffload("DELETE FROM `user` WHERE `beverage` = 'nasty tasting'") ;
        work->Queue() ;

        // DO NOT DO THE FOLLOWING because FireAndForget workers 'delete' themselves.
        /*
            DBOffload work("SELECT crash FROM application") ;
            work.Queue() ;
        */

        // Now let's simulate doing some other work while that goes on in the background.
        printf("[originating thread is ''busy'' doing other stuff now]\n") ;
        OpenThreads::Thread::microSleep(1 * 1000 * 1000) ;  // Wait a second.
        printf("[originating thread has finished. is it over now?]\n") ;

        return 0 ;
    }
I probably need to add a mechanism for delivering resources to the worker pool, such as database handles, or an easy way to extend the worker pool to obtain its own resources…
Or, failing that, a way to create separate worker pools and associate individual workers with those pools. I just chose not to do that with this version so as not to overcomplicate it.
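The ideas above, a pool that obtains its own resources and separate pools for different kinds of work, could look roughly like the following minimal sketch in standard C++11. Nothing here is part of Async::Worker: `ResourcePool`, `Job`, `makeResource` and this `Queue` are hypothetical names, and the resource could be a database handle or anything else a job needs. Each worker thread builds one resource when it starts and passes it to every job it runs.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical pool: every worker thread owns one Resource for its lifetime.
template <typename Resource>
class ResourcePool
{
public:
    typedef std::function<void(Resource&)> Job;

    ResourcePool(size_t workers, std::function<Resource()> makeResource)
        : m_stopping(false)
    {
        assert(workers > 0);
        for (size_t i = 0; i < workers; ++i) {
            m_threads.emplace_back([this, makeResource] { Run(makeResource); });
        }
    }

    // Lets the workers drain the queue, then joins them.
    ~ResourcePool()
    {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_stopping = true;
        }
        m_wake.notify_all();
        for (std::thread& thread : m_threads) {
            thread.join();
        }
    }

    // Hands a job to whichever worker becomes free next.
    void Queue(Job job)
    {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_jobs.push(std::move(job));
        }
        m_wake.notify_one();
    }

private:
    void Run(const std::function<Resource()>& makeResource)
    {
        Resource resource = makeResource();  // Built once per worker thread.
        for (;;) {
            Job job;
            {
                std::unique_lock<std::mutex> lock(m_mutex);
                m_wake.wait(lock, [this] { return m_stopping || !m_jobs.empty(); });
                if (m_jobs.empty()) {
                    return;  // Stopping, and no work is left.
                }
                job = std::move(m_jobs.front());
                m_jobs.pop();
            }
            job(resource);  // Runs outside the lock.
        }
    }

    std::mutex m_mutex;
    std::condition_variable m_wake;
    std::queue<Job> m_jobs;
    bool m_stopping;
    std::vector<std::thread> m_threads;
};
```

Separate pools then fall out of the same design: construct one `ResourcePool` per kind of resource and queue each job on the pool that holds what it needs.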