ZooKeeper是一个优秀的协调服务, 目前是Hadoop的一个子项目. 我们可以用它来为我们的服务提供配置中心, 注册中心, 分布式同步锁, 消息队列等服务, 更多信息请浏览 http://hadoop.apache.org/zookeeper/
上篇文章中实现一个CXF的负载均衡服务, 本次我们使用ZooKeeper来为我们的服务提供动态服务器列表, 以便把客户端的调用分配到各个有效的服务上去.
动态更新服务列表有2种方法
* 定时去获取数据, 更新我们的数据 --- 通用
* 使用ZooKeeper的watch特性, 有服务器加入/退出时我们自动获取通知 --- 适用于有消息通知机制的
首先我们的HelloService部分要向ZooKeeper注册
只有注册到ZooKeeper上, 我们才知道你可以提供这个服务. 在实际环境中, 需要每个服务都需要向ZooKeeper注册 ()
注册代码如下:
private void register2Zookeeper(String address) throws Exception
{
ZooKeeper zk = new ZooKeeper(zkAddress, 3000, null);
GroupMemberCenter gmc = new GroupMemberCenter();
gmc.setZooKeeper(zk);
gmc.createAndSetGroup(groupName);
gmc.joinGroupByDefine(address);
System.out.println("register service to zookeeper: " + address);
}
GroupMemberCenter是一个辅助类, 代码如下:
/**
* Dynamic member center.
* <p/>
* The member maybe leave or dead dynamiclly.
*
*
* @author: Felix Zhang Date: 2010-9-30 17:58:16
*/
public class GroupMemberCenter
{
public static final String ESCAPE_PREFIX = "|||";
private static final Log log = LogFactory.getLog(GroupMemberCenter.class);
private static final List<String> EMPTY_MEMBERS = new ArrayList<String>(0);
private ZooKeeper zk;
private String group = "";
private String me = "";
public void setZooKeeper(ZooKeeper zk)
{
this.zk = zk;
}
public void setGroup(String groupName)
{
if (groupName != null && groupName.length() > 0)
{
if (!groupName.startsWith("/"))
{
groupName = "/" + groupName;
}
this.group = groupName;
}
}
public boolean createAndSetGroup(String groupName)
{
boolean result = createGroup(groupName);
if (result)
{
setGroup(groupName);
}
return result;
}
public boolean createGroup(String groupName)
{
assert groupName != null;
if (!groupName.startsWith("/"))
{
groupName = "/" + groupName;
}
try
{
Stat s = zk.exists(groupName, false);
if (s == null)
{
zk.create(groupName, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
}
catch (Exception e)
{
log.error("create group error: " + groupName, e);
return false;
}
return true;
}
protected String buildName(String name)
{
return group + "/" + name;
}
public boolean joinGroup()
{
return joinGroup(null);
}
public boolean joinGroup(Integer port)
{
try
{
//use ipaddress as default, if you will use different ipaddress, you need joinGroup(yourip)
me = InetAddress.getLocalHost().getHostAddress();
return joinGroupByDefine(me + ":" + port);
}
catch (Exception e)
{
log.error("join group error", e);
return false;
}
}
public boolean joinGroupByDefine(String userdefine)
{
assert userdefine != null;
assert userdefine.length() > 0;
try
{
me = userdefine;
if (me.contains("[host]"))
{
String host = InetAddress.getLocalHost().getHostAddress();
me = me.replaceFirst("\\[host\\]", host);
}
//if contains "/", how to deal? --- maybe we need more format in future
me = ESCAPE_PREFIX + URLEncoder.encode(me, "UTF-8");
zk.create(buildName(me), new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
}
catch (Exception e)
{
log.error("join group error: " + me, e);
return false;
}
return true;
}
public void leaveGroup()
{
try
{
zk.delete(buildName(me), 0);
}
catch (Exception e)
{
log.error("leave group error: " + me, e);
}
}
public List<String> fetchGroupMembers()
{
return fetchGroupMembers(group, null);
}
public List<String> fetchGroupMembers(String groupName)
{
return fetchGroupMembers(groupName, null);
}
public List<String> fetchGroupMembers(String groupName, Watcher watcher)
{
if (groupName != null && groupName.length() > 0)
{
if (!groupName.startsWith("/"))
{
groupName = "/" + groupName;
}
}
else
{
return EMPTY_MEMBERS;
}
try
{
List<String> childlist;
if(watcher == null)
{
childlist = zk.getChildren(groupName, false);
}
else
{
childlist = zk.getChildren(groupName, watcher);
}
List<String> lastresult = new ArrayList<String>();
for (String item : childlist)
{
if (item.startsWith(ESCAPE_PREFIX))
{
lastresult.add(URLDecoder.decode(item, "UTF-8").substring(3));
}
else
{
lastresult.add(item);
}
}
return lastresult;
}
catch (Exception e)
{
log.error("fetch group members error", e);
return EMPTY_MEMBERS;
}
}
}
GroupMemberCenter主要是把用户的address信息做一下转义然后在ZooKeeper中创建一个节点, 注册时使用 CreateMode.EPHEMERAL 模式, 也就是类似心跳监测, 如果服务挂掉, 那么ZooKeeper会自动删除此节点.
为了方便测试, 编写3个启动服务的程序来模拟多台机器, 启动的都是Hello服务, 只是端口不一样而已:
public class HelloServiceServer5Zookeeper1 {
public static void main(String[] args) throws Exception {
new HelloServicePublisher5Zookeeper().start("http://localhost:8081/service/Hello", new HelloFirstImpl());
}
}
其他2个请自己看源码包.
下面我们来准备Client, 代码和上篇文章中的一样, 首先是一个XML:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:jaxws="http://cxf.apache.org/jaxws"
xmlns:clustering="http://cxf.apache.org/clustering"
xmlns:util="http://www.springframework.org/schema/util"
xsi:schemaLocation="
http://cxf.apache.org/jaxws http://cxf.apache.org/schemas/jaxws.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd">
<bean id="loadBalanceStrategy" class="org.javascud.extensions.cxf.RandomLoadBalanceStrategy">
<property name="removeFailedEndpoint" value="true" />
</bean>
<bean id="loadBalanceFeature" class="org.javascud.extensions.cxf.LoadBalanceFeature">
<property name="strategy" ref="loadBalanceStrategy" />
</bean>
<jaxws:client name="helloClient"
serviceClass="org.javascud.extensions.cxf.service.Hello" >
<jaxws:features>
<ref bean="loadBalanceFeature" />
</jaxws:features>
</jaxws:client>
<bean id="zooKeeper" class="org.apache.zookeeper.ZooKeeper">
<constructor-arg index="0" value="127.0.0.1:2181" />
<constructor-arg index="1" value="3000" />
<constructor-arg index="2" ><null/></constructor-arg>
</bean>
</beans>
XML没有写任何服务的网址, 后面的程序负责更新服务列表. 此XML定义了一个ZooKeeper客户端, 你可以根据自己的实际情况修改, 例如ZooKeeper本身也可以是负载均衡的 (一般为3台服务器, 方便投票).
调用的Java代码如下:
ClassPathXmlApplicationContext context
= new ClassPathXmlApplicationContext(new String[]
{"org/javascud/extensions/cxf/zookeeper/client/loadbalance_fail_zookeeper.xml"});
final Hello client = (Hello) context.getBean("helloClient");
final AbstractLoadBalanceStrategy strategy = (AbstractLoadBalanceStrategy) context.getBean("loadBalanceStrategy");
Client myclient = ClientProxy.getClient(client);
String address = myclient.getEndpoint().getEndpointInfo().getAddress();
System.out.println(address);
ZooKeeper zk = (ZooKeeper) context.getBean("zooKeeper");
//使用定时刷新的方式更新服务列表: 实际代码中可以写一个单独的类来调用
ServiceEndpointsFetcher fetcher = new ServiceEndpointsFetcher();
fetcher.setStrategy(strategy);
fetcher.setZooKeeper(zk);
fetcher.setGroupName(groupName);
fetcher.start();
//调用服务
for (int i = 1; i <= 1000; i++) {
String result1 = client.sayHello("Felix" + i);
System.out.println("Call " + i + ": " + result1);
int left = strategy.getAlternateAddresses(null).size();
System.out.println("================== left " + left + " ===========================");
Thread.sleep(100);
}
查看上面的代码可以发现, 我们使用了ServiceEndpointsFetcher来刷新, 间隔固定的时间去获取最新的服务列表.
我们还可以采用观察者方式来更新服务列表:
/**
* watcher service from zookeeper.
*
* @author Felix Zhang Date:2010-10-16 01:13
*/
public class ServiceEndpointsWatcher extends ZooKeeperChildrenWatcher {
private AbstractLoadBalanceStrategy strategy;
public void setStrategy(AbstractLoadBalanceStrategy strategy) {
this.strategy = strategy;
}
@Override
protected void updateData(List<String> members) {
strategy.setAlternateAddresses(members);
}
}
ZooKeeperChildrenWatcher是一个父类, 调用GroupMemberCenter的代码来监测ZooKeeper上的对应节点:
/**
* a Watcher for monitor zookeeper by getChildren
*
* @author Felix Zhang Date:2010-10-16 14:39
*/
public abstract class ZooKeeperChildrenWatcher implements Watcher {
private ZooKeeper zooKeeper;
private String groupName;
private GroupMemberCenter gmc = null;
public void setZooKeeper(ZooKeeper zooKeeper) {
this.zooKeeper = zooKeeper;
}
public void setGroupName(String groupName) {
this.groupName = groupName;
}
@Override
public void process(WatchedEvent event) {
fetchAndUpdate();
}
private void fetchAndUpdate() {
//get children and register watcher again
List<String> members = gmc.fetchGroupMembers(groupName, this);
updateData(members);
}
protected abstract void updateData(List<String> members);
public void init() {
if (zooKeeper != null) {
gmc = new GroupMemberCenter();
gmc.setZooKeeper(zooKeeper);
fetchAndUpdate();
}
}
}
调用ServiceEndpointsWatcher的代码是在Spring的XML中, 当然也可以在单独程序中调用:
<bean id="serviceEndpointsWatcher"
class="org.javascud.extensions.cxf.zookeeper.ServiceEndpointsWatcher"
init-method="init">
<property name="strategy" ref="loadBalanceStrategy" />
<property name="zooKeeper" ref="zooKeeper" />
<property name="groupName" value="helloservice" />
</bean>
ok, 下面我们启动ZooKeeper, 在2181端口. 然后其次启动三个HelloService: HelloServiceServer5Zookeeper1, HelloServiceServer5Zookeeper2, HelloServiceServer5Zookeeper3, 它们分别监测在8081, 8082, 8083端口, 并且会向ZooKeeper注册, 你查看用ZooKeeper的客户端查看 ls /helloservice, 应该可以看到三个节点.
然后我们运行客户端代码 HelloClient5Zookeeper, 在客户端运行的过程中, 我们可以终止/启动HelloService, 就可以看到我们的程序会动态地访问不同的HelloService, 达到了负载均衡的目的.
注: ServiceEndpointsWatcher 或ServiceEndpointsFetcher 一定现行运行, 否则调用服务的部分会抛出异常, 因为没有可用的服务地址.
代码打包下载: http://cnscud.googlecode.com/files/extensions-20101016.zip
SVN 源码位置: http://cnscud.googlecode.com/svn/trunk/extensions/
转载请注明作者和出处 http://scud.blogjava.net