Monday, October 18, 2010

Getting Started with NServiceBus Part 7: Custom Performance Counters

Out of the box, NSB gives you a performance counter that tells you how old the oldest message in the queue is. This gives you an idea of how long messages are sitting in the queue. We decided to add a couple of new counters to tell us some other things about our performance.

The first one answers the question: what is the average processing time for a message? This is the time from when the message is pulled off the queue until the transaction completes. What this won't give us is the granularity to determine which message handlers are slow. Since most installations will have only one or a small number of message handlers per endpoint, this should be good enough for us.

The second counter answers the question: how many failures do I have per second? This is important because NSB has built-in retry logic. We have no visibility into failures unless we watch the log as processing occurs or wait until the message moves to the error queue. We wanted to see quickly whether processing was slowing down due to a lot of errors.

To get this going we can create a new profile similar to the NServiceBus.PerformanceCounters profile in NSB. First we create some classes to represent our monitoring:
    public class Monitoring : IProfile { }
    public class InstallMonitoring : IProfile { }
We need two profiles, one for running the counters and one for installing them. We'll cover the installation first. The first thing we need to do is check whether the counter category already exists and, if it does, delete it so we can reinstall it cleanly.
        public void ProfileActivated()
        {
            Logger.Debug("Starting installation of  monitoring");

            if (PerformanceCounterCategory.Exists(CategoryName))
            {
                Logger.Warn(String.Format("Category {0} already exists, going to delete it first", CategoryName));
                PerformanceCounterCategory.Delete(CategoryName);
            }

            CounterCreationDataCollection counterData = new CounterCreationDataCollection();

            counterData.AddRange(this.InstallAverageMessageProcessTimeCounter());
            counterData.AddRange(this.InstalledFailedMessageProcessingCounter());

            PerformanceCounterCategory.Create(CategoryName, "NServiceBus Monitoring", PerformanceCounterCategoryType.MultiInstance, counterData);

        }
Once we've ditched the old category we rebuild it by creating some counter data. I'll show one of the installs. To calculate the average time of something we need two counters: the timer itself, which accumulates elapsed ticks, and a base counter, which counts how many operations were timed. The base counter must be added right after the average timer (you'll get an error if you don't).
        private CounterCreationDataCollection InstallAverageMessageProcessTimeCounter()
        {
            Logger.Debug("Starting installation of average process time monitoring");

            CounterCreationDataCollection counterData = new CounterCreationDataCollection();

            CounterCreationData avgTime = new CounterCreationData(AvgProcessTimeCounterName, 
                "Avg message processing time", 
                PerformanceCounterType.AverageTimer32);

            counterData.Add(avgTime);

            CounterCreationData avgBase = new CounterCreationData(AvgProcessTimeBaseCounterName, 
                "Avg message processing time base", 
                PerformanceCounterType.AverageBase);

            counterData.Add(avgBase);

            Logger.Debug("Installation of average processing time monitoring successful.");

            return counterData;
        }
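To see why both counters are needed, here is a minimal sketch of the arithmetic that PerfMon applies to an AverageTimer32 counter and its AverageBase: the change in ticks between two samples, divided by the tick frequency, divided by the change in the base. The class and method names are made up for illustration and are not part of the Profiles project.

```csharp
using System;

// Hypothetical helper for illustration only; not part of NServiceBus or the blog's code.
public static class AverageTimerMath
{
    // ticks0/ticks1: raw value of the AverageTimer32 counter at two sample points.
    // base0/base1:   raw value of the AverageBase counter at the same two points.
    // frequency:     ticks per second (Stopwatch.Frequency when using Stopwatch ticks).
    public static double AverageSeconds(long ticks0, long ticks1, long base0, long base1, long frequency)
    {
        long operations = base1 - base0;

        // Nothing was processed between the two samples, so there is no average to report.
        if (operations <= 0)
            return 0.0;

        double elapsedSeconds = (double)(ticks1 - ticks0) / frequency;
        return elapsedSeconds / operations;
    }
}
```

For example, 3000 ticks accumulated over 3 messages at a frequency of 1000 ticks per second works out to an average of one second per message.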
Now that we have our counters installed, we need to create the profile that actually posts to the counters. When the profile is activated, we'll tie into two key events in the NSB runtime, "StartedMessageProcessing" and "FinishedMessageProcessing". In doing so we can calculate the time that has elapsed and update our base counter. We'll use a simple Stopwatch to keep track of it all.
        private void MonitorAverageMessageProcessTime()
        {
            PerformanceCounter avgProcessTimeCounter = null;
            PerformanceCounter avgProcessTimeBaseCounter = null;
            Stopwatch watch = null;

            try
            {
                avgProcessTimeCounter = new PerformanceCounter(CategoryName, AvgProcessTimeCounterName, Program.EndpointId, false);
                avgProcessTimeBaseCounter = new PerformanceCounter(CategoryName, AvgProcessTimeBaseCounterName, Program.EndpointId, false);
            }
            catch (Exception e)
            {
                throw new InvalidOperationException("NServiceBus monitoring is not set up correctly. Running this process with the flag NServiceBus.InstallMonitoring should fix the problem.", e);
            }

            GenericHost.ConfigurationComplete +=
                (o, e) =>
                {
                    ITransport transport = Configure.Instance.Builder.Build<ITransport>();
                    transport.StartedMessageProcessing += (s, ea) => { watch = Stopwatch.StartNew(); };
                    transport.FinishedMessageProcessing +=
                        (s, ea) =>
                        {
                            watch.Stop();
                            avgProcessTimeCounter.IncrementBy(watch.ElapsedTicks);
                            avgProcessTimeBaseCounter.Increment();
                        };
                };
        }
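One thing to watch in the code above: a single Stopwatch variable is shared by both event handlers, so if the endpoint runs more than one worker thread, two messages in flight could overwrite each other's start time. Here is a minimal sketch of one way around that, keeping the start timestamp per thread. It assumes the started and finished events for a given message are raised on the same worker thread, which I have not verified for every transport. MessageTimer is a made-up helper name, not an NSB type.

```csharp
using System;
using System.Diagnostics;

// Hypothetical helper for illustration only; not part of NServiceBus.
public static class MessageTimer
{
    // Each thread gets its own copy of this field, so concurrent workers
    // cannot overwrite one another's start time.
    [ThreadStatic]
    private static long startTimestamp;

    // Call from the StartedMessageProcessing handler.
    public static void Start()
    {
        startTimestamp = Stopwatch.GetTimestamp();
    }

    // Call from the FinishedMessageProcessing handler on the same thread.
    // Returns elapsed Stopwatch ticks, the same unit as Stopwatch.ElapsedTicks.
    public static long Stop()
    {
        return Stopwatch.GetTimestamp() - startTimestamp;
    }
}
```

With this in place, the started handler would call MessageTimer.Start() and the finished handler would pass MessageTimer.Stop() to IncrementBy() instead of watch.ElapsedTicks.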
Please note that since we are posting to a counter for each message this will somewhat degrade the performance. All we have to do now is run our process once with the "InstallMonitoring" profile to install the counters and then again with the "Monitoring" profile to post to the counters. You must have the process running before you see anything in PerfMon. When you fire PerfMon up, you should see something like this:

Since we installed the counters as Multi-Instance, you should see a separate entry for each running process. I have a simple Request/Response sample that I used to test this out. Once you get the sample running you can send a bunch of messages and start to see some data:

Sweet!  Now we have some additional ways we can monitor NSB.  As always, code can be found here: http://github.com/afyles/Blog/tree/master/NServiceBusExplorer/  Take a look at the "Profiles" project and the "RequestResponse" projects.

Friday, October 8, 2010

Getting Started with NServiceBus Part 6: MVC Music Store Saga(Workflow)

Last time we were able to wire up the OrderAccepted event and publish to a single Shipping Subscriber. This time around we are going to cast our net wider and include a Billing Subscriber. This Subscriber will look up the payment instrument for the order and then authorize that payment. This presents a problem for our Shipping Subscriber, as we cannot ship the product until we know the customer can pay for it.

NSB handles this situation quite well with Sagas, its implementation of workflow. With a Saga we can tell the Shipping Subscriber to wait for the payment authorization to complete before picking and shipping the order. NSB makes this possible by storing the state of the workflow in persistent storage (the Saga Persister). Since this is handled for us, all we need to do is reconcile the state of the workflow and mark it complete when we are done.

Let us begin by introducing a new entity, PaymentInstrument.  The easiest thing to do is create the table in the local DB and update the model from there.
Next we'll create the message that the Shipping Subscriber will receive once the payment has been authorized or rejected.
    public interface IAuthorizePaymentEvent : IEvent
    {
        Int32 OrderId { get; set; }
        Boolean Authorized { get; set; }
    }
In this message we simply tie back to the Order and send a flag giving us the status of the authorization. To complete the Billing Subscriber we create a new project, configure the endpoint AsA_Server, and point it at our Publisher, the Music Store command handler input queue. We need to add another endpoint to the config since we will be sending the Shipping Subscriber the IAuthorizePaymentEvent message. The mappings are as follows:

    
      
      
      
    
  
All we have left is to complete the handler for the published IOrderAcceptedEvent. In this handler we'll look up the user name on the order and the total. Based on this information we'll grab the payment instrument for the user and do some simple validation. Upon completion we'll send a message over to the Shipping Subscriber so it can continue the workflow.
        public void Handle(IOrderAcceptedEvent message)
        {
            MusicStoreEntities storeDB = new MusicStoreEntities();

            var order = storeDB.Orders.Single(o => o.OrderId == message.OrderId);

            // Look up the payment instrument on file for the user who placed the order
            var paymentInstrument = storeDB.PaymentInstruments.Single(pi => pi.UserName == order.Username);

            var authorizedEvent = this.Bus.CreateInstance<IAuthorizePaymentEvent>(p => p.OrderId = order.OrderId);

            // authorize the card... (placeholder rule: approve any order under 100)
            authorizedEvent.Authorized = order.Total < 100;

            // Tell the Shipping Subscriber the result so it can continue the workflow
            this.Bus.Send(authorizedEvent);
        }
Now we have a Billing Subscriber that is authorizing payment instruments and sending messages to our Shipping Subscriber. Next we have to make the Shipping Subscriber wait for the authorization. Let's begin by defining the data or state that we'd like to keep track of. At a bare minimum we need to keep track of the OrderId, the status of the payment authorization, and the status of the order (have we picked all the product?). NSB will also impose a few extra fields upon us as we implement the IContainSagaData interface.
    public class ShippingSagaData : IContainSagaData
    {
        public virtual Guid Id{ get; set; }
        public virtual String OriginalMessageId { get; set; }
        public virtual String Originator { get; set; }
        public virtual Boolean PaymentAuthorized { get; set; }
        public virtual Int32 OrderId { get; set; }
        public virtual Boolean OrderPicked { get; set; }
    }
Note that all the properties are marked as virtual. NSB uses NHibernate to store the Saga so we must give NH access to our properties. If you run the code without this it will complain. When implementing a Saga in NSB, your Saga becomes the handler and distributor of many messages. NSB needs to know how to start the Saga, which messages to handle, and what kind of data to store. The class definition says it all:
public class ShippingSaga : Saga<ShippingSagaData>,
        IAmStartedByMessages<IOrderAcceptedEvent>,
        IHandleMessages<IAuthorizePaymentEvent>,
        IHandleMessages<IOrderPickedEvent>
There is a new message here that we didn't have before. We need to know when we have completed picking the product from our shelves so we've introduced the IOrderPickedEvent.
    public interface IOrderPickedEvent : IEvent
    {
        Int32 ShippingNoteId { get; set; }
        Int32 OrderId { get; set; }
    }
The first thing we have to do is tell NSB how to look up our Saga. In other words, what makes a Saga unique? In our case we link everything up by OrderId. To make sure every message finds the right Saga instance, we map each message's OrderId to the Saga data's OrderId.
        public override void ConfigureHowToFindSaga()
        {
            base.ConfigureMapping<IOrderAcceptedEvent>(s => s.OrderId, e => e.OrderId);
            base.ConfigureMapping<IAuthorizePaymentEvent>(s => s.OrderId, e => e.OrderId);
            base.ConfigureMapping<IOrderPickedEvent>(s => s.OrderId, e => e.OrderId);
        }
All of this is triggered by the Order being accepted, so our Saga starts by handling that event. We'll default some Saga data and then kick off the picking process.
        public void Handle(IOrderAcceptedEvent message)
        {
            base.Data.PaymentAuthorized = false;
            base.Data.OrderId = message.OrderId;
            base.Bus.SendLocal<IPickOrderCommand>( p => p.OrderId = message.OrderId);
        }
The interesting thing here is the Bus.SendLocal() method. What this does is put a message on the current endpoint's queue. We send a message to ourselves to kick off the pick process. The handler for this was detailed last time, when we checked our inventory positions for the product and created ship notes. While we are picking the product, the payment is being authorized. If the payment comes back unauthorized we'll simply restock the order. Once we have picked the order we can check to see if we can let the order out the door.
        public void Handle(IOrderPickedEvent message)
        {
            base.Data.OrderPicked = true;
            this.TryComplete();
        }

        private void TryComplete()
        {
            if (base.Data.OrderPicked && Data.PaymentAuthorized)
                MarkAsComplete();
        }
The MarkAsComplete() method tells the NSB infrastructure that this Saga is over and we can ditch the state. We also need to call the TryComplete() method if the authorization comes in:
        public void Handle(IAuthorizePaymentEvent message)
        {
            base.Data.PaymentAuthorized = message.Authorized;
            this.TryComplete();
        }
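To make the reconciliation easier to see, here is a minimal sketch of the same logic in plain C#, with no NSB types involved. A dictionary keyed by OrderId stands in for the Saga Persister, and the class and method names are made up for illustration. The point is that the picked and authorization messages can arrive in either order, and the workflow only completes once both conditions hold.

```csharp
using System;
using System.Collections.Generic;

// Plain C# stand-in for the saga data; not an NServiceBus type.
public class ShippingState
{
    public int OrderId;
    public bool PaymentAuthorized;
    public bool OrderPicked;
    public bool Completed;
}

// Hypothetical simulation of the saga's message handling, for illustration only.
public class ShippingWorkflowSketch
{
    // Stand-in for the saga persister: one state record per OrderId.
    private readonly Dictionary<int, ShippingState> store = new Dictionary<int, ShippingState>();

    // Mirrors the handler that starts the saga.
    public void OrderAccepted(int orderId)
    {
        store[orderId] = new ShippingState { OrderId = orderId };
    }

    // Mirrors the authorization handler.
    public void PaymentResult(int orderId, bool authorized)
    {
        ShippingState state = store[orderId];
        state.PaymentAuthorized = authorized;
        TryComplete(state);
    }

    // Mirrors the picked handler.
    public void OrderPicked(int orderId)
    {
        ShippingState state = store[orderId];
        state.OrderPicked = true;
        TryComplete(state);
    }

    public bool IsCompleted(int orderId)
    {
        return store[orderId].Completed;
    }

    // Completes only when the order is picked AND the payment is authorized.
    private static void TryComplete(ShippingState state)
    {
        if (state.OrderPicked && state.PaymentAuthorized)
            state.Completed = true;
    }
}
```

Note that with this rule a rejected payment never completes the workflow, which is one reason a timeout matters in a real system.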
We've completed our Saga except for one part. Normally we would only wait so long for a payment instrument authorization. In our Saga we can request a timeout. When a timeout is requested, a message is sent to another endpoint that watches the clock for us. When time is up, that external endpoint sends a message back to us. In this example we aren't requesting a timeout, which means we will wait forever for an authorization. Normally we wouldn't do this, and we'd have to run the Timeout Manager process that comes with NSB. To keep it simple I left it out this time. We still have to implement the Timeout() method, and in our case we'll simply end the Saga when a timeout arrives.
        public override void Timeout(object state)
        {
            MarkAsComplete();
        }
We've come a long way. We now have a music store that accepts Orders durably, handles payment authorization, picks the product, and creates shipping notes for the shipping dock. As always, code can be found at github: http://github.com/afyles/Blog

Monday, October 4, 2010

Monitoring NServiceBus: Performance Counters Starting Point

We are starting to look at monitoring NServiceBus internally and we're starting with PerfMon in our test and cert environments. I've come up with a short list of suggested counters to look at and thought I'd share them with everyone. I'm also working on a set of custom counters for NSB, to go alongside the one built-in counter, that I'll be posting shortly.

.NET CLR Exceptions(NServiceBus.Host)\# of Exceps Thrown / sec
     o This will tell you if the app is throwing tons of exceptions. This is important because NSB will retry under exception scenarios, so you may be unaware that your app is blowing up

.NET CLR Memory(NServiceBus.Host)\# of Pinned Objects
     o The number of objects the GC can't move around in memory; a count that keeps growing can point to a leak or a locked up thread

.NET CLR Memory(NServiceBus.Host)\% Time in GC
     o The percentage of time spent doing GC; a high value tells us our memory footprint is too big or out of control for some reason

Distributed Transaction Coordinator\Transactions/sec
     o How many transactions is DTC pumping out

MSMQ Queue(Computer Queues)\Messages in Queue
     o How many messages are in your NSB input queue

MSMQ Queue(info-dev11\private$\nservicebus_error)\Messages in Queue
     o How many messages are in the error queue

MSMQ Service\Incoming Messages/sec
     o Number of messages coming in, tells us if we are getting overloaded

MSMQ Service\Outgoing Messages/sec
     o Number of messages going out, are we sending messages fast enough

NServiceBus\*
     o Critical Time - how old is the oldest message in the queue? This tells us whether we are processing messages slowly

Friday, October 1, 2010

Enabling NServiceBus Performance Counters

First you must install the counters using the Runner.exe tool. The next thing you need to do is add the "NServiceBus.PerformanceCounters" profile to your install. Without this you will not see any instances in PerfMon.