Scenario服务器1:客户端n 发送Notification给客户端的后处理服务器端给第一个客户端发送notification,然后在限定时间内,等待客户端作出回应—向服务器发送request。如果客户端一直没有回复,服务器会在到达限定时间后,向第二个客户端发送notification。如果客户端在限定时间内回复,服务器端放弃再给其他客户端发送消息。Design服务器、客户端使用socket发送和接收信息发送端存在一个客户端列表,每次发送一给一个客户端,发送后,向Helper发送添加该客户端id的请求。 客户端收到信息会向Helper发送删除该id的请求。 Helper收到add时,启动一个ScheduledExecutorService类的schedule,延时启动一个线程,并将该schedule缓存。remove时,从缓存里取出schedule并停止它。如果在延时时间内,线程没有被停止,它会被执行:从缓存中取出,告诉服务器向下一个客户端发送请求。 UML
Code
Server public class Server { public static void main(String[] args) throws Exception { final String id = "100"; ServerSocket serverSocket = new ServerSocket(IO.BIO_TCP_PORT); System.out.println("Server is listening on port: " + IO.BIO_TCP_PORT); Socket socket = null; try { socket = serverSocket.accept(); } catch (Exception e) { System.out.println("accept socket error."); }
SendingNotification sender = new SendingNotification(id, socket); sender.start(); ReceivingRequest receiver=new ReceivingRequest(socket); receiver.start(); }
SendingNotification public class SendingNotification extends Thread { private String id; private Socket socket;
public SendingNotification(String sdId, Socket socket) { this.id = sdId; this.socket = socket; }
@Override public void run() { Helper.getInstance().add(id); OutputStream outputStream = null; byte[] buffer = new byte[1024]; try { outputStream = socket.getOutputStream(); buffer = (id+"\n").getBytes(); outputStream.write(buffer); outputStream.flush(); } catch (Exception e) { System.out.println("don't send success"); try { outputStream.close(); socket.close(); } catch (Exception e1) { } } } }
ReceivingRequest public class ReceivingRequest extends Thread { private Socket socket;
public ReceivingRequest(Socket socket) { this.socket = socket; }
@Override public void run() { BufferedReader in; boolean finished = false; while (!finished) { try { in = new BufferedReader(new InputStreamReader(socket.getInputStream())); String line = in.readLine(); if (line == null) { Thread.sleep(100); continue; } Helper.getInstance().remove(line); in.close(); socket.close(); finished = true; } catch (Exception e) { System.out.println("receive fails to run."); } } } }
Helper public class Helper { private static Helper instance = new Helper(); private ConcurrentHashMap<String, Schedule> cache = new ConcurrentHashMap<String, Schedule>(); private int timeout = 10;
public static Helper getInstance() { return instance; }
private Schedule addTask(final String id) { final Schedule schedule = new Schedule(); schedule.schedule(new Runnable() { public void run() { doNext(id); schedule.shutdown(); } }, timeout, SECONDS); return schedule; }
private void doNext(String id) { Schedule schedule = cache.remove(id); System.out.println("time out and do next well."); System.out.println("total time=" + schedule.getSeconds()); }
public void add(final String id) { Schedule schedule = addTask(id); cache.put(id, schedule); System.out.println("Add to cache successfully"); }
public void remove(final String id) { Schedule schedule = cache.remove(id); if (schedule == null) System.out.println("no schedule exist."); else { schedule.shutdown(); System.out.println("Remove to cache successfully"); } }
Schedule public class Schedule { ScheduledExecutorService excutor; private long startTime;
public Schedule() { excutor = Executors.newSingleThreadScheduledExecutor(); startTime = System.currentTimeMillis(); }
public long getTotalTime() { long endTime = System.currentTimeMillis(); return endTime - startTime; }
public String getSeconds() { long s = getTotalTime() / 1000; return s + " seconds"; }
public void schedule(Runnable command, long delay, TimeUnit unit) { excutor.schedule(command, delay, unit); }
public void shutdown() { excutor.shutdownNow(); } }
Client public class Client { public static void main(String[] args) { Socket socket; try { socket = new Socket(IO.SERVER_IP, IO.BIO_TCP_PORT); readLine(socket); } catch (UnknownHostException e) { } catch (IOException e) { } catch (InterruptedException e) { } }
private static void readLine(Socket socket) throws IOException, InterruptedException { BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream())); PrintWriter out = new PrintWriter(socket.getOutputStream(), true); boolean flag = true; while (flag) { String command = in.readLine(); if (command == null) { flag = true; continue; } else { //Thread.sleep(2000); out.println(command); out.flush(); out.close(); in.close(); socket.close(); flag = false; } } }
IO public interface IO { String SERVER_IP = "127.0.0.1";//"192.168.225.166"; int BIO_TCP_PORT = 9109;
|
|
随笔:7
文章:1
评论:2
引用:0
| 日 | 一 | 二 | 三 | 四 | 五 | 六 |
---|
26 | 27 | 28 | 29 | 30 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | 10 | 11 | 12 | 13 | 14 | 15 | 16 | 17 | 18 | 19 | 20 | 21 | 22 | 23 | 24 | 25 | 26 | 27 | 28 | 29 | 30 | 31 | 1 | 2 | 3 | 4 | 5 | 6 |
|
公告
常用链接
留言簿
随笔分类
随笔档案
文章分类
文章档案
搜索
最新评论
阅读排行榜
评论排行榜
|
|