359
技術社區[雲棲]
通過WCF擴展實現消息壓縮
對於需要進行大規模數據傳輸的WCF應用來說,對於請求消息和回複消息進行傳輸前的壓縮,不但可以降低網絡流量,也可以提高網絡傳輸的性能。由於WCF的擴展性,我們可以采用不同的方式實現對消息的壓縮,本文提供一種比較簡單的實現方式。[源代碼從這裏下載]
一、三種可行的消息壓縮方案
二、DataCompressor——用於數據壓縮與解壓縮組件
三、MessageCompressor——用於消息壓縮與解壓的組件
四、CompressionMessageFormatter——用於對請求/回複消息壓縮和解壓縮的組件
五、CompressionOperationBehaviorAttribute——將CompressionMessageFormatter用於WCF運行時框架的操作行為
六、查看結構壓縮後的消息
七、補充說明
消息壓縮在WCF中的實現其實很簡單,我們隻需要在消息(請求消息/回複消息)被序列化之後,發送之前進行壓縮;在接收之後,反序列化之前進行解壓縮即可。針對壓縮/解壓縮使用的時機,我們具有三種典型的解決方案。
- 通過自定義MessageEncoder和MessageEncodingBindingElement 來完成。具體的實現,可以參閱張玉彬的文章《WCF進階:將編碼後的字節流壓縮傳輸》和MSDN的文章《Custom Message Encoder: Compression Encoder》。
- 直接創建用於壓縮和解壓縮的信道,在CodePlex中具有這麼一個WCF Extensions;
- 自定義MessageFormatter實現序列化後的壓縮和法序列化前的解壓縮,這就是我們今天將要介紹的解決方案。
我們支持兩種方式的壓縮,Dflate和GZip。兩種不同的壓縮算法通過如下定義的CompressionAlgorithm枚舉表示。
1: public enum CompressionAlgorithm
2: {
3: GZip,
4: Deflate
5: }
而如下定義的DataCompressor負責基於上述兩種壓縮算法實際上的壓縮和解壓縮工作。
1: internal class DataCompressor
2: {
3: public static byte[] Compress(byte[] decompressedData, CompressionAlgorithm algorithm)
4: {
5: using (MemoryStream stream = new MemoryStream())
6: {
7: if (algorithm == CompressionAlgorithm.Deflate)
8: {
9: GZipStream stream2 = new GZipStream(stream, CompressionMode.Compress, true);
10: stream2.Write(decompressedData, 0, decompressedData.Length);
11: stream2.Close();
12: }
13: else
14: {
15: DeflateStream stream3 = new DeflateStream(stream, CompressionMode.Compress, true);
16: stream3.Write(decompressedData, 0, decompressedData.Length);
17: stream3.Close();
18: }
19: return stream.ToArray();
20: }
21: }
22:
23: public static byte[] Decompress(byte[] compressedData, CompressionAlgorithm algorithm)
24: {
25: using (MemoryStream stream = new MemoryStream(compressedData))
26: {
27: if (algorithm == CompressionAlgorithm.Deflate)
28: {
29: using (GZipStream stream2 = new GZipStream(stream, CompressionMode.Decompress))
30: {
31: return LoadToBuffer(stream2);
32: }
33: }
34: else
35: {
36: using (DeflateStream stream3 = new DeflateStream(stream, CompressionMode.Decompress))
37: {
38: return LoadToBuffer(stream3);
39: }
40: }
41: }
42: }
43:
44: private static byte[] LoadToBuffer(Stream stream)
45: {
46: using (MemoryStream stream2 = new MemoryStream())
47: {
48: int num;
49: byte[] buffer = new byte[0x400];
50: while ((num = stream.Read(buffer, 0, buffer.Length)) > 0)
51: {
52: stream2.Write(buffer, 0, num);
53: }
54: return stream2.ToArray();
55: }
56: }
57: }
而針對消息的壓縮和解壓縮通過如下一個MessageCompressor來完成。具體來說,我們通過上麵定義的DataCompressor對消息的主體部分內容進行壓縮,並將壓縮後的內容存放到一個預定義的XML元素中(名稱和命名空間分別為CompressedBody和https://www.artech.com/comporession/),同時添加相應的MessageHeader表示消息經過了壓縮,以及采用的壓縮算法。對於解壓縮,則是通過消息是否具有相應的MessageHeader判斷該消息是否經過壓縮,如果是則根據相應的算法對其進行解壓縮。具體的實現如下:
1: public class MessageCompressor
2: {
3: public MessageCompressor(CompressionAlgorithm algorithm)
4: {
5: this.Algorithm = algorithm;
6: }
7: public Message CompressMessage(Message sourceMessage)
8: {
9: byte[] buffer;
10: using (XmlDictionaryReader reader1 = sourceMessage.GetReaderAtBodyContents())
11: {
12: buffer = Encoding.UTF8.GetBytes(reader1.ReadOuterXml());
13: }
14: if (buffer.Length == 0)
15: {
16: Message emptyMessage = Message.CreateMessage(sourceMessage.Version, (string)null);
17: sourceMessage.Headers.CopyHeadersFrom(sourceMessage);
18: sourceMessage.Properties.CopyProperties(sourceMessage.Properties);
19: emptyMessage.Close();
20: return emptyMessage;
21: }
22: byte[] compressedData = DataCompressor.Compress(buffer, this.Algorithm);
23: string copressedBody = CompressionUtil.CreateCompressedBody(compressedData);
24: XmlTextReader reader = new XmlTextReader(new StringReader(copressedBody), new NameTable());
25: Message message2 = Message.CreateMessage(sourceMessage.Version, null, (XmlReader)reader);
26: message2.Headers.CopyHeadersFrom(sourceMessage);
27: message2.Properties.CopyProperties(sourceMessage.Properties);
28: message2.AddCompressionHeader(this.Algorithm);
29: sourceMessage.Close();
30: return message2;
31: }
32:
33: public Message DecompressMessage(Message sourceMessage)
34: {
35: if (!sourceMessage.IsCompressed())
36: {
37: return sourceMessage;
38: }
39: CompressionAlgorithm algorithm = sourceMessage.GetCompressionAlgorithm();
40: sourceMessage.RemoveCompressionHeader();
41: byte[] compressedBody = sourceMessage.GetCompressedBody();
42: byte[] decompressedBody = DataCompressor.Decompress(compressedBody, algorithm);
43: string newMessageXml = Encoding.UTF8.GetString(decompressedBody);
44: XmlTextReader reader2 = new XmlTextReader(new StringReader(newMessageXml));
45: Message newMessage = Message.CreateMessage(sourceMessage.Version, null, reader2);
46: newMessage.Headers.CopyHeadersFrom(sourceMessage);
47: newMessage.Properties.CopyProperties(sourceMessage.Properties);
48: return newMessage;
49: }
50:
51: public CompressionAlgorithm Algorithm { get; private set; }
52: }
下麵是針對Message類型而定義了一些擴展方法和輔助方法。
1: public static class CompressionUtil
2: {
3: public const string CompressionMessageHeader = "Compression";
4: public const string CompressionMessageBody = "CompressedBody";
5: public const string Namespace = "https://www.artech.com/compression";
6:
7: public static bool IsCompressed(this Message message)
8: {
9: return message.Headers.FindHeader(CompressionMessageHeader, Namespace) > -1;
10: }
11:
12: public static void AddCompressionHeader(this Message message, CompressionAlgorithm algorithm)
13: {
14: message.Headers.Add(MessageHeader.CreateHeader(CompressionMessageHeader, Namespace, string.Format("algorithm = \"{0}\"",algorithm)));
15: }
16:
17: public static void RemoveCompressionHeader(this Message message)
18: {
19: message.Headers.RemoveAll(CompressionMessageHeader, Namespace);
20: }
21:
22: public static CompressionAlgorithm GetCompressionAlgorithm(this Message message)
23: {
24: if (message.IsCompressed())
25: {
26: var algorithm = message.Headers.GetHeader<string>(CompressionMessageHeader, Namespace);
27: algorithm = algorithm.Replace("algorithm =", string.Empty).Replace("\"", string.Empty).Trim();
28: if (algorithm == CompressionAlgorithm.Deflate.ToString())
29: {
30: return CompressionAlgorithm.Deflate;
31: }
32:
33: if (algorithm == CompressionAlgorithm.GZip.ToString())
34: {
35: return CompressionAlgorithm.GZip;
36: }
37: throw new InvalidOperationException("Invalid compression algrorithm!");
38: }
39: throw new InvalidOperationException("Message is not compressed!");
40: }
41:
42: public static byte[] GetCompressedBody(this Message message)
43: {
44: byte[] buffer;
45: using (XmlReader reader1 = message.GetReaderAtBodyContents())
46: {
47: buffer = Convert.FromBase64String(reader1.ReadElementString(CompressionMessageBody, Namespace));
48: }
49: return buffer;
50: }
51:
52: public static string CreateCompressedBody(byte[] content)
53: {
54: StringWriter output = new StringWriter();
55: using (XmlWriter writer2 = XmlWriter.Create(output))
56: {
57: writer2.WriteStartElement(CompressionMessageBody, Namespace);
58: writer2.WriteBase64(content, 0, content.Length);
59: writer2.WriteEndElement();
60: }
61: return output.ToString();
62: }
63: }
消息的序列化和反序列化最終是通過MessageFormatter來完成的。具體來說,客戶端通過ClientMessageFormatter實現對請求消息的序列化和對回複消息的序列化,而服務端通過DispatchMessageFormatter實現對請求消息的反序列化和對回複消息的序列化。
在默認的情況下,WCF選用的MessageFormatter為DataContractSerializerOperationFormatter,它采用DataContractSerializer進行實際的序列化和法序列化操作。我們自定義的MessageFormatter實際上是對DataContractSerializerOperationFormatter的封裝,我們依然使用它來完成序列化和反序列化工作,額外實現序列化後的壓縮和法序列化前的解壓縮。
因為DataContractSerializerOperationFormatter是一個internal類型,我們隻有通過反射的方式來創建它。如下的代碼片斷為用於進行消息壓縮與解壓縮的自定義MessageFormatter,即CompressionMessageFormatter的定義。
1: public class CompressionMessageFormatter: IDispatchMessageFormatter, IClientMessageFormatter
2: {
3: private const string DataContractSerializerOperationFormatterTypeName = "System.ServiceModel.Dispatcher.DataContractSerializerOperationFormatter, System.ServiceModel, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089";
4:
5: public IDispatchMessageFormatter InnerDispatchMessageFormatter { get; private set; }
6: public IClientMessageFormatter InnerClientMessageFormatter { get; private set; }
7: public MessageCompressor MessageCompressor { get; private set; }
8:
9: public CompressionMessageFormatter(CompressionAlgorithm algorithm, OperationDescription description, DataContractFormatAttribute dataContractFormatAttribute, DataContractSerializerOperationBehavior serializerFactory)
10: {
11: this.MessageCompressor = new MessageCompressor(algorithm);
12: Type innerFormatterType = Type.GetType(DataContractSerializerOperationFormatterTypeName);
13: var innerFormatter = Activator.CreateInstance(innerFormatterType, description, dataContractFormatAttribute, serializerFactory);
14: this.InnerClientMessageFormatter = innerFormatter as IClientMessageFormatter;
15: this.InnerDispatchMessageFormatter = innerFormatter as IDispatchMessageFormatter;
16: }
17:
18: public void DeserializeRequest(Message message, object[] parameters)
19: {
20: message = this.MessageCompressor.DecompressMessage(message);
21: this.InnerDispatchMessageFormatter.DeserializeRequest(message, parameters);
22: }
23:
24: public Message SerializeReply(MessageVersion messageVersion, object[] parameters, object result)
25: {
26: var message = this.InnerDispatchMessageFormatter.SerializeReply(messageVersion, parameters, result);
27: return this.MessageCompressor.CompressMessage(message);
28: }
29:
30: public object DeserializeReply(Message message, object[] parameters)
31: {
32: message = this.MessageCompressor.DecompressMessage(message);
33: return this.InnerClientMessageFormatter.DeserializeReply(message, parameters);
34: }
35:
36: public Message SerializeRequest(MessageVersion messageVersion, object[] parameters)
37: {
38: var message = this.InnerClientMessageFormatter.SerializeRequest(messageVersion, parameters);
39: return this.MessageCompressor.CompressMessage(message);
40: }
41: }
ClientMessageFormatter和DispatchMessageFormatter實際上屬於ClientOperation和DispatchOperation的組件。我們可以通過如下一個自定義的操作行為CompressionOperationBehaviorAttribute將其應用到相應的操作上。
1: [AttributeUsage( AttributeTargets.Method)]
2: public class CompressionOperationBehaviorAttribute: Attribute, IOperationBehavior
3: {
4: public CompressionAlgorithm Algorithm { get; set; }
5:
6: public void AddBindingParameters(OperationDescription operationDescription, BindingParameterCollection bindingParameters) { }
7:
8: public void ApplyClientBehavior(OperationDescription operationDescription, ClientOperation clientOperation)
9: {
10: clientOperation.SerializeRequest = true;
11: clientOperation.DeserializeReply = true;
12: var dataContractFormatAttribute = operationDescription.SyncMethod.GetCustomAttributes(typeof(DataContractFormatAttribute), true).FirstOrDefault() as DataContractFormatAttribute;
13: if (null == dataContractFormatAttribute)
14: {
15: dataContractFormatAttribute = new DataContractFormatAttribute();
16: }
17:
18: var dataContractSerializerOperationBehavior = operationDescription.Behaviors.Find<DataContractSerializerOperationBehavior>();
19: clientOperation.Formatter = new CompressionMessageFormatter(this.Algorithm, operationDescription, dataContractFormatAttribute, dataContractSerializerOperationBehavior);
20: }
21:
22: public void ApplyDispatchBehavior(OperationDescription operationDescription, DispatchOperation dispatchOperation)
23: {
24: dispatchOperation.SerializeReply = true;
25: dispatchOperation.DeserializeRequest = true;
26: var dataContractFormatAttribute = operationDescription.SyncMethod.GetCustomAttributes(typeof(DataContractFormatAttribute), true).FirstOrDefault() as DataContractFormatAttribute;
27: if (null == dataContractFormatAttribute)
28: {
29: dataContractFormatAttribute = new DataContractFormatAttribute();
30: }
31: var dataContractSerializerOperationBehavior = operationDescription.Behaviors.Find<DataContractSerializerOperationBehavior>();
32: dispatchOperation.Formatter = new CompressionMessageFormatter(this.Algorithm, operationDescription, dataContractFormatAttribute, dataContractSerializerOperationBehavior);
33: }
34:
35: public void Validate(OperationDescription operationDescription) { }
36: }
為了驗證應用了CompressionOperationBehaviorAttribute特性的操作方法對應的消息是否經過了壓縮,我們可以通過一個簡單的例子來檢驗。我們采用常用的計算服務的例子,下麵是服務契約和服務類型的定義。我們上麵定義的CompressionOperationBehaviorAttribute應用到服務契約的Add操作上。
1: [ServiceContract(Namespace= "https://www.artech.com/")]
2: public interface ICalculator
3: {
4: [OperationContract]
5: [CompressionOperationBehavior]
6: double Add(double x, double y);
7: }
8: public class CalculatorService : ICalculator
9: {
10: public double Add(double x, double y)
11: {
12: return x + y;
13: }
14: }
我們采用BasicHttpBinding作為終結點的綁定類型(具體的配置請查看源代碼),下麵是通過Fiddler獲取的消息的內容,它們的主體部分都經過了基於壓縮的編碼。
1: <s:Envelope xmlns:s="https://schemas.xmlsoap.org/soap/envelope/">
2: <s:Header>
3: <Compression xmlns="https://www.artech.com/compression">algorithm = "GZip"</Compression>
4: </s:Header>
5: <s:Body>
6: <CompressedBody xmlns="https://www.artech.com/compression">7L0HYBx ... CQAA//8=</CompressedBody>
7: </s:Body>
8: </s:Envelope>
回複消息
1: <s:Envelope xmlns:s="https://schemas.xmlsoap.org/soap/envelope/">
2: <s:Header>
3: <Compression xmlns="https://www.artech.com/compression">algorithm = "GZip"</Compression>
4: </s:Header>
5: <s:Body>
6: <CompressedBody xmlns="https://www.artech.com/compression">7L0H...PAAAA//8=</CompressedBody>
7: </s:Body>
8: </s:Envelope>
由於CompressionMessageFormatter使用基於DataContractSerializer序列化器的DataContractSerializerOperationFormatter進行消息的序列化和發序列化工作。而DataContractSerializer僅僅是WCF用於序列化的一種默認的選擇(WCF還可以采用傳統的XmlSeriaizer)。為了讓CompressionMessageFormatter能夠使用其他序列化器,你可以對於進行相應的修正。
微信公眾賬號:大內老A
微博:www.weibo.com/artech
如果你想及時得到個人撰寫文章以及著作的消息推送,或者想看看個人推薦的技術資料,可以掃描左邊二維碼(或者長按識別二維碼)關注個人公眾號(原來公眾帳號蔣金楠的自媒體將會停用)。
本文版權歸作者和博客園共有,歡迎轉載,但未經作者同意必須保留此段聲明,且在文章頁麵明顯位置給出原文連接,否則保留追究法律責任的權利。
最後更新:2017-10-26 16:04:40