閱讀359 返回首頁    go 阿裏雲 go 技術社區[雲棲]


通過WCF擴展實現消息壓縮

對於需要進行大規模數據傳輸的WCF應用來說,對於請求消息和回複消息進行傳輸前的壓縮,不但可以降低網絡流量,也可以提高網絡傳輸的性能。由於WCF的擴展性,我們可以采用不同的方式實現對消息的壓縮,本文提供一種比較簡單的實現方式。[源代碼從這裏下載]

一、三種可行的消息壓縮方案
二、DataCompressor——用於數據壓縮與解壓縮組件
三、MessageCompressor——用於消息壓縮與解壓的組件
四、CompressionMessageFormatter——用於對請求/回複消息壓縮和解壓縮的組件
五、CompressionOperationBehaviorAttribute——將CompressionMessageFormatter用於WCF運行時框架的操作行為
六、查看結構壓縮後的消息
七、補充說明

消息壓縮在WCF中的實現其實很簡單,我們隻需要在消息(請求消息/回複消息)被序列化之後,發送之前進行壓縮;在接收之後,反序列化之前進行解壓縮即可。針對壓縮/解壓縮使用的時機,我們具有三種典型的解決方案。

我們支持兩種方式的壓縮,Dflate和GZip。兩種不同的壓縮算法通過如下定義的CompressionAlgorithm枚舉表示。

   1: public enum CompressionAlgorithm
   2: {
   3:     GZip,
   4:     Deflate
   5: }

而如下定義的DataCompressor負責基於上述兩種壓縮算法實際上的壓縮和解壓縮工作。

   1: internal class DataCompressor
   2: {
   3:     public static byte[] Compress(byte[] decompressedData, CompressionAlgorithm algorithm)
   4:     {
   5:         using (MemoryStream stream = new MemoryStream())
   6:         {
   7:             if (algorithm == CompressionAlgorithm.Deflate)
   8:             {
   9:                 GZipStream stream2 = new GZipStream(stream, CompressionMode.Compress, true);
  10:                 stream2.Write(decompressedData, 0, decompressedData.Length);
  11:                 stream2.Close();
  12:             }
  13:             else
  14:             {
  15:                 DeflateStream stream3 = new DeflateStream(stream, CompressionMode.Compress, true);
  16:                 stream3.Write(decompressedData, 0, decompressedData.Length);
  17:                 stream3.Close();
  18:             }
  19:             return stream.ToArray();
  20:         }
  21:     }
  22:  
  23:     public static byte[] Decompress(byte[] compressedData, CompressionAlgorithm algorithm)
  24:     {
  25:         using (MemoryStream stream = new MemoryStream(compressedData))
  26:         {
  27:             if (algorithm == CompressionAlgorithm.Deflate)
  28:             {
  29:                 using (GZipStream stream2 = new GZipStream(stream, CompressionMode.Decompress))
  30:                 {
  31:                     return LoadToBuffer(stream2);
  32:                 }
  33:             }
  34:             else
  35:             {
  36:                 using (DeflateStream stream3 = new DeflateStream(stream, CompressionMode.Decompress))
  37:                 {
  38:                     return LoadToBuffer(stream3);
  39:                 }
  40:             }
  41:         }
  42:     }
  43:  
  44:     private static byte[] LoadToBuffer(Stream stream)
  45:     {
  46:         using (MemoryStream stream2 = new MemoryStream())
  47:         {
  48:             int num;
  49:             byte[] buffer = new byte[0x400];
  50:             while ((num = stream.Read(buffer, 0, buffer.Length)) > 0)
  51:             {
  52:                 stream2.Write(buffer, 0, num);
  53:             }
  54:             return stream2.ToArray();
  55:         }
  56:     }
  57: }

