閱讀288 返回首頁    go 阿裏雲 go 技術社區[雲棲]


mina的編碼和解碼以及斷包的處理,發送自定義協議,仿qq聊天,發送xml或json

最近一段時間以來,mina很火,和移動開發一樣,異常地火爆。前面寫了幾篇移動開發的文章,反響都還不錯,你們的鼓勵就是我最大的動力。好了,廢話少說,我們來看下tcp通訊吧。
tcp通訊對於java來說是很簡單的。就是socket,也就是大家常說的套接字。大家不要把它看得很難。說白了tcp通訊其實就是數據流的讀寫:一條輸入流,一條輸出流,分別負責接收消息和發送消息。
明白了這些,ok,我們來看看我寫的例子吧。先看服務器端的測試類的源碼:
package com.minaqq.test;

import com.minaqq.server.ServerMsgProtocol;
import com.minaqq.worker.ServerSendMsgThread;

/**
 * Command-line entry point for the demo server.
 *
 * Starts the server through {@code ServerMsgProtocol.serverStart()} and, only
 * when that reports success, launches a {@code ServerSendMsgThread}.
 */
public class MsgServerTest {
    public static void main(String[] args) {
        boolean started = ServerMsgProtocol.serverStart();
        if (!started) {
            // Nothing else to do when the server could not be started.
            return;
        }
        System.out.println("服務器啟動成功......");
        ServerSendMsgThread worker = new ServerSendMsgThread();
        worker.start();
        System.out.println("工作線程啟動成功......");
    }
}

服務端連接代碼:

package com.minaqq.server;

import java.io.IOException;
import java.net.InetSocketAddress;

import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.keepalive.KeepAliveFilter;
import org.apache.mina.filter.keepalive.KeepAliveMessageFactory;
import org.apache.mina.filter.keepalive.KeepAliveRequestTimeoutHandler;
import org.apache.mina.filter.logging.LogLevel;
import org.apache.mina.filter.logging.LoggingFilter;
import org.apache.mina.transport.socket.SocketAcceptor;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;

import com.minaqq.protocol.MsgProtocol;

/**
 * Server bootstrap for the custom message protocol demo.
 *
 * Owns the single shared {@link SocketAcceptor}, wires the codec and logging
 * filters plus the message handler, and binds the listening port.
 *
 * @author Herman.Xiong
 * @date 2013-12-06 09:23:31
 * @version 1.0
 * @since jdk1.6, mina 2.0
 */
public class ServerMsgProtocol {

    /** Seconds without any read or write before a session is reported idle. */
    private static final int IDELTIMEOUT = 30;
    /** Heartbeat interval in seconds; only referenced by the disabled keep-alive setup below. */
    private static final int HEARTBEATRATE = 15;
    /** TCP port the acceptor is bound to. */
    private static final int PORT = 8888;
    /** Receive buffer size in bytes (2048 * 5000 = 10,240,000, i.e. roughly 10 MB). */
    private static final int RECEIVE_BUFFER_SIZE = 2048 * 5000;

    private static SocketAcceptor acceptor;

    private ServerMsgProtocol() {}

    /**
     * Returns the shared acceptor, creating it on first use.
     *
     * Synchronized because the acceptor is also fetched from the worker thread
     * ({@code ServerSendMsgThread}), not only from the thread that starts the server.
     *
     * @return the single non-blocking socket acceptor, never {@code null}
     */
    public static synchronized SocketAcceptor getAcceptor() {
        if (null == acceptor) {
            // Create the non-blocking server-side socket acceptor.
            acceptor = new NioSocketAcceptor();
        }
        return acceptor;
    }

    /**
     * Configures the acceptor and binds it to the listening port.
     *
     * @return {@code true} when the port was bound, {@code false} when binding
     *         failed with an {@link IOException} (the stack trace is printed)
     */
    public static boolean serverStart() {
        SocketAcceptor server = getAcceptor();
        DefaultIoFilterChainBuilder filterChain = server.getFilterChain();
        // Codec filter: converts between raw bytes and MsgPack objects.
        filterChain.addLast("codec", new ProtocolCodecFilter(new MsgProtocol()));
        LoggingFilter loggingFilter = new LoggingFilter();
        loggingFilter.setMessageReceivedLogLevel(LogLevel.INFO);
        loggingFilter.setMessageSentLogLevel(LogLevel.INFO);
        // Logging filter.
        filterChain.addLast("loger", loggingFilter);
        // Core business handler for incoming messages.
        server.setHandler(new ServerMessageHandler());

        // Keep-alive (heartbeat) filter, currently disabled:
        //KeepAliveMessageFactory heartBeatFactory = new KeepAliveMessageFactoryImpl();
        //KeepAliveRequestTimeoutHandler heartBeatHandler = new KeepAliveRequestTimeoutHandlerImpl();
        //KeepAliveFilter heartBeat = new KeepAliveFilter(heartBeatFactory, IdleStatus.BOTH_IDLE, heartBeatHandler);
        //heartBeat.setForwardEvent(false);             // whether idle events are forwarded to the handler
        //heartBeat.setRequestInterval(HEARTBEATRATE);  // heartbeat frequency in seconds
        //server.getFilterChain().addLast("heartbeat", heartBeat);
        //server.getSessionConfig().setKeepAlive(true);

        server.getSessionConfig().setReceiveBufferSize(RECEIVE_BUFFER_SIZE);
        // A session becomes idle after IDELTIMEOUT seconds without reads or writes.
        // (The original configured this twice, once with a literal 30; one call suffices.)
        server.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, IDELTIMEOUT);
        try {
            server.bind(new InetSocketAddress(PORT));
            return true;
        } catch (IOException e) {
            e.printStackTrace();
        }
        return false;
    }

}
服務器的消息處理:
package com.minaqq.server;

