posts - 56,  comments - 12,  trackbacks - 0

Overview

Cindy是一个Java异步I/O框架,提供了一个统一高效的模型,同时支持TCP、UDP以及Pipe,并能够方便的在异步和同步操作之间进行 切换。目前其实现是基于Java NIO,并计划通过JNI来支持各操作系统上本身提供的异步I/O功能,应用可以方便的通过运行期属性来方便的切换到更为高效的实现上。

为什么不使用Java IO?

Java IO包采用阻塞模型来处理网络操作。假设应用调用了read方法读取来自网络流上的数据,而数据尚未到达本机,则对read方法的调用将一直等到数据到达 并成功接收后才返回。由于IO包的所有操作会一直阻塞当前线程,为了不影响其他事务的处理,在一般情况下应用总是会用一个独立线程或线程池来处理这些 I/O操作。

Java IO包阻塞模型的优点就是非常简单易用,如果配合上线程池,效率也非常不错,所以得到了广泛的应用。但这种简单模型也有其固有的缺点:扩展性不足。如果应 用只需要进行少量的网络操作,那么开启若干个单独的I/O线程无伤大雅;但是如果是实现一个服务端的应用,需要同时处理成千上万个网络连接,采用阻塞模型 的话就要同时开启上千个线程。虽然现在强劲的服务器能够负担起这么多线程,但系统花在线程调度上的时间也会远远多于用于处理网络操作上的时间。

为什么要使用Java NIO?

采用IO包的阻塞模型,如果数据量不大的话,则线程的大部分时间都会浪费在等待上。对于稀缺的服务器资源而言,这是一种极大的浪费。

在Java 1.4中引入的NIO包里,最引人注目的就是提供了非阻塞I/O的实现。和IO包提供的阻塞模型不同的是,对一个非阻塞的连接进行操作,如果此时相应的状 态还未就绪,则调用会立即返回,而不是等待状态就绪后才返回。假设应用调用了read方法读取来自网络流上的数据,而此刻数据尚未到达本机,则对read 方法的调用将立即返回,并通知应用目前只能读到0个字节。应用可以根据自身的策略来进行处理,比如读取其他网络连接的数据等等,这就使得一个线程管理多个 连接成为可能。

NIO包还提供了Selector机制,将一个非阻塞连接注册在Selector上,应用就不需去轮询该连接当前是否可以读取或写入数据,在相应状 态就绪后Selector会通知该连接。由于一个Selector上可以注册多个非阻塞连接,这样就使得可以用更少的线程数来管理更多的连接。

为什么选择Cindy,而不直接使用NIO?

Java NIO包虽然提供了非阻塞I/O模型,但是直接使用NIO的非阻塞I/O需要成熟的网络编程经验,处理众多底层的网络异常,以及维护连接状态,判断连接超 时等等。对于关注于其业务逻辑的应用而言,这些复杂性都是不必要的。不同Java版本的NIO实现也会有一些Bug,Cindy会巧妙的绕开这些已知的 Bug并完成相应功能。并且NIO本身也在不断发展中,Java 1.4的NIO包中只实现了TCP/UDP单播/Pipe,Java 5.0中引入的SSLEngine类使得基于非阻塞的流协议(TCP/Pipe)支持SSL/TLS成为可能,在未来的版本中还可能会加入非阻塞多播的实 现。Cindy会关注这些新功能,并将其纳入到统一的框架中来。

Cindy虽然目前的实现是基于NIO,但它会不仅仅局限于NIO。等到一些基于操作系统本身实现的AIO(Asynchronous IO)类库成熟后,它也会加入对这些类库的支持,通过操作系统本身实现的AIO来提高效率。

如果应用程序只想使用一种高效的模型,而不想关心直接使用NIO所带来的这些限制,或希望将来无需更改代码就切换到更高效率的AIO实现上,那么 Cindy会是一个很好的选择。并且使用Cindy,应用可以在同步和异步之间进行无缝切换,对于大部分操作是异步,可某些特殊操作需要同步的应用而言, 这极大的提高了易用性。

Hello world example

场景:服务端监听本地的1234端口,打印任何收到的消息到控制台上;客户端建立TCP连接到本地的1234端口,发送完"Hello world!",然后断开连接。

基于Java IO包的客户端示例

基于Java IO包的阻塞模型的示例,用于对比,不做额外说明。(异常处理代码略)

Socket socket = new Socket("localhost",1234);
OutputStream os = new BufferedOutputStream(socket.getOutputStream());
os.write("Hello world!".getBytes());
os.close();
socket.close();

