1 package com.lei.socket.test;
2
3 import java.io.BufferedReader;
4 import java.io.BufferedWriter;
5 import java.io.IOException;
6 import java.io.InputStream;
7 import java.io.InputStreamReader;
8 import java.io.OutputStreamWriter;
9 import java.net.InetSocketAddress;
10 import java.nio.ByteBuffer;
11 import java.nio.CharBuffer;
12 import java.nio.channels.SelectionKey;
13 import java.nio.channels.Selector;
14 import java.nio.channels.ServerSocketChannel;
15 import java.nio.channels.SocketChannel;
16 import java.nio.charset.Charset;
17 import java.nio.charset.CharsetDecoder;
18 import java.util.Iterator;
19
20
/**
 * Minimal single-threaded, selector-driven TCP line server.
 *
 * <p>Clients send ISO-8859-1 text lines terminated by CR, LF or CRLF. Every
 * line is passed to {@link #handleData(Session, String)}, which by default
 * answers {@code hello:<line>\r\n} and closes the connection when the line is
 * {@code quit} (case-insensitive).
 *
 * <p>All channel I/O happens on the thread that calls {@link #start()}.
 * {@link #stop()} may be called from any thread.
 */
public class SelectorServer {

    /** Port used by the no-argument constructor. */
    private static final int DEFAULT_PORT = 3000;

    private final int port;

    /** Counter used to number accepted sessions; only touched by the selector thread. */
    private int sessionId = 0;

    /** Cleared by {@link #stop()} to end the select loop. */
    private volatile boolean runFlag = true;

    /** Selector of the running loop, published so that {@link #stop()} can wake it up. */
    private volatile Selector selector;

    /** Creates a server that listens on port 3000. */
    public SelectorServer() {
        this(DEFAULT_PORT);
    }

    /**
     * Creates a server that listens on the given port.
     *
     * @param port TCP port to bind when {@link #start()} is called
     */
    public SelectorServer(int port) {
        this.port = port;
    }

    /**
     * Binds the listening socket and runs the select loop on the calling
     * thread until {@link #stop()} is called or the selector / listening
     * socket fails. The selector, the listening channel and every client
     * channel are closed before this method returns.
     *
     * <p>An I/O failure on a single client only closes that client's session.
     */
    public void start() {
        Selector sel = null;
        ServerSocketChannel server = null;
        try {
            sel = Selector.open();
            selector = sel;
            server = ServerSocketChannel.open();
            server.socket().bind(new InetSocketAddress(port));
            server.configureBlocking(false);
            SelectionKey serverkey = server.register(sel, SelectionKey.OP_ACCEPT);
            System.out.println("HelloServer started on port " + port);
            while (runFlag) {
                sel.select();
                Iterator<SelectionKey> iter = sel.selectedKeys().iterator();
                while (iter.hasNext()) {
                    SelectionKey key = iter.next();
                    iter.remove();
                    if (!key.isValid()) {
                        continue;
                    }
                    if (key == serverkey) {
                        if (key.isAcceptable()) {
                            acceptClient(server, sel);
                        }
                    } else {
                        serviceClient(key);
                    }
                }
            }
        } catch (IOException e) {
            System.out.println("server start error");
            e.printStackTrace();
        } finally {
            selector = null;
            shutdown(sel, server);
        }
    }

    /**
     * Asks a running {@link #start()} loop to terminate. Safe to call from any
     * thread. Once stopped, this instance cannot be started again.
     */
    public void stop() {
        runFlag = false;
        Selector sel = selector;
        if (sel != null) {
            // Unblock select() so the loop re-checks runFlag.
            sel.wakeup();
        }
    }

    /**
     * Business logic for one received line.
     *
     * @param session session the line arrived on
     * @param data    the line without its terminator
     * @return the session to keep serving, or {@code null} if it was closed
     */
    public Session handleData(Session session, String data) {
        if (data.equalsIgnoreCase("quit")) {
            session.onClose();
            session = null;
        } else {
            session.write("hello:" + data + "\r\n");
        }
        return session;
    }

    public static void main(String[] args) {
        SelectorServer s = new SelectorServer();
        s.start();
    }

