注:以下Cache未加时效处理,是比较简陋的Cache方案。
AbstractCacheMap基类,用来定义和限制子类的操作:
package cachemap.base;
/**
 * Base class of the cache maps (for example InnerCacheMap).
 * <p>
 * Each public operation takes the shared read-write lock, delegates to the
 * matching protected hook implemented by the subclass, and releases the lock
 * again. Read operations ({@code readData}, {@code containsKey}) use the read
 * side of the lock; mutating operations ({@code writeData},
 * {@code removeData}, {@code removeAllData}) use the write side.
 * <p>
 * The lock is always acquired <em>before</em> entering the {@code try} block,
 * so the {@code finally} clause only ever releases a lock that was really
 * taken.
 *
 * @author heyang
 * @time Sep 23, 2011,4:00:45 PM
 */
public abstract class AbstractCacheMap {
    // Read-write lock protecting the subclass storage in a multi-threaded environment
    private final ReadWriteLock lock;

    /**
     * Constructor; creates the lock shared by all operations of this instance.
     */
    public AbstractCacheMap() {
        lock = new ReadWriteLock();
    }

    /**
     * Puts a key-value pair into the cache map while holding the write lock.
     * It can be called by any class.
     *
     * @param key key of the entry
     * @param value value of the entry
     * @throws Exception whatever the lock or the subclass {@code set} throws
     */
    public void writeData(String key, byte[] value) throws Exception {
        lock.writeLock();
        try {
            set(key, value);
        } finally {
            lock.writeUnlock();
        }
    }

    /**
     * Stores a key-value pair; implemented by the subclass.
     * Called by {@link #writeData(String, byte[])} with the write lock held.
     *
     * @param key key of the entry
     * @param value value of the entry
     * @throws Exception if the underlying storage fails
     */
    protected abstract void set(String key, byte[] value) throws Exception;

    /**
     * Gets a value by its key while holding the read lock.
     * It can be called by any class.
     *
     * @param key key to look up
     * @return whatever the subclass {@code get} returns for the key
     * @throws Exception whatever the lock or the subclass {@code get} throws
     */
    public byte[] readData(String key) throws Exception {
        lock.readLock();
        try {
            return get(key);
        } finally {
            lock.readUnlock();
        }
    }

    /**
     * Looks up a value by its key; implemented by the subclass.
     * Called by {@link #readData(String)} with the read lock held.
     *
     * @param key key to look up
     * @return the stored value
     * @throws Exception if the underlying storage fails
     */
    protected abstract byte[] get(String key) throws Exception;

    /**
     * Tells whether a key exists, while holding the read lock.
     * It can be called by any class.
     *
     * @param key key to test
     * @return the result of the subclass {@code contains}
     * @throws Exception whatever the lock or the subclass {@code contains} throws
     */
    public boolean containsKey(String key) throws Exception {
        lock.readLock();
        try {
            return contains(key);
        } finally {
            lock.readUnlock();
        }
    }

    /**
     * Tells whether a key exists; implemented by the subclass.
     * Called by {@link #containsKey(String)} with the read lock held.
     *
     * @param key key to test
     * @return true if the key is present
     * @throws Exception if the underlying storage fails
     */
    protected abstract boolean contains(String key) throws Exception;

    /**
     * Removes a key-value pair by its key while holding the write lock.
     * It can be called by any class.
     *
     * @param key key of the entry to remove
     * @throws Exception whatever the lock or the subclass {@code remove} throws
     */
    public void removeData(String key) throws Exception {
        lock.writeLock();
        try {
            remove(key);
        } finally {
            lock.writeUnlock();
        }
    }

    /**
     * Removes a key-value pair by its key; implemented by the subclass.
     * Called by {@link #removeData(String)} with the write lock held.
     *
     * @param key key of the entry to remove
     * @throws Exception if the underlying storage fails
     */
    protected abstract void remove(String key) throws Exception;

    /**
     * Removes all data from the cache map while holding the write lock.
     * It can be called by any class.
     *
     * @throws Exception whatever the lock or the subclass {@code removeAll} throws
     */
    public void removeAllData() throws Exception {
        lock.writeLock();
        try {
            removeAll();
        } finally {
            lock.writeUnlock();
        }
    }

    /**
     * Removes all data; implemented by the subclass.
     * Called by {@link #removeAllData()} with the write lock held.
     *
     * @throws Exception if the underlying storage fails
     */
    protected abstract void removeAll() throws Exception;
}
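与AbstractCacheMap类配合使用,用来防止读写线程冲突和线程拥挤的读写锁类ReadWriteLock(注:下面贴出的代码与上文的AbstractCacheMap完全相同,并不是ReadWriteLock的源码;ReadWriteLock的实现未在此处列出,可参考文末的源码下载):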
package cachemap.base;
/**
 * Base class of the cache maps (for example InnerCacheMap).
 * <p>
 * Each public operation takes the shared read-write lock, delegates to the
 * matching protected hook implemented by the subclass, and releases the lock
 * again. Read operations ({@code readData}, {@code containsKey}) use the read
 * side of the lock; mutating operations ({@code writeData},
 * {@code removeData}, {@code removeAllData}) use the write side.
 * <p>
 * The lock is always acquired <em>before</em> entering the {@code try} block,
 * so the {@code finally} clause only ever releases a lock that was really
 * taken.
 *
 * @author heyang
 * @time Sep 23, 2011,4:00:45 PM
 */
public abstract class AbstractCacheMap {
    // Read-write lock protecting the subclass storage in a multi-threaded environment
    private final ReadWriteLock lock;

    /**
     * Constructor; creates the lock shared by all operations of this instance.
     */
    public AbstractCacheMap() {
        lock = new ReadWriteLock();
    }

    /**
     * Puts a key-value pair into the cache map while holding the write lock.
     * It can be called by any class.
     *
     * @param key key of the entry
     * @param value value of the entry
     * @throws Exception whatever the lock or the subclass {@code set} throws
     */
    public void writeData(String key, byte[] value) throws Exception {
        lock.writeLock();
        try {
            set(key, value);
        } finally {
            lock.writeUnlock();
        }
    }

    /**
     * Stores a key-value pair; implemented by the subclass.
     * Called by {@link #writeData(String, byte[])} with the write lock held.
     *
     * @param key key of the entry
     * @param value value of the entry
     * @throws Exception if the underlying storage fails
     */
    protected abstract void set(String key, byte[] value) throws Exception;

    /**
     * Gets a value by its key while holding the read lock.
     * It can be called by any class.
     *
     * @param key key to look up
     * @return whatever the subclass {@code get} returns for the key
     * @throws Exception whatever the lock or the subclass {@code get} throws
     */
    public byte[] readData(String key) throws Exception {
        lock.readLock();
        try {
            return get(key);
        } finally {
            lock.readUnlock();
        }
    }

    /**
     * Looks up a value by its key; implemented by the subclass.
     * Called by {@link #readData(String)} with the read lock held.
     *
     * @param key key to look up
     * @return the stored value
     * @throws Exception if the underlying storage fails
     */
    protected abstract byte[] get(String key) throws Exception;

    /**
     * Tells whether a key exists, while holding the read lock.
     * It can be called by any class.
     *
     * @param key key to test
     * @return the result of the subclass {@code contains}
     * @throws Exception whatever the lock or the subclass {@code contains} throws
     */
    public boolean containsKey(String key) throws Exception {
        lock.readLock();
        try {
            return contains(key);
        } finally {
            lock.readUnlock();
        }
    }

    /**
     * Tells whether a key exists; implemented by the subclass.
     * Called by {@link #containsKey(String)} with the read lock held.
     *
     * @param key key to test
     * @return true if the key is present
     * @throws Exception if the underlying storage fails
     */
    protected abstract boolean contains(String key) throws Exception;

    /**
     * Removes a key-value pair by its key while holding the write lock.
     * It can be called by any class.
     *
     * @param key key of the entry to remove
     * @throws Exception whatever the lock or the subclass {@code remove} throws
     */
    public void removeData(String key) throws Exception {
        lock.writeLock();
        try {
            remove(key);
        } finally {
            lock.writeUnlock();
        }
    }

    /**
     * Removes a key-value pair by its key; implemented by the subclass.
     * Called by {@link #removeData(String)} with the write lock held.
     *
     * @param key key of the entry to remove
     * @throws Exception if the underlying storage fails
     */
    protected abstract void remove(String key) throws Exception;

    /**
     * Removes all data from the cache map while holding the write lock.
     * It can be called by any class.
     *
     * @throws Exception whatever the lock or the subclass {@code removeAll} throws
     */
    public void removeAllData() throws Exception {
        lock.writeLock();
        try {
            removeAll();
        } finally {
            lock.writeUnlock();
        }
    }

    /**
     * Removes all data; implemented by the subclass.
     * Called by {@link #removeAllData()} with the write lock held.
     *
     * @throws Exception if the underlying storage fails
     */
    protected abstract void removeAll() throws Exception;
}
用来往CacheMap中异步添加值的CacheMapSetter类:
package cachemap.setter;
import cachemap.base.AbstractCacheMap;
/**
 * CacheMapSetter
 * Writes one key-value pair into a cache map on a thread of its own.
 * Creating an instance is enough: the constructor starts the thread.
 *
 * @author heyang
 * @time Sep 26, 2011,10:11:36 AM
 */
public final class CacheMapSetter implements Runnable {
    // Target cache map and the entry to store in it
    private final AbstractCacheMap cacheMap;
    private final String key;
    private final byte[] value;

    /**
     * Constructor; remembers the entry and immediately starts a new thread
     * that runs {@link #run()}.
     *
     * @param cacheMap cache map to write into
     * @param key key of the entry
     * @param value value of the entry
     */
    public CacheMapSetter(AbstractCacheMap cacheMap, String key, byte[] value) {
        this.cacheMap = cacheMap;
        this.key = key;
        this.value = value;

        Thread worker = new Thread(this);
        worker.start();
    }

    /**
     * Performs the write; any exception is printed and not rethrown.
     */
    @Override
    public void run() {
        try {
            cacheMap.writeData(key, value);
        } catch (Exception failure) {
            failure.printStackTrace();
        }
    }
}
AbstractCacheMap的实际子类InnerCacheMap,存储空间使用的是内置的HashMap:
package cachemap;
import java.util.HashMap;
import java.util.Map;
import cachemap.base.AbstractCacheMap;
/**
 * Cache map whose storage is a local {@link HashMap}.
 * Each hook simply forwards to the corresponding map operation.
 *
 * @author heyang
 * @time Sep 23, 2011,3:48:17 PM
 */
public class InnerCacheMap extends AbstractCacheMap {
    // Backing storage, created together with the instance
    private final Map<String, byte[]> map = new HashMap<String, byte[]>();

    /**
     * Constructor; the backing map starts out empty.
     */
    public InnerCacheMap() {
        super();
    }

    /** Stores the pair in the backing map. */
    @Override
    protected void set(String key, byte[] value) throws Exception {
        map.put(key, value);
    }

    /** Returns the value the backing map holds for the key. */
    @Override
    protected byte[] get(String key) throws Exception {
        byte[] stored = map.get(key);
        return stored;
    }

    /** Tells whether the backing map has the key. */
    @Override
    protected boolean contains(String key) throws Exception {
        boolean present = map.containsKey(key);
        return present;
    }

    /** Removes the key from the backing map. */
    @Override
    protected void remove(String key) throws Exception {
        map.remove(key);
    }

    /** Empties the backing map. */
    @Override
    protected void removeAll() throws Exception {
        map.clear();
    }
}
AbstractCacheMap的子类WxsCacheMap,使用的是外部的WebSphere Extreme Scale作为存储空间:
package cachemap;
import cachemap.base.AbstractCacheMap;
import com.devwebsphere.wxsutils.WXSMap;
import com.devwebsphere.wxsutils.WXSUtils;
/**
 * CacheMap powered by WebSphere eXtreme Scale.
 * <p>
 * The constructor is best-effort: if the connection fails or the business
 * object name is not recognised, the instance is still created but has no
 * backing map. In that state every operation throws an
 * {@link IllegalStateException} that names the business object, instead of
 * an unexplained {@link NullPointerException}.
 *
 * @author heyang
 * @time Sep 23, 2011,3:47:54 PM
 */
public class WxsCacheMap extends AbstractCacheMap {
    // Backing storage; stays null when the constructor could not obtain a map
    private WXSMap<String, byte[]> wxsMap;
    // Kept only so that error messages can say which map is missing
    private final String businessObjectName;

    /**
     * Constructor; obtains the connection through WxsConnection and resolves
     * the map for the given business object. Failures are printed and leave
     * the instance without a backing map.
     *
     * @param host catalog server end point passed to WxsConnection
     * @param grid grid name passed to WxsConnection
     * @param businessObjectName CUSTOMER, VEHICLE or NAVIGATION (case-insensitive)
     */
    public WxsCacheMap(String host, String grid, String businessObjectName) {
        super();
        this.businessObjectName = businessObjectName;
        try {
            WXSUtils wxsUtils = WxsConnection.getWxsConnection(host, grid);
            wxsMap = getMap(wxsUtils, businessObjectName);
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }

    /**
     * Gets the map for a business object from WXSUtils.
     *
     * @param wxsUtils connection helper to ask for the map
     * @param strBusinessObject business object name, may be null
     * @return the matching map, or null when the name is null or unknown
     */
    private WXSMap<String, byte[]> getMap(WXSUtils wxsUtils, String strBusinessObject) {
        // Constant-first comparison so a null name falls through to "unknown"
        if ("CUSTOMER".equalsIgnoreCase(strBusinessObject)) {
            return wxsUtils.getCache("BO_CUSTOMER");
        } else if ("VEHICLE".equalsIgnoreCase(strBusinessObject)) {
            return wxsUtils.getCache("BO_VEHICLE");
        } else if ("NAVIGATION".equalsIgnoreCase(strBusinessObject)) {
            return wxsUtils.getCache("NAV_NAVIGATION");
        } else {
            return null;
        }
    }

    /**
     * Returns the backing map or fails with a descriptive error.
     *
     * @return the backing map, never null
     * @throws IllegalStateException if the constructor could not obtain a map
     */
    private WXSMap<String, byte[]> requireMap() {
        if (wxsMap == null) {
            throw new IllegalStateException(
                    "WXS map is not available for business object: " + businessObjectName);
        }
        return wxsMap;
    }

    /** Returns the value the WXS map holds for the key. */
    @Override
    protected byte[] get(String key) throws Exception {
        return requireMap().get(key);
    }

    /** Stores the pair in the WXS map. */
    @Override
    protected void set(String key, byte[] value) throws Exception {
        requireMap().put(key, value);
    }

    /** Tells whether the WXS map has the key. */
    @Override
    protected boolean contains(String key) throws Exception {
        return requireMap().contains(key);
    }

    /** Removes the key from the WXS map. */
    @Override
    protected void remove(String key) throws Exception {
        requireMap().remove(key);
    }

    /** Clears the WXS map. */
    @Override
    protected void removeAll() throws Exception {
        requireMap().clear();
    }
}
与WxsCacheMap配合使用的WxsConnection类:
package cachemap;
import com.devwebsphere.wxsutils.WXSUtils;
import com.ibm.websphere.objectgrid.ObjectGrid;
import com.ibm.websphere.objectgrid.ObjectGridRuntimeException;
import com.ibm.websphere.objectgrid.TargetNotAvailableException;
/**
 * Holds the single, lazily created WXS client connection shared by the
 * WXS-backed cache maps.
 */
public class WxsConnection {
    // Cached connection helper; null until first use and after a reset
    static WXSUtils utils;
    // Grid obtained when the cached connection was created
    static ObjectGrid grid;

    /**
     * Returns the shared WXSUtils, connecting on first use.
     * <p>
     * The arguments are only used when no connection is cached; while one is
     * cached it is returned as is, whatever arguments are passed.
     *
     * @param strCatalogServerEndPoint Catalog Server End Point, example: localhost:2809
     * @param strGrid Grid name
     * @return the shared WXSUtils instance
     */
    public static synchronized WXSUtils getWxsConnection(String strCatalogServerEndPoint, String strGrid) {
        if (utils == null) {
            grid = WXSUtils.connectClient(strCatalogServerEndPoint, strGrid);
            utils = new WXSUtils(grid);
        }
        return utils;
    }

    /**
     * Reacts to a grid runtime exception. When its cause is a
     * TargetNotAvailableException, the cached connection is dropped, so the
     * next {@link #getWxsConnection(String, String)} call reconnects, and the
     * grid of the dropped connection is destroyed. Other causes are ignored.
     *
     * @param e exception raised by a grid operation
     */
    public static void handleException(ObjectGridRuntimeException e) {
        Throwable ne = e.getCause();
        if (ne instanceof TargetNotAvailableException) {
            // bad target (maybe ip changed)
            WXSUtils oldUtils;
            synchronized (WxsConnection.class) {
                oldUtils = utils;
                utils = null;
            }
            // Nothing to destroy when no connection was cached, e.g. another
            // thread already reset it or none was ever created.
            if (oldUtils != null) {
                oldUtils.getObjectGrid().destroy();
            }
        }
    }
}
用来从外界得到CacheMap的CacheMapFacory类(类名按源码原样拼写):
package cachemap;
import cachemap.base.AbstractCacheMap;
/**
 * CacheMapFacory
 * Static entry points that hand out the concrete cache map implementations
 * typed as AbstractCacheMap.
 *
 * @author heyang
 * @time Sep 26, 2011,10:41:39 AM
 */
public final class CacheMapFacory {
    /**
     * Creates a cache map kept in local memory.
     *
     * @return a new InnerCacheMap
     * @throws Exception declared for callers; nothing here throws it directly
     */
    public static AbstractCacheMap getInnerCacheMap() throws Exception {
        AbstractCacheMap localCache = new InnerCacheMap();
        return localCache;
    }

    /**
     * Creates a cache map backed by WebSphere eXtreme Scale, using the fixed
     * connection settings below.
     *
     * @return a new WxsCacheMap
     * @throws Exception declared for callers; nothing here throws it directly
     */
    public static AbstractCacheMap getWxsCacheMap() throws Exception {
        String host = "host";
        String grid = "grid";
        String businessObject = "VEHICLE";
        AbstractCacheMap gridCache = new WxsCacheMap(host, grid, businessObject);
        return gridCache;
    }
}
JavaCompute节点中使用Cache的Java代码:
import cachemap.CacheMapFacory;
import cachemap.base.AbstractCacheMap;
import cachemap.setter.CacheMapSetter;
import com.ibm.broker.javacompute.MbJavaComputeNode;
import com.ibm.broker.plugin.MbElement;
import com.ibm.broker.plugin.MbException;
import com.ibm.broker.plugin.MbMessage;
import com.ibm.broker.plugin.MbMessageAssembly;
import com.ibm.broker.plugin.MbOutputTerminal;
/**
 * subflow_JavaCompute class
 * <p>
 * JavaCompute node that serves cache requests described in the local
 * environment: {@code Variables/Operation} selects READ, WRITE or REMOVEALL,
 * {@code Variables/BOKey} is the cache key (READ and WRITE only) and
 * {@code Variables/Value} carries the value. The outcome is reported by
 * appending {@code Status} and {@code Message} elements under
 * {@code Variables} of the outgoing local environment.
 *
 * @author heyang
 * @time Sep 26, 2011,10:14:34 AM
 */
public class subflow_JavaCompute extends MbJavaComputeNode {
    // Cache shared by all invocations; stays null if creation failed
    private static AbstractCacheMap cacheMap;
    static {
        try {
            cacheMap = CacheMapFacory.getInnerCacheMap();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Handles one message: performs the requested cache operation, records
     * the outcome in the outgoing local environment and propagates to the
     * "out" terminal.
     *
     * @param inAssembly incoming message assembly
     * @throws MbException if releasing the messages created here fails
     */
    public void evaluate(MbMessageAssembly inAssembly) throws MbException {
        /****************************
         * 1.out&alt
         ***************************/
        MbOutputTerminal out = getOutputTerminal("out");
        //MbOutputTerminal alt = getOutputTerminal("alternate");
        /****************************
         * 2.inMsg& outMsg
         ***************************/
        MbMessage inMsg = inAssembly.getMessage();
        MbMessage outMsg = new MbMessage();
        /****************************
         * 3.input&output environment
         ***************************/
        MbMessage inputEnv = inAssembly.getLocalEnvironment();
        MbMessage outputEnv = new MbMessage(inAssembly.getLocalEnvironment());
        /****************************
         * 4.outAssembly
         ***************************/
        MbMessageAssembly outAssembly = new MbMessageAssembly(inAssembly, outputEnv, inAssembly.getExceptionList(), outMsg);
        try {
            // Copy Headers
            copyMessageHeaders(inMsg, outMsg);
            // Get operation; the key is only fetched by the operations that need it
            String operation = getOperation(inputEnv);
            if ("READ".equalsIgnoreCase(operation)) {
                handleRead(getKey(inputEnv), outputEnv);
            } else if ("WRITE".equalsIgnoreCase(operation)) {
                handleWrite(getKey(inputEnv), outputEnv);
            } else if ("REMOVEALL".equalsIgnoreCase(operation)) {
                handleRemoveAll(outputEnv);
            } else {
                throw new Exception("Unknown Operation:" + operation);
            }
            // Send Back
            out.propagate(outAssembly);
        } catch (Exception ex) {
            // NOTE(review): failures are only printed, so the message is not
            // propagated and the flow is not told about the error - confirm
            // this best-effort behaviour is intended.
            ex.printStackTrace();
        } finally {
            // Messages created in this node must be released by this node
            outMsg.clearMessage();
            outputEnv.clearMessage();
        }
    }

    //private
    /**
     * READ: copies the cached value into Variables/Value and reports SUCCESS,
     * or reports FAILURE when the key is not cached.
     *
     * @param key cache key
     * @param outputEnv outgoing local environment
     * @throws Exception if the cache or the message tree access fails
     */
    private void handleRead(String key, MbMessage outputEnv) throws Exception {
        MbElement variables = outputEnv.getRootElement().getFirstElementByPath("Variables");
        if (cacheMap.containsKey(key)) {
            byte[] value = cacheMap.readData(key);
            MbElement elementValue = variables.getFirstElementByPath("Value");
            if (elementValue == null) {
                elementValue = variables.createElementAsLastChild(MbElement.TYPE_NAME_VALUE, "Value", "");
            }
            elementValue.setValue(value);
            reportStatus(variables, "SUCCESS", "CACHE Record Found");
        } else {
            reportStatus(variables, "FAILURE", "No Record Found");
        }
    }

    /**
     * WRITE: drops any existing entry, schedules the asynchronous write of
     * Variables/Value, removes the value from the outgoing environment and
     * reports SUCCESS.
     *
     * @param key cache key
     * @param outputEnv outgoing local environment
     * @throws Exception if the value is missing or the cache access fails
     */
    private void handleWrite(String key, MbMessage outputEnv) throws Exception {
        // Read the value first: a request without a value must not evict
        // the entry that is currently cached.
        byte[] value = getValue(outputEnv);
        if (cacheMap.containsKey(key)) {
            cacheMap.removeData(key);
        }
        // The constructor starts the thread that performs the write
        new CacheMapSetter(cacheMap, key, value);
        MbElement variables = outputEnv.getRootElement().getFirstElementByPath("Variables");
        detachValue(variables);
        reportStatus(variables, "SUCCESS", "Record Added to Cache");
    }

    /**
     * REMOVEALL: empties the cache, removes any value from the outgoing
     * environment and reports SUCCESS.
     *
     * @param outputEnv outgoing local environment
     * @throws Exception if the cache or the message tree access fails
     */
    private void handleRemoveAll(MbMessage outputEnv) throws Exception {
        cacheMap.removeAllData();
        MbElement variables = outputEnv.getRootElement().getFirstElementByPath("Variables");
        detachValue(variables);
        reportStatus(variables, "SUCCESS", "Rmoved all data from Cache");
    }

    /**
     * Detaches the Value child of the Variables folder when there is one.
     *
     * @param variables Variables folder of the outgoing environment
     * @throws MbException if the message tree access fails
     */
    private void detachValue(MbElement variables) throws MbException {
        MbElement elementValue = variables.getFirstElementByPath("Value");
        if (elementValue != null) {
            elementValue.detach();
        }
    }

    /**
     * Appends the Status and Message elements to the Variables folder.
     *
     * @param variables Variables folder of the outgoing environment
     * @param status value of the Status element
     * @param message value of the Message element
     * @throws MbException if the message tree access fails
     */
    private void reportStatus(MbElement variables, String status, String message) throws MbException {
        variables.createElementAsLastChild(MbElement.TYPE_NAME_VALUE, "Status", status);
        variables.createElementAsLastChild(MbElement.TYPE_NAME_VALUE, "Message", message);
    }

    /**
     * Looks up an element by path and fails with a descriptive error when it
     * does not exist.
     *
     * @param parent element the path is relative to
     * @param path path of the wanted element
     * @return the element, never null
     * @throws Exception if the element is missing or the lookup fails
     */
    private MbElement requireElement(MbElement parent, String path) throws Exception {
        MbElement found = parent.getFirstElementByPath(path);
        if (found == null) {
            throw new IllegalStateException("Missing local environment element: " + path);
        }
        return found;
    }

    /**
     * get operation
     * get operation from input environment
     *
     * @param inputEnv incoming local environment
     * @return value of Variables/Operation as a string
     * @throws Exception if the element is missing or cannot be read
     */
    private String getOperation(MbMessage inputEnv) throws Exception {
        MbElement elementOperation = requireElement(inputEnv.getRootElement(), "Variables/Operation");
        return elementOperation.getValueAsString();
    }

    /**
     * getKey
     * get key from input environment
     *
     * @param inputEnv incoming local environment
     * @return value of Variables/BOKey as a string
     * @throws Exception if the element is missing or cannot be read
     */
    private String getKey(MbMessage inputEnv) throws Exception {
        MbElement elementKey = requireElement(inputEnv.getRootElement(), "Variables/BOKey");
        return elementKey.getValueAsString();
    }

    /**
     * getValue
     * get value from output environment
     *
     * @param outputEnv outgoing local environment
     * @return value of Variables/Value cast to a byte array
     * @throws Exception if the element is missing or cannot be read
     */
    private byte[] getValue(MbMessage outputEnv) throws Exception {
        MbElement elementValue = requireElement(outputEnv.getRootElement(), "Variables/Value");
        return (byte[]) elementValue.getValue();
    }

    /**
     * copyMessageHeaders
     * Copies every child of the input root except the last one to the output
     * message.
     *
     * @param inMessage message to copy from
     * @param outMessage message to copy to
     * @throws MbException if the message tree access fails
     */
    private void copyMessageHeaders(MbMessage inMessage, MbMessage outMessage) throws MbException {
        MbElement outRoot = outMessage.getRootElement();
        // iterate though the headers starting with the first child of the root element
        MbElement header = inMessage.getRootElement().getFirstChild();
        while (header != null && header.getNextSibling() != null) // stop before the last child of the body
        {
            // copy the header and add it to the out message
            outRoot.addAsLastChild(header.copy());
            // move along to next header
            header = header.getNextSibling();
        }
    }
}
源码下载:
http://www.blogjava.net/Files/heyang/SimpleCacheInJavaComputeNode_01.rar