Wednesday, 31 July 2013

C++/CLI Asynchronous events using EventHandler<T>

While developing some code to expose a native C++ library as a managed component for use from C#, I found that I needed to raise an event asynchronously from the C++/CLI class. I prefer to use the EventHandler and EventHandler<TEventArgs> delegates for exposing events rather than defining the delegate and the event separately.

So naively, as it's been a while since I wrote C++ and this was my first time writing C++/CLI, I proceeded to simply define an event as shown in the following example snippet:
ref class EventSource
{
 public:
  event EventHandler^ SyncEvent;
};

And invoke it as in the following example snippet:

void EventSource::RaiseSyncEvent(void)
{
 SyncEvent->BeginInvoke(this, EventArgs::Empty, gcnew AsyncCallback(this, &EventSource::Callback), nullptr);
}
To which I received the following compiler error:
Error    1    error C3918: usage requires 'EventSource::SyncEvent' to be a data member

This error occurs because the backing field for the event delegate cannot be accessed directly. If we decompile the code with ILSpy we can see that the compiler generates the following C# code:

[Image: ILSpy decompilation of the SyncEvent declaration]

This is because the event syntax used to define SyncEvent is what is called a trivial event. A trivial event is similar to a trivial property, whereby the compiler provides the backing store field and the accessor methods.

The C++/CLI compiler also provides a raise_xxxxxxx method as shown below, which is kind of neat, as the C# compiler requires that you write the equivalent code yourself to ensure thread safety when accessing the delegate.

[Image: ILSpy decompilation of the compiler-generated raise method]
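
For comparison, the hand-written C# equivalent usually looks something like the sketch below. The class name ManagedEventSource is hypothetical and used only for illustration; this is the common copy-to-local pattern, not the code shown in the decompiled output.

```csharp
using System;

// Hypothetical C# counterpart of the C++/CLI EventSource class.
public class ManagedEventSource
{
    public event EventHandler SyncEvent;

    // Copy the delegate to a local first, so that a concurrent
    // unsubscribe cannot set it to null between the check and the call.
    public void RaiseSyncEvent()
    {
        EventHandler handler = SyncEvent;
        if (handler != null)
        {
            handler(this, EventArgs.Empty);
        }
    }
}
```

Calling RaiseSyncEvent with no subscribers is a no-op rather than a NullReferenceException.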

To get access to the underlying event delegate the nontrivial event syntax must be used. This requires the definition of the accessor methods. The add and remove methods are mandatory and the raise method is optional.

Here is where I discovered another small problem. There are no examples of how to use EventHandler and EventHandler<TEventArgs> delegates on the Microsoft web site. All of the examples use the explicit delegate and event definition syntax.

To define a nontrivial event, first define the private backing field.
ref class EventSource
{
 private:
  EventHandler^ m_asyncEvent;

Then define the explicit add and remove methods.
event EventHandler^ AsyncEvent
{
 void add(EventHandler^ eventHandler) 
 {
  m_asyncEvent += eventHandler;
 }

 void remove(EventHandler^ eventHandler) 
 {
  m_asyncEvent -= eventHandler;
 }
}

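You can now invoke the event asynchronously via BeginInvoke. Note that you must check for null, as the compiler no longer generates the nice raise method. Note also that BeginInvoke only works when the delegate has a single target; with more than one subscriber it throws an ArgumentException, so in that case call BeginInvoke on each delegate returned by GetInvocationList instead.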
void EventSource::RaiseAsyncEvent(void)
{
 if(m_asyncEvent != nullptr)
 {
  m_asyncEvent->BeginInvoke(this, EventArgs::Empty, gcnew AsyncCallback(this, &EventSource::Callback), nullptr);
 }
}

Note that a call-back must be provided to clean up the IAsyncResult instance by calling the EndInvoke method; see here for further reference.
void EventSource::Callback(IAsyncResult^ ar)
{
 Console::WriteLine("EventSource::Callback invoked on Thread Id: {0}", Thread::CurrentThread->ManagedThreadId);
 AsyncResult^ result = (AsyncResult^) ar;
 EventHandler^ caller = (EventHandler^) result->AsyncDelegate;
 caller->EndInvoke(ar);
}


The code is available here: GitHub AsyncEvent

Wednesday, 24 July 2013

A Bounded Task Dispatcher

During a recent implementation I had to resolve some information from a database in response to a network event. These network events are delivered in a serialised order via a custom protocol. In order to decouple the processing of event messages from the query of the database, which is an order of magnitude slower than the event rate, it was necessary to dispatch the event processing onto a separate thread. This would allow the events to be processed in parallel. A secondary requirement was to limit the number of parallel threads to some upper bound.

To execute event processing on a separate thread there are a number of options:
  1. Create a Thread explicitly and manage its lifecycle
  2. Use the ThreadPool to dispatch the request processing
  3. Use a BackgroundWorker to dispatch request processing
  4. Create a Task and manage its lifecycle
There are a number of disadvantages to options 1 and 2, which basically boil down to the amount of plumbing code required to manage and synchronise thread execution and lifecycle, for example handling exceptions and returning data.

Option 3 is not really a viable option; it is on the list just for completeness. I would not use, or recommend, the BackgroundWorker for dispatching requests in a backend component, as it is primarily a UI component.

The .NET Framework 4.0 introduced the System.Threading.Tasks namespace, which contains a number of classes and types that simplify the work of writing concurrent and asynchronous code. The Task class is the type from System.Threading.Tasks that the bounded dispatcher is based on.

The Task dispatcher code walk through

The TaskDispatcher type encapsulates the management of the dispatched tasks. Looking at the constructor we can see it takes an argument that specifies the maximum number of tasks that can be dispatched in parallel.
/// <summary>
/// The <see cref="TaskDispatcher"/> constructor
/// </summary>
/// <param name="maxTasks">The maximum number of tasks that can be dispatched before the call to <seealso cref="Dispatch"/> blocks</param>
public TaskDispatcher(Int32 maxTasks)
{
 _executingTasks = new List<Task>();
 _tasks = new Semaphore(maxTasks, maxTasks);
 _tokenSource = new CancellationTokenSource();
}

The _executingTasks field is a list that references the executing tasks. The _tasks Semaphore is used to bound the number of Tasks that can be dispatched by the TaskDispatcher at any one time. The _tokenSource is a CancellationTokenSource that is used to cancel executing tasks.

Dispatching Tasks

/// <summary>
/// Dispatches an <see cref="Action"/> for execution. 
/// </summary>
/// <param name="action">The <see cref="Action"/> to dispatch</param>
/// <param name="completionCallback">The callback action to invoke when the task completes</param>
/// <param name="dispatchTimeout">The <see cref="TimeSpan"/> to wait for a task to be dispatched</param>
/// <exception cref="TimeoutException">An exception thrown when the <paramref name="action"/> cannot be dispatched within the <paramref name="dispatchTimeout"/> </exception>
public void Dispatch(Action<CancellationToken> action, Action<Task,Exception> completionCallback, TimeSpan dispatchTimeout)
{
 if (_tasks.WaitOne(dispatchTimeout))
 {
  lock (_executingTasks)
  {
   var task = new Task(() => action(_tokenSource.Token));
   task.ContinueWith(t => Completion(t, completionCallback));
   task.Start();

   _executingTasks.Add(task);
  }
 }
 else
 {
  throw new TimeoutException(String.Format("Unable to dispatch action within timeout {0}", dispatchTimeout));
 }
}
To dispatch a task the Dispatch method is called, passing an action parameter of type Action<CancellationToken> and a completionCallback parameter of type Action<Task, Exception>, which is invoked when the dispatched action delegate completes.

The thread invoking the Dispatch method attempts to enter the _tasks semaphore by calling the WaitOne method. If the semaphore is not signalled within the timeout period then an exception is thrown to indicate that a Task cannot be dispatched.
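
The bounding behaviour of the semaphore can be seen in isolation with a small sketch. The class and method names here are hypothetical and not part of the dispatcher; it only assumes a Semaphore with two slots and a short timeout.

```csharp
using System;
using System.Threading;

// Hypothetical demo: a Semaphore(2, 2) admits two entries and then
// refuses a third until a slot is released or the timeout expires.
public static class SemaphoreBoundDemo
{
    // Returns true when the third WaitOne times out.
    public static bool ThirdEntryTimesOut()
    {
        using (var slots = new Semaphore(2, 2))
        {
            slots.WaitOne();   // first slot taken
            slots.WaitOne();   // second slot taken

            // No slot is free, so this returns false after 50 ms.
            bool entered = slots.WaitOne(TimeSpan.FromMilliseconds(50));

            slots.Release(2);  // hand back the two slots that were taken
            return !entered;
        }
    }
}
```

In the dispatcher the same false return value from WaitOne is what triggers the TimeoutException.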

If the thread can enter the _tasks semaphore, a Task is created that invokes the action with the CancellationToken from _tokenSource; the token is used to control the cancellation of the executing task. A continuation that calls Completion is attached, the Task is started and added to the list of executing tasks, and the calling thread returns.

If there are multiple competing threads trying to enter the _tasks semaphore there is no guaranteed order as to which competing thread will be signalled. This is due to the behaviour of the semaphore. It is also possible for two threads to enter the semaphore, thread A followed by thread B, and, due to pre-emptive scheduling, for thread B to acquire the lock on _executingTasks before thread A.

On Completion


/// <summary>
/// The completion callback that is called when a dispatched task has completed
/// </summary>
/// <param name="completedTask">The <see cref="Task"/> that completed</param>
/// <param name="completionCallback">The callback that is invoked when the dispatched <see cref="Task"/> executes to completion</param>
private void Completion(Task completedTask, Action<Task,Exception> completionCallback)
{
 _tasks.Release();

 lock (_executingTasks)
 {
  _executingTasks.Remove(completedTask);
 }

 completionCallback(completedTask, completedTask.Exception);
}
On completion of a Task the private method Completion is called. This method releases the _tasks semaphore and removes the Task from the _executingTasks list. It then invokes the completionCallback delegate that was passed to Dispatch along with the action. If the completed task faulted, its exception (an AggregateException) is passed to the completion call-back; otherwise the exception argument is null.

Cancelling Tasks

/// <summary>
/// Cancel all executing <see cref="Task"/>
/// </summary>
public void CancelAll()
{
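 _tokenSource.Cancel();

 Task[] executing;
 lock (_executingTasks)
 {
  // Take a snapshot under the lock, as Completion may be removing tasks concurrently.
  executing = _executingTasks.ToArray();
 }

 Task.WaitAll(executing);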
}
To cancel executing tasks, the CancelAll method of the task dispatcher is invoked. This method calls Cancel on the _tokenSource, which sets the IsCancellationRequested property of its token to true, and then waits for the executing tasks to finish. Dispatched actions should check this property, for example:
static void CancellableTask(CancellationToken cancellationToken)
{
 for (int i = 0; i < 100000; i++)
 {
  // Simulating work.
  Thread.SpinWait(5000000);

  if (cancellationToken.IsCancellationRequested)
  {
   // Perform cleanup if necessary.
   //...
   // Terminate the operation.
   break;
  }
 }
}

Note: An errant task method that does not honour the CancellationToken will block the call to the CancelAll method. The behaviour of CancelAll could be modified so that Task.WaitAll waits for a bounded period and then terminates any tasks that did not finish within the timeout. This would be drastic and should be avoided; if possible, modify the dispatched methods to honour the CancellationToken.
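
A bounded wait of the kind described in the note might look like the following sketch. The BoundedWait class and CancelAndWait method are hypothetical names, not part of the published dispatcher. Since the Task class offers no way to forcibly stop a running task, the sketch only reports which tasks are still running and leaves the decision to the caller.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: cancel, wait for a bounded period, and report
// the tasks that did not honour the cancellation request in time.
public static class BoundedWait
{
    public static Task[] CancelAndWait(CancellationTokenSource source, Task[] tasks, TimeSpan timeout)
    {
        source.Cancel();

        try
        {
            // Returns false on timeout instead of blocking forever.
            Task.WaitAll(tasks, timeout);
        }
        catch (AggregateException)
        {
            // Faulted or cancelled tasks surface here; they have still completed.
        }

        // Whatever has not completed by now ignored the token.
        return Array.FindAll(tasks, t => !t.IsCompleted);
    }
}
```

An empty result means every task finished within the timeout.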

Further enhancements


The code presented here is the basic functionality of a bounded task dispatcher; some further improvements could be made. For example:

  • Make the entire TaskDispatcher class generic so that the action parameter type can be specified, for example as a Func delegate, so that a return value can be passed back from the dispatched method.

  • Add methods to allow individual tasks to be cancelled.
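
One possible starting point for cancelling individual tasks is a linked token source per dispatched task. This is only a sketch under that assumption; the TaskCancellation class is a hypothetical name and is not in the published code.

```csharp
using System;
using System.Threading;

// Hypothetical per-task cancellation handle. Its token is cancelled
// either by calling Cancel on this handle or by the dispatcher-wide token.
public sealed class TaskCancellation : IDisposable
{
    private readonly CancellationTokenSource _linked;

    public TaskCancellation(CancellationToken dispatcherToken)
    {
        _linked = CancellationTokenSource.CreateLinkedTokenSource(dispatcherToken);
    }

    // The token to pass to the dispatched action.
    public CancellationToken Token
    {
        get { return _linked.Token; }
    }

    // Cancels only this task; other tasks and the dispatcher token are unaffected.
    public void Cancel()
    {
        _linked.Cancel();
    }

    public void Dispose()
    {
        _linked.Dispose();
    }
}
```

The dispatcher would hand Token to the action in place of the shared token and return the handle to the caller, so CancelAll would still reach every task.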
The code is available here: GitHub DumpingCoreMemory.Threading