很久没上来留言了,这一阵子平时也是偶尔想起这里,这次心血来潮想留点什么,也希望高手提供一些更先进的编程思想。这次就谈SOCKET,这里这方面的介绍还是比较多的,大多都是C/S结构的为服务端,但是结合其它的应用上就差些,必须单独启动服务,如果扩展到B/S结构就有很多优势(其实也是个人感觉)......
我就把一些简单结构提出来,供大家参考和指教:
结构:
Message.java 消息类,用来组装通讯消息
MessageServlet.java 用servlet调用下类,为了和Resin等同时启动 MessageServer.java 启动封装类,指定监听端口
ServerListener.java 服务端管理类,起分发、处理等用途 ClientListener.java 每个客户端的监听线程 MessageListener.java 每条消息的线程,处理完则关闭
至于Message.java,我就不把我的文件全拷来,用个最简单的代替吧,大家也可以提供自己的想法,我自己的那个针对性太强,适用性太低,就参考下类吧(字符串处理比较直观,用byte比较稳定): package net.fool;
import java.io.Serializable; public class Message implements Serializable{ //信息类型 //private MessageType type = MessageType.HALFBAKED; //大家各抒己见吧 //信息内容 private String content = ""; //构造函数 public Message(String mess){ this.content = mess; } public String toString(){ return this.content; } }
至于MessageServlet.java 只是一个在init方法里启动服务,在destroy中关闭服务而已 比如对resin来说只要在某一个配置应用的web.xml添加 <servlet> <servlet-name>mss</servlet-name> <servlet-class>net.fool.MessageServlet</servlet-class> <load-on-startup>1</load-on-startup> </servlet> 这一段文字,就可以在resin启动时启动该socket服务了
package net.fool;
import javax.servlet.*; import javax.servlet.http.*;
public class MessageServlet extends HttpServlet{ public void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, java.io.IOException{ } public void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, java.io.IOException{ }
/** * <p>初始化servlet. * * @param cfg Servlet configuration information */ public void init(ServletConfig cfg) throws ServletException{ super.init(cfg); MessageServer mb = MessageServer.Connect(5678); }
/** * <p>清除servlet. */ public void destroy(){ super.destroy(); MessageServer.DisConnect(); } } 至于MessageServer.java, 则是创建一个ServerSocket,并启动监听线程,然后客户端连接上就可以做后面的事情了
package net.fool;
import java.util.*; import java.io.*; import java.net.*; import java.nio.*;
public class MessageServer{ private static int messagePort = 5678;
private static MessageServer mess; private static ServerSocket server; private static ServerListener serverListener;
private MessageServer(){ try{ server = new ServerSocket(messagePort); System.out.println("MessageServer listening to *:"+String.valueOf(messagePort)+""); serverListener = new ServerListener(server); serverListener.start(); }catch(Exception e){ e.printStackTrace(); } }
public synchronized static MessageServer Connect(int port){ MessageServer.messagePort = port; if (mess == null) { mess = new MessageServer(); }else if(server.getLocalPort()!=port){ DisConnect(); mess = new MessageServer(); } return mess; }
public synchronized static MessageServer Connect(){ return Connect(messagePort); }
public synchronized static void DisConnect(){ try{ mess = null; System.out.println("close MessageServer for port:"+String.valueOf(server.getLocalPort())+""); serverListener = null; server.close(); }catch(Exception e){} } } ServerListener.java, 则是一个监听线程,也是一个主服务线程
package net.fool;
import java.util.*; import java.io.*; import java.net.*;
public class ServerListener extends Thread{ private ServerSocket serverSocket;
private static Hashtable clients = new Hashtable(); //对应每个客户端的登陆信息 private static Hashtable messages = new Hashtable(); //未处理消息集合
private static Socket clientSocket; //多客户端
public ServerListener(ServerSocket serverSocket) throws Exception{ super("ServerListener"); this.serverSocket=serverSocket; }
public void run(){ try{ while(true){ clientSocket=serverSocket.accept();
Response(clientSocket); }//while }catch(Exception e){System.out.println(e);} }//run
public static void Response(Socket client) throws Exception{ try{ ClientListener clistener = new ClientListener(client,clients); clistener.start(); }catch(Exception e){ System.out.println("连接到" +client.getInetAddress().getHostAddress()+"失败!"); } }
//添加消息,可从B/S结构中传入需发布的消息 public static boolean addMessage(Message mess) throws Exception{ boolean flag = false; synchronized(messages){ try{ messages.put(mess,"newMessage"); //待扩充
MessageListener mlistener = new MessageListener(mess,messages); mlistener.start(); }catch(Exception e){ System.out.println("发布消息" +mess.toString()+"失败!"); return flag; } }
flag = true; return flag; }
//根据消息类型处理消息 消息分发 //这里只是发送到所有客户端 public synchronized static boolean releaseMessage(Message mess) throws Exception{ Enumeration enumeration = clients.keys(); for(;enumeration.hasMoreElements();){ try{ Socket soc = (Socket)enumeration.nextElement(); DataOutputStream dos = new DataOutputStream(soc.getOutputStream()); dos.write(mess.toString().getBytes("GBK")); }catch(Exception e){} } return true; } }//class ClientListener.java, 则是每一个客户端监听线程,得到客户端发送的消息,整理起来,把相关消息丢给主服务分发消息
package net.fool;
import java.util.*; import java.io.*; import java.net.*;
public class ClientListener extends Thread{ private static Hashtable clients; private Message userMessage; private Socket socket;
private BufferedReader reader; private boolean listening = true;
public ClientListener(Socket socket,Hashtable clients) throws Exception{ super("ClientListener"); this.clients=clients; this.socket=socket;
this.reader = new BufferedReader(new InputStreamReader(socket.getInputStream())); }
public void run(){ try { listener(); }catch(Exception e) { e.printStackTrace(); }finally{ synchronized(clients){ //String ss_c = ("断开连接:"+socket); clients.remove(socket); try{ socket.close(); reader.close(); //ServerListener.releaseMessage(某人下线消息); }catch(Exception ex){} //System.out.println(ss_c); } } }
private void listener() throws Exception{ synchronized(clients){ clients.put(socket,userMessage); } //扩展后应放在下面的循环内,当为某人的上线消息时才执行这个
while(listening) {
//读取客户端发送来的消息 String rLine = reader.readLine(); if(rLine==null||rLine.equals("\r\n")) break;; Message me = new Message(rLine);
//当为某人的上线消息时 synchronized......
ServerListener.addMessage(me); } } }//class MessageListener.java, 则是每一条消息监听线程,只是让消息堆积起来慢慢处理
package net.fool;
import java.util.*; import java.io.*; import java.net.*;
public class MessageListener extends Thread{ private static Hashtable messages; private Message message; private boolean listening = true;
public MessageListener(Message message,Hashtable messages) throws Exception{ super("MessageListener"); this.messages=messages; this.message=message; }
public void run(){ try { listener(); }catch(Exception e) { e.printStackTrace(); }finally{ synchronized(messages){ messages.remove(message); try{ message = null; }catch(Exception ex){} } } }
private void listener() throws Exception{ while(listening){
//本条消息的处理 //包括即时和等待,存储和丢失,个人和群发等等判断,待扩充Message类 try{ listening = !ServerListener.releaseMessage(message); }catch(Exception e){e.printStackTrace();} } }
}//class
|