本文共 9437 字,大约阅读时间需要 31 分钟。
这篇文章的主要目的是分析下Tomcat在处理连接请求的整个过程,参考了前人的文章并在文末指出,通过时序图能够较清楚的走通整个流程。
Connector 启动以后会启动一组线程用于不同阶段的请求处理过程,Acceptor、Poller、worker 所在的线程组都维护在 NioEndpoint 中。
public class NioEndpoint extends AbstractJsseEndpoint{ public void startInternal() throws Exception { // 创建worker线程组 if ( getExecutor() == null ) { createExecutor(); } // Poller线程组由一堆线程组成 pollers = new Poller[getPollerThreadCount()]; for (int i=0; i { // Acceptor线程组由一堆线程组成 protected final void startAcceptorThreads() { int count = getAcceptorThreadCount(); acceptors = new Acceptor[count]; for (int i = 0; i < count; i++) { acceptors[i] = createAcceptor(); String threadName = getName() + "-Acceptor-" + i; acceptors[i].setThreadName(threadName); Thread t = new Thread(acceptors[i], threadName); t.setPriority(getAcceptorThreadPriority()); t.setDaemon(getDaemon()); t.start(); } } // worker的线程组由executor创建线程池组成 public void createExecutor() { internalExecutor = true; TaskQueue taskqueue = new TaskQueue(); TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-", daemon, getThreadPriority()); executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf); taskqueue.setParent( (ThreadPoolExecutor) executor); }}
说明:
Acceptor接受的新连接没有立即注册到selector当中,需要先封装成PollerEvent对象后保存至PollerEvent队列当中,Poller对象会消费PollerEvent队列,类似生产消费模型。public class NioEndpoint extends AbstractJsseEndpoint{ private volatile ServerSocketChannel serverSock = null; protected class Acceptor extends AbstractEndpoint.Acceptor { public void run() { while (running) { state = AcceptorState.RUNNING; try { SocketChannel socket = null; try { // 监听socket负责接收新连接 socket = serverSock.accept(); } catch (IOException ioe) { } if (running && !paused) { // 处理接受到的socket对象 if (!setSocketOptions(socket)) { closeSocket(socket); } } } catch (Throwable t) { } } state = AcceptorState.ENDED; } } protected boolean setSocketOptions(SocketChannel socket) { try { socket.configureBlocking(false); Socket sock = socket.socket(); socketProperties.setProperties(sock); channel = new NioChannel(socket, bufhandler); // 注册到Poller当中 getPoller0().register(channel); } catch (Throwable t) { } return true; } public Poller getPoller0() { int idx = Math.abs(pollerRotater.incrementAndGet()) % pollers.length; return pollers[idx]; } public class Poller implements Runnable { public void register(final NioChannel socket) { socket.setPoller(this); NioSocketWrapper ka = new NioSocketWrapper(socket, NioEndpoint.this); r = new PollerEvent(socket,ka,OP_REGISTER); // 添加PollerEvent队列当中 addEvent(r); } private void addEvent(PollerEvent event) { // 投入到PollerEvent队列当中 events.offer(event); if ( wakeupCounter.incrementAndGet() == 0 ) selector.wakeup(); } }}
说明:
Poller会消费PollerEvent队列(由Acceptor进行投递),并注册到Selector当中。当注册到Selector的socket数据可读的时候将socket封装成SocketProcessor对象,投递到Executor实现的线程池进行处理。public class NioEndpoint extends AbstractJsseEndpoint{ public static class PollerEvent implements Runnable { private NioChannel socket; private int interestOps; private NioSocketWrapper socketWrapper; public PollerEvent(NioChannel ch, NioSocketWrapper w, int intOps) { reset(ch, w, intOps); } public void run() { if (interestOps == OP_REGISTER) { try { socket.getIOChannel().register( socket.getPoller().getSelector(), SelectionKey.OP_READ, socketWrapper); } catch (Exception x) { } } } } public class Poller implements Runnable { public void run() { while (true) { // events()负责处理PollerEvent事件并注册到selector当中 hasEvents = events(); keyCount = selector.select(selectorTimeout); // 处理新接受的socket的读写事件 Iterator iterator = keyCount > 0 ? selector.selectedKeys().iterator() : null; while (iterator != null && iterator.hasNext()) { SelectionKey sk = iterator.next(); NioSocketWrapper attachment = (NioSocketWrapper)sk.attachment(); processKey(sk, attachment); } } } // 处理读写事件 protected void processKey(SelectionKey sk, NioSocketWrapper attachment) { if (sk.isReadable()) { if (!processSocket(attachment, SocketEvent.OPEN_READ, true)) { closeSocket = true; } } if (!closeSocket && sk.isWritable()) { if (!processSocket(attachment, SocketEvent.OPEN_WRITE, true)) { closeSocket = true; } } } }}public abstract class AbstractEndpoint { public boolean processSocket(SocketWrapperBasesocketWrapper, SocketEvent event, boolean dispatch) { try { sc = createSocketProcessor(socketWrapper, event); Executor executor = getExecutor(); // 注册到Worker的线程池ThreadPoolExecutor。 if (dispatch && executor != null) { executor.execute(sc); } } catch (RejectedExecutionException ree) { } return true; }}
说明:
说明:
【招贤纳士】
欢迎热爱技术、热爱生活的你和我成为同事,和贝贝共同成长。
贝贝集团诚招算法、大数据、BI、Java、PHP、android、iOS、测试、运维、DBA等人才,有意可投递。
贝贝集团创建于2011年,旗下拥有贝贝网、贝店、贝贷等平台,致力于成为全球领先的家庭消费平台。
贝贝创始团队来自阿里巴巴,先后获得IDG资本、高榕资本、今日资本、新天域资本、北极光等数亿美金的风险投资。
公司地址:杭州市江干区普盛巷9号东谷创业园(上下班有多趟班车)
转载地址:http://xhmjl.baihongyu.com/