Friday, 2 November 2012

Job Graphs in C++ 11

Introduction

I've recently been experimenting with a lot of the new features in C++ 11, purely for discovery and experimentation. One major new feature are the threading libraries. As using is the best way of learning, I set upon the task of writing a currently-fashionable task/job-graph system using these new features.

I'm not promoting this algorithm as the one true approach for parallelism, as there are many other valuable algorithms that are much more suitable in certain occasions. This one seems a useful, generic solution that fits many cases fairly well. In the process, I found a few interesting quirks during implementation that I thought may be interesting for others.

Job Graphs

Job graphs are currently finding wide favour as an approach to the challenge of building a flexible architecture for multithreaded processing. In this scheme, jobs are stored in a graph structure. The edges of the graph are used to represent ordering and data dependencies between jobs. Assuming the graph is structured correctly, this ensures that work is processed in the correct order to ensure the correct and timely flow of data. Nodes that do not share a data dependency may be executed concurrently. The programmer shoulders the responsibility of structuring the graph correctly, and ensuring any conflicts between resources are either resolved via the graph, or using some locking mechanism.

Typically, the jobs are pushed into some kind of queue and distributed to a thread pool. Ideally you would also use work-stealing for maximum efficiency.

The goal is to use this graph structure to allow non-dependent work to execute concurrently, saturating all hardware threads. It is a very simplistic idea, yet at its most it is NP-hard to solve.

C++ 11 Threading Overview

The C++ 11 threading library is really quite a lightweight, spare API. Broadly, it offers the facilities to:
  • Create and manage threads in a cross platform fashion.
  • Synchronise activity between these threads using simple primitives such as mutexes, condition variables and promises/futures.
  • Package up tasks to be executed on a thread.

Implementation

The implementation of a job graph requires relatively few components:
  • A job class
  • A job graph container
  • A job pool
  • A thread pool

Job Class

The job class is fairly straightforward. It serves two purposes: to package up some functionality to be executed, and have knowledge of precursor jobs and dependent jobs (this concept is sometimes described as continuations).

The major consideration in the design of the job class is the mechanism used to sequence their operation. Ideally, you want to use some kind of OS threading primitive as opposed to some kind of user-space mechanism. The OS is in a far better position to manage threads as it has clear information of the state and dependencies of all of them, and can wake and sleep threads with minimal context switches. User space code often ends up with some kind of inefficient wake-predicate-sleep polling pattern causing a lot of wasted time and context switches.

I therefore chose to use a C++ 11 thread library primitive: std::promise and std::future. A promise provides the ability to calculate some value asynchronously, in a deferred context, in the future. It represents a contract for some piece of code to provide a value based on some inputs. A future is the mechanism used to retrieve that value. The promise is the producer, the future is the client.

The promise and future abstraction provides  a good mechanism to couple jobs together. A job can provide a "future" to each of its dependent jobs. The dependent job uses this future to wait for the job's completion. The future could pass data, or it could solely be used to sequence computation.

A second feature of C++ 11 that proves useful for the job class is std::function and lambda functions. An std::function is simply a wrapper for some callable target. My job class contains one of these to hold the job's processing kernel. It is initialised in the constructor, which allows me to pass in a lambda function when creating the job. This provides an interesting alternative to using inheritance and virtual functions, and the resultant proliferation of classes. Now, I have just one Job class, and the data handles the variation.

