“持而盈之,不如其已。
揣而锐之,不可常保。
金玉满堂,莫之能守;富贵而骄,自遗其咎。
功遂身退,天之道也。”1
上一篇中我们简单分析了netty在dubbo中的应用,本篇从dubbo发布服务的角度从源码中详细分析netty的具体使用。
开启netty服务,代码的调用链如下:
1 | ServiceBean |
2 | -->onApplicationEvent() |
3 | -->ServiceConfig.export() |
4 | -->doExport() |
5 | -->doExportUrls() |
6 | -->doExportUrlsFor1Protocol() |
发布服务的入口代码:
1 | Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString())); |
2 | DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this); |
3 | Exporter<?> exporter = protocol.export(wrapperInvoker); |
这里proxyFactory对象默认当做JdkProxyFactory,进入到getInvoker方法。第一个参数是接口的实现对象,第二个参数是即将发布的接口Class,第三个参数是发布协议的URL。
1 | public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) { |
2 | return new AbstractProxyInvoker<T>(proxy, type, url) { |
3 | |
4 | protected Object doInvoke(T proxy, String methodName, |
5 | Class<?>[] parameterTypes, |
6 | Object[] arguments) throws Throwable { |
7 | Method method = proxy.getClass().getMethod(methodName, parameterTypes); |
8 | return method.invoke(proxy, arguments); |
9 | } |
10 | }; |
11 | } |
方法内代码,新建了一个抽象类AbstractProxyInvoker,并实现了抽象方法doInvoke。doInvoke中通过反射机制执行要调用的方法。
接下来:Exporter<?> exporter = protocol.export(invoker);代码 。这里protocol默认当做是DubboProtocol类。
1 | public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { |
2 | URL url = invoker.getUrl(); |
3 | |
4 | // export service. |
5 | String key = serviceKey(url); |
6 | DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap); |
7 | exporterMap.put(key, exporter); |
8 | |
9 | //export an stub service for dispatching event |
10 | Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT); |
11 | Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false); |
12 | if (isStubSupportEvent && !isCallbackservice) { |
13 | String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY); |
14 | if (stubServiceMethods == null || stubServiceMethods.length() == 0) { |
15 | if (logger.isWarnEnabled()) { |
16 | logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) + |
17 | "], has set stubproxy support event ,but no stub methods founded.")); |
18 | } |
19 | } else { |
20 | stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods); |
21 | } |
22 | } |
23 | |
24 | openServer(url); |
25 | optimizeSerialization(url); |
26 | return exporter; |
27 | } |
这个方法将传入的Invoker对象封装到DubboExporter对象中,并生成了唯一的key值。同时将key与DubboExporter对象关联保存进入exporterMap中,它是一个支持高并发的ConcurrentHashMap类。当客户端做远程请求服务时,就是根据key值从这个MAP中取出的真正接口实现对象来响应客户端的请求。在后面的代码分析中会体现出来。
1 | private void openServer(URL url) { |
2 | // find server. |
3 | String key = url.getAddress(); |
4 | //client can export a service which's only for server to invoke |
5 | boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true); |
6 | if (isServer) { |
7 | ExchangeServer server = serverMap.get(key); |
8 | if (server == null) { |
9 | serverMap.put(key, createServer(url)); |
10 | } else { |
11 | // server supports reset, use together with override |
12 | server.reset(url); |
13 | } |
14 | } |
15 | } |
首先判断serverMap中是否已经包含了当前服务的ExchangeServer对象,如果没有调用createServer(url)创建一个并保存到serverMap中。继续跟进到createServer中,在这里调用了Exchangers类的静态方法bind创建了一个ExchangeServer对象,并返回出去了。注意bind方法的两个参数,第一个是URL很熟悉对吧!就不细说了,关键是第二个参数requestHandler,它是ExchangeHandlerAdapter类。它重写了很多父接口中的方法。里面重写了一个received方法,这个就是netty框架在接收到客户端请求以后响应处理的入口。具体处理细节在后面分析。这里继续往下看是怎么启动netty服务的。
1 | private ExchangeServer createServer(URL url) { |
2 | // send readonly event when server closes, it's enabled by default |
3 | url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString()); |
4 | // enable heartbeat by default |
5 | url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT)); |
6 | String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER); |
7 | |
8 | if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) |
9 | throw new RpcException("Unsupported server type: " + str + ", url: " + url); |
10 | |
11 | url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME); |
12 | ExchangeServer server; |
13 | try { |
14 | server = Exchangers.bind(url, requestHandler); |
15 | } catch (RemotingException e) { |
16 | throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e); |
17 | } |
18 | str = url.getParameter(Constants.CLIENT_KEY); |
19 | if (str != null && str.length() > 0) { |
20 | Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(); |
21 | if (!supportedTypes.contains(str)) { |
22 | throw new RpcException("Unsupported client type: " + str); |
23 | } |
24 | } |
25 | return server; |
26 | } |
以下是Exchangers类的静态方法bind的所有处理,getExchanger方法最终返回了HeaderExchanger对象。
1 | public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException { |
2 | if (url == null) { |
3 | throw new IllegalArgumentException("url == null"); |
4 | } |
5 | if (handler == null) { |
6 | throw new IllegalArgumentException("handler == null"); |
7 | } |
8 | url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange"); |
9 | return getExchanger(url).bind(url, handler); |
10 | } |
11 | public static Exchanger getExchanger(URL url) { |
12 | String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER); |
13 | return getExchanger(type); |
14 | } |
15 | public static Exchanger getExchanger(String type) { |
16 | return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type); |
17 | } |
HeaderExchanger类中的bind方法代码如下,将dubbo协议的handler对象最终包装成了DecodeHandler对象,并传入到了Transporters类的bind方法中。
1 | public class HeaderExchanger implements Exchanger { |
2 | |
3 | public static final String NAME = "header"; |
4 | |
5 | public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException { |
6 | return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true); |
7 | } |
8 | |
9 | public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException { |
10 | return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))); |
11 | } |
12 | |
13 | } |
继续跟进Transporters类的bind方法如下。总体思路就是获取Transporter接口的具体实现类,然后调用该实现的bind方法。它有MinaTransporter,NettyTransporter,GrizzlyTransporter三种实现类,这里我就默认使用实现类NettyTransporter了。
1 | public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException { |
2 | if (url == null) { |
3 | throw new IllegalArgumentException("url == null"); |
4 | } |
5 | if (handlers == null || handlers.length == 0) { |
6 | throw new IllegalArgumentException("handlers == null"); |
7 | } |
8 | ChannelHandler handler; |
9 | if (handlers.length == 1) { |
10 | handler = handlers[0]; |
11 | } else { |
12 | handler = new ChannelHandlerDispatcher(handlers); |
13 | } |
14 | return getTransporter().bind(url, handler); |
15 | } |
1 | public class NettyTransporter implements Transporter { |
2 | |
3 | public static final String NAME = "netty"; |
4 | |
5 | public Server bind(URL url, ChannelHandler listener) throws RemotingException { |
6 | return new NettyServer(url, listener); |
7 | } |
8 | |
9 | public Client connect(URL url, ChannelHandler listener) throws RemotingException { |
10 | return new NettyClient(url, listener); |
11 | } |
12 | |
13 | } |
NettyTransporter类的源码如下。关注下bind方法,新建了一个NettyServer对象。感觉离netty越来越近了。
在NettyServer类中重点关注两个方法doOpen(它重写了抽象类父类的AbstractServer中的doOpen抽象方法)和它的构造函数。
1 | public NettyServer(URL url, ChannelHandler handler) throws RemotingException { |
2 | super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME))); |
3 | } |
4 | protected void doOpen() throws Throwable { |
5 | NettyHelper.setNettyLoggerFactory(); |
6 | |
7 | bootstrap = new ServerBootstrap(); |
8 | |
9 | bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true)); |
10 | workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS), |
11 | new DefaultThreadFactory("NettyServerWorker", true)); |
12 | |
13 | final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this); |
14 | channels = nettyServerHandler.getChannels(); |
15 | |
16 | bootstrap.group(bossGroup, workerGroup) |
17 | .channel(NioServerSocketChannel.class) |
18 | .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE) |
19 | .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE) |
20 | .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) |
21 | .childHandler(new ChannelInitializer<NioSocketChannel>() { |
22 | |
23 | protected void initChannel(NioSocketChannel ch) throws Exception { |
24 | NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this); |
25 | ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug |
26 | .addLast("decoder", adapter.getDecoder()) |
27 | .addLast("encoder", adapter.getEncoder()) |
28 | .addLast("handler", nettyServerHandler); |
29 | } |
30 | }); |
31 | // bind |
32 | ChannelFuture channelFuture = bootstrap.bind(getBindAddress()); |
33 | channelFuture.syncUninterruptibly(); |
34 | channel = channelFuture.channel(); |
35 | |
36 | } |
构造函数跟进super方法的处理。查看父类AbstractServer中的构造函数,细看里面执行了doOpen方法,根据抽象模版方法模式,其实调用的是子类的doOpen方法。到此已经将netty服务开启。
1:老子《道德经》第九章,老子故里,中国鹿邑。