Friday, October 8, 2010

Getting Started with NServiceBus Part 6: MVC Music Store Saga(Workflow)

Last time we were able to wire up the OrderAccepted event and publish it to a single Shipping Subscriber.  This time around we are going to cast our net wider and include a Billing Subscriber.  This Subscriber will look up the payment instrument for the order and then authorize that payment.  This presents a problem for our Shipping Subscriber, as we cannot ship the product until we know the customer can pay for it.

NSB handles this situation quite well with Sagas, its implementation of workflow.  With a Saga we can tell the Shipping Subscriber to wait for the payment authorization to complete before shipping the order.  NSB does this by storing the state of the workflow in persistent storage (the Saga Persister).  Since this is handled for us, all we need to do is reconcile the state of the workflow as messages arrive and mark it complete when we are done.

Let us begin by introducing a new entity, PaymentInstrument.  The easiest thing to do is create the table in the local DB and update the model from there.
Next we'll create the message that the Shipping Subscriber will receive once the payment has been authorized or rejected.
    public interface IAuthorizePaymentEvent : IEvent
    {
        Int32 OrderId { get; set; }
        Boolean Authorized { get; set; }
    }
In this message we simply tie back to the Order and send a flag giving us the status of the authorization. To complete the Billing Subscriber we create a new project, configure the endpoint AsA_Server, and point it at our Publisher, the Music Store command handler input queue. We need to add another endpoint to the config since we will be sending the Shipping Subscriber the IAuthorizePaymentEvent message. The mappings are as follows:

    
      
      
      
    
  
All we have left is to complete the handler for the published IOrderAcceptedEvent. In this handler we'll look up the user name on the order and the total. Based on this information we'll grab the payment instrument for the user and do some simple validation. Upon completion we'll send a message over to the Shipping Subscriber so it can continue the workflow.
        public void Handle(IOrderAcceptedEvent message)
        {
            // Dispose the Entity Framework context when the handler finishes.
            using (var storeDB = new MusicStoreEntities())
            {
                var order = storeDB.Orders.Single(o => o.OrderId == message.OrderId);

                // Single() throws if the user has no payment instrument on file.
                // The stand-in check below does not otherwise use it.
                var paymentInstrument = storeDB.PaymentInstruments.Single(pi => pi.UserName == order.Username);

                var authorizedEvent = this.Bus.CreateInstance<IAuthorizePaymentEvent>(p => p.OrderId = order.OrderId);

                // Stand-in for a real card authorization: approve orders under 100.
                authorizedEvent.Authorized = order.Total < 100;

                this.Bus.Send(authorizedEvent);
            }
        }
Now we have a Billing Subscriber that is authorizing payment instruments and sending messages to our Shipping Subscriber. Next we have to make the Shipping Subscriber wait for the authorization. Let's begin by defining the data or state that we'd like to keep track of. At a bare minimum we need to keep track of the OrderId, the status of the payment authorization, and the status of the order (have we picked all the product?). NSB will also impose a few extra fields upon us as we implement the IContainSagaData interface.
    public class ShippingSagaData : IContainSagaData
    {
        public virtual Guid Id{ get; set; }
        public virtual String OriginalMessageId { get; set; }
        public virtual String Originator { get; set; }
        public virtual Boolean PaymentAuthorized { get; set; }
        public virtual Int32 OrderId { get; set; }
        public virtual Boolean OrderPicked { get; set; }
    }
Note that all the properties are marked as virtual. NSB uses NHibernate to store the Saga so we must give NH access to our properties. If you run the code without this it will complain. When implementing a Saga in NSB, your Saga becomes the handler and distributor of many messages. NSB needs to know how to start the Saga, which messages to handle, and what kind of data to store. The class definition says it all:
public class ShippingSaga : Saga<ShippingSagaData>,
        IAmStartedByMessages<IOrderAcceptedEvent>,
        IHandleMessages<IAuthorizePaymentEvent>,
        IHandleMessages<IOrderPickedEvent>
There is a new message here that we didn't have before. We need to know when we have completed picking the product from our shelves so we've introduced the IOrderPickedEvent.
    public interface IOrderPickedEvent : IEvent
    {
        Int32 ShippingNoteId { get; set; }
        Int32 OrderId { get; set; }
    }
The first thing we have to do is tell NSB how to look up our Saga. In other words, what makes a Saga unique? In our case we link everything up by OrderId, so we map the OrderId on each message to the OrderId in the Saga data.
        public override void ConfigureHowToFindSaga()
        {
            base.ConfigureMapping<IOrderAcceptedEvent>(s => s.OrderId, e => e.OrderId);
            base.ConfigureMapping<IAuthorizePaymentEvent>(s => s.OrderId, e => e.OrderId);
            base.ConfigureMapping<IOrderPickedEvent>(s => s.OrderId, e => e.OrderId);
        }
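To make the lookup idea concrete, here is a minimal sketch of what correlating by OrderId amounts to. It does not use NSB at all: SketchSagaData, SketchSagaStore, and CorrelationDemo are hypothetical names invented for illustration, and the in-memory dictionary only stands in for whatever the real Saga Persister does against its database.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for the saga state; only the fields needed here.
public class SketchSagaData
{
    public Guid Id { get; set; }
    public int OrderId { get; set; }
    public bool PaymentAuthorized { get; set; }
    public bool OrderPicked { get; set; }
}

// Hypothetical in-memory store keyed by the correlation value (OrderId).
public class SketchSagaStore
{
    private readonly Dictionary<int, SketchSagaData> byOrderId =
        new Dictionary<int, SketchSagaData>();

    // A starting message creates the state if none exists yet.
    public SketchSagaData FindOrCreate(int orderId)
    {
        SketchSagaData data;
        if (!byOrderId.TryGetValue(orderId, out data))
        {
            data = new SketchSagaData { Id = Guid.NewGuid(), OrderId = orderId };
            byOrderId.Add(orderId, data);
        }
        return data;
    }

    // Any other message only finds existing state; null means no saga matches.
    public SketchSagaData Find(int orderId)
    {
        SketchSagaData data;
        return byOrderId.TryGetValue(orderId, out data) ? data : null;
    }
}

public static class CorrelationDemo
{
    public static void Main()
    {
        var store = new SketchSagaStore();
        var started = store.FindOrCreate(42); // plays the role of IOrderAcceptedEvent
        var found = store.Find(42);           // plays the role of IAuthorizePaymentEvent
        Console.WriteLine(ReferenceEquals(started, found)); // both hit the same state
        Console.WriteLine(store.Find(7) == null);           // no saga exists for order 7
    }
}
```

The point of the sketch is only that every message carrying the same OrderId lands on the same piece of state, which is what the mappings above ask NSB to arrange for us.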
All of this starts with the Order being accepted, so our Saga starts by handling that event. We'll default some Saga data and then kick off the picking process.
        public void Handle(IOrderAcceptedEvent message)
        {
            base.Data.PaymentAuthorized = false;
            base.Data.OrderId = message.OrderId;
            base.Bus.SendLocal<IPickOrderCommand>( p => p.OrderId = message.OrderId);
        }
The interesting thing here is the Bus.SendLocal() method, which puts a message on the current endpoint's own queue. We send a message to ourselves to kick off the pick process. The handler for this was detailed last time, when we checked our inventory positions for the product and created ship notes. While we are picking the product, the payment is being authorized. If the payment comes back unauthorized we'll simply restock the order. Once we have picked the order we can check to see if we can let the order out the door.
        public void Handle(IOrderPickedEvent message)
        {
            base.Data.OrderPicked = true;
            this.TryComplete();
        }

        private void TryComplete()
        {
            if (base.Data.OrderPicked && Data.PaymentAuthorized)
                MarkAsComplete();
        }
The MarkAsComplete() method tells the NSB infrastructure that this Saga is over and we can ditch the state. We also need to call the TryComplete() method if the authorization comes in:
        public void Handle(IAuthorizePaymentEvent message)
        {
            base.Data.PaymentAuthorized = message.Authorized;
            this.TryComplete();
        }
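Because the two messages can arrive in either order, it can help to see the completion rule on its own. The following is a minimal sketch with no NSB dependency; CompletionSketch and CompletionDemo are hypothetical names, and the Completed flag simply stands in for the call to MarkAsComplete().

```csharp
using System;

// Hypothetical, NServiceBus-free model of the saga's two flags.
public class CompletionSketch
{
    public bool OrderPicked { get; private set; }
    public bool PaymentAuthorized { get; private set; }
    public bool Completed { get; private set; }

    // Plays the role of Handle(IOrderPickedEvent).
    public void OnOrderPicked()
    {
        OrderPicked = true;
        TryComplete();
    }

    // Plays the role of Handle(IAuthorizePaymentEvent).
    public void OnPaymentResult(bool authorized)
    {
        PaymentAuthorized = authorized;
        TryComplete();
    }

    // Mirrors TryComplete(): finish only when both conditions hold.
    private void TryComplete()
    {
        if (OrderPicked && PaymentAuthorized)
            Completed = true;
    }
}

public static class CompletionDemo
{
    public static void Main()
    {
        var pickedFirst = new CompletionSketch();
        pickedFirst.OnOrderPicked();
        pickedFirst.OnPaymentResult(true);

        var paidFirst = new CompletionSketch();
        paidFirst.OnPaymentResult(true);
        paidFirst.OnOrderPicked();

        var rejected = new CompletionSketch();
        rejected.OnOrderPicked();
        rejected.OnPaymentResult(false);

        Console.WriteLine(pickedFirst.Completed); // completes regardless of arrival order
        Console.WriteLine(paidFirst.Completed);
        Console.WriteLine(rejected.Completed);    // a rejection never satisfies the rule
    }
}
```

One thing the sketch makes visible: with only the handlers shown in this post, a rejected authorization leaves the Saga open, so the restocking path would need its own handling before the Saga could be closed.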
We've completed our Saga except for one part. Normally we would only wait so long for a payment instrument authorization. In our Saga we can request a timeout. When a timeout is requested, a message is sent to another endpoint that watches the clock for us. When time is up, that external endpoint sends a message back to us. In this example we aren't requesting a timeout, which means we will wait forever for an authorization. Normally we wouldn't do this, and we'd have to run the Timeout Manager process that comes with NSB. To keep it simple I left it out this time. We still have to implement the Timeout() method, and in our case we'll simply kill the Saga on a timeout.
        public override void Timeout(object state)
        {
            MarkAsComplete();
        }
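Since the Timeout Manager is left out here, the round trip described above can at least be sketched in plain C#. Nothing below is the NSB API: SketchTimeoutManager, TimeoutSagaSketch, and TimeoutDemo are hypothetical names, the 30-minute window is an arbitrary assumption, and the clock is passed in by hand so the behavior is deterministic.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for the endpoint that watches the clock.
public class SketchTimeoutManager
{
    private class Request
    {
        public DateTime Due;
        public object State;
        public Action<object> Callback;
    }

    private readonly List<Request> pending = new List<Request>();

    // The saga asks to be called back at a given time.
    public void RequestTimeout(DateTime due, object state, Action<object> callback)
    {
        pending.Add(new Request { Due = due, State = state, Callback = callback });
    }

    // Called with the current time; fires and removes every request that is due.
    public int Tick(DateTime now)
    {
        var due = pending.FindAll(r => r.Due <= now);
        foreach (var request in due)
        {
            pending.Remove(request);
            request.Callback(request.State);
        }
        return due.Count;
    }
}

// Hypothetical saga that gives up when the timeout comes back.
public class TimeoutSagaSketch
{
    public bool Completed { get; private set; }

    // Plays the role of Timeout(object state) calling MarkAsComplete().
    public void Timeout(object state)
    {
        Completed = true;
    }
}

public static class TimeoutDemo
{
    public static void Main()
    {
        var start = new DateTime(2010, 10, 8, 12, 0, 0);
        var manager = new SketchTimeoutManager();
        var saga = new TimeoutSagaSketch();

        // Assumed policy: wait at most 30 minutes for the authorization.
        manager.RequestTimeout(start.AddMinutes(30), null, saga.Timeout);

        manager.Tick(start.AddMinutes(10));
        Console.WriteLine(saga.Completed); // still waiting at the 10 minute mark

        manager.Tick(start.AddMinutes(31));
        Console.WriteLine(saga.Completed); // the timeout has fired and ended the saga
    }
}
```

In the real thing the request and the callback are both messages between endpoints, so the Saga does not need to stay in memory while it waits.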
We've come a long way: we now have a music store that accepts Orders durably, handles payment authorization, picks the product, and creates shipping notes for the shipping dock. As always, the code can be found on GitHub: http://github.com/afyles/Blog

3 comments:

  1. Adam

    I am interested in your reasoning behind using Bus.SendLocal().

    Is this solely due to a deployment decision to handle IPickOrderCommands at the same endpoint as the saga? Or is there some other reasoning behind this?

    Also, what advantages does the use of SendLocal() give over Send()?

    Many thanks

    Sean

  2. Yes, this was just to process those messages on the same endpoint. SendLocal() simply sends a message to yourself, which is helpful in many cases, such as running something on a timer. You also avoid creating a distributed transaction by using SendLocal(), but still get a clean separation of duties.
