转自:http://blog.csdn.net/programmer_editor/archive/2009/01/04/3705427.aspx
由于篇幅问题,《程序员》杂志09年第1期《服务器负载均衡架构之应用层负载均衡》文中的源代码没有在杂志上刊登,下面贴出其中提到的7段代码,供读者使用。
CodeFile_1.java
- class LoadBalancerHandler implements IHttpRequestHandler, ILifeCycle {
- private final List<InetSocketAddress> servers = new ArrayList<InetSocketAddress>();
- private HttpClient httpClient;
-
- public LoadBalancerHandler(InetSocketAddress... srvs) {
- servers.addAll(Arrays.asList(srvs));
- }
- public void onInit() {
- httpClient = new HttpClient();
- httpClient.setAutoHandleCookies(false);
- }
- public void onDestroy() throws IOException {
- httpClient.close();
- }
- public void onRequest(final IHttpExchange exchange) throws IOException {
- IHttpRequest request = exchange.getRequest();
-
- Integer customerId = request.getRequiredIntParameter("id");
- int idx = customerId.hashCode() % servers.size();
- if (idx < 0) {
- idx *= -1;
- }
-
- InetSocketAddress server = servers.get(idx);
- URL url = request.getRequestUrl();
- URL newUrl = new URL(url.getProtocol(), server.getHostName(), server.getPort(), url.getFile());
- request.setRequestUrl(newUrl);
-
-
-
- IHttpResponseHandler respHdl = new IHttpResponseHandler() {
- @Execution(Execution.NONTHREADED)
- public void onResponse(IHttpResponse response) throws IOException {
- exchange.send(response);
- }
- @Execution(Execution.NONTHREADED)
- public void onException(IOException ioe) throws IOException {
- exchange.sendError(ioe);
- }
- };
-
- httpClient.send(request, respHdl);
- }
- }
- class LoadBalancer {
- public static void main(String[] args) throws Exception {
- InetSocketAddress[] srvs = new InetSocketAddress[] { new InetSocketAddress("srv1", 8030), new InetSocketAddress("srv2", 8030)};
- HttpServer loadBalancer = new HttpServer(8080, new LoadBalancerHandler(srvs));
- loadBalancer.run();
- }
- }
CodeFile_2.java
- class CookieBasedLoadBalancerHandler implements IHttpRequestHandler, ILifeCycle {
- private final List<InetSocketAddress> servers = new ArrayList<InetSocketAddress>();
- private int serverIdx = 0;
- private HttpClient httpClient;
-
- public CookieBasedLoadBalancerHandler(InetSocketAddress... realServers) {
- servers.addAll(Arrays.asList(realServers));
- }
- public void onInit() {
- httpClient = new HttpClient();
- httpClient.setAutoHandleCookies(false);
- }
- public void onDestroy() throws IOException {
- httpClient.close();
- }
- public void onRequest(final IHttpExchange exchange) throws IOException {
- IHttpRequest request = exchange.getRequest();
- IHttpResponseHandler respHdl = null;
- InetSocketAddress serverAddr = null;
-
- cl : for (String cookieHeader : request.getHeaderList("Cookie")) {
- for (String cookie : cookieHeader.split(";")) {
- String[] kvp = cookie.split("=");
- if (kvp[0].startsWith("LB_SLOT")) {
- int slot = Integer.parseInt(kvp[1]);
- serverAddr = servers.get(slot);
- break cl;
- }
- }
- }
-
- if (serverAddr == null) {
- final int slot = nextServerSlot();
- serverAddr = servers.get(slot);
- respHdl = new IHttpResponseHandler() {
- @Execution(Execution.NONTHREADED)
- public void onResponse(IHttpResponse response) throws IOException {
-
- response.setHeader("Set-Cookie", "LB_SLOT=" + slot + ";Path=/");
- exchange.send(response);
- }
- @Execution(Execution.NONTHREADED)
- public void onException(IOException ioe) throws IOException {
- exchange.sendError(ioe);
- }
- };
- } else {
- respHdl = new IHttpResponseHandler() {
- @Execution(Execution.NONTHREADED)
- public void onResponse(IHttpResponse response) throws IOException {
- exchange.send(response);
- }
- @Execution(Execution.NONTHREADED)
- public void onException(IOException ioe) throws IOException {
- exchange.sendError(ioe);
- }
- };
- }
-
- URL url = request.getRequestUrl();
- URL newUrl = new URL(url.getProtocol(), serverAddr.getHostName(), serverAddr.getPort(), url.getFile());
- request.setRequestUrl(newUrl);
-
-
-
- httpClient.send(request, respHdl);
- }
-
- private synchronized int nextServerSlot() {
- serverIdx++;
- if (serverIdx >= servers.size()) {
- serverIdx = 0;
- }
- return serverIdx;
- }
- }
- class LoadBalancer {
- public static void main(String[] args) throws Exception {
- InetSocketAddress[] srvs = new InetSocketAddress[] { new InetSocketAddress("srv1", 8030), new InetSocketAddress("srv2", 8030)};
- CookieBasedLoadBalancerHandler hdl = new CookieBasedLoadBalancerHandler(srvs);
- HttpServer loadBalancer = new HttpServer(8080, hdl);
- loadBalancer.run();
- }
- }
CodeFile_3.java
- class RedirectLoadBalancerHandler implements IHttpRequestHandler {
- private final List<InetSocketAddress> servers = new ArrayList<InetSocketAddress>();
-
- public RedirectLoadBalancerHandler(InetSocketAddress... realServers) {
- servers.addAll(Arrays.asList(realServers));
- }
- @Execution(Execution.NONTHREADED)
- public void onRequest(final IHttpExchange exchange) throws IOException, BadMessageException {
- IHttpRequest request = exchange.getRequest();
-
- Integer customerId = request.getRequiredIntParameter("id");
- int idx = customerId.hashCode() % servers.size();
- if (idx < 0) {
- idx *= -1;
- }
-
- HttpResponse redirectResponse = new HttpResponse(303, "text/html", "<html>....");
-
- InetSocketAddress server = servers.get(idx);
- URL url = request.getRequestUrl();
- URL newUrl = new URL(url.getProtocol(), server.getHostName(), server.getPort(), url.getFile());
- redirectResponse.setHeader("Location", newUrl.toString());
-
- exchange.send(redirectResponse);
- }
- }
- class Server {
- public static void main(String[] args) throws Exception {
- InetSocketAddress[] srvs = new InetSocketAddress[] { new InetSocketAddress("srv1", 8030), new InetSocketAddress("srv2", 8030)};
- RedirectLoadBalancerHandler hdl = new RedirectLoadBalancerHandler(srvs);
- HttpServer loadBalancer = new HttpServer(8080, hdl);
- loadBalancer.run();
- }
- }
CodeFile_4.java
- class LoadBalancerRequestInterceptor implements IHttpRequestHandler, ILifeCycle {
- private final List<InetSocketAddress> servers = new ArrayList<InetSocketAddress>();
- private InetSocketAddress localServer;
- private HttpClient httpClient;
-
- public LoadBalancerRequestInterceptor(InetSocketAddress localeServer, InetSocketAddress... srvs) {
- this.localServer = localeServer;
- servers.addAll(Arrays.asList(srvs));
- }
- public void onInit() {
- httpClient = new HttpClient();
- httpClient.setAutoHandleCookies(false);
- }
- public void onDestroy() throws IOException {
- httpClient.close();
- }
- public void onRequest(final IHttpExchange exchange) throws IOException, BadMessageException {
- IHttpRequest request = exchange.getRequest();
- Integer customerId = request.getRequiredIntParameter("id");
- int idx = customerId.hashCode() % servers.size();
- if (idx < 0) {
- idx *= -1;
- }
- InetSocketAddress server = servers.get(idx);
-
- if (server.equals(localServer)) {
- exchange.forward(request);
-
- } else {
- URL url = request.getRequestUrl();
- URL newUrl = new URL(url.getProtocol(), server.getHostName(), server.getPort(), url.getFile());
- request.setRequestUrl(newUrl);
-
-
- IHttpResponseHandler respHdl = new IHttpResponseHandler() {
- @Execution(Execution.NONTHREADED)
- public void onResponse(IHttpResponse response) throws IOException {
- exchange.send(response);
- }
- @Execution(Execution.NONTHREADED)
- public void onException(IOException ioe) throws IOException {
- exchange.sendError(ioe);
- }
- };
- httpClient.send(request, respHdl);
- }
- }
- }
- class Server {
- public static void main(String[] args) throws Exception {
- RequestHandlerChain handlerChain = new RequestHandlerChain();
- InetSocketAddress[] srvs = new InetSocketAddress[] { new InetSocketAddress("srv1", 8030), new InetSocketAddress("srv2", 8030)};
- handlerChain.addLast(new LoadBalancerRequestInterceptor(new InetSocketAddress("srv1", 8030), srvs));
- handlerChain.addLast(new CacheInterceptor(new LocalHttpResponseCache()));
- handlerChain.addLast(new MyRequestHandler());
- HttpServer httpServer = new HttpServer(8030, handlerChain);
- httpServer.run();
- }
- }
CodeFile_5.java
- class LoadBalancerRequestInterceptor implements IHttpRequestHandler, ILifeCycle {
- private final Map<String, List<InetSocketAddress>> serverClusters = new HashMap<String, List<InetSocketAddress>>();
- private HttpClient httpClient;
-
- public void addVirtualServer(String virtualServer, InetSocketAddress... realServers) {
- serverClusters.put(virtualServer, Arrays.asList(realServers));
- }
- public void onInit() {
- httpClient = new HttpClient();
- httpClient.setAutoHandleCookies(false);
- }
- public void onDestroy() throws IOException {
- httpClient.close();
- }
- public void onRequest(final IHttpExchange exchange) throws IOException, BadMessageException {
- IHttpRequest request = exchange.getRequest();
- URL requestUrl = request.getRequestUrl();
- String targetServer = requestUrl.getHost() + ":" + requesrUrl.getPort();
-
- for (Entry<String, List<InetSocketAddress>> serverCluster : serverClusters.entrySet()) {
- if (targetServer.equals(serverCluster.getKey())) {
- String id = request.getRequiredStringParameter("id");
- int idx = id.hashCode() % serverCluster.getValue().size();
- if (idx < 0) {
- idx *= -1;
- }
- InetSocketAddress realServer = serverCluster.getValue().get(idx);
- URL newUrl = new URL(requesrUrl.getProtocol(), realServer.getHostName(), realServer.getPort(), requesrUrl.getFile());
- request.setRequestUrl(newUrl);
-
-
- IHttpResponseHandler respHdl = new IHttpResponseHandler() {
- @Execution(Execution.NONTHREADED)
- public void onResponse(IHttpResponse response) throws IOException {
- exchange.send(response);
- }
- @Execution(Execution.NONTHREADED)
- public void onException(IOException ioe) throws IOException {
- exchange.sendError(ioe);
- }
- };
- httpClient.send(request, respHdl);
- return;
- }
- }
-
- exchange.forward(request);
- }
- }
- class SimpleTest {
- public static void main(String[] args) throws Exception {
-
- RequestHandlerChain handlerChain1 = new RequestHandlerChain();
- handlerChain1.addLast(new CacheInterceptor(new LocalHttpResponseCache()));
- handlerChain1.addLast(new MyRequestHandler());
- HttpServer httpServer1 = new HttpServer(8040, handlerChain1);
- httpServer1.start();
- RequestHandlerChain handlerChain2 = new RequestHandlerChain();
- handlerChain2.addLast(new CacheInterceptor(new LocalHttpResponseCache()));
- handlerChain2.addLast(new MyRequestHandler());
- HttpServer httpServer2 = new HttpServer(8030, handlerChain2);
- httpServer2.start();
-
- HttpClient httpClient = new HttpClient();
-
- LoadBalancerRequestInterceptor lbInterceptor = new LoadBalancerRequestInterceptor();
- InetSocketAddress[] srvs = new InetSocketAddress[] { new InetSocketAddress("localhost", 8030), new InetSocketAddress("localhost", 8030) };
- lbInterceptor.addVirtualServer("customerService:8080", srvs);
- httpClient.addInterceptor(lbInterceptor);
-
- GetRequest request = new GetRequest("http://customerService:8080/price?id=2336&amount=5656");
- IHttpResponse response = httpClient.call(request);
- assert (response.getHeader("X-Cached") == null);
- request = new GetRequest("http://customerService:8080/price?id=2336&amount=5656");
- response = httpClient.call(request);
- assert (response.getHeader("X-Cached").equals("true"));
- request = new GetRequest("http://customerService:8080/price?id=2337&amount=5656");
- response = httpClient.call(request);
- assert (response.getHeader("X-Cached") == null);
- request = new GetRequest("http://customerService:8080/price?id=2337&amount=5656");
- response = httpClient.call(request);
- assert (response.getHeader("X-Cached").equals("true"));
-
- }
- }
CodeFile_6.java
- class MySessionBasedRequestHandler implements IHttpRequestHandler {
- @SynchronizedOn(SynchronizedOn.SESSION)
- public void onRequest(IHttpExchange exchange) throws IOException {
- IHttpRequest request = exchange.getRequest();
- IHttpSession session = exchange.getSession(true);
-
- Integer countRequests = (Integer) session.getAttribute("count");
- if (countRequests == null) {
- countRequests = 1;
- } else {
- countRequests++;
- }
- session.setAttribute("count", countRequests);
-
- exchange.send(new HttpResponse(200, "text/plain", "count=" + countRequests));
- }
- }
- class Server {
- public static void main(String[] args) throws Exception {
- HttpServer httpServer = new HttpServer(8030, new MySessionBasedRequestHandler());
- httpServer.run ();
- }
- }
CodeFile_7.java
- class BackupBasedSessionManager implements ISessionManager {
- private ISessionManager delegee = null;
- private HttpClient httpClient = null;
- public BackupBasedSessionManager(HttpClient httpClient, ISessionManager delegee) {
- this.httpClient = httpClient;
- this.delegee = delegee;
- }
- public boolean isEmtpy() {
- return delegee.isEmtpy();
- }
- public String newSession(String idPrefix) throws IOException {
- return delegee.newSession(idPrefix);
- }
- public void registerSession(HttpSession session) throws IOException {
- delegee.registerSession(session);
- }
- public HttpSession getSession(String sessionId) throws IOException {
- HttpSession session = delegee.getSession(sessionId);
-
- if (session == null) {
- String id = URLEncoder.encode(sessionId);
- IHttpResponse response = httpClient.call(new GetRequest("http://sessionservice:8080/?id=" + id));
- if (response.getStatus() == 200) {
- try {
- byte[] serialized = response.getBlockingBody().readBytes();
- ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(serialized));
- session = (HttpSession) in.readObject();
- registerSession(session);
- } catch (ClassNotFoundException cnfe) {
- throw new IOException(cnfe);
- }
- }
- }
- return session;
- }
- public void saveSession(String sessionId) throws IOException {
- delegee.saveSession(sessionId);
- HttpSession session = delegee.getSession(sessionId);
- ByteArrayOutputStream bos = new ByteArrayOutputStream() ;
- ObjectOutputStream out = new ObjectOutputStream(bos) ;
- out.writeObject(session);
- out.close();
- byte[] serialized = bos.toByteArray();
- String id = URLEncoder.encode(session.getId());
- PostRequest storeRequest = new PostRequest("http://sessionservice:8080/?id=" + id + "&ttl=600", "application/octet-stream", serialized);
- httpClient.send(storeRequest, null);
- }
- public void removeSession(String sessionId) throws IOException {
- delegee.removeSession(sessionId);
- String id = URLEncoder.encode(sessionId);
- httpClient.call(new DeleteRequest("http://sessionservice:8080/?id=" + id));
- }
- public void close() throws IOException {
- delegee.close();
- }
- }
- class Server {
- public static void main(String[] args) throws Exception {
-
- HttpServer httpServer = new HttpServer(8030, new MySessionBasedRequestHandler());
-
- HttpClient sessionServerHttpClient = new HttpClient();
- LoadBalancerRequestInterceptor lbInterceptor = new LoadBalancerRequestInterceptor();
- InetSocketAddress[] srvs = new InetSocketAddress[] { new InetSocketAddress("sessionSrv1", 5010), new InetSocketAddress("sessionSrv2", 5010)};
- lbInterceptor.addVirtualServer("sessionservice:8080", srvs);
- sessionServerHttpClient.addInterceptor(lbInterceptor);
-
- ISessionManager nativeSessionManager = httpServer.getSessionManager();
- BackupBasedSessionManager sessionManager = new BackupBasedSessionManager(sessionServerHttpClient, nativeSessionManager);
- httpServer.setSessionManager(sessionManager);
-
- httpServer.start();
- }
- }