Introduction to NServiceBus
Once the Distributor is configured we must tell each Worker how to communicate with the Distributor.
Once we have everything configured the workflow is a multi-step process that goes like this:
    param([string]$qname)

    if ( [System.String]::IsNullOrEmpty($qname) )
    {
        write-host "You must provide a queue name"
        exit
    }

    [Reflection.Assembly]::LoadWithPartialName("System.Messaging") | out-null

    $q = new-object System.Messaging.MessageQueue($qname)
    $msgs = $q.GetAllMessages()

    foreach ( $msg in $msgs )
    {
        # Rewind the body stream and dump the raw message body
        $msg.BodyStream.Position = 0
        $sr = new-object System.IO.StreamReader( $msg.BodyStream )
        $sr.ReadToEnd()

        # $input is a PowerShell automatic variable, so use a different name
        $answer = read-host -prompt "Enter n for the next message, q to quit"
        if ( $answer -eq "q" ) { exit }
    }
    public interface IProductUpdatedEvent : IProductChangedEvent
    {
    }

    public interface IProductChangedEvent : IEvent
    {
        Int32 ProductNumber { get; set; }
        String Name { get; set; }
        String Description { get; set; }
    }

Now let's say we want to add a new field to our event. We create the new event in a different namespace so we can reuse the same interface name, and we add our new field after extending the old interface:
    public interface IProductUpdatedEvent : Messages.IProductUpdatedEvent
    {
        Int32 DepartmentNumber { get; set; }
    }

So now we have two distinct messages that have different data sets. This means our legacy clients can continue to receive the legacy message and our new clients can receive the new message. Here is an example of a handler pointing to the new message type:
    public class EventMessageHandler : IMessageHandler<Messages.New.IProductUpdatedEvent>
    {
        private static ILog log = LogManager.GetLogger(typeof(EventMessageHandler));

        public void Handle(Messages.New.IProductUpdatedEvent message)
        {
            log.Debug(String.Format("{0} Event Received for Product {1}: {2} : {3}",
                message.GetType().UnderlyingSystemType.Name,
                message.ProductNumber,
                message.Name,
                message.DepartmentNumber));
        }
    }

The rest of our Subscribers remain the same. All that is left is to update the Publisher to publish the new event. In the example code there is a Windows Forms app (UI) that pushes all events to a common Publisher endpoint. See the Documentation directory for a diagram. In the example all we have to do is add the new field and publish the new event.
    Messages.New.IProductUpdatedEvent pu = new Messages.New.ProductUpdatedMessage
    {
        ProductNumber = Int32.Parse(textBoxUpdateID.Text),
        Description = textBoxUpdateDesc.Text,
        Name = textBoxUpdateName.Text,
        EventId = Guid.NewGuid(),
        Time = DateTime.Now,
        DepartmentNumber = 10
    };

    FireEvent(pu);

To try it, pull down the sample, look in the PubSub directory, and fire up the BusServer, UI, Subscriber1, and Subscriber3 projects. In the UI, simply type in the second group box and hit the "Update" button. This will send the new message to the "BusServer" Publisher endpoint, which will do the actual Bus.Publish(). From there you will see Subscriber1 receive the legacy event and Subscriber3 receive the new event with the new data.
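The versioning trick rests on plain interface inheritance: because the new event extends the legacy one, a single message instance satisfies both contracts. The sketch below illustrates just that idea without the NServiceBus assemblies. The IEvent marker declared here is a stand-in so the code compiles on its own, and VersioningDemo with its two methods is a hypothetical name, not part of the sample repository.

```csharp
using System;

// Stand-in for NServiceBus's IEvent marker interface (assumption: declared
// locally only so this sketch compiles without the NSB assemblies).
public interface IEvent { }

namespace Messages
{
    public interface IProductChangedEvent : IEvent
    {
        int ProductNumber { get; set; }
        string Name { get; set; }
    }

    // Legacy event contract.
    public interface IProductUpdatedEvent : IProductChangedEvent { }
}

namespace Messages.New
{
    // New contract: same name, different namespace, one extra field.
    public interface IProductUpdatedEvent : Messages.IProductUpdatedEvent
    {
        int DepartmentNumber { get; set; }
    }

    public class ProductUpdatedMessage : IProductUpdatedEvent
    {
        public int ProductNumber { get; set; }
        public string Name { get; set; }
        public int DepartmentNumber { get; set; }
    }
}

// Hypothetical helper that plays the role of the two kinds of subscriber.
public static class VersioningDemo
{
    // A legacy subscriber only sees the old contract.
    public static string HandleLegacy(Messages.IProductUpdatedEvent e)
    {
        return string.Format("legacy: {0} {1}", e.ProductNumber, e.Name);
    }

    // A new subscriber also sees the added field.
    public static string HandleNew(Messages.New.IProductUpdatedEvent e)
    {
        return string.Format("new: {0} {1} dept {2}",
            e.ProductNumber, e.Name, e.DepartmentNumber);
    }
}
```

Passing one ProductUpdatedMessage to both methods works without any casting, which is the property the Subscriber1 and Subscriber3 behavior depends on.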
    public class Monitoring : IProfile { }

    public class InstallMonitoring : IProfile { }

We have to have 2 profiles, one for running the counters and one for installing the counters. We'll cover the installation first. The first thing we need to do is remove the counters if they already exist and then reinstall them.
    public void ProfileActivated()
    {
        Logger.Debug("Starting installation of monitoring");

        if (PerformanceCounterCategory.Exists(CategoryName))
        {
            Logger.Warn(String.Format("Category {0} already exists, going to delete it first", CategoryName));
            PerformanceCounterCategory.Delete(CategoryName);
        }

        CounterCreationDataCollection counterData = new CounterCreationDataCollection();
        counterData.AddRange(this.InstallAverageMessageProcessTimeCounter());
        counterData.AddRange(this.InstalledFailedMessageProcessingCounter());

        PerformanceCounterCategory.Create(CategoryName, "NServiceBus Monitoring",
            PerformanceCounterCategoryType.MultiInstance, counterData);
    }

Once we've ditched the counter we rebuild it by creating some counter data. I'll show one of the installs. To calculate the average time of something we need to do two things. We need to increment the time and also increment the basis of the event. The basis must be added after the average timer (you'll get an error if you don't).
    private CounterCreationDataCollection InstallAverageMessageProcessTimeCounter()
    {
        Logger.Debug("Starting installation of average process time monitoring");

        CounterCreationDataCollection counterData = new CounterCreationDataCollection();

        CounterCreationData avgTime = new CounterCreationData(AvgProcessTimeCounterName,
            "Avg message processing time", PerformanceCounterType.AverageTimer32);
        counterData.Add(avgTime);

        // The base counter must directly follow the average timer it belongs to
        CounterCreationData avgBase = new CounterCreationData(AvgProcessTimeBaseCounterName,
            "Avg message processing time base", PerformanceCounterType.AverageBase);
        counterData.Add(avgBase);

        Logger.Debug("Installation of average processing time monitoring successful.");

        return counterData;
    }

Now that we have our counters installed, we need to create the profile that actually posts to the counters. When the profile is activated, we'll tie into 2 key events in the NSB runtime, "StartedMessageProcessing" and "FinishedMessageProcessing". In doing so we can calculate the time that has elapsed and update our basis. We'll use a simple Stopwatch to keep track of it all.
    private void MonitorAverageMessageProcessTime()
    {
        PerformanceCounter avgProcessTimeCounter = null;
        PerformanceCounter avgProcessTimeBaseCounter = null;
        Stopwatch watch = null;

        try
        {
            avgProcessTimeCounter = new PerformanceCounter(CategoryName, AvgProcessTimeCounterName, Program.EndpointId, false);
            avgProcessTimeBaseCounter = new PerformanceCounter(CategoryName, AvgProcessTimeBaseCounterName, Program.EndpointId, false);
        }
        catch (Exception e)
        {
            throw new InvalidOperationException("NServiceBus monitoring is not set up correctly. Running this process with the flag NServiceBus.InstallMonitoring should fix the problem.", e);
        }

        GenericHost.ConfigurationComplete += (o, e) =>
        {
            ITransport transport = Configure.Instance.Builder.Build<ITransport>();

            // Start timing when a message is picked up
            transport.StartedMessageProcessing += (s, ea) =>
            {
                watch = Stopwatch.StartNew();
            };

            // Post the elapsed ticks and bump the base when the message is done
            transport.FinishedMessageProcessing += (s, ea) =>
            {
                watch.Stop();
                avgProcessTimeCounter.IncrementBy(watch.ElapsedTicks);
                avgProcessTimeBaseCounter.Increment();
            };
        };
    }

Please note that since we are posting to a counter for each message, this will somewhat degrade performance. All we have to do now is run our process once with the "InstallMonitoring" profile to install the counters and then again with the "Monitoring" profile to post to the counters. You must have the process running before you see anything in PerfMon. When you fire PerfMon up, you should see the counters for your endpoint.
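As a side note on why two counters are needed: an AverageTimer32 counter accumulates elapsed ticks, its AverageBase counter accumulates the number of operations, and the displayed value is the tick total converted to seconds and divided by the operation count (PerfMon applies this to the change between two samples). The following sketch reproduces that arithmetic in memory so it can be checked without installing anything. AverageTimerSketch and its members are hypothetical names used only for illustration.

```csharp
using System;

// In-memory illustration of the average-timer arithmetic (assumption: this
// class is a teaching aid and does not touch real performance counters).
public sealed class AverageTimerSketch
{
    private long totalTicks; // plays the role of the AverageTimer32 counter
    private long samples;    // plays the role of the AverageBase counter

    // Mirrors IncrementBy(elapsedTicks) followed by Increment() on the base.
    public void Record(long elapsedTicks)
    {
        totalTicks += elapsedTicks;
        samples++;
    }

    // Average seconds per operation: (ticks / ticksPerSecond) / operations.
    public double AverageSeconds(long ticksPerSecond)
    {
        if (samples == 0)
        {
            return 0.0;
        }
        return ((double)totalTicks / ticksPerSecond) / samples;
    }
}
```

With real measurements you would pass Stopwatch.Frequency as ticksPerSecond, since Stopwatch.ElapsedTicks is expressed in that unit.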
    public interface IAuthorizePaymentEvent : IEvent
    {
        Int32 OrderId { get; set; }
        Boolean Authorized { get; set; }
    }

In this message we simply tie back to the Order and send a flag giving us the status of the authorization. To complete the Billing Subscriber we create a new project, configure the endpoint AsA_Server, and point it to our Publisher, the Music Store command handler input queue. We need to add another endpoint to the config since we will be sending the Shipping Subscriber the IAuthorizePaymentEvent message. The mappings are as follows:
All we have left is to complete the handler for the published IOrderAcceptedEvent. In this handler we'll look up the user name on the order and the total. Based on this information we'll grab the payment instrument for the user and do some simple validation. Upon completion we'll send a message over to the Shipping Subscriber so it can continue the workflow.
    public void Handle(IOrderAcceptedEvent message)
    {
        MusicStoreEntities storeDB = new MusicStoreEntities();

        var order = storeDB.Orders.Single(o => o.OrderId == message.OrderId);
        var paymentInstrument = storeDB.PaymentInstruments.Single(pi => pi.UserName == order.Username);
        var authorizedEvent = this.Bus.CreateInstance<IAuthorizePaymentEvent>(p => p.OrderId = order.OrderId);

        // authorize the card...
        if (order.Total < 100)
            authorizedEvent.Authorized = true;
        else
            authorizedEvent.Authorized = false;

        this.Bus.Send(authorizedEvent);
    }

Now we have a Billing Subscriber that is authorizing payment instruments and sending messages to our Shipping Subscriber. Next we have to do the work to make the Shipping Subscriber wait for the authorization. Let's begin by defining the data or state that we'd like to keep track of. At a bare minimum we need to keep track of the OrderId, the status of the payment authorization, and the status of the order (have we picked all the product?). NSB will also impose a few extra fields upon us as we implement the IContainSagaData interface.
    public class ShippingSagaData : IContainSagaData
    {
        public virtual Guid Id { get; set; }
        public virtual String OriginalMessageId { get; set; }
        public virtual String Originator { get; set; }
        public virtual Boolean PaymentAuthorized { get; set; }
        public virtual Int32 OrderId { get; set; }
        public virtual Boolean OrderPicked { get; set; }
    }

Note that all the properties are marked as virtual. NSB uses NHibernate to store the Saga so we must give NH access to our properties. If you run the code without this it will complain. When implementing a Saga in NSB, your Saga becomes the handler and distributor of many messages. NSB needs to know how to start the Saga, which messages to handle, and what kind of data to store. The class definition says it all:
    public class ShippingSaga : Saga<ShippingSagaData>,
        IAmStartedByMessages<IOrderAcceptedEvent>,
        IHandleMessages<IAuthorizePaymentEvent>,
        IHandleMessages<IOrderPickedEvent>

There is a new message here that we didn't have before. We need to know when we have completed picking the product from our shelves so we've introduced the IOrderPickedEvent.
    public interface IOrderPickedEvent : IEvent
    {
        Int32 ShippingNoteId { get; set; }
        Int32 OrderId { get; set; }
    }

The first thing we have to do is tell NSB how to look up our Saga. In other words, what makes a Saga unique? In our case we link everything up by OrderId. To make sure everything is unique we map all the message OrderIds to the Saga data OrderId.
    public override void ConfigureHowToFindSaga()
    {
        base.ConfigureMapping<IOrderAcceptedEvent>(s => s.OrderId, e => e.OrderId);
        base.ConfigureMapping<IAuthorizePaymentEvent>(s => s.OrderId, e => e.OrderId);
        base.ConfigureMapping<IOrderPickedEvent>(s => s.OrderId, e => e.OrderId);
    }

All of this is based on the Order being accepted, so our Saga starts by handling that event. We'll default some Saga data and then kick off the picking process.
    public void Handle(IOrderAcceptedEvent message)
    {
        base.Data.PaymentAuthorized = false;
        base.Data.OrderId = message.OrderId;

        base.Bus.SendLocal<IPickOrderCommand>(p => p.OrderId = message.OrderId);
    }

The interesting thing here is the Bus.SendLocal() method. What this does is put a message on the current endpoint's queue. We send a message to ourselves to kick off the pick process. The handler for this was detailed last time as we checked our inventory positions for the product and created ship notes. While we are picking the product, the payment is being authorized. If the payment comes back unauthorized we'll simply restock the order. Once we have picked the order we can check to see if we can let the order out the door.
    public void Handle(IOrderPickedEvent message)
    {
        base.Data.OrderPicked = true;
        this.TryComplete();
    }

    private void TryComplete()
    {
        if (base.Data.OrderPicked && Data.PaymentAuthorized)
            MarkAsComplete();
    }

The MarkAsComplete() method tells the NSB infrastructure that this Saga is over and we can ditch the state. We also need to call the TryComplete() method if the authorization comes in:
    public void Handle(IAuthorizePaymentEvent message)
    {
        base.Data.PaymentAuthorized = message.Authorized;
        this.TryComplete();
    }

We've completed our Saga except for one part. Normally we would only wait so long for a payment instrument authorization. In our Saga we can request a timeout. When a timeout is requested, a message is sent to another endpoint that watches the clock for us. When time is up, the external endpoint sends us a message back. In this example we aren't requesting a timeout, which means we will wait forever for an authorization. Normally we wouldn't do this and we'd have to run the Timeout Manager process that comes with NSB. To keep it simple I left it out this time. We still have to implement the Timeout() method, and in our case we'll simply kill the Saga on a timeout.
    public override void Timeout(object state)
    {
        MarkAsComplete();
    }

We've come a long way. We now have a music store that accepts Orders durably, handles payment authorization, picks the product, and creates shipping notes for the shipping dock. As always, code can be found at github: http://github.com/afyles/Blog
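To make the Saga's completion rule concrete, here is a small stand-alone sketch of the same state machine with NSB stripped away: the Saga completes only once the order has been picked and the payment has been authorized, regardless of which message arrives first. ShippingSagaSketch and its method names are hypothetical, and the Completed flag stands in for the call to MarkAsComplete().

```csharp
using System;

// NSB-free model of the shipping Saga state (assumption: illustrative only,
// real Saga data is persisted by the framework between messages).
public sealed class ShippingSagaSketch
{
    public bool OrderPicked { get; private set; }
    public bool PaymentAuthorized { get; private set; }
    public bool Completed { get; private set; }

    // Corresponds to handling IOrderPickedEvent.
    public void OnOrderPicked()
    {
        OrderPicked = true;
        TryComplete();
    }

    // Corresponds to handling IAuthorizePaymentEvent.
    public void OnPaymentAuthorized(bool authorized)
    {
        PaymentAuthorized = authorized;
        TryComplete();
    }

    // Both conditions must hold before the Saga is considered done.
    private void TryComplete()
    {
        if (OrderPicked && PaymentAuthorized)
        {
            Completed = true;
        }
    }
}
```

Note that a declined authorization leaves the sketch, like the Saga above, waiting indefinitely, which is the gap a requested timeout would close.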
    public class EndpointConfig : IConfigureThisEndpoint, AsA_Server { }

The next thing we need to do is actually publish the message. To accomplish this, we'll modify the PlaceOrderHandler to publish a message after it has successfully saved the order. The message we'll publish is the IOrderAcceptedEvent message:
    public interface IOrderAcceptedEvent : IEvent
    {
        Int32 OrderId { get; set; }
    }

This message has been added to our public schema project (Messages.csproj). All we pass along here is the order id. We can look up the Order on the other side when we go to ship the product, so there is no need to publish the entire Order. There may be a reason to publish the entire order in the future, especially if there are components that don't have a way to query for Orders. Note that I'm using the naming convention of providing an "Event" suffix. This is so that I can easily tell what is an Event versus what is a Command (namespacing would also do).
    public class PlaceOrderHandler : IHandleMessages<IPlaceOrderCommand>
    {
        public IBus Bus { get; set; }

        public void Handle(IPlaceOrderCommand message)
        {
            MusicStoreEntities storeDB = new MusicStoreEntities();

            var order = new MvcMusicStore.Models.Order();
            order.Username = message.UserId;
            order.OrderDate = DateTime.Now;
            order.OrderId = message.OrderId;

            // Save Order
            storeDB.AddToOrders(order);
            storeDB.SaveChanges();

            // Process the order
            var cart = new ShoppingCart(message.CartId);
            cart.CreateOrder(order);

            // Tell all subscribers that the order has been accepted
            this.Bus.Publish<IOrderAcceptedEvent>(o => o.OrderId = order.OrderId);
        }
    }

Now we have our handler publishing messages out to the bus for all subscribers. Next we'll build up our Shipping Subscriber. Add a new project and configure the endpoint AsA_Server just like our original command handler. Next we'll take a look at how we reference the Publisher. We need to know where to drop off our subscription messages. The subscription message contains the Subscriber address along with some other endpoint information. By giving our address to the Publisher, we let it know where to push published messages. To achieve this we add a reference to the Publisher in our app.config:
The overall pub/sub semantics go something like this:
    public class ShippingHandler : IHandleMessages<IOrderAcceptedEvent>
    {
        #region IMessageHandler<IOrderAcceptedEvent> Members

        public void Handle(IOrderAcceptedEvent message)
        {
            MusicStoreEntities storeDB = new MusicStoreEntities();

            var order = storeDB.Orders.Single(o => o.OrderId == message.OrderId);

            var shipNote = new ShippingNote
            {
                FirstName = order.FirstName,
                LastName = order.LastName,
                Address = order.Address,
                City = order.City,
                State = order.State,
                PostalCode = order.PostalCode
            };

            foreach (var detail in order.OrderDetails)
            {
                var inventoryPosition = storeDB.InventoryPositions.Single(p => p.Album.AlbumId == detail.AlbumId);

                if (inventoryPosition.BalanceOnHand >= detail.Quantity)
                {
                    inventoryPosition.BalanceOnHand -= detail.Quantity;
                    shipNote.ShippedQuantity += detail.Quantity;
                }
                else
                {
                    shipNote.BackOrderQuantity = detail.Quantity - shipNote.ShippedQuantity;
                }
            }

            storeDB.AddToShippingNotes(shipNote);
            storeDB.SaveChanges();
        }

        #endregion
    }
    Start action: Program
    Start external program: $(ProjectDir)$(OutputPath)NServiceBus.Host.exe
    Command line arguments: NServiceBus.Integration
The MsmqTransportConfig section defines four main properties. The InputQueue property tells our web app where to put the messages. The ErrorQueue property is where our messages will go if they cannot be processed after the configured amount of retries. The number of retries is configured using the MaxRetries property. The NumberOfWorkerThreads specifies how many threads will be spun up to read messages from the queue and process them. Initially you should start with one and ramp the threads up from there if necessary.
    public class EndpointConfig : IConfigureThisEndpoint, AsA_Server { }

The IConfigureThisEndpoint is an interface that marks the class for NSB to pick up during configuration. The AsA_Server interface tells NSB to configure an endpoint that is transactional and that does not purge messages. Now that we have NSB configured, we can start getting into the message handlers.
    public class AddToCartHandler : IHandleMessages<IAddToCartCommand>
    {
        public void Handle(IAddToCartCommand message)
        {
            MusicStoreEntities storeDB = new MusicStoreEntities();

            // Retrieve the album from the database
            var addedAlbum = storeDB.Albums
                .Single(album => album.AlbumId == message.AlbumId);

            // Add it to the shopping cart
            var cart = new ShoppingCart(message.CartId);
            cart.AddToCart(addedAlbum);
        }
    }

Note that the method is void since we are processing the messages in a completely different app domain than the web site. Once the interface has been implemented, it is straight .NET programming from there. We do the same for the PlaceOrderHandler:
    public void Handle(IPlaceOrderCommand message)
    {
        MusicStoreEntities storeDB = new MusicStoreEntities();

        var order = new MvcMusicStore.Models.Order();
        order.Username = message.UserId;
        order.OrderDate = DateTime.Now;
        order.OrderId = message.OrderId;

        // Save Order
        storeDB.AddToOrders(order);
        storeDB.SaveChanges();

        // Process the order
        var cart = new ShoppingCart(message.CartId);
        cart.CreateOrder(order);
    }

Note that we are taking the order id from the client. This is because we must show the user the order id right after they have ordered on the client side. We have to modify the underlying data model in order to accommodate this requirement. Simply edit the Order and Order Details tables in the models so that their ids are not generated by the database. Lastly, to get our server up and running we need to edit the Debug properties of the project. First build the project and then set the Debug session to start using an external program. Point to the generic host that comes with NSB, NServiceBus.Host.exe (in the bin directory).
    public ActionResult AddedItemToCart(int id)
    {
        var cart = ShoppingCart.GetCart(this.HttpContext);

        // Retrieve the album from the database
        var addedAlbum = storeDB.Albums
            .Single(album => album.AlbumId == id);

        // Set up our ViewModel
        var viewModel = new ShoppingCartViewModel
        {
            CartItems = new System.Collections.Generic.List<Cart>(),
            CartTotal = cart.GetTotal() + addedAlbum.Price
        };

        viewModel.CartItems.Add(new Cart { Album = addedAlbum, AlbumId = addedAlbum.AlbumId, Count = 1 });

        return View(viewModel);
    }

All we have to do now is redirect to our new action instead of the old one (Index):
    // Go back to the main store page for more shopping
    return RedirectToAction("AddedItemToCart", new { id = addedAlbum.AlbumId });

You may also get an error when placing an order. The Music Store immediately validates against the database that your order is there. This is now an unnecessary step because via messaging we are guaranteeing the delivery of the order. All we need to do is comment out the validation code:
    public ActionResult Complete(int id)
    {
        //// Validate customer owns this order
        //bool isValid = storeDB.Orders.Any(
        //    o => o.OrderId == id &&
        //    o.Username == User.Identity.Name);

        //if (isValid)
        //{
        //    return View(id);
        //}
        //else
        //{
        //    return View("Error");
        //}

        return View(id);
    }

Now with our minor tweaks to the UI we have fully implemented NSB in our Music Store application. We can guarantee that our users will be able to shop and place orders even if our database is down. This becomes increasingly important for those big spenders: we don't want to have to tell them to start all over because we lost their order.
Next we add that section to create an Endpoint Mapping. What this does is tell the bus that the given Endpoint will be accepting messages defined in our schema assembly.
In our case the Endpoint is the address to the queue that will be accepting our messages. If you want at this point you can create this as a local, private, transactional MSMQ queue. Also be sure to have the Distributed Transaction Coordinator service up and running. Now that we have configured our Endpoint, we have to bootstrap NServiceBus and keep it going for the duration of our ASP.NET application. We can do this in the Global.asax.cs file using NSB's fluent configuration:
    protected void Application_Start()
    {
        AreaRegistration.RegisterAllAreas();

        RegisterRoutes(RouteTable.Routes);

        Bus = NServiceBus.Configure.WithWeb()
            .Log4Net()
            .DefaultBuilder()
            .XmlSerializer()
            .MsmqTransport()
                .IsTransactional(false)
                .PurgeOnStartup(false)
            .UnicastBus()
                .ImpersonateSender(false)
            .CreateBus()
            .Start();
    }

I'll explain each configuration item bit by bit:
    public ActionResult AddToCart(int id)
    {
        // Retrieve the album from the database
        var addedAlbum = storeDB.Albums
            .Single(album => album.AlbumId == id);

        // Add it to the shopping cart
        var cart = ShoppingCart.GetCart(this.HttpContext);

        //cart.AddToCart(addedAlbum);
        Helpers.ServiceAgent<IAddToCartCommand>.Send(c =>
        {
            c.CartId = cart.GetCartId(this.HttpContext);
            c.AlbumId = addedAlbum.AlbumId;
        });

        // Go back to the main store page for more shopping
        return RedirectToAction("Index");
    }

I've added a utility class to actually put the message on the bus using similar NSB semantics. I'm doing this so that I don't have an explicit reference to NSB in my Controller code. The utility is very simple:
    public static class ServiceAgent<T> where T : ICommand
    {
        public static void Send(Action<T> messageConstructor)
        {
            if (null != messageConstructor)
                MvcApplication.Bus.Send<T>(messageConstructor);
        }
    }

We'll do the same thing for placing an order. We modify the code in CheckOutController.AddressAndPayment:
    [HttpPost]
    public ActionResult AddressAndPayment(FormCollection values)
    {
        ...
        else
        {
            //order.Username = User.Identity.Name;
            //order.OrderDate = DateTime.Now;

            //Save Order
            //storeDB.AddToOrders(order);
            //storeDB.SaveChanges();

            //Process the order
            var cart = ShoppingCart.GetCart(this.HttpContext);
            //cart.CreateOrder(order);

            Int32 syntheticId = Helpers.IdGenerator.Generate();

            Helpers.ServiceAgent<IPlaceOrderCommand>.Send(c =>
            {
                c.OrderId = syntheticId;
                c.CartId = cart.GetCartId(this.HttpContext);
            });

            return RedirectToAction("Complete", new { id = syntheticId });
        }
        ...
    }

Here we're using the same utility class to put a message on the bus. Note that I'm generating the Order Id on the client side. This application shows the order id back to the user after they have placed the order. In order to pull this off, we have to create the id client side. I've created another utility to generate the Order Id:
    public static class IdGenerator
    {
        public static Int32 Generate()
        {
            // Use the first four bytes of a new GUID as the id
            byte[] buffer = Guid.NewGuid().ToByteArray();
            return BitConverter.ToInt32(buffer, 0);
        }
    }

Now if you fire up the client and try to add items to the cart, you won't see any data in the subsequent screen. This is because we haven't implemented the server side of our solution that handles the messages. When placing an order you should see an order id just like you normally would.
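One thing to be aware of with this kind of generator is that the first four bytes of a GUID can decode to a negative Int32, so roughly half of the generated order ids will be negative. If that matters for display, one possible variation (an assumption on my part, not what the sample code does) is to clear the sign bit. NonNegativeIdGenerator and FromGuid below are hypothetical names.

```csharp
using System;

// Variation on the id generator that never returns a negative number
// (assumption: illustrative alternative, not part of the sample repository).
public static class NonNegativeIdGenerator
{
    // Deterministic core so the behavior can be checked with known GUIDs.
    public static int FromGuid(Guid guid)
    {
        byte[] buffer = guid.ToByteArray();
        int raw = BitConverter.ToInt32(buffer, 0);

        // Masking with int.MaxValue clears the sign bit and keeps the other 31 bits.
        return raw & int.MaxValue;
    }

    public static int Generate()
    {
        return FromGuid(Guid.NewGuid());
    }
}
```

Either way the id carries at most 32 bits of a GUID, so collisions are possible and the database should still enforce uniqueness on the order id.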
    public ActionResult AddToCart(int id)
    {
        ...
        cart.AddToCart(addedAlbum);
        ...
    }
    public void AddToCart(Album album)
    {
        ...
        if (cartItem == null)
        {
            // Create a new cart item
            cartItem = new Cart
            {
                AlbumId = album.AlbumId,
                CartId = shoppingCartId,
                Count = 1,
                DateCreated = DateTime.Now
            };
            storeDB.AddToCarts(cartItem);
        }
        else
        {
            // Add one to the quantity
            cartItem.Count++;
        }

        // Save it
        storeDB.SaveChanges();
    }
    public interface IAddToCartCommand : ICommand
    {
        String CartId { get; set; }
        Int32 AlbumId { get; set; }
    }

    public interface ICommand : IMessage { }
    public ActionResult AddressAndPayment(FormCollection values)
    {
        var order = new Order();
        ...
        order.Username = User.Identity.Name;
        order.OrderDate = DateTime.Now;

        //Save Order
        storeDB.AddToOrders(order);
        storeDB.SaveChanges();

        //Process the order
        var cart = ShoppingCart.GetCart(this.HttpContext);
        cart.CreateOrder(order);

        return RedirectToAction("Complete", new { id = order.OrderId });
        ...
    }

    public int CreateOrder(Order order)
    {
        ...
        var cartItems = GetCartItems();

        foreach (var cartItem in cartItems)
        {
            var orderDetails = new OrderDetail
            {
                AlbumId = cartItem.AlbumId,
                OrderId = order.OrderId,
                UnitPrice = cartItem.Album.Price
            };
            storeDB.OrderDetails.AddObject(orderDetails);
            ...
        }

        //Save the order
        storeDB.SaveChanges();
        ...
        //Return the OrderId as a confirmation number
        return order.OrderId;
    }
    public interface IPlaceOrderCommand : ICommand
    {
        String CartId { get; set; }
    }
    [ServiceContract]
    public interface IProductCreatedRestService
    {
        [OperationContract]
        [WebInvoke(UriTemplate = "products",
            RequestFormat = WebMessageFormat.Xml,
            ResponseFormat = WebMessageFormat.Xml)]
        void Create(ProductCreatedMessage message);
    }
    [ServiceBehavior(InstanceContextMode = InstanceContextMode.PerCall)]
    public class ProductCreatedRestService : IProductCreatedRestService
    {
        private readonly IBus bus;

        public ProductCreatedRestService()
        {
            if (null == this.bus)
                this.bus = Configure.Instance.Builder.Build<IBus>();
        }

        public void Create(ProductCreatedMessage message)
        {
            this.bus.Send(message);
        }
    }
    public class RestServiceStartup : IWantToRunAtStartup
    {
        private WebServiceHost host;

        public void Run()
        {
            this.host = new WebServiceHost(typeof(ProductCreatedRestService));
            this.host.Open();
        }

        public void Stop()
        {
            if (null != this.host && this.host.State == System.ServiceModel.CommunicationState.Opened)
                this.host.Close();
        }
    }
    public void Handle(ProductCreatedMessage message)
    {
        // Normally you would do a Bus.Send here first
        if (message.ProductNumber == 1111)
            this.Bus.Return((Int32)CommandErrorCodes.Fail);
        else
            this.Bus.Return((Int32)CommandErrorCodes.Success);
    }