《程序员》09.1期《服务器负载均衡架构之应用层负载均衡》源代码公布

转自:http://blog.csdn.net/programmer_editor/archive/2009/01/04/3705427.aspx
由于篇幅问题,《程序员》杂志09年第1期《服务器负载均衡架构之应用层负载均衡》文中的源代码没有在杂志上刊登,下面贴出其中提到的7段代码,供读者使用。

CodeFile_1.java
  1. class LoadBalancerHandler implements IHttpRequestHandler, ILifeCycle {
  2.  private final List<InetSocketAddress> servers = new ArrayList<InetSocketAddress>();
  3.  private HttpClient httpClient;
  4.  /*
  5.  * this class does not implement server monitoring or healthiness checks
  6.  */
  7.  public LoadBalancerHandler(InetSocketAddress... srvs) {
  8.  servers.addAll(Arrays.asList(srvs));
  9.  }
  10.  public void onInit() {
  11.  httpClient = new HttpClient();
  12.  httpClient.setAutoHandleCookies(false);
  13. }
  14.  public void onDestroy() throws IOException {
  15.  httpClient.close();
  16.  }
  17.  public void onRequest(final IHttpExchange exchange) throws IOException {
  18.  IHttpRequest request = exchange.getRequest();
  19.  // determine the business server based on the id's hashcode
  20.  Integer customerId = request.getRequiredIntParameter("id");
  21.  int idx = customerId.hashCode() % servers.size();
  22.  if (idx < 0) {
  23.  idx *= -1;
  24.  }
  25.  // retrieve the business server address and update the Request-URL of the request
  26.  InetSocketAddress server = servers.get(idx);
  27.  URL url = request.getRequestUrl();
  28.  URL newUrl = new URL(url.getProtocol(), server.getHostName(), server.getPort(), url.getFile());
  29.  request.setRequestUrl(newUrl);
  30.  // proxy header handling (remove hop-by-hop headers, ...)
  31.  // ...
  32.  // create a response handler to forward the response to the caller
  33.  IHttpResponseHandler respHdl = new IHttpResponseHandler() {
  34.  @Execution(Execution.NONTHREADED)
  35.  public void onResponse(IHttpResponse response) throws IOException {
  36.  exchange.send(response);
  37.  }
  38.  @Execution(Execution.NONTHREADED)
  39.  public void onException(IOException ioe) throws IOException {
  40.  exchange.sendError(ioe);
  41.  }
  42.  };
  43.  // forward the request in a asynchronous way by passing over the response handler
  44.  httpClient.send(request, respHdl);
  45.  }
  46. }
  47. class LoadBalancer {
  48.  public static void main(String[] args) throws Exception {
  49.  InetSocketAddress[] srvs = new InetSocketAddress[] { new InetSocketAddress("srv1"8030), new InetSocketAddress("srv2"8030)};
  50.  HttpServer loadBalancer = new HttpServer(8080new LoadBalancerHandler(srvs));
  51.  loadBalancer.run();
  52.  }
  53. }