Job.h:
        class Job;

        //! This monitors the completion of a number of jobs, and sends out a signal when they're all done
        class JobCompletionMonitor
        {
        public:
            JobCompletionMonitor(void) :
                m_numJobsPending(0)
            {
            }

            // Forbidden constructors
            JobCompletionMonitor(JobCompletionMonitor&) = delete;
            JobCompletionMonitor& operator=(JobCompletionMonitor&) = delete;

            // Jobs tell the monitor what's going on
            void notifyStart(Job* job);
            void notifyFinish(Job* job);

            // Other people can find out when they're all done
            void waitForCompletion(void);

            bool allJobsFinished(void) const;

        private:
            std::atomic_int m_numJobsPending;
            std::condition_variable m_allJobsComplete;
            std::mutex m_mutex;
        };

        //! A self-contained job that is executed somewhere in the frame
        class Job : public objects::CoreObject
        {
        public:
            // Priority system
            typedef unsigned int Priority;
            static const Priority HighPriority = 1000;
            static const Priority AboveNormalPriority = 5000;
            static const Priority NormalPriority = 10000;
            static const Priority BelowNormalPriority = 15000;
            static const Priority LowPriority = 20000;

            // Permitted constructors
            Job(const resources::NameManager::Name& name,
                const std::initializer_list<std::shared_ptr<Job>>& precursorJobs = {},
                std::function<void(float)> kernel = [](const float){},
                const Priority priority = NormalPriority);
            ~Job();

            // Forboidden constructors
            Job(Job& job) = delete;
            Job& operator=(Job& job) = delete;

            // Management of dependents
            void addDependent(const std::shared_ptr<Job>& dependent);
            void removeDependent(const std::shared_ptr<Job>& dependent);

            // Execution API
            void prepare(JobCompletionMonitor* monitor);
            bool canRun(void) const;
            void execute(const float timeStep);
            void cleanup(void);

            // Accessors
            inline const std::vector<std::shared_ptr<Job>>& getDependents(void) const
            {
                return m_dependents;
            }

            inline std::vector<std::shared_ptr<Job>> getPrecursorJobs(void) const
            {
                return m_precursors;
            }

            inline bool operator<(const Job& other) const
            {
                return m_priority < other.m_priority;
            }

        private:
            // Connection to other jobs
            std::vector<std::shared_ptr<Job>> m_precursors; //!< The job that must complete before we can run. We are a continuation of this job
            std::vector<std::shared_ptr<Job>> m_dependents; //!< A list of all the jobs that continue after us

            // Futures to sequence processing
            std::vector<std::future<int>> m_precursorFutures; //!< The future that is used to tell us when to start - if we have a precursor
            std::vector<std::promise<int>> m_dependentPromises; //!< The promises we fulfill to trigger other continuations

            // Override this function to implement your logic
            std::function<void(const float)> m_kernel; //!< A simple routine to call to implement the logic for this task

            // What is the execution priority of this job?
            Priority m_priority; //!< Execution priority of this job

            JobCompletionMonitor* m_completionMonitor = nullptr; //!< This object monitors the job. The job must notify it on completion
        };

