Sunday, November 28, 2010

New Series Announcement: NServiceBus 3.0

My apologies for the long hiatus; I was trying to figure out where to go next after the "Getting Started" series. I think I've pretty much covered the basics of NServiceBus. After posting on SO and not getting an answer about what the new features in NSB 3.0 will be, I decided to take the time to dissect it myself. I guess I had forgotten that this is a very DIY framework, so I'll do my best to "discover" the new features. My next post will be the first in a series about NSB 3. See you soon!

Sunday, November 7, 2010

Getting Started with NServiceBus Part 9: Horizontal Scaling with the Distributor

A single-threaded NServiceBus instance can handle quite a few messages. We're talking around 30 messages/second, or roughly 2.6 million messages a day. If you need more throughput, you can ramp up the number of threads NSB uses. If you would rather keep it single-threaded but still scale out, you can do so with the built-in Distributor process. Check out the first slide at the end of the post for the general message flow.
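As a quick sanity check on the daily figure, here is the arithmetic in a few lines of Python. The `messages_per_day` helper and its `workers` parameter are made up for illustration, and the multi-Worker number assumes throughput scales linearly with the number of Workers, which is an idealization rather than a measured result.

```python
SECONDS_PER_DAY = 24 * 60 * 60  # 86,400 seconds


def messages_per_day(rate_per_second, workers=1):
    """Daily volume if every Worker sustains the same rate all day (idealized)."""
    return rate_per_second * SECONDS_PER_DAY * workers


single = messages_per_day(30)          # one single-threaded instance
two_workers = messages_per_day(30, 2)  # assumes perfectly linear scale-out

print(single)       # 2592000
print(two_workers)  # 5184000
```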

Fundamentally, a Distributor setup consists of at least 3 processes. The first is the Distributor process itself, which holds onto the entire set of work that the Workers will operate upon. The Distributor holds the work so that if one of the Workers goes down, it can shift that work over to the other Worker(s). In this example the other 2 processes are Workers that do the actual work and report their status back to the Distributor. At its core, the Distributor receives work from some other process and then dishes it out based on the current status of the Workers.

The workflow is controlled by a few queues internal to the Distributor itself. The Distributor has a Control, a Storage, and a Data (work) queue. The Control queue is where the Workers put their Ready messages. The Storage queue keeps track of the addresses of the Workers and is used to determine which Worker is available next for work. The Data queue holds the set of work. All of these settings are kept in the app.config of the Distributor process.

Once the Distributor is configured we must tell each Worker how to communicate with the Distributor.

Once we have everything configured the workflow is a multi-step process that goes like this:

  1. A Worker starts up and places a Ready message on the Distributor's Control queue. The Distributor reads this message and pushes an entry onto the Storage queue to indicate that a Worker is available.
  2. Step 1 is repeated for each Worker that starts up.
  3. A Client issuing the work sends a message to the Data queue.
  4. The Distributor reads the message from the Data queue and pops the next available Worker from the Storage queue.
  5. The message is then forwarded to that Worker, and the Worker handles the message.
  6. Once the Worker is done, it sends a Ready message back to the Control queue to signify that it is ready for more work.
  7. If the first Worker is still working as more work comes in, the next available Worker gets the next message. If all Workers are busy, the Distributor queues up the messages in the Data queue.

Check out the second slide in the deck at the end of the post for a visual of the flow.
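
To make the numbered flow concrete, here is a small in-memory model of it in Python. This is only a sketch of the idea, not NServiceBus code: the `Distributor` and `Worker` classes, the `pump` method, and the addresses and message names are all made up for illustration, and plain `deque` objects stand in for the real MSMQ queues.

```python
from collections import deque


class Worker:
    """Hypothetical Worker: just records what it was asked to handle."""

    def __init__(self, address):
        self.address = address
        self.handled = []

    def handle(self, message):
        self.handled.append(message)


class Distributor:
    """Toy model of the Control, Storage, and Data queues."""

    def __init__(self):
        self.control = deque()  # Ready messages sent by Workers
        self.storage = deque()  # addresses of Workers that are free
        self.data = deque()     # work waiting to be handed out
        self.workers = {}       # address -> Worker (stands in for the transport)

    def register(self, worker):
        self.workers[worker.address] = worker
        self.ready(worker.address)           # steps 1 and 2

    def ready(self, address):
        self.control.append(address)         # steps 1 and 6

    def send(self, message):
        self.data.append(message)            # step 3

    def pump(self):
        # Turn Ready messages into entries on the Storage queue.
        while self.control:
            self.storage.append(self.control.popleft())
        # Steps 4 and 5: pair queued work with free Workers.
        dispatched = []
        while self.data and self.storage:
            address = self.storage.popleft()
            message = self.data.popleft()
            self.workers[address].handle(message)
            dispatched.append((address, message))
        return dispatched                    # leftover work stays in Data (step 7)


distributor = Distributor()
worker1, worker2 = Worker("worker1"), Worker("worker2")
distributor.register(worker1)
distributor.register(worker2)

for n in range(3):
    distributor.send(f"msg-{n}")

first = distributor.pump()    # both Workers get one message, msg-2 waits
distributor.ready("worker2")  # worker2 finishes and reports Ready
second = distributor.pump()   # msg-2 goes to worker2

print(first)   # [('worker1', 'msg-0'), ('worker2', 'msg-1')]
print(second)  # [('worker2', 'msg-2')]
```

The point of the model is that the Distributor never picks a Worker arbitrarily: a message only leaves the Data queue when some Worker has announced that it is free.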

The Worker message handlers are written just like any other NSB handler. The only missing component is something to send the Distributor some work. When you fire up all the processes you can send messages to the Distributor and watch the work get load balanced. Since the Distributor holds all the work, you may want to make it highly available by putting it into a cluster.

I have a full working sample on github: https://github.com/afyles/Blog/tree/master/NServiceBusExplorer/. All you need to do is fire up the Distributor and the 2 Workers in the Workers folder under the Source directory.

Thursday, November 4, 2010

PowerShell: Reading Messages from MSMQ

I'm throwing this out there since I couldn't find anything good online. We have admins that need to quickly look at messages in queues and the built-in tooling just isn't pretty to look at. Slap this PS code into a script file and you should be ready to rock!

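# Pass the queue path as the argument, e.g. .\private$\myqueue (hypothetical queue name)
param([string]$qname)

if ( [System.String]::IsNullOrEmpty($qname) )
{
    write-host "You must provide a queue name"
    exit
}

# Load the MSMQ client assembly (used here instead of the obsolete LoadWithPartialName)
Add-Type -AssemblyName System.Messaging

$q = new-object System.Messaging.MessageQueue($qname)

# GetAllMessages returns a snapshot; the messages stay in the queue
$msgs = $q.GetAllMessages()

foreach ( $msg in $msgs )
{
    # Rewind the body stream and write its contents as text
    $msg.BodyStream.Position = 0
    $sr = new-object System.IO.StreamReader( $msg.BodyStream )
    $sr.ReadToEnd()

    # $input is an automatic variable in PowerShell, so use a different name
    $answer = read-host -prompt "Enter n for the next message, q to quit"

    if ( $answer -eq "q" )
    {
        exit
    }
}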

Monday, November 1, 2010

Getting Started with NServiceBus Part 8: Message Versioning

So now you've gone live with your first system based on NSB. Time goes by and you have a new requirement to add some data to a published event. How will we provide backwards compatibility? How will we upgrade? What if legacy subscribers don't care about the new information? Luckily for us, NSB handles this, provided we modeled our messages as interfaces, which gives us an inheritance structure we can use to our advantage. This article will show how easily this can be done.

Let's begin with our current message:

    public interface IProductUpdatedEvent : IProductChangedEvent
    {
    }

    public interface IProductChangedEvent : IEvent
    {
        Int32 ProductNumber { get; set; }
        String Name { get; set; }
        String Description { get; set; }
    }

Now let's say we want to add a new field to our event. We create the new event in a different namespace so we can reuse the same interface name, then inherit from the old interface and add our new field:

    // Declared in the Messages.New namespace
    public interface IProductUpdatedEvent : Messages.IProductUpdatedEvent
    {
        Int32 DepartmentNumber { get; set; }
    }

So now we have two distinct messages that carry different data sets. This means our legacy clients can continue to receive the legacy message and our new clients can receive the new message. Here is an example of a handler pointing to the new message type:

    public class EventMessageHandler : IMessageHandler<Messages.New.IProductUpdatedEvent>
    {
        private static readonly ILog log = LogManager.GetLogger(typeof(EventMessageHandler));

        public void Handle(Messages.New.IProductUpdatedEvent message)
        {
            // Log the new DepartmentNumber field alongside the legacy fields
            log.Debug(String.Format("{0} Event Received for Product {1}: {2} : {3}", message.GetType().UnderlyingSystemType.Name, message.ProductNumber, message.Name, message.DepartmentNumber));
        }
    }

The rest of our Subscribers remain the same. All that is left is to update the Publisher to publish the new event. In the example code there is a Windows Forms app (UI) that pushes all events to a common Publisher endpoint. See the Documentation directory for a diagram. In the example all we have to do is add the new field and publish the new event.

    Messages.New.IProductUpdatedEvent pu = new Messages.New.ProductUpdatedMessage
    {
        ProductNumber = Int32.Parse(textBoxUpdateID.Text),
        Description = textBoxUpdateDesc.Text,
        Name = textBoxUpdateName.Text,
        EventId = Guid.NewGuid(),
        Time = DateTime.Now,
        DepartmentNumber = 10
    };

    FireEvent(pu);

Pull down the sample, look in the PubSub directory, and fire up the BusServer, UI, Subscriber1, and Subscriber3 projects. From the UI, simply type in the second group box and hit the "Update" button. This will send the new message to the "BusServer" Publisher endpoint, which will do the actual Bus.Publish(). From there you will see Subscriber1 receive the legacy event and Subscriber3 receive the new event with the new data.
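
The reason both Subscribers receive something from a single publish is that the new event is also an instance of the old one. Here is a minimal Python sketch of that idea. It is not how NSB is implemented internally: the `Bus` class, the `ProductUpdatedEventV2` name, and the sample values are made up for illustration, and an `isinstance` check stands in for the subscription matching.

```python
from dataclasses import dataclass


@dataclass
class ProductUpdatedEvent:
    """Stands in for the legacy Messages.IProductUpdatedEvent."""
    product_number: int
    name: str
    description: str


@dataclass
class ProductUpdatedEventV2(ProductUpdatedEvent):
    """Stands in for Messages.New.IProductUpdatedEvent (adds one field)."""
    department_number: int


class Bus:
    """Hypothetical in-memory bus that dispatches by message type."""

    def __init__(self):
        self.subscriptions = []  # list of (message_type, handler)

    def subscribe(self, message_type, handler):
        self.subscriptions.append((message_type, handler))

    def publish(self, message):
        # A V2 event is also a legacy event, so both subscriptions match.
        for message_type, handler in self.subscriptions:
            if isinstance(message, message_type):
                handler(message)


received = []
bus = Bus()

# Legacy subscriber: only knows about the original fields.
bus.subscribe(
    ProductUpdatedEvent,
    lambda m: received.append(("Subscriber1", m.product_number, m.name)),
)

# New subscriber: also reads the added department number.
bus.subscribe(
    ProductUpdatedEventV2,
    lambda m: received.append(
        ("Subscriber3", m.product_number, m.name, m.department_number)
    ),
)

bus.publish(ProductUpdatedEventV2(42, "Widget", "A sample product", 10))

print(received)
# [('Subscriber1', 42, 'Widget'), ('Subscriber3', 42, 'Widget', 10)]
```

Publishing a plain `ProductUpdatedEvent` in this model would reach only the legacy subscriber, which is why old Publishers and new Subscribers can coexist during an upgrade.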

NSB allows us to gracefully support new message types and uphold backwards compatibility. We were able to introduce new data and rely upon inheritance to handle the rest. Full code can be found here: http://github.com/afyles/Blog/tree/master/NServiceBusExplorer/