    /** Accepts one pending connection and registers it for reading. */
    private void acceptClient(ServerSocketChannel server, Selector sel) throws IOException {
        SocketChannel client = server.accept();
        if (client == null) {
            return;
        }
        try {
            client.configureBlocking(false);
            Session session = new Session(client, ++sessionId);
            session.onCreate();
            client.register(sel, SelectionKey.OP_READ, session);
        } catch (IOException e) {
            // A client that fails during setup must not take the server down.
            e.printStackTrace();
            closeQuietly(client);
        }
    }

    /** Handles a readable or writable event for one client connection. */
    private void serviceClient(SelectionKey key) {
        Session session = (Session) key.attachment();
        try {
            if (key.isReadable()) {
                boolean peerOpen = session.receive();
                String data;
                while ((data = session.getMessage()) != null) {
                    System.out.println("receice data from " + session.getSocketChannel().socket().getInetAddress().getHostAddress() + " :" + data);
                    if (handleData(session, data) == null) {
                        return; // the handler closed the session
                    }
                }
                if (!peerOpen) {
                    // End of stream: the client disconnected.
                    session.onClose();
                    return;
                }
            } else if (key.isWritable()) {
                session.sendMessage();
            }
            // Stay in write mode until the whole reply is out; reading is paused
            // meanwhile, which bounds how much output a client can make us queue.
            key.interestOps(session.hasPendingOutput() ? SelectionKey.OP_WRITE : SelectionKey.OP_READ);
        } catch (IOException e) {
            System.out.println("session error: " + e);
            session.onClose();
        }
    }