基于Java IO包的服务端示例

基于Java IO包的阻塞模型的示例,用于对比,不做额外说明。(异常处理代码略)

ServerSocket ss = new ServerSocket(1234);
while (true) {
? final Socket socket = ss.accept();
? newThread() {

??? public void run() {
????? BufferedReader br = new BufferedReader(new InputStreamReader(socket.getInputStream()));
????? String line = null;
????? while ((line = br.readLine()) != null) {
??????? System.out.println(line);
????? }
}

}.start();
}

基于Cindy的同步客户端示例

								//create tcp session
Session session = SessionFactory.createSession(SessionType.TCP);

//set remote address
session.setRemoteAddress(new InetSocketAddress("localhost", 1234));

//start session and wait until completed
session.start().complete();

//create send packet
Packet packet = new DefaultPacket(BufferFactory.wrap("Hello world\!".getBytes()));

//send packet and wait until completed
session.flush(packet).complete();

//close session and wait until completed
session.close().complete();

Step 1: create session

Session是连接的抽象,一个Session代表着一个连接。连接有多种类型,SessionType.TCP代表着TCP连接,SessionType.UDP代表着UDP连接,SessionType.PIPE代表着Pipe连接等等。

在这里通过SessionFactory.createSession(SessionType.TCP)创建了一个TCP连接。如果要创建UDP连接,可以使用类似的代码:

Session session = SessionFactory.createSession(SessionType.UDP);

连接创建后可以通过返回Session实例的getSessionType方法来得到该连接的类型。

Step 2: set the remote address

TCP需要先设置好要连接的地址才能开始连接,在这里是连接到本机的1234端口。对于UDP来说,这一步设置不是必须的。

Step 3: start session

参数设置完成后就可以通过session.start方法启动Session了。*和Java IO中的阻塞同步调用不同,由于Cindy是一个异步I/O框架,在调用session.start()方法并返回后,连接可能还没有建立成功。*如果我 们想要切换到同步阻塞的方式,比如等连接建立成功后才返回,则可以通过start方法返回的Future对象来进行切换。

Session中所有的异步方法(如start/close/flush/send等)的返回值均为Future对象,该对象代表着所进行的异步操作。我们可以通过三种方式来处理Future对象:

  • 阻塞:调用Future.complete(),则该方法会一直等到异步操作完成后才返回;或者可以调用Future.complete(int timeout)来等待异步操作完成,如果在指定时间内操作未完成,则该方法也会返回。值得注意的是操作完成(isCompleted)包括操作成功和操作失败两种状态(isSucceeded)
  • 轮询:周期性的调用Future.isCompleted()方法来查询异步操作是否完成。
  • 回调:通过Future.addListener()方法加入自定义的FutureListener,在异步操作结束后,Future会触发FutureListener的futureCompleted事件。对任何已完成的Future对象调用addListener方法会马上触发FutureListener的futureCompleted事件。

在该示例中我们采用的是第一种方式——阻塞,等待连接建立完成后才返回。可以通过Future.complete()方法的返回值或者 Session.isStarted()来判断连接是否建立成功。在这个简单的示例里假设不会出现任何连接错误,所以就不再判断异常情况了,不过在正式的 应用中应当注意相关的判断。

Step 4: the Packet/Buffer interface

任何数据要在网络上传输最终都得转换成字节流,Buffer接口就是字节流的抽象。注意的是这里使用的Buffer是net.sf.cindy.Buffer,而没有采用java.nio.Buffer,这个设计上的取舍可以参考以后的介绍。

而数据传输除了要有内容外,还得有目的地,这样才能把内容送到正确的地址上。Packet接口就代表着要发送的数据,包括一个Buffer对象和一 个目的地址------SocketAddress。对于流协议(如TCP/Pipe)而言,目的地址在建立连接的时候就已经设置好了,所以发送过程中无 需再进行设置,用默认的null即可;对于消息协议(比如非连接型的UDP),则需要在发送时指定每个包的目的地址。

在这里要将"Hello world!"转换为Buffer对象,可以通过BufferFactory来完成:BufferFactory.wrap("Hello world!".getBytes()),将一个字节数组包装成Buffer。

由于是TCP连接,目的地址在建立连接时就已经指定好了,所以可以简单的构造一个只有Buffer而没有SocketAddress的Packet对象:new DefaultPacket(buffer)。(DefaultPacket是Packet接口的默认实现)

Step 5: send packet

Session的flush方法和start方法一样都是异步方法,在调用flush并返回时,数据可能并没有发送完成,在该示例中仍然采用阻塞方式等待数据发送完成后才返回:session.flush(packet).complete()。

flush和send方法的区别在于:flush方法接受的参数是Packet对象,send方法接受的参数是Object对象,通过send方法发送的对象会被Session所关联的PacketEncoder转换为Packet对象再进行发送。

Step 6: close session

发送完成后需要关闭连接,close方法同样是一个异步方法,在这里等待连接完全关闭后才返回:session.close().complete()。

从这个同步的示例可以看到,虽然Cindy是一个异步I/O框架,但用它来完成同步的I/O操作也是一件非常容易的事情。

基于Cindy的异步客户端示例

								final Session session = SessionFactory.createSession(SessionType.TCP);
session.setRemoteAddress(new InetSocketAddress("localhost", 1234));
Future future = session.start();
future.addListener(new FutureListener() {

public void futureCompleted(Future future) throws Exception {
Packet packet = new DefaultPacket("Hello world!".getBytes());
session.flush(packet).complete();
session.close();
}

});

前面的三行代码和同步示例没有区别,只是在第四行代码中采用了回调的方式来处理Future对象,而不是通过阻塞的方式。当连接建立完成后,FutureListener的futureCompleted事件被触发,应用可以在该方法中做相应的事件处理。

在发送Packet的过程中,其实也可以采用回调的方式来处理。在这里仍采用同步方法处理,一方面是减少内嵌类的数量,另一方面是示例Cindy可以非常容易的切换同步和异步操作。后文对异步处理会有更详细的介绍。

基于Cindy的服务端示例

SessionAcceptor acceptor = SessionFactory.createSessionAcceptor(SessionType.TCP);
acceptor.setListenPort(1234);
acceptor.setAcceptorHandler(new SessionAcceptorHandlerAdapter() {

public void sessionAccepted(SessionAcceptor acceptor, Session session) throws Exception {
session.setSessionHandler(new LogSessionHandler());
session.start();
}

});
acceptor.start();

Step 1: create SessionAcceptor

SessionAcceptor代表着连接的服务端,SessionAcceptor和Session是一对多关系,类似于IO包的ServerSocket和Socket。这里创建的是TCP服务端。

Step 2: set listen port

设置服务端的监听端口,在这里是1234端口;如果要设置监听地址,则可以通过setListenAddress来进行设置。

Step 3: set acceptor handler

每当SessionAcceptor上有连接建立成功,将会触发该SessionAcceptor所关联SessionAcceptorHandler的sessionAccepted事件,应用可以在该事件中为连接上的Session设置一些属性并开始或关闭连接。

SessionAcceptorHandler接口是处理SessionAcceptor产生的各种事件,SessionHandler接口则用于 处理Session产生的各种事件。在这里我们仅仅关心对象接收事件(objectReceived),当接收到对象后,将该对象打印到控制台上。(注: LogSessionHandler的代码没有列出来)

SessionHandler/SessionAcceptorHandler的更多介绍请参阅后面章节。

PacketEncoder/PacketDecoder

PacketEncoder

在前面的Hello world示例中,我们都是通过session.flush方法来发送数据,而该方法只接收Packet类型的参数,这就要求我们在发送任何数据前都要先 进行转换。比如在前面示例中,我们就得先将"Hello world!"字符串转换成一个代表"Hello world!"的Packet,然后再进行发送。

这种做法没有什么问题,其唯一的缺陷在于将发送逻辑和序列化逻辑耦合在一起。比如在上面的示例中,发送逻辑是将"Hello world!"发送出去,序列化逻辑是"Hello world"字符串的字节表示。虽然有一定的关联,但的确是两种不同的逻辑,比如我们可以更改序列化逻辑,通过Serializable接口来序列化 "Hello world",但这并不影响发送逻辑。

神说,要有光,就有了光。你知道,神不关心光是怎么来的。PacketEncoder的作用就是分离发送逻辑和序列化逻辑。对于应用而言,在发送时 它只需要把要发送的对象传递给Session,至于怎么序列化,则由Session所关联的PacketEncoder来处理。所以在上面的Hello world示例中,发送逻辑可以改为:

session.send("Hello world!");

序列化逻辑可以通过PacketEncoder来设置:

session.setPacketEncoder(new PacketEncoder() {

public Packet encode(Session session, Object obj) throws Exception {
returnnew DefaultPacket(BufferFactory.wrap(obj.toString().getBytes()));
}

});

如果要改变序列化逻辑,比如通过Serializable接口来序列化,则只需要更改PacketEncoder,而不需要改动发送代码:

session.setPacketEncoder(new SerialEncoder());

通过Cindy内置的PacketEncoderChain,应用可通过Session发送任意对象,把序列化逻辑完全交给PacketEncoder。如下面的伪码所示:

PacketEncoderChain chain = new PacketEncoderChain();
chain.addPacketEncoder(new Message1Encoder());
chain.addPacketEncoder(new Message2Encoder());

session.setPacketEncoder(chain);
session.send(new Message1());
session.send(new Message2());

PacketDecoder

PacketEncoder是用来处理序列化逻辑的,相应的,PacketDecoder则是用来处理反序列化逻辑的。

发送方通过session.send方法可以发送任意对象,Session关联的PacketEncoder会将该对象转换为Packet发送出 去;接收方收到了Packet后,也可以通过其关联的PacketDecoder将该Packet转换为一个对象,再通知应用。假设发送方用了 SerialEncoder来发送序列化对象,则接收方就可以使用SerialDecoder来进行反序列化,然后应用就可以直接对对象进行处理。

session.setPacketDecoder(new SerialDecoder());
session.setSessionHandler(new SessionHandlerAdapter() {

public void objectReceived(Session session, Object obj) throws Exception {
//obj即为反序列化后得到的对象
}

}

Cindy源代码example目录下net.sf.cindy.example.helloworld包下提供了一个非常简单的示例。

应用在实现PacketDecoder的时候要注意判断当前已接收到的内容长度。

比如TCP是一个流协议,一方发送了1000个字节,另一方可能在接收的时候只收到了前200个字节,剩下的800个字节要在下次接收时才收到,可是需要1000个字节才能构造出一个完整的对象,则应用的PacketDecoder实现可能会类似于:

								public
Object decode(Session session, Packet packet) throws Exception {
Buffer content = packet.getContent();
if (content.remaining() >= 1000) {
//如果接收到的长度大于1000,则从Buffer中取出内容并返回decode结果
}
returnnull; //接收长度不足,等待全部接收完成后再decode,返回null不会触发objectReceived事件
}

SessionHandler/SessionHandlerAdapter

SessionHandler接口用于处理Session的各种事件。比如当连接建立成功后,会触发SessionHandler的 sessionStarted事件;连接关闭后,会触发SessionHandler的sessionClosed事件;对象发送成功后会触发 SessionHandler的objectSent事件;对象接收成功后会触发SessionHandler的objectReceived事件等等。

SessionHandlerAdapter是SessionHandler的空实现。即如果你仅仅对SessionHandler中某几个事件感 兴趣,就不用全部实现SessionHandler中定义的各种方法,而只需要继承自SessionHandlerAdapter,实现感兴趣的事件即 可。

通过SessionHandler,可以极大的减少内嵌类的数量。如前面的异步Hello world示例,如果用SessionHandler来改写,则会是:

Session session = SessionFactory.createSession(SessionType.TCP);
session.setRemoteAddress(new InetSocketAddress("localhost", 1234));
session.setSessionHandler(new SessionHandlerAdapter() {

public void sessionStarted(Session session) throws Exception {
// session started
Buffer buffer = BufferFactory.wrap("Hello world!".getBytes());
Packet packet = new DefaultPacket(buffer);
session.send(packet);
}

public void objectSent(Session session, Object obj) throws Exception {
// the packet have been sent, close current session
session.close();
};
};
session.start();

在上面的代码中,当sessionStarted事件触发后,即Session成功建立后,会发送"Hello world!"消息;当objectSent事件触发后,即消息发送成功,调用session.close()异步关闭连接。这些处理全部都是异步的,并 且仅仅使用了一个内嵌类。

SessionHandler中一共定义了如下几个方法:

  • void sessionStarted(Session session) throws Exception //连接已建立
  • void sessionClosed(Session session) throws Exception //连接已关闭
  • void sessionTimeout(Session session) throws Exception //连接超时
  • void objectReceived(Session session, Object obj) throws Exception //接收到了对象
  • void objectSent(Session session, Object obj) throws Exception //发送了对象
  • void exceptionCaught(Session session, Throwable cause) //捕捉到异常

如果通过session.setSessionTimeout方法设置了超时时间,则在指定的时间内没有接收或发送任何数据就会触发 sessionTimeout事件。发生了该事件并不代表着连接被关闭,应用可以选择关闭该空闲连接或者发送某些消息来检测网络连接是否畅通。默认情况下 sessionTimeout为0,即从不触发sessionTimeout事件。

如果通过PacketEncoder发送了任何对象,则objectSent事件将被触发(注:通过Session.flush方法发送的 Packet不会触发objectSent事件,只有通过Session.send方法发送的对象才会触发objectSent事件);通过 PacketDecoder接收到任何对象,则objectReceived事件将被触发,一般情况下应用都通过监听该事件来对接收到的对象做相应处理。

exceptionCaught事件代表着session捕捉到了一个异常,这个异常可能是由于底层网络所导致,也可能是应用在处理SessionHandler事件时所抛出来的异常。请注意,如果应用在处理exceptionCaught事件中抛出运行期异常,则该异常不会再度触发exceptionCaught事件,否则可能出现死循环。 由于应用无法处理底层网络所引发的异常,所以在部署稳定后,可以通过指定运行期参数- Dnet.sf.cindy.disableInnerException来取消对底层网络异常的分发,或者判断异常类型——所有的内部异常都是从 SessionException基类继承下来的,应用可以根据这个特性来判断是底层网络出现了异常,还是SessionHandler或 SessionFilter中抛出了异常。

SessionFilter/SessionFilterAdapter

SessionFilter与SessionHandler有些类似,均用于处理Session的各种事件,不同的则是SessionFilter 先处理这些事件,并判断是否需要把该事件传递给下一个SessionFilter。等到所有SessionFilter处理完成后,事件才会传递给 SessionHandler由其来处理。

SessionFilterAdapter是SessionFilter的空实现,默认是把事件传递给下一个SessionFilter。SessionFilter中定义了如下几个方法:

  • void sessionStarted(SessionFilterChain filterChain) throws Exception;
  • void sessionClosed(SessionFilterChain filterChain) throws Exception;
  • void sessionTimeout(SessionFilterChain filterChain) throws Exception;
  • void objectReceived(SessionFilterChain filterChain, Object obj) throws Exception;
  • void objectSent(SessionFilterChain filterChain, Object obj) throws Exception;
  • void exceptionCaught(SessionFilterChain filterChain, Throwable cause);
  • void packetReceived(SessionFilterChain filterChain, Packet packet) throws Exception;
  • void packetSend(SessionFilterChain filterChain, Packet packet) throws Exception;
  • void packetSent(SessionFilterChain filterChain, Packet packet) throws Exception;

其中前六种方法和SessionHandler的作用一致,后三种方法则是用于处理接收和发送的Packet的。

可以看到SessionFilter可以算做SessionHandler的超集,那为什么需要引入SessionHandler呢?

虽然SessionFilter和SessionHandler在表现形式上有很多接近的地方,但是在应用逻辑上却是处于不同的地位。 SessionHandler目的就是处理应用最为核心的业务逻辑,这些逻辑都是基于Object的,和网络层没有太大的关系;而 SessionFilter和网络层的关联就比较大了,一般用来处理网络相关的一些逻辑(如包压缩/解压缩、包加密/解密)或者是核心业务逻辑外的一些分 支逻辑(如记录日志、黑名单处理)。

基于SessionFilter,应用可以做很多扩展而不影响核心的业务处理(核心的业务处理应该放在SessionHandler中)。比如数据 包相关的扩展:加入SSLFilter,则所发送的数据都会被SSL编码后才发送,接收的数据会先被解码成明文才接收;加入ZipFilter,则可以压 缩所发送的数据,接收时再解压缩。比如统计的扩展:加入StatisticFilter,则可以统计发送和接收的字节数,以及发送速率。比如ACL的扩 展:加入AllowListFilter/BlockListFilter,则可以允许指定或限制某些IP地址访问;加入LoginFilter,如果用 户没有登录,则不把事件传递给后面处理业务逻辑的SessionFilter或SessionHandler。比如线程处理的扩展:加入 ThreadPoolFilter,可以指定让某个线程池来进行后面事件的处理。比如日志记录的扩展:加入LogFilter,则可以记录相应的事件信 息。

所列举的这些只是一些基于SessionFilter的常见应用,应用可以根据自身的业务需要来进行选择。Cindy所推荐的实践是将不同的业务逻辑分散到不同的SessionFilter中,在SessionHandler中只处理核心逻辑。

在这里可以示范一个ZipFilter的伪码:

								public class ZipFilter extends SessionFilterAdapter {

public void packetReceived(SessionFilterChain filterChain, Packet packet) throws Exception {
Packet unzippedPacket = unzip(packet); //解压缩
super.packetReceived(filterChain, unzippedPacket);//把解压缩后的包传递给下一个Filter
}

public void packetSend(SessionFilterChain filterChain, Packet packet) throws Exception {
Packet zippedPacket = zip(packet); //压缩
super.packetSend(filterChain, zippedPacket); //把压缩后的包传递给下一个Filter
}
}

Buffer/Packet

在前面的示例中我们已经接触到了Buffer/Packet,Buffer是数据流的抽象,Packet是网络包的抽象。

Java NIO中已经提供了java.nio.ByteBuffer类用于表示字节流,为什么不直接使用java.nio.ByteBuffer?

java.nio.ByteBuffer虽然是NIO中表示字节流的标准类,但是对于高负荷的网络应用而言,其设计上存在着以下缺陷:

  • ByteBuffer并不是一个接口,而是一个抽象类。最为关键的地方是其构造函数为包级私有,这意味着我们无法继承自ByteBuffer构造子类。
  • ByteBuffer 仅仅是其所持有内容的一个外部包装,多个不同的ByteBuffer可以共享相同的内容。比如通过slice、duplicate等方法构造一个新的 ByteBuffer,该新ByteBuffer和原ByteBuffer共享的是同一份内容。这就意味着无法构造基于ByteBuffer的缓存机制。

比如如下代码:

ByteBuffer buffer1 = ByteBuffer.allocate(100);
ByteBuffer buffer2 = buffer1.slice();
System.out.println(buffer1 == buffer2);
System.out.println(buffer1.equals(buffer2));

打印出来的都是false,而实际上两个ByteBuffer共享的是同一份数据。在不经意的情况下,可能发生应用把两个ByteBuffer返回缓存中,被缓存当成是不同的对象进行处理,可能破坏数据完整性。

为什么Cindy使用自定义的Buffer接口?

  • Buffer是一个接口,如果对现有的实现类不满意,应用可以方便的加入自己的实现
  • 支持一系列的工具方法,比如indexOf/getString/putString/getUnsignedXXX等等,加入这些常用方法会给应用带来很大的方便
  • 可以非常方便的与nio中的ByteBuffer做转换,并且效率上不会有太大损失。由于大部分的网络类库都是基于nio的ByteBuffer来设计的,这样保证了兼容性
  • NIO的ByteBuffer无法表示ByteBuffer数组,而Cindy中提供了工具类把Buffer数组包装成一个Buffer
  • Cindy的Buffer是可缓存的,对于高负荷的网络应用而言,这会带来性能上优势

在一般情况下,应用应该直接通过BufferFactory类来构造Buffer实例。目前BufferFactory类中有如下方法:

  • Buffer wrap(byte[] array)
  • Buffer wrap(byte[] array, int offset, int length)
  • Buffer wrap(ByteBuffer buffer)
  • Buffer wrap(Buffer[] buffers)
  • Buffer allocate(int capacity)
  • Buffer allocate(int capacity, boolean direct)

前四种方法都是包装方法,将现有的字节数组、ByteBuffer以及Buffer数组包装成一个单一的Buffer对象。第五和第六种方法是构造 一个新的Buffer对象,新构造Buffer对象的内容是不确定的(java.nio.ByteBuffer.allocate得到的内容是全0),可 能来自缓存或直接生成,依赖于对Buffer缓存的配置。一般情况下都是通过第五种方法构造新的Buffer对象,通过运行期参数- Dnet.sf.cindy.useDirectBuffer可以改变默认行为,是构造Non-Direct Buffer还是构造Direct Buffer。

默认情况下生成的Buffer是可以被缓存,在调用了session.send/flush方法后,Buffer的内容就会被释放掉,或者手动调用 Buffer.release也能释放Buffer所持有的内容。通过Buffer.isReleased方法可以可以判断当前的Buffer是否被释放 掉。对已经释放掉的Buffer进行get/put操作都会导致ReleasedBufferException,但是对 position/limit/capacity的操作还是有效的。如果应用不希望Buffer的内容被释放,可以通过设置permanent属性为 true来使得Buffer的内容不被释放。

是否所有Buffer都需要手动调用release来释放?

这里有一个基本的原则:谁生成,谁释放。比如应用调用BufferFactory.allocate(1024)得到了一个Buffer实例,这个Buffer实例是由应用生成出来的,那么应用在使用完该Buffer实例后就应该负责调用buffer.release将它释放掉。

不过框架为了提供应用的方便,所有通过session.send/flush方法发送的Buffer都会在发送完成后被释放。如果上面得到的这个 Buffer实例通过调用session.flush方法被发送出去,那么该Buffer实例就不需要再通过release方法手工释放了。

如果应用不手动release自己生成出来的Buffer,也不会造成内存泄漏,因为这些Buffer会通过Java的垃圾回收机制被回收掉。唯一的缺陷在于由于无法重用对象,性能可能会有少许的降低。

举个例子,假设你实现了一个特定的PacketDecoder:

								public class MyPacketDecoder implements PacketDecoder {

publicObject decode(Session session, Packet packet) throws Exception {
Buffer content = packet.getContent();
Buffer result = allocateBuffer();
decodeContent(content, result);
return result;
}

}

可以看到,在MyPacketDecoder的实现当中,并没有对接收到的Buffer调用release方法,因为这个Buffer并不是应用生 成出来的,不应该由应用来释放。在该MyPacketDecoder实现中生成了一个新的Buffer实例,并当作PacketDecoder的返回值。 这个新的实例是应用生成出来的,则应该由应用来释放,所以应用应该在相应的objectReceived事件中释放该Buffer实例,如:

								public void objectReceived(Session session, Object obj) throws Exception {
Buffer buffer = (Buffer) obj;
try {
process(buffer);
} finally {
buffer.release();
}
}

Advanced Topic

JMX支持

通过运行期属性-Dnet.sf.cindy.useJmx可以开启JMX支持(Java 5.0中内置了JMX支持,如果运行在Java 1.4版本中,则需要手工下载相应类库)。

下图是通过jconsole进行JMX管理的示例:

流量控制

当网络接收的速度大于应用的处理速度时,如果不控制接收速率,则收到的消息会在队列中堆积,应用无法及时处理而造成内存溢出。Cindy 3.0中加入了流量控制功能,当接收队列中消息超过指定数量时,Cindy会放慢网络接收速度。该功能对于防止内存溢出以及应用程序调试有很大帮助。

可以通过运行期参数-Dnet.sf.cindy.dispatcher.capacity来指定队列中最多可以堆积的消息数。默认值是1000,即队列中消息数超过1000,网络接收速度就会放缓,等到消息数少于1000后,接收速度就会恢复正常。

Read packet size

Read packet size是指接收时每次读包所构造的缓冲区大小。对于TCP而言,这个值影响到的仅仅是效率;对于UDP则影响到数据正确性。

对于UDP应用,假设发送方发送的包所携带数据大小为1000字节,接收方的read packet size设置为600字节,则后面400个字节会被丢弃,这样
构造出来的逻辑包可能会不正确。

对于TCP应用,如果逻辑包是定长的,这个值最好也设为逻辑包的长度。该值太少,则可能导致在过多的包组合操作(比如逻辑包长度为1000字节, read packet size设置为100,则可能需要把10次接收到的packet组合成一个packet才能构造出一个逻辑包);该值太大,可能造成内存的浪费(不过由于 Buffer缓存的存在会缓解这一情况)。

默认的read packet size是8192字节,可以通过运行期属性-Dnet.sf.cindy.session.readPacketSize来更改。如果对单独的 Session进行更改,则可以通过session的readPacketSize属性进行设置。

Direct ByteBuffer vs Non-Direct ByteBuffer

java.nio.ByteBuffer引入了这两种类型的ByteBuffer,在Cindy中也有基于这两种ByteBuffer的Buffer包装类。那么这两者的区别在什么地方?在什么环境下采用哪种类型的ByteBuffer会更有效率?

Non-direct ByteBuffer内存是分配在堆上的,直接由Java虚拟机负责垃圾收集,你可以把它想象成一个字节数组的包装类,如下伪码所示:

HeapByteBuffer extends ByteBuffer {
byte[] content;
int position, limit, capacity;
......
}

而Direct ByteBuffer是通过JNI在Java虚拟机外的内存中分配了一块,该内存块并不直接由Java虚拟机负责垃圾收集,但是在Direct ByteBuffer包装类被回收时,会通过Java Reference机制来释放该内存块。如下伪码所示:

DirectByteBuffer extends ByteBuffer {
long address;
int position, limit, capacity;

protected void finalize() throws Throwable{
//释放内存块,该段代码仅仅用于演示,真正的Direct ByteBuffer并不是通过finalize来释放的
releaseAddress();
......
}
......
}

除开以上的这些外,如果我们查找Java实现类的代码,就可以了解到这两者之间更深入的区别。比如在Sun的Java实现中,绝大部分 Channel类都是通过sun.nio.ch.IOUtil这个工具类和外界进行通讯的,如FileChannel/SocketChannel等等。 简单的用伪码把write方法给表达出来(read方法也差不多,就不多做说明了):

								int write(ByteBuffer src, ......) {
if (src instanceof DirectBuffer)
return writeFromNativeBuffer(...);
ByteBuffer direct = getTemporaryDirectBuffer(src);
writeFromNativeBuffer(direct,......);
updatePosition(src);
releaseTemporaryDirectBuffer(direct);
}

是的,在发送和接收前会把Non-direct ByteBuffer转换为Direct ByteBuffer,然后再进行相关的操作,最后更新原始ByteBuffer的position。这意味着什么?假设我们要从网络中读入一段数据,再 把这段数据发送出去的话,采用Non-direct ByteBuffer的流程是这样的:

								
网络 --> 临时的Direct ByteBuffer --> 应用 Non-direct ByteBuffer --> 临时的Direct ByteBuffer --> 网络

而采用Direct ByteBuffer的流程是这样的:

网络 --> 应用 Direct ByteBuffer --> 网络

可以看到,除开构造和析构临时Direct ByteBuffer的时间外,起码还能节约两次内存拷贝的时间。那么是否在任何情况下都采用Direct Buffer呢?

答案是否定的。对于大部分应用而言,两次内存拷贝的时间几乎可以忽略不计,而构造和析构Direct Buffer的时间却相对较长。在JVM的实现当中,某些方法会缓存一部分临时Direct ByteBuffer,意味着如果采用Direct ByteBuffer仅仅能节约掉两次内存拷贝的时间,而无法节约构造和析构的时间。就用Sun的实现来说,write(ByteBuffer)和 read(ByteBuffer)方法都会缓存临时Direct ByteBuffer,而write(ByteBuffer[])和read(ByteBuffer[])每次都生成新的临时Direct ByteBuffer。

根据这些区别,在选择ByteBuffer类型上有如下的建议:

  • 如果你做中小规模的应用(在这里,应用大小是按照使用ByteBuffer的次数和规模来做划分的),并不在乎这些细节问题,请选择Non-direct ByteBuffer
  • 如果采用Direct ByteBuffer后性能并没有出现你所期待的变化,请选择Non-direct ByteBuffer
  • 如果没有Direct ByteBuffer Pool,尽量不要使用Direct ByteBuffer
  • 除非你确定该ByteBuffer会长时间存在,并且和外界有频繁交互,可采用Direct ByteBuffer
  • 如 果采用Non-direct ByteBuffer,那么采用非聚集(gather)的write/read(ByteBuffer)效果反而*可能*超出聚集的write/read (ByteBuffer[]),因为聚集的write/read的临时Direct ByteBuffer是非缓存的(在Sun的实现上是这样,其他的实现则不确定)

基本上,采用Non-direct ByteBuffer总是对的!因为内存拷贝需要的开销对大部分应用而言都可以忽略不计。在Cindy中,一般的应用只需要通过 BufferFactory.allocate方法来得到Buffer实例即可,默认设置下采用的是Non-Direct Buffer;通过BufferFactory.wrap方法包装字节数组或ByteBuffer得到的Buffer类也有很高的效率;只有通过 BufferFactory.wrap(Buffer[])方法目前还处于实验阶段,其效率不一定比生成一个大的Buffer,然后拷贝现有内容更快。如 果应用非常注重效率,要使用该方法上要多加注意。

默认参数设置

对Cindy中一些默认参数的配置可以通过以下两种方式:

  • 配置文件配置

Cindy在启动时会在当前classpath上寻找cindy.properties文件,如果找到后会把该属性配置文件读入到缓存中。在该 properties中的配置无需net.sf.cindy.前缀,即如果要配置net.sf.cindy.enableJmx=true并且使用 DirectBuffer,则只需要在cindy.properties中加入:

enableJmx=true
useDirectBuffer=true
  • 运行期属性配置

运行时通过-D参数指定的配置,如果和上面配置文件中的Key相同,则会覆盖配置文件的配置。通过-D参数指定配置时不能省略net.sf.cindy.前缀。

当前版本全部可以配置的属性请参见Cindy源代码包的readme.txt。

posted on 2007-01-19 00:09 苦笑枯 阅读(2243) 评论(0)  编辑  收藏 所属分类: Java

只有注册用户登录后才能发表评论。


网站导航:
 
收藏来自互联网,仅供学习。若有侵权,请与我联系!

<2025年1月>
2930311234
567891011
12131415161718
19202122232425
2627282930311
2345678

常用链接

留言簿(2)

随笔分类(56)

随笔档案(56)

搜索

  •  

最新评论

阅读排行榜

评论排行榜