import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.mina.core.future.CloseFuture;
import org.apache.mina.core.future.IoFuture;
import org.apache.mina.core.future.IoFutureListener;
import org.apache.mina.core.service.IoHandler;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;

import com.minaqq.domain.MsgPack;
/**
 * @see 處理服務器端消息
 * @author Herman.Xiong
 * @date 2012-6-26 下午01:12:34
 * @file ServerMessageHandler.java
 * @package com.minaqq.server
 * @project MINA_QQ
 * @version 1.0
 * @since jdk1.6,mina 2.0
 */
public class ServerMessageHandler implements IoHandler{

    public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
        System.out.println("服務器發生異常:"+ cause.toString());
    }

    public void messageReceived(IoSession session, Object message) throws Exception {
    	MsgPack mp=(MsgPack)message;
        System.out.println("收到客戶端數據messageReceived----------:"+ mp.toString());
		/*//請求協議
		mp.setMsgMethod(3000);
		mp.setMsgPack("我是服務器發的消息");
		mp.setMsgLength(mp.getMsgPack().getBytes().length);
        session.write(mp);*/
        /*String content = mp.toString();
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
        String datetime = sdf.format(new Date());
        System.out.println(datetime+"服務器接收到數據的內容為messageReceived----------: " + content);*/
        // 拿到所有的客戶端Session
        /*Collection<IoSession> sessions = session.getService().getManagedSessions().values();
        // 向所有客戶端發送數據
        for (IoSession sess : sessions) {
            sess.write(datetime + "\t" + content);
        }*/
        
    }

    public void messageSent(IoSession session, Object message) throws Exception {
       /* System.out.println("服務器發送消息messageSent----------: "+ message);
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
        String datetime = sdf.format(new Date());
        System.out.println(datetime+"服務器發送消息messageSent----------: "+message.toString());*/
    }
 
    public void sessionClosed(IoSession session) throws Exception {
        System.out.println("關閉當前session: "+session.getId()+session.getRemoteAddress());
        CloseFuture closeFuture = session.close(true);
        closeFuture.addListener(new IoFutureListener<IoFuture>() {
            public void operationComplete(IoFuture future) {
                if (future instanceof CloseFuture) {
                    ((CloseFuture) future).setClosed();
                    System.out.println("sessionClosed CloseFuture setClosed-->"+ future.getSession().getId());
                }
            }
        });
    }

    public void sessionCreated(IoSession session) throws Exception {
        System.out.println("創建一個新連接:"+ session.getRemoteAddress()+"  id:  "+session.getId());
        session.write("welcome to the chat room !");
    }

    public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
        System.out.println("當前連接處於空閑狀態:"+ session.getRemoteAddress()+ status);
    }

    public void sessionOpened(IoSession session) throws Exception {
        System.out.println("打開一個session id:"+ session.getId()+"  空閑連接個數IdleCount:  "+ session.getBothIdleCount());
    }
}
自定義協議類:

/**
 * Codec factory for the custom message protocol.
 *
 * Hands out a fresh encoder / decoder on every request, each configured with
 * the UTF-8 charset.
 *
 * @author Herman.Xiong
 * @date 2014-06-11 10:30:40
 */
public class MsgProtocol implements ProtocolCodecFactory {

    /** Charset passed to every encoder and decoder created by this factory. */
    private static final Charset UTF8 = Charset.forName("UTF-8");

    /** @return a new {@code MsgProtocolEncoder} using UTF-8 */
    public ProtocolEncoder getEncoder(IoSession session) throws Exception {
        return new MsgProtocolEncoder(UTF8);
    }

    /** @return a new {@code MsgProtocolDecoder} using UTF-8 */
    public ProtocolDecoder getDecoder(IoSession session) throws Exception {
        return new MsgProtocolDecoder(UTF8);
    }
}
協議解碼類:
package com.minaqq.charset;

import java.nio.ByteOrder;
import java.nio.charset.Charset;

import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.CumulativeProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolDecoderOutput;

