閱讀256 返回首頁    go 阿裏雲 go 技術社區[雲棲]


Python 收發消息__HTTP 接入(簡單)_消息隊列 MQ-阿裏雲

本文主要描述如何在 Python 環境下使用 HTTP 協議收發 MQ 消息。


運行環境準備

用 HTTP 協議發送或者接收消息 ,請完成以下環境準備工作。

Windows

  1. 從 Python 官網(https://www.python.org/downloads/windows/) 下載並安裝 Python 2.7。
  2. 打開 Windows 終端窗口(dos界麵),輸入 python 命令,檢查 Python 是否安裝成功, 如下圖所示。

Linux/Unix

您可以在 Linux 終端輸入 python 指令確認是否有預裝。如果有執行信息,則說明本台機器已經預裝。如果沒有,請按以下步驟進行安裝:

  1. 從 Python 官網下載 Python2.7 Linux 版本安裝包:https://www.python.org/download/

  2. 在 Python2.7 安裝包下載保存目錄下,運行指令進行解壓,例如:tar –xzf python-2.7.11.tgz

  3. 進入2.7.11目錄,執行./configure 指令。此步驟主要是用於生成 makefile 文件。

  4. 執行 make 指令進行實際編譯。

  5. 執行 make install 指令進行安裝操作。

  6. 安裝完畢後執行 python 命令檢查安裝是否成功,如下圖:


運行示例代碼

按照以下步驟運行示例代碼。

Windows

  1. 將下文具體示例程序小節所提供的配置文件以及示例程序拷貝到本地,並保存為 .py 文件,如下圖:

  2. 根據下文示例代碼裏的說明修改 user.properties 文件中的相關字段。

  3. 將終端窗口切換到保存 .py 的文件目錄,執行保存的 python 文件,如下圖:

Linux/Unix

步驟參見 Windows。執行結果參照下圖:


具體示例程序

您可以參考以下示例程序測試消息收發功能。

1.配置文件

您需要設置配置文件(user.properties)的相關內容,具體請參考申請 MQ 資源

  1. [property]
  2. #您在控製台創建的Topic
  3. Topic=xxx
  4. #公測集群URL
  5. URL=https://publictest-rest.ons.aliyun.com
  6. #阿裏雲官網身份驗證訪問碼
  7. Ak=xxx
  8. #阿裏雲身份驗證密鑰
  9. Sk=xxx
  10. #MQ控製台創建的Producer ID
  11. ProducerID=xxx
  12. #MQ控製台創建的Consumer ID
  13. ConsumerID=xxx

說明:URL中的 Key,Tag 以及 POST Content-Type 沒有任何的限製,隻要確保 Key 和 Tag 相同唯一即可,可以放在 user.properties 裏麵。

2.發送消息示例程序

通過 HTTP 協議發送消息,請參考以下示例代碼。

  1. #encoding:utf-8
  2. import ConfigParser
  3. import hashlib
  4. import httplib
  5. import time
  6. from urlparse import urlparse
  7. from Util import parseURL,calSignature
  8. """
  9. 消息發布者
  10. """
  11. class HttpProducer(object):
  12. def __init__(self):
  13. """簽名值"""
  14. self.signature = "Signature"
  15. """ProducerID"""
  16. self.producerid = "ProducerID"
  17. """消息主題"""
  18. self.topic = "Topic"
  19. """訪問碼"""
  20. self.ak = "AccessKey"
  21. """配置文件解析器"""
  22. self.cf = ConfigParser.ConfigParser()
  23. """MD5對象"""
  24. self.md5 = hashlib.md5()
  25. """
  26. 發布消息主流程
  27. """
  28. def process(self):
  29. """讀取配置文件"""
  30. self.cf.read("user.properties")
  31. """讀取消息主題"""
  32. topic = self.cf.get("property","Topic")
  33. """存儲消息URL路徑"""
  34. url = self.cf.get("property","URL")
  35. """訪問碼"""
  36. ak = self.cf.get("property","Ak")
  37. """密鑰"""
  38. sk = self.cf.get("property","Sk")
  39. """Producer ID"""
  40. pid = self.cf.get("property","ProducerID")
  41. """HTTP請求主體內容"""
  42. content = U"中文".encode('utf-8')
  43. """分隔符"""
  44. newline = "n"
  45. """獲取URL域名地址"""
  46. urlname = urlparse(url).hostname
  47. """根據HTPP主體內容計算MD5值"""
  48. self.md5.update(content)
  49. """建立HTTP連接對象"""
  50. conn = httplib.HTTPConnection(parseURL(urlname))
  51. try:
  52. for index in range(0,100):
  53. """時間戳"""
  54. date = repr(int(time.time() * 1000))[0:13]
  55. """構造簽名字符串"""
  56. signString = str(topic + newline + pid + newline + self.md5.hexdigest() + newline + date)
  57. """計算簽名"""
  58. sign = calSignature(signString,sk)
  59. """內容類型"""
  60. contentFlag ="Content-type"
  61. """HTTP請求頭部對象"""
  62. headers = {
  63. self.signature : sign,
  64. self.ak : ak,
  65. self.producerid : pid,
  66. contentFlag : "text/html;charset=UTF-8"
  67. }
  68. """開始發送HTTP請求消息"""
  69. conn.request(method="POST",url="/message/?topic="+topic+"&time="+date+"&tag=http&key=http",
  70. body=content,
  71. headers=headers)
  72. """獲取HTTP應答消息"""
  73. response = conn.getresponse()
  74. """讀取HTTP應答內容"""
  75. msg = response.read()
  76. print "response:"+msg
  77. except Exception,e:
  78. print e
  79. finally:
  80. conn.close()
  81. """流程入口"""
  82. if __name__ == '__main__':
  83. """創建消息發布者"""
  84. producer = HttpProducer()
  85. """開啟消息發布者"""
  86. producer.process()

3. 接收消息示例程序

通過 HTTP 協議接收消息,請參考以下示例代碼。

  1. #encoding:utf-8
  2. import ConfigParser
  3. import httplib
  4. import json
  5. import time
  6. from urlparse import urlparse
  7. from Util import parseURL,calSignature
  8. """
  9. 消息訂閱者
  10. """
  11. class HttpConsumer(object):
  12. def __init__(self):
  13. """簽名字段"""
  14. self.signatue = "Signature"
  15. """Consumer ID"""
  16. self.consumerid = "ConsumerID"
  17. """消費主題"""
  18. self.topic = "Topic"
  19. """訪問碼"""
  20. self.ak = "AccessKey"
  21. """配置文件解析器"""
  22. self.cf = ConfigParser.ConfigParser()
  23. """
  24. 訂閱消息流程
  25. """
  26. def process(self):
  27. """開始讀取配置文件"""
  28. self.cf.read("user.properties")
  29. """讀取主題"""
  30. topic = self.cf.get("property", "Topic")
  31. """存儲消息的URL路徑"""
  32. url = self.cf.get("property", "URL")
  33. """訪問碼"""
  34. ak = self.cf.get("property", "Ak")
  35. """密鑰"""
  36. sk = self.cf.get("property", "Sk")
  37. """Consumer ID"""
  38. cid = self.cf.get("property", "ConsumerID")
  39. newline = "n"
  40. """獲取URL主機域名地址"""
  41. urlname = urlparse(url).hostname
  42. """連接存儲消息的服務器"""
  43. conn = httplib.HTTPConnection(parseURL(urlname))
  44. while True:
  45. try:
  46. """時間戳"""
  47. date = repr(int(time.time() * 1000))[0:13]
  48. """構造簽名字符串"""
  49. signString = topic + newline + cid + newline + date
  50. """計算簽名值"""
  51. sign = calSignature(signString,sk)
  52. """請求消息HTTP頭部"""
  53. headers = {
  54. self.signatue : sign,
  55. self.ak : ak,
  56. self.consumerid : cid
  57. }
  58. """開始發送獲取消息的HTTP請求"""
  59. conn.request(method="GET",url=url+"/message/?topic="+topic+"&time="+date+"&num=32",headers=headers)
  60. """獲取HTTP應答消息"""
  61. response = conn.getresponse()
  62. """驗證應答消息狀態值"""
  63. if response.status != 200:
  64. continue
  65. """從應答消息中讀取實際的消息內容"""
  66. msg = response.read()
  67. """將實際的消費消息進行解碼"""
  68. messages = json.loads(msg)
  69. if len(messages) == 0:
  70. time.sleep(2)
  71. continue
  72. """依次獲取每條消費消息"""
  73. for message in messages:
  74. """計算時間戳"""
  75. date = repr(int(time.time() * 1000))[0:13]
  76. """構建刪除消費消息URL路徑"""
  77. delUrl = url + "/message/?msgHandle="+message['msgHandle'] + "&topic="+topic+"&time="+date
  78. """構造簽名字符串"""
  79. signString = topic + newline + cid + newline + message['msgHandle'] + newline + date
  80. """進行簽名"""
  81. sign = calSignature(signString,sk)
  82. """構造刪除消費消息HTTP頭部"""
  83. delheaders = {
  84. self.signatue : sign,
  85. self.ak : ak,
  86. self.consumerid : cid,
  87. }
  88. """發送刪除消息請求"""
  89. conn.request(method="DELETE", url=delUrl, headers=delheaders)
  90. """獲取請求應答"""
  91. response = conn.getresponse()
  92. """讀取應答內容"""
  93. msg = response.read()
  94. print "delete msg:"+msg
  95. except Exception,e:
  96. print e
  97. conn.close()
  98. """啟動入口"""
  99. if __name__ == '__main__':
  100. """構造消息訂閱者"""
  101. consumer = HttpConsumer()
  102. """開始啟動消息訂閱者"""
  103. consumer.process()

4. 工具方法示例程序

以下為示例中使用的工具方法。

  1. #encoding:utf-8
  2. import socket
  3. import hmac
  4. from hashlib import sha1
  5. """
  6. 解析URL
  7. """
  8. def parseURL(url):
  9. iplist = socket.gethostbyname_ex(url)
  10. if len(iplist) == 0:
  11. return None
  12. ips = iplist[2]
  13. if len(ips) == 0:
  14. return None
  15. return ips[0]
  16. """
  17. 認證簽名
  18. """
  19. def calSignature(signString, sk):
  20. mac = hmac.new(sk, signString, sha1)
  21. return mac.digest().encode('base64').rstrip()

最後更新:2016-12-06 17:05:11

  上一篇:go PHP 收發消息__HTTP 接入(簡單)_消息隊列 MQ-阿裏雲
  下一篇:go HTTP 定時消息__HTTP 接入(簡單)_消息隊列 MQ-阿裏雲