Sunday, September 19, 2010

Getting Started with NServiceBus Part 5: MVC Music Store Basic Pub/Sub

To recap, thus far we've implemented a basic Command/Query Segregation pattern in the ASP.NET MVC Music Store application.  This has decoupled us from the UI for performing important operations which include allowing users to shop and place orders.

Now we are going to expand our application so that it can ship products.  We'll do this by publishing a message when an Order has been accepted.  Publishing a message is fundamentally different from sending a message.  When you publish a message, it is intended to be received by multiple parties.  You must also be aware that there can be only one logical Publisher for a given message.  If multiple Publishers published the same message, the system would not know which one to listen to.  In our example we'll initially have only one Subscriber, the Shipping service.  The Shipping service will handle updating inventory and creating shipping notes for the shipping department.  In future installments we'll add more Subscribers to our Order Accepted message.

To get started we have to enhance our data model a bit so that we have somewhere to store our inventory and our shipping notes.  I found out the hard way that, when using the Entity Framework, it is significantly easier to update the database first and then the model.  That being said, we'll add the new tables to the database and then update the model.

We start with our table of inventory positions, which tells us how much of each product we have on hand and how much we need to order.  When an order comes in, we'll check this table to see if we have the product, and if we do, we'll add that quantity to the Shipped Quantity on our shipping note.  If we don't have the product, we'll put it on back order on our note.
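
The rule described above can be sketched as a small pure function for a single order line. This is only an illustration: InventoryRule, Allocation and Allocate are hypothetical names that do not exist in the Music Store code, and the sketch assumes the all-or-nothing behavior described in the text (a line is either fully shipped or fully back ordered).

```csharp
// Hypothetical helper, not part of the Music Store code. It illustrates the
// all-or-nothing rule for one order line: ship everything or back order everything.
public static class InventoryRule
{
    public struct Allocation
    {
        public int Shipped;          // quantity added to the shipping note as shipped
        public int BackOrdered;      // quantity added to the shipping note as back ordered
        public int RemainingOnHand;  // inventory balance after the allocation
    }

    public static Allocation Allocate(int balanceOnHand, int quantityOrdered)
    {
        var result = new Allocation();

        if (balanceOnHand >= quantityOrdered)
        {
            // Enough stock: ship the whole line and reduce the balance.
            result.Shipped = quantityOrdered;
            result.RemainingOnHand = balanceOnHand - quantityOrdered;
        }
        else
        {
            // Not enough stock: back order the whole line and leave the balance alone.
            result.BackOrdered = quantityOrdered;
            result.RemainingOnHand = balanceOnHand;
        }

        return result;
    }
}
```

With 5 units on hand, an order for 3 ships 3 and leaves 2 on hand, while an order for 8 ships nothing and back orders all 8.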

The first thing we need to do is modify our Music Store command handler to become a Publisher.  This is a change to our EndpointConfig.cs, which was previously configured AsA_Server.  We'll change it to be configured AsA_Publisher.  So what does this do for us?  First and foremost, it tells the endpoint to expect to receive not only the messages we've defined, but also subscription messages.  When a Subscriber starts up, it will put a subscription message into the Publisher's input queue, the SAME queue on which it is currently handling commands.  The AsA_Publisher configuration also tells NSB how to store the subscription messages.  For the Lite profile, subscriptions are held in memory; for Integration, they are stored in a queue; and for Production, they are stored in a database (leveraging NHibernate).  Below is our small tweak to the endpoint configuration:
public class EndpointConfig : IConfigureThisEndpoint, AsA_Publisher { }
The next thing we need to do is actually publish the message. To accomplish this, we'll modify the PlaceOrderHandler to publish a message after it has successfully saved the order. The message we'll publish is the IOrderAcceptedEvent message:
public interface IOrderAcceptedEvent : IEvent
{
    Int32 OrderId { get; set; }
}
This message has been added to our public schema project (Messages.csproj). All we pass along here is the order id. We can look up the Order on the other side when we go to ship the product, so there is no need to publish the entire Order. There may be a reason to publish the entire Order in the future, especially if there are components that don't have a way to query for Orders. Note that I'm using the naming convention of adding an "Event" suffix. This is so that I can easily tell what is an Event and what is a Command (namespacing would also do).

Now that we have a message to publish, we can update our IPlaceOrderHandler. The first thing we need to do is add a public property of type IBus so that NSB will inject a reference to the bus into our handler. Once we have the reference, we call Bus.Publish at the end of the handler and use an Action to initialize the message instance:
public class IPlaceOrderHandler : IHandleMessages<IPlaceOrderCommand>
{
    public IBus Bus { get; set; }

    public void Handle(IPlaceOrderCommand message)
    {
        MusicStoreEntities storeDB = new MusicStoreEntities();

        var order = new MvcMusicStore.Models.Order();

        order.Username = message.UserId;
        order.OrderDate = DateTime.Now;
        order.OrderId = message.OrderId;

        // Save Order

        //Process the order
        var cart = new ShoppingCart(message.CartId);

        this.Bus.Publish<IOrderAcceptedEvent>(o => o.OrderId = order.OrderId);
    }
}
Now we have our handler publishing messages out to the bus for all Subscribers. Next we'll build our Shipping Subscriber. Add a new project and configure the endpoint AsA_Server, just like our original command handler. Then we need to tell the Subscriber where the Publisher is, because it needs to know where to drop off its subscription messages. A subscription message contains the Subscriber's address along with some other endpoint information. By giving our address to the Publisher, we let it know where to push published messages. To achieve this we add a reference to the Publisher in our app.config.
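
As a rough sketch, assuming the NServiceBus 2.x configuration sections, the Subscriber's app.config might look like the following. The queue names ShippingInputQueue and MusicStoreInputQueue are placeholders chosen for illustration, as are the thread and retry values; substitute the values your own endpoints use.

```xml
<?xml version="1.0" encoding="utf-8" ?>
<configuration>
  <configSections>
    <section name="MsmqTransportConfig" type="NServiceBus.Config.MsmqTransportConfig, NServiceBus.Core" />
    <section name="UnicastBusConfig" type="NServiceBus.Config.UnicastBusConfig, NServiceBus.Core" />
  </configSections>

  <!-- The Subscriber's own input queue (placeholder name). -->
  <MsmqTransportConfig InputQueue="ShippingInputQueue" ErrorQueue="error"
                       NumberOfWorkerThreads="1" MaxRetries="5" />

  <UnicastBusConfig>
    <MessageEndpointMappings>
      <!-- Messages in the Messages assembly are owned by the Publisher's
           input queue (placeholder name); subscriptions are sent there. -->
      <add Messages="Messages" Endpoint="MusicStoreInputQueue" />
    </MessageEndpointMappings>
  </UnicastBusConfig>
</configuration>
```

The mapping is what ties the message assembly to the Publisher's queue, so the Subscriber knows where to send its subscription message on startup.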

The overall pub/sub semantics go something like this:

  1. Publisher starts up and loads subscriptions from storage (memory, queue, or DB).
  2. Subscriber starts up and sends a subscription message to the Publisher.  If the Publisher has already stored the subscription, the message is ignored.  If not, the Publisher stores the subscription.
  3. Publisher makes a call to Bus.Publish.
  4. Publisher looks to see who is subscribed to that message.  It creates a message for each Subscriber and puts it on its outbound queue (this is an internal MSMQ queue that you don't see unless it is sending the message to another machine).
  5. MSMQ takes over and pushes the messages to their destination queues.
  6. If the Subscriber is up and running, it will receive the message and call the appropriate handler.  If not, the message remains in the Subscriber's input queue.

Please note that if you have a lot of Subscribers and their queues are unreachable, the messages will build up on the Publisher's outbound queue, so you will have to size your storage appropriately.
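
The steps above can be modeled with a toy in-memory sketch. This is not NServiceBus code and does not use MSMQ: ToyPublisher, Subscribe, Publish and Receive are hypothetical names, and plain in-memory queues stand in for the Subscribers' input queues. It only illustrates duplicate subscriptions being ignored, one message copy per Subscriber, and messages waiting in a queue until the Subscriber picks them up.

```csharp
using System.Collections.Generic;

// Toy model of the pub/sub steps; all names here are hypothetical.
public class ToyPublisher
{
    // Subscription storage: message type name -> Subscriber queue addresses.
    private readonly Dictionary<string, HashSet<string>> subscriptions =
        new Dictionary<string, HashSet<string>>();

    // Stand-in for the Subscribers' input queues, keyed by address.
    public readonly Dictionary<string, Queue<string>> Queues =
        new Dictionary<string, Queue<string>>();

    // Step 2: store the subscription; returns false if it was already stored.
    public bool Subscribe(string messageType, string subscriberAddress)
    {
        HashSet<string> addresses;
        if (!subscriptions.TryGetValue(messageType, out addresses))
        {
            addresses = new HashSet<string>();
            subscriptions[messageType] = addresses;
        }
        if (!Queues.ContainsKey(subscriberAddress))
        {
            Queues[subscriberAddress] = new Queue<string>();
        }
        return addresses.Add(subscriberAddress);
    }

    // Steps 3 to 5: put one copy of the message on each Subscriber's queue.
    // Returns the number of copies created.
    public int Publish(string messageType, string payload)
    {
        HashSet<string> addresses;
        if (!subscriptions.TryGetValue(messageType, out addresses))
        {
            return 0;
        }
        foreach (var address in addresses)
        {
            Queues[address].Enqueue(payload);
        }
        return addresses.Count;
    }

    // Step 6: a running Subscriber drains its queue; until then messages wait.
    public List<string> Receive(string subscriberAddress)
    {
        var received = new List<string>();
        Queue<string> queue;
        if (Queues.TryGetValue(subscriberAddress, out queue))
        {
            while (queue.Count > 0)
            {
                received.Add(queue.Dequeue());
            }
        }
        return received;
    }
}
```

Subscribing the same address twice leaves a single subscription, and anything published while the Subscriber is not receiving simply accumulates in its queue, which is why storage has to be sized for the backlog.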

The last thing we need to do is implement the logic for the Shipping service. We will hydrate the order from the order id in the message and then check our inventory. If we have the product, we ship it; otherwise we back order it. From there our processing is complete and we have a full implementation of Pub/Sub:
public class ShippingHandler : IHandleMessages<IOrderAcceptedEvent>
{
    #region IMessageHandler<IOrderAcceptedEvent> Members

    public void Handle(IOrderAcceptedEvent message)
    {
        MusicStoreEntities storeDB = new MusicStoreEntities();

        var order = storeDB.Orders.Single(o => o.OrderId == message.OrderId);

        var shipNote = new ShippingNote
        {
            FirstName = order.FirstName,
            LastName = order.LastName,
            Address = order.Address,
            City = order.City,
            State = order.State,
            PostalCode = order.PostalCode
        };

        foreach (var detail in order.OrderDetails)
        {
            var inventoryPosition = storeDB.InventoryPositions.Single(p => p.Album.AlbumId == detail.AlbumId);

            if (inventoryPosition.BalanceOnHand >= detail.Quantity)
            {
                // In stock: reduce the balance and ship the full quantity.
                inventoryPosition.BalanceOnHand -= detail.Quantity;
                shipNote.ShippedQuantity += detail.Quantity;
            }
            else
            {
                // Not enough stock: back order this line's quantity.
                shipNote.BackOrderQuantity += detail.Quantity;
            }
        }
    }

    #endregion
}

The code is on GitHub.

1 comment:

  1. Adam

    I have to say that this is a really great series of articles and very useful to help understand NServiceBus.

    It's taken a few readings, but I finally get why I didn't get it before. It is easy to confuse subscription storage with the message storage (you answered a question of mine on SO that was rooted in this misunderstanding).

    Thanks very much for this top class set of articles!