CodeFile_2.java
  1. class CookieBasedLoadBalancerHandler implements IHttpRequestHandler, ILifeCycle {
  2.  private final List<InetSocketAddress> servers = new ArrayList<InetSocketAddress>();
  3.  private int serverIdx = 0;
  4.  private HttpClient httpClient;
  5.  /*
  6.  * this class does not implement server monitoring or healthiness checks
  7.  */
  8.  public CookieBasedLoadBalancerHandler(InetSocketAddress... realServers) {
  9.  servers.addAll(Arrays.asList(realServers));
  10.  }
  11.  public void onInit() {
  12.  httpClient = new HttpClient();
  13.  httpClient.setAutoHandleCookies(false);
  14. }
  15.  public void onDestroy() throws IOException {
  16.  httpClient.close();
  17.  }
  18.  public void onRequest(final IHttpExchange exchange) throws IOException {
  19.  IHttpRequest request = exchange.getRequest();
  20.  IHttpResponseHandler respHdl = null;
  21.  InetSocketAddress serverAddr = null;
  22.  // check if the request contains the LB_SLOT cookie
  23.  cl : for (String cookieHeader : request.getHeaderList("Cookie")) {
  24.  for (String cookie : cookieHeader.split(";")) {
  25.  String[] kvp = cookie.split("=");
  26.  if (kvp[0].startsWith("LB_SLOT")) {
  27.  int slot = Integer.parseInt(kvp[1]);
  28.  serverAddr = servers.get(slot);
  29.  break cl;
  30.  }
  31.  }
  32.  }
  33.  // request does not contains the LB_SLOT -> select a server
  34.  if (serverAddr == null) {
  35.  final int slot = nextServerSlot();
  36.  serverAddr = servers.get(slot);
  37.  respHdl = new IHttpResponseHandler() {
  38.  @Execution(Execution.NONTHREADED)
  39.  public void onResponse(IHttpResponse response) throws IOException {
  40.  // set the LB_SLOT cookie
  41.  response.setHeader("Set-Cookie""LB_SLOT=" + slot + ";Path=/");
  42.  exchange.send(response);
  43.  }
  44.  @Execution(Execution.NONTHREADED)
  45.  public void onException(IOException ioe) throws IOException {
  46.  exchange.sendError(ioe);
  47.  }
  48.  };
  49.  } else {
  50.  respHdl = new IHttpResponseHandler() {
  51.  @Execution(Execution.NONTHREADED)
  52.  public void onResponse(IHttpResponse response) throws IOException {
  53.  exchange.send(response);
  54.  }
  55.  @Execution(Execution.NONTHREADED)
  56.  public void onException(IOException ioe) throws IOException {
  57.  exchange.sendError(ioe);
  58.  }
  59.  };
  60.  }
  61.  // update the Request-URL of the request
  62.  URL url = request.getRequestUrl();
  63.  URL newUrl = new URL(url.getProtocol(), serverAddr.getHostName(), serverAddr.getPort(), url.getFile());
  64.  request.setRequestUrl(newUrl);
  65.  // proxy header handling (remove hop-by-hop headers, ...)
  66.  // ...
  67.  // forward the request
  68.  httpClient.send(request, respHdl);
  69.  }
  70.  // get the next slot by using the using round-robin approach
  71.  private synchronized int nextServerSlot() {
  72.  serverIdx++;
  73.  if (serverIdx >= servers.size()) {
  74.  serverIdx = 0;
  75.  }
  76.  return serverIdx;
  77.  }
  78. }
  79. class LoadBalancer {
  80.  public static void main(String[] args) throws Exception {
  81.  InetSocketAddress[] srvs = new InetSocketAddress[] { new InetSocketAddress("srv1"8030), new InetSocketAddress("srv2"8030)};
  82.  CookieBasedLoadBalancerHandler hdl = new CookieBasedLoadBalancerHandler(srvs);
  83.  HttpServer loadBalancer = new HttpServer(8080, hdl);
  84.  loadBalancer.run();
  85.  }


CodeFile_3.java
  1. class RedirectLoadBalancerHandler implements IHttpRequestHandler {
  2.  private final List<InetSocketAddress> servers = new ArrayList<InetSocketAddress>();
  3.  /*
  4.  * this class does not implement server monitoring or healthiness checks
  5.  */
  6.  public RedirectLoadBalancerHandler(InetSocketAddress... realServers) {
  7.  servers.addAll(Arrays.asList(realServers));
  8.  }
  9.  @Execution(Execution.NONTHREADED)
  10.  public void onRequest(final IHttpExchange exchange) throws IOException, BadMessageException {
  11.  IHttpRequest request = exchange.getRequest();
  12.  // determine the business server based on the id´s hashcode
  13.  Integer customerId = request.getRequiredIntParameter("id");
  14.  int idx = customerId.hashCode() % servers.size();
  15.  if (idx < 0) {
  16.  idx *= -1;
  17.  }
  18.  // create a redirect response -> status 303
  19.  HttpResponse redirectResponse = new HttpResponse(303"text/html""<html>....");
  20.  // ... and add the location header
  21.  InetSocketAddress server = servers.get(idx);
  22.  URL url = request.getRequestUrl();
  23.  URL newUrl = new URL(url.getProtocol(), server.getHostName(), server.getPort(), url.getFile());
  24.  redirectResponse.setHeader("Location", newUrl.toString());
  25.  // send the redirect response
  26.  exchange.send(redirectResponse);
  27.  }
  28. }
  29. class Server {
  30.  public static void main(String[] args) throws Exception {
  31.  InetSocketAddress[] srvs = new InetSocketAddress[] { new InetSocketAddress("srv1"8030), new InetSocketAddress("srv2"8030)};
  32.  RedirectLoadBalancerHandler hdl = new RedirectLoadBalancerHandler(srvs);
  33.  HttpServer loadBalancer = new HttpServer(8080, hdl);
  34.  loadBalancer.run();
  35.  }
  36. }
CodeFile_4.java
  1. class LoadBalancerRequestInterceptor implements IHttpRequestHandler, ILifeCycle {
  2.  private final List<InetSocketAddress> servers = new ArrayList<InetSocketAddress>();
  3.  private InetSocketAddress localServer;
  4.  private HttpClient httpClient;
  5.  /*
  6.  * this class does not implement server monitoring or healthiness checks
  7.  */
  8.  public LoadBalancerRequestInterceptor(InetSocketAddress localeServer, InetSocketAddress... srvs) {
  9.  this.localServer = localeServer;
  10.  servers.addAll(Arrays.asList(srvs));
  11.  }
  12.  public void onInit() {
  13.  httpClient = new HttpClient();
  14.  httpClient.setAutoHandleCookies(false);
  15. }
  16.  public void onDestroy() throws IOException {
  17.  httpClient.close();
  18.  }
  19.  public void onRequest(final IHttpExchange exchange) throws IOException, BadMessageException {
  20.  IHttpRequest request = exchange.getRequest();
  21.  Integer customerId = request.getRequiredIntParameter("id");
  22.  int idx = customerId.hashCode() % servers.size();
  23.  if (idx < 0) {
  24.  idx *= -1;
  25.  }
  26.  InetSocketAddress server = servers.get(idx);
  27.  // local server?
  28.  if (server.equals(localServer)) {
  29.  exchange.forward(request);
  30.  // .. no
  31.  } else {
  32.  URL url = request.getRequestUrl();
  33.  URL newUrl = new URL(url.getProtocol(), server.getHostName(), server.getPort(), url.getFile());
  34.  request.setRequestUrl(newUrl);
  35.  // proxy header handling (remove hop-by-hop headers, ...)
  36.  // ...
  37.  IHttpResponseHandler respHdl = new IHttpResponseHandler() {
  38.  @Execution(Execution.NONTHREADED)
  39.  public void onResponse(IHttpResponse response) throws IOException {
  40.  exchange.send(response);
  41.  }
  42.  @Execution(Execution.NONTHREADED)
  43.  public void onException(IOException ioe) throws IOException {
  44.  exchange.sendError(ioe);
  45.  }
  46.  };
  47.  httpClient.send(request, respHdl);
  48.  }
  49.  }
  50. }
  51. class Server {
  52.  public static void main(String[] args) throws Exception {
  53.  RequestHandlerChain handlerChain = new RequestHandlerChain();
  54.  InetSocketAddress[] srvs = new InetSocketAddress[] { new InetSocketAddress("srv1"8030), new InetSocketAddress("srv2"8030)};
  55.  handlerChain.addLast(new LoadBalancerRequestInterceptor(new InetSocketAddress("srv1"8030), srvs));
  56.  handlerChain.addLast(new CacheInterceptor(new LocalHttpResponseCache()));
  57.  handlerChain.addLast(new MyRequestHandler());
  58.  HttpServer httpServer = new HttpServer(8030, handlerChain);
  59.  httpServer.run();
  60.  }
  61. }
CodeFile_5.java
  1. class LoadBalancerRequestInterceptor implements IHttpRequestHandler, ILifeCycle {
  2.  private final Map<String, List<InetSocketAddress>> serverClusters = new HashMap<String, List<InetSocketAddress>>();
  3.  private HttpClient httpClient;
  4.  /*
  5.  * this class does not implement server monitoring or healthiness checks
  6.  */
  7.  public void addVirtualServer(String virtualServer, InetSocketAddress... realServers) {
  8.  serverClusters.put(virtualServer, Arrays.asList(realServers));
  9.  }
  10.  public void onInit() {
  11.  httpClient = new HttpClient();
  12.  httpClient.setAutoHandleCookies(false);
  13. }
  14.  public void onDestroy() throws IOException {
  15.  httpClient.close();
  16.  }
  17.  public void onRequest(final IHttpExchange exchange) throws IOException, BadMessageException {
  18.  IHttpRequest request = exchange.getRequest();
  19.  URL requestUrl = request.getRequestUrl();
  20.  String targetServer = requestUrl.getHost() + ":" + requesrUrl.getPort();
  21.  // handle a virtual address
  22.  for (Entry<String, List<InetSocketAddress>> serverCluster : serverClusters.entrySet()) {
  23.  if (targetServer.equals(serverCluster.getKey())) {
  24.  String id = request.getRequiredStringParameter("id");
  25.  int idx = id.hashCode() % serverCluster.getValue().size();
  26.  if (idx < 0) {
  27.  idx *= -1;
  28.  }
  29.  InetSocketAddress realServer = serverCluster.getValue().get(idx);
  30.  URL newUrl = new URL(requesrUrl.getProtocol(), realServer.getHostName(), realServer.getPort(), requesrUrl.getFile());
  31.  request.setRequestUrl(newUrl);
  32.  // proxy header handling (remove hop-by-hop headers, ...)
  33.  // ...
  34.  IHttpResponseHandler respHdl = new IHttpResponseHandler() {
  35.  @Execution(Execution.NONTHREADED)
  36.  public void onResponse(IHttpResponse response) throws IOException {
  37.  exchange.send(response);
  38.  }
  39.  @Execution(Execution.NONTHREADED)
  40.  public void onException(IOException ioe) throws IOException {
  41.  exchange.sendError(ioe);
  42.  }
  43.  };
  44.  httpClient.send(request, respHdl);
  45.  return;
  46.  }
  47.  }
  48.  // request address is not virtual one -> do nothing by forwarding request for standard handling
  49.  exchange.forward(request);
  50.  }
  51. }
  52. class SimpleTest {
  53.  public static void main(String[] args) throws Exception {
  54.  // start the servers
  55.  RequestHandlerChain handlerChain1 = new RequestHandlerChain();
  56.  handlerChain1.addLast(new CacheInterceptor(new LocalHttpResponseCache()));
  57.  handlerChain1.addLast(new MyRequestHandler());
  58.  HttpServer httpServer1 = new HttpServer(8040, handlerChain1);
  59.  httpServer1.start();
  60.  RequestHandlerChain handlerChain2 = new RequestHandlerChain();
  61.  handlerChain2.addLast(new CacheInterceptor(new LocalHttpResponseCache()));
  62.  handlerChain2.addLast(new MyRequestHandler());
  63.  HttpServer httpServer2 = new HttpServer(8030, handlerChain2);
  64.  httpServer2.start();
  65.  // create the client
  66.  HttpClient httpClient = new HttpClient();
  67.  // ... and add the load balancer interceptor
  68.  LoadBalancerRequestInterceptor lbInterceptor = new LoadBalancerRequestInterceptor();
  69.  InetSocketAddress[] srvs = new InetSocketAddress[] { new InetSocketAddress("localhost"8030), new InetSocketAddress("localhost"8030) };
  70.  lbInterceptor.addVirtualServer("customerService:8080", srvs);
  71.  httpClient.addInterceptor(lbInterceptor);
  72.  // run some tests
  73.  GetRequest request = new GetRequest("http://customerService:8080/price?id=2336&amount=5656");
  74.  IHttpResponse response = httpClient.call(request);
  75.  assert (response.getHeader("X-Cached") == null);
  76.  request = new GetRequest("http://customerService:8080/price?id=2336&amount=5656");
  77.  response = httpClient.call(request);
  78.  assert (response.getHeader("X-Cached").equals("true"));
  79.  request = new GetRequest("http://customerService:8080/price?id=2337&amount=5656");
  80.  response = httpClient.call(request);
  81.  assert (response.getHeader("X-Cached") == null);
  82.  request = new GetRequest("http://customerService:8080/price?id=2337&amount=5656");
  83.  response = httpClient.call(request);
  84.  assert (response.getHeader("X-Cached").equals("true"));
  85.  // ...
  86.  }
  87. }
CodeFile_6.java
  1. class MySessionBasedRequestHandler implements IHttpRequestHandler {
  2.  @SynchronizedOn(SynchronizedOn.SESSION)
  3.  public void onRequest(IHttpExchange exchange) throws IOException {
  4.  IHttpRequest request = exchange.getRequest();
  5.  IHttpSession session = exchange.getSession(true);
  6.  //..
  7.  Integer countRequests = (Integer) session.getAttribute("count");
  8.  if (countRequests == null) {
  9.  countRequests = 1;
  10.  } else {
  11.  countRequests++;
  12.  }
  13.  session.setAttribute("count", countRequests);
  14.  // and return the response
  15.  exchange.send(new HttpResponse(200"text/plain""count=" + countRequests));
  16.  }
  17. }
  18. class Server {
  19.  public static void main(String[] args) throws Exception {
  20.  HttpServer httpServer = new HttpServer(8030new MySessionBasedRequestHandler());
  21.  httpServer.run ();
  22.  }
  23. }
CodeFile_7.java

  1. class BackupBasedSessionManager implements ISessionManager {
  2.  private ISessionManager delegee = null;
  3.  private HttpClient httpClient = null;
  4.  public BackupBasedSessionManager(HttpClient httpClient, ISessionManager delegee) {
  5.  this.httpClient = httpClient;
  6.  this.delegee = delegee;
  7.  }
  8.  public boolean isEmtpy() {
  9.  return delegee.isEmtpy();
  10.  }
  11.  public String newSession(String idPrefix) throws IOException {
  12.  return delegee.newSession(idPrefix);
  13.  }
  14.  public void registerSession(HttpSession session) throws IOException {
  15.  delegee.registerSession(session);
  16.  }
  17.  public HttpSession getSession(String sessionId) throws IOException {
  18.  HttpSession session = delegee.getSession(sessionId);
  19.  // session not available? -> try to get it from the backup location
  20.  if (session == null) {
  21.  String id = URLEncoder.encode(sessionId);
  22.  IHttpResponse response = httpClient.call(new GetRequest("http://sessionservice:8080/?id=" + id));
  23.  if (response.getStatus() == 200) {
  24.  try {
  25.  byte[] serialized = response.getBlockingBody().readBytes();
  26.  ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(serialized));
  27.  session = (HttpSession) in.readObject();
  28.  registerSession(session);
  29.  } catch (ClassNotFoundException cnfe) {
  30.  throw new IOException(cnfe);
  31.  }
  32.  }
  33.  }
  34.  return session;
  35.  }
  36.  public void saveSession(String sessionId) throws IOException {
  37.  delegee.saveSession(sessionId);
  38.  HttpSession session = delegee.getSession(sessionId);
  39.  ByteArrayOutputStream bos = new ByteArrayOutputStream() ;
  40.  ObjectOutputStream out = new ObjectOutputStream(bos) ;
  41.  out.writeObject(session);
  42.  out.close();
  43.  byte[] serialized = bos.toByteArray();
  44.  String id = URLEncoder.encode(session.getId());
  45.  PostRequest storeRequest = new PostRequest("http://sessionservice:8080/?id=" + id + "&ttl=600""application/octet-stream", serialized);
  46.  httpClient.send(storeRequest, null); // send the store request asynchronous and ignore result
  47.  }
  48.  public void removeSession(String sessionId) throws IOException {
  49.  delegee.removeSession(sessionId);
  50.  String id = URLEncoder.encode(sessionId);
  51.  httpClient.call(new DeleteRequest("http://sessionservice:8080/?id=" + id));
  52.  }
  53.  public void close() throws IOException {
  54.  delegee.close();
  55.  }
  56. }
  57. class Server {
  58.  public static void main(String[] args) throws Exception {
  59.  // set the server's handler
  60.  HttpServer httpServer = new HttpServer(8030new MySessionBasedRequestHandler());
  61.  // create a load balanced http client instance
  62.  HttpClient sessionServerHttpClient = new HttpClient();
  63.  LoadBalancerRequestInterceptor lbInterceptor = new LoadBalancerRequestInterceptor();
  64.  InetSocketAddress[] srvs = new InetSocketAddress[] { new InetSocketAddress("sessionSrv1"5010), new InetSocketAddress("sessionSrv2"5010)};
  65.  lbInterceptor.addVirtualServer("sessionservice:8080", srvs);
  66.  sessionServerHttpClient.addInterceptor(lbInterceptor);
  67.  // wrap the local built-in session manager by backup aware session manager
  68.  ISessionManager nativeSessionManager = httpServer.getSessionManager();
  69.  BackupBasedSessionManager sessionManager = new BackupBasedSessionManager(sessionServerHttpClient, nativeSessionManager);
  70.  httpServer.setSessionManager(sessionManager);
  71.  // start the server
  72.  httpServer.start();
  73.  }
  74. }



posted on 2009-01-06 10:13 .VwV. 阅读(236) 评论(0)  编辑  收藏


只有注册用户登录后才能发表评论。


网站导航:
 
<2025年1月>
2930311234
567891011
12131415161718
19202122232425
2627282930311
2345678

导航

统计

常用链接

留言簿

随笔档案

文章分类

文章档案

搜索

最新评论

阅读排行榜

评论排行榜