import com.minaqq.domain.MsgPack;
/**
 * Decoder for the custom message protocol.
 *
 * Wire format (little-endian): a 4-byte body length, a 4-byte method code,
 * then {@code length} bytes of text in the configured charset. Because a
 * frame may arrive split across several reads, a header that has been read
 * without its body is parked in a session attribute until the body arrives.
 *
 * @author Herman.Xiong
 * @date 2014-06-11 16:47:24
 */
public class MsgProtocolDecoder extends CumulativeProtocolDecoder {

    /** Session attribute holding a message whose header is read but whose body is not. */
    private static final String PENDING_KEY = "nac-msg-pack";
    /** Size of the fixed header: int length + int method. */
    private static final int HEADER_LENGTH = 8;
    /** Upper bound accepted for the body length; guards against corrupt or hostile headers. */
    private static final int MAX_MSG_LENGTH = 16 * 1024 * 1024;

    private Charset charset = null;

    /** Creates a decoder that uses the platform default charset. */
    public MsgProtocolDecoder() {
        this(Charset.defaultCharset());
    }

    /**
     * @param charset charset used to turn the body bytes into text
     */
    public MsgProtocolDecoder(Charset charset) {
        this.charset = charset;
    }

    /**
     * Legacy single-shot decoder kept for compatibility; it assumes the whole
     * frame is already present in {@code buf}. Prefer the cumulative decoding
     * done by {@link #doDecode}.
     *
     * The original flipped {@code buf} after reading, which rewinds the input
     * so the same bytes would be read again; the flip has been removed.
     */
    public void decode1(IoSession is, IoBuffer buf, ProtocolDecoderOutput out)
            throws Exception {
        buf.order(ByteOrder.LITTLE_ENDIAN);
        MsgPack mp = new MsgPack();
        // Body length.
        mp.setMsgLength(buf.getInt());
        // Method / function code.
        mp.setMsgMethod(buf.getInt());
        byte[] msg = new byte[mp.getMsgLength()];
        buf.get(msg);
        mp.setMsgPack(new String(msg, charset));
        out.write(mp);
    }

    /**
     * Drops any half-decoded message kept for the session, then lets the
     * superclass release its own per-session state.
     */
    public void dispose(IoSession session) throws Exception {
        session.removeAttribute(PENDING_KEY);
        super.dispose(session);
    }

    /** Nothing to finish: an incomplete trailing frame is simply discarded. */
    public void finishDecode(IoSession arg0, ProtocolDecoderOutput arg1)
            throws Exception {
    }

    /**
     * Legacy pass-through that emits all unread bytes of the buffer as a
     * {@code byte[]}. Uses {@code remaining()} rather than {@code limit()} so
     * a buffer whose position is not zero no longer underflows.
     */
    public void decode0(IoSession arg0, IoBuffer arg1, ProtocolDecoderOutput arg2)
            throws Exception {
        byte[] bytes = new byte[arg1.remaining()];
        arg1.get(bytes);
        arg2.write(bytes);
    }

    /**
     * Decodes at most one frame from the accumulated bytes.
     *
     * @return {@code true} when bytes were consumed (a header, a body, or both),
     *         {@code false} when more data is needed first
     * @throws IllegalStateException when the header announces a negative body
     *         length or one above {@code MAX_MSG_LENGTH}
     */
    protected boolean doDecode(IoSession session, IoBuffer ioBuffer, ProtocolDecoderOutput out) throws Exception {
        ioBuffer.order(ByteOrder.LITTLE_ENDIAN);
        MsgPack mp = (MsgPack) session.getAttribute(PENDING_KEY);
        if (null == mp) {
            if (ioBuffer.remaining() < HEADER_LENGTH) {
                return false;
            }
            int msgLength = ioBuffer.getInt();
            int msgMethod = ioBuffer.getInt();
            if (msgLength < 0 || msgLength > MAX_MSG_LENGTH) {
                throw new IllegalStateException("Invalid message length in header: " + msgLength);
            }
            mp = new MsgPack();
            mp.setMsgLength(msgLength);
            mp.setMsgMethod(msgMethod);
            if (ioBuffer.remaining() < msgLength) {
                // Header consumed, body still incomplete: remember the header.
                session.setAttribute(PENDING_KEY, mp);
                return true;
            }
            // Otherwise fall through and decode the body in the same call. This
            // also covers zero-length bodies, which must not be deferred: a later
            // call would return true without consuming any bytes.
        } else if (ioBuffer.remaining() < mp.getMsgLength()) {
            return false;
        }
        byte[] body = new byte[mp.getMsgLength()];
        ioBuffer.get(body);
        mp.setMsgPack(new String(body, charset));
        session.removeAttribute(PENDING_KEY);
        out.write(mp);
        return true;
    }

}
協議編碼類:
package com.minaqq.charset;

import java.io.NotSerializableException;
import java.io.Serializable;
import java.nio.ByteOrder;
import java.nio.charset.Charset;

