1 package tea.client.network; 2 3 import java.net.InetSocketAddress; 4 import java.util.concurrent.Executors; 5 import org.jboss.netty.bootstrap.ClientBootstrap; 6 import org.jboss.netty.channel.Channel; 7 import org.jboss.netty.channel.ChannelFuture; 8 import org.jboss.netty.channel.ChannelHandlerContext; 9 import org.jboss.netty.channel.ChannelPipeline; 10 import org.jboss.netty.channel.ChannelPipelineFactory; 11 import org.jboss.netty.channel.ChannelStateEvent; 12 import org.jboss.netty.channel.Channels; 13 import org.jboss.netty.channel.ExceptionEvent; 14 import org.jboss.netty.channel.MessageEvent; 15 import org.jboss.netty.channel.SimpleChannelHandler; 16 import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; 17 import tea.common.network.ClientDecoder; 18 import tea.common.network.ClientEncoder; 19 import tea.common.network.ClientMessage; 20 21 /** 22 * @author Teaey 23 * @creation 2012-8-25 24 */ 25 public class BaseClient 26 { 27 public BaseClient(String ip, String port) 28 { 29 this.ip = ip; 30 this.port = port; 31 } 32 private String ip; 33 private String port; 34 private Channel channel; 35 private ClientBootstrap bootstrap; 36 private Object syn = new Object(); 37 private static final int Receive_Timeout = 10000; //ms 38 private ClientMessage response = null; 39 public void connect() 40 { 41 bootstrap = new ClientBootstrap(new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool())); 42 bootstrap.setOption("tcpNoDelay", true); 43 bootstrap.setPipelineFactory(new ClientPipelineFactory()); 44 while (true) 45 { 46 ChannelFuture future = bootstrap.connect(new InetSocketAddress(ip, Integer.parseInt(port))); 47 future.awaitUninterruptibly(5000); 48 if (future.isDone()) 49 { 50 channel = future.getChannel(); 51 if (channel != null && channel.isConnected()) 52 { 53 break; 54 } 55 } 56 } 57 } 58 public void disconnect() 59 { 60 if (channel.isConnected()) 61 { 62 channel.disconnect(); 63 } 64 } 65 public boolean isConnected() 66 { 67 return channel.isConnected(); 68 } 69 public void close() 70 { 71 if (this.channel.isOpen()) 72 { 73 this.channel.close(); 74 } 75 bootstrap.releaseExternalResources(); 76 } 77 /** 78 * 发送消息,无需返回 79 */ 80 public void send(ClientMessage message) 81 { 82 channel.write(message); 83 } 84 /** 85 * 发送消息,等待返回 86 */ 87 public ClientMessage sendWaitBack(ClientMessage message) 88 { 89 response = null; 90 try 91 { 92 channel.write(message); 93 synchronized (syn) 94 { 95 try 96 { 97 syn.wait(Receive_Timeout); 98 } catch (InterruptedException e) 99 { 100 e.printStackTrace(); 101 } 102 } 103 if (null == response) 104 { 105 System.err.println("Receive response timeout"); 106 } 107 } catch (Exception e) 108 { 109 e.printStackTrace(); 110 } 111 return response; 112 } 113 class ClientPipelineFactory implements ChannelPipelineFactory 114 { 115 public ChannelPipeline getPipeline() throws Exception 116 { 117 ChannelPipeline p = Channels.pipeline(); 118 p.addLast("frameDecoder", new ClientDecoder()); 119 p.addLast("fremeEncoder", new ClientEncoder()); 120 p.addLast("logicHandler", new ClientMsgHandler()); 121 return p; 122 } 123 } 124 class ClientMsgHandler extends SimpleChannelHandler 125 { 126 public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception 127 { 128 Object obj = e.getMessage(); 129 if (obj instanceof ClientMessage) 130 { 131 ClientMessage msg = (ClientMessage) obj; 132 response = msg; 133 synchronized (syn) 134 { 135 syn.notifyAll(); 136 } 137 } 138 } 139 public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception 140 { 141 System.out.println("connected server:" + ctx.getChannel()); 142 } 143 public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception 144 { 145 System.out.println("disconnected server:" + ctx.getChannel()); 146 } 147 public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception 148 { 149 System.out.println("Error in exceptionCaught:" + e.getCause()); 150 } 151 } 152 } |