Job.cpp:
        //! Have all of our jobs now finished?
        bool JobCompletionMonitor::allJobsFinished(void) const
        {
            return m_numJobsPending == 0;
        }

        //! The job tells the monitor when it starts
        void JobCompletionMonitor::notifyStart(Job* job)
        {
            INC_RT_ASSERT(job != nullptr);
            ++m_numJobsPending;
        }

        //! The job notifies the monitor that it is complete
        void JobCompletionMonitor::notifyFinish(Job* job)
        {
            INC_RT_ASSERT(job != nullptr);
            --m_numJobsPending;
            if(m_numJobsPending == 0)
            {
                m_allJobsComplete.notify_all();
            }
        }

        //! Wait until all the jobs managed by this monitor are complete
        void JobCompletionMonitor::waitForCompletion(void)
        {
            std::unique_lock<std::mutex> lock(m_mutex);
            m_allJobsComplete.wait(lock);
            INC_RT_ASSERT(m_numJobsPending == 0);
        }

        //! Construction
        Job::Job(const resources::NameManager::Name& name,
                 const std::initializer_list<std::shared_ptr<Job>>& precursorJobs,
                 std::function<void(float)> kernel,
                 const Priority priority) :
            objects::CoreObject(name),
            m_precursors(precursorJobs),
            m_dependents(),
            m_dependentPromises(),
            m_kernel(kernel),
            m_priority(priority)
        {
        }

        //! Destruction
        Job::~Job()
        {
            // Add some assertions here to ensure the job has been correctly deleted
        }

        // Notify this job of a new dependent that expects to be told when this has finished
        void Job::addDependent(const std::shared_ptr<Job>& dependent)
        {
            INC_RT_ASSERT(dependent != nullptr);

            // If this job isn't already a dependent, then add it
            if(std::find(m_dependents.begin(), m_dependents.end(), dependent) == m_dependents.end())
            {
                m_dependents.push_back(dependent);
            }
        }

        // Remove a dependency from this job
        void Job::removeDependent(const std::shared_ptr<Job>& dependent)
        {
            INC_RT_ASSERT(dependent != nullptr);

            // If this job isn't already a dependent, then add it, and re-issue futures
            auto it = std::find(m_dependents.begin(), m_dependents.end(), dependent);
            if(it != m_dependents.end())
            {
                // Remove this dependent
                m_dependents.erase(it);
            }
        }

        //! Reset the job ready for graph traversal and execution
        void Job::prepare(JobCompletionMonitor* monitor)
        {
            INC_RT_ASSERT(monitor != nullptr);
            m_completionMonitor = monitor;

            // Ask our precursors for a future
            for(auto& precursor : m_precursors)
            {
                precursor->m_dependentPromises.push_back(std::promise<int>());
                m_precursorFutures.push_back(precursor->m_dependentPromises.back().get_future());
            }
        }

        //! Clean up after execution. Remove all the promises and futures
        void Job::cleanup(void)
        {
            m_precursorFutures.clear();
            m_dependentPromises.clear();
        }

        // Can this job run? Have all of our futures been satisfied?
        bool Job::canRun(void) const
        {
            for(auto& it : m_precursorFutures)
            {
                if(it.valid() == false)
                {
                    return false;
                }
            }

            return true;
        }

        // Actually run this job
        void Job::execute(const float timeStep)
        {
            // Wait to receive the promised values from all precursor jobs
            for(auto& future : m_precursorFutures)
            {
                future.wait();
                (void)future.get();
            }

            // Do our actual work
            m_kernel(timeStep);

            // Fulfill promises to our children
            for(auto& promise : m_dependentPromises)
            {
                promise.set_value(1);
            }

            m_completionMonitor->notifyFinish(this);

            // Now that we are done, clear out all of the stale old futures we were waiting for -
            // prepare() will re-issue them next time around
            m_precursorFutures.clear();
        }

Job Graph

The job graph really has few responsibilities: To contain the jobs, traverse the graph and manage the resources needed for execution.

The job graphs are contained in a simple graph structure. I may later shift to a more capable container such as boost::graph

The main task at hand for the job graph is to traverse the graph, pushing all of the jobs into an ordered pool ready for execution. A thread pool then pulls jobs from this pool when they are ready for execution.

JobGraph.h:
        // A graph structure holding all the jobs and conditions
        class JobGraph
        {
        public:
            JobGraph(const size_t numThreads);
            ~JobGraph();

            void add(const std::shared_ptr<Job>& job);
            void remove(const std::shared_ptr<Job>& job);

            void execute(const float timeStep);

        private:
            // At the start of the frame, the job graph is traversed and all the jobs to be processed are placed into this pool
            JobPool m_jobPool;

            // This is a pool of threads which work on executing those jobs
            // Threads are free to steal jobs from the overall pool to keep things ticking forwards
            ThreadPool m_threadPool;

            // This is a special root-level job
            std::shared_ptr<Job> m_rootJob;

            // Need to measure when all jobs have completed
            // (not just dispatched - *completed*)
            JobCompletionMonitor m_jobCompletionMonitor;

            // Mutually exclude operations like construction, adding jobs and execution
            std::mutex m_mutex;

            // Does a (parent) job exist within the graph?
            std::unordered_map<Job*, bool> m_jobContained;
        };

