ServerSocket用法详解
1.C/S模式中,Server需要创建特定端口的ServerSocket.->其负责接收client连接请求.
2.线程池->包括一个工作队列和若干工作线程->工作线程不断的从工作队列中取出任务并执行.-->java.util.concurrent->线程池
3.server可利用多线程来处理与多个客户的通信任务.
1.构造ServerSocket
1.重载形式
ServerSocket() throws IOException
ServerSocket(int port) throws IOException
ServerSocket(int port,int backlog) throws IOException
ServerSocket(int port,int backlog,InetAddress bindAddr) throws IOException
port指定服务器要绑定的端口,即服务器要监听的端口;backlog指定客户连接请求队列的长度;bindAddr指定服务器要绑定的IP地址.
2.除了第一个构造方法,其他方法都会与特定端口绑定.
a.如ServerSocket serverSocket = new ServerSocket(80)
如果运行时无法绑定到80端口,则会抛出BindException.原因->
1.端口已经被其他服务器进程占用
2.某些os中,如果没有以超级用户的身份来运行server,则os不允许绑定到1-1023之间的端口
b.port设为0->表示由os来为server分配一个任意可用的端口->匿名端口->
->大部分服务器需要使用明确的端口.->因为client程序需要事先知道server端口,才能方便访问server.
->匿名端口一般适用于server与client之间的临时通信,通信结束则断开连接,且SeverSocket占用的临时端口也被释放.{@link ServerSocket#getLocalPort()}
->如ftp协议->两个并行的TCP连接->控制连接(21端口)和数据连接.
a.TCP client创建一个监听匿名端口的ServerSocket.再把这个ServerSocket监听的端口好发送给TCP服务器.然后TCP服务器主动请求与client连接.此连接用于和服务器建立临时的数据连接.这种方式就是用了匿名端口.
3.设定client连接请求队列的长度
1.server运行时,可能会同时监听到多个client的连接请求.
2.管理client连接请求的任务是由os来完成的.os将这些连接请求存储在一个先进先出队列中.
3.许多os限定了队列的最大长度.一般为50.
4.当队列中的连接请求达到了队列的最大容量时,server进程所在的主机会拒绝新的连接请求.->只有当server进行通过ServerSocket#accept从队列中取出连接请求->队列腾出空位->队列才能继续加入新的连接请求.
5.对于client来说,如果其连接请求被加入的server的连接队列中,就意味着client与server连接建立成功->client进程从Socket的构造方法中返回
->如果client进程发出的连接请求被server拒绝,则Socket构造方法抛出ConnectionException.
6.ServerSocket构造中的backlog参数用来显示的设置连接请求队列的长度,其将覆盖os限定的队列的最大长度.->注意依然采用os限定的队列的长度的情况:
1.backlog参数的值大于os限定的队列的最大长度.
2.backlog参数的值小于或等于0
3.其构造方法中未设置backlog参数.
4.设定绑定的ip地址
1.如果主机只有一个ip地址,默认情况下,server程序与其绑定.
2.ServerSocket构造方法可显示的指定服务器要绑定的ip地址,该构造方法适用于具有多个ip地址的主机.如一个主机具有两个网卡.一个用于连接Internet,以一个用于连接本地局域网.如果server仅仅被本地局域网访问则可显示指定绑定的ip地址.
5.默认构造方法的作用
1.ServerSocket有一个不带参数的构造方法->其创建的ServerSocket不与任何端口绑定->需调用bind与特定端口绑定.
2.其用于在于允许server在绑定到特定端口之前,可先设置SerevrSocket的一些选项->因为一旦与特定端口绑定,有些选项就不能再用.
如:
ServerSocket serverSocket = new ServerSocket();
serverSocket.setReuseAddress(true);//先设置serverSocket的选项,该选项必须在bind之前才有效
serverSocket.bind(new InetAddress(8000));//再执行绑定端口
2.接收和关闭与客户的连接
1.ServerSocket#accept->从连接请求队列中取出一个client的连接请求->创建于client连接的Socket对象,返回->
2.如果队列中没有连接请求,accept方法则会一直等待,知道接收了连接请求才返回
3.->server从Socket对象获得输入输出流->与client交换数据->当server正在进行发送数据的操作时->client断了连接->server抛出IOException的子类SocketException:
java.net.SocektException:Connection reset by peer.
->这只是server与单个client通信中的异常,这种异常应该被捕获,使得server能继续与其他client通信.
4.->单线程server采用的通信流程:
public void service()
{
while(true)
{
Socket socket = null;
try
{
socket = serverSocket.accept();//从连接队列中取出一个连接
//接收和发送数据
...
}
catch(IOException e)
{
//这只是与单个client通信时遇到的异常,可能是由于client过早断开连接引起的
//该异常不应该中断整个while循环
e.printStackTrace();
}
finally//不管与client是通信正常结束还是异常结束,最后都关闭socket,断开与client的连接
{
try
{
if(socket != null)
{
socket.close();//与一个client通信结束后,要关闭socket
}
}
catch(IOException e)
{
e.printStackTrace();
}
}
}
}
3.关闭ServerSocket:
1.ServerSocket#close->使得server释放占用的端口->断开与所有client的连接.
2.当一个server程序运行结束时,即使没有执行close方法,os也会释放这个server占用的端口->server并不一定要在结束之前执行其close方法.
3.某些情况,如果希望及时释放server端口,以便其他程序占用端口,则可显示调用ServerSocket#close方法.
4.ServerSocket#isClosed判断ServerSocket是否关闭->只有执行了ServerSocket#close后,其才返回true.->即使ServerSocket还未与特定端口绑定,isClosed亦返回false.
5.isBound判断ServerSocket是否已经与一个端口绑定.->只要ServerSocket与一个端口绑定,即使已关闭->isBound也返回true.
6.判断一个ServerSocket是否已经与一个特定端口绑定且还没有被关闭:
boolean isOpen = serverSocket.isBound() && !serverSocket.isClosed()
4.获取ServerSocket的信息
1.public InetAddress getInetAddress()//获得server绑定的ip地址
2.public int getLocalPort()//获得server绑定的端口
->端口设为0,os为server分配一个匿名端口
3.FTP:
1.ftp使用两个并行的tcp连接,一个是控制连接,一个是数据连接.
2.控制连接用于在client和server之间发送控制信息,如用户名和口令,改变远程目录的命令或上传和下载的命令.
3.数据连接用于传送文件.
4.tcp server在21端口监听控制连接.
5.数据连接的建立两种方式:
1.tcp server在20端口上监听数据连接,tcp client主动请求建立与该端口的连接
2.匿名端口:tcp client->创建匿名端口的ServerSocket->#getLocalPort发送到server.->tcp server主动请求建立与client的连接.
->client使用匿名端口,用于和server建立临时的数据连接
->实际应用中,server也可使用匿名端口.
5.ServerSocket选项
1.SO_TIMEOUT:表示等待client连接的超时时间.
1.public void setSoTimeout(int timeout) throws SocketException
2.public int getSoTimeout() throws IOException
3.该选项表示ServerSocket的accept方法的等待client连接的超时时间.->ms
4.选项为0,表示永远不会超时,也是其默认值.
5.ServerSocket#accept->如果连接请求队列为空,server就会一直等待->直到收到了client连接才从accept返回->设定了超时时间,如果等待时间超过了超时时间,则抛出SocketTimeoutException.->InterruptedException的子类.
6.Server执行serverSocket#accept方法时,等待client连接的过程称之为阻塞.
2.SO_REUSEADDR:表示是否允许重用服务器所绑定的地址.
1.public void setReuseAddress(boolean on) throws SocketException
2.public boolean getReuseAddress() throws SocketException
3.同Socket#SO_REUSEADDR->决定如果网络上仍有数据向旧的ServerSocket传输数据,是否允许新的ServerSocket绑定到旧的ServerSocket同样的端口上.该选项的默认值与os有关,有的os允许重用端口,有的os不允许重用.
4.ServerSocket关闭时,如果网络上还有发送到这个ServerSocket的数据,这个ServerSocket不会立刻释放本地端口,而是会等待一段时间,确保接收到了网络发过来的延迟数据,然后再释放端口.
5.许多server程序使用固定端口。Server程序关闭后->有可能其端口还被占用一段时间->如果此时在同一个主机上重启server程序->由于端口被占用,使得server程序无法绑定到该端口->server启动失败,抛出BindException.
Exception in thread "main" java.net.BindException:Address already in use:JVM_Bind.
6.为确保一个进程关闭ServerSocket后,即使os还未释放端口,同一个主机的上其他进程可以立刻重用该端口->ServerSocket#serReuseAddress(true)
if(!serverSocket.getReuseAddress()){serverSocket.setReuseAddress(true)}
7.必须在serverSocket还未绑定到一个本地端口前调用.否则无效.->两个公用一个端口端口的进程必须都调用setReuseAddress(true)->一个进程关闭ServerSocket后,另一个进程可立即重用相同端口
3.SO_REVBUF:表示接收数据的缓冲区的大小.
1.public void setReceiveBufferSize(int size) throws SocketException
2.public int getReceiveBufferSize() throws SocketException
3.该选项表示Server用于接收数据的缓冲区的大小,字节为单位.(传输大的连续的数据块,http/ftp可以使用较大的缓冲区,减少传输数据的次数,提高传输数据的效率;对于交互频繁且单次数据量较小的通信 telnet和网络游戏,则应该采用较小的缓冲区,确保及时把小批量的数据发送给对方)
4.该默认值与os有关.->如果要设置超过64kb的缓冲区必须在serverSocket绑定到特定端口前设置才有效.
5.执行ServerSocket#setReceiveBufferSize->相当于对所有由serverSocket#accept返回的Socket设置接收数据的缓冲区的大小.
4.public void setPerformancePreferences(int connectionTime,int latency,int bandwidth)->同Socket#setPerformancePreferences->用于设定连接时间,延迟和带宽的相对重要性.
6.创建多线程的服务器
1.chap1#EchoServer有一个很大的问题就是无法同时与多个client通信.->
1.当前这个EchoServer的问题是只能连接一个Client,只有当前的client输入bye,结束连接的时候,才会处理下一个client
2.原因在于service方法,while(br.readLine() != null),这里,当Client没有输入数据时,线程则挂起,阻塞,等待client输入
->如果同时又多个client请求.这些client则必须排队等待.
2.许多实际应用要求server具有同时为多个client提供服务器的能力-如http服务器.
3.衡量server具有同时响应多个客户的能力-并发性能
1.能同时接收并处理多个client连接
2.对于每个client,都会迅速给与响应
4.用多个线程来同时为多个client提供服务,这是提高server的并发性能的最常用的手段.
5.
a.为每个客户分配一个工作线程.伪代码:
public void service()
{
while(true)
{
ServerSocket socket = null;
try
{
socket = serverSocket.accept();//接收client连接
//EchoHandler实现了Runnable接口,其run负责与单个client通信.通信结束则断开连接.工作线程也会自然终止.
Thread workThread = new Thread(new EchoHandler(socket));//创建一个工作线程
workThread.start();//启动工作线程
}
catch(IOException e)
{
e.printStackTrace();
}
}
}
b.该实现缺点:
1.server创建和销毁工作线程的开销,包括所花费的时间和系统资源很大.->如果server与多client通信并且每个client的通信时间都很短->那么server为client创建新线程的开销比实际与client通信的开销还要大.
2.除了创建和销毁线程的开销,活动的线程也消耗系统资源.每个线程本身会占用一定的内存,每个线程约1M.如果有大量client连接server,就必须创建爱你大量工作线程->消耗了大量内存,导致系统的内存空间不足.(-Xss,设置每个线程的堆栈大小)
3.如果线程数目固定,并且每个线程都有很长的生命周期,则线程切换也是相对固定的.->不同的os有不同的切换周期,一般在20ms.这里所说的线程切换是在Java虚拟机以及底层os的调度之下,线程之间转让cpu的使用权.如果频繁创建和销毁线程,则导致频繁切换线程;因为一个线程被销毁后,必须要把cpu转让给另一个已就绪的线程,使该线程或的运行机会.这种情况下,线程之前的切换不再遵循系统的固定切换周期,切换线程的开销甚至比创建及销毁线程的开销还要大.
->引入线程池.
6.线程池为线程生命周期开销问题和系统资源不足问题提供了解决方案:
1.线程池预先创建一些工作线程->其不断的从工作队列中取出任务->执行任务->工作线程执行完一个任务->继续执行工作队列的下一个任务->
2.线程池减少了创建和销毁的次数,每个工作线程都可以被一直重用,能执行多个任务
3.可根据系统的负载能力,方便的调整线程池中线程的数目->防止因为消耗过量系统资源而导致系统崩溃.
c.使用java.util.concurrent包提供了现成的线程池的实现.->比ThreadPoolImpl的要健壮而且功能更强大.
1.public interface Executor#void execute(Runnable command)
2.public interface ExecutorService extends Executor
1.void shutdown()
2.List<Runnable> shutdownNow()
3.boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException
4.<T> Future<T> submit(Callable<T> task)
5.<T> Future<T> submit(Runnable task, T result)
6.Future<?> submit(Runnable task)
7.<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException
8.<T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException
9.boolean isTerminated()
3.public class Executors
-> Factory and utility methods
1. public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());}
2.public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());}
3. public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));}
->Note however that if this single thread terminates due to a failure during execution prior to shutdown, a new one will take its place if needed to execute subsequent tasks.
->与newFixedThreadPool(1)区别->Unlike the otherwise equivalent newFixedThreadPool(1) the returned executor is guaranteed not to be reconfigurable to use additional threads.
->The difference is (only) that the SingleThreadExecutor cannot have its thread size adjusted later on, which you can do with a FixedThreadExecutor by calling ThreadPoolExecutor#setCorePoolSize (needs a cast first).->((ThreadPoolExecutor)newFixedThreadPool(1)).setCorePoolSize(3)->
->ThreadPoolExecutor->
protected void finalize() { shutdown();}->因为DelegatedExecutorService持有了ThreadPoolExecutor引用->所以FinalizableDelegatedExecutorService->
->static class FinalizableDelegatedExecutorService extends DelegatedExecutorService->#finalize->super.shutdown->即在执行finalize的时候调用shutdown->因为其持有ExecutorService引用,所以其不能finalize->只能持有者调用finalize->
4. public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize, long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), defaultHandler);}
4.public abstract class AbstractExecutorService implements ExecutorService
5.public interface ScheduledExecutorService extends ExecutorService
6.public class ThreadPoolExecutor extends AbstractExecutorService
7.public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService
7.使用线程池的注意事项:
1.死锁
1.A线程持有对象X的锁并等待对象Y的锁->B线程持有对象Y的锁并等待对象X的锁->A,B线程都不释放自己持有的锁->并且等待对方的锁->导致两个线程永远等待下去->产生死锁
2.线程池死锁->所有工作线程都在执行各自任务时被阻塞->等待某个任务A的执行结果->而任务A仍在工作队列中->没有空闲线程->A一直不能被执行->是的线程池的所有工作线程都永远阻塞->死锁->
2.系统资源不足
线程池线程数目多->消耗内存和其他系统资源.->影响系统性能.
3.并发错误
ThreadPoolImpl的工作队列依靠wait和notify使工作线程即使取得任务.->如果编码不正确,可能会丢失通知->导致工作线程一直保持空闲状态->无视工作队列需要处理的任务->使用这些方法必须格外小心->最好使用现有的,比较成熟的线程池->如concurrent包中的线程池类.
4.线程泄露
1.对于工作线程数目固定的线程池->如果工作线程在执行任务抛出RuntimeException或Error->并且这些异常或错误没有被捕获->工作线程就会异常终止->线程池失去了一个工作线程->如果所有的工作线程都异常终止->线程池就为空->
2.工作线程在执行一个任务被阻塞->如等待用户输入数据->但由于用户一直不输入数据->导致工作线程一直被阻塞->这样的工作线程名存实亡->不执行任何任务了->如果线程池中所有工作线程都处于这样的阻塞状态->线程池无法处理新加入的任务
5.任务过载
当工作队列中有大量排队等待执行的任务时->这些任务本身可能消耗太多系统资源而引起系统资源缺乏.
8.使用线程池遵循原则:
1.如果任务A在执行过程中需要同步等待任务B的执行结果->任务A不适合加入到线程池的工作队列->如果把像任务A一样的需要等待其他任务执行结果的任务加入到工作队列中->可能会导致线程池的死锁.
2.如果执行某个任务可能会阻塞并且会长时间阻塞->应该设定超时时间->避免工作线程永久的阻塞下去->导致线程泄露->
->服务器程序中->线程等待client连接 或者 等待client发送的数据时都可能会阻塞->
1.ServerSocket#setSoTimeout(int timeout)->设定等待client连接的超时时间
2.对于每个与client连接的Socket->Socket#setSoTimeout(int timeout)->设定等待client发送数据的超时时间.
3.了解任务的特点:
1.分析任务时经常会执行阻塞的io操作->还是执行一直不会阻塞的运算操作->前者时断时续的占用cpu,而后者对cpu具有更高的利用率->预计完成人呢无的大概时间->是短时间任务还是长时间任务?->根据任务的特点->对任务进行分类->不同类型的任务加入到不同线程池的工作队列中->根据任务的特点->分别调整每个线程池.
4.调整线程池的大小:
1.线程池的最佳大小主要取决于系统的可用cpu的数目以及工作队列中任务的特点.
2.如一个具有n个cpu的os上只有一个工作队列且其中全部是运算性质的任务,即不会阻塞的任务,则线程中具有n或n+1个工作线程时,一般会获得最大的cpu利用率。
3.如果工作队列中包含会执行I/O操作并常常阻塞的任务,则要让线程池的大小超过可用cpu的数目,因为并不是所有工作线程都一直在工作。选择一个典型的任 务,然后估计在执行这个任务的过程中,等待时间(WT)与实际占用CPU进行运算的时间(ST)之间的比例WT/ST。对于一个具有N个CPU的系统,需 要设置大约N×(1+WT/ST)个线程来保证CPU得到充分利用。注-(n + 平均阻塞的线程)
4.CPU利用率不是调整线程池大小过程中唯一要考虑的事项。随着线程池中工作线程数目的增长,还会碰到内存或者其他系统资源的限制,如套接字、打开的文件句柄或数据库连接数目等。要保证多线程消耗的系统资源在系统的承载范围之内
5.避免任务过载。服务器应根据系统的承载能力,限制客户并发连接的数目。当客户并发连接的数目超过了限制值,服务器可以拒绝连接请求,并友好地告知客户:服务器正忙,请稍后再试.
9.关闭服务器
1.之前的EchoServer都无法关闭自身->只能靠os强制终止->处理方式虽然简单,但是会导致服务器正在执行的任务被突然中断->server处理的任务重要->不允许被突然中断->server需在恰当的时刻关闭自己.
注:1.很多游戏服务器程序采用ShutdownHooker来进行jvm进程退出的清理工作,如linux下kill的时候会调用钩子.
2.本人倾向于在游戏服务器正常的关闭自己->理想的情况下是向游戏服务器发送shutdown命令,然后游戏服务器执行清理工作,所有线程执行shutdown->jvm进程自然退出.
部分源码:
package com.game.landon.serverSocket;
import java.net.Socket;
/** *//**
*
*测试server的连接请求队列特性的client,client试图100次与server建立连接
*
*<output "server未执行service方法">
第 1 次连接成功
第 2 次连接成功
第 3 次连接成功
Exception in thread "main" java.net.ConnectException: Connection refused: connect
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.PlainSocketImpl.doConnect(Unknown Source)
at java.net.PlainSocketImpl.connectToAddress(Unknown Source)
at java.net.PlainSocketImpl.connect(Unknown Source)
at java.net.SocksSocketImpl.connect(Unknown Source)
at java.net.Socket.connect(Unknown Source)
at java.net.Socket.connect(Unknown Source)
at java.net.Socket.<init>(Unknown Source)
at java.net.Socket.<init>(Unknown Source)
at com.game.landon.serverSocket.Client.main(Client.java:27)
*</output>
*
*<output "server执行service方法">
第 1 次连接成功
第 2 次连接成功
第 3 次连接成功
第 4 次连接成功
第 5 次连接成功
第 100 次连接成功
*</output>
*
*@author landon
*@since 1.6.0_35
*@version 1.0.0 2013-7-2
*
*/
public class Client
{
public static void main(String[] args) throws Exception
{
final int length = 100;
String host = "localhost";
int port = 8000;
Socket[] sockets = new Socket[length];
for(int i = 0;i < length;i++)
{
sockets[i] = new Socket(host,port);
System.out.println(String.format("第 %d 次连接成功", i + 1));
}
Thread.sleep(3 * 1000);
for(int i = 0;i < length;i++)
{
sockets[i].close();//断开连接
}
}
}
package com.game.landon.serverSocket;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.net.Socket;
/** *//**
*
*Echo Handler.负责与client通信
*
*@author landon
*@since 1.6.0_35
*@version 1.0.0 2013-7-11
*
*/
public class EchoHandler implements Runnable
{
private Socket socket;
public EchoHandler(Socket socket)
{
this.socket = socket;
}
/** *//**
* echo to client
* @param msg
* @return
*/
public String echo(String msg)
{
return "echo:" + msg;
}
/** *//**
* Wrapped {@link Socket#getOutputStream()} to PrintWriter
* @param socket
* @return
* @throws IOException
*/
private PrintWriter getWriter(Socket socket) throws IOException
{
// 返回输出流对象,向输出流写数据,就能向对方发送数据
OutputStream socketOut = socket.getOutputStream();
//public PrintWriter(OutputStream out, boolean autoFlush)
return new PrintWriter(socketOut,true);
}
/** *//**
* Wrapped {@link Socket#getInputStream()} to BufferedReader
* @param socket
* @return
* @throws IOException
*/
private BufferedReader getReader(Socket socket) throws IOException
{
// 返回输入流对象;只需从输入流读数据,就能接收来自对方的数据
InputStream socketIn = socket.getInputStream();
return new BufferedReader(new InputStreamReader(socketIn));
}
@Override
public void run()
{
try
{
System.out.println("New Connection accepted:" + socket.getInetAddress() + ":" + socket.getPort());
BufferedReader br = getReader(socket);
PrintWriter pw = getWriter(socket);
String msg = null;
//read from client
while((msg = br.readLine()) != null)
{
System.out.println(msg);
//write to client
pw.println(echo(msg));
if(msg.equals("bye"))
{
break;
}
}
}
catch(IOException e)
{
e.printStackTrace();
}
finally
{
try
{
if(socket != null)
{
//close socket
socket.close();
}
}
catch(IOException e)
{
e.printStackTrace();
}
}
}
}
package com.game.landon.serverSocket;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/** *//**
*
*用JDK自带的Executor线程池实现的EchoServer.
*
*@author landon
*@since 1.6.0_35
*@version 1.0.0 2013-7-22
*
*/
public class EchoServerExecutors
{
private int port = 8000;
private ServerSocket serverSocket;
private ExecutorService executorService;
private final int POOL_SIZE = 4;
public EchoServerExecutors() throws IOException
{
serverSocket = new ServerSocket(port);
executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * POOL_SIZE);
System.out.println("EchoServerExecutors launched");
}
public void service()
{
while(true)
{
Socket socket;
try
{
socket = serverSocket.accept();
executorService.execute(new EchoHandler(socket));
}
catch(IOException e)
{
e.printStackTrace();
}
}
}
public static void main(String[] args) throws IOException
{
new EchoServerExecutors().service();
}
}
package com.game.landon.serverSocket;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
/** *//**
*
*每client每thread实现的Echo Server,用{@link com.game.landon.entrance.EchoClient}测试即可
*
*@author landon
*@since 1.6.0_35
*@version 1.0.0 2013-7-11
*
*/
public class EchoServerPerClientPerThread
{
private int port = 8000;
private ServerSocket serverSocket;
public EchoServerPerClientPerThread() throws IOException
{
serverSocket = new ServerSocket(port);
System.out.println("EchoServerPerClientPerThread launched");
}
/** *//**
* echo server主业务循环
*/
public void service()
{
while(true)
{
Socket socket = null;
try
{
socket = serverSocket.accept();
//EchoHandler实现了Runnable接口,其run负责与单个client通信.通信结束则断开连接.工作线程也会自然终止.
Thread workThread = new Thread(new EchoHandler(socket));
workThread.start();
}
catch(IOException e)
{
e.printStackTrace();
}
}
}
public static void main(Stringargs) throws IOException
{
EchoServerPerClientPerThread server = new EchoServerPerClientPerThread();
server.service();
}
}
package com.game.landon.serverSocket;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
/** *//**
*
*用{@link ThreadPoolImpl}实现的EchoServer,可提供优雅的关闭方法->关闭ServerSocket/关闭线程池
*
*@author landon
*@since 1.6.0_35
*@version 1.0.0 2013-7-17
*
*/
public class EchoServerThreadPool
{
private int port = 8000;
private ServerSocket serverSocket;
private ThreadPoolImpl threadPool;
private final int POOL_SIZE = 4;//单cpu时线程池的工作线程的数目
public EchoServerThreadPool() throws IOException
{
serverSocket = new ServerSocket(port);
// 根据当前os可用的cpu数目设定线程池中的线程数目
threadPool = new ThreadPoolImpl(Runtime.getRuntime().availableProcessors() * POOL_SIZE);
System.out.println("EchoServerThreadPool started.");
}
public void service()
{
while(true)
{
Socket socket;
try
{
socket = serverSocket.accept();
threadPool.execute(new EchoHandler(socket));//将与client echo的任务提交给线程池
}
catch(IOException e)
{
e.printStackTrace();
}
}
}
public static void main(String[] args) throws IOException
{
new EchoServerThreadPool().service();
}
}
package com.game.landon.serverSocket;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
/** *//**
*
*可关闭自身的EchoServer
*<pre>
*目前有一个问题是如果启动了EchoServer,然后启动了EchoClient->再启动ShutdownClient->如果EchoClient一直不输入数据->
*即使EchoServer已经shutdown->EchoClient进程也一直没停止->因为EchoClient#talk->是一直在等待client输入->EchoServer shutdown后->输入任意
*->会抛出java.net.SocketException: Software caused connection abort: recv failed
*</pre>
*
*@author landon
*@since 1.6.0_35
*@version 1.0.0 2013-7-23
*
*/
public class EchoServerWithShutdown
{
private int port = 8000;
private ServerSocket serverSocket;
private ExecutorService executorService;
private final int POOL_SIZE = 4;
/** *//** 监听shutdown的port */
private int portForShutdown = 8001;
/** *//** 监听shutdown的ServerSocket */
private ServerSocket serverSocketForShutdown;
/** *//** server是否shutdown */
private boolean isShutdown = false;
public EchoServerWithShutdown() throws IOException
{
serverSocket = new ServerSocket(port);
serverSocket.setSoTimeout(60 * 1000);//设定等待client连接的超时时间为60秒
serverSocketForShutdown = new ServerSocket(portForShutdown);
executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * POOL_SIZE);
// 启动shutdown线程
shutdownThread.start();
System.out.println("EchoServerWithShutdown launched");
}
public void service()
{
while(!isShutdown)//主循环.shutdown线程执行shutdown后,下次循环的时候就会退出service方法
{
Socket socket = null;
try
{
socket = serverSocket.accept();
// 设置等待client发送数据的超时时间为60秒->如果client60秒内未发送数据->则EchoHandler#79L->抛出java.net.SocketTimeoutException: Read timed out
//->EchoHandler这个task自然结束->线程池自然结束
socket.setSoTimeout(60 * 1000);
// 可能会抛出RejectedExecutionException->shutdown线程调用了executorService#shutdown
executorService.submit(new EchoHandler(socket));
}
catch(SocketTimeoutException e)
{
e.printStackTrace();
}
catch(RejectedExecutionException e)
{
try
{
if(socket != null)
{
socket.close();
}
}
catch(IOException ie)
{
}
return;//退出service
}
catch(SocketException e)
{
e.printStackTrace();
//serverSocket.accept()->serverSocket被shutdown线程关闭
if(e.getMessage().indexOf("socket closed") != -1)
{
return;//退出service
}
}
catch(IOException e)
{
e.printStackTrace();
}
}
}
/** *//** 执行关闭server的shutdown线程 */
private Thread shutdownThread = new Thread()
{
public void start()
{
// 设置为后台线程(守护线程),用户线程全部关闭后,jvm自动退出
setDaemon(true);
super.start();
}
public void run()
{
while(!isShutdown)
{
Socket socketForShutdown = null;
try
{
socketForShutdown = serverSocketForShutdown.accept();
BufferedReader br = new BufferedReader(new InputStreamReader(socketForShutdown.getInputStream()));
String cmd = br.readLine();
if(cmd.equals("shutdown"))
{
// server执行关闭开始时间
long beginTime = System.currentTimeMillis();
socketForShutdown.getOutputStream().write("server is shutdowning\r\n".getBytes());
// 设置关闭状态
isShutdown = true;
// 请求关闭线程池->线程池不再接收新的任务->但是会执行完工作队列现有的任务
executorService.shutdown();
// 等待关闭线程池,每次等待的超时时间为30秒(shutdown线程blocked.)
while(!executorService.isTerminated())
{
executorService.awaitTermination(30, TimeUnit.SECONDS);
}
//线程池terminate后
serverSocket.close();//关闭ServerSocket
long endTime = System.currentTimeMillis();
socketForShutdown.getOutputStream().write(("echo server has shutdown..used:" + (endTime - beginTime) + "milliseconds\r\n").getBytes());
socketForShutdown.close();
serverSocketForShutdown.close();
}
else
{
socketForShutdown.getOutputStream().write("err cmd".getBytes());
socketForShutdown.close();
}
}
catch(Exception e)
{
e.printStackTrace();
}
}
}
};
public static void main(String[] args) throws IOException
{
new EchoServerWithShutdown().service();
}
}
package com.game.landon.serverSocket;
import java.io.IOException;
import java.net.ServerSocket;
/** *//**
*
*测试ServerSocket匿名端口
*
*@author landon
*@since 1.6.0_35
*@version 1.0.0 2013-7-5
*
*/
public class RandomPort
{
public static void main(Stringargs) throws IOException
{
// port 设为0,表示匿名端口
ServerSocket serverSocket = new ServerSocket(0);
System.out.println(serverSocket.getInetAddress() + ":" + serverSocket.getLocalPort());
}
}
package com.game.landon.serverSocket;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
/** *//**
*
*测试server的连接请求队列特性的server
*
*@author landon
*@since 1.6.0_35
*@version 1.0.0 2013-7-2
*
*/
public class Server
{
private int port = 8000;
private ServerSocket serverSocket;
public Server() throws IOException
{
serverSocket = new ServerSocket(port, 3);//设置连接请求队列的长度为3
System.out.println("server started");
}
// 只要该方法执行,不断执行accept方法->从队列中取出连接请求->便可以使队列及时腾出空位以容纳新的连接请求
public void service()
{
while(true)
{
Socket socket = null;
try
{
socket = serverSocket.accept();//从连接队列中取出一个连接
System.out.println("New Connection accepted: " + socket.getInetAddress() + ":" + socket.getPort());//打印连接进来的socket ip:端口
}
catch(IOException e)
{
e.printStackTrace();
}
finally
{
try
{
if(socket != null)
{
socket.close();
}
}
catch(IOException e)
{
e.printStackTrace();
}
}
}
}
public static void main(String[] args) throws Exception
{
Server server = new Server();
// TimeUnit.MILLISECONDS.sleep(10 * 60 * 1000);//睡眠10分钟
server.service();
}
}
package com.game.landon.serverSocket;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.Socket;
/** *//**
*
*向EchoServer发送shutdown的client
*
*@author landon
*@since 1.6.0_35
*@version 1.0.0 2013-7-23
*
*/
public class ShutdownEchoClient
{
public static void main(String[] args)
{
Socket socket = null;
try
{
socket = new Socket("localhost",8001);
// 必须发送一行数据.因为handler调用的是br.readLine
socket.getOutputStream().write("shutdown\r\n".getBytes());
//接收server反馈
BufferedReader br = new BufferedReader(new InputStreamReader(socket.getInputStream()));
String msg = null;
while((msg = br.readLine()) != null)
{
System.out.println(msg);
}
}
catch(IOException e)
{
e.printStackTrace();
}
finally
{
try
{
if(socket != null)
{
socket.close();
}
}
catch(IOException e)
{
e.printStackTrace();
}
}
}
}
package com.game.landon.serverSocket;
import java.io.IOException;
import java.net.ServerSocket;
/** *//**
*
*ServerSocket#close
*
*@author landon
*@since 1.6.0_35
*@version 1.0.0 2013-7-3
*
*/
public class TestServerSocketClose
{
public static void main(String[] args)
{
for(int port = 1;port <= 65535;port++)
{
try
{
ServerSocket serverSocket = new ServerSocket(port);
serverSocket.close();//马上关闭,及时释放它占用的端口,避免临时占用系统的大多数端口
}
catch(IOException e)
{
System.err.println(String.format("[%d] is occupied by other server process", port));
}
}
}
}
package com.game.landon.serverSocket;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
/** *//**
*
*测试ServerSocket#SO_RCVBUF选项
*
*@author landon
*@since 1.6.0_35
*@version 1.0.0 2013-7-9
*
*/
public class TestServerSocketRcvBuf
{
public static void main(String[] args) throws Exception
{
ServerSocket serverSocket = new ServerSocket(8000);
int recBufSize = serverSocket.getReceiveBufferSize();
//输出:Windows 7:8192
System.out.println(System.getProperties().getProperty("os.name") + ":" + recBufSize);
ServerSocket ss2 = new ServerSocket();
// 如果设置要超过64k以上的缓冲区,则必须bind之前调用 .暂时未确认.
ss2.setReceiveBufferSize(65 * 1024 * 1024);
ss2.bind(new InetSocketAddress(8001));
}
}
package com.game.landon.serverSocket;
import java.util.LinkedList;
/** *//**
*
*线程池的一个简单实现
*
*@author landon
*@since 1.6.0_35
*@version 1.0.0 2013-7-11
*
*/
// A thread group represents a set of threads. In addition, a threadgroup can also include other thread groups
public class ThreadPoolImpl extends ThreadGroup
{
/** *//** 工作队列 */
private LinkedList<Runnable> workQueue;
/** *//** 线程池是否关闭 */
private boolean isClosed;
/** *//** 线程池id */
private static int threadPoolID;
/** *//** 线程id */
private int threadID;
public ThreadPoolImpl(int poolSize)
{
super("ThreadPool-" + (threadPoolID++));
//1.Changes the daemon status of this thread group
//2.A daemon thread group is automatically destroyed when its last thread is stopped or its last thread group is destroyed
setDaemon(true);
workQueue = new LinkedList<Runnable>();//创建工作队列
//创建poolSize的数目的工作线程并启动
for(int i = 0;i < poolSize;i++)
{
new WorkThread().start();
}
}
/** *//**
*
* 向工作队列添加一个新任务(向线程池提交任务).synchronized->多线程->notify
*
* @param task
*/
public synchronized void execute(Runnable task)
{
// 如果线程池被关闭,则抛出IllegalStateException
if(isClosed)
{
throw new IllegalStateException();
}
if(task != null)
{
workQueue.add(task);
// 唤醒正在getTask()方法中等待任务的工作线程
notify();
}
}
/** *//**
*
* 从工作队列取出一个任务.工作线程调用此方法->在wait的时候可被中断,即会抛出InterruptedException
*
* @return
* @throws InterruptedException
*/
public synchronized Runnable getTask() throws InterruptedException
{
while(workQueue.size() == 0)
{
// 如果线程池已关闭,返回null
if(isClosed)
{
return null;
}
// 如果线程池未关闭且工作队列为空,则getTask调用线程则等待.并释放锁.->可能会被其他线程interrupt.
wait();
}
//移除工作队列的第一个任务并返回
return workQueue.removeFirst();
}
/** *//**
* 关闭线程池
*/
public synchronized void close()
{
if(!isClosed)
{
isClosed = true;
//清除工作队列
workQueue.clear();
// 中断所有工作线程
//1.如果此时一个工作线程正在因getTask->wait->而阻塞->则会抛出IneterruptedException
//2.如果此时一个工作线程正在执行一个任务->则这个工作线程会正常执行完任务->但是在下一轮while(!isInterrupted())循环->而退出while->
//3.如果执行的任务时候工作线程被阻塞了->则也会抛出IneterruptedException->被catch了(可能被task#run catch,如task执行sleep)->然后下一轮while的时候,退出while
interrupt();
}
}
/** *//**
* 关闭线程池,等待工作线程将所有的任务执行完再结束
*/
public void join()
{
synchronized(this)
{
isClosed = true;
notifyAll();//唤醒在getTask中等待任务的工作线程->因为已经isClosed置为true->所以线程唤醒后->return null.
}
//因为经过上步,已经有部分等待任务的工作线程已经结束了.
//activeCount:Returns an estimate of the number of active threads in this thread group
Thread[] threads = new Thread[activeCount()];
//enumerate:Copies into the specified array every active thread in this thread group and its subgroups
int count = enumerate(threads);
//调用每个active thread的join->等待所有工作线程运行结束->当前调用线程则等待->可能被interrupt->
for(int i = 0;i < count;i++)
{
try
{
threads[i].join();
}
catch(InterruptedException e)
{
e.printStackTrace();
}
}
}
/** *//**
*
* 内部类:工作线程
*
* @author landon
*
*/
private class WorkThread extends Thread
{
public WorkThread()
{
// java.lang.Thread.Thread(ThreadGroup group, String name)
super(ThreadPoolImpl.this, "WorkdThread-" + (threadID++));
}
public void run()
{
while(!isInterrupted())//判断线程是否被中断
{
Runnable task = null;
try
{
task = getTask();//从工作队列取任务,可能wait
}
catch(InterruptedException e)
{
e.printStackTrace();
}
// 如果task为null则可能表示getTask时被中断了->直接return->退出while->结束此线程
if(task == null)
{
return;
}
try
{
task.run();// 运行任务,任务的执行过程中可能抛出异常.
}
catch(Throwable t)
{
t.printStackTrace();
}
}
}
}
}
package com.game.landon.serverSocket;
import java.util.concurrent.TimeUnit;
/** *//**
*
*测试{@link ThreadPoolImpl}的简单使用
*
*@author landon
*@since 1.6.0_35
*@version 1.0.0 2013-7-17
*
*/
public class ThreadPoolTester
{
public static void main(Stringargs)
{
if(args.length != 2)
{
System.err.println("Usage: java ThreadPoolTester numTasks poolSize");
return;
}
int numTasks = Integer.parseInt(args[0]);
int poolSize = Integer.parseInt(args[1]);
ThreadPoolImpl threadPool= new ThreadPoolImpl(poolSize);//创建线程池
//向线程池提交任务
for(int i = 0;i < numTasks;i++)
{
threadPool.execute(createTask(i));
}
threadPool.join();//等待工作线程完成所有任务
// threadPool.close();
}
/** *//**
*
* 创建一个简单的任务
*
* @param taskID
* @return
*/
private static Runnable createTask(final int taskID)
{
return new Runnable()
{
@Override
public void run()
{
System.out.println("Task " + taskID + ":start");
try
{
TimeUnit.MILLISECONDS.sleep(500);//sleep->相当于任务的执行时间
}
catch(InterruptedException e)
{
e.printStackTrace();
}
System.out.println("Task " + taskID + ":end");
}
};
}
}
package com.game.landon.serverSocket;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
/** *//**
*
*测试ServerSocket#accept超时
*
*@author landon
*@since 1.6.0_35
*@version 1.0.0 2013-7-8
*
*<output>
Exception in thread "main" java.net.SocketTimeoutException: Accept timed out
at java.net.PlainSocketImpl.socketAccept(Native Method)
at java.net.PlainSocketImpl.accept(Unknown Source)
at java.net.ServerSocket.implAccept(Unknown Source)
at java.net.ServerSocket.accept(Unknown Source)
at com.game.landon.serverSocket.TimeoutTester.main(TimeoutTester.java:24)
*</output>
*
*/
public class TimeoutTester
{
public static void main(String[] args) throws IOException
{
ServerSocket serverSocket = new ServerSocket(8000);
// serverSocket.setSoTimeout(6 * 1000);//去掉这个,则accpet方法永远不会超时且一直等待下去,直到接收到了client连接才从accept返回
Socket socket = serverSocket.accept();//如果连接请求队列为空,则server会一直等待->直到设定的超时时间
socket.close();
}
}
posted on 2013-07-24 14:21
landon 阅读(5596)
评论(1) 编辑 收藏 所属分类:
Program 、
Book