0%

dubbo源码解读分析netty的应用

“持而盈之,不如其已。
揣而锐之,不可常保。
金玉满堂,莫之能守;富贵而骄,自遗其咎。
功遂身退,天之道也。”1

上一篇中我们简单分析了netty在dubbo中的应用,本篇从dubbo发布服务的角度从源码中详细分析netty的具体使用。
开启netty服务,代码的调用链如下:

1
ServiceBean
2
      -->onApplicationEvent()
3
      -->ServiceConfig.export()
4
          -->doExport()
5
              -->doExportUrls()
6
                  -->doExportUrlsFor1Protocol()

发布服务的入口代码:

1
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
2
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
3
Exporter<?> exporter = protocol.export(wrapperInvoker);

这里proxyFactory对象默认当做JdkProxyFactory,进入到getInvoker方法。第一个参数是接口的实现对象,第二个参数是即将发布的接口Class,第三个参数是发布协议的URL。

1
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
2
    return new AbstractProxyInvoker<T>(proxy, type, url) {
3
        @Override
4
        protected Object doInvoke(T proxy, String methodName,
5
                                  Class<?>[] parameterTypes,
6
                                  Object[] arguments) throws Throwable {
7
            Method method = proxy.getClass().getMethod(methodName, parameterTypes);
8
            return method.invoke(proxy, arguments);
9
        }
10
    };
11
}

方法内代码,新建了一个抽象类AbstractProxyInvoker,并实现了抽象方法doInvoke。doInvoke中通过反射机制执行要调用的方法。
接下来:Exporter<?> exporter = protocol.export(invoker);代码 。这里protocol默认当做是DubboProtocol类。

1
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
2
     URL url = invoker.getUrl();
3
4
     // export service.
5
     String key = serviceKey(url);
6
     DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
7
     exporterMap.put(key, exporter);
8
9
     //export an stub service for dispatching event
10
     Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
11
     Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
12
     if (isStubSupportEvent && !isCallbackservice) {
13
         String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
14
         if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
15
             if (logger.isWarnEnabled()) {
16
                 logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
17
                         "], has set stubproxy support event ,but no stub methods founded."));
18
             }
19
         } else {
20
             stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
21
         }
22
     }
23
24
     openServer(url);
25
     optimizeSerialization(url);
26
     return exporter;
27
 }

这个方法将传入的Invoker对象封装到DubboExporter对象中,并生成了唯一的key值。同时将key与DubboExporter对象关联保存进入exporterMap中,它是一个支持高并发的ConcurrentHashMap类。当客户端做远程请求服务时,就是根据key值从这个MAP中取出的真正接口实现对象来响应客户端的请求。在后面的代码分析中会体现出来。

1
private void openServer(URL url) {
2
    // find server.
3
    String key = url.getAddress();
4
    //client can export a service which's only for server to invoke
5
    boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
6
    if (isServer) {
7
        ExchangeServer server = serverMap.get(key);
8
        if (server == null) {
9
            serverMap.put(key, createServer(url));
10
        } else {
11
            // server supports reset, use together with override
12
            server.reset(url);
13
        }
14
    }
15
}

首先判断serverMap中是否已经包含了当前服务的ExchangeServer对象,如果没有调用createServer(url)创建一个并保存到serverMap中。继续跟进到createServer中,在这里调用了Exchangers类的静态方法bind创建了一个ExchangeServer对象,并返回出去了。注意bind方法的两个参数,第一个是URL很熟悉对吧!就不细说了,关键是第二个参数requestHandler,它是ExchangeHandlerAdapter类。它重写了很多父接口中的方法。里面重写了一个received方法,这个就是netty框架在接收到客户端请求以后响应处理的入口。具体处理细节在后面分析。这里继续往下看是怎么启动netty服务的。

1
private ExchangeServer createServer(URL url) {
2
     // send readonly event when server closes, it's enabled by default
3
     url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
4
     // enable heartbeat by default
5
     url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
6
     String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);
7
8
     if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
9
         throw new RpcException("Unsupported server type: " + str + ", url: " + url);
10
11
     url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
12
     ExchangeServer server;