JobGraph.cpp:
        // A generic minimal visitor pattern
        class JobGraphVisitor
        {
        public:
            virtual ~JobGraphVisitor() {}

            void traverse(Job* job)
            {
                INC_RT_ASSERT(job != nullptr);

                visit(job);

                for(auto child : job->getDependents())
                {
                    traverse(child.get());
                }
            };

        private:
            virtual void visit(Job* job) {}
        };

        // Visit with a view to pushing a job to the task pool
        class PushJobVisitor : public JobGraphVisitor
        {
        public:
            inline PushJobVisitor(JobPool& jobPool, const float timeStep, JobCompletionMonitor* jobCompletionMonitor) :
                m_jobPool(jobPool),
                m_timeStep(timeStep),
                m_jobCompletionMonitor(jobCompletionMonitor)
            {
            }

        private:
            JobPool& m_jobPool;
            const float m_timeStep;
            JobCompletionMonitor* const m_jobCompletionMonitor;
            std::unordered_map<Job*, bool> m_jobPushed;

            virtual void visit(Job* job) override
            {
                if(m_jobPushed[job] == false)
                {
                    m_jobCompletionMonitor->notifyStart(job);
                    m_jobPool.push(job);
                    m_jobPushed[job] = true;
                }
            }
        };

        // Reset jobs in preparation
        class ResetJobVisitor : public JobGraphVisitor
        {
        public:
            inline ResetJobVisitor(JobCompletionMonitor* jobCompletionMonitor) :
                m_jobCompletionMonitor(jobCompletionMonitor)
            {
            }

        private:
            JobCompletionMonitor* const m_jobCompletionMonitor;

            virtual void visit(Job* job) override
            {
                job->prepare(m_jobCompletionMonitor);
            }
        };

        //! Post-execution by-product clean-up
        class CleanUpJobVisitor : public JobGraphVisitor
        {
        public:
            inline CleanUpJobVisitor(void)
            {
            }

        private:
            virtual void visit(Job* job) override
            {
                job->cleanup();
            }
        };

        // Construction
        JobGraph::JobGraph(const size_t numThreads) :
            m_jobPool(),
            m_threadPool(numThreads, m_jobPool),
            m_rootJob(new Job(resources::getUniqueName("Root Job")))
        {
            INC_RT_ASSERT(numThreads != 0);
            std::unique_lock<std::mutex> lock(m_mutex);
        }

        // Destruction
        JobGraph::~JobGraph()
        {
            INC_RT_ASSERT(m_jobPool.jobAvailable() == false);
            m_jobPool.shutdown();
            m_threadPool.shutdown();
            std::this_thread::yield();
        }

        // Add a job to the graph
        void JobGraph::add(const std::shared_ptr<Job>& job)
        {
            INC_RT_ASSERT(job != nullptr);
            std::unique_lock<std::mutex> lock(m_mutex);

            if(job->getPrecursorJobs().size() == 0)
            {
                // A null precursor means that it's a root level task that lives under our
                // special root job
                m_rootJob->addDependent(job);
            }
            else
            {
                // Assert to ensure that the parent job actually exists within the graph!
                for(auto& precursor : job->getPrecursorJobs())
                {
                    INC_RT_ASSERT(m_jobContained[precursor.get()]);
                    precursor->addDependent(job);
                }
            }

            m_jobContained[job.get()] = true;
        }

        // Remove a job from the graph
        void JobGraph::remove(const std::shared_ptr<Job>& job)
        {
            INC_RT_ASSERT(job != nullptr);
            std::unique_lock<std::mutex> lock(m_mutex);

            if(job->getPrecursorJobs().size() == 0)
            {
                m_rootJob->removeDependent(job);
            }
            else
            {
                for(auto& precursor : job->getPrecursorJobs())
                {
                    precursor->removeDependent(job);
                }
            }

            m_jobContained[job.get()] = false;
        }

        // Execute all jobs in the graph
        void JobGraph::execute(const float timeStep)
        {
            std::unique_lock<std::mutex> lock(m_mutex);

            // Make sure the job queue is fully empty from last time
            INC_RT_ASSERT(m_jobPool.jobAvailable() == false);

            // Get it all reset ready for execution
            {
                ResetJobVisitor resetJobVisitor(&m_jobCompletionMonitor);
                resetJobVisitor.traverse(m_rootJob.get());
            }

            // Traverse the whole job graph, pushing the jobs into the task pool
            {
                PushJobVisitor pushJobVisitor(m_jobPool, timeStep, &m_jobCompletionMonitor);
                pushJobVisitor.traverse(m_rootJob.get());
            }

            // (Note that tasks will be consumed asynchronously as soon as we start to push them)

            m_jobCompletionMonitor.waitForCompletion();
            
            // We should be all done now. Nothing should be here

            INC_RT_ASSERT(m_jobPool.jobAvailable() == false);

            // Get it all cleaned up ready for next time
            {
                CleanUpJobVisitor cleanUpVisitor;
                cleanUpVisitor.traverse(m_rootJob.get());
            }
        }

