A Brief Look at How Dubbo Uses Netty

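"The highest good is like water. Water benefits all things and does not contend.
It settles in places that people disdain, and so it comes close to the Way.
In dwelling, value the ground; in the heart, value depth; in giving, value kindness; in speech, value good faith; in governing, value order; in work, value competence; in action, value timing.
Because it does not contend, it is beyond reproach."1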

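Netty is a high-performance network transport framework that shows up in all kinds of middleware and distributed frameworks. This post looks at Netty through the Dubbo source code. At its core Netty handles network transport, which inevitably comes down to sockets, and a socket is an end-to-end connection. Dubbo is decentralized: every client can connect to servers, and every client can itself act as a server.
On the client side, Dubbo's main abstraction is AbstractClient, which NettyClient extends. In general, for a given server (same IP and port) there is only one client instance, which is what Dubbo calls a shared connection.
This can be seen in the DubboProtocol class: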

private ExchangeClient[] getClients(URL url) {
    // whether to share connection
    boolean service_share_connect = false;
    int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
    // if not configured, connection is shared, otherwise, one connection for one service
    if (connections == 0) {
        service_share_connect = true;
        connections = 1;
    }

    ExchangeClient[] clients = new ExchangeClient[connections];
    for (int i = 0; i < clients.length; i++) {
        if (service_share_connect) {
            clients[i] = getSharedClient(url);
        } else {
            clients[i] = initClient(url);
        }
    }
    return clients;
}
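The shared-connection idea can be illustrated with a minimal sketch that uses only the JDK. It assumes a registry keyed by the "ip:port" address with a simple reference count per client. The names SketchClient and SharedClientRegistry are hypothetical and are not Dubbo classes; the real getSharedClient may differ in detail.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for a client connection; not a Dubbo class. */
final class SketchClient {
    final String address;
    // Number of callers currently sharing this client.
    final AtomicInteger refCount = new AtomicInteger();

    SketchClient(String address) {
        this.address = address;
    }
}

/** Hypothetical registry that hands out one shared client per "ip:port". */
final class SharedClientRegistry {
    private final ConcurrentMap<String, SketchClient> clients = new ConcurrentHashMap<>();

    /** Returns the shared client for the address, creating it on first use. */
    SketchClient getShared(String address) {
        SketchClient client = clients.computeIfAbsent(address, SketchClient::new);
        client.refCount.incrementAndGet();
        return client;
    }

    /** Mirrors the branch in getClients: 0 means "share", n > 0 means n dedicated clients. */
    SketchClient[] getClients(String address, int connections) {
        boolean share = connections == 0;
        SketchClient[] result = new SketchClient[share ? 1 : connections];
        for (int i = 0; i < result.length; i++) {
            result[i] = share ? getShared(address) : new SketchClient(address);
        }
        return result;
    }
}
```

Two calls with connections set to 0 and the same address return the same SketchClient instance, while a positive value yields that many independent instances.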

NettyClient opens the connection:

protected void doOpen() throws Throwable {
    NettyHelper.setNettyLoggerFactory();
    final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this);
    bootstrap = new Bootstrap();
    bootstrap.group(nioEventLoopGroup)
            .option(ChannelOption.SO_KEEPALIVE, true)
            .option(ChannelOption.TCP_NODELAY, true)
            .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            //.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout())
            .channel(NioSocketChannel.class);

    if (getTimeout() < 3000) {
        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000);
    } else {
        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout());
    }

    bootstrap.handler(new ChannelInitializer() {

        protected void initChannel(Channel ch) throws Exception {
            NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
            ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                    .addLast("decoder", adapter.getDecoder())
                    .addLast("encoder", adapter.getEncoder())
                    .addLast("handler", nettyClientHandler);
        }
    });
}

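To abstract over channels without depending on Netty's implementation, Dubbo defines its own Channel type, and NettyChannel is just one implementation of it. NettyChannel holds a static map, channelMap, that binds each native Netty Channel one-to-one to Dubbo's own NettyChannel.
NettyChannel: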

private static final ConcurrentMap<Channel, NettyChannel> channelMap = new ConcurrentHashMap<Channel, NettyChannel>();

The one-to-one binding is implemented as follows:

static NettyChannel getOrAddChannel(Channel ch, URL url, ChannelHandler handler) {
    if (ch == null) {
        return null;
    }
    NettyChannel ret = channelMap.get(ch);
    if (ret == null) {
        NettyChannel nettyChannel = new NettyChannel(ch, url, handler);
        if (ch.isActive()) {
            ret = channelMap.putIfAbsent(ch, nettyChannel);
        }
        if (ret == null) {
            ret = nettyChannel;
        }
    }
    return ret;
}
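The same get-or-add pattern can be reproduced without Netty to show why putIfAbsent is used: if two threads race to wrap the same channel, only one wrapper ends up in the map and both callers get that one. This is a sketch under stated assumptions. RawChannel, WrappedChannel and removeIfDisconnected are hypothetical names standing in for the Netty Channel, Dubbo's NettyChannel and its cleanup step.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical stand-in for a transport-level channel (the role of Netty's Channel). */
final class RawChannel {
    volatile boolean active = true;
}

/** Hypothetical framework-level wrapper (the role of NettyChannel). */
final class WrappedChannel {
    private static final ConcurrentMap<RawChannel, WrappedChannel> CHANNELS =
            new ConcurrentHashMap<>();

    final RawChannel raw;

    private WrappedChannel(RawChannel raw) {
        this.raw = raw;
    }

    /** Returns the wrapper bound to the raw channel, creating and binding it if needed. */
    static WrappedChannel getOrAdd(RawChannel raw) {
        if (raw == null) {
            return null;
        }
        WrappedChannel existing = CHANNELS.get(raw);
        if (existing != null) {
            return existing;
        }
        WrappedChannel created = new WrappedChannel(raw);
        if (raw.active) {
            // putIfAbsent returns the previous mapping, or null if ours was stored.
            WrappedChannel winner = CHANNELS.putIfAbsent(raw, created);
            if (winner != null) {
                return winner;
            }
        }
        // Inactive channels get a wrapper but are never cached.
        return created;
    }

    /** Drops the binding once the raw channel is no longer active, so the map cannot leak. */
    static void removeIfDisconnected(RawChannel raw) {
        if (raw != null && !raw.active) {
            CHANNELS.remove(raw);
        }
    }

    static int size() {
        return CHANNELS.size();
    }
}
```

Only active channels are cached, so a wrapper for a closed connection never lingers in the static map.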

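NettyHandler is a thin wrapper around ChannelHandler. Dubbo's ChannelHandler implementations rely heavily on the decorator and delegation patterns, much like the stream classes in Java IO. Decoration is what gives a ChannelHandler capabilities such as decoding, statistics, and dispatching. The innermost handler is the ExchangeHandler defined as an inner class in DubboProtocol. Its reply method looks long, but it mainly does two things: it looks up the matching Invoker and calls invoke on it.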

private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {

    public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
        if (message instanceof Invocation) {
            Invocation inv = (Invocation) message;
            Invoker<?> invoker = getInvoker(channel, inv);
            // need to consider backward-compatibility if it's a callback
            if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
                String methodsStr = invoker.getUrl().getParameters().get("methods");
                boolean hasMethod = false;
                if (methodsStr == null || methodsStr.indexOf(",") == -1) {
                    hasMethod = inv.getMethodName().equals(methodsStr);
                } else {
                    String[] methods = methodsStr.split(",");
                    for (String method : methods) {
                        if (inv.getMethodName().equals(method)) {
                            hasMethod = true;
                            break;
                        }
                    }
                }
                if (!hasMethod) {
                    logger.warn(new IllegalStateException("The methodName " + inv.getMethodName()
                            + " not found in callback service interface ,invoke will be ignored."
                            + " please update the api interface. url is:"
                            + invoker.getUrl()) + " ,invocation is :" + inv);
                    return null;
                }
            }
            RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
            return invoker.invoke(inv);
        }
        throw new RemotingException(channel, "Unsupported request: "
                + (message == null ? null : (message.getClass().getName() + ": " + message))
                + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
    }

    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        if (message instanceof Invocation) {
            reply((ExchangeChannel) channel, message);
        } else {
            super.received(channel, message);
        }
    }

    @Override
    public void connected(Channel channel) throws RemotingException {
        invoke(channel, Constants.ON_CONNECT_KEY);
    }

    @Override
    public void disconnected(Channel channel) throws RemotingException {
        if (logger.isInfoEnabled()) {
            logger.info("disconnected from " + channel.getRemoteAddress() + ",url:" + channel.getUrl());
        }
        invoke(channel, Constants.ON_DISCONNECT_KEY);
    }

    private void invoke(Channel channel, String methodKey) {
        Invocation invocation = createInvocation(channel, channel.getUrl(), methodKey);
        if (invocation != null) {
            try {
                received(channel, invocation);
            } catch (Throwable t) {
                logger.warn("Failed to invoke event method " + invocation.getMethodName() + "(), cause: " + t.getMessage(), t);
            }
        }
    }

    private Invocation createInvocation(Channel channel, URL url, String methodKey) {
        String method = url.getParameter(methodKey);
        if (method == null || method.length() == 0) {
            return null;
        }
        RpcInvocation invocation = new RpcInvocation(method, new Class<?>[0], new Object[0]);
        invocation.setAttachment(Constants.PATH_KEY, url.getPath());
        invocation.setAttachment(Constants.GROUP_KEY, url.getParameter(Constants.GROUP_KEY));
        invocation.setAttachment(Constants.INTERFACE_KEY, url.getParameter(Constants.INTERFACE_KEY));
        invocation.setAttachment(Constants.VERSION_KEY, url.getParameter(Constants.VERSION_KEY));
        if (url.getParameter(Constants.STUB_EVENT_KEY, false)) {
            invocation.setAttachment(Constants.STUB_EVENT_KEY, Boolean.TRUE.toString());
        }
        return invocation;
    }
};
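The decorator-style layering of handlers described above can be sketched with a deliberately small contract. The interface SketchHandler and the classes below are hypothetical and much simpler than Dubbo's ChannelHandler, which has more callbacks. The point is only to show how each layer adds one concern and then delegates inward to the business handler.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical minimal handler contract; Dubbo's ChannelHandler has more callbacks. */
interface SketchHandler {
    void received(Object message);
}

/** Base decorator: holds the wrapped handler that each layer delegates to. */
abstract class HandlerDecorator implements SketchHandler {
    protected final SketchHandler next;

    HandlerDecorator(SketchHandler next) {
        this.next = next;
    }
}

/** Decodes raw bytes into a String before delegating; other messages pass through. */
final class DecodingHandler extends HandlerDecorator {
    DecodingHandler(SketchHandler next) {
        super(next);
    }

    @Override
    public void received(Object message) {
        Object decoded = (message instanceof byte[])
                ? new String((byte[]) message, StandardCharsets.UTF_8)
                : message;
        next.received(decoded);
    }
}

/** Counts messages (a stand-in for statistics), then delegates unchanged. */
final class CountingHandler extends HandlerDecorator {
    int count;

    CountingHandler(SketchHandler next) {
        super(next);
    }

    @Override
    public void received(Object message) {
        count++;
        next.received(message);
    }
}

/** Innermost handler, playing the role of the business-level ExchangeHandler. */
final class BusinessHandler implements SketchHandler {
    final List<Object> seen = new ArrayList<>();

    @Override
    public void received(Object message) {
        seen.add(message);
    }
}
```

A chain is assembled from the inside out, for example new DecodingHandler(new CountingHandler(new BusinessHandler())), in the same way a BufferedInputStream wraps another InputStream.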

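NettyHandler extends SimpleChannelHandler and is the class that deserves the most attention and design effort, because it is where Netty gives developers the most control. Any framework built on Netty needs to customize its own handler class. Dubbo is no exception: it abstracts and wraps NettyHandler extensively so that it meets Dubbo's own needs.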

The next post will walk through the Dubbo source in detail to analyze how publishing a service starts the socket listener and the Netty server.

1: Laozi, Tao Te Ching, Chapter 8. Laozi's hometown: Luyi, China.