import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolDecoderOutput;
import org.apache.mina.filter.codec.ProtocolEncoderAdapter;
import org.apache.mina.filter.codec.ProtocolEncoderOutput;

import com.minaqq.domain.MsgPack;

/**
 * Encoder for the custom message protocol.
 *
 * Wire format (little-endian): a 4-byte body length, a 4-byte method code,
 * then the message text encoded with the configured charset.
 */
public class MsgProtocolEncoder extends ProtocolEncoderAdapter {

    /** Size of the fixed header: int length + int method. */
    private static final int HEADER_LENGTH = 8;

    private Charset charset = null;

    /**
     * @param charset charset used to encode the message text
     */
    public MsgProtocolEncoder(Charset charset) {
        this.charset = charset;
    }

    /**
     * Encodes a {@link MsgPack} into one frame and writes it to {@code out}.
     * Messages of any other type are ignored, as before.
     *
     * The length written in the header is the size of the bytes actually
     * encoded here, not {@code mp.getMsgLength()}: callers compute that field
     * themselves (possibly with a different charset), and a mismatch would
     * corrupt the stream for the receiving decoder.
     */
    public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception {
        if (!(message instanceof MsgPack)) {
            return;
        }
        MsgPack mp = (MsgPack) message;
        String text = mp.getMsgPack();
        byte[] body = (null == text) ? new byte[0] : text.getBytes(charset);

        IoBuffer buf = IoBuffer.allocate(HEADER_LENGTH + body.length);
        buf.order(ByteOrder.LITTLE_ENDIAN);
        // Body length.
        buf.putInt(body.length);
        // Method / function code.
        buf.putInt(mp.getMsgMethod());
        buf.put(body);
        buf.flip();
        out.write(buf);
        out.flush();
        // The buffer now belongs to the output; it must not be freed here.
    }

    /** No resources to release. */
    public void dispose() throws Exception {
    }

    /**
     * Legacy encoder that writes the message with {@code IoBuffer.putObject}
     * and rejects encoded objects larger than 1024 bytes.
     *
     * @throws NotSerializableException when the message is not {@link Serializable}
     * @throws IllegalArgumentException when the encoded object exceeds 1024 bytes
     */
    public void encode0(IoSession arg0, Object arg1, ProtocolEncoderOutput arg2)
            throws Exception {
        if (!(arg1 instanceof Serializable)) {
            throw new NotSerializableException();
        }
        IoBuffer buf = IoBuffer.allocate(64);
        buf.setAutoExpand(true);
        buf.putObject(arg1);

        // The first 4 bytes are not counted towards the object size.
        int objectSize = buf.position() - 4;
        if (objectSize > 1024) {
            throw new IllegalArgumentException("The encoded object is too big: " + objectSize + " (> " + 1024
                    + ')');
        }

        buf.flip();
        arg2.write(buf);
    }

}
協議實體類:

package com.minaqq.domain;

import java.io.Serializable;

/**
 * One message of the custom protocol: body length, method code and text body.
 *
 * @author Herman.Xiong
 * @date 2014-06-11 11:31:45
 */
public class MsgPack implements Serializable {

    /** Version id used by Java serialization. */
    private static final long serialVersionUID = 1L;

    /** Length of the message body. */
    private int msgLength;
    /** Method (function) code of the message. */
    private int msgMethod;
    /** Text content of the message. */
    private String msgPack;

    /** Creates an empty message (zero length, zero method, {@code null} body). */
    public MsgPack() {}

    /** Creates a message with all three fields set. */
    public MsgPack(int msgLength, int msgMethod, String msgPack) {
        this.msgLength = msgLength;
        this.msgMethod = msgMethod;
        this.msgPack = msgPack;
    }

    public int getMsgLength() {
        return this.msgLength;
    }

    public void setMsgLength(int msgLength) {
        this.msgLength = msgLength;
    }

    public int getMsgMethod() {
        return this.msgMethod;
    }

    public void setMsgMethod(int msgMethod) {
        this.msgMethod = msgMethod;
    }

    public String getMsgPack() {
        return this.msgPack;
    }

    public void setMsgPack(String msgPack) {
        this.msgPack = msgPack;
    }

    /** Renders all three fields in a fixed, bracketed layout. */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("MsgPack [msgLength=");
        text.append(msgLength);
        text.append(", msgMethod=").append(msgMethod);
        text.append(", msgPack=").append(msgPack);
        text.append("]");
        return text.toString();
    }

}
心跳信息工廠類:
package com.minaqq.server;

import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.keepalive.KeepAliveMessageFactory;
/**
 * @see 發送心跳包的內容
 * getResponse()---->isResponse();獲取數據判斷心跳事件(目的是判斷是否觸發心跳超時異常)
 * isRequest()----->getRequest(); 寫回數據是心跳事件觸發的數據(目的寫回給服務器(客戶端)心跳包)
 * @author Herman.Xiong
 */
public class KeepAliveMessageFactoryImpl implements KeepAliveMessageFactory{
	
	//心跳包內容
    private static final String HEARTBEATREQUEST = "HEARTBEATREQUEST";
    private static final String HEARTBEATRESPONSE = "HEARTBEATRESPONSE";
    
	/**
     * @see 返回給客戶端的心跳包數據 return 返回結果才是客戶端收到的心跳包數據
     * @author Herman.Xiong
     */
    public Object getRequest(IoSession session) {
        return HEARTBEATREQUEST;
    }

    /**
     * @see 接受到的客戶端數據包
     * @author Herman.Xiong
     */
    public Object getResponse(IoSession session, Object request) {
        return request;
    }

    /**
     * @see 判斷是否是客戶端發送來的的心跳包此判斷影響 KeepAliveRequestTimeoutHandler實現類判斷是否心跳包發送超時
     * @author Herman.Xiong
     */
    public boolean isRequest(IoSession session, Object message) {
        if(message.equals(HEARTBEATRESPONSE)){
            System.out.println("接收到客戶端心數據包引發心跳事件                 心跳數據包是》》" + message);
	        return true;
	    }
        return false;
    }

    /**
     * @see  判斷發送信息是否是心跳數據包此判斷影響 KeepAliveRequestTimeoutHandler實現類 判斷是否心跳包發送超時
     * @author Herman.Xiong
     */
    public boolean isResponse(IoSession session, Object message) {
        if(message.equals(HEARTBEATREQUEST)){
            System.out.println("服務器發送數據包中引發心跳事件: " + message);
            return true;
        }
        return false;
    }
}
心跳業務處理類:
package com.minaqq.server;

import org.apache.mina.core.future.CloseFuture;
import org.apache.mina.core.future.IoFuture;
import org.apache.mina.core.future.IoFutureListener;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.keepalive.KeepAliveFilter;
import org.apache.mina.filter.keepalive.KeepAliveRequestTimeoutHandler;
/**
 * @see 當心跳超時時的處理,也可以用默認處理 這裏like
 * KeepAliveRequestTimeoutHandler.LOG的處理
 * @author Herman.Xiong
 */
public class KeepAliveRequestTimeoutHandlerImpl  implements KeepAliveRequestTimeoutHandler {
	/**
     * @see org.apache.mina.filter.keepalive.KeepAliveRequestTimeoutHandler心跳超時處理
     * @author Herman.Xiong
     */
    public void keepAliveRequestTimedOut(KeepAliveFilter filter,
            IoSession session) throws Exception {
        System.out.println("服務器端心跳包發送超時處理(即長時間沒有發送(接受)心跳包)---關閉當前長連接");
        CloseFuture closeFuture = session.close(true);
        closeFuture.addListener(new IoFutureListener<IoFuture>() {
            public void operationComplete(IoFuture future) {
                if (future instanceof CloseFuture) {
                    ((CloseFuture) future).setClosed();
                    System.out.println("sessionClosed CloseFuture setClosed-->"+ future.getSession().getId());
                }
            }
        });
    }
}
服務器發送數據包的線程類:
package com.minaqq.worker;

import java.util.Map;

import org.apache.mina.core.session.IoSession;

import com.minaqq.domain.MsgPack;
import com.minaqq.server.ServerMsgProtocol;
import com.minaqq.utils.XmlUtils;
/**
 * Worker thread that pushes a large XML test message to every connected
 * client every few seconds, until it is interrupted.
 *
 * @author Herman.Xiong
 * @date 2014-06-09 10:38:59
 */
public class ServerSendMsgThread extends Thread {

    /** Method code carried by every message this thread sends. */
    private static final int MSG_METHOD = 1000;
    /** How many copies of the sample XML make up one payload. */
    private static final int XML_REPEAT = 100;
    /** Pause between two send rounds, in milliseconds. */
    private static final long SEND_INTERVAL_MILLIS = 3000L;

    public void run() {
        while (!isInterrupted()) {
            System.out.println("MinaServer.getAcceptor().getManagedSessionCount() is "+ServerMsgProtocol.getAcceptor().getManagedSessionCount());

            Map<Long, IoSession> sessions = ServerMsgProtocol.getAcceptor().getManagedSessions();
            if (!sessions.isEmpty()) {
                // The payload is the same for every session: build it once per round.
                String payload = buildPayload();
                // Measure the length as UTF-8 rather than with the platform charset.
                int payloadLength = payload.getBytes(java.nio.charset.Charset.forName("UTF-8")).length;
                for (IoSession session : sessions.values()) {
                    MsgPack mp = new MsgPack();
                    mp.setMsgMethod(MSG_METHOD);
                    mp.setMsgPack(payload);
                    mp.setMsgLength(payloadLength);
                    try {
                        session.write(mp);
                    } catch (Exception e) {
                        // One failing session must not stop the others.
                        e.printStackTrace();
                    }
                }
            }
            try {
                Thread.sleep(SEND_INTERVAL_MILLIS);
            } catch (InterruptedException e) {
                // Restore the flag and stop instead of swallowing the interrupt.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /** Concatenates XML_REPEAT copies of the sample XML document. */
    private static String buildPayload() {
        String xml = XmlUtils.getXml();
        StringBuilder payload = new StringBuilder(xml.length() * XML_REPEAT);
        for (int i = 0; i < XML_REPEAT; i++) {
            payload.append(xml);
        }
        return payload.toString();
    }
}
客戶端測試類:
package com.minaqq.test;

import com.minaqq.client.ClientMsgProtocol;
import com.minaqq.domain.MsgPack;
import com.minaqq.worker.ClientSendMsgThread;

/**
 * Command-line entry point for the demo client.
 *
 * Starts the client through {@code ClientMsgProtocol.clientStart()} and then
 * launches a {@code ClientSendMsgThread}.
 */
public class MsgClientTest {
    public static void main(String[] args) {
        ClientMsgProtocol.clientStart();
        System.out.println("客戶端啟動成功......");

        ClientSendMsgThread sender = new ClientSendMsgThread();
        sender.start();
        System.out.println("客戶端工作線程啟動成功......");
    }
}
客戶端創建連接類:
package com.minaqq.client;

import java.net.InetSocketAddress;

import org.apache.mina.core.future.ConnectFuture;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.logging.LogLevel;
import org.apache.mina.filter.logging.LoggingFilter;
import org.apache.mina.transport.socket.nio.NioSocketConnector;

import com.minaqq.protocol.MsgProtocol;

/**
 * Demo client: connects to the server and exposes the resulting session so
 * other classes can send messages through it.
 *
 * @author Herman.Xiong
 * @date 2013-11-26 11:27:50
 * @version 1.0
 * @serial jdk 1.6
 */
public class ClientMsgProtocol {

    /** Server address used by the no-argument {@link #clientStart()}. */
    private static final String DEFAULT_HOST = "10.10.2.136";
    /** Server port used by the no-argument {@link #clientStart()}. */
    private static final int DEFAULT_PORT = 8888;
    /** Receive buffer size in bytes (2048 * 5000 = 10,240,000, i.e. roughly 10 MB). */
    private static final int RECEIVE_BUFFER_SIZE = 2048 * 5000;
    /** Connect timeout in milliseconds. */
    private static final long CONNECT_TIMEOUT_MILLIS = 30000L;

    private static NioSocketConnector connector;

    private static IoSession is;

    /**
     * @return the shared connector, created on first use
     */
    public static NioSocketConnector getConnector() {
        if (null == connector) {
            // Create the non-blocking client-side socket connector.
            connector = new NioSocketConnector();
        }
        return connector;
    }

    /**
     * @return the session of the established connection, or {@code null} when
     *         no connection has been established
     */
    public static IoSession getIoSession() {
        return is;
    }

    /** Connects to the default server address and port. */
    public static void clientStart() {
        clientStart(DEFAULT_HOST, DEFAULT_PORT);
    }

    /**
     * Configures the connector and connects to the given server, blocking
     * until the attempt has finished. On failure the cause is printed and the
     * session stays unset.
     *
     * @param host server host name or IP address
     * @param port server TCP port
     */
    public static void clientStart(String host, int port) {
        NioSocketConnector connector = getConnector();
        connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new MsgProtocol()));
        LoggingFilter loggingFilter = new LoggingFilter();
        loggingFilter.setMessageReceivedLogLevel(LogLevel.INFO);
        loggingFilter.setMessageSentLogLevel(LogLevel.INFO);
        connector.getFilterChain().addLast("logger", loggingFilter);
        connector.getSessionConfig().setReceiveBufferSize(RECEIVE_BUFFER_SIZE);
        connector.setConnectTimeoutMillis(CONNECT_TIMEOUT_MILLIS);
        connector.setHandler(new TimeClientHandler());

        ConnectFuture cf = connector.connect(new InetSocketAddress(host, port));
        // Wait until the connection attempt has completed, successfully or not.
        cf.awaitUninterruptibly();
        if (!cf.isConnected()) {
            Throwable cause = cf.getException();
            if (null != cause) {
                cause.printStackTrace();
            }
            return;
        }
        is = cf.getSession();
    }

}
客戶端消息處理事件類:
package com.minaqq.client;

import org.apache.mina.core.service.IoHandler;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;

import com.minaqq.domain.MsgPack;


/**
 * @see 處理接收客戶端消的息事件
 * @author Herman.Xiong
 * @date 2013年11月26日 11:23:32
 * @version 1.0
 * @since jdk1.6
 */
public class TimeClientHandler implements IoHandler{
	
	/**
	 * 接收客戶端發送的消息
	 */
	public void messageReceived(IoSession session, Object message) throws Exception { 
		MsgPack mp=(MsgPack)message;
		System.out.println("收到服務端發來的消息:"+mp.toString());// 顯示接收到的消息
	}

	public void exceptionCaught(IoSession arg0, Throwable arg1)
			throws Exception {
		
	}

	public void messageSent(IoSession arg0, Object arg1) throws Exception {
		
	}

	public void sessionClosed(IoSession arg0) throws Exception {
		
	}

	public void sessionCreated(IoSession arg0) throws Exception {
		
	}

	public void sessionIdle(IoSession arg0, IdleStatus arg1) throws Exception {
		
	}

	public void sessionOpened(IoSession arg0) throws Exception {
		
	}
}
客戶端發送消息線程類:
package com.minaqq.worker;

import com.minaqq.client.ClientMsgProtocol;
import com.minaqq.domain.MsgPack;
/**
 * Worker thread that sends a short test message to the server every few
 * seconds, until it is interrupted.
 *
 * @author Herman.Xiong
 * @date 2014-06-09 10:38:59
 */
public class ClientSendMsgThread extends Thread {

    /** Method code carried by every message this thread sends. */
    private static final int MSG_METHOD = 2000;
    /** Pause between two messages, in milliseconds. */
    private static final long SEND_INTERVAL_MILLIS = 3000L;

    public void run() {
        while (!isInterrupted()) {
            if (null != ClientMsgProtocol.getIoSession()) {
                try {
                    MsgPack mp = new MsgPack();
                    mp.setMsgMethod(MSG_METHOD);
                    mp.setMsgPack("我是客戶端");
                    // Measure the length as UTF-8 rather than with the platform charset.
                    mp.setMsgLength(mp.getMsgPack().getBytes(java.nio.charset.Charset.forName("UTF-8")).length);
                    ClientMsgProtocol.getIoSession().write(mp);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            } else {
                // No session yet (e.g. the connect attempt failed): report it once
                // per round instead of failing with a NullPointerException.
                System.out.println("ClientMsgProtocol.getIoSession is null ");
            }
            try {
                Thread.sleep(SEND_INTERVAL_MILLIS);
            } catch (InterruptedException e) {
                // Restore the flag and stop instead of swallowing the interrupt.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
發送xml消息的工具類:
package com.minaqq.utils;

import com.minaqq.domain.Address;
import com.minaqq.domain.House;
import com.minaqq.domain.Person;
import com.minaqq.domain.PhoneNumber;
import com.thoughtworks.xstream.XStream;
import com.thoughtworks.xstream.io.xml.DomDriver;

/**
 * Helper that produces the sample XML document used as message payload and
 * can parse it back into a {@code Person} with XStream.
 */
public class XmlUtils {

    /** Parses the sample XML with XStream and prints the resulting person. */
    public static void testXStream() {
        XStream parser = new XStream(new DomDriver());
        // Map the upper-case element names onto the domain classes.
        parser.alias("PERSON", Person.class);
        parser.alias("ADDRESS", Address.class);
        parser.alias("PHONENUMBER", PhoneNumber.class);
        parser.alias("HOUSE", House.class);
        Object parsed = parser.fromXML(XmlUtils.getXml());
        Person person = (Person) parsed;
        System.out.println(person.toString());
    }

    /**
     * @return the sample XML document: one PERSON with two phone entries, one
     *         address and three houses, terminated by a newline
     */
    public static String getXml() {
        final String[] fragments = {
            "<?xml version=\"1.0\" encoding=\"UTF-8\"?>",
            "<PERSON firstName=\"Herman\">",
            "<lastName>Xiong</lastName>  ",
            "<phonex>",
            "<code>0</code>",
            "<number>1234567</number>",
            "</phonex>",
            "<fax>",
            "<code>0</code>",
            "<number>7654321</number>",
            "</fax>",
            "<addList>",
            "<ADDRESS>",
            "<add>上海市</add>",
            "<zipCode>123456</zipCode>",
            "</ADDRESS>",
            "</addList>",
            "<house>",
            "<HOUSE>",
            "<size>300萬</size>",
            "<price>120平方米</price>",
            "</HOUSE>",
            "<HOUSE>",
            "<size>500萬</size>",
            "<price>130平方米</price>",
            "</HOUSE>",
            "<HOUSE>",
            "<size>160萬</size>",
            "<price>61.5平方米</price>",
            "</HOUSE>",
            "</house>",
            "</PERSON>\n"
        };
        StringBuilder xml = new StringBuilder();
        for (String fragment : fragments) {
            xml.append(fragment);
        }
        return xml.toString();
    }
}
xml有關的實體類:
package com.minaqq.domain;

import java.util.Arrays;
import java.util.List;

import com.thoughtworks.xstream.annotations.XStreamAsAttribute;

/**
 * XML-mapped person with a name, two phone entries, a list of addresses and
 * an array of houses. {@code firstName} is mapped as an XML attribute.
 */
public class Person {
    @XStreamAsAttribute
    private String firstName;
    private String lastName;
    private PhoneNumber phonex;
    private PhoneNumber fax;
    private List<Address> addList;
    private House[] house;

    /** Creates a person with every field unset. */
    public Person() {
    }

    /** Creates a person with only the name set. */
    public Person(String firstName, String lastName) {
        this(firstName, lastName, null, null);
    }

    /** Creates a person with name and both phone entries. */
    public Person(String firstName, String lastName, PhoneNumber phonex,
            PhoneNumber fax) {
        this(firstName, lastName, phonex, fax, null);
    }

    /** Creates a person with name, phone entries and addresses. */
    public Person(String firstName, String lastName, PhoneNumber phonex,
            PhoneNumber fax, List<Address> addList) {
        this(firstName, lastName, phonex, fax, addList, null);
    }

    /** Creates a fully populated person; the shorter constructors delegate here. */
    public Person(String firstName, String lastName, PhoneNumber phonex,
            PhoneNumber fax, List<Address> addList, House[] house) {
        this.firstName = firstName;
        this.lastName = lastName;
        this.phonex = phonex;
        this.fax = fax;
        this.addList = addList;
        this.house = house;
    }

    public String getFirstName() {
        return this.firstName;
    }

    public void setFirstName(String firstName) {
        this.firstName = firstName;
    }

    public String getLastName() {
        return this.lastName;
    }

    public void setLastName(String lastName) {
        this.lastName = lastName;
    }

    public PhoneNumber getPhonex() {
        return this.phonex;
    }

    public void setPhonex(PhoneNumber phonex) {
        this.phonex = phonex;
    }

    public PhoneNumber getFax() {
        return this.fax;
    }

    public void setFax(PhoneNumber fax) {
        this.fax = fax;
    }

    public List<Address> getAddList() {
        return this.addList;
    }

    public void setAddList(List<Address> addList) {
        this.addList = addList;
    }

    public House[] getHouse() {
        return this.house;
    }

    public void setHouse(House[] house) {
        this.house = house;
    }

    /** Renders every field; the house array is expanded element by element. */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("Person [addList=");
        text.append(addList);
        text.append(", fax=").append(fax);
        text.append(", firstName=").append(firstName);
        text.append(", house=").append(Arrays.toString(house));
        text.append(", lastName=").append(lastName);
        text.append(", phonex=").append(phonex);
        text.append("]");
        return text.toString();
    }
}
xml有關的實體類:
package com.minaqq.domain;

public class Address {
	private String add;
	private String zipCode;
	public String getAdd() {
		return add;
	}
	public void setAdd(String add) {
		this.add = add;
	}
	public String getZipCode() {
		return zipCode;
	}
	public void setZipCode(String zipCode) {
		this.zipCode = zipCode;
	}
	public Address() {
	}
	public Address(String add, String zipCode) {
		this.add = add;
		this.zipCode = zipCode;
	}
	@Override
	public String toString() {
		return "Address [add=" + add + ", zipCode=" + zipCode + "]";
	}
}
xml有關的實體類:
package com.minaqq.domain;

/** XML-mapped phone entry made of a numeric code and a numeric number. */
public class PhoneNumber {
    private int code;
    private int number;

    /** Creates an entry with both fields at zero. */
    public PhoneNumber() {
    }

    /** Creates an entry with both fields set. */
    public PhoneNumber(int code, int number) {
        this.code = code;
        this.number = number;
    }

    public int getCode() {
        return this.code;
    }

    public void setCode(int code) {
        this.code = code;
    }

    public int getNumber() {
        return this.number;
    }

    public void setNumber(int number) {
        this.number = number;
    }

    /** Renders both fields in a fixed, bracketed layout. */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("PhoneNumber [code=");
        text.append(code);
        text.append(", number=").append(number);
        text.append("]");
        return text.toString();
    }
}
xml有關的實體類:
package com.minaqq.domain;

/** XML-mapped house described by two free-text fields, size and price. */
public class House {
    private String size;
    private String price;

    /** Creates a house with both fields unset. */
    public House() {
    }

    /** Creates a house with both fields set. */
    public House(String size, String price) {
        this.size = size;
        this.price = price;
    }

    public String getSize() {
        return this.size;
    }

    public void setSize(String size) {
        this.size = size;
    }

    public String getPrice() {
        return this.price;
    }

    public void setPrice(String price) {
        this.price = price;
    }

    /** Renders price first, then size, in a fixed, bracketed layout. */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("House [price=");
        text.append(price);
        text.append(", size=").append(size);
        text.append("]");
        return text.toString();
    }
}

運行效果圖,服務圖片:


客戶端圖片:

OK,到此結束了,歡迎大家關注我的個人博客。

學習資料以及源碼下載請點擊:https://download.csdn.net/download/xmt1139057136/7487611

如有不懂,請大家加入qq群:135430763共同學習!

最後更新:2017-04-03 07:57:02

  上一篇:go Swift 概述及Swift運算符和表達式
  下一篇:go Apache Thrift使用總結