During a recent implementation I had to resolve some information from a database in response to a network event. These network events are delivered in a serialised order via a custom protocol. In order to decouple the processing of event messages from the query of the database, which is an order of magnitude slower than the event rate, it was necessary to dispatch the event processing onto a separate thread. This would allow the events to be processed in parallel. A secondary requirement was to limit the number of parallel threads to some upper bound.
To execute event processing on a separate thread there are a number of options:
- Create a Thread explicitly and manage its lifecycle
- Use the ThreadPool to dispatch the request processing
- Use a BackgroundWorker to dispatch request processing
- Create a Task and manage its lifecycle
Options 1 and 2 share a disadvantage: the amount of plumbing code required to manage and synchronise thread execution and lifecycle, for example handling exceptions and returning data.
Option 3 is not really viable and is on the list only for completeness. I would not use, or recommend, the BackgroundWorker for dispatching requests in a backend component, as it is primarily a UI component.
The .NET Framework 4.0 introduced the System.Threading.Tasks namespace, which contains a number of classes and types that simplify the work of writing concurrent and asynchronous code. The Task class is the type from System.Threading.Tasks that the bounded dispatcher is based on.
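To illustrate why a Task removes much of that plumbing, here is a minimal sketch in which a task hands back both a return value and any exception without extra synchronisation code. The names TaskPlumbingSketch, LookupCustomerId and Describe are hypothetical and only stand in for the slow database query; they are not part of the dispatcher.

```csharp
using System;
using System.Threading.Tasks;

public static class TaskPlumbingSketch
{
    // Hypothetical stand-in for the slow database query.
    public static int LookupCustomerId(string eventName)
    {
        if (string.IsNullOrEmpty(eventName))
            throw new ArgumentException("Event name is required", "eventName");
        return eventName.Length;
    }

    // Runs the lookup on a thread-pool thread and reports the outcome as text.
    public static string Describe(string eventName)
    {
        Task<int> task = Task.Factory.StartNew(() => LookupCustomerId(eventName));
        try
        {
            // Result blocks until the task finishes and rethrows any failure
            // wrapped in an AggregateException.
            return "ok:" + task.Result;
        }
        catch (AggregateException ex)
        {
            return "failed:" + ex.InnerException.GetType().Name;
        }
    }
}
```

With an explicit Thread or the ThreadPool, both the return value and the exception would have to be marshalled back to the caller by hand.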
The task dispatcher code walkthrough
The TaskDispatcher type encapsulates the management of the dispatched tasks. Looking at the constructor, we can see it takes an argument that specifies the maximum number of tasks that can be dispatched in parallel.
/// <summary>
/// The <see cref="TaskDispatcher"/> constructor
/// </summary>
/// <param name="maxTasks">The maximum number of tasks that can be dispatched before the call to <seealso cref="Dispatch"/> blocks</param>
public TaskDispatcher(Int32 maxTasks)
{
    _executingTasks = new List<Task>();
    _tasks = new Semaphore(maxTasks, maxTasks);
    _tokenSource = new CancellationTokenSource();
}
The _executingTasks field is a list that references the executing tasks. The _tasks Semaphore bounds the number of Tasks that the TaskDispatcher can have dispatched at any one time. The _tokenSource is a CancellationTokenSource that is used to cancel executing tasks.
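The bounding behaviour of the semaphore can be seen in isolation. The following is a minimal sketch, not part of the dispatcher; SemaphoreBoundSketch and CountGranted are hypothetical names, and the 50 ms timeout is an arbitrary choice for illustration.

```csharp
using System;
using System.Threading;

public static class SemaphoreBoundSketch
{
    // Tries to take 'attempts' slots from a semaphore that has 'maxTasks' slots
    // and returns how many of those attempts succeeded.
    public static int CountGranted(int maxTasks, int attempts)
    {
        int granted = 0;
        using (var slots = new Semaphore(maxTasks, maxTasks))
        {
            for (int i = 0; i < attempts; i++)
            {
                // WaitOne returns false when no slot frees up within the timeout.
                if (slots.WaitOne(TimeSpan.FromMilliseconds(50)))
                    granted++;
            }
            // Hand back every slot that was taken (Release rejects a count of zero).
            if (granted > 0)
                slots.Release(granted);
        }
        return granted;
    }
}
```

Because nothing releases a slot while the loop runs, only the first maxTasks attempts succeed; the rest time out, which is the condition the dispatcher reports as a failure to dispatch.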
Dispatching Tasks
/// <summary>
/// Dispatches an <see cref="Action"/> for execution.
/// </summary>
/// <param name="action">The <see cref="Action"/> to dispatch</param>
/// <param name="completionCallback">The callback action to invoke when the task completes</param>
/// <param name="dispatchTimeout">The <see cref="TimeSpan"/> to wait for a task to be dispatched</param>
/// <exception cref="TimeoutException">An exception thrown when the <paramref name="action"/> cannot be dispatched within the <paramref name="dispatchTimeout"/> </exception>
public void Dispatch(Action<CancellationToken> action, Action<Task,Exception> completionCallback, TimeSpan dispatchTimeout)
{
    // Wait for a free slot; the semaphore bounds the number of running tasks.
    if (_tasks.WaitOne(dispatchTimeout))
    {
        // Completion also takes this lock, so the task is always added
        // to the list before it can be removed.
        lock (_executingTasks)
        {
            var task = new Task(() => action(_tokenSource.Token));
            task.ContinueWith(t => Completion(t, completionCallback));
            task.Start();
            _executingTasks.Add(task);
        }
    }
    else
    {
        throw new TimeoutException(String.Format("Unable to dispatch action within timeout {0}", dispatchTimeout));
    }
}
To dispatch a task, the Dispatch method is called with three arguments: action, an Action<CancellationToken> delegate to execute; completionCallback, an Action<Task, Exception> delegate that is invoked when the dispatched action completes; and dispatchTimeout, the maximum time to wait for a free slot.
The thread invoking the Dispatch method attempts to enter the _tasks semaphore by calling the WaitOne method. If the semaphore is not signalled within the timeout period, a TimeoutException is thrown to indicate that a Task cannot be dispatched.
If the thread can enter the _tasks semaphore, a Task is created that invokes the action with the dispatcher's CancellationToken, which the action uses to detect cancellation. The Task is started and added to the list of executing tasks, and the calling thread returns.
If multiple threads compete to enter the _tasks semaphore, there is no guaranteed order in which they will be signalled, because the semaphore makes no fairness guarantee. It is also possible for two threads to enter the semaphore, thread A followed by thread B, and, due to pre-emptive scheduling, for thread B to acquire the lock on _executingTasks before thread A.
On Completion
/// <summary>
/// The completion callback that is called when a dispatched task has completed
/// </summary>
/// <param name="completedTask">The <see cref="Task"/> that completed</param>
/// <param name="completionCallback">The callback that is invoked when the dispatched <see cref="Task"/> executes to completion</param>
private void Completion(Task completedTask, Action<Task,Exception> completionCallback)
{
    _tasks.Release();
    lock (_executingTasks)
    {
        _executingTasks.Remove(completedTask);
    }
    completionCallback(completedTask, completedTask.Exception);
}
When a Task completes, the private method Completion is called. This method releases the _tasks semaphore and removes the Task from the _executingTasks list. It then invokes the completionCallback delegate that was passed to Dispatch along with the action. If there is an exception associated with the completed task, it is passed to the completion callback; otherwise the callback receives null.
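The exception exposed by Task.Exception is an AggregateException that wraps the original failure, so a completion callback usually needs to unwrap it. The following is a minimal sketch of one way to do that; CompletionCallbackSketch, FailureMessages and RunAndReport are hypothetical names, and RunAndReport only mimics the dispatcher's continuation so the example can run on its own.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class CompletionCallbackSketch
{
    // Has the same shape as the completionCallback delegate. Returns the messages
    // of the underlying failures, or an empty list when 'error' is null.
    public static List<string> FailureMessages(Task completedTask, Exception error)
    {
        var messages = new List<string>();
        var aggregate = error as AggregateException;
        if (aggregate != null)
        {
            // Flatten unwraps nested AggregateExceptions into a single level.
            foreach (Exception inner in aggregate.Flatten().InnerExceptions)
                messages.Add(inner.Message);
        }
        else if (error != null)
        {
            messages.Add(error.Message);
        }
        return messages;
    }

    // Runs an action as a task and passes the outcome to FailureMessages
    // from a continuation, much as the dispatcher does with its callback.
    public static List<string> RunAndReport(Action action)
    {
        var task = new Task(action);
        Task<List<string>> report = task.ContinueWith(t => FailureMessages(t, t.Exception));
        task.Start();
        return report.Result;
    }
}
```

Reading Task.Exception in the continuation also marks the exception as observed, so it is not reported later as an unobserved task exception.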
Cancelling Tasks
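/// <summary>
/// Cancel all executing <see cref="Task"/>
/// </summary>
public void CancelAll()
{
    _tokenSource.Cancel();
    Task[] executing;
    // Take a snapshot under the lock; Completion may be removing tasks concurrently.
    lock (_executingTasks)
    {
        executing = _executingTasks.ToArray();
    }
    Task.WaitAll(executing);
}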
To cancel executing tasks, the CancelAll method of the task dispatcher is invoked. This method cancels the _tokenSource, which sets the IsCancellationRequested property of its token to true, and then waits for the executing tasks to finish. Dispatched actions should check this property, for example:
static void CancellableTask(CancellationToken cancellationToken)
{
    for (int i = 0; i < 100000; i++)
    {
        // Simulating work.
        Thread.SpinWait(5000000);
        if (cancellationToken.IsCancellationRequested)
        {
            // Perform cleanup if necessary.
            //...
            // Terminate the operation.
            break;
        }
    }
}
Note: An errant task method that does not honour the CancellationToken will block the call to the CancelAll method. The behaviour of CancelAll could be modified so that Task.WaitAll waits for a bounded period and then terminates the tasks that did not finish within the timeout. This would be drastic and should be avoided; if possible, modify the dispatched methods so that they honour the CancellationToken.
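The bounded wait mentioned in the note can be sketched with the Task.WaitAll overload that takes a timeout. This is a minimal illustration only; BoundedCancelSketch and CancelAndWait are hypothetical names, and deciding what to do with tasks that are still running is left to the caller.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class BoundedCancelSketch
{
    // Requests cancellation, then waits at most 'timeout' for the tasks.
    // Returns true when every task finished in time, false otherwise.
    public static bool CancelAndWait(CancellationTokenSource source, Task[] tasks, TimeSpan timeout)
    {
        source.Cancel();
        try
        {
            return Task.WaitAll(tasks, timeout);
        }
        catch (AggregateException)
        {
            // All tasks finished, but at least one faulted or was cancelled.
            return true;
        }
    }
}
```

A false return value identifies the situation described above: at least one dispatched method ignored the token and is still running.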
Further enhancements
The code presented here covers the basic functionality of a bounded task dispatcher. Some further improvements could be made, for example:
- Make the entire TaskDispatcher class generic so that the action parameter can be specified as a Func delegate, allowing a return value to be passed back from the dispatched method.
- Add methods to allow individual tasks to be cancelled.
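Both enhancements could be approached along the following lines. This is only a sketch under assumptions, not code from the dispatcher: DispatcherEnhancementSketch and Start are hypothetical names, the semaphore bounding is omitted for brevity, and per-task cancellation is modelled with a CancellationTokenSource linked to the dispatcher-wide token.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class DispatcherEnhancementSketch
{
    // Starts a function that returns a value; the caller reads it from the
    // returned Task<TResult>. 'perTaskSource' is linked to 'dispatcherToken',
    // so cancelling either one cancels the token that the function observes.
    public static Task<TResult> Start<TResult>(
        Func<CancellationToken, TResult> function,
        CancellationToken dispatcherToken,
        out CancellationTokenSource perTaskSource)
    {
        perTaskSource = CancellationTokenSource.CreateLinkedTokenSource(dispatcherToken);
        // Copy the token to a local: an out parameter cannot be captured by a lambda.
        CancellationToken token = perTaskSource.Token;
        return Task.Factory.StartNew(() => function(token));
    }
}
```

Cancelling the per-task source affects only that task, while cancelling the dispatcher-wide source still reaches every linked token.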
The code is available here:
GitHub DumpingCoreMemory.Threading