Wednesday 5 June 2013

Polygonal Area Light Sources on DX11-Class Hardware

The surge in interest in Physically-Based Rendering has renewed interest in area light sources. There is, however, something of a dearth of information available for the aspiring implementer.


Introduction

The objective of polygonal area light sources is to define an arbitrary polygon in space that may emit light. I'm going to make two simplifications here: firstly, that the polygon emits a constant colour across its surface and secondly, that we are not considering shadowing.

The earliest source I was able to find on this subject is James Arvo's 1994 paper on polyhedral light sources. John Snyder also presents another excellent reference. The approach taken here is to compute a vector field such that at any point in space r, we may obtain the direction of the light. This vector field is a function of the position r and of the polygon's vertices. This direction can then be substituted into your BRDF equation for a particular surface.

Arvo's algorithm is simple: for each edge of the polygon he computes a normal named gamma, and a weighting value named theta. The normal is of the triangle formed by the polygon edge and the point r. The weight theta is the angle formed by the pair of vectors from the point r to each edge vertex. He forms a weighted sum of these normals, divides it by 2*PI (to normalise the weights returned by acos) and scales it by the light's intensity. This is all you need to perform basic area lighting.
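Written out, the weighted sum described above might look like the following. This is a sketch in my own notation rather than a transcription of the paper: v_i are the polygon's n vertices, u_i are the vectors from r to those vertices, I stands for the light's intensity, and the order of the cross product is an assumption that depends on the polygon's winding.

```latex
\mathbf{u}_i = \mathbf{v}_i - \mathbf{r}, \qquad
\Theta_i = \arccos\!\left(\frac{\mathbf{u}_{i-1}\cdot\mathbf{u}_i}{\lVert\mathbf{u}_{i-1}\rVert\,\lVert\mathbf{u}_i\rVert}\right), \qquad
\Gamma_i = \frac{\mathbf{u}_{i-1}\times\mathbf{u}_i}{\lVert\mathbf{u}_{i-1}\times\mathbf{u}_i\rVert}

\Phi(\mathbf{r}) = \frac{I}{2\pi}\sum_{i=1}^{n}\Theta_i\,\Gamma_i, \qquad \mathbf{u}_0 \equiv \mathbf{u}_n
```

Each edge contributes its triangle normal, weighted by the angle that edge subtends at r.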

There are a couple of twists. Firstly, you typically need to negate the computed vector to get it into a form ready to dot against your normal. Secondly, you must clip the polygon against the plane of the point being shaded. It is possible to have a valid light source that straddles the plane of the point being shaded.


Basic HLSL code

Here is the HLSL code to perform the basic calculation to obtain the light vector for a polygon at a given point. Here, I process quads. This is no algorithm restriction; this is purely to minimise buffer traffic.

// Derive quad vertices in world-space
float3 quadCentre = g_quadLights[lightIndex].pos;
float3 quadAxisX = g_quadLights[lightIndex].axisX * 0.5f;
float3 quadAxisY = g_quadLights[lightIndex].axisY * 0.5f;
float4 quadVertices[4] = 
{
    float4(quadCentre - quadAxisX - quadAxisY, 1.0f),
    float4(quadCentre + quadAxisX - quadAxisY, 1.0f),
    float4(quadCentre + quadAxisX + quadAxisY, 1.0f),
    float4(quadCentre - quadAxisX + quadAxisY, 1.0f),
};

float3 lightField = 0.0f;
uint polyVertex1 = 3;
for(uint polyVertex0 = 0; polyVertex0 < 4; ++polyVertex0)
{
    float3 delta0 = quadVertices[polyVertex0].xyz - worldSpacePos;
    float3 delta1 = quadVertices[polyVertex1].xyz - worldSpacePos;
    float3 edge0 = normalize(delta0);
    float3 edge1 = normalize(delta1);
    float theta = acos(dot(edge0, edge1));

    float3 gamma = normalize(cross(delta1, delta0)); // Note you can't just cross edge0 and edge1 to save the normalize - that gets you the wrong result!

    lightField += theta * gamma;
    polyVertex1 = polyVertex0;
}

// Note that we don't divide by 2*PI as in the paper. That is just there to account for the sum of the theta values
// We don't need it since we're doing a normalise - it's just a uniform scale

// Summing normalised vectors doesn't necessarily give a normalised result. Also flip the vector ready for light calculations
lightField = -normalize(lightField);


Clipping

We must now deal with the problem of clipping. This is a particularly awkward operation, especially when it may need to be done per-pixel. Firstly, we must establish whether we actually need to clip. Since the quads are formed using a quad centre and two axes, this may be handled using something very similar to an OBB vs plane test:

float criticalDistance = abs(dot(worldSpaceNormal, quadAxisX)) + abs(dot(worldSpaceNormal, quadAxisY));
float planeToQuadCentreDistance = dot(worldSpacePlane, float4(quadCentre, 1.0f));
if(planeToQuadCentreDistance < -criticalDistance)
{
    // Invisible
}

if(planeToQuadCentreDistance > criticalDistance)
{
    // Fully visible - unclipped
}
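For checking the shader against something you can step through, here is a minimal CPU-side sketch of the same three-way test in C++. The names Vec3, QuadVisibility and classifyQuad are hypothetical, and it assumes the plane normal is unit length and that the axes passed in are already half-extents.

```cpp
#include <cassert>
#include <cmath>

struct Vec3 { float x, y, z; };

inline float dot(const Vec3& a, const Vec3& b)
{
    return a.x * b.x + a.y * b.y + a.z * b.z;
}

enum class QuadVisibility { Invisible, FullyVisible, NeedsClipping };

// planeNormal is assumed to be unit length; planeD is the plane offset, so
// dot(planeNormal, p) + planeD is the signed distance of p from the plane.
// halfAxisX and halfAxisY are the quad's half-extent vectors (axis * 0.5).
inline QuadVisibility classifyQuad(const Vec3& planeNormal, float planeD,
                                   const Vec3& centre,
                                   const Vec3& halfAxisX, const Vec3& halfAxisY)
{
    assert(std::fabs(dot(planeNormal, planeNormal) - 1.0f) < 1e-3f);

    // Furthest any corner can sit from the centre, measured along the normal.
    const float criticalDistance =
        std::fabs(dot(planeNormal, halfAxisX)) + std::fabs(dot(planeNormal, halfAxisY));
    const float centreDistance = dot(planeNormal, centre) + planeD;

    if (centreDistance < -criticalDistance) return QuadVisibility::Invisible;
    if (centreDistance >  criticalDistance) return QuadVisibility::FullyVisible;
    return QuadVisibility::NeedsClipping; // The quad straddles the plane.
}
```

Only the NeedsClipping case has to pay for the clipper; the other two can skip it entirely.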

We then use standard Sutherland-Hodgman clipping to trim the polygon against the plane:

float4 clippedPolygon[5]; // Clipping against 1 plane can add one extra vertex to the quad
uint numClippedVertices = 0;

// Clip the polygon against the plane of the point being shaded
float vertexPlaneSide[4] =
{
    sign(dot(quadVertices[0], worldSpacePlane)),
    sign(dot(quadVertices[1], worldSpacePlane)),
    sign(dot(quadVertices[2], worldSpacePlane)),
    sign(dot(quadVertices[3], worldSpacePlane)),
};

numClippedVertices = 0;
uint previous = 3;
for(uint current = 0; current < 4; ++current)
{
    float currentSide = vertexPlaneSide[current];
    if(currentSide < 0.0f)
    {
        if(vertexPlaneSide[previous] > 0.0f)
        {
            float4 v1 = quadVertices[previous];
            float4 v2 = quadVertices[current];
            float3 delta = v2.xyz - v1.xyz;
            float t = dot(worldSpacePlane, v2) / dot(worldSpacePlane.xyz, delta);
            clippedPolygon[numClippedVertices++] = float4(v2.xyz - delta * t, 1.0f);
        }
    }
    else
    {
        float4 v1 = quadVertices[current];
        if(currentSide > 0.0f && vertexPlaneSide[previous] < 0.0f)
        {
            float4 v2 = quadVertices[previous];
            float3 delta = v2.xyz - v1.xyz;
            float t = dot(worldSpacePlane, v2) / dot(worldSpacePlane.xyz, delta);
            clippedPolygon[numClippedVertices++] = float4(v2.xyz - delta * t, 1.0f);
        }

        clippedPolygon[numClippedVertices++] = v1;
    }

    previous = current;
}

The resulting polygon may then be used in the original light field calculation code.
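If you want to verify the clipper offline, a minimal C++ sketch of single-plane Sutherland-Hodgman clipping follows. It is not a port of the shader above: Vec3, Plane and clipPolygonToPlane are hypothetical names, it uses a std::vector rather than a fixed array, and it treats vertices lying exactly on the plane as kept.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

struct Vec3 { float x, y, z; };
struct Plane { Vec3 n; float d; }; // Plane equation: dot(n, p) + d = 0

inline float planeValue(const Plane& plane, const Vec3& p)
{
    return plane.n.x * p.x + plane.n.y * p.y + plane.n.z * p.z + plane.d;
}

// Keeps the part of a convex polygon on the non-negative side of the plane.
// A convex input with N vertices yields at most N + 1 output vertices.
inline std::vector<Vec3> clipPolygonToPlane(const std::vector<Vec3>& polygon, const Plane& plane)
{
    std::vector<Vec3> clipped;
    if (polygon.empty()) return clipped;

    std::size_t previous = polygon.size() - 1;
    for (std::size_t current = 0; current < polygon.size(); ++current)
    {
        const Vec3& a = polygon[previous];
        const Vec3& b = polygon[current];
        const float da = planeValue(plane, a);
        const float db = planeValue(plane, b);

        // The edge crosses the plane: emit the intersection point first.
        if ((da < 0.0f && db > 0.0f) || (da > 0.0f && db < 0.0f))
        {
            const float t = da / (da - db); // 0 at a, 1 at b
            clipped.push_back(Vec3{ a.x + (b.x - a.x) * t,
                                    a.y + (b.y - a.y) * t,
                                    a.z + (b.z - a.z) * t });
        }

        // Vertices on or in front of the plane are kept as they are.
        if (db >= 0.0f) clipped.push_back(b);

        previous = current;
    }
    return clipped;
}
```

Cutting a quad in half gives four vertices back; slicing off a single corner gives five, which is why the shader's output array has five slots.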

Friday 17 May 2013

Image Histogram Construction In DX11

Preamble

Recently, I have been researching some next-gen rendering techniques. There are some great presentations out there by the usual suspects - Epic, Crytek, DICE, Guerrilla and so on. Many of the vendors also have good sample code to explore. There's a lot of buzz about certain new techniques... but there's sometimes a little bit of a gap between the academic literature, the presentations and sample code. Sometimes it's a little hard for the time-pressed practitioner to google an example and jump straight in.

So, I thought I'd show a little example code to help people get started.

Introduction

Tone mapping using a histogram is becoming a trend in next-generation graphics. People are starting to experiment with moving away from the DX9/360/PS3 style "compute average luminance, apply exposure, apply film response curve" algorithms towards trying something different.

Histogram based methods aren't new. An early, commonly cited example is Ward et al '97. More recent examples are things like Duan et al 2010. The basic idea is that you take an input high dynamic range image, compute a histogram measuring the frequency of different luminances in that scene, and use that to build a response curve. The response curve maps from the scene's luminance range to your output display luminance range.

So, in one fell swoop, you can take your HDR image and convert it to an LDR image. No repeated downsampling and cache thrashing, no incorrect half-pixel or half-texel offsets, no frame-by-frame adaptation. A simple algorithm that "just works".

This algorithm fits very naturally with compute shaders in DX11. However, not everyone out there is a DX11 expert; there are many practitioners busy shipping titles on DX9 class hardware who have not had the opportunity to experiment with DX11 yet. I shall detail the implementation of a very basic histogram based tone mapping operator that may serve as the basis for future expansion.

Running a Compute Shader Over an Image

Compute Shader Introduction

Compute shaders offer a great deal of flexible, general-purpose computation power on modern DX11 GPUs. They are not, however, a direct replacement for C++ code; they have a specific programming model that you have to follow. Whilst you can ignore some details of this model, you will not achieve maximum performance if you do. This programming model therefore largely dictates how you must structure your algorithm.

An individual compute shader contains the innermost operation of some loop. However, the compute shader is not executed as some kind of iterative loop. Instead, many separate invocations of the compute shader code are scheduled, dispatched and retired. Each invocation of the compute shader is known as a thread, and each thread receives unique addressing information. These threads are executed in parallel and are grouped into thread groups. Each thread is therefore separately executing some part of the desired operation. It is very akin to a parallel map operation in a functional language.

A compute shader is therefore unlike a general CPU process or job. It is unlike the jobs or task programming model of the Cell processor. It is very similar to shading a pixel. In contrast to pixel shading, compute shader threads can communicate with other concurrent threads. A thread may communicate with other threads in the same group using some on-chip fast memory known as group shared memory.

The conclusion of this is that when processing an image with a compute shader, you do not sit in a big x * y loop, fetching pixels and processing them. You do not DMA in parts of an image, process them, and DMA them out again. You write code that processes each pixel independently and dispatch that work to be processed as a whole in parallel. This requires a certain structure to your code, especially if you want to obtain maximum parallelism.

Tiling

A common paradigm when executing a compute shader over an image is to break the image up into tiles. Each thread typically corresponds to an input pixel. A thread group maps to a tile. Each thread is executed and aggregates data to the local, on-chip memory known as group shared memory. When all the threads in a group have completed, the results are exported from the transient group shared memory to some persistent off-chip memory.

But what is the reasoning behind this model?

If we liked, we could dispatch a compute shader with a thread group size of 1 x 1 x 1 thread. Inside this one thread of one shader, we would iterate through every single pixel in an image and perform our computation. But, this model offers very little parallelism. We would only enjoy the SIMD instruction level parallelism, and would have no thread-level parallelism. GPUs are built to be very, very wide. We would only be occupying a fraction of the GPU.

Ideally, we want to launch as many threads as possible. This undermines any kind of approach where one thread iterates across many pixels. We want a thread per pixel.

And this is what gives rise to the image tiling model. Here, we break the image into tiles of 16x16, 32x32 or whatever size yields the maximum threads and therefore parallelism possible. We exploit group shared memory rather than work directly in off-chip memory for performance reasons. A modern GPU will not simply sit and churn through every thread of your compute shader until completion. It will execute wavefronts (groups of threads) from multiple different shaders simultaneously across the GPU. If we can read from off-chip memory but write to on-chip memory, we minimise our bandwidth consumption and play better with other shaders. Ultimately we will need to export the results, but we can do that once in a burst rather than constantly causing memory hazards.

We may need to execute subsequent passes that combine together this intermediate metadata to yield a final result. This arrangement is akin to the common map-reduce parallel programming model.
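To make the tile and map-reduce structure concrete, here is a minimal single-threaded C++ sketch. It is only a CPU analogy, not how the GPU schedules anything: tileCount and countPerTileThenMerge are hypothetical names, and the image is assumed to hold one small integer category per pixel in row-major order.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Number of tiles needed to cover `pixels` when each tile spans `tileSize`
// pixels. The last tile may be only partly filled.
inline std::uint32_t tileCount(std::uint32_t pixels, std::uint32_t tileSize)
{
    assert(tileSize > 0);
    return (pixels + tileSize - 1) / tileSize;
}

inline std::vector<std::uint32_t> countPerTileThenMerge(
    const std::vector<std::uint8_t>& image,
    std::uint32_t width, std::uint32_t height,
    std::uint32_t tileW, std::uint32_t tileH, std::uint32_t numCategories)
{
    assert(image.size() == std::size_t(width) * height);
    const std::uint32_t tilesX = tileCount(width, tileW);
    const std::uint32_t tilesY = tileCount(height, tileH);

    // "Map": one intermediate result per tile, like one thread group each.
    std::vector<std::vector<std::uint32_t>> perTile(
        std::size_t(tilesX) * tilesY, std::vector<std::uint32_t>(numCategories, 0));
    for (std::uint32_t y = 0; y < tilesY * tileH; ++y)
    {
        for (std::uint32_t x = 0; x < tilesX * tileW; ++x)
        {
            if (x >= width || y >= height) continue; // Padding "thread": no real pixel
            const std::uint8_t category = image[std::size_t(y) * width + x];
            if (category < numCategories)
            {
                ++perTile[std::size_t(y / tileH) * tilesX + x / tileW][category];
            }
        }
    }

    // "Reduce": combine the per-tile results into one.
    std::vector<std::uint32_t> merged(numCategories, 0);
    for (const auto& tile : perTile)
    {
        for (std::uint32_t c = 0; c < numCategories; ++c) merged[c] += tile[c];
    }
    return merged;
}
```

For a 1920 x 1080 image and 32 x 16 tiles, tileCount gives 60 x 68 groups; the bottom row of tiles is only half covered by real pixels, which is why the padding check matters.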

I will now dive straight into the HLSL compute shader code. For clarity and brevity, I won't present the detailed DX11 CPU-side code; I shall just detail what it needs to do.

Computing the Per-Tile Histogram

Our first task is to compute a histogram for each tile of the image. These will later be combined to form a histogram for the whole image. Each histogram will map the scene's luminance range across N bins. Each bin holds a uint which represents the number of pixels in that luminance band.
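As a reference for the bin mapping, here is a small C++ sketch that converts a linear luminance to a bin index. The function name luminanceToBin and its parameters are hypothetical; it assumes the histogram range is given as natural-log luminances, and it clamps the top of the range into the last bin, which is my own choice rather than something the technique dictates.

```cpp
#include <algorithm>
#include <cassert>
#include <cmath>
#include <cstdint>

// Maps a linear luminance to a bin index in [0, numBins - 1].
// logMin and logMax bound the histogram range, as natural logs of luminance.
inline std::uint32_t luminanceToBin(float luminance, float logMin, float logMax,
                                    std::uint32_t numBins)
{
    assert(numBins > 0 && logMax > logMin);

    // log() is undefined at or below zero; this test also catches NaN.
    if (!(luminance > 0.0f)) return 0;

    const float t = (std::log(luminance) - logMin) / (logMax - logMin);
    const float clamped = std::min(std::max(t, 0.0f), 1.0f);
    const std::uint32_t bin = static_cast<std::uint32_t>(clamped * float(numBins));

    // clamped == 1 would otherwise land one past the last bin.
    return std::min(bin, numBins - 1);
}
```

With a range of -4 to 4 and 64 bins, a luminance of 1.0 (log of zero) falls in the middle, at bin 32.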

To perform this, we need a few things on the DX11 side:
  • A shader resource view that allows us to read the source texture.
  • A constant buffer holding our shader parameters.
  • A large buffer to store all of the per-tile histograms. Its size will be numTiles * numHistogramBins.
  • An unordered access view for the per-tile histogram buffer.
We can then dispatch our compute shader. Note that we have to round up the image size to be a multiple of our tile size; we must then be careful to only sample the real pixels. Here's the code:

#define NUM_HISTOGRAM_BINS (64)
#define HISTOGRAM_TILE_SIZE_X (32)
#define HISTOGRAM_TILE_SIZE_Y (16)

Texture2DMS<float4> sourceTextureSRV;

cbuffer toneMapGammaConstants
{
    uint outputWidth;
    uint outputHeight;

    float inputLuminanceMin;
    float inputLuminanceMax;

    uint numPerTileHistograms;

    float outputLuminanceMin;
    float outputLuminanceMax;
};

// RGB to luminance
float rgbToLuminance(float3 colour)
{
     // Use this equation: http://en.wikipedia.org/wiki/Luminance_(relative)
     return dot(float3(0.2126f, 0.7152f, 0.0722f), colour);
}

// RGB to log-luminance
float rgbToLogLuminance(float3 colour)
{
     return log(rgbToLuminance(colour));
}

// Convert a luminance to a histogram bin index
uint rgbToHistogramBin(float3 colour)
{
    // Convert to luminance
    float luminance = rgbToLogLuminance(colour);
    // Work out response curve response
    return uint(float(NUM_HISTOGRAM_BINS) * saturate((luminance - inputLuminanceMin) / (inputLuminanceMax - inputLuminanceMin)));
}

// Thread groups correspond to a tile.
// Threads correspond to a pixel

groupshared uint histogram[NUM_HISTOGRAM_BINS];

Buffer<uint> perTileHistogramSRV;
Buffer<uint> mergedHistogramSRV;
RWBuffer<uint> perTileHistogramUAV;
RWBuffer<uint> mergedHistogramUAV;

[numthreads(HISTOGRAM_TILE_SIZE_X, HISTOGRAM_TILE_SIZE_Y, 1)]
void compute_per_tile_histogram_cs(uint3 globalIdx : SV_DispatchThreadID, uint3 localIdx : SV_GroupThreadID, uint3 groupIdx : SV_GroupID)
{
    uint localIdxFlattened = localIdx.x + localIdx.y * HISTOGRAM_TILE_SIZE_X; // Which pixel within a tile
    uint tileIdxFlattened = groupIdx.x + groupIdx.y * GetNumTilesX(); // Which tile from the screen

    // Initialise the contents of the group shared memory. Only one  thread does it to avoid bank conflict overhead

    if(localIdxFlattened == 0)
    {
        for(uint index = 0; index < NUM_HISTOGRAM_BINS; ++index)
        {
            histogram[index] = 0;
        }
    }

    GroupMemoryBarrierWithGroupSync();

    // For each thread, update the histogram

    // It is possible that we emit more threads than we have pixels. This is caused by rounding up an image to a multiple of the tile size
    if(globalIdx.x < outputWidth && globalIdx.y < outputHeight)
    {
        // We just use sample 0 for efficiency reasons. It is unlikely that omitting the extra samples will make a significant difference to the histogram
        float3 pixelValue = sourceTextureSRV.Load(int2(globalIdx.xy), 0).rgb;
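        // saturate() can return exactly 1.0, which would index one past the last bin, so clamp
        uint bin = min(rgbToHistogramBin(pixelValue), uint(NUM_HISTOGRAM_BINS - 1));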
        InterlockedAdd(histogram[bin], 1u);
    }

    // Thread 0 outputs this thread group's histogram to the buffer
    GroupMemoryBarrierWithGroupSync();
    if(localIdxFlattened == 0)
    {
        // This could write uint4s to the UAV as an optimisation
        uint outputHistogramIndex = NUM_HISTOGRAM_BINS * tileIdxFlattened;
        for(uint index = 0; index < NUM_HISTOGRAM_BINS; ++index)
        {
            uint outputIndex = index + outputHistogramIndex;
            perTileHistogramUAV[outputIndex] = histogram[index];
        }
    }
}


We now need to take this buffer and combine it to form a single histogram for the whole image.

Merging the Per-Tile Histograms

This is a relatively simple compute shader. In this case, each thread maps to one of the tiles just computed, and each threadgroup therefore represents a group of tiles. This shader atomically adds each tile's histogram to the global histogram. In addition to the previous resources, we also now need:
  • A buffer to hold the histogram for the whole image.
  • An unordered access view to write to said buffer.
  • A shader resource view for the per-tile histogram buffer.

// A thread per tile
#define NUM_TILES_PER_THREAD_GROUP 768 // DX10 maximum
[numthreads(NUM_TILES_PER_THREAD_GROUP, 1, 1)]
void merge_histograms_cs(uint3 globalIdx : SV_DispatchThreadID, uint3 localIdx : SV_GroupThreadID, uint3 groupIdx : SV_GroupID)
{
    // Each thread has the job of adding in the contents of the per-tile histogram to the overall histogram held in the merged histogram UAV
    // This is an atomic operation, because many, many threads are trying to hit that memory simultaneously
    uint tileIndex = globalIdx.x;
    if(tileIndex < numPerTileHistograms)
    {
        for(uint binIndex = 0; binIndex < NUM_HISTOGRAM_BINS; ++binIndex)
        {
            InterlockedAdd(mergedHistogramUAV[binIndex], perTileHistogramSRV[tileIndex * NUM_HISTOGRAM_BINS + binIndex]);
        }
    }
}

Note that here we perform atomic operations on the merged histogram. You may do this on both group shared memory and unordered access views; clearly there is extra overhead in the latter. We now have a histogram for the whole image. Our job is now to convert that to a response curve. This is a function that maps from the input HDR image to a new LDR image. Wikipedia offers an easy explanation.

Computing the Response Curve

Again, more resources are needed:
  • A buffer holding a float for each histogram bin.
  • An unordered access view to write to it with.
  • A shader resource view for the whole-image histogram.
There is a small wrinkle in this code. We also include an extra step - the adjustment phase discussed on page 14 of the Ward '97 paper. This is included only as an example to get you started. I use a 1x1x1 compute shader here as this is such a minimal operation that parallelism would gain little.


RWBuffer<float> responseCurveUAV;

groupshared float frequencyPerBin[NUM_HISTOGRAM_BINS];
groupshared float initialFrequencyPerBin[NUM_HISTOGRAM_BINS];
[numthreads(1, 1, 1)]
void compute_histogram_response_curve_cs(uint3 globalIdx : SV_DispatchThreadID, uint3 localIdx : SV_GroupThreadID, uint3 groupIdx : SV_GroupID)
{
    // Compute the initial frequency per-bin, and save it
    float T = 0.0f;
    for(uint bin = 0; bin < NUM_HISTOGRAM_BINS; ++bin)
    {
        float frequency = float(mergedHistogramSRV[bin]);
        frequencyPerBin[bin] = frequency;
        initialFrequencyPerBin[bin] = frequency;
        T += frequency;
    }

    // Naive histogram adjustment. There are many, many such histogram modification algorithms you may seek to employ - this is 
    // an example implementation that will no doubt later be changed
    // This is an implementation of page 14 of "A Visibility Matching Tone Reproduction Operator for High Dynamic Range Scenes"
    // There are other, better approaches, like Duan 2010: http://ima.ac.uk/papers/duan2010.pdf
    float recipDisplayRange = 1.0f / (log(outputLuminanceMax) - log(outputLuminanceMin));

    // Histogram bin step size - in log(cd/m2)
    float deltaB = (inputLuminanceMax - inputLuminanceMin) / float(NUM_HISTOGRAM_BINS); // Luminance values are already log()d

    float tolerance = T * 0.025f;
    float trimmings = 0.0f;
    uint loops = 0;
    do
    {
        // Work out the new histogram total
        T = 0.0f;
        for(uint bin2 = 0; bin2 < NUM_HISTOGRAM_BINS; ++bin2)
        {
            T += frequencyPerBin[bin2];
        }

        if(T < tolerance)
        {
            // This convergence is wrong - put it back to the original
            T = 0.0f;
            for(uint index = 0; index < NUM_HISTOGRAM_BINS; ++index)
            {
                frequencyPerBin[index] = initialFrequencyPerBin[index];
                T += frequencyPerBin[index];
            }
            break;
        }

        // Compute the ceiling
        trimmings = 0.0f;
        float ceiling = T * deltaB * recipDisplayRange;
        for(uint bin3 = 0; bin3 < NUM_HISTOGRAM_BINS; ++bin3)
        {
            if(frequencyPerBin[bin3] > ceiling)
            {
                trimmings += frequencyPerBin[bin3] - ceiling;
                frequencyPerBin[bin3] = ceiling;
            }
        }
        T -= trimmings;

        ++loops;
    }
    while(trimmings > tolerance && loops < 10);

    // Compute the cumulative distribution function, per bin
    float recipT = 1.0f / T;
    float sum = 0.0f;
    for(uint bin5 = 0; bin5 < NUM_HISTOGRAM_BINS; ++bin5) // By now you are wondering about bin1, bin2, bin3 etc. It is a shader compiler thing.
    {
        float probability = frequencyPerBin[bin5] * recipT;
        sum += probability;
        responseCurveUAV[bin5] = sum;
    }
}



And we now have a response curve to apply to our image!
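Stripped of the histogram adjustment loop, the final step is just a normalised running sum. Here is a minimal C++ sketch of that part alone, useful for sanity-checking GPU output on the CPU; cumulativeResponse is a hypothetical name and the empty-histogram behaviour is my own choice.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Returns the running sum of bin probabilities. For any non-empty histogram
// the curve is non-decreasing and its last entry is 1.
inline std::vector<float> cumulativeResponse(const std::vector<std::uint32_t>& histogram)
{
    std::uint64_t total = 0;
    for (std::uint32_t count : histogram) total += count;

    std::vector<float> response(histogram.size(), 0.0f);
    if (total == 0) return response; // No samples: leave the curve at zero

    std::uint64_t running = 0;
    for (std::size_t bin = 0; bin < histogram.size(); ++bin)
    {
        running += histogram[bin];
        response[bin] = static_cast<float>(double(running) / double(total));
    }
    return response;
}
```

A histogram of {1, 1, 2} yields {0.25, 0.5, 1.0}: heavily populated luminance bands get a larger share of the output range.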

Applying the Response Curve

Again, we need one more new resource: A shader resource view to access the response curve buffer. The code is again quite simple. Here, we're running a compute shader over tiles of the image. I've also included an MSAA resolve and gamma correction step, as these are features you will probably need to implement too in a next-generation linearly correct renderer. Note that you ideally tone-map before the MSAA resolve. I felt it was simpler to present the code this way around. Secondly, ideally your frame buffer would have separated chrominance from luminance and you would just scale the luminance. Again, I've presented it this way for a simple example.

#define TONE_MAP_GAMMA_TILE_SIZE (8)

Buffer<float> responseCurveSRV;
RWTexture2D<float4> outputUAV;

[numthreads(TONE_MAP_GAMMA_TILE_SIZE, TONE_MAP_GAMMA_TILE_SIZE, 1)]
void main_cs(uint3 threadID : SV_DispatchThreadID)
{
    float3 sourceRGB = 0;
    for(uint sampleIndex = 0; sampleIndex < NUM_MSAA_SAMPLES; ++sampleIndex)
    {
        sourceRGB += sourceTextureSRV.Load(int2(threadID.xy), sampleIndex).rgb;
    }
    sourceRGB /= NUM_MSAA_SAMPLES;

    // Work out response curve response
    uint bin = rgbToHistogramBin(sourceRGB);
    float scalar = bin < NUM_HISTOGRAM_BINS ? responseCurveSRV[bin] : 1.0f;
    sourceRGB *= scalar;

    outputUAV[threadID.xy] = float4(saturate(pow(sourceRGB, 1.0f / 2.2f)), 1.0f);
}

That's It

And there we have it. A simple, bare-bones implementation to illustrate how these things may be implemented. Something to get the practitioner up and running.

Thursday 25 April 2013

Distant Shadowing "brainfart"

Recently, I've been having a little bit of a think about how to handle realtime distant shadowing in games, in order to avoid shadow baking.

Clearly, it's entirely possible to use some kind of cascaded shadow map extending all the way out. But, this soon ends up getting expensive in terms of textures and rendering. Alternatively, you might consider using something like stencil shadow volumes, and using the extended compute capabilities of modern GPUs to clip down the shadow volume so it doesn't consume enormous amounts of rasterisation or clipping time.

Alternatively, I've been thinking a lot about reprojection. Let's say you interrogate your cascaded shadow map and use that to build a screen-space mask of the shadow response. You want soft(ish) shadows, so you commit a lot of PCF samples.

How about reprojecting last frame's shadow mask into this frame's shadow mask, then filling in the holes with some simple filtering operations? Distant shadows are typically pretty low-frequency things, so you could well get away with some blurring and imperfection in the shadow mask. Then, you need only render the geometry that became newly visible this frame into the shadow map. You could just subtract one frame's light frustum from the other, yielding a convex polytope, and only render the intersecting objects.

Just a brainfart...

Thursday 28 March 2013

Thoughts on VPL Generation Using Cube Maps

Recently, I've been doing a lot of research and thinking about realtime GI and its related ideas. A common theme in realtime GI is the use of VPLs, a way to approximate the radiance bouncing off a surface after interacting with its material. A VPL is only formed where there is a definite light-to-surface interaction. The VPL can therefore be used to simulate light bouncing off geometry.

One technique in use today for generating VPLs is the Reflective Shadow Map algorithm. Here, we render the scene from the light's point of view, and every pixel in the output image represents a surface receiving primary illumination from the light source. This is obviously a fairly efficient way of finding all the surfaces that receive radiance from a given light source without too much wastage. You can then construct VPLs using that image, reduce the working set and inject them into some lighting algorithm to bounce light around your scene.

The difficulty is, this may not scale well as each new light source would require a new RSM render.

One alternative I considered was the possibility of using a cube map to sample the primary illumination from multiple lights simultaneously. You would put your cube map at your camera position, or some other meaningful point-of-interest. You then render the scene into each cube map face. As you render it, you light it from all of your light sources simultaneously, with shadow maps too if desired. Obviously this would be an expensive render.. but it would only be done once, you can vary the resolution of it and you can always use techniques like tiled lighting to accelerate it if you have many lights.

When complete, this cube map holds the radiance at the first light bounce for many surfaces in the scene near to the point of interest - ie the camera. It contains samples for surfaces all around - both in front of and behind the camera, which is important for GI. You may then use this cube map to generate VPLs for further bounce lighting.

Now, this is as-yet a completely untested idea. I will get round to it, though. I thought I'd throw it out there as an idea for using a commonly-accelerated rendering structure to try to help the scalability of realtime GI solutions.

Tuesday 19 February 2013

C++ 11 Experiences

I've recently been developing a lot of code in C++ 11. I find it a huge and useful improvement over previous versions of C++; here are some observations on some of the features in it that I've found rather valuable.

emplace

This simple, easily-overlooked addition to the STL has to be one of the most practically useful features of C++ 11. In itself, it does very little: it constructs a new object in-place inside a container. This in itself is very useful for optimisation purposes. However, the real advantage lies in semantics, rather than optimisation.

emplace() means that you are no longer forced to include copy semantics into classes that really, really do not want them in order to place (copy) them into a container type. If the container implements some emplace() functionality, you are free to then delete all copy assignment and copy construction operations, only providing move assignment and construction instead.

This is a huge boon for classes such as resource classes, where assignment or copy can sometimes be a very unnatural and dangerous operation. With emplace() and move semantics, you are then free to create resource classes that make extensive use of const and RAII, ensuring that only one class ever binds to a particular resource.
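As an illustration, here is a minimal sketch of a move-only resource class stored in a std::vector via emplace_back. TextureHandle and makeTextures are hypothetical names invented for this example; a real resource class would release something in its destructor.

```cpp
#include <cassert>
#include <string>
#include <type_traits>
#include <utility>
#include <vector>

// Hypothetical resource wrapper: movable, never copyable.
class TextureHandle
{
public:
    TextureHandle(std::string name, int id) : m_name(std::move(name)), m_id(id) {}

    TextureHandle(const TextureHandle&) = delete;
    TextureHandle& operator=(const TextureHandle&) = delete;

    TextureHandle(TextureHandle&& other) noexcept
        : m_name(std::move(other.m_name)), m_id(other.m_id)
    {
        other.m_id = -1; // The moved-from handle no longer refers to the resource
    }

    TextureHandle& operator=(TextureHandle&& other) noexcept
    {
        if (this != &other)
        {
            m_name = std::move(other.m_name);
            m_id = other.m_id;
            other.m_id = -1;
        }
        return *this;
    }

    const std::string& name() const { return m_name; }
    int id() const { return m_id; }

private:
    std::string m_name;
    int m_id;
};

static_assert(!std::is_copy_constructible<TextureHandle>::value,
              "copies are rejected at compile time");

inline std::vector<TextureHandle> makeTextures()
{
    std::vector<TextureHandle> textures;
    textures.emplace_back("albedo", 1); // Constructed in place from the arguments
    textures.emplace_back("normal", 2); // Growth moves existing elements, never copies
    return textures;
}
```

The container never needs a copy constructor here: elements are built in place and relocated by move when the vector grows.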

Lambdas for Complex Initialisation

How often have you written code such as:

SomeType blah = unwantedDefaultValue;
if(x)
{
    blah = a; // In reality this would be much more complex
}
else if(y)
{
    blah = b;
}
else
{
    blah = c;
}


Which means that "blah" is not const, when it really needs to be, and you have a big chunk of complex code polluting the flow of some code. Typically, you even have the same thing near-duplicated.

You can always factor the big initialisation statement into a function, and assign it to a const variable... but often to do so artificially externalises details you may prefer not to.

With lambdas, you can now factor all that into a lambda expression, capturing nearby variables or passing different values as a parameter:

const auto initialiser = [&]() -> SomeType // Capture by reference; C++ 11 needs the explicit return type with several returns
{
    if(x)
    {
        return a;
    }
    else if(y)
    {
        return b;
    }
    else
    {
        return c;
    }
};

...

const SomeType blah = initialiser();
const SomeType blah2 = initialiser2(x, y);
const SomeType blah3 = initialiser3({1, 2, 3, 4});
const SomeType blah4 = initialiser3({});

And so on. This can make the code a little more focused, more const-safe, less duplicated and easier to step through and debug.

Avoid Ternary Expressions Using Lambdas

A simple one. Ternary expressions can be a little difficult to debug, especially when they are nested:

const int a = d < e ? f :
              g < h ? i :
              j < k ? m : n;

And so on. Whilst I don't advocate it, it does occur. Along the lines of the above point, using a lambda creates code you can place breakpoints on and debug easily, at no performance loss.
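For comparison, here is a minimal sketch of the same selection written as an immediately-invoked lambda. The wrapper function pickValue is hypothetical and exists only so the snippet stands alone; the variable names mirror the ternary.

```cpp
#include <cassert>

// Same selection as the nested ternary, but each branch is its own statement
// and can take a breakpoint. The explicit return type keeps it valid C++ 11.
inline int pickValue(int d, int e, int f, int g, int h, int i, int j, int k, int m, int n)
{
    const int a = [&]() -> int
    {
        if (d < e) { return f; }
        if (g < h) { return i; }
        if (j < k) { return m; }
        return n;
    }(); // Invoked immediately, so `a` can still be const
    return a;
}

inline void pickValueExample()
{
    // The first comparison (1 < 2) succeeds, so f is chosen.
    assert(pickValue(1, 2, 10, 0, 0, 20, 0, 0, 30, 40) == 10);
}
```

The result is still a single const initialisation, just with somewhere to put a breakpoint.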

Reduction of Code Duplication Using Lambdas

Lambdas are very useful to reduce internal code duplication within a function. This perhaps seems a little excessive... but it can help because it factors out the details of a particular operation into one place, leaving the rest of the code simpler. Again, this avoids the problem of artificial external functions that reduce encapsulation... though I do not propose that everything be written as giant functions containing lambdas!

What's more, I've found that the compiler is generally smart enough to inline the expansion of these lambdas as appropriate. So there is no performance penalty!

Move Semantics For Resource Assignment

A common issue I face is in creation code, where one function creates a resource and passes it to another object, which is assumed to take responsibility for its lifetime management and destruction. However, often the constructor just takes a pointer to an object - this declaration is less than clear on what it will do with that resource. It doesn't tell you whether it'll just hold that reference, or manage it.

I find move semantics can help here, a little. A constructor which has an rvalue reference to some resource declares that it will acquire the resource, hold a reference to it, but also divest the calling code of that object. This perhaps makes it a little clearer that ownership of the resource is being transferred, not simply passed.

Adding the secret sauce of std::unique_ptr<> makes lifetime management even more painless.
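A minimal sketch of that ownership hand-off, assuming hypothetical Mesh and Renderable types. Here the constructor takes the std::unique_ptr by value rather than by rvalue reference; both spell out the transfer in the signature, and by-value is simply the variant I find easiest to read.

```cpp
#include <cassert>
#include <memory>
#include <utility>

struct Mesh { int vertexCount; }; // Hypothetical resource

class Renderable
{
public:
    // Taking the unique_ptr by value states in the signature that ownership moves in.
    explicit Renderable(std::unique_ptr<Mesh> mesh) : m_mesh(std::move(mesh)) {}

    int vertexCount() const { return m_mesh ? m_mesh->vertexCount : 0; }

private:
    std::unique_ptr<Mesh> m_mesh; // Destroyed automatically with the Renderable
};

inline Renderable makeRenderable(int vertexCount)
{
    std::unique_ptr<Mesh> mesh(new Mesh{vertexCount}); // C++ 11 has no std::make_unique
    Renderable renderable(std::move(mesh));            // The caller gives the mesh up here
    assert(mesh == nullptr);                           // A moved-from unique_ptr is empty
    return renderable;
}
```

Passing an lvalue without std::move fails to compile, so the transfer cannot happen by accident.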

Auto

Auto seems one of the potentially more divisive additions to C++ 11. At first, I was unsure. I can see how it can avoid template craziness... but would the lack of explicit type names make things harder to debug?

So far, I generally use auto by default as much as I possibly can. I find that if your code is making good use of functions and constructors, they tend to make it fairly clear what types are being constructed. Furthermore, it makes the code much more amenable to change and refactoring, as it reduces the number of times you have to name a particular type. This seems to be a good thing.

The main downside is that you have to be extremely careful to remember to include & to make some types a reference. If you do not, then you end up implicitly copying objects around, which can lead to performance, correctness or compilation issues.
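A minimal sketch of the correctness side of that pitfall, using a hypothetical Particle type. The two functions differ by a single character.

```cpp
#include <cassert>
#include <vector>

struct Particle { int life; }; // Hypothetical type

// Pitfall: `auto` deduces Particle, so each loop variable is a copy and the
// container is left unchanged.
inline void ageByCopy(std::vector<Particle>& particles)
{
    for (auto p : particles) { p.life -= 1; }
}

// `auto&` binds to the elements themselves, so the change sticks.
inline void ageByReference(std::vector<Particle>& particles)
{
    for (auto& p : particles) { p.life -= 1; }
}

inline void autoExample()
{
    std::vector<Particle> particles{Particle{3}, Particle{5}};
    ageByCopy(particles);
    assert(particles[0].life == 3); // Nothing happened
    ageByReference(particles);
    assert(particles[0].life == 2); // Now it did
}
```

With a move-only element type the by-copy version would not compile at all, which is the friendlier failure mode.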

Range-based for

This one's not such a big deal. It can be useful, as it can save on code verbosity... but so often, you need to iterate over multiple containers simultaneously, and that's where range-based for ceases to be useful.

constexpr

This is a nice simple win. This at least means you don't have to start making trivial functions into templates to get some compile-time optimisations. It's unfortunate that it insists on the single-line return statements, as this can mean things such as data tables spill out of the scope of the function.
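
To illustrate both points, here is a minimal sketch written against the C++ 11 rules. The function and table names are made up for the example. The single return statement forces loops into recursion, and the lookup table has to live outside the function that uses it.

```cpp
// C++ 11 constexpr: the body must be a single return statement, so
// iteration is expressed with recursion and the conditional operator
constexpr unsigned int factorial(unsigned int n)
{
    return n <= 1u ? 1u : n * factorial(n - 1u);
}

// The table cannot be declared inside the constexpr function in C++ 11,
// so it spills out to namespace scope
constexpr unsigned int kBitsPerNibble[16] =
    { 0, 1, 1, 2, 1, 2, 2, 3, 1, 2, 2, 3, 2, 3, 3, 4 };

constexpr unsigned int bitsSet(unsigned int nibble)
{
    return kBitsPerNibble[nibble & 0xFu];
}

// Both are evaluated by the compiler, with no templates involved
static_assert(factorial(5u) == 120u, "factorial is computed at compile time");
static_assert(bitsSet(7u) == 3u, "table lookup is computed at compile time");
```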

Class Member Initialisation

I find being able to assign values to class instance members inside the class declaration enormously useful. This allows you to vastly simplify your constructors, especially when you have multiple constructors. It removes a lot of duplication, and allows you to implement only the specialisations instead. This seems to be the right way round to me.
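
A short sketch of the pattern, using a hypothetical Light class: the defaults are stated once in the declaration, and each constructor names only the members it changes.

```cpp
#include <string>

// Hypothetical class, purely for illustration
class Light
{
public:
    // Each constructor lists only what differs from the defaults below
    Light() {}
    explicit Light(float intensity) : m_intensity(intensity) {}
    Light(float intensity, const std::string& name) :
        m_intensity(intensity),
        m_name(name)
    {
    }

    float intensity() const { return m_intensity; }
    const std::string& name() const { return m_name; }
    bool enabled() const { return m_enabled; }

private:
    // Defaults are written once, next to the members they initialise
    float m_intensity = 1.0f;
    std::string m_name = "unnamed";
    bool m_enabled = true;
};
```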

Promises, and Futures

By and large, these are quite nice facilities. The big thing they lack though is the ability to poll a future. This seems like quite a major omission, and can make them difficult to use for some algorithms. As such, I tend to find myself rolling my own cut-down variants for many purposes.
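
The standard library offers no dedicated "is it ready?" query, but a zero-length wait_for() can approximate a poll. The sketch below wraps that in a helper named isReady, which is my own name and not part of the standard library. Note that a future returned by std::async with the deferred launch policy reports std::future_status::deferred rather than ready, so this helper would treat it as not ready.

```cpp
#include <chrono>
#include <future>

// Hypothetical helper (not a standard function): returns true if the value
// can be collected without blocking
template <typename T>
bool isReady(const std::future<T>& future)
{
    // wait_for() must not be called on a future with no shared state
    return future.valid() &&
           future.wait_for(std::chrono::seconds(0)) == std::future_status::ready;
}

// Polls a future before and after its promise is fulfilled
bool pollBeforeAndAfter()
{
    std::promise<int> promise;
    std::future<int> future = promise.get_future();

    const bool readyBefore = isReady(future);
    promise.set_value(42);
    const bool readyAfter = isReady(future);

    return !readyBefore && readyAfter && future.get() == 42;
}
```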


In summary, it's very agreeable. No real change is mandated, so you simply opt in to or out of as many or as few new features as you'd like to employ. I'm a convert.

Friday 2 November 2012

Job Graphs in C++ 11

Introduction

I've recently been experimenting with a lot of the new features in C++ 11, purely for discovery. One major new feature is the threading library. As using is the best way of learning, I set about writing a currently-fashionable task/job-graph system using these new features.

I'm not promoting this algorithm as the one true approach for parallelism, as there are many other valuable algorithms that are much more suitable on certain occasions. This one seems a useful, generic solution that fits many cases fairly well. Along the way, I found a few quirks during implementation that I thought might be interesting for others.

Job Graphs

Job graphs are currently finding wide favour as an approach to the challenge of building a flexible architecture for multithreaded processing. In this scheme, jobs are stored in a graph structure. The edges of the graph are used to represent ordering and data dependencies between jobs. Assuming the graph is structured correctly, this ensures that work is processed in the correct order to ensure the correct and timely flow of data. Nodes that do not share a data dependency may be executed concurrently. The programmer shoulders the responsibility of structuring the graph correctly, and ensuring any conflicts between resources are either resolved via the graph, or using some locking mechanism.

Typically, the jobs are pushed into some kind of queue and distributed to a thread pool. Ideally you would also use work-stealing for maximum efficiency.

The goal is to use this graph structure to allow non-dependent work to execute concurrently, saturating all hardware threads. It is a very simple idea, yet in its most general form, scheduling such a graph optimally is NP-hard.

C++ 11 Threading Overview

The C++ 11 threading library is really quite a lightweight, spare API. Broadly, it offers the facilities to:
  • Create and manage threads in a cross platform fashion.
  • Synchronise activity between these threads using simple primitives such as mutexes, condition variables and promises/futures.
  • Package up tasks to be executed on a thread.
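
As a rough illustration of the three facilities listed above, the sketch below creates threads, guards a shared counter with a mutex, and packages a task whose result is collected through a future. The function names are made up for the example.

```cpp
#include <future>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Create threads and synchronise them with a mutex
int sumOnThreads(int numThreads, int incrementsPerThread)
{
    int total = 0;
    std::mutex totalMutex;
    std::vector<std::thread> threads;

    for(int i = 0; i < numThreads; ++i)
    {
        threads.emplace_back([&total, &totalMutex, incrementsPerThread]()
        {
            for(int n = 0; n < incrementsPerThread; ++n)
            {
                std::lock_guard<std::mutex> lock(totalMutex);
                ++total;
            }
        });
    }

    for(auto& thread : threads)
    {
        thread.join();
    }
    return total;
}

// Package up a task, run it on a thread, and fetch the result via a future
int squareOnThread(int value)
{
    std::packaged_task<int(int)> task([](int v) { return v * v; });
    std::future<int> result = task.get_future();

    std::thread worker(std::move(task), value);
    worker.join();
    return result.get();
}
```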

Implementation

The implementation of a job graph requires relatively few components:
  • A job class
  • A job graph container
  • A job pool
  • A thread pool

Job Class

The job class is fairly straightforward. It serves two purposes: to package up some functionality to be executed, and have knowledge of precursor jobs and dependent jobs (this concept is sometimes described as continuations).

The major consideration in the design of the job class is the mechanism used to sequence their operation. Ideally, you want to use some kind of OS threading primitive as opposed to some kind of user-space mechanism. The OS is in a far better position to manage threads as it has clear information of the state and dependencies of all of them, and can wake and sleep threads with minimal context switches. User space code often ends up with some kind of inefficient wake-predicate-sleep polling pattern causing a lot of wasted time and context switches.

I therefore chose to use a C++ 11 thread library primitive: std::promise and std::future. A promise provides the ability to calculate some value asynchronously, in a deferred context, in the future. It represents a contract for some piece of code to provide a value based on some inputs. A future is the mechanism used to retrieve that value. The promise is the producer, the future is the client.

The promise and future abstraction provides a good mechanism to couple jobs together. A job can provide a "future" to each of its dependent jobs. The dependent job uses this future to wait for the job's completion. The future could pass data, or it could solely be used to sequence computation.
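
Stripped of the job machinery, the coupling looks roughly like this. It is a minimal sketch with two bare threads standing in for jobs, not the Job class itself. The dependent is started first, yet it cannot proceed until the precursor fulfils its promise.

```cpp
#include <future>
#include <string>
#include <thread>

// Runs a "precursor" and a "dependent" on separate threads and returns the
// order in which they did their work
std::string runInOrder()
{
    std::string log; // only touched by one thread at a time, thanks to the future

    std::promise<int> precursorDone;
    std::future<int> startSignal = precursorDone.get_future();

    // The dependent starts first, but blocks until the promise is fulfilled
    std::thread dependent([&log, &startSignal]()
    {
        startSignal.get();
        log += "B";
    });

    std::thread precursor([&log, &precursorDone]()
    {
        log += "A";
        precursorDone.set_value(1); // the value is unused; it only sequences the work
    });

    precursor.join();
    dependent.join();
    return log;
}
```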

A second feature of C++ 11 that proves useful for the job class is std::function and lambda functions. An std::function is simply a wrapper for some callable target. My job class contains one of these to hold the job's processing kernel. It is initialised in the constructor, which allows me to pass in a lambda function when creating the job. This provides an interesting alternative to using inheritance and virtual functions, and the resultant proliferation of classes. Now, I have just one Job class, and the data handles the variation.
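
A cut-down sketch of that design choice follows. SimpleJob is a hypothetical stand-in for the real Job class shown in the listing: two different behaviours are expressed as two lambdas rather than two subclasses.

```cpp
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical, simplified stand-in for the Job class
class SimpleJob
{
public:
    SimpleJob(std::string name, std::function<void(float)> kernel) :
        m_name(std::move(name)),
        m_kernel(std::move(kernel))
    {
    }

    void execute(float timeStep) const { m_kernel(timeStep); }
    const std::string& name() const { return m_name; }

private:
    std::string m_name;
    std::function<void(float)> m_kernel; // the data carries the variation
};

// Two jobs with different behaviour, no inheritance required
float runTwoJobs(float timeStep)
{
    float position = 0.0f;
    float elapsed = 0.0f;

    std::vector<SimpleJob> jobs;
    jobs.emplace_back("integrate", [&position](float dt) { position += 2.0f * dt; });
    jobs.emplace_back("clock", [&elapsed](float dt) { elapsed += dt; });

    for(const auto& job : jobs)
    {
        job.execute(timeStep);
    }
    return position + elapsed;
}
```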

Job.h:
        class Job;

        //! This monitors the completion of a number of jobs, and sends out a signal when they're all done
        class JobCompletionMonitor
        {
        public:
            JobCompletionMonitor(void) :
                m_numJobsPending(0)
            {
            }

            // Forbidden constructors
            JobCompletionMonitor(JobCompletionMonitor&) = delete;
            JobCompletionMonitor& operator=(JobCompletionMonitor&) = delete;

            // Jobs tell the monitor what's going on
            void notifyStart(Job* job);
            void notifyFinish(Job* job);

            // Other people can find out when they're all done
            void waitForCompletion(void);

            bool allJobsFinished(void) const;

        private:
            std::atomic_int m_numJobsPending;
            std::condition_variable m_allJobsComplete;
            std::mutex m_mutex;
        };

        //! A self-contained job that is executed somewhere in the frame
        class Job : public objects::CoreObject
        {
        public:
            // Priority system
            typedef unsigned int Priority;
            static const Priority HighPriority = 1000;
            static const Priority AboveNormalPriority = 5000;
            static const Priority NormalPriority = 10000;
            static const Priority BelowNormalPriority = 15000;
            static const Priority LowPriority = 20000;

            // Permitted constructors
            Job(const resources::NameManager::Name& name,
                const std::initializer_list<std::shared_ptr<Job>>& precursorJobs = {},
                std::function<void(float)> kernel = [](const float){},
                const Priority priority = NormalPriority);
            ~Job();

            // Forbidden constructors
            Job(Job& job) = delete;
            Job& operator=(Job& job) = delete;

            // Management of dependents
            void addDependent(const std::shared_ptr<Job>& dependent);
            void removeDependent(const std::shared_ptr<Job>& dependent);

            // Execution API
            void prepare(JobCompletionMonitor* monitor);
            bool canRun(void) const;
            void execute(const float timeStep);
            void cleanup(void);

            // Accessors
            inline const std::vector<std::shared_ptr<Job>>& getDependents(void) const
            {
                return m_dependents;
            }

            inline const std::vector<std::shared_ptr<Job>>& getPrecursorJobs(void) const
            {
                // Return by const reference, like getDependents(), to avoid copying the vector
                return m_precursors;
            }

            inline bool operator<(const Job& other) const
            {
                return m_priority < other.m_priority;
            }

        private:
            // Connection to other jobs
            std::vector<std::shared_ptr<Job>> m_precursors; //!< The jobs that must complete before we can run. We are a continuation of these jobs
            std::vector<std::shared_ptr<Job>> m_dependents; //!< A list of all the jobs that continue after us

            // Futures to sequence processing
            std::vector<std::future<int>> m_precursorFutures; //!< The futures that tell us when to start - one per precursor
            std::vector<std::promise<int>> m_dependentPromises; //!< The promises we fulfill to trigger other continuations

            // The processing kernel, supplied at construction (typically as a lambda)
            std::function<void(const float)> m_kernel; //!< A simple routine to call to implement the logic for this task

            // What is the execution priority of this job?
            Priority m_priority; //!< Execution priority of this job

            JobCompletionMonitor* m_completionMonitor = nullptr; //!< This object monitors the job. The job must notify it on completion
        };

Job.cpp:
        //! Have all of our jobs now finished?
        bool JobCompletionMonitor::allJobsFinished(void) const
        {
            return m_numJobsPending == 0;
        }

        //! The job tells the monitor when it starts
        void JobCompletionMonitor::notifyStart(Job* job)
        {
            INC_RT_ASSERT(job != nullptr);
            ++m_numJobsPending;
        }

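        //! The job notifies the monitor that it is complete
        void JobCompletionMonitor::notifyFinish(Job* job)
        {
            INC_RT_ASSERT(job != nullptr);

            // Act on the result of the atomic decrement itself; re-reading the counter
            // afterwards would race against other jobs finishing at the same time
            if(--m_numJobsPending == 0)
            {
                // Hold the mutex while notifying so that the signal is ordered with
                // respect to any thread entering waitForCompletion()
                std::lock_guard<std::mutex> lock(m_mutex);
                m_allJobsComplete.notify_all();
            }
        }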

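        //! Wait until all the jobs managed by this monitor are complete
        void JobCompletionMonitor::waitForCompletion(void)
        {
            std::unique_lock<std::mutex> lock(m_mutex);

            // Wait with a predicate: this guards against spurious wakeups, and against
            // the last job finishing before we start waiting
            m_allJobsComplete.wait(lock, [this]() { return m_numJobsPending == 0; });
            INC_RT_ASSERT(m_numJobsPending == 0);
        }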

        //! Construction
        Job::Job(const resources::NameManager::Name& name,
                 const std::initializer_list<std::shared_ptr<Job>>& precursorJobs,
                 std::function<void(float)> kernel,
                 const Priority priority) :
            objects::CoreObject(name),
            m_precursors(precursorJobs),
            m_dependents(),
            m_dependentPromises(),
            m_kernel(kernel),
            m_priority(priority)
        {
        }

        //! Destruction
        Job::~Job()
        {
            // Add some assertions here to ensure the job has been correctly deleted
        }

        // Notify this job of a new dependent that expects to be told when this has finished
        void Job::addDependent(const std::shared_ptr<Job>& dependent)
        {
            INC_RT_ASSERT(dependent != nullptr);

            // If this job isn't already a dependent, then add it
            if(std::find(m_dependents.begin(), m_dependents.end(), dependent) == m_dependents.end())
            {
                m_dependents.push_back(dependent);
            }
        }

        // Remove a dependency from this job
        void Job::removeDependent(const std::shared_ptr<Job>& dependent)
        {
            INC_RT_ASSERT(dependent != nullptr);

            // If this job is currently a dependent, then remove it
            auto it = std::find(m_dependents.begin(), m_dependents.end(), dependent);
            if(it != m_dependents.end())
            {
                // Remove this dependent
                m_dependents.erase(it);
            }
        }

        //! Reset the job ready for graph traversal and execution
        void Job::prepare(JobCompletionMonitor* monitor)
        {
            INC_RT_ASSERT(monitor != nullptr);
            m_completionMonitor = monitor;

            // Ask our precursors for a future
            for(auto& precursor : m_precursors)
            {
                precursor->m_dependentPromises.push_back(std::promise<int>());
                m_precursorFutures.push_back(precursor->m_dependentPromises.back().get_future());
            }
        }

        //! Clean up after execution. Remove all the promises and futures
        void Job::cleanup(void)
        {
            m_precursorFutures.clear();
            m_dependentPromises.clear();
        }

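        // Can this job run? Have all of our futures been satisfied?
        bool Job::canRun(void) const
        {
            for(auto& it : m_precursorFutures)
            {
                // valid() only reports whether the future refers to shared state, not
                // whether the value has arrived, so poll with a zero-length wait as well
                // (this needs <chrono>)
                if(it.valid() == false ||
                   it.wait_for(std::chrono::seconds(0)) != std::future_status::ready)
                {
                    return false;
                }
            }

            return true;
        }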

        // Actually run this job
        void Job::execute(const float timeStep)
        {
            // Wait to receive the promised values from all precursor jobs
            for(auto& future : m_precursorFutures)
            {
                future.wait();
                (void)future.get();
            }

            // Do our actual work
            m_kernel(timeStep);

            // Fulfill promises to our children
            for(auto& promise : m_dependentPromises)
            {
                promise.set_value(1);
            }

            m_completionMonitor->notifyFinish(this);

            // Now that we are done, clear out all of the stale old futures we were waiting for -
            // prepare() will re-issue them next time around
            m_precursorFutures.clear();
        }

Job Graph

The job graph really has few responsibilities: to contain the jobs, traverse the graph and manage the resources needed for execution.

The jobs are contained in a simple graph structure. I may later shift to a more capable container such as boost::graph.

The main task at hand for the job graph is to traverse the graph, pushing all of the jobs into an ordered pool ready for execution. A thread pool then pulls jobs from this pool when they are ready for execution.

JobGraph.h:
        // A graph structure holding all the jobs and conditions
        class JobGraph
        {
        public:
            JobGraph(const size_t numThreads);
            ~JobGraph();

            void add(const std::shared_ptr<Job>& job);
            void remove(const std::shared_ptr<Job>& job);

            void execute(const float timeStep);

        private:
            // At the start of the frame, the job graph is traversed and all the jobs to be processed are placed into this pool
            JobPool m_jobPool;

            // This is a pool of threads which work on executing those jobs
            // Threads are free to steal jobs from the overall pool to keep things ticking forwards
            ThreadPool m_threadPool;

            // This is a special root-level job
            std::shared_ptr<Job> m_rootJob;

            // Need to measure when all jobs have completed
            // (not just dispatched - *completed*)
            JobCompletionMonitor m_jobCompletionMonitor;

            // Mutually exclude operations like construction, adding jobs and execution
            std::mutex m_mutex;

            // Does a (parent) job exist within the graph?
            std::unordered_map<Job*, bool> m_jobContained;
        };

JobGraph.cpp:
        // A generic minimal visitor pattern
        class JobGraphVisitor
        {
        public:
            virtual ~JobGraphVisitor() {}

            void traverse(Job* job)
            {
                INC_RT_ASSERT(job != nullptr);

                visit(job);

                for(auto child : job->getDependents())
                {
                    traverse(child.get());
                }
            };

        private:
            virtual void visit(Job* job) {}
        };

        // Visit with a view to pushing a job to the task pool
        class PushJobVisitor : public JobGraphVisitor
        {
        public:
            inline PushJobVisitor(JobPool& jobPool, const float timeStep, JobCompletionMonitor* jobCompletionMonitor) :
                m_jobPool(jobPool),
                m_timeStep(timeStep),
                m_jobCompletionMonitor(jobCompletionMonitor)
            {
            }

        private:
            JobPool& m_jobPool;
            const float m_timeStep;
            JobCompletionMonitor* const m_jobCompletionMonitor;
            std::unordered_map<Job*, bool> m_jobPushed;

            virtual void visit(Job* job) override
            {
                if(m_jobPushed[job] == false)
                {
                    m_jobCompletionMonitor->notifyStart(job);
                    m_jobPool.push(job);
                    m_jobPushed[job] = true;
                }
            }
        };

        // Reset jobs in preparation
        class ResetJobVisitor : public JobGraphVisitor
        {
        public:
            inline ResetJobVisitor(JobCompletionMonitor* jobCompletionMonitor) :
                m_jobCompletionMonitor(jobCompletionMonitor)
            {
            }

        private:
            JobCompletionMonitor* const m_jobCompletionMonitor;

            virtual void visit(Job* job) override
            {
                job->prepare(m_jobCompletionMonitor);
            }
        };

        //! Post-execution by-product clean-up
        class CleanUpJobVisitor : public JobGraphVisitor
        {
        public:
            inline CleanUpJobVisitor(void)
            {
            }

        private:
            virtual void visit(Job* job) override
            {
                job->cleanup();
            }
        };

        // Construction
        JobGraph::JobGraph(const size_t numThreads) :
            m_jobPool(),
            m_threadPool(numThreads, m_jobPool),
            m_rootJob(new Job(resources::getUniqueName("Root Job")))
        {
            INC_RT_ASSERT(numThreads != 0);
            std::unique_lock<std::mutex> lock(m_mutex);
        }

        // Destruction
        JobGraph::~JobGraph()
        {
            INC_RT_ASSERT(m_jobPool.jobAvailable() == false);
            m_jobPool.shutdown();
            m_threadPool.shutdown();
            std::this_thread::yield();
        }

        // Add a job to the graph
        void JobGraph::add(const std::shared_ptr<Job>& job)
        {
            INC_RT_ASSERT(job != nullptr);
            std::unique_lock<std::mutex> lock(m_mutex);

            if(job->getPrecursorJobs().size() == 0)
            {
                // A null precursor means that it's a root level task that lives under our
                // special root job
                m_rootJob->addDependent(job);
            }
            else
            {
                // Assert to ensure that the parent job actually exists within the graph!
                for(auto& precursor : job->getPrecursorJobs())
                {
                    INC_RT_ASSERT(m_jobContained[precursor.get()]);
                    precursor->addDependent(job);
                }
            }

            m_jobContained[job.get()] = true;
        }

        // Remove a job from the graph
        void JobGraph::remove(const std::shared_ptr<Job>& job)
        {
            INC_RT_ASSERT(job != nullptr);
            std::unique_lock<std::mutex> lock(m_mutex);

            if(job->getPrecursorJobs().size() == 0)
            {
                m_rootJob->removeDependent(job);
            }
            else
            {
                for(auto& precursor : job->getPrecursorJobs())
                {
                    precursor->removeDependent(job);
                }
            }

            m_jobContained[job.get()] = false;
        }

        // Execute all jobs in the graph
        void JobGraph::execute(const float timeStep)
        {
            std::unique_lock<std::mutex> lock(m_mutex);

            // Make sure the job queue is fully empty from last time
            INC_RT_ASSERT(m_jobPool.jobAvailable() == false);

            // Get it all reset ready for execution
            {
                ResetJobVisitor resetJobVisitor(&m_jobCompletionMonitor);
                resetJobVisitor.traverse(m_rootJob.get());
            }

            // Traverse the whole job graph, pushing the jobs into the task pool
            {
                PushJobVisitor pushJobVisitor(m_jobPool, timeStep, &m_jobCompletionMonitor);
                pushJobVisitor.traverse(m_rootJob.get());
            }

            // (Note that tasks will be consumed asynchronously as soon as we start to push them)

            m_jobCompletionMonitor.waitForCompletion();
            
            // We should be all done now. Nothing should be here

            INC_RT_ASSERT(m_jobPool.jobAvailable() == false);

            // Get it all cleaned up ready for next time
            {
                CleanUpJobVisitor cleanUpVisitor;
                cleanUpVisitor.traverse(m_rootJob.get());
            }
        }

Job Pool

It is initially tempting to simply traverse the graph, push all of the jobs out for execution, and let the OS arbitrate everything. There are a few difficulties here. First of all, unless your target platform's runtime explicitly models tasks or jobs, this tends to lead to mass thread creation and destruction - a substantial overhead. Secondly, you may wish to cancel the execution of jobs after a precursor has completed. Thirdly, it makes little sense to dispatch many jobs that have no chance of executing yet. They will just immediately hit a threading hazard and context-switch away. Finally, consider the case where you have N cores and N+1 jobs. Job 0 does some work, and jobs 1..N wait on job 0. Imagine that your target platform has a poorly-implemented runtime, and assigns jobs 1..N to cores instead of job 0. Those jobs may wait, and job 0 may never execute. This case is unlikely in a well-implemented runtime, but it is a risk, and why present code that is inherently at risk?

The job pool is simply an std::set of jobs, ordered by job priority. Whenever a thread in the thread pool wakes up, it pops the first (highest priority) job from the pool and executes it.
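
One detail worth spelling out: a std::set of raw pointers declared without a comparator orders its elements by pointer value, and does not consult the pointee's operator<. To get priority ordering, the set needs an explicit comparator. The sketch below is one possible shape for it, using a hypothetical PrioritisedJob struct in place of the real Job class. It breaks ties on the pointer so that two jobs of equal priority are not treated as duplicates.

```cpp
#include <functional>
#include <set>

// Hypothetical stand-in for Job; a lower value means a higher priority
struct PrioritisedJob
{
    unsigned int priority;
};

// Orders by priority first, then by address so that distinct jobs with the
// same priority can coexist in the set
struct HigherPriorityFirst
{
    bool operator()(const PrioritisedJob* a, const PrioritisedJob* b) const
    {
        if(a->priority != b->priority)
        {
            return a->priority < b->priority;
        }
        return std::less<const PrioritisedJob*>()(a, b);
    }
};

typedef std::set<PrioritisedJob*, HigherPriorityFirst> PrioritisedJobSet;

// Removes and returns the highest-priority job, or nullptr if the set is empty
PrioritisedJob* popHighestPriority(PrioritisedJobSet& jobs)
{
    if(jobs.empty())
    {
        return nullptr;
    }
    PrioritisedJob* job = *jobs.begin();
    jobs.erase(jobs.begin());
    return job;
}
```

A std::multiset with a priority-only comparator would be another reasonable choice.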

JobPool.h:
        // This represents a pool of tasks that need doing, for all threads to
        // pull from (and therefore work-steal from)
        class JobPool
        {
        public:
            JobPool(void);
            JobPool(JobPool&) = delete;
            JobPool& operator=(JobPool&) = delete;

            bool jobAvailable(void);
            void push(Job* job);
            Job* pop(void);

            void waitForJob(void);
            bool shouldTerminate(void);
            void shutdown(void);

        private:
            //! This gets notified when a task is pushed, to allow the other consuming
            //! threads to wake up
            std::condition_variable m_jobPushNotification;
            std::condition_variable m_queueEmptyNotification;

            //! We maintain a set of jobs, so that we can find the first, highest-priority job
            //! that is in an executable state
            std::set<Job*> m_jobs;

            //! This mutex is used to ensure thread safety when pushing a job in or pulling it out
            std::mutex m_mutex;

            //! Used when it's time to shut down
            std::atomic<bool> m_shouldTerminate;
        };

JobPool.cpp:
        // Construct it
        JobPool::JobPool() :
            m_shouldTerminate(false)
        {
        }

        // A consumer calls this function to wait for notification via the condition_variable of a new task
        void JobPool::waitForJob(void)
        {
            std::unique_lock<std::mutex> lock(m_mutex);
            m_jobPushNotification.wait_for(lock, std::chrono::milliseconds(100));

            // The above timeout means that spurious wakeup is possible
        }

        // Is there a job available in the set?
        bool JobPool::jobAvailable(void)
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            return m_jobs.empty() == false;
        }

        // Push a new job into the pool
        void JobPool::push(Job* job)
        {
            INC_RT_ASSERT(job != nullptr);
            std::lock_guard<std::mutex> lock(m_mutex);

            INC_RT_ASSERT(m_shouldTerminate == false);
            m_jobs.insert(job);
            m_jobPushNotification.notify_one();
        }

        // Try to return an executable job from the pool
        Job* JobPool::pop(void)
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            if(m_jobs.empty())
            {
                // Nothing to run!
                return nullptr;
            }
            else
            {
                // Find the highest priority job that can run
                for(auto job : m_jobs)
                {
                    if(job->canRun())
                    {
                        m_jobs.erase(job);
                        return job;
                    }
                }
            }

            // It is possible that we're asked for a job but none are able to run... yet
            return nullptr;
        }

        //! Should we terminate now?
        bool JobPool::shouldTerminate(void)
        {
            return m_shouldTerminate;
        }

        //! Send termination message
        void JobPool::shutdown(void)
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_shouldTerminate = true;
            m_jobs.clear();
            m_jobPushNotification.notify_all();
        }

Thread Pool

The purpose of the thread pool is to provide a persistent set of threads capable of executing jobs, saving on the cost of thread creation and destruction. These threads pull jobs from the job pool until they are asked to terminate.

Spurious Wakeup

The threads in the thread pool wait on an std::condition_variable to notify them on the availability of a job. It is important to consider the possibility of spurious wakeup in this system. Therefore, when a thread wakes up, it must ensure a job actually exists in the pool to be executed.

I actually deliberately allow the std::condition_variable::wait_for() call to time out after 100 ms. The rationale for this is that some operating systems become suspicious of threads that sleep for long periods of time. For stability reasons, I prefer my system to wake up unnecessarily and service the outer thread loop rather than wait indefinitely. This deliberately introduces some degree of spurious wakeup into the system.
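
The wake-then-check pattern can also be folded into the wait itself by passing a predicate to wait_for(). The following is a minimal sketch using a hypothetical IntQueue class rather than the JobPool shown in this post. The predicate is re-tested on every wakeup, spurious or not, and the return value distinguishes a genuine item from a timeout.

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>

// Hypothetical queue, purely for illustration
class IntQueue
{
public:
    void push(int value)
    {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_values.push(value);
        }
        m_pushed.notify_one();
    }

    // Waits up to 'timeout' for a value. Returns false if none arrived in time
    bool waitAndPop(int& value, std::chrono::milliseconds timeout)
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        if(!m_pushed.wait_for(lock, timeout, [this]() { return !m_values.empty(); }))
        {
            return false; // timed out with the queue still empty
        }
        value = m_values.front();
        m_values.pop();
        return true;
    }

private:
    std::mutex m_mutex;
    std::condition_variable m_pushed;
    std::queue<int> m_values;
};

// One producer thread, one consumer; returns the value received or -1 on timeout
int producerConsumer()
{
    IntQueue queue;
    std::thread producer([&queue]() { queue.push(7); });

    int value = 0;
    const bool received = queue.waitAndPop(value, std::chrono::milliseconds(5000));
    producer.join();
    return received ? value : -1;
}
```

Because the predicate is checked before sleeping, a push that happens before the consumer starts waiting is not lost.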

ThreadPool.h:
        // A pool of threads hungry to execute tasks
        class ThreadPool
        {
        public:
            ThreadPool(const size_t numThreads, JobPool& jobPool);
            ~ThreadPool();

            void shutdown(void);

        private:
            //! The set of threads in the thread pool
            std::vector<std::thread> m_threads;
        };

ThreadPool.cpp:
        // Construction / destruction
        ThreadPool::ThreadPool(const size_t numThreads, JobPool& jobPool)
        {
            for(size_t thread = 0; thread < numThreads; ++thread)
            {
                m_threads.push_back(std::thread([](JobPool* jobPool)
                    {
                        for(;;)
                        {
                            // Sleep, until a job is available
                            // (use a condition_variable here)
                            jobPool->waitForJob();

                            // Have we been told to shut down?
                            if(jobPool->shouldTerminate())
                            {
                                return;
                            }

                            // Pull a job out of the task pool and execute it
                            if(jobPool->jobAvailable())
                            {
                                // Whilst we'd prefer to execute task() here, instead we have to do
                                // the pop-and-execute as an atomic-like operation to avoid
                                // two threads popping the same task, or one thread popping
                                // a task that the other thread expected to receive
                                Job* job = jobPool->pop();
                                if(job != nullptr)
                                {
                                    // Clearly the correct time step should be plumbed here
                                    job->execute(1.0f / 30.0f);
                                }
                            }
                        }
                    }, &jobPool));
            }
        }

        ThreadPool::~ThreadPool()
        {
        }

        // Shutdown the threads. You must have told the JobPool to terminate
        void ThreadPool::shutdown(void)
        {
            for(auto& thread : m_threads)
            {
                thread.join();
            }
        }

Findings

The new C++ 11 libraries largely made this system easy and quick to implement. The libraries are quite clearly defined, unobtrusive, and easy to use, with few dependencies.

One of the main difficulties encountered in the implementation of this system is simply the life cycles of the C++ 11 std::promises and std::futures. Their life cycles are not clearly documented and they can be difficult to reason about. It was only by reading the STL code that I found these primitives fundamentally hold a "use and discard" policy. This leads to the need for some code to create and destroy these resources each frame. Note that this may possibly vary across platforms, but this seems the best least-common-denominator assumption. This is largely the reason I avoided the use of std::packaged_task. With things like move semantics transferring the object between threads and queues, it becomes very difficult to reason about the life cycle of the object! Furthermore, once you have your own job pool and thread pool, it largely obviates the need for this class. This class only seems useful for "go and load some assets on a thread" type work.
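
The "use and discard" behaviour can be seen in a few lines. This sketch relies only on what the C++ 11 standard specifies. A future reports valid() as soon as it is attached to a promise, even before any value has been set. After get() it is no longer valid. A promise may be fulfilled only once.

```cpp
#include <future>

// A future is valid before the value arrives, and spent once get() has been called
bool futureIsSingleUse()
{
    std::promise<int> promise;
    std::future<int> future = promise.get_future();

    const bool validBeforeSet = future.valid(); // true: valid() is not "ready"
    promise.set_value(1);
    future.get();
    const bool validAfterGet = future.valid();  // false: the future has been consumed

    return validBeforeSet && !validAfterGet;
}

// A promise can only be fulfilled once; a second attempt throws
bool promiseIsSingleUse()
{
    std::promise<int> promise;
    promise.set_value(1);
    try
    {
        promise.set_value(2);
    }
    catch(const std::future_error& error)
    {
        return error.code() == std::future_errc::promise_already_satisfied;
    }
    return false;
}
```

This is why fresh promise and future pairs have to be issued for every pass over the graph.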

The system itself has thus far proven stable. I have unit tests for a variety of graph configurations, which all pass, and it successfully manages to run my current engine. This engine is in its infancy, and so has not massively load tested this system... but, ultimately, this is meant to be an illustrative, exploratory post!