阅读551 返回首页    go 微信


Java 收发消息__HTTP 接入(简单)_消息队列 MQ-阿里云

本文主要描述如何在 Java 环境下使用 HTTP 协议收发 MQ 消息。


1. 准备环境

在工程 POM 文件添加 HTTP Java 客户端的依赖。

  1. <dependency>
  2. <groupId>org.eclipse.jetty</groupId>
  3. <artifactId>jetty-client</artifactId>
  4. <version>9.3.4.RC1</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>com.aliyun.openservices</groupId>
  8. <artifactId>ons-client</artifactId>
  9. <version>1.1.11</version>
  10. </dependency>


2. 运行代码配置(user.properties)

您需要设置配置文件(user.properties)的相关内容,具体请参考“申请 MQ 资源”。

  1. [property]
  2. #您在控制台创建的Topic
  3. Topic=xxx
  4. #公测url
  5. URL=https://publictest-rest.ons.aliyun.com/
  6. #阿里云身份验证码
  7. Ak=xxx
  8. #阿里云身份验证密钥
  9. Sk=xxx
  10. #MQ控制台创建的Producer ID
  11. ProducerID=xxx
  12. #MQ控制台创建的Consumer ID
  13. ConsumerID=xxx

说明:

URL 中的 Key、Tag 以及 POST 请求的 Content-Type 没有任何限制,只需确保 Key 和 Tag 前后一致且唯一即可;这些值也可以放在 user.properties 里面。


3. HTTP 发送消息示例代码

您可以按以下说明设置相应参数并测试 HTTP 消息发送功能。

  1. package com.aliyun.openservice.ons.http.demo;
  2. import java.nio.charset.Charset;
  3. import java.util.Date;
  4. import java.util.Properties;
  5. import org.eclipse.jetty.client.HttpClient;
  6. import org.eclipse.jetty.client.api.ContentProvider;
  7. import org.eclipse.jetty.client.api.ContentResponse;
  8. import org.eclipse.jetty.client.api.Request;
  9. import org.eclipse.jetty.client.util.StringContentProvider;
  10. import com.aliyun.openservices.ons.api.impl.authority.AuthUtil;
  11. public class HttpProducer {
  12. public static String SIGNATURE="Signature";
  13. public static String NUM="num";
  14. public static String CONSUMERID="ConsumerID";
  15. public static String PRODUCERID="ProducerID";
  16. public static String TIMEOUT="timeout";
  17. public static String TOPIC="Topic";
  18. public static String AK="AccessKey";
  19. public static String BODY="body";
  20. public static String MSGHANDLE="msgHandle";
  21. public static String TIME="time";
  22. public static void main(String[] args) throws Exception {
  23. HttpClient httpClient=new HttpClient();
  24. httpClient.setMaxConnectionsPerDestination(1);
  25. httpClient.start();
  26. Properties properties=new Properties();
  27. properties.load(HttpProducer.class.getClassLoader().getResourceAsStream("user.properties"));
  28. String topic=properties.getProperty("Topic"); //请在user.properties配置您的Topic
  29. String url=properties.getProperty("URL");//公测集群配置为https://publictest-rest.ons.aliyun.com/
  30. String ak=properties.getProperty("Ak");//请在user.properties配置您的Ak
  31. String sk=properties.getProperty("Sk");//请在user.properties配置您的Sk
  32. String pid=properties.getProperty("ProducerID");//请在user.properties配置您的Producer ID
  33. String date=String.valueOf(new Date().getTime());
  34. String sign=null;
  35. String body="hello ons http";
  36. String NEWLINE="n";
  37. String signString;
  38. for (int i = 0; i < 10; i++) {
  39. date=String.valueOf(new Date().getTime());
  40. Request req=httpClient.POST(url+"message/?topic="+topic+"&time="+date+"&tag=http"+"&key=http");
  41. ContentProvider content=new StringContentProvider(body);
  42. req.content(content);
  43. signString=topic+NEWLINE+pid+NEWLINE+MD5.getInstance().getMD5String(body)+NEWLINE+date;
  44. System.out.println(signString);
  45. sign=AuthUtil.calSignature(signString.getBytes(Charset.forName("UTF-8")), sk);
  46. req.header(SIGNATURE, sign);
  47. req.header(AK, ak);
  48. req.header(PRODUCERID, pid);
  49. ContentResponse response;
  50. response=req.send();
  51. System.out.println("send msg:"+response.getStatus()+response.getContentAsString());
  52. }
  53. }
  54. }


4. HTTP接收消息示例代码