而針對消息的壓縮和解壓縮通過如下一個MessageCompressor來完成。具體來說,我們通過上麵定義的DataCompressor對消息的主體部分內容進行壓縮,並將壓縮後的內容存放到一個預定義的XML元素中(名稱和命名空間分別為CompressedBody和https://www.artech.com/comporession/),同時添加相應的MessageHeader表示消息經過了壓縮,以及采用的壓縮算法。對於解壓縮,則是通過消息是否具有相應的MessageHeader判斷該消息是否經過壓縮,如果是則根據相應的算法對其進行解壓縮。具體的實現如下:

   1: public class MessageCompressor
   2:  {
   3:      public MessageCompressor(CompressionAlgorithm algorithm)
   4:      {
   5:          this.Algorithm = algorithm;
   6:      }
   7:      public Message CompressMessage(Message sourceMessage)
   8:      {
   9:          byte[] buffer;
  10:          using (XmlDictionaryReader reader1 = sourceMessage.GetReaderAtBodyContents())
  11:          {
  12:              buffer = Encoding.UTF8.GetBytes(reader1.ReadOuterXml());
  13:          }
  14:          if (buffer.Length == 0)
  15:          {
  16:              Message emptyMessage = Message.CreateMessage(sourceMessage.Version, (string)null);
  17:              sourceMessage.Headers.CopyHeadersFrom(sourceMessage);
  18:              sourceMessage.Properties.CopyProperties(sourceMessage.Properties);
  19:              emptyMessage.Close();
  20:              return emptyMessage;
  21:          }
  22:          byte[] compressedData = DataCompressor.Compress(buffer, this.Algorithm);
  23:          string copressedBody = CompressionUtil.CreateCompressedBody(compressedData);
  24:          XmlTextReader reader = new XmlTextReader(new StringReader(copressedBody), new NameTable());
  25:          Message message2 = Message.CreateMessage(sourceMessage.Version, null, (XmlReader)reader);
  26:          message2.Headers.CopyHeadersFrom(sourceMessage);
  27:          message2.Properties.CopyProperties(sourceMessage.Properties);
  28:          message2.AddCompressionHeader(this.Algorithm);
  29:          sourceMessage.Close();
  30:          return message2;
  31:      }
  32:  
  33:      public Message DecompressMessage(Message sourceMessage)
  34:      {
  35:          if (!sourceMessage.IsCompressed())
  36:          {
  37:              return sourceMessage;
  38:          }
  39:          CompressionAlgorithm algorithm = sourceMessage.GetCompressionAlgorithm();
  40:          sourceMessage.RemoveCompressionHeader();
  41:          byte[] compressedBody = sourceMessage.GetCompressedBody();
  42:          byte[] decompressedBody = DataCompressor.Decompress(compressedBody, algorithm);
  43:          string newMessageXml = Encoding.UTF8.GetString(decompressedBody);
  44:          XmlTextReader reader2 = new XmlTextReader(new StringReader(newMessageXml));
  45:          Message newMessage = Message.CreateMessage(sourceMessage.Version, null, reader2);
  46:          newMessage.Headers.CopyHeadersFrom(sourceMessage);
  47:          newMessage.Properties.CopyProperties(sourceMessage.Properties);
  48:          return newMessage;
  49:      }
  50:  
  51:      public CompressionAlgorithm Algorithm { get; private set; }
  52:  }

下麵是針對Message類型而定義了一些擴展方法和輔助方法。

   1: public static class CompressionUtil
   2: {
   3:     public const string CompressionMessageHeader = "Compression";
   4:     public const string CompressionMessageBody = "CompressedBody";
   5:     public const string Namespace = "https://www.artech.com/compression";
   6:  
   7:     public static bool IsCompressed(this Message message)
   8:     {
   9:         return message.Headers.FindHeader(CompressionMessageHeader, Namespace) > -1;
  10:     }
  11:  
  12:     public static void AddCompressionHeader(this Message message, CompressionAlgorithm algorithm)
  13:     { 
  14:         message.Headers.Add(MessageHeader.CreateHeader(CompressionMessageHeader, Namespace, string.Format("algorithm = \"{0}\"",algorithm)));
  15:     }
  16:  
  17:     public static void RemoveCompressionHeader(this Message message)
  18:     {
  19:         message.Headers.RemoveAll(CompressionMessageHeader, Namespace);
  20:     }
  21:  
  22:     public static CompressionAlgorithm GetCompressionAlgorithm(this Message message)
  23:     {
  24:         if (message.IsCompressed())
  25:         {
  26:             var algorithm = message.Headers.GetHeader<string>(CompressionMessageHeader, Namespace);
  27:             algorithm = algorithm.Replace("algorithm =", string.Empty).Replace("\"", string.Empty).Trim();
  28:             if (algorithm == CompressionAlgorithm.Deflate.ToString())
  29:             {
  30:                 return CompressionAlgorithm.Deflate;
  31:             }
  32:  
  33:             if (algorithm == CompressionAlgorithm.GZip.ToString())
  34:             {
  35:                 return CompressionAlgorithm.GZip;
  36:             }
  37:             throw new InvalidOperationException("Invalid compression algrorithm!");
  38:         }
  39:         throw new InvalidOperationException("Message is not compressed!");
  40:     }
  41:  
  42:     public static byte[] GetCompressedBody(this Message message)
  43:     {
  44:         byte[] buffer;
  45:         using (XmlReader reader1 = message.GetReaderAtBodyContents())
  46:         {
  47:             buffer = Convert.FromBase64String(reader1.ReadElementString(CompressionMessageBody, Namespace));
  48:         }
  49:         return buffer;
  50:     }
  51:  
  52:     public static string CreateCompressedBody(byte[] content)
  53:     {
  54:         StringWriter output = new StringWriter();
  55:         using (XmlWriter writer2 = XmlWriter.Create(output))
  56:         {
  57:             writer2.WriteStartElement(CompressionMessageBody, Namespace);
  58:             writer2.WriteBase64(content, 0, content.Length);
  59:             writer2.WriteEndElement();
  60:         }
  61:         return output.ToString();
  62:     }
  63: }

消息的序列化和反序列化最終是通過MessageFormatter來完成的。具體來說,客戶端通過ClientMessageFormatter實現對請求消息的序列化和對回複消息的序列化,而服務端通過DispatchMessageFormatter實現對請求消息的反序列化和對回複消息的序列化。

在默認的情況下,WCF選用的MessageFormatter為DataContractSerializerOperationFormatter,它采用DataContractSerializer進行實際的序列化和法序列化操作。我們自定義的MessageFormatter實際上是對DataContractSerializerOperationFormatter的封裝,我們依然使用它來完成序列化和反序列化工作,額外實現序列化後的壓縮和法序列化前的解壓縮。

因為DataContractSerializerOperationFormatter是一個internal類型,我們隻有通過反射的方式來創建它。如下的代碼片斷為用於進行消息壓縮與解壓縮的自定義MessageFormatter,即CompressionMessageFormatter的定義。

   1: public class CompressionMessageFormatter: IDispatchMessageFormatter, IClientMessageFormatter
   2: {
   3:     private const string DataContractSerializerOperationFormatterTypeName = "System.ServiceModel.Dispatcher.DataContractSerializerOperationFormatter, System.ServiceModel, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089";
   4:  
   5:     public IDispatchMessageFormatter InnerDispatchMessageFormatter { get; private set; }
   6:     public IClientMessageFormatter InnerClientMessageFormatter { get; private set; }
   7:     public MessageCompressor MessageCompressor { get; private set; }
   8:  
   9:     public CompressionMessageFormatter(CompressionAlgorithm algorithm, OperationDescription description, DataContractFormatAttribute dataContractFormatAttribute, DataContractSerializerOperationBehavior serializerFactory)
  10:     {
  11:         this.MessageCompressor = new MessageCompressor(algorithm);
  12:         Type innerFormatterType = Type.GetType(DataContractSerializerOperationFormatterTypeName);
  13:         var innerFormatter = Activator.CreateInstance(innerFormatterType, description, dataContractFormatAttribute, serializerFactory);
  14:         this.InnerClientMessageFormatter = innerFormatter as IClientMessageFormatter;
  15:         this.InnerDispatchMessageFormatter = innerFormatter as IDispatchMessageFormatter;
  16:     }
  17:  
  18:     public void DeserializeRequest(Message message, object[] parameters)
  19:     {
  20:         message = this.MessageCompressor.DecompressMessage(message);
  21:         this.InnerDispatchMessageFormatter.DeserializeRequest(message, parameters);
  22:     }
  23:  
  24:     public Message SerializeReply(MessageVersion messageVersion, object[] parameters, object result)
  25:     {
  26:         var message = this.InnerDispatchMessageFormatter.SerializeReply(messageVersion, parameters, result);
  27:         return this.MessageCompressor.CompressMessage(message);
  28:     }
  29:  
  30:     public object DeserializeReply(Message message, object[] parameters)
  31:     {
  32:         message = this.MessageCompressor.DecompressMessage(message);
  33:         return this.InnerClientMessageFormatter.DeserializeReply(message, parameters);
  34:     }
  35:  
  36:     public Message SerializeRequest(MessageVersion messageVersion, object[] parameters)
  37:     {
  38:         var message = this.InnerClientMessageFormatter.SerializeRequest(messageVersion, parameters);
  39:         return this.MessageCompressor.CompressMessage(message);
  40:     }
  41: }

ClientMessageFormatter和DispatchMessageFormatter實際上屬於ClientOperation和DispatchOperation的組件。我們可以通過如下一個自定義的操作行為CompressionOperationBehaviorAttribute將其應用到相應的操作上。

   1: [AttributeUsage( AttributeTargets.Method)]
   2: public class CompressionOperationBehaviorAttribute: Attribute, IOperationBehavior
   3: {
   4:     public CompressionAlgorithm Algorithm { get; set; }
   5:  
   6:     public void AddBindingParameters(OperationDescription operationDescription, BindingParameterCollection bindingParameters) { }
   7:  
   8:     public void ApplyClientBehavior(OperationDescription operationDescription, ClientOperation clientOperation)
   9:     {
  10:         clientOperation.SerializeRequest = true;
  11:         clientOperation.DeserializeReply = true;
  12:         var dataContractFormatAttribute = operationDescription.SyncMethod.GetCustomAttributes(typeof(DataContractFormatAttribute), true).FirstOrDefault() as DataContractFormatAttribute;
  13:         if (null == dataContractFormatAttribute)
  14:         {
  15:             dataContractFormatAttribute = new DataContractFormatAttribute();
  16:         }
  17:  
  18:         var dataContractSerializerOperationBehavior = operationDescription.Behaviors.Find<DataContractSerializerOperationBehavior>();
  19:         clientOperation.Formatter = new CompressionMessageFormatter(this.Algorithm, operationDescription, dataContractFormatAttribute, dataContractSerializerOperationBehavior);            
  20:     }
  21:  
  22:     public void ApplyDispatchBehavior(OperationDescription operationDescription, DispatchOperation dispatchOperation)
  23:     {
  24:         dispatchOperation.SerializeReply        = true;
  25:         dispatchOperation.DeserializeRequest    = true;
  26:         var dataContractFormatAttribute = operationDescription.SyncMethod.GetCustomAttributes(typeof(DataContractFormatAttribute), true).FirstOrDefault() as DataContractFormatAttribute;
  27:         if (null == dataContractFormatAttribute)
  28:         {
  29:             dataContractFormatAttribute = new DataContractFormatAttribute();
  30:         }
  31:         var dataContractSerializerOperationBehavior = operationDescription.Behaviors.Find<DataContractSerializerOperationBehavior>();
  32:         dispatchOperation.Formatter = new CompressionMessageFormatter(this.Algorithm, operationDescription, dataContractFormatAttribute, dataContractSerializerOperationBehavior);     
  33:     }
  34:  
  35:     public void Validate(OperationDescription operationDescription) { }
  36: }

為了驗證應用了CompressionOperationBehaviorAttribute特性的操作方法對應的消息是否經過了壓縮,我們可以通過一個簡單的例子來檢驗。我們采用常用的計算服務的例子,下麵是服務契約和服務類型的定義。我們上麵定義的CompressionOperationBehaviorAttribute應用到服務契約的Add操作上。

   1: [ServiceContract(Namespace= "https://www.artech.com/")]
   2: public interface ICalculator
   3: {
   4:     [OperationContract]
   5:     [CompressionOperationBehavior]
   6:     double Add(double x, double y);
   7: }
   8: public class CalculatorService : ICalculator
   9: {
  10:     public double Add(double x, double y)
  11:     {
  12:         return x + y;
  13:     }
  14: }

我們采用BasicHttpBinding作為終結點的綁定類型(具體的配置請查看源代碼),下麵是通過Fiddler獲取的消息的內容,它們的主體部分都經過了基於壓縮的編碼。

   1: <s:Envelope xmlns:s="https://schemas.xmlsoap.org/soap/envelope/">
   2:   <s:Header>
   3:     <Compression xmlns="https://www.artech.com/compression">algorithm = "GZip"</Compression>
   4:   </s:Header>
   5:   <s:Body>
   6:     <CompressedBody xmlns="https://www.artech.com/compression">7L0HYBx ... CQAA//8=</CompressedBody>
   7:   </s:Body>
   8: </s:Envelope>

回複消息

   1: <s:Envelope xmlns:s="https://schemas.xmlsoap.org/soap/envelope/">
   2:   <s:Header>
   3:     <Compression xmlns="https://www.artech.com/compression">algorithm = "GZip"</Compression>
   4:   </s:Header>
   5:   <s:Body>
   6:     <CompressedBody xmlns="https://www.artech.com/compression">7L0H...PAAAA//8=</CompressedBody>
   7:   </s:Body>
   8: </s:Envelope>

由於CompressionMessageFormatter使用基於DataContractSerializer序列化器的DataContractSerializerOperationFormatter進行消息的序列化和發序列化工作。而DataContractSerializer僅僅是WCF用於序列化的一種默認的選擇(WCF還可以采用傳統的XmlSeriaizer)。為了讓CompressionMessageFormatter能夠使用其他序列化器,你可以對於進行相應的修正。


作者:蔣金楠
微信公眾賬號:大內老A
微博:www.weibo.com/artech
如果你想及時得到個人撰寫文章以及著作的消息推送,或者想看看個人推薦的技術資料,可以掃描左邊二維碼(或者長按識別二維碼)關注個人公眾號(原來公眾帳號蔣金楠的自媒體將會停用)。
本文版權歸作者和博客園共有,歡迎轉載,但未經作者同意必須保留此段聲明,且在文章頁麵明顯位置給出原文連接,否則保留追究法律責任的權利。
原文鏈接

最後更新:2017-10-26 16:04:40

  上一篇:go  WCF服務端運行時架構體係詳解[續篇]
  下一篇:go  WCF客戶端運行時架構體係詳解[上篇]