Job Pool

It is initially tempting to simply traverse the graph, push all of the jobs out for execution, and let the OS arbitrate everything. There are a few difficulties here. First of all, unless your target platform's runtime explicitly models tasks or jobs, this tends to lead to mass thread creation and destruction - a substantial overhead. Secondly, you may wish to cancel the execution of jobs after a precursor has completed. Thirdly, it makes little sense to dispatch many jobs for execution that have no chance of execution. They will just immediately hit a threading hazard and context-switch away. Finally, consider the case where you have N cores and N+1 jobs. Job 0 does some work, and jobs 1..N wait on job 0. Imagine if your target platform has a pooly-implemented runtime, and assigns jobs 1..N to cores instead of job 0. Those jobs may wait, and job 0 may not execute. This case is unlikely in a well-implemented runtime, but it is a risk, and why present code that is inherently at risk?

The job pool is simply an std::set of jobs, ordered by job priority. Whenever a thread in the thread pool wakes up, it pops the first (highest priority) job from the pool and executes it.

JobPool.h:
        // This represents a pool of tasks that need doing, for all threads to
        // pull from (and therefore work-steal from)
        class JobPool
        {
        public:
            JobPool(void);
            JobPool(JobPool&) = delete;
            JobPool& operator=(JobPool&) = delete;

            bool jobAvailable(void);
            void push(Job* job);
            Job* pop(void);

            void waitForJob(void);
            bool shouldTerminate(void);
            void shutdown(void);

        private:
            //! This gets notified when a task is pushed, to allow the other consuming
            //! threads to wake up
            std::condition_variable m_jobPushNotification;
            std::condition_variable m_queueEmptyNotification;

            //! We maintain a set of jobs, so that we can find the first, highest-priority job
            //! that is in an executable state
            std::set<Job*> m_jobs;

            //! This mutex is used to ensure thread safety when pushing a job in or pulling it out
            std::mutex m_mutex;

            //! Used when it's time to shut down
            std::atomic<bool> m_shouldTerminate;
        };

JobPool.cpp:
        // Construct it
        JobPool::JobPool() :
            m_shouldTerminate(false)
        {
        }

        // A consumer calls this function to wait for notification via the condition_variable of a new task
        void JobPool::waitForJob(void)
        {
            std::unique_lock<std::mutex> lock(m_mutex);
            m_jobPushNotification.wait_for(lock, std::chrono::milliseconds(100));

            // The above timeout means that spurious wakeup is possible
        }

        // Is there a job available in the set?
        bool JobPool::jobAvailable(void)
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            return m_jobs.empty() == false;
        }

        // Push a new job into the pool
        void JobPool::push(Job* job)
        {
            INC_RT_ASSERT(job != nullptr);
            std::lock_guard<std::mutex> lock(m_mutex);

            INC_RT_ASSERT(m_shouldTerminate == false);
            m_jobs.insert(job);
            m_jobPushNotification.notify_one();
        }

        // Try to return an executable job from the pool
        Job* JobPool::pop(void)
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            if(m_jobs.empty())
            {
                // Nothing to run!
                return nullptr;
            }
            else
            {
                // Find the highest priority job that can run
                for(auto job : m_jobs)
                {
                    if(job->canRun())
                    {
                        m_jobs.erase(job);
                        return job;
                    }
                }
            }

            // It is possible that we're asked for a job but none are able to run... yet
            return nullptr;
        }

        //! Should we terminate now?
        bool JobPool::shouldTerminate(void)
        {
            return m_shouldTerminate;
        }

        //! Send termination message
        void JobPool::shutdown(void)
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_shouldTerminate = true;
            m_jobs.clear();
            m_jobPushNotification.notify_all();
        }

