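"The highest good is like water. Water benefits all things and does not contend.
It dwells in places that people disdain, and so it is close to the Tao.
In dwelling, choose good ground; in heart, be deep; in giving, be kind; in speech, be truthful; in governing, be orderly; in work, be capable; in action, be timely.
Because it does not contend, it is beyond reproach." [1]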
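Netty is a high-performance network transport framework that shows up in all kinds of middleware and distributed frameworks. Today I will look at Netty through the Dubbo source code. At its core, Netty handles network transport, and network transport comes down to sockets, which are end-to-end connections. Dubbo is decentralized: any client can connect to a server, and every client can also act as a server.
On the client side, Dubbo builds on AbstractClient, which NettyClient extends. By default, for a given server (same IP and port) there is only one client instance, which is what Dubbo calls a shared connection.
This can be seen in the DubboProtocol class: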
private ExchangeClient[] getClients(URL url) {
    // whether to share connection
    boolean service_share_connect = false;
    int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
    // if not configured, connection is shared, otherwise, one connection for one service
    if (connections == 0) {
        service_share_connect = true;
        connections = 1;
    }

    ExchangeClient[] clients = new ExchangeClient[connections];
    for (int i = 0; i < clients.length; i++) {
        if (service_share_connect) {
            clients[i] = getSharedClient(url);
        } else {
            clients[i] = initClient(url);
        }
    }
    return clients;
}
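The shared-connection idea can be illustrated without any Dubbo types. The following is a minimal sketch, not Dubbo's `getSharedClient` implementation: `FakeClient`, `SharedClientCache`, and `acquire` are hypothetical names, and the sketch assumes clients are cached under an "ip:port" key with a simple reference count.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a network client; not a Dubbo class.
final class FakeClient {
    final String address;
    // Number of callers currently sharing this client.
    final AtomicInteger references = new AtomicInteger();

    FakeClient(String address) {
        this.address = address;
    }
}

// Hypothetical cache: one client per "ip:port", shared by every caller that targets it.
final class SharedClientCache {
    private final Map<String, FakeClient> clients = new ConcurrentHashMap<>();

    // Returns the cached client for the address, creating it on first use.
    FakeClient acquire(String ip, int port) {
        String key = ip + ":" + port;
        FakeClient client = clients.computeIfAbsent(key, FakeClient::new);
        client.references.incrementAndGet();
        return client;
    }

    // Number of distinct server addresses that currently have a client.
    int size() {
        return clients.size();
    }
}
```

Two calls to `acquire` with the same IP and port return the same `FakeClient` instance, while a different address gets its own client.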
NettyClient opens the connection in doOpen:
@Override
protected void doOpen() throws Throwable {
    NettyHelper.setNettyLoggerFactory();
    final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this);
    bootstrap = new Bootstrap();
    bootstrap.group(nioEventLoopGroup)
            .option(ChannelOption.SO_KEEPALIVE, true)
            .option(ChannelOption.TCP_NODELAY, true)
            .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            //.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout())
            .channel(NioSocketChannel.class);

    // Use a connect timeout of at least 3000 ms.
    if (getTimeout() < 3000) {
        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000);
    } else {
        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout());
    }

    bootstrap.handler(new ChannelInitializer() {

        @Override
        protected void initChannel(Channel ch) throws Exception {
            NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
            // Pipeline order: decoder, encoder, then the Dubbo client handler.
            ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                    .addLast("decoder", adapter.getDecoder())
                    .addLast("encoder", adapter.getEncoder())
                    .addLast("handler", nettyClientHandler);
        }
    });
}
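The if/else on the connect timeout in doOpen amounts to a lower bound of 3000 ms. As a small illustration only, the same rule can be written as a single expression; `ConnectTimeouts` and `effectiveConnectTimeout` are hypothetical names and are not part of Dubbo or Netty.

```java
// Hypothetical helper; not part of Dubbo or Netty.
final class ConnectTimeouts {
    // Lower bound taken from the literal 3000 used in doOpen.
    static final int MIN_CONNECT_TIMEOUT_MILLIS = 3000;

    private ConnectTimeouts() {
    }

    // Mirrors the if/else in doOpen: values below 3000 ms are raised to 3000 ms,
    // values of 3000 ms or more are used unchanged.
    static int effectiveConnectTimeout(int configuredMillis) {
        return Math.max(configuredMillis, MIN_CONNECT_TIMEOUT_MILLIS);
    }
}
```

With this rule, a configured timeout of 1000 ms yields a connect timeout of 3000 ms, while 5000 ms is kept as is.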
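To abstract over Channel and avoid depending on Netty's implementation, Dubbo defines its own Channel type; NettyChannel is just one implementation of Dubbo's Channel. NettyChannel keeps a static map, channelMap, which binds each native Netty Channel one-to-one to its Dubbo NettyChannel wrapper.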
NettyChannel:
private static final ConcurrentMap<Channel, NettyChannel> channelMap = new ConcurrentHashMap<Channel, NettyChannel>();
The one-to-one binding is implemented as follows:
static NettyChannel getOrAddChannel(Channel ch, URL url, ChannelHandler handler) {
    if (ch == null) {
        return null;
    }
    NettyChannel ret = channelMap.get(ch);
    if (ret == null) {
        NettyChannel nettyChannel = new NettyChannel(ch, url, handler);
        if (ch.isActive()) {
            ret = channelMap.putIfAbsent(ch, nettyChannel);
        }
        if (ret == null) {
            ret = nettyChannel;
        }
    }
    return ret;
}
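The get-then-putIfAbsent idiom used above can be tried in isolation. The sketch below is a simplified, generic version under stated assumptions: `WrapperRegistry` and `getOrAdd` are hypothetical names, and unlike getOrAddChannel it always caches the wrapper (there is no `isActive()` check).

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

// Hypothetical generic registry that binds each raw object to exactly one wrapper.
final class WrapperRegistry<K, V> {
    private final ConcurrentMap<K, V> wrappers = new ConcurrentHashMap<>();

    // Returns the wrapper bound to raw, binding a freshly created one if none exists yet.
    V getOrAdd(K raw, Function<K, V> factory) {
        if (raw == null) {
            return null;
        }
        V existing = wrappers.get(raw);
        if (existing == null) {
            V created = factory.apply(raw);
            // putIfAbsent returns the previously bound value,
            // or null if this call was the one that stored `created`.
            existing = wrappers.putIfAbsent(raw, created);
            if (existing == null) {
                existing = created;
            }
        }
        return existing;
    }
}
```

If two threads race on the same key, both may create a wrapper, but `putIfAbsent` guarantees that only one is stored and that both callers get that same instance back.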
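NettyHandler is a thin wrapper around ChannelHandler. Dubbo's ChannelHandler implementations rely heavily on the decorator and delegation patterns, much like the Stream classes in Java IO. Through decoration, a ChannelHandler gains capabilities such as decoding, statistics, and dispatching. The innermost ExchangeHandler is an anonymous inner class in DubboProtocol. Its reply method is straightforward and does two main things: it looks up the matching Invoker, then calls invoke on it.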
private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {

    @Override
    public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
        if (message instanceof Invocation) {
            Invocation inv = (Invocation) message;
            // Step 1: find the Invoker that serves this invocation.
            Invoker<?> invoker = getInvoker(channel, inv);
            // need to consider backward-compatibility if it's a callback
            if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
                String methodsStr = invoker.getUrl().getParameters().get("methods");
                boolean hasMethod = false;
                if (methodsStr == null || methodsStr.indexOf(",") == -1) {
                    hasMethod = inv.getMethodName().equals(methodsStr);
                } else {
                    String[] methods = methodsStr.split(",");
                    for (String method : methods) {
                        if (inv.getMethodName().equals(method)) {
                            hasMethod = true;
                            break;
                        }
                    }
                }
                if (!hasMethod) {
                    logger.warn(new IllegalStateException("The methodName " + inv.getMethodName()
                            + " not found in callback service interface ,invoke will be ignored."
                            + " please update the api interface. url is:"
                            + invoker.getUrl()) + " ,invocation is :" + inv);
                    return null;
                }
            }
            RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
            // Step 2: perform the actual call.
            return invoker.invoke(inv);
        }
        throw new RemotingException(channel, "Unsupported request: "
                + (message == null ? null : (message.getClass().getName() + ": " + message))
                + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
    }

    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        if (message instanceof Invocation) {
            reply((ExchangeChannel) channel, message);
        } else {
            super.received(channel, message);
        }
    }

    @Override
    public void connected(Channel channel) throws RemotingException {
        invoke(channel, Constants.ON_CONNECT_KEY);
    }

    @Override
    public void disconnected(Channel channel) throws RemotingException {
        if (logger.isInfoEnabled()) {
            logger.info("disconnected from " + channel.getRemoteAddress() + ",url:" + channel.getUrl());
        }
        invoke(channel, Constants.ON_DISCONNECT_KEY);
    }

    // Fires the event method configured on the URL under methodKey, if any.
    private void invoke(Channel channel, String methodKey) {
        Invocation invocation = createInvocation(channel, channel.getUrl(), methodKey);
        if (invocation != null) {
            try {
                received(channel, invocation);
            } catch (Throwable t) {
                logger.warn("Failed to invoke event method " + invocation.getMethodName() + "(), cause: " + t.getMessage(), t);
            }
        }
    }

    // Builds a no-argument invocation for the configured event method, or returns null if none is configured.
    private Invocation createInvocation(Channel channel, URL url, String methodKey) {
        String method = url.getParameter(methodKey);
        if (method == null || method.length() == 0) {
            return null;
        }
        RpcInvocation invocation = new RpcInvocation(method, new Class<?>[0], new Object[0]);
        invocation.setAttachment(Constants.PATH_KEY, url.getPath());
        invocation.setAttachment(Constants.GROUP_KEY, url.getParameter(Constants.GROUP_KEY));
        invocation.setAttachment(Constants.INTERFACE_KEY, url.getParameter(Constants.INTERFACE_KEY));
        invocation.setAttachment(Constants.VERSION_KEY, url.getParameter(Constants.VERSION_KEY));
        if (url.getParameter(Constants.STUB_EVENT_KEY, false)) {
            invocation.setAttachment(Constants.STUB_EVENT_KEY, Boolean.TRUE.toString());
        }
        return invocation;
    }
};
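The decorator and delegation structure described before the listing can be sketched in a few lines of plain Java. This is an illustration only, not Dubbo's handler hierarchy: `MessageHandler`, `RecordingHandler`, `DecodingHandler`, and `CountingHandler` are hypothetical names, and "decoding" is reduced to trimming a string.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical minimal handler interface; Dubbo's ChannelHandler has more methods.
interface MessageHandler {
    void received(String message);
}

// Innermost handler: stands in for the business handler and records what it receives.
final class RecordingHandler implements MessageHandler {
    final List<String> messages = new ArrayList<>();

    @Override
    public void received(String message) {
        messages.add(message);
    }
}

// Decorator that "decodes" the message (here: trims whitespace) before delegating.
final class DecodingHandler implements MessageHandler {
    private final MessageHandler delegate;

    DecodingHandler(MessageHandler delegate) {
        this.delegate = delegate;
    }

    @Override
    public void received(String message) {
        delegate.received(message.trim());
    }
}

// Decorator that counts messages (a stand-in for statistics) before delegating.
final class CountingHandler implements MessageHandler {
    private final MessageHandler delegate;
    int count;

    CountingHandler(MessageHandler delegate) {
        this.delegate = delegate;
    }

    @Override
    public void received(String message) {
        count++;
        delegate.received(message);
    }
}
```

Wrapping the handlers as `new CountingHandler(new DecodingHandler(new RecordingHandler()))` gives the same layering as stacked Java IO streams: each layer adds one concern and passes the message on to the handler it wraps.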
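NettyHandler extends SimpleChannelHandler and is the class that deserves the most attention and design effort, because it is where Netty gives developers the most control. Any framework built on Netty needs its own customized handler class. Dubbo is no exception: it abstracts and wraps NettyHandler extensively so that it meets Dubbo's own needs.
The next post will walk through the Dubbo source in detail to show how the socket listener and the Netty server are started when a service is published.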
[1] Laozi, Tao Te Ching, Chapter 8. Laozi's hometown: Luyi, China.