请按以下说明设置相应参数并测试 HTTP 消息接收功能。

  1. package com.aliyun.openservice.ons.http.demo;
  2. import java.nio.charset.Charset;
  3. import java.util.Date;
  4. import java.util.List;
  5. import java.util.Properties;
  6. import org.eclipse.jetty.client.HttpClient;
  7. import org.eclipse.jetty.client.api.ContentProvider;
  8. import org.eclipse.jetty.client.api.ContentResponse;
  9. import org.eclipse.jetty.client.api.Request;
  10. import org.eclipse.jetty.client.util.StringContentProvider;
  11. import org.eclipse.jetty.http.HttpMethod;
  12. import com.alibaba.fastjson.JSON;
  13. import com.aliyun.openservice.ons.mqtt.demo.MqttProducer;
  14. import com.aliyun.openservices.ons.api.impl.authority.AuthUtil;
  15. public class HttpConsumer {
  16. public static String SIGNATURE="Signature";
  17. public static String NUM="num";
  18. public static String CONSUMERID="ConsumerID";
  19. public static String PRODUCERID="ProducerID";
  20. public static String TIMEOUT="timeout";
  21. public static String TOPIC="Topic";
  22. public static String AK="AccessKey";
  23. public static String BODY="body";
  24. public static String MSGHANDLE="msgHandle";
  25. public static String TIME="time";
  26. public static void main(String[] args) throws Exception {
  27. HttpClient httpClient=new HttpClient();
  28. httpClient.setMaxConnectionsPerDestination(1);
  29. httpClient.start();
  30. Properties properties=new Properties();
  31. properties.load(HttpConsumer.class.getClassLoader().getResourceAsStream("user.properties"));
  32. String topic=properties.getProperty("Topic"); //请在user.properties配置您的topic
  33. String url=properties.getProperty("URL");//公测集群配置为https://publictest-rest.ons.aliyun.com/
  34. String ak=properties.getProperty("Ak");//请在user.properties配置您的Ak
  35. String sk=properties.getProperty("Sk");//请在user.properties配置您的Sk
  36. String cid=properties.getProperty("ConsumerID");//请在user.properties配置您的Consumer ID
  37. String date=String.valueOf(new Date().getTime());
  38. String sign=null;
  39. String NEWLINE="n";
  40. String signString;
  41. System.out.println(NEWLINE+NEWLINE);
  42. while (true) {
  43. try {
  44. date=String.valueOf(new Date().getTime());
  45. Request req=httpClient.POST(url+"message/?topic="+topic+"&time="+date+"&num="+32);
  46. req.method(HttpMethod.GET);
  47. ContentResponse response;
  48. signString=topic+NEWLINE+cid+NEWLINE+date;
  49. sign=AuthUtil.calSignature(signString.getBytes(Charset.forName("UTF-8")), sk);
  50. req.header(SIGNATURE, sign);
  51. req.header(AK, ak);
  52. req.header(CONSUMERID, cid);
  53. long start=System.currentTimeMillis();
  54. response=req.send();
  55. System.out.println("get cost:"+(System.currentTimeMillis()-start)/1000
  56. +" "+response.getStatus()+" "+response.getContentAsString());
  57. List<SimpleMessage> list = null;
  58. if (response.getContentAsString()!=null&&!response.getContentAsString().isEmpty()) {
  59. list=JSON.parseArray(response.getContentAsString(), SimpleMessage.class);
  60. }
  61. if (list==null||list.size()==0) {
  62. Thread.sleep(100);
  63. continue;
  64. }
  65. System.out.println("size is :"+list.size());
  66. for (SimpleMessage simpleMessage : list) {
  67. date=String.valueOf(new Date().getTime());
  68. System.out.println("receive msg:"+simpleMessage.getBody()+" born time "+simpleMessage.getBornTime());
  69. req=httpClient.POST(url+"message/?msgHandle="+simpleMessage.getMsgHandle()+"&topic="+topic+"&time="+date);
  70. req.method(HttpMethod.DELETE);
  71. signString=topic+NEWLINE+cid+NEWLINE+simpleMessage.getMsgHandle()+NEWLINE+date;
  72. sign=AuthUtil.calSignature(signString.getBytes(Charset.forName("UTF-8")), sk);
  73. req.header(SIGNATURE, sign);
  74. req.header(AK, ak);
  75. req.header(CONSUMERID, cid);
  76. response=req.send();
  77. System.out.println("delete msg:"+response.toString());
  78. }
  79. Thread.sleep(100);
  80. } catch (Exception e) {
  81. e.printStackTrace();
  82. }
  83. }
  84. }
  85. }


5. HTTP示例程序工具类