    /** Closes every registered channel, the selector and the listening channel. */
    private static void shutdown(Selector sel, ServerSocketChannel server) {
        if (sel != null) {
            if (sel.isOpen()) {
                // Closing a selector does not close the channels registered with it.
                SelectionKey[] keys = sel.keys().toArray(new SelectionKey[0]);
                for (int i = 0; i < keys.length; i++) {
                    closeQuietly(keys[i].channel());
                }
            }
            try {
                sel.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        closeQuietly(server);
    }

    /** Closes {@code c} if it is non-null, ignoring any failure. */
    private static void closeQuietly(java.io.Closeable c) {
        if (c == null) {
            return;
        }
        try {
            c.close();
        } catch (IOException ignored) {
            // Nothing useful can be done about a failed close.
        }
    }

    /**
     * Per-connection state: the channel plus its inbound and outbound buffers.
     * Both buffers are kept in "write mode" (position = number of buffered
     * bytes) between calls.
     */
    private static final class Session {

        private static final Charset LATIN1 = Charset.forName("ISO-8859-1");
        private static final int INITIAL_BUFFER_BYTES = 1024;

        /** Upper bound for the receive buffer, so a client cannot exhaust the heap. */
        private static final int MAX_MESSAGE_BYTES = 64 * 1024;

        private final SocketChannel sc;
        private final int sessionId;
        private ByteBuffer receiveBuffer = ByteBuffer.allocate(INITIAL_BUFFER_BYTES);
        private ByteBuffer sendBuffer = ByteBuffer.allocate(INITIAL_BUFFER_BYTES);

        /** True when the previous message ended with CR, so a directly following LF belongs to it. */
        private boolean skipLineFeed;

        public Session(SocketChannel sc, int sessionId) {
            this.sc = sc;
            this.sessionId = sessionId;
        }

        /**
         * Reads the bytes currently available on the channel into the receive
         * buffer, growing it up to {@link #MAX_MESSAGE_BYTES}.
         *
         * @return {@code false} if the peer closed the connection, {@code true} otherwise
         * @throws IOException if the read fails or a single message exceeds the limit
         */
        public boolean receive() throws IOException {
            if (!receiveBuffer.hasRemaining() && receiveBuffer.capacity() >= MAX_MESSAGE_BYTES) {
                // All complete messages were already extracted, so this is one overlong line.
                throw new IOException("message longer than " + MAX_MESSAGE_BYTES + " bytes");
            }
            for (;;) {
                if (!receiveBuffer.hasRemaining()) {
                    if (receiveBuffer.capacity() >= MAX_MESSAGE_BYTES) {
                        return true; // parse what we have; the rest stays in the socket
                    }
                    int bigger = Math.min(receiveBuffer.capacity() * 2, MAX_MESSAGE_BYTES);
                    receiveBuffer = capacity(bigger, receiveBuffer);
                }
                int n = sc.read(receiveBuffer);
                if (n < 0) {
                    return false;
                }
                if (n == 0) {
                    return true;
                }
            }
        }

        /**
         * Removes and returns the next complete line from the receive buffer.
         * CR, LF and CRLF are all accepted as terminators and are not part of
         * the returned text.
         *
         * @return the next line, or {@code null} if no complete line is buffered
         */
        public String getMessage() {
            receiveBuffer.flip();
            try {
                if (skipLineFeed && receiveBuffer.hasRemaining()) {
                    if (receiveBuffer.get(receiveBuffer.position()) == '\n') {
                        receiveBuffer.get();
                    }
                    skipLineFeed = false;
                }
                int start = receiveBuffer.position();
                int end = receiveBuffer.limit();
                for (int i = start; i < end; i++) {
                    byte b = receiveBuffer.get(i);
                    if (b == '\r' || b == '\n') {
                        ByteBuffer line = receiveBuffer.duplicate();
                        line.limit(i);
                        String result = decode(line);
                        receiveBuffer.position(i + 1);
                        skipLineFeed = (b == '\r');
                        return result;
                    }
                }
                return null;
            } finally {
                // Keep unconsumed bytes and return to write mode.
                receiveBuffer.compact();
            }
        }

        /**
         * Queues {@code str} (encoded as ISO-8859-1) for sending.
         *
         * @param str text to append to the outbound buffer
         */
        public void write(String str) {
            byte[] b = str.getBytes(LATIN1);
            if (sendBuffer.remaining() < b.length) {
                int needed = sendBuffer.position() + b.length;
                sendBuffer = capacity(Math.max(needed, sendBuffer.capacity() * 2), sendBuffer);
            }
            sendBuffer.put(b);
        }

        /**
         * Writes as much queued output as the channel accepts right now.
         * Bytes that could not be written stay queued.
         *
         * @return {@code true} if nothing is left to send
         * @throws IOException if the write fails
         */
        public boolean sendMessage() throws IOException {
            sendBuffer.flip();
            try {
                sc.write(sendBuffer);
                return !sendBuffer.hasRemaining();
            } finally {
                sendBuffer.compact();
            }
        }

        /** @return {@code true} if queued output is still waiting to be sent */
        public boolean hasPendingOutput() {
            return sendBuffer.position() > 0;
        }

        public void onCreate() {
            System.out.println("accept client " + sc.socket().getInetAddress().getHostAddress() + ",session:" + sessionId);
        }

        /**
         * Closes the connection after one best-effort attempt to flush queued
         * output. Calling it again on a closed session does nothing.
         */
        public void onClose() {
            if (!sc.isOpen()) {
                return;
            }
            System.out.println("quit client " + sc.socket().getInetAddress().getHostAddress() + ",session:" + sessionId);
            try {
                if (hasPendingOutput()) {
                    sendMessage();
                }
            } catch (IOException ignored) {
                // The connection is going away anyway.
            }
            try {
                sc.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        /** Decodes the remaining bytes of {@code buffer} as ISO-8859-1. */
        private String decode(ByteBuffer buffer) {
            return LATIN1.decode(buffer).toString();
        }

        /**
         * Returns a buffer of at least {@code newSize} bytes holding the
         * content written to {@code oldBuf} so far, positioned for further writes.
         */
        private ByteBuffer capacity(int newSize, ByteBuffer oldBuf) {
            if (newSize <= oldBuf.capacity()) {
                return oldBuf;
            }
            ByteBuffer newBuf = ByteBuffer.allocate(newSize);
            oldBuf.flip();
            newBuf.put(oldBuf);
            return newBuf;
        }

        public SocketChannel getSocketChannel() {
            return sc;
        }
    }
}
222