Async::Worker: Parallelism with ZeroMQ

I’ve put the source to my Async::Worker system, documentation and examples here.

From the examples, how to offload batches of work for processing in parallel:

    int main(int argc, const char* const argv[])
        static const size_t NumberOfElements = 20000000 ;
        static const size_t GroupSize = 8192 ;
        Numbers numbers ;
        numbers.resize(NumberOfElements) ;
        for ( size_t i = 0 ; i < NumberOfElements ; ++i )
            numbers[i] = (rand() & 65535) + 1 ;

        uint64_t parallelResult = 0 ;

        // Dispatch groups of numbers to workers.
        Numbers::iterator it = numbers.begin() ;
            Numbers::iterator end = std::min(it + GroupSize, numbers.end()) ;
            Async::Queue(new CrunchNumbersRange(it, end, &parallelResult)) ;
            it = end ;
        while ( it != numbers.end() ) ;

        // Wait for all the results, calling Result() on each
        // returned object to produce a total.
        Async::GetResults() ;

        printf("Done. Calculated sum as %lu.\n", (unsigned long int)parallelResult) ;

        return 0 ;

and simulated offloading of async commits to a database:

#include "async-worker.h"       // The Asyc:: classes.
#include <OpenThreads/Thread>   // For thread sleep.
#include <string>               // For std::string.
#include <stdio.h>              // For printf

class DBOffload : public Async::FireAndForget
public: DBOffload(const std::string& query) : m_query(query) {}

public: virtual void Work() const
            In practice you would open the database connection
            here and connect to it, something like:

            MYSQL* conn = msql_connect(database);
            mysql_real_query(conn, query.c_str(), query.length());

            For demonstration purposes, I'm just going to print
            the query while the caller sleeps.

            // Work-simulation.
            OpenThreads::Thread::microSleep(5000) ;
            printf("query being done while main thread does something else (sleeps in this case)\n") ;
            printf("%s\n", m_query.c_str()) ;
            OpenThreads::Thread::microSleep(5000) ;
            printf("Done, this worker will now self-destruct\n") ;

private: std::string m_query ;
} ;

int main(int argc, const char* const argv[])
    // Method 1.
    Async::Queue(new DBOffload("INSERT INTO `user` (`beverage`, `quantity`) VALUES ('coffee', 'significant')")) ;

    // Method 2.
    DBOffload* work = new DBOffload("DELETE FROM `user` WHERE `beverage` = 'nasty tasting'") ;
    work->Queue() ;

    // DO NOT DO THE FOLLOWING because FireAndForget workers 'delete' themselves.
        DBOffload work("SELECT crash FROM application") ;
        work.Queue() ;

    // Now lets simulate doing some other work while that goes on in the background.

    printf("[originating thread is ''busy'' doing other stuff now]\n") ;
    OpenThreads::Thread::microSleep(1 * 1000 * 1000) ;  // Wait a second.
    printf("[originating thread has finished. is it over now?]\n") ;

    return 0 ;

I probably need to add a mechanism for delivering resources to the worker pool, such as database handles, or an easy way to extend the worker pool to obtain its own resources…

Or, failing that, a way to create separate worker pools and associate individual workers with those pools. I just chose not to do that with this version so as not to over complicate it.

Trackbacks and Pingbacks

Instead… « kfsone's pittanceJuly 26, 2010 at 7:37 am

[…] you watch through to the end, I do a little benchmark of simple parallelism with ZeroMQ and Async::Worker. Categories: Coding, WWIIOL Tags: zeromq Comments (0) Trackbacks (0) Leave a comment […]

[…] be honest, this stuff goes way over my head, but if detailed posts like Async::Worker: Parallelism with ZeroMQ(complete with source code, documentation, and examples) are what you're looking for, then you're […]

Leave a Reply

Name and email address are required. Your email address will not be published.

Fill in your details below or click an icon to log in: Logo

You are commenting using your account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s

You may use these HTML tags and attributes:

<a href="" title="" rel=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <pre> <q cite=""> <s> <strike> <strong> 

%d bloggers like this: