

Notes on the @Skip annotation in the Netty framework

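I have recently been learning the Netty framework and wrote a simple Netty application by following a tutorial, but I could not get it to work no matter what I tried. I compared my program against the tutorial several times and still could not find the cause. After rewriting it from scratch, the server finally worked: the culprit was the @Skip annotation. The code (the failing version) is as follows: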

package com.chris.netty;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler.Skip;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

import java.net.SocketAddress;
// java.util.Date prints date and time; java.sql.Date.toString() prints the date only.
import java.util.Date;


/**
 * @author Chris
 * @date 2015-4-12
 */
public class NettyTimerServer {
	
	
	public void bind(int port) throws Exception{
		
		EventLoopGroup bossGroup = new NioEventLoopGroup();
		EventLoopGroup workGroup = new NioEventLoopGroup();
		
		try {
			ServerBootstrap b = new ServerBootstrap();
			b.group(bossGroup, workGroup)
			.channel(NioServerSocketChannel.class)
			.option(ChannelOption.SO_BACKLOG, 1024)
			.handler(new LoggingHandler(LogLevel.INFO))
			.childHandler(new ChildChannelHandler());
			System.out.println("server bind " + port);
			ChannelFuture f = b.bind(port).sync();
			System.out.println("finish bind");
			f.channel().closeFuture().sync();
		} catch (Exception e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}finally{
			bossGroup.shutdownGracefully();
			workGroup.shutdownGracefully();
		}
		
		
	}
	
	private class ChildChannelHandler extends ChannelInitializer<SocketChannel>{

		/* (non-Javadoc)
		 * @see io.netty.channel.ChannelInitializer#initChannel(io.netty.channel.Channel)
		 */
		@Override
		protected void initChannel(SocketChannel arg0) throws Exception {
			System.out.println("server initChannel");
			arg0.pipeline().addLast(new TimeServerHandler());
		}
		
	}

	/**
	 * @param args
	 */
	public static void main(String[] args) {
		// TODO Auto-generated method stub
		
		try {
			new NettyTimerServer().bind(8888);
		} catch (Exception e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}

	}

}

class TimeServerHandler extends ChannelHandlerAdapter {

	@Override
	@Skip
	public void channelActive(ChannelHandlerContext ctx) throws Exception {
		super.channelActive(ctx);
	}

	@Override
	@Skip
	public void channelRead(ChannelHandlerContext ctx, Object msg)
			throws Exception {
		ByteBuf buf = (ByteBuf) msg;

		// Copy the readable bytes out of the buffer and decode them as UTF-8.
		byte[] bytes = new byte[buf.readableBytes()];
		buf.readBytes(bytes);
		String body = new String(bytes, "UTF-8");
		System.out.println("the server receive order:" + body);

		// Reply with the current time, or with an error message for any other order.
		String currentTime = "QUERY CURRENT TIME".equalsIgnoreCase(body)
				? new Date(System.currentTimeMillis()).toString()
				: "receive error order";

		ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes("UTF-8"));

		// write() only queues the response; channelReadComplete() flushes it.
		ctx.write(resp);
	}

	@Override
	@Skip
	public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
		ctx.flush();
	}

	@Override
	@Skip
	public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress,
			SocketAddress localAddress, ChannelPromise promise)
			throws Exception {
		super.connect(ctx, remoteAddress, localAddress, promise);
	}

}

 

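Every method in this implementation class carries a @Skip annotation. Once I removed the annotations, the program worked: the Netty-based server could accept and handle client connections normally.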

Having lost a whole day to this annotation, I went and read the Netty source code. Here is the documentation of @Skip from the source:

/**
     * Indicates that the annotated event handler method in {@link ChannelHandler} will not be invoked by
     * {@link ChannelPipeline}.  This annotation is only useful when your handler method implementation
     * only passes the event through to the next handler, like the following:
     *
     * <pre>
     * {@code @Skip}
     * {@code @Override}
     * public void channelActive({@link ChannelHandlerContext} ctx) {
     *     ctx.fireChannelActive(); // do nothing but passing through to the next handler
     * }
     * </pre>
     *
     * {@link #handlerAdded(ChannelHandlerContext)} and {@link #handlerRemoved(ChannelHandlerContext)} are not able to
     * pass the event through to the next handler, so they must do nothing when annotated.
     *
     * <pre>
     * {@code @Skip}
     * {@code @Override}
     * public void handlerAdded({@link ChannelHandlerContext} ctx) {
     *     // do nothing
     * }
     * </pre>
     *
     * <p>
     * Note that this annotation is not {@linkplain Inherited inherited}.  If you override a method annotated with
     * {@link Skip}, it will not be skipped anymore.  Similarly, you can override a method not annotated with
     * {@link Skip} and simply pass the event through to the next handler, which reverses the behavior of the
     * supertype.
     * </p>
     */
    @Target(ElementType.METHOD)
    @Retention(RetentionPolicy.RUNTIME)
    @interface Skip {
        // no value
    }

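In short, @Skip is applied to methods of a class that implements ChannelHandler. If a handler method is annotated with @Skip, the ChannelPipeline will not invoke that method at runtime, which is exactly why my server refused to work. We can also look at how Netty processes @Skip internally. A full-text search of the source shows that the handling of @Skip is concentrated in AbstractChannelHandlerContext. The relevant methods are listed below: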

/**
     * Returns an integer bitset that tells which handler methods were annotated with {@link Skip}.
     * It gets the value from {@link #skipFlagsCache} if an handler of the same type were queried before.
     * Otherwise, it delegates to {@link #skipFlags0(Class)} to get it.
     */
    static int skipFlags(ChannelHandler handler) {
        WeakHashMap<Class<?>, Integer> cache = skipFlagsCache.get();
        Class<? extends ChannelHandler> handlerType = handler.getClass();
        int flagsVal;
        Integer flags = cache.get(handlerType);
        if (flags != null) {
            flagsVal = flags;
        } else {
            flagsVal = skipFlags0(handlerType);
            cache.put(handlerType, Integer.valueOf(flagsVal));
        }

        return flagsVal;
    }
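
The caching pattern in skipFlags can be tried out without Netty. The sketch below is only an approximation under a few assumptions: the declaration of skipFlagsCache is not shown above, so I assume a per-thread map (its get() returns a WeakHashMap); the names FlagsCacheDemo, computeFlags and flagsFor are made up for illustration; and computeFlags returns a dummy value where the real code would run the reflective scan.

```java
import java.util.Map;
import java.util.WeakHashMap;
import java.util.concurrent.atomic.AtomicInteger;

class FlagsCacheDemo {
    // Counts how often the "expensive" computation runs (for illustration only).
    static final AtomicInteger computations = new AtomicInteger();

    // Assumption: one map per thread, keyed by handler class.
    // Weak keys let a class be garbage collected once nothing else references it.
    static final ThreadLocal<Map<Class<?>, Integer>> CACHE =
            ThreadLocal.withInitial(() -> new WeakHashMap<Class<?>, Integer>());

    // Hypothetical placeholder for the reflective scan; returns a dummy value.
    static int computeFlags(Class<?> type) {
        computations.incrementAndGet();
        return type.getName().length();
    }

    // Looks the class up in the cache first and computes the flags only on a miss.
    static int flagsFor(Object handler) {
        Map<Class<?>, Integer> cache = CACHE.get();
        Class<?> type = handler.getClass();
        Integer cached = cache.get(type);
        if (cached != null) {
            return cached;
        }
        int flags = computeFlags(type);
        cache.put(type, flags);
        return flags;
    }

    public static void main(String[] args) {
        flagsFor(new StringBuilder());
        flagsFor(new StringBuilder()); // same class, served from the cache
        System.out.println(computations.get()); // prints 1
    }
}
```

The point of the cache is that the flags depend only on the handler's class, so the reflection cost is paid once per class (per thread in this sketch) rather than once per handler instance.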

 

 /**
     * Determines the {@link #skipFlags} of the specified {@code handlerType} using the reflection API.
     */
    static int skipFlags0(Class<? extends ChannelHandler> handlerType) {
        int flags = 0;
        try {
            if (isSkippable(handlerType, "handlerAdded")) {
                flags |= MASK_HANDLER_ADDED;
            }
            if (isSkippable(handlerType, "handlerRemoved")) {
                flags |= MASK_HANDLER_REMOVED;
            }
            if (isSkippable(handlerType, "exceptionCaught", Throwable.class)) {
                flags |= MASK_EXCEPTION_CAUGHT;
            }
            if (isSkippable(handlerType, "channelRegistered")) {
                flags |= MASK_CHANNEL_REGISTERED;
            }
            if (isSkippable(handlerType, "channelUnregistered")) {
                flags |= MASK_CHANNEL_UNREGISTERED;
            }
            if (isSkippable(handlerType, "channelActive")) {
                flags |= MASK_CHANNEL_ACTIVE;
            }
            if (isSkippable(handlerType, "channelInactive")) {
                flags |= MASK_CHANNEL_INACTIVE;
            }
            if (isSkippable(handlerType, "channelRead", Object.class)) {
                flags |= MASK_CHANNEL_READ;
            }
            if (isSkippable(handlerType, "channelReadComplete")) {
                flags |= MASK_CHANNEL_READ_COMPLETE;
            }
            if (isSkippable(handlerType, "channelWritabilityChanged")) {
                flags |= MASK_CHANNEL_WRITABILITY_CHANGED;
            }
            if (isSkippable(handlerType, "userEventTriggered", Object.class)) {
                flags |= MASK_USER_EVENT_TRIGGERED;
            }
            if (isSkippable(handlerType, "bind", SocketAddress.class, ChannelPromise.class)) {
                flags |= MASK_BIND;
            }
            if (isSkippable(handlerType, "connect", SocketAddress.class, SocketAddress.class, ChannelPromise.class)) {
                flags |= MASK_CONNECT;
            }
            if (isSkippable(handlerType, "disconnect", ChannelPromise.class)) {
                flags |= MASK_DISCONNECT;
            }
            if (isSkippable(handlerType, "close", ChannelPromise.class)) {
                flags |= MASK_CLOSE;
            }
            if (isSkippable(handlerType, "deregister", ChannelPromise.class)) {
                flags |= MASK_DEREGISTER;
            }
            if (isSkippable(handlerType, "read")) {
                flags |= MASK_READ;
            }
            if (isSkippable(handlerType, "write", Object.class, ChannelPromise.class)) {
                flags |= MASK_WRITE;
            }
            if (isSkippable(handlerType, "flush")) {
                flags |= MASK_FLUSH;
            }
        } catch (Exception e) {
            // Should never reach here.
            PlatformDependent.throwException(e);
        }

        return flags;
    }
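
To make the bitset idea concrete, here is a small sketch without Netty. It is a simplified model, not Netty's actual lookup code: the MASK_* values below are hypothetical (the real constants are not shown in this article), and nextHandler is a made-up helper that stands in for the pipeline's search for the next handler to invoke.

```java
class SkipMaskDemo {
    // Hypothetical bit values; the article does not show Netty's real MASK_* constants.
    static final int MASK_CHANNEL_ACTIVE = 1;
    static final int MASK_CHANNEL_READ = 1 << 1;
    static final int MASK_CHANNEL_READ_COMPLETE = 1 << 2;

    // A set bit means "this handler's method for that event is skipped".
    static boolean isSkipped(int skipFlags, int eventMask) {
        return (skipFlags & eventMask) != 0;
    }

    // Returns the index of the first handler at or after 'from' that handles the event,
    // or -1 if every remaining handler skips it.
    static int nextHandler(int[] skipFlagsPerHandler, int from, int eventMask) {
        for (int i = from; i < skipFlagsPerHandler.length; i++) {
            if (!isSkipped(skipFlagsPerHandler[i], eventMask)) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // Handler 0 skips channelActive, channelRead and channelReadComplete,
        // like the annotated TimeServerHandler in this article.
        int annotated = MASK_CHANNEL_ACTIVE | MASK_CHANNEL_READ | MASK_CHANNEL_READ_COMPLETE;
        // Handler 1 skips nothing.
        int plain = 0;
        int[] pipeline = {annotated, plain};
        System.out.println(nextHandler(pipeline, 0, MASK_CHANNEL_READ)); // prints 1
    }
}
```

In this model a channelRead event passes straight over handler 0, which matches what I saw: the annotated channelRead was never called.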

 

 @SuppressWarnings("rawtypes")
    private static boolean isSkippable(
            Class<?> handlerType, String methodName, Class<?>... paramTypes) throws Exception {

        Class[] newParamTypes = new Class[paramTypes.length + 1];
        newParamTypes[0] = ChannelHandlerContext.class;
        System.arraycopy(paramTypes, 0, newParamTypes, 1, paramTypes.length);

        return handlerType.getMethod(methodName, newParamTypes).isAnnotationPresent(Skip.class);
    }
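
The reflection lookup in isSkippable also explains the "not inherited" note in the @Skip documentation. The sketch below reproduces that behavior with plain Java and no Netty on the classpath. DemoSkip is a hypothetical stand-in for @Skip, declared with the same @Target and @Retention, and the handler classes and the isSkipped helper are made up for illustration.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Hypothetical stand-in for Netty's @Skip: method-only and visible at runtime.
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@interface DemoSkip {
}

// Base handler whose channelActive is marked as skippable.
class BaseHandler {
    @DemoSkip
    public void channelActive() {
    }
}

// Inherits channelActive unchanged, so reflection finds the annotated base method.
class InheritingHandler extends BaseHandler {
}

// Overrides channelActive without repeating the annotation, so it is no longer skipped.
class OverridingHandler extends BaseHandler {
    @Override
    public void channelActive() {
    }
}

class SkipInheritanceDemo {
    // Same lookup style as isSkippable: resolve the public method, then check the annotation.
    static boolean isSkipped(Class<?> type, String methodName) {
        try {
            return type.getMethod(methodName).isAnnotationPresent(DemoSkip.class);
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException("no such public method: " + methodName, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(isSkipped(BaseHandler.class, "channelActive"));       // prints true
        System.out.println(isSkipped(InheritingHandler.class, "channelActive")); // prints true
        System.out.println(isSkipped(OverridingHandler.class, "channelActive")); // prints false
    }
}
```

getMethod returns the most specific public method, so an override that does not repeat the annotation is treated as a normal handler method. The opposite also holds, and that is what bit me: an override that does carry the annotation is skipped even though it contains real logic.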

 

I suspect many Netty beginners run into this kind of problem, and I hope this article is of some help.

Last updated: 2017-05-22 20:03:46