13
     try {
14
         server = Exchangers.bind(url, requestHandler);
15
     } catch (RemotingException e) {
16
         throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
17
     }
18
     str = url.getParameter(Constants.CLIENT_KEY);
19
     if (str != null && str.length() > 0) {
20
         Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
21
         if (!supportedTypes.contains(str)) {
22
             throw new RpcException("Unsupported client type: " + str);
23
         }
24
     }
25
     return server;
26
 }

以下是Exchangers类的静态方法bind的所有处理,getExchanger方法最终返回了HeaderExchanger对象。

1
public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
2
    if (url == null) {
3
        throw new IllegalArgumentException("url == null");
4
    }
5
    if (handler == null) {
6
        throw new IllegalArgumentException("handler == null");
7
    }
8
    url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
9
    return getExchanger(url).bind(url, handler);
10
}
11
public static Exchanger getExchanger(URL url) {
12
    String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER);
13
    return getExchanger(type);
14
}
15
public static Exchanger getExchanger(String type) {
16
    return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type);
17
}

HeaderExchanger类中的bind方法代码如下,将dubbo协议的handler对象最终包装成了DecodeHandler对象,并传入到了Transporters类的bind方法中。

1
public class HeaderExchanger implements Exchanger {
2
3
    public static final String NAME = "header";
4
5
    public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
6
        return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
7
    }
8
9
    public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
10
        return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
11
    }
12
13
}

继续跟进Transporters类的bind方法如下。总体思路就是获取Transporter接口的具体实现类,然后调用该实现的bind方法。它有MinaTransporter,NettyTransporter,GrizzlyTransporter三种实现类,这里我就默认使用实现类NettyTransporter了。

1
public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
2
    if (url == null) {
3
        throw new IllegalArgumentException("url == null");
4
    }
5
    if (handlers == null || handlers.length == 0) {
6
        throw new IllegalArgumentException("handlers == null");
7
    }
8
    ChannelHandler handler;
9
    if (handlers.length == 1) {
10
        handler = handlers[0];
11
    } else {
12
        handler = new ChannelHandlerDispatcher(handlers);
13
    }
14
    return getTransporter().bind(url, handler);
15
}
1
public class NettyTransporter implements Transporter {
2
3
    public static final String NAME = "netty";
4
5
    public Server bind(URL url, ChannelHandler listener) throws RemotingException {
6
        return new NettyServer(url, listener);
7
    }
8
9
    public Client connect(URL url, ChannelHandler listener) throws RemotingException {
10
        return new NettyClient(url, listener);
11
    }
12
13
}

NettyTransporter类的源码如下。关注下bind方法,新建了一个NettyServer对象。感觉离netty越来越近了。
在NettyServer类中重点关注两个方法doOpen(它重写了抽象类父类的AbstractServer中的doOpen抽象方法)和它的构造函数。

1
public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
2
    super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
3
}
4
protected void doOpen() throws Throwable {
5
    NettyHelper.setNettyLoggerFactory();
6
7
    bootstrap = new ServerBootstrap();
8
9
    bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
10
    workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
11
            new DefaultThreadFactory("NettyServerWorker", true));
12
13
    final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
14
    channels = nettyServerHandler.getChannels();
15
16
    bootstrap.group(bossGroup, workerGroup)
17
            .channel(NioServerSocketChannel.class)
18
            .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
19
            .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
20
            .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
21
            .childHandler(new ChannelInitializer<NioSocketChannel>() {
22
                @Override
23
                protected void initChannel(NioSocketChannel ch) throws Exception {
24
                    NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
25
                    ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
26
                            .addLast("decoder", adapter.getDecoder())
27
                            .addLast("encoder", adapter.getEncoder())
28
                            .addLast("handler", nettyServerHandler);
29
                }
30
            });
31
    // bind
32
    ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
33
    channelFuture.syncUninterruptibly();
34
    channel = channelFuture.channel();
35
36
}

构造函数跟进super方法的处理。查看父类AbstractServer中的构造函数,细看里面执行了doOpen方法,根据抽象模版方法模式,其实调用的是子类的doOpen方法。到此已经将netty服务开启。

1:老子《道德经》第九章,老子故里,中国鹿邑。