(1)消息封装类: SimpleMessage.java

  1. package com.aliyun.openservice.ons.http.demo;
  2. public class SimpleMessage {
  3. private String body;
  4. private String msgId;
  5. private String bornTime;
  6. private String msgHandle;
  7. private int reconsumeTimes;
  8. private String tag;
  9. public void setTag(String tag) {
  10. this.tag = tag;
  11. }
  12. public String getTag() {
  13. return tag;
  14. }
  15. public int getReconsumeTimes() {
  16. return reconsumeTimes;
  17. }
  18. public void setReconsumeTimes(int reconsumeTimes) {
  19. this.reconsumeTimes = reconsumeTimes;
  20. }
  21. public void setMsgHandle(String msgHandle) {
  22. this.msgHandle = msgHandle;
  23. }
  24. public String getMsgHandle() {
  25. return msgHandle;
  26. }
  27. public String getBody() {
  28. return body;
  29. }
  30. public void setBody(String body) {
  31. this.body = body;
  32. }
  33. public String getMsgId() {
  34. return msgId;
  35. }
  36. public void setMsgId(String msgId) {
  37. this.msgId = msgId;
  38. }
  39. public String getBornTime() {
  40. return bornTime;
  41. }
  42. public void setBornTime(String bornTime) {
  43. this.bornTime = bornTime;
  44. }
  45. }

(2)字符串签名类: MD5.java

  1. package com.aliyun.openservice.ons.http.demo;
  2. import java.io.UnsupportedEncodingException;
  3. import java.nio.charset.Charset;
  4. import java.security.MessageDigest;
  5. import java.sql.SQLException;
  6. import java.util.Date;
  7. import java.util.HashMap;
  8. import java.util.Map;
  9. import java.util.concurrent.ConcurrentHashMap;
  10. import java.util.concurrent.locks.ReentrantLock;
  11. import org.slf4j.LoggerFactory;
  12. public class MD5 {
  13. private static final org.slf4j.Logger log = LoggerFactory.getLogger(MD5.class);
  14. private static char[] digits = { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f' };
  15. private static Map<Character, Integer> rDigits = new HashMap<Character, Integer>(16);
  16. static {
  17. for (int i = 0; i < digits.length; ++i) {
  18. rDigits.put(digits[i], i);
  19. }
  20. }
  21. private static MD5 me = new MD5();
  22. private MessageDigest mHasher;
  23. private final ReentrantLock opLock = new ReentrantLock();
  24. private MD5() {
  25. try {
  26. this.mHasher = MessageDigest.getInstance("md5");
  27. } catch (Exception e) {
  28. throw new RuntimeException(e);
  29. }
  30. }
  31. public static MD5 getInstance() {
  32. return me;
  33. }
  34. public String getMD5String(String content) {
  35. return this.bytes2string(this.hash(content));
  36. }
  37. public String getMD5String(byte[] content) {
  38. return this.bytes2string(this.hash(content));
  39. }
  40. public byte[] getMD5Bytes(byte[] content) {
  41. return this.hash(content);
  42. }
  43. public byte[] hash(String str) {
  44. this.opLock.lock();
  45. try {
  46. byte[] bt = this.mHasher.digest(str.getBytes("utf-8"));
  47. if (null == bt || bt.length != 16) {
  48. throw new IllegalArgumentException("md5 need");
  49. }
  50. return bt;
  51. } catch (UnsupportedEncodingException e) {
  52. throw new RuntimeException("unsupported utf-8 encoding", e);
  53. } finally {
  54. this.opLock.unlock();
  55. }
  56. }
  57. public byte[] hash(byte[] data) {
  58. this.opLock.lock();
  59. try {
  60. byte[] bt = this.mHasher.digest(data);
  61. if (null == bt || bt.length != 16) {
  62. throw new IllegalArgumentException("md5 need");
  63. }
  64. return bt;
  65. } finally {
  66. this.opLock.unlock();
  67. }
  68. }
  69. public String bytes2string(byte[] bt) {
  70. int l = bt.length;
  71. char[] out = new char[l << 1];
  72. for (int i = 0, j = 0; i < l; i++) {
  73. out[j++] = digits[(0xF0 & bt[i]) >>> 4];
  74. out[j++] = digits[0x0F & bt[i]];
  75. }
  76. if (log.isDebugEnabled()) {
  77. log.debug("[hash]" + new String(out));
  78. }
  79. return new String(out);
  80. }
  81. public byte[] string2bytes(String str) {
  82. if (null == str) {
  83. throw new NullPointerException("Argument is not allowed empty");
  84. }
  85. if (str.length() != 32) {
  86. throw new IllegalArgumentException("String length must equals 32");
  87. }
  88. byte[] data = new byte[16];
  89. char[] chs = str.toCharArray();
  90. for (int i = 0; i < 16; ++i) {
  91. int h = rDigits.get(chs[i * 2]).intValue();
  92. int l = rDigits.get(chs[i * 2 + 1]).intValue();
  93. data[i] = (byte) ((h & 0x0F) << 4 | l & 0x0F);
  94. }
  95. return data;
  96. }
  97. }

最后更新:2016-11-23 16:04:13

  上一篇:go HTTP 协议规范__HTTP 接入(简单)_消息队列 MQ-阿里云
  下一篇:go PHP 收发消息__HTTP 接入(简单)_消息队列 MQ-阿里云