Connector概述
Connector是Jetty中可以直接接受客户端连接的抽象,一个Connector监听Jetty服务器的一个端口,所有客户端的连接请求首先通过该端口,而后由操作系统分配一个新的端口(Socket)与客户端进行数据通信(先握手,然后建立连接,但是使用不同的端口)。不同的Connector实现可以使用不同的底层结构,如Socket Connector、NIO Connector等,也可以使用不同的协议,如Ssl Connector、AJP Connector,从而在不同的Connector中配置不同的EndPoint和Connection。EndPoint用于连接(Socket)的读写数据(参见
深入Jetty源码之EndPoint),Connection用于关联Request、Response、EndPoint、Server,并将解析出来的Request、Response传递给在Server中注册的Handler来处理(参见
深入Jetty源码之Connection)。在Jetty中Connector的有:SocketConnector、SslSocketConnector、Ajp13SocketConnector、BlockingChannelConnector、SelectChannelConnector、SslSelectChannelConnector、LocalConnector、NestedConnector等,其类图如下。
Connector类图
Connector接口
首先Connector实现了LifeCycle接口,在启动Jetty服务器时,会调用其的start方法,用于初始化Connector内部状态,并打开Connector以接受客户端的请求(调用open方法);而在停止Jetty服务器时会调用其stop方法,以关闭Connector以及内部元件(如Connection等)以及做一些清理工作。因而open、close方法是Connector中用于处理生命周期的方法;对每个Connector都有name字段用于标记该Connector,默认值为:hostname:port;Connector中还有Server的引用,可以从中获取ThreadPool,并作为Handler的容器被使用(在创建HttpConnection时,Server实例作为构造函数参数传入,并在handleRequest()方法中将解析出来的Request、Response传递给Server注册的Handler);Connector还定义了一些用于配置当前Connector的方法,如Buffer Size、Max Idle Time、Low Resource Max Idle Time,以及一些统计信息,如当前Connector总共处理过的请求数、总共处理过的连接数、当前打开的连接数等信息。
Connector的接口定义如下:
public interface Connector
extends LifeCycle {
// Connector名字,默认值hostname:port
String getName();
// 打开当前Connector
void open()
throws IOException;
// 关闭当前Connector
void close()
throws IOException;
// 对Server的引用
void setServer(Server server);
Server getServer();
// 在处理请求消息头时使用的Buffer大小
int getRequestHeaderSize();
void setRequestHeaderSize(
int size);
// 在处理响应消息头时使用的Buffer大小
int getResponseHeaderSize();
void setResponseHeaderSize(
int size);
// 在处理请求消息内容时使用的Buffer大小
int getRequestBufferSize();
void setRequestBufferSize(
int requestBufferSize);
// 在处理响应消息内容使用的Buffer大小
int getResponseBufferSize();
void setResponseBufferSize(
int responseBufferSize);
// 在处理请求消息时使用的Buffer工厂
Buffers getRequestBuffers();
// 在处理响应消息时使用的Buffer工厂
Buffers getResponseBuffers();
// User Data Constraint的配置可以是None、Integral、Confidential,对这三种值的解释:
// None:A value of NONE means that the application does not require any transport guarantees.
// Integral:A value of INTEGRAL means that the application requires the data sent between the client and server to be sent in such a way that it can't be changed in transit.
// Confidential:A value of CONFIDENTIAL means that the application requires the data to be transmitted in a fashion that prevents other entities from observing the contents of the transmission on.
// In most cases, the presence of the INTEGRAL or CONFIDENTIAL flag indicates that the use of SSL is required.参考:这里 // 如果配置了user-data-constraint为Integral或confidential表示所有相应请求都会重定向到使用Integral/Confidential Schema/Port构建的新的URL中。
int getIntegralPort();
String getIntegralScheme();
boolean isIntegral(Request request);
int getConfidentialPort();
String getConfidentialScheme();
boolean isConfidential(Request request);
// 在将HttpConnection交给Server中的Handlers处理前,根据当前Connector,自定义一些EndPoint和Request的配置,如设置EndPoint的MaxIdleTime,Request的timestamp,
// 清除SelectChannelEndPoint中的idleTimestamp,检查forward头等。 void customize(EndPoint endpoint, Request request)
throws IOException;
// 主要用于SelectChannelConnector中,重置SelectChannelEndPoint中的idleTimestamp,即重新计时idle的时间。
// 它在每个Request处理结束,EndPoint还未关闭,并且当前连接属于keep-alive类型的时候被调用。 void persist(EndPoint endpoint)
throws IOException;
// 底层的链接实例,如ServerSocket、ServerSocketChannel等。 Object getConnection();
//是否对"X-Forwarded-For"头进行DNS名字解析 boolean getResolveNames();
// 当前Connector绑定的主机名、端口号等。貌似在Connector的实现中没有一个默认的主机名。
// 由于端口号可以设置为0,表示由操作系统随机的分配一个还没有被使用的端口,因而这里由LocalPort用于存储Connector实际上绑定的端口号;
// 其中-1表示这个Connector还未开启,-2表示Connector已经关闭。 String getHost();
void setHost(String hostname);
void setPort(
int port);
int getPort();
int getLocalPort();
// Socket的最大空闲时间,以及在资源比较少(如线程池中的任务数比最大可用线程数要多)的情况下的最大空闲时间,当空闲时间超过这个时间后关闭当前连接(Socket)。 int getMaxIdleTime();
void setMaxIdleTime(
int ms);
int getLowResourceMaxIdleTime();
void setLowResourceMaxIdleTime(
int ms);
// 是否当前Connector处于LowResources状态,即线程池中的任务数比最大可用线程数要多 public boolean isLowResources();
/* ------------------------以下是一些获取和当前Connector相关的统计信息------------------------------------ */
// 打开或关闭统计功能 public void setStatsOn(
boolean on);
// 统计功能的开闭状态 public boolean getStatsOn();
// 重置统计数据,以及统计开始时间 public void statsReset();
// 统计信息的开启时间戳 public long getStatsOnMs();
// 当前Connector处理的请求数 public int getRequests();
// 当前Connector接收到过的连接数 public int getConnections() ;
// 当前Connector所有当前还处于打开状态的连接数 public int getConnectionsOpen() ;
// 当前Connector历史上同时处于打开状态的最大连接数 public int getConnectionsOpenMax() ;
// 当前Connector所有连接的持续时间总和 public long getConnectionsDurationTotal();
// 当前Connector的最长连接持续时间 public long getConnectionsDurationMax();
// 当前Connector平均连接持续时间 public double getConnectionsDurationMean() ;
// 当前Connector所有连接持续时间的标准偏差 public double getConnectionsDurationStdDev() ;
// 当前Connector的所有连接的平均请求数 public double getConnectionsRequestsMean() ;
// 当前Connector的所有连接的请求数标准偏差 public double getConnectionsRequestsStdDev() ;
// 当前Connector的所有连接的最大请求数 public int getConnectionsRequestsMax();
}
AbstractConnector实现
Jetty中所有的Connector都继承自AbstractConnector,而它自身继承自HttpBuffers,HttpBuffers包含了Request、Response的Buffer工厂的创建以及相应Size的配置。AbstractConnector还包含了Name、Server、maxIdleTime以及一些统计信息的引用,用于实现Connector接口中的方法,只是一些Get、Set方法,不详述。除了Connector接口相关的配置,AbstractConnector还为定义了两个字段:_acceptQueueSize用于表示ServerSocket
或ServerSocketChannel中最大可等待的请求数,_acceptors用于表示用于调用ServerSocket或ServerSocketChannel中accept方法的线程数,建议这个数字小于或等于可用处理器的2倍。
在AbstractConnector中还定义了Acceptor内部类,它实现了Runnable接口,在其run方法实现中,它将自己的Thread实例赋值给AbstractConnector中的_acceptorThread数组中acceptor号对应的bucket,并更新线程名为:<name> Acceptor<index> <Connector.toString>。然后根据配置的_acceptorPriorityOffset设置当前线程的priority。只要当前Connector处于Running状态,并且底层链接实例不为null,不断的调用accept方法()。在退出的finally语句快中清理_acceptorThread数组中相应的Bucket值为null,并将线程原来的Name、Priority设置回来。
AbstractConnector实现了doStart()方法,它首先保证Server实例的存在;然后打开当前Connector(调用其open()方法),并调用父类的doStart方法(这里是HttpBuffers,用于初始化对应的Buffers);如果没有自定义的ThreadPool,则从Server中获取ThreadPool;最后根据acceptors的值创建_acceptorThread数组,将acceptors个Acceptor实例dispatch给ThreadPool。在doStop方法实现中,它首先调用close方法,然后对非Server中的ThreadPool调用其stop方法,再调用父类的doStop方法清理Buffers的引用,最后遍历_acceptorThread数据,调用每个Thread的interrupt方法。
在各个子类的accept方法实现中,他们在获取客户端过来的Socket连接后,都会对该Socket做一些配置,即调用AbstractConnector的configure方法,它首先设置Socket的TCP_NODELAY为true,即禁用Nagle算法(关于禁用的理由可以参考:http://jerrypeng.me/2013/08/mythical-40ms-delay-and-tcp-nodelay/#sec-4-2,简单的,如果该值为false,则TCP的数据包要么达到TCP Segment Size,要么收到一个Ack,才会发送出去,即有Delay);然后如果设置了_soLingerTime,则开启Socket中SO_LINGER选项,否则,关闭该选项(SO_LINGER选项用于控制关闭一个Socket的行为,如果开启了该选项,则在关闭Socket时会等待_soLingerTime时间,此时如果有数据还未发送完,则会发送这些数据;如果关闭了该选项,则Socket的关闭会立即返回,此时也有可能继续发送未发送完成的数据,具体参考:http://blog.csdn.net/factor2000/article/details/3929816)。
在ServerSocket和ServerSocketChannel中还有一个SO_REUSEADDR的配置,一般来说当一个端口被释放后会等待两分钟再被使用,此时如果重启服务器,可能会导致启动时的绑定错误,设置该值可以让端口释放后可以立即被使用(具体参考:http://www.cnblogs.com/mydomain/archive/2011/08/23/2150567.html)。在AbstractConnector中可以使用setReuseAddress方法来配置,默认该值设置为true。
AbstractConnector中还实现了customize方法,它在forwarded设置为true的情况下设置相应的attribute:javax.servlet.request.cipher_suite, javax.servlet.request.ssl_session_id,以及Request中对应Host、Server等头信息。这个逻辑具体含义目前还不是很了解。。。。
最后关于统计数据的更新方法,AbstractConnector定义了如下方法:
protected void connectionOpened(Connection connection)
protected void connectionUpgraded(Connection oldConnection, Connection newConnection)
protected void connectionClosed(Connection connection)
SocketConnector实现
有了AbstractConnector的实现,SocketConnector的实现就变的非常简单了,它保存了一个EndPoint的Set,表示所有在这个Connector下正在使用的EndPoint,然后是ServerSocket,在open方法中创建,并在getConnection()方法中返回,还有一个localPort字段,当ServerSocket被创建时从ServerSocket实例中获取,并在getLocalPort()方法中返回。在close方法中关闭ServerSocket,并设置localPort为-2;在accept方法中,调用ServerSocket的accept方法,返回一个Socket,调用configure方法对新创建的Socket做一些基本的配置,然后使用该Socket创建ConnectorEndPoint,并调用其dispatch方法;在customize方法中,在调用AbstractConnector的customize方法的同时还设置ConnectorEndPoint的MaxIdleTime,即设置Socket的SO_TIMEOUT选项,用于配置该Socket的空闲可等待时间;在doStart中会先清理ConnectorEndPoint的集合,而在doStop中会关闭所有还处于打开状态的ConnectorEndPoint。
SelectChannelConnector实现
SelectChannelConnector内部使用ServerSocketChannel,在open方法中创建ServerSocketChannel,配置其为非blocking模式,并设置localPort值;在accept方法中调用ServerSocket的accept方法获得一个SocketChannel,配置该Channel为非blocking模式,调用AbstractChannel的configure方法做相应Socket配置,最后将该SocketChannel注册给ConnectorSelectManager;在doStart方法中,它会初始化ConnectorSelectManager的SelectSets值为acceptors值、MaxIdleTime、LowResourceConnections、LowResourcesMaxIdleTime等值,并启动该Manager,并dispatch acceptors个线程,不断的调用Manager的doSelect方法;在close方法中会先stop ConnectorSelectManager,然后关闭ServerSocketChannel,设置localPort为-2;在customize方法中会清除SelectChannelEndPoint的idleTimestamp,重置其MaxIdleTime以及重置Request中的timestamp的值;在persist方法中会重置SelectChannelEndPoint中idleTimestamp的值。
BlockingChannelConnector实现
BlockingChannelConnector实现类似SocketConnector,不同的是它使用ServerSocketChannel,并且其EndPoint为BlockingChannelEndPoint。所不同的是它需要在doStart方法中启动一个线程不断的检查所有还在connections集合中的BlockingChannelEndPoint是否已经超时,每400ms检查一次,如果超时则关闭该EndPoint。
其他的Connector的实现都比较类似,而SSL相关的Connector需要也只是加入了SSL相关的逻辑,这里不再赘述。
posted on 2014-05-01 18:40
DLevin 阅读(4680)
评论(0) 编辑 收藏 所属分类:
Jetty