Monday, June 3, 2013

DataBus Compression

We have a requirement to be a bit conservative about our network traffic.  We are using the DataBus to move larger data elements to distributed locations.  My first attempt was to implement a transport message mutator to perform some compression, but it did not compress the DataBus properties.

It turns out those properties are stripped from the message once the file has been written to disk, so the mutator's compression never reaches them.  The simplest thing I found was to replace the FileShareDataBus altogether with my own IDataBus implementation.  IDataBus has a couple of simple methods, Get and Put, to implement.  I used the existing FileShareDataBus as a model and just injected some simple compression.  Let's start with Put:

        public string Put(Stream stream, TimeSpan timeToBeReceived)
        {
            var key = GenerateKey(timeToBeReceived);

            var filePath = Path.Combine(basePath, key);

            Directory.CreateDirectory(Path.GetDirectoryName(filePath));

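            // Wrap the file stream in a GZipStream so the payload is compressed as it is written to disk.
            using (var outStream = new FileStream(filePath, FileMode.CreateNew))
            using (var tinyStream = new GZipStream(outStream, CompressionMode.Compress))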
            {
                var buffer = new byte[32 * 1024];
                Int32 read = 0;

                while ((read = stream.Read(buffer, 0, buffer.Length)) > 0)
                {
                    tinyStream.Write(buffer, 0, read);
                }
            }

            return key;
        }

Note that all I really did was slip in the GZipStream. Now for Get:

        public Stream Get(String key)
        {
            var bigStreamOut = new MemoryStream();

            using (var bigStream = new GZipStream(File.OpenRead(Path.Combine(this.basePath, key)), CompressionMode.Decompress))
            {
                bigStream.CopyTo(bigStreamOut);
            }

            bigStreamOut.Position = 0;

            return bigStreamOut;
        }
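
To sanity-check the Put/Get pair without spinning up an endpoint, the same GZipStream round trip can be tried on its own. This is only a minimal sketch: the class and method names (GZipRoundTrip, CompressToFile, DecompressFromFile) are made up for illustration and are not part of NServiceBus, and the temp file stands in for the DataBus share.

```csharp
using System;
using System.IO;
using System.IO.Compression;
using System.Linq;
using System.Text;

// Hypothetical helper, not part of NServiceBus: mirrors what Put and Get do with GZipStream.
public static class GZipRoundTrip
{
    // Compresses everything readable from 'input' into a new file at 'filePath'.
    public static void CompressToFile(Stream input, string filePath)
    {
        using (var file = new FileStream(filePath, FileMode.CreateNew))
        using (var gzip = new GZipStream(file, CompressionMode.Compress))
        {
            input.CopyTo(gzip);
        }
    }

    // Decompresses the file into memory and rewinds the stream for the caller.
    public static MemoryStream DecompressFromFile(string filePath)
    {
        var result = new MemoryStream();
        using (var file = File.OpenRead(filePath))
        using (var gzip = new GZipStream(file, CompressionMode.Decompress))
        {
            gzip.CopyTo(result);
        }
        result.Position = 0;
        return result;
    }

    public static void Main()
    {
        // A temp file stands in for the DataBus file share.
        var path = Path.Combine(Path.GetTempPath(), Guid.NewGuid().ToString("N") + ".gz");
        var original = Encoding.UTF8.GetBytes(new string('x', 100000));
        try
        {
            CompressToFile(new MemoryStream(original), path);
            var restored = DecompressFromFile(path).ToArray();

            Console.WriteLine("original bytes:   " + original.Length);
            Console.WriteLine("compressed bytes: " + new FileInfo(path).Length);
            Console.WriteLine("round trip ok:    " + restored.SequenceEqual(original));
        }
        finally
        {
            File.Delete(path);
        }
    }
}
```

The sample payload is deliberately repetitive, so it compresses very well; real DataBus payloads will shrink by however much their content allows.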

Lastly, all we need is a little bit of configuration magic:

    public static class ConfigureCompressedFileShareDataBus
    {
        public static Configure CompressedFileShareDataBus(this Configure config, String basePath)
        {
            var bus = new CompressedFileShareDataBus(basePath);

            config.Configurer.RegisterSingleton<IDataBus>(bus);

            return config;
        }
    }

Now we have a heck of a lot less data on the network...enjoy!

Tuesday, February 12, 2013

NSB Custom Fault Handling in 15 Minutes

We had a need to divert some of our exceptions over to a level 2 help desk.  The help desk would then either fix the problem on behalf of the customer or initiate contact with the customer to fix the problem.  We implemented a custom fault handler and it really only took 15 minutes.

Be aware that once you take control of the faults, it is up to you to handle every scenario, and Second Level Retries (SLRs) will no longer work.  In our case, that doesn't really matter since the majority of our exceptions will be handled by a person with this new process.

First we started out by adding some custom config to wire up the handler.  We wanted to maintain the existing process for exceptions that the help desk would not handle, so we borrowed the code from the forwarding handler.

    public static Configure ForwardToHelpDeskInCaseOfFault(this Configure config)
    {
        ....//left out for brevity
        config.Configurer.ConfigureComponent<HelpDeskFaultHandler>(DependencyLifecycle.InstancePerCall)
            .ConfigureProperty(fm => fm.ErrorQueue, ErrorQueue);

        return config;
    }

Now we can simply add that to our endpoint config:

    .ForwardToHelpDeskInCaseOfFault();

Lastly, all we need to do is implement the interface and decide, based on the exception, where the message should end up. With this, we are DONE!

    public class HelpDeskFaultHandler : IManageMessageFailures
    {
        private Address localAddress;
        private static readonly ILog Logger = LogManager.GetLogger("WebGateway.HelpDeskFaultHandler");

        public Address ErrorQueue { get; set; }

        public void Init(Address address)
        {
            this.localAddress = address;
        }

        public void ProcessingAlwaysFailsForMessage(TransportMessage message, Exception e)
        {
            this.SendToHelpDesk(message, e, "ProcessingFailed");
        }

        public void SerializationFailedForMessage(TransportMessage message, Exception e)
        {
            this.SendToErrorQueue(message, e, "SerializationFailed");
        }

        private void SendToErrorQueue(TransportMessage message, Exception e, String reason)
        {
            ...//left out for brevity
        }

        private void SendToHelpDesk(TransportMessage message, Exception e, String reason)
        {
            // Only MyCustomException (or a subclass of it) goes to the help desk.
            if (e is MyCustomException)
            {
                try
                {
                    HelpDeskException hd = new HelpDeskException(message, e.Message);
                    hd.Save();
                }
                catch (Exception ex)
                {
                    Logger.Warn("Failed to send to help desk, sending to error queue", ex);
                    this.SendToErrorQueue(message, e, "SendToHelpDeskFailed");
                }
            }
            else
            {
                this.SendToErrorQueue(message, e, reason);
            }
        }

        private void SetExceptionHeaders(TransportMessage message, Exception e, String reason)
        {
            ...//left out for brevity
        }
    }
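
Routing on the exception type hinges on which direction the type test runs, and that is easy to get backwards. Here is a small sketch comparing the `is` operator with `Type.IsAssignableFrom` called on the exception's runtime type. The exception classes below are hypothetical stand-ins for the application's own types, and TypeCheckDemo is just an illustrative name.

```csharp
using System;

// Hypothetical stand-ins for the application's exception types.
public class MyCustomException : Exception { }
public class MoreSpecificException : MyCustomException { }

public static class TypeCheckDemo
{
    // True when 'e' is a MyCustomException or a subclass of it.
    public static bool IsCustom(Exception e)
    {
        return e is MyCustomException;
    }

    // True when a MyCustomException could be assigned to a variable of e's
    // runtime type, i.e. e's type is MyCustomException or one of its base types.
    public static bool IsBaseOfCustom(Exception e)
    {
        return e.GetType().IsAssignableFrom(typeof(MyCustomException));
    }

    public static void Main()
    {
        Exception[] samples =
        {
            new Exception(),
            new MyCustomException(),
            new MoreSpecificException()
        };

        foreach (var e in samples)
        {
            Console.WriteLine("{0}: is={1}, IsAssignableFrom={2}",
                e.GetType().Name, IsCustom(e), IsBaseOfCustom(e));
        }
        // Exception: is=False, IsAssignableFrom=True
        // MyCustomException: is=True, IsAssignableFrom=True
        // MoreSpecificException: is=True, IsAssignableFrom=False
    }
}
```

With `is`, only the custom exception and its subclasses match. With `e.GetType().IsAssignableFrom(typeof(MyCustomException))`, a plain Exception matches and a subclass does not, which is rarely what a help desk filter wants.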