宜人贷蜂巢团队,由michael创立于2013年,通过使用互联网科技手段助力金融生态和谐健康发展。自成立起一直致力于多维度数据闭环平台建设。目前团队规模超过百人,涵盖征信、
图1 - api网关项目框架
图中描绘了api网关系统的处理流程,以及与服务注册发现、日志分析、报警系统、各类爬虫的关系。其中api网关系统接收请求,对请求进行编解码、鉴权、限流、加解密,再基于eureka服务注册发现模块,将请求发送到有效的服务节点上;网关及抓取系统的日志,会被收集到elk平台中,做业务分析及报警处理。
二、bio vs nio
api网关承载数倍于爬虫的流量,提升服务器的并发处理能力、缩短系统的响应时间,通信模型的选择是至关重要的,是选择bio,还是nio?
1. streamvs buffer & 阻塞 vs 非阻塞
bio是面向流的,io的读写,每次只能处理一个或者多个bytes,如果数据没有读写完成,线程将一直等待于此,而不能暂时跳过io或者等待io读写完成异步通知,线程滞留在io读写上,不能充分利用机器有限的线程资源,造成server的吞吐量较低,见图2。而nio与此不同,面向buffer,线程不需要滞留在io读写上,采用操作系统的epoll模式,在io数据准备好了,才由线程来处理,见图3。
nioevenrloopgroup的创建,具体执行过程是执行类multithreadeventexecutorgroup的构造方法:
/***createanewinstance.**@paramnthreadsthenumberofthreadsthatwillbeusedbythisinstance.*@paramexecutortheexecutortouse,or{@codenull}ifthedefaultshouldbeused.*@paramchooserfactorythe{@linkeventexecutorchooserfactory}touse.*@paramargsargumentswhichwillpassedtoeach{@link#newchild(executor,object...)}call*/protectedmultithreadeventexecutorgroup(intnthreads,executorexecutor,eventexecutorchooserfactorychooserfactory,object...args){if(nthreads<=0){thrownewillegalargumentexception(string.format(nthreads:%d(expected:>0),nthreads));}if(executor==null){executor=newthreadpertaskexecutor(newdefaultthreadfactory());}children=neweventexecutor[nthreads];for(inti=0;i<nthreads;i++){booleansuccess=false;try{children[i]=newchild(executor,args);success=true;}catch(exceptione){thrownewillegalstateexception(failedtocreateachildeventloop,e);}finally{if(!success){for(intj=0;j<i;j++){children[j].shutdowngracefully();}for(intj=0;j<i;j++){eventexecutore=children[j];try{while(!e.isterminated()){e.awaittermination(integer.max_value,timeunit.seconds);}}catch(interruptedexceptioninterrupted){//letthecallerhandletheinterruption.thread.currentthread().interrupt();break;}}}}}chooser=chooserfactory.newchooser(children);finalfuturelistener<object>terminationlistener=newfuturelistener<object>(){@overridepublicvoidoperationcomplete(future<object>future)throwsexception{if(terminatedchildren.incrementandget()==children.length){terminationfuture.setsuccess(null);}}};for(eventexecutore:children){e.terminationfuture().addlistener(terminationlistener);}set<eventexecutor>childrenset=newlinkedhashset<eventexecutor>(children.length);collections.addall(childrenset,children);readonlychildren=collections.unmodifiableset(childrenset);}其中,创建细节见下:
线程池中的线程数nthreads必须大于0;如果executor为null,创建默认executor,executor用于创建线程(newchild方法使用executor对象);依次创建线程池中的每一个线程即nioeventloop,如果其中有一个创建失败,将关闭之前创建的所有线程;chooser为线程池选择器,用来选择下一个eventexecutor,可以理解为,用来选择一个线程来执行task。chooser的创建细节,见下:
defaulteventexecutorchooserfactory根据线程数创建具体的eventexecutorchooser,线程数如果等于2^n,可使用按位与替代取模运算,节省cpu的计算资源,见源码:
@suppresswarnings(unchecked)@overridepubliceventexecutorchoosernewchooser(eventexecutor[]executors){if(ispoweroftwo(executors.length)){returnnewpoweroftoweventexecutorchooser(executors);}else{returnnewgenericeventexecutorchooser(executors);}}privatestaticfinalclasspoweroftoweventexecutorchooserimplementseventexecutorchooser{privatefinalatomicintegeridx=newatomicinteger();privatefinaleventexecutor[]executors;poweroftoweventexecutorchooser(eventexecutor[]executors){this.executors=executors;}@overridepubliceventexecutornext(){returnexecutors[idx.getandincrement()&executors.length-1];}}privatestaticfinalclassgenericeventexecutorchooserimplementseventexecutorchooser{privatefinalatomicintegeridx=newatomicinteger();privatefinaleventexecutor[]executors;genericeventexecutorchooser(eventexecutor[]executors){this.executors=executors;}@overridepubliceventexecutornext(){returnexecutors[math.abs(idx.getandincrement()%executors.length)];}}newchild(executor, args)的创建细节,见下:
multithreadeventexecutorgroup的newchild方法是一个抽象方法,故使用nioeventloopgroup的newchild方法,即调用nioeventloop的构造函数:
@overrideprotectedeventloopnewchild(executorexecutor,object...args)throwsexception{returnnewnioeventloop(this,executor,(selectorprovider)args[0],((selectstrategyfactory)args[1]).newselectstrategy(),(rejectedexecutionhandler)args[2]);}在这里先看下nioeventloop的类层次关系:
创建任务队列tailtasks(内部为有界的linkedblockingqueue):
创建线程的任务队列taskqueue(内部为有界的linkedblockingqueue),以及任务过多防止系统宕机的拒绝策略rejectedhandler。
其中tailtasks和taskqueue均是任务队列,而优先级不同,taskqueue的优先级高于tailtasks,定时任务的优先级高于taskqueue。
五、serverbootstrap初始化及启动
了解了netty线程池nioevenrloopgroup的创建过程后,下面看下api网关服务serverbootstrap的是如何使用线程池引入服务中,为高并发访问服务的。
api网关serverbootstrap初始化及启动代码,见下:
serverbootstrap=newserverbootstrap();bossgroup=newnioeventloopgroup(config.getbossgroupthreads());workergroup=newnioeventloopgroup(config.getworkergroupthreads());serverbootstrap.group(bossgroup,workergroup).channel(nioserversocketchannel.class).option(channeloption.tcp_nodelay,config.istcpnodelay()).option(channeloption.so_backlog,config.getbacklogsize()).option(channeloption.so_keepalive,config.issokeepalive())//memorypooled.option(channeloption.allocator,pooledbytebufallocator.default).childoption(channeloption.allocator,pooledbytebufallocator.default).childhandler(channelinitializer);channelfuturefuture=serverbootstrap.bind(config.getport()).sync();log.info(api-gatewaystartedonport:{},config.getport());future.channel().closefuture().sync();api网关系统使用netty自带的线程池,共有三组线程池,分别为bossgroup、workergroup和executorgroup(使用在channelinitializer中,本文暂不作介绍)。其中,bossgroup用于接收客户端的tcp连接,workergroup用于处理i/o、执行系统task和定时任务,executorgroup用于处理网关业务加解密、限流、路由,及将请求转发给后端的抓取服务等业务操作。
六、channel与线程池的绑定
serverbootstrap初始化后,通过调用bind(port)方法启动server,bind的调用链如下:
abstractbootstrap.bind->abstractbootstrap.dobind->abstractbootstrap.initandregister其中,channelfuture regfuture = config().group().register(channel);中的group()方法返回bossgroup,而channel在serverbootstrap的初始化过程指定channel为nioserversocketchannel.class,至此将nioserversocketchannel与bossgroup绑定到一起,bossgroup负责客户端连接的建立。那么niosocketchannel是如何与workergroup绑定到一起的?
调用链abstractbootstrap.initandregister -> abstractbootstrap. init-> serverbootstrap.init ->serverbootstrapacceptor.serverbootstrapacceptor ->serverbootstrapacceptor.channelread:
publicvoidchannelread(channelhandlercontextctx,objectmsg){finalchannelchild=(channel)msg;child.pipeline().addlast(childhandler);for(entry<channeloption<?>,object>e:childoptions){try{if(!child.config().setoption((channeloption<object>)e.getkey(),e.getvalue())){logger.warn(unknownchanneloption:+e);}}catch(throwablet){logger.warn(failedtosetachanneloption:+child,t);}}for(entry<attributekey<?>,object>e:childattrs){child.attr((attributekey<object>)e.getkey()).set(e.getvalue());}try{childgroup.register(child).addlistener(newchannelfuturelistener(){@overridepublicvoidoperationcomplete(channelfuturefuture)throwsexception{if(!future.issuccess()){forceclose(child,future.cause());}}});}catch(throwablet){forceclose(child,t);}}其中,childgroup.register(child)就是将niosocketchannel与workdergroup绑定到一起,那又是什么触发了serverbootstrapacceptor的channelread方法?
其实当一个 client 连接到 server 时,java 底层的 nio serversocketchannel 会有一个selectionkey.op_accept 就绪事件,接着就会调用到 nioserversocketchannel.doreadmessages方法。
@overrideprotectedintdoreadmessages(list<object>buf)throwsexception{socketchannelch=javachannel().accept();try{if(ch!=null){buf.add(newniosocketchannel(this,ch));return1;}}catch(throwablet){…}return0;}javachannel().accept() 会获取到客户端新连接的socketchannel,实例化为一个 niosocketchannel, 并且传入 nioserversocketchannel 对象(即 this),由此可知, 我们创建的这个niosocketchannel 的父 channel 就是 nioserversocketchannel 实例 。
接下来就经由 netty 的 channelpipeline 机制,将读取事件逐级发送到各个 handler 中,于是就会触发前面我们提到的 serverbootstrapacceptor.channelread 方法啦。
至此,分析了netty线程池的初始化、serverbootstrap的启动及channel与线程池的绑定过程,能够看出netty中线程池的优雅设计,使用不同的线程池负责连接的建立、io读写等,为api网关项目的高并发访问提供了技术基础。
七、总结
初学者怎么才能学好如何使用云服务器
推进上网站优化顺遂的法子
企业进行网络推广应避免的4个误区
建设外文的网站请尽量避免这种失误
中小企业如何进行网站SEO优化?
物联网网站建设可以带来哪些优势
怎么用模板建设一个汽车租赁网站呢
济宁SEO为什么网站首页收录大于栏目页?因为栏目页的内容布局不足吸引蜘蛛