A Modest Proposal: The Most Recent Update Pattern

As I’m currently working on the design of a new application, I have been reading up on design patterns lately. Using an established pattern to solve a problem that isn’t unique to your application is a great way to make your design more understandable to others and thus easier to maintain (so is writing unit tests, by the way).

For my particular problem, I was most interested in the concurrency patterns, though some of them are pretty trivial (like basic locking; do we really need to call that a pattern?). Going over that list, I found that one pattern I have used successfully in several multi-threaded applications processing large amounts of real-time data was missing. For lack of a better term, I have called it the Most Recent Update Pattern.

Pattern Description

Let me explain what I mean by the Most Recent Update Pattern and where I see a use case for it, using as an example an application that processes stock quotes.

Use Case: Multi-threaded Producer Consumer Relationship

This application has an uplink to a market data provider (e.g. Bloomberg, Reuters) that provides it with a feed of quotes for thousands of different stocks. Data from this feed is used to update objects representing the stocks we’re interested in. The logic to read from the data feed and update the stock objects (henceforth referred to as data objects) is contained in the producer thread. Actually processing the stock quotes (e.g. doing some technical analysis on them) is very complex and time-consuming and is therefore done in one or more separate threads, a.k.a. the consumer threads. Each consumer tells the producer which data objects it will be processing by subscribing to them at the producer (this is the observer pattern). Updates for the subscribed data objects are passed to the consumers via a message queue (one per consumer), where they wait until the consumer has time to process them.

Problem: Information Overload

If this were a simple FIFO queue and new updates arrived faster than they could be processed, the consumer might never get around to processing all updates and the application would eventually run out of memory. Even if the rate of incoming updates went down and the consumer could catch up, there would probably be a lot of outdated updates in that queue. Because the consumer dequeues updates from the head of the queue, it is unaware of newer updates for the same data object waiting somewhere towards the tail of the queue.

Solution: Process Only the Most Recent Update

What you really want in a situation like this is to tell the consumer which data objects have been updated, and when it has time to process one, have it do so using the most recent update for that data object. It thus skips any updates that arrived in the meantime, but those are outdated and therefore irrelevant anyway (i.e. there is no guaranteed message delivery). This way, the queue’s maximum size is bounded by the number of data objects the consumer has subscribed to. Also, when the consumer dequeues an update and begins processing it, it always works with the most recent data.
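To put rough numbers on the difference, here is a small single-threaded simulation. It is a sketch under assumed figures, not a measurement: the class name BacklogSketch, the round-robin feed and the rates are all made up for illustration. Each tick the producer emits several updates while the consumer handles at most one; the FIFO backlog is compared with a queue that holds at most one pending entry per symbol.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical illustration; none of these names appear in the sample implementation.
static class BacklogSketch
{
    // Runs `ticks` time steps. Per tick the producer emits `updatesPerTick`
    // updates, cycling round-robin through `symbolCount` symbols, and the
    // consumer removes at most one pending item from each queue.
    public static void Simulate(int ticks, int updatesPerTick, int symbolCount,
                                out int fifoBacklog, out int coalescedBacklog)
    {
        var fifo = new Queue<int>();          // one entry per update
        var pendingOrder = new Queue<int>();  // one entry per symbol with unread data
        var pending = new HashSet<int>();     // symbols currently in pendingOrder
        int next = 0;

        for (int t = 0; t < ticks; t++)
        {
            for (int i = 0; i < updatesPerTick; i++)
            {
                int symbol = next++ % symbolCount;
                fifo.Enqueue(symbol);
                // Add returns false if the symbol is already pending; in that
                // case the newer update simply replaces the older one.
                if (pending.Add(symbol))
                {
                    pendingOrder.Enqueue(symbol);
                }
            }
            if (fifo.Count > 0) fifo.Dequeue();
            if (pendingOrder.Count > 0) pending.Remove(pendingOrder.Dequeue());
        }
        fifoBacklog = fifo.Count;
        coalescedBacklog = pendingOrder.Count;
    }

    static void Main()
    {
        int fifo, coalesced;
        Simulate(1000, 3, 10, out fifo, out coalesced);
        Console.WriteLine("FIFO backlog: {0}, coalesced backlog: {1}", fifo, coalesced);
    }
}
```

With three updates in and one out per tick, the FIFO backlog grows by two entries every tick, while the coalesced backlog can never exceed the number of symbols.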

The following graphic illustrates the objects involved, using the same terminology as the sample implementation below:

[Figure: MostRecentUpdatePatternSample]

Sample Implementation

Here is a sample implementation of the Most Recent Update pattern in C#. Since C# uses garbage collection for automatic memory management, we don’t have to worry about what happens to updates that have been replaced by newer ones in the queue. I have, however, implemented this pattern in Delphi as well, where one doesn’t have that luxury and has to keep track of the objects in order to discard them on the right thread. It’s actually a pretty simple modification, one that could be used in C# as well to implement the object pool pattern and avoid creating and destroying so many update objects.
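As a rough idea of what such an object pool could look like, here is a minimal sketch. Everything in it is an assumption made for illustration: PooledUpdate, UpdatePool, Rent() and Return() are hypothetical names, the pooled update has to be mutable (unlike the Update class below), and the sketch assumes each update object belongs to exactly one subscription. Sharing one update between several subscribers, as the DataObject sample does, would additionally need some form of reference counting.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical mutable counterpart of Update; a pooled object must be
// reusable, so Value cannot be readonly here.
class PooledUpdate
{
    public double Value;
}

// Hypothetical pool; the name and API are assumptions for this sketch.
class UpdatePool
{
    private readonly ConcurrentBag<PooledUpdate> _free = new ConcurrentBag<PooledUpdate>();

    // Reuse a returned object if one is available, otherwise allocate one.
    public PooledUpdate Rent(double value)
    {
        PooledUpdate update;
        if (!_free.TryTake(out update))
        {
            update = new PooledUpdate();
        }
        update.Value = value;
        return update;
    }

    // Called by whoever ends up owning the update: the consumer after
    // processing it, or the producer after replacing an unconsumed one.
    public void Return(PooledUpdate update)
    {
        if (update != null) _free.Add(update);
    }
}

static class PoolSketch
{
    static PooledUpdate _mostRecent;   // stands in for Subscription.MostRecentUpdate

    static void Main()
    {
        var pool = new UpdatePool();

        // Producer: the exchange hands back the replaced update (or null),
        // which is exactly the object that may go back into the pool.
        pool.Return(Interlocked.Exchange(ref _mostRecent, pool.Rent(1.0)));
        pool.Return(Interlocked.Exchange(ref _mostRecent, pool.Rent(2.0)));

        // Consumer: take ownership, process, then return the object.
        PooledUpdate taken = Interlocked.Exchange(ref _mostRecent, null);
        Console.WriteLine(taken.Value);   // prints 2
        pool.Return(taken);
    }
}
```

The point of the sketch is that the return value of the exchange already tells each side which object it now owns, so no extra bookkeeping is needed to decide who returns what.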

Update

The update class is merely a container to hold some value. For simplicity I have made that value a double, but it could be anything you like. It is important to note, though, that the Value field is set only once by the producer and any subsequent access to it by the consumer(s) is read-only. Therefore, we don’t need to have any locks or other synchronization mechanisms here.

class Update
{
    public Update(double value)
    {
        Value = value;
    }

    public readonly double Value;
}

Subscription

The subscription represents the producer-consumer relationship and holds a reference to the most recent update that has been produced but not yet consumed. The logic for accessing the MostRecentUpdate field is contained in the SubscriptionQueue class and described in the section below.

internal class Subscription
{
    public Update MostRecentUpdate;
}

SubscriptionQueue

The subscription queue contains most of the logic needed to implement the Most Recent Update pattern. Each consumer has its own queue that only this one consumer reads from, although multiple producers may write to the queue simultaneously. Since I am aiming for a lock-free design wherever possible, I am using the ConcurrentQueue<T> class from version 4 of the .NET Framework.

This queue contains the subscriptions of objects that have been updated, but not the updates themselves. When there is an update for a subscription, the producer thread stores it in the subscription’s MostRecentUpdate field using Interlocked.Exchange(). This operation is atomic, so the producer and the consumer can safely access the field at the same time without using locks.

The consumer sets this reference to null when it takes ownership of the update in TryDequeue(), again using Interlocked.Exchange(). Interlocked.Exchange() stores a new value in the field and returns the previous one. If the previous value returned to the producer when enqueuing an update is null, the producer knows that the consumer has already taken the previous update (or that there never was one), so the subscription has to be queued again. A non-null value means the subscription is still queued and the pending update has just been replaced; the consumer will never see the replaced update.
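The exchange semantics described above can be tried out in isolation. The following sketch is not part of the sample implementation: ExchangeSketch, Publish() and Take() are made-up names, and a string slot stands in for the MostRecentUpdate field.

```csharp
using System;
using System.Threading;

static class ExchangeSketch
{
    // Hypothetical stand-in for the MostRecentUpdate field.
    static string _slot;

    // Producer side: store the new value and report whether the slot was
    // empty, i.e. whether the owner would have to be queued (again).
    public static bool Publish(string value)
    {
        return Interlocked.Exchange(ref _slot, value) == null;
    }

    // Consumer side: take whatever is in the slot and leave null behind.
    public static string Take()
    {
        return Interlocked.Exchange(ref _slot, null);
    }

    static void Main()
    {
        Console.WriteLine(Publish("first"));   // True: the slot was empty
        Console.WriteLine(Publish("second"));  // False: "first" was replaced
        Console.WriteLine(Take());             // second
        Console.WriteLine(Take() == null);     // True: nothing left to take
    }
}
```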

using System.Collections.Concurrent;
using System.Threading;

class SubscriptionQueue
{
    private readonly ConcurrentQueue<Subscription> _queue = new ConcurrentQueue<Subscription>();

    public void Enqueue(Subscription subscription, Update update)
    {
        if (null == Interlocked.Exchange(ref subscription.MostRecentUpdate, update))
        {
            _queue.Enqueue(subscription);
        }
        //
        // If the previous value of subscription.MostRecentUpdate was something other than
        // null, the subscription was still queued and we just replaced the update
        // with a newer one
        //
    }

    public bool TryDequeue(out Update update)
    {
        Subscription subscription;
        update = null;
        //
        // The update on a subscription may be null (if it has been voided)
        // in which case we move on to the next subscription
        //
        while (null == update)
        {
            if (!_queue.TryDequeue(out subscription))
            {
                //
                // Stop when there are no more subscriptions
                //
                return false;
            }
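            //
            // Take ownership of the update and leave null behind, so that the
            // producer knows it has to queue the subscription again
            //
            update = Interlocked.Exchange(ref subscription.MostRecentUpdate, null);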
        }
        return true;
    }
}
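To see the queue coalesce updates, the following sketch may help. It restates the classes above in condensed form so that it compiles on its own; the CoalescingDemo class, its method names and the update counts are assumptions added for illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Condensed restatement of the classes above, so the sketch is self-contained.
class Update
{
    public readonly double Value;
    public Update(double value) { Value = value; }
}

class Subscription
{
    public Update MostRecentUpdate;
}

class SubscriptionQueue
{
    private readonly ConcurrentQueue<Subscription> _queue = new ConcurrentQueue<Subscription>();

    public void Enqueue(Subscription subscription, Update update)
    {
        if (Interlocked.Exchange(ref subscription.MostRecentUpdate, update) == null)
        {
            _queue.Enqueue(subscription);
        }
    }

    public bool TryDequeue(out Update update)
    {
        update = null;
        Subscription subscription;
        while (update == null)
        {
            if (!_queue.TryDequeue(out subscription)) return false;
            update = Interlocked.Exchange(ref subscription.MostRecentUpdate, null);
        }
        return true;
    }
}

static class CoalescingDemo
{
    // Single-threaded: three updates are posted before the consumer runs,
    // so only the last one is delivered. Returns the delivered value.
    public static double LatestOfThree()
    {
        var queue = new SubscriptionQueue();
        var subscription = new Subscription();
        for (int i = 1; i <= 3; i++) queue.Enqueue(subscription, new Update(i));
        Update update;
        queue.TryDequeue(out update);
        return update.Value;
    }

    // Multi-threaded: a producer thread posts 1..count for one subscription
    // while the calling thread consumes until it has seen the final value.
    // Returns the last value seen; `processed` counts the handled updates.
    public static double Run(int count, out int processed)
    {
        var queue = new SubscriptionQueue();
        var subscription = new Subscription();
        var producer = new Thread(() =>
        {
            for (int i = 1; i <= count; i++) queue.Enqueue(subscription, new Update(i));
        });
        producer.Start();

        double last = 0;
        processed = 0;
        while (last < count)
        {
            Update update;
            if (queue.TryDequeue(out update))
            {
                last = update.Value;
                processed++;
            }
            else
            {
                Thread.Yield();   // nothing pending, let the producer run
            }
        }
        producer.Join();
        return last;
    }

    static void Main()
    {
        Console.WriteLine(LatestOfThree());   // prints 3
        int processed;
        double last = Run(100000, out processed);
        Console.WriteLine("last value: {0}, processed: {1} of 100000", last, processed);
    }
}
```

How many updates the consumer handles in the threaded run depends on scheduling and will vary from run to run; what does not vary is that the final update is always delivered and that no update is seen after a newer one.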

DataObject

The DataObject class is pretty boring and not really part of the pattern’s core logic. It is provided here only as an example of how the SubscriptionQueue might be used.

using System;
using System.Collections.Generic;

class DataObject
{
    private Dictionary<SubscriptionQueue, Subscription> _subscriptions =
        new Dictionary<SubscriptionQueue, Subscription>();

    private double _value;
    public double Value
    {
        get
        {
            return _value;
        }
        set
        {
            _value = value;
            //
            // Post the same update object (because its data is read-only) with the new
            // value to each subscriber (TODO: use object pool for update objects)
            //
            PostUpdateToSubscribers(new Update(value));
        }
    }

    public void VoidValue()
    {
        PostUpdateToSubscribers(null);
    }

    private void PostUpdateToSubscribers(Update update)
    {
        foreach (KeyValuePair<SubscriptionQueue, Subscription> kvp in _subscriptions)
        {
            //
            // Enqueue the subscription (kvp.Value) with the update in the queue (kvp.Key)
            //
            kvp.Key.Enqueue(kvp.Value, update);
        }
    }

    /// <remarks>
    /// Call on producer thread only
    /// </remarks>
    public void Subscribe(SubscriptionQueue subscriber)
    {
        if (_subscriptions.ContainsKey(subscriber))
        {
            throw new ArgumentException("Cannot add same subscriber twice", "subscriber");
        }
        _subscriptions.Add(subscriber, new Subscription());
    }

    /// <remarks>
    /// Call on producer thread only
    /// </remarks>
    public void Unsubscribe(SubscriptionQueue subscriber)
    {
        _subscriptions.Remove(subscriber);
    }
}

Conclusion

This article introduced the Most Recent Update Pattern, a pattern for managing the processing of large numbers of updates in a multi-threaded application where guaranteed message delivery is not a requirement, but working only with the most up-to-date data is. The sample implementation has shown that this pattern can be implemented completely lock-free, making it well suited for high-throughput applications working with real-time data.