Thread Pool

The purpose of the thread pool is to provide a persistent set of threads capable of executing jobs, saving on the cost of thread creation and destruction. These threads pull jobs from the job pool until they are asked to terminate.

Spurious Wakeup

The threads in the thread pool wait on an std::condition_variable to notify them on the availability of a job. It is important to consider the possibility of spurious wakeup in this system. Therefore, when a thread wakes up, it must ensure a job actually exists in the pool to be executed.

I actually deliberately allow the std::condition_variable::wait() to time out after 100 ms. The rationale for this is that some operating systems become suspicious of threads that sleep for long periods of time. For stability reasons, I prefer my system to wake up unneccesarily and to service the outer thread loop than wait for infinite amounts of time. This therefore deliberately introduces some degree of spurious wakeup into the system.

ThreadPool.h:
        // A pool of threads hungry to execute tasks
        class ThreadPool
        {
        public:
            ThreadPool(const size_t numThreads, JobPool& jobPool);
            ~ThreadPool();

            void shutdown(void);

        private:
            //! The set of threads in the thread pool
            std::vector<std::thread> m_threads;
        };

ThreadPool.cpp:
        // Construction / destruction
        ThreadPool::ThreadPool(const size_t numThreads, JobPool& jobPool)
        {
            for(size_t thread = 0; thread < numThreads; ++thread)
            {
                m_threads.push_back(std::thread([](JobPool* jobPool)
                    {
                        for(;;)
                        {
                            // Sleep, until a job is available
                            // (use a condition_variable here)
                            jobPool->waitForJob();

                            // Have we been told to shut down?
                            if(jobPool->shouldTerminate())
                            {
                                return;
                            }

                            // Pull a job out of the task pool and execute it
                            if(jobPool->jobAvailable())
                            {
                                // Whilst we'd prefer to execute task() here, instead we have to do
                                // the pop-and-execute as an atomic-like operation to avoid
                                // two threads popping the same task, or one thread popping
                                // a task that the other thread expected to receive
                                Job* job = jobPool->pop();
                                if(job != nullptr)
                                {
                                    // Clearly the correct time step should be plumbed here
                                    job->execute(1.0f / 30.0f);
                                }
                            }
                        }
                    }, &jobPool));
            }
        }

        ThreadPool::~ThreadPool()
        {
        }

        // Shutdown the threads. You must have told the JobPool to terminate
        void ThreadPool::shutdown(void)
        {
            for(auto& thread : m_threads)
            {
                thread.join();
            }
        }

Findings

The new C++ 11 libraries largely made this system easy and quick to implement. The libraries are quite clearly defined, unobtrusive, easy to use with few dependencies.

One of the main difficulties encountered in the implementation of this system is simply the life cycles of the C++ 11 std::promises and std::futures. Their life cycles are not clearly documented and they can be difficult to reason about. It was only by reading the STL code that I found these primitives fundamentally hold a "use and discard" policy. This leads to the need for some code to create and destroy these resources each frame. Note that this may possibly vary across platforms, but this seems the best least-common-denominator assumption. This is largely the reason I avoided the use of std::packaged_task. With things like move semantics transferring the object between threads and queues, it becomes very difficult to reason about the life cycle of the object! Furthermore, once you have your own job pool and thread pool, it largely obviates the need for this class. This class only seems useful for "go and load some assets on a thread" type work.

The system itself has thus far proven stable. I have unit tests for a variety of graph configurations, which all pass, and it successfully manages to run my current engine. This engine is in its infancy, and so has not massively load tested this system... but, ultimately, this is meant to be an illustrative, exploratory post!