摘要: 这里演示的是Spring中使用AspectJ注解和XML配置两种方式实现AOP
下面是使用AspectJ注解实现AOP的Java Project
首先是位于classpath下的applicationContext.xml文件
Code highlighting produced by Actipro CodeHighlighter (freeware)http://www....
阅读全文
访问量上升,数据库压力大,怎么办?好办法是在中间挡一层缓存!这个缓存要求高效,不能比数据库慢,否则服务质量受影响;如果能把数据用hash打散存储到硬盘,也是可以的,不过在内存越来越便宜的今天,还是使用内存吧!
mysql也有自己的缓存,也是存储在内存的,但是有一个说法是:
以下是引用片段:
只能有一个实例 意味着你能存储内容的上限就是你服务器的可用内存,一台服务器能有多少内存?你又能存多少呢?
只要有写操作,mysql的query cache就失效 只要数据库内容稍有改变,那怕改变的是其他行,mysql的query cache也会失效 |
再说,如果mysql都抗不住了,怎么还能指望它提供的缓存呢?
所以我可以使用memcached了!他的好处和如何用可以参考:
开发时面对需求是个麻烦事,更漫长而闹心的是维护,所以我更关心的是memcached运行中的情况。还好的是,memcached的作者给我们提供查看运行情况的命令。主要是“stats”,使用方法为 “telnet ip 端口号”,登录后使用“stats”命令。
然后你可以看见很多内容,具体可以参考:《memcacche stats》
以下是引用片段:
pid = process id uptime = number of seconds since the process was started time = current time version = memcached version rusage_user = seconds the cpu has devoted to the process as the user rusage_system = seconds the cpu has devoted to the process as the system curr_items = total number of items currently in memcache total_items = total number of items that have passed through the cache bytes = total number of bytes currently in use by curr_items curr_connections = total number of open connections to memcached connection_structures = ??? cmd_get = total GET commands issued to the server cmd_set = total SET commands issued to the server get_hits = total number of times a GET command was able to retrieve and return data get_misses = total number of times a GET command was unable to retrieve and return data bytes_read = total number of bytes input into the server bytes_written = total number of bytes written by the server limit_maxbytes = total storage bytes available to the server. |
着重说一下几个对观测很有用的项。
limit_maxbytes、bytes
memcached在存储的时候是可以设置失效时间的,但如果存储已经满了,那旧数据即使没有到过期时间,也会被移除。所以需要观察memcached存储是否已经满了,同时这对扩容也是有意义的参考。limit_maxbytes即总的存储大小,而bytes就是已经使用的大小,从这两个数据就可以看出在memcached启动时,我们为它分配的内存是否足够使用。
cmd_get、cmd_set
memcached启动后,我们对它一共做了多少次读取操作呢?从这两个参数可以观察出来。
get_hits、get_misses
使用memcached后,我们需要评估我们使用的策略是否合理。不能够使用中间缓存后,后端的数据库还是有较大的访问量,这样的话中间缓存就变得没有意义了。get_hits表示命中了多少次读取,即来memcached取到了多少有效数据;get_misses表示没有命中的次数,即此次来取数据的时候,memcached并没有你所查询的数据。如果没有清零统计数据的话,cmd_get = get_hits + get_misses。
memcached 的状态查询还有其它的命令,可以参考:《Memcached的stats命令》
如下:
stats reset
清空统计数据
stats malloc
显示内存分配数据
stats maps
这个不太确定,看源代码是把/proc/self/maps的数据显示出来。
stats cachedump slab_id limit_num
显示某个slab中的前limit_num个key列表,显示格式如下
ITEM key_name [ value_length b; expire_time|access_time s]
其中,memcached 1.2.2及以前版本显示的是 访问时间(timestamp)
1.2.4以上版本,包括1.2.4显示 过期时间(timestamp)
如果是永不过期的key,expire_time会显示为服务器启动的时间
stats cachedump 7 2
ITEM copy_test1 [250 b; 1207795754 s]
ITEM copy_test [248 b; 1207793649 s]
stats slabs
显示各个slab的信息,包括chunk的大小、数目、使用情况等
stats items
显示各个slab中item的数目和最老item的年龄(最后一次访问距离现在的秒数)
stats detail [on|off|dump]
设置或者显示详细操作记录
参数为on,打开详细操作记录
参数为off,关闭详细操作记录
参数为dump,显示详细操作记录(每一个键值get、set、hit、del的次数)
stats detail dump
PREFIX copy_test2 get 1 hit 1 set 0 del 0
PREFIX copy_test1 get 1 hit 1 set 0 del 0
PREFIX cpy get 1 hit 0 set 0 del 0
memcached数据存储和取回相关的基本命令只有4条。
下面将采用telnet与memcached进行交互,并介绍这4条基本命令。
假设memcached服务器在本机上,并监听在默认端口11211上。
telnet连接到memcached:
telnet 127.0.0.1 11211
SET:添加一个新的条目到memcached,或是用新的数据替换掉已存在的条目
set test1 0 0 10
testing001
STORED
ADD:仅当key不存在的情况下存储数据。如果一个key已经存在,将得到NOT_STORED的响应
add test1 0 0 10
testing002
NOT_STORED
add test2 0 0 10
testing002
STORED
REPLACE:仅当key已经存在的情况下存储数据。如果一个key不存在,将得到NOT_STORED的响应
replace test1 0 0 10
testing003
STORED
replace test3 0 0 10
testing003
NOT_STORED
GET:从memcached中返回数据。从缓存中返回数据时,将在第一行得到key的名字,flag的值和返回的value的长度。真正的数据在第二行,最后返回END。如果key并不存在,那么在第一行就直接返回END。
get test1
VALUE test1 0 10
testing003
END
get test4
END
get test1 test2
VALUE test1 0 10
testing003
END
注:像上面那样你可以在一个请求中包含多个由空格分开的key。当请求多个key时,将只会得到那些有存储数据的key的响应。memcached将不会响应没有存储Data的key。
###################################################################
1、启动Memcache 常用参数
memcached 1.4.3
-p <num> 设置端口号(默认不设置为: 11211)
-U <num> UDP监听端口 (默认: 11211, 0 时关闭)
-l <ip_addr> 绑定地址 (默认:所有都允许,无论内外网或者本机更换IP,有安全隐患,若设置为127.0.0.1就只能本机访问)
-d 独立进程运行
-u <username> 绑定使用指定用于运行进程 <username>
-m <num> 允许最大内存用量,单位M (默认: 64 MB)
-P <file> 将PID写入文件<file>,这样可以使得后边进行快速进程终止, 需要与 -d 一起使用
如:
在linux下:./usr/local/bin/memcached -d -u jb-mc -l 192.168.1.197 -m 2048 -p 12121
在window下:d:\App_Serv\memcached\memcached.exe -d RunService -l 127.0.0.1 -p 11211 -m 500
在windows下注册为服务后运行:
sc.exe create jb-Memcached binpath= “d:\App_Serv\memcached\memcached.exe -d RunService -p 11211 -m 500″ start= auto
net start jb-Memcached
2、连接:telnet 127.0.0.1 11211
不要说不会用这个?
3、写入memcache
<command name> <key> <flags> <exptime> <bytes>\r\n <data block>\r\n
a) <command name> 可以是”set”, “add”, “replace”。
“set”表示按照相应的<key>存储该数据,没有的时候增加,有的覆盖。
“add”表示按照相应的<key>添加该数据,但是如果该<key>已经存在则会操作失败。
“replace”表示按照相应的<key>替换数据,但是如果该<key>不存在则操作失败
b) <key> 客户端需要保存数据的key。
c) <flags> 是一个16位的无符号的整数(以十进制的方式表示)。
该标志将和需要存储的数据一起存储,并在客户端get数据时返回。
客户可以将此标志用做特殊用途,此标志对服务器来说是不透明的。
d) <exptime> 过期的时间。
若为0表示存储的数据永远不过时(但可被服务器算法:LRU 等替换)。
如果非0(unix时间或者距离此时的秒数),当过期后,服务器可以保证用户得不到该数据(以服务器时间为标准)。
e) <bytes> 需要存储的字节数(不包含最后的”\r\n”),当用户希望存储空数据时,<bytes>可以为0
f) 最后客户端需要加上”\r\n”作为”命令头”的结束标志。
<data block>\r\n
紧接着”命令头”结束之后就要发送数据块(即希望存储的数据内容),最后加上”\r\n”作为此次通讯的结束。
结果响应:reply
当以上数据发送结束之后,服务器将返回一个应答。可能有如下的情况:
a) “STORED\r\n”:表示存储成功
b) “NOT_STORED\r\n” : 表示存储失败,但是该失败不是由于错误。
通常这是由于”add”或者”replace”命令本身的要求所引起的,或者该项在删除队列之中。
如: set key 33 0 4\r\n
ffff\r\n
4、获取/检查KeyValue
get <key>*\r\n
a) <key>* 表示一个或者多个key(以空格分开)
b) “\r\n” 命令头的结束
结果响应:reply
服务器端将返回0个或者多个的数据项。每个数据项都是由一个文本行和一个数据块组成。当所有的数据项都接收完毕将收到”END\r\n”
每一项的数据结构:
VALUE <key> <flags> <bytes>\r\n
<data block>\r\n
a) <key> 希望得到存储数据的key
b) <falg> 发送set命令时设置的标志项
c) <bytes> 发送数据块的长度(不包含”\r\n”)
d) “\r\n” 文本行的结束标志
e) <data block> 希望接收的数据项。
f) “\r\n” 接收一个数据项的结束标志。
如果有些key出现在get命令行中但是没有返回相应的数据,这意味着服务器中不存在这些项,这些项过时了,或者被删除了
如:get aa
VALUE aa 33 4
ffff
END
5、删除KeyValue:
delete <key> <time>\r\n
a) <key> 需要被删除数据的key
b) <time> 客户端希望服务器将该数据删除的时间(unix时间或者从现在开始的秒数)
c) “\r\n” 命令头的结束
6、检查Memcache服务器状态:
stats\r\n
在这里可以看到memcache的获取次数,当前连接数,写入次数,已经命中率等;
pid : 进程id
uptime :总的运行时间,秒数
time : 当前时间
version : 版本号
……
curr_items : 当前缓存中的KeyValue数量
total_items : 曾经总共经过缓存的KeyValue数量
bytes : 所有的缓存使用的内存量
curr_connections 当前连接数
….
cmd_get : 总获取次数
cmd_set : 总的写入次数
get_hits : 总的命中次数
miss_hits : 获取失败次数
…..
bytes_read : 总共读取的流量字节数
bytes_written : 总的写入流量字节
limit_maxbytes : 最大允许使用的内存量,字节
7、高级缓存细节查看方法:
stats reset
清空统计数据
stats malloc
显示内存分配数据
stats cachedump slab_id limit_num
显示某个slab中的前limit_num个key列表,显示格式如下
ITEM key_name [ value_length b; expire_time|access_time s]
其中,memcached 1.2.2及以前版本显示的是 访问时间(timestamp)
1.2.4以上版本,包括1.2.4显示 过期时间(timestamp)
如果是永不过期的key,expire_time会显示为服务器启动的时间
stats cachedump 7 2
ITEM copy_test1 [250 b; 1207795754 s]
ITEM copy_test [248 b; 1207793649 s]
stats slabs
显示各个slab的信息,包括chunk的大小、数目、使用情况等
stats items
显示各个slab中item的数目和最老item的年龄(最后一次访问距离现在的秒数)
stats detail [on|off|dump]
设置或者显示详细操作记录
参数为on,打开详细操作记录
参数为off,关闭详细操作记录
参数为dump,显示详细操作记录(每一个键值get、set、hit、del的次数)
8、清空所有键值
flush_all
注:flush并不会将items删除,只是将所有的items标记为expired,因此这时memcache依旧占用所有内存。
8、退出
quit\r\n
今天要介绍的是Simple-Spring-Memcached,它封装了对MemCached的调用,使MemCached的客户端开发变得超乎寻常的简单,只要一行代码就行:
@ReadThroughAssignCache(assignedKey = "VETS", expiration = 300, namespace = "NELZ")
是不是很神奇?这行代码指定了MemCached的key,过期时间和命名空间。假设你的MemCached服务器IP是:196.168.10.101,端口是:12000,那么在数据调用的配置文件中只要加上下面配置代码就可以了:
1 <import resource="classpath:simplesm-context.xml" />
5 <bean id="memcachedConnectionBean" class="net.nelz.simplesm.config.MemcachedConnectionBean">
7 <property name="consistentHashing" value="true" />
9 <property name="nodeList" value="196.168.10.101:12000" />
5 </bean>
从simplesm-context.xml的内容中,可以看出它所封装的类和方法:
1 <bean id="memcachedClientFactory" class="net.nelz.simplesm.config.MemcachedClientFactory" >
3 property name="bean" ref="memcachedConnectionBean" />
5 </bean>
9 <bean id="memcachedClient" factory-bean="memcachedClientFactory" factory-method="createMemcachedClient" />
13 <bean id="methodStore" class="net.nelz.simplesm.aop.CacheKeyMethodStoreImpl" />
17 <bean id="net.nelz.simplesm.DefaultKeyProvider" class="net.nelz.simplesm.impl.DefaultKeyProvider">
19 <property name="methodStore" ref="methodStore" />
21 </bean>
25 <bean id="readThroughSingleCache" class="net.nelz.simplesm.aop.ReadThroughSingleCacheAdvice">
27 <property name="cache" ref="memcachedClient" />
29 <property name="methodStore" ref="methodStore" />
31 <property name="defaultKeyProvider" ref="net.nelz.simplesm.DefaultKeyProvider" />
33 </bean>
35 <bean id="readThroughMultiCache" class="net.nelz.simplesm.aop.ReadThroughMultiCacheAdvice">
37 <property name="cache" ref="memcachedClient" />
39 <property name="methodStore" ref="methodStore" />
41 <property name="defaultKeyProvider" ref="net.nelz.simplesm.DefaultKeyProvider" />
43 </bean>
45 <bean id="readThroughAssignCache" class="net.nelz.simplesm.aop.ReadThroughAssignCacheAdvice">
47 <property name="cache" ref="memcachedClient" />
49 <property name="methodStore" ref="methodStore" />
51 <property name="defaultKeyProvider" ref="net.nelz.simplesm.DefaultKeyProvider" />
53 </bean>
54
55 <bean id="updateSingleCache" class="net.nelz.simplesm.aop.UpdateSingleCacheAdvice">
57 <property name="cache" ref="memcachedClient" />
59 <property name="methodStore" ref="methodStore" />
61 <property name="defaultKeyProvider" ref="net.nelz.simplesm.DefaultKeyProvider" />
63 </bean>
64
65 <bean id="updateMultiCache" class="net.nelz.simplesm.aop.UpdateMultiCacheAdvice">
67 <property name="cache" ref="memcachedClient" />
69 <property name="methodStore" ref="methodStore" />
71 <property name="defaultKeyProvider" ref="net.nelz.simplesm.DefaultKeyProvider" />
73 </bean>
75 <bean id="updateAssignCache" class="net.nelz.simplesm.aop.UpdateAssignCacheAdvice">
77 <property name="cache" ref="memcachedClient" />
79 <property name="methodStore" ref="methodStore" />
81 <property name="defaultKeyProvider" ref="net.nelz.simplesm.DefaultKeyProvider" />
83 </bean>
85 <bean id="invalidateSingleCache" class="net.nelz.simplesm.aop.InvalidateSingleCacheAdvice">
87 <property name="cache" ref="memcachedClient" />
89 <property name="methodStore" ref="methodStore" />
91 <property name="defaultKeyProvider" ref="net.nelz.simplesm.DefaultKeyProvider" />
93 </bean>
95 <bean id="invalidateMultiCache" class="net.nelz.simplesm.aop.InvalidateMultiCacheAdvice">
97 <property name="cache" ref="memcachedClient" />
99 <property name="methodStore" ref="methodStore" />
101 <property name="defaultKeyProvider" ref="net.nelz.simplesm.DefaultKeyProvider" />
103 </bean>
105 <bean id="invalidateAssignCache" class="net.nelz.simplesm.aop.InvalidateAssignCacheAdvice">
107 <property name="cache" ref="memcachedClient" />
109 <property name="methodStore" ref="methodStore" />
111 <property name="defaultKeyProvider" ref="net.nelz.simplesm.DefaultKeyProvider" />
113 </bean>
Simple-Spring-Memcached还提供了一个例子,在spring的petClinic例子中加入了几行代码,就实现了对MemCached的调用:
1 import net.nelz.simplesm.annotations.ReadThroughAssignCache;
2
3 import net.nelz.simplesm.annotations.ReadThroughSingleCache;
4
5 @ReadThroughAssignCache(assignedKey = "VETS", expiration = 300, namespace = "NELZ")
6
7 public Collection<Vet> getVets() {
8
9 System.out.println("\n ! ! !Gonna wait a bit: " + new Date() + "\n");
10
11 try {
12
13 Thread.sleep(4000);
14
15 } catch (Exception ex) {}
16
17 return sessionFactory.getCurrentSession().createQuery("from Vet vet order by vet.lastName, vet.firstName").list();
18
19 }
为了加强测试的效果,在第一次读取数据时,故意停顿了一下(sleep)。
Memcached是高性能的,分布式的内存对象缓存系统,用于在动态应用中减少数据库负载,提升访问速度。Memcached由Danga Interactive开发,用于提升LiveJournal.com访问速度的。LJ每秒动态页面访问量几千次,用户700万。Memcached将数据库负载大幅度降低,更好的分配资源,更快速访问。
上网baidu了很多东西,几乎都差不多,而且基于java的说的很少,所有只有在研究了各个其他语言类的应用后再来尝试在java上进行简单的操作应用。先从memcached上进行说明,memcached的最新版是采用c语言进行开发和设计的,据说旧版的是采用perl语言开发的,而且它是一个应用软件来的,是作为缓存服务器的服务器端运行在服务器上的,需要使用特定的语言编写客户端与其进行通信来进行数据的缓存和获取。通常我们是把memcached安装运行在web服务器上,然后通过对需要的数据进行缓存,据我目前所知,所有数据的缓存设置和存取操作,以及数据的更新后替换操作全部需要程序来进行,而不是自动进行的(自动不知道能不能成功,呵呵)。下面从一个实际的例子来应用memcached。
首先到http://splinedancer.com/memcached-win32/下载windows版本的memcached,然后依次执行下面:
1、Unzip the binaries in your desired directory (eg. c:\memcached) ,
解压
2、Install the service using the command: 'c:\memcached\memcached.exe -d install' from the command line
cmd 通过 c:\memcached\memcached.exe -d install 进行安装
3、Start the server from the Microsoft Management Console or by running the following command: 'c:\memcached\memcached.exe -d start'
cmd 通过 c:\memcached\memcached.exe -d install 启用服务
4、Use the server, by default listening to port 11211
默认的端口为11211。
执行完毕后,我们就可以在任务管理器中见到memcached.exe这个进程了。好了,我们的服务器已经正常运行了, 下面我们就来写java的客户端连接程序。到
https://github.com/gwhalin/Memcached-Java-Client/downloads下载memcahce客户端。
然后我们来编写代码,比如我提供的一个应用类如下:
package utils.cache;
import java.util.Date;
import com.meetup.memcached.SockIOPool;
import com.meetup.memcached.MemcachedClient;
/**
* 使用memcached的缓存实用类.
*
* @author hao446tian
*
*/
public class MemCached
{
// 创建全局的唯一实例
protected static MemcachedClient mcc = new MemcachedClient();
protected static MemCached memCached = new MemCached();
// 设置与缓存服务器的连接池
static {
// 服务器列表和其权重
String[] servers = {"127.0.0.1:11211"};
Integer[] weights = {3};
// 获取socke连接池的实例对象
SockIOPool pool = SockIOPool.getInstance();
// 设置服务器信息
pool.setServers( servers );
pool.setWeights( weights );
// 设置初始连接数、最小和最大连接数以及最大处理时间
pool.setInitConn( 5 );
pool.setMinConn( 5 );
pool.setMaxConn( 250 );
pool.setMaxIdle( 1000 * 60 * 60 * 6 );
// 设置主线程的睡眠时间
pool.setMaintSleep( 30 );
// 设置TCP的参数,连接超时等
pool.setNagle( false );
pool.setSocketTO( 3000 );
pool.setSocketConnectTO( 0 );
// 初始化连接池
pool.initialize();
// 压缩设置,超过指定大小(单位为K)的数据都会被压缩
mcc.setCompressEnable( true );
mcc.setCompressThreshold( 64 * 1024 );
}
/**
* 保护型构造方法,不允许实例化!
*
*/
protected MemCached()
{
}
/**
* 获取唯一实例.
* @return
*/
public static MemCached getInstance()
{
return memCached;
}
/**
* 添加一个指定的值到缓存中.
* @param key
* @param value
* @return
*/
public boolean add(String key, Object value)
{
return mcc.add(key, value);
}
public boolean add(String key, Object value, Date expiry)
{
return mcc.add(key, value, expiry);
}
public boolean replace(String key, Object value)
{
return mcc.replace(key, value);
}
public boolean replace(String key, Object value, Date expiry)
{
return mcc.replace(key, value, expiry);
}
/**
* 根据指定的关键字获取对象.
* @param key
* @return
*/
public Object get(String key)
{
return mcc.get(key);
}
public static void main(String[] args)
{
MemCached cache = MemCached.getInstance();
cache.add("hello", 234);
System.out.print("get value : " + cache.get("hello"));
}
}
那么我们就可以通过简单的像main方法中操作的一样存入一个变量,然后再取出进行查看,我们可以看到先调用了add,然后再进行get,我们运行一次后,234这个值已经被我们存入了memcached的缓存中的了,我们将main方法中红色的那一行注释掉后,我们再运行还是可以看到get到的value也是234,即缓存中我们已经存在了数据了。
对基本的数据我们可以操作,对于普通的POJO而言,如果要进行存储的话,那么比如让其实现java.io.Serializable接口,因为memcached是一个分布式的缓存服务器,多台服务器间进行数据共享需要将对象序列化的,所以必须实现该接口,否则会报错的。比如我们写一个简单的测试Bean如下:
class TBean implements java.io.Serializable
{
private static final long serialVersionUID = 1945562032261336919L;
private String name;
public String getName()
{
return name;
}
public void setName(String name)
{
this.name = name;
}
}
然后我们在main方法中加入如下几行代码:
TBean tb = new TBean();
tb.setName("铁木箱子");
cache.add("bean", tb);
TBean tb1 = (TBean)cache.get("bean");
System.out.print("name=" + tb1.getName());
tb1.setName("铁木箱子_修改的");
tb1 = (TBean)cache.get("bean");
System.out.print("name=" + tb1.getName());
我们首先把TBean的一个实例放入缓存中,然后再取出来,并进行名称的修改,然后我们再取这个对象,我们再看其名称,发现修改的对象并不是缓存中的对象,而是通过序列化过来的一个实例对象,这样我们就无须担心对原生类的无意修改导致缓存数据失效了,呵呵~~看来我也是多此一想啊。所以这表明从缓存中获取的对象是存入对象的一个副本,对获取对象的修改并不能真正的修改缓存中的数据,而应该使用其提供的replace等方法来进行修改。
查看缓存区块,分析结果可调配-f参数
telnet 127.0.0.1 11212
查看缓存的各种状态
stats
====================================
查看缓存所分配的slabs
stats slabs
====================================
清除统计数据
stats reset====================================
是否显示详细操作记录(每一个键值get、set、hit、del的次数)
stats detail on,记录详细操作
stats detail dump,不记录详细操作
====================================
显示各个slab中item的数目和最老item的年龄(最后一次访问距离现在的秒数)
stats items
====================================
显示某个slab中的前limit_num个key列表,显示格式如下
stats cachedump slab_id 显示条数
参考资料
一般来说一个memcahced进程会预先将自己划分为若干个slab,每个slab下又有若干个page,每个page下又有多个chunk,如果我们把这3个咚咚看作是object得话,这是两个一对多得关系。再一般来说,slab得数量是有限得,几个,十几个,或者几十个,这个跟进程配置得内存有关。而每个slab下得page默认情况是1m,也就是说如果一个slab占用100m得内存得话,那么默认情况下这个slab所拥有得page得个数就是100,而chunk就是我们得数据存放得最终地方。chunk_size表示数据存放块得大小,chunks_per_page表示一个内存页page中拥有得chunk得数量,total_pages表示每个slab下page得个数。total_chunks表示这个slab下chunk得总数(=total_pages * chunks_per_page),used_chunks表示该slab下已经使用得chunk得数量,free_chunks表示该slab下还可以使用得chunks数量。
企业中各项目中相互协作的时候可能用得到消息通知机制。比如有东西更新了,可以通知做索引。
在 Java 里有 JMS 的多个实现。其中 apache 下的 ActiveMQ 就是不错的选择。还有一个比较热的是 RabbitMQ (是 erlang 语言实现的)。这里示例下使用 ActiveMQ
用 ActiveMQ 最好还是了解下 JMS
JMS 公共 |
点对点域 |
发布/订阅域 |
ConnectionFactory |
QueueConnectionFactory |
TopicConnectionFactory |
Connection |
QueueConnection |
TopicConnection |
Destination |
Queue |
Topic |
Session |
QueueSession |
TopicSession |
MessageProducer |
QueueSender |
TopicPublisher |
MessageConsumer |
QueueReceiver |
TopicSubscriber |
JMS 定义了两种方式:Quere(点对点);Topic(发布/订阅)。
ConnectionFactory 是连接工厂,负责创建Connection。
Connection 负责创建 Session。
Session 创建 MessageProducer(用来发消息) 和 MessageConsumer(用来接收消息)。
Destination 是消息的目的地。
详细的可以网上找些 JMS 规范(有中文版)。
下载 apache-activemq-5.3.0。http://activemq.apache.org/download.html,解压,然后双击 bin/activemq.bat。运行后,可以在 http://localhost:8161/admin 观察。也有 demo, http://localhost:8161/demo。把 activemq-all-5.3.0.jar 加入 classpath。
Jms 发送 代码:
- public static void main(String[] args) throws Exception {
- ConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
-
- Connection connection = connectionFactory.createConnection();
- connection.start();
-
- Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
- Destination destination = session.createQueue("my-queue");
-
- MessageProducer producer = session.createProducer(destination);
- for(int i=0; i<3; i++) {
- MapMessage message = session.createMapMessage();
- message.setLong("count", new Date().getTime());
- Thread.sleep(1000);
-
- producer.send(message);
- }
- session.commit();
- session.close();
- connection.close();
- }
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("my-queue");
MessageProducer producer = session.createProducer(destination);
for(int i=0; i<3; i++) {
MapMessage message = session.createMapMessage();
message.setLong("count", new Date().getTime());
Thread.sleep(1000);
//通过消息生产者发出消息
producer.send(message);
}
session.commit();
session.close();
connection.close();
}
Jms 接收代码:
- public static void main(String[] args) throws Exception {
- ConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
-
- Connection connection = connectionFactory.createConnection();
- connection.start();
-
- final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
- Destination destination = session.createQueue("my-queue");
-
- MessageConsumer consumer = session.createConsumer(destination);
-
-
-
-
-
-
-
-
-
-
-
-
-
- int i=0;
- while(i<3) {
- i++;
- MapMessage message = (MapMessage) consumer.receive();
- session.commit();
-
-
- System.out.println("收到消息:" + new Date(message.getLong("count")));
- }
-
- session.close();
- connection.close();
- }
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
Connection connection = connectionFactory.createConnection();
connection.start();
final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("my-queue");
MessageConsumer consumer = session.createConsumer(destination);
/*//listener 方式
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message msg) {
MapMessage message = (MapMessage) msg;
//TODO something....
System.out.println("收到消息:" + new Date(message.getLong("count")));
session.commit();
}
});
Thread.sleep(30000);
*/
int i=0;
while(i<3) {
i++;
MapMessage message = (MapMessage) consumer.receive();
session.commit();
//TODO something....
System.out.println("收到消息:" + new Date(message.getLong("count")));
}
session.close();
connection.close();
}
启动 JmsReceiver 和 JmsSender 可以在看输出三条时间信息。当然 Jms 还指定有其它格式的数据,如 TextMessage
结合 Spring 的 JmsTemplate 方便用:
xml:
- <?xml version="1.0" encoding="UTF-8"?>
- <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd">
-
- <!-- 在非 web / ejb 容器中使用 pool 时,要手动 stop,spring 不会为你执行 destroy-method 的方法
- <bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
- <property name="connectionFactory">
- <bean class="org.apache.activemq.ActiveMQConnectionFactory">
- <property name="brokerURL" value="tcp://localhost:61616" />
- </bean>
- </property>
- </bean>
- -->
- <bean id="jmsFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
- <property name="brokerURL" value="tcp://localhost:61616" />
- </bean>
- <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
- <property name="connectionFactory" ref="jmsFactory" />
- <property name="defaultDestination" ref="destination" />
- <property name="messageConverter">
- <bean class="org.springframework.jms.support.converter.SimpleMessageConverter" />
- </property>
- </bean>
-
- <bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
- <constructor-arg index="0" value="my-queue" />
- </bean>
-
- </beans>
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd">
<!-- 在非 web / ejb 容器中使用 pool 时,要手动 stop,spring 不会为你执行 destroy-method 的方法
<bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
<property name="connectionFactory">
<bean class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616" />
</bean>
</property>
</bean>
-->
<bean id="jmsFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616" />
</bean>
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="jmsFactory" />
<property name="defaultDestination" ref="destination" />
<property name="messageConverter">
<bean class="org.springframework.jms.support.converter.SimpleMessageConverter" />
</property>
</bean>
<bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg index="0" value="my-queue" />
</bean>
</beans>
sender:
- public static void main(String[] args) {
- ApplicationContext ctx = new FileSystemXmlApplicationContext("classpath:app*.xml");
-
- JmsTemplate jmsTemplate = (JmsTemplate) ctx.getBean("jmsTemplate");
-
- jmsTemplate.send(new MessageCreator() {
-
- public Message createMessage(Session session) throws JMSException {
- MapMessage mm = session.createMapMessage();
- mm.setLong("count", new Date().getTime());
- return mm;
- }
-
- });
- }
public static void main(String[] args) {
ApplicationContext ctx = new FileSystemXmlApplicationContext("classpath:app*.xml");
JmsTemplate jmsTemplate = (JmsTemplate) ctx.getBean("jmsTemplate");
jmsTemplate.send(new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
MapMessage mm = session.createMapMessage();
mm.setLong("count", new Date().getTime());
return mm;
}
});
}
receiver:
- public static void main(String[] args) {
- ApplicationContext ctx = new FileSystemXmlApplicationContext("classpath:app*.xml");
-
- JmsTemplate jmsTemplate = (JmsTemplate) ctx.getBean("jmsTemplate");
- while(true) {
- Map<String, Object> mm = (Map<String, Object>) jmsTemplate.receiveAndConvert();
- System.out.println("收到消息:" + new Date((Long)mm.get("count")));
- }
- }
public static void main(String[] args) {
ApplicationContext ctx = new FileSystemXmlApplicationContext("classpath:app*.xml");
JmsTemplate jmsTemplate = (JmsTemplate) ctx.getBean("jmsTemplate");
while(true) {
Map<String, Object> mm = (Map<String, Object>) jmsTemplate.receiveAndConvert();
System.out.println("收到消息:" + new Date((Long)mm.get("count")));
}
}
注意:直接用 Jms 接口时接收了消息后要提交一下,否则下次启动接收者时还可以收到旧数据。有了 JmsTemplate 就不用自己提交 session.commit() 了。如果使用了 PooledConnectionFactory 要把 apache-activemq-5.3.0\lib\optional\activemq-pool-5.3.0.jar 加到 classpath
JVM在加载类的时候,都是通过ClassLoader的loadClass()方法来加载class的,loadClass(String name)方法:
- public Class<?> loadClass(String name) throws ClassNotFoundException {
- return loadClass(name, false);
- }
public Class<?> loadClass(String name) throws ClassNotFoundException {
return loadClass(name, false);
}
loadClass(String name)方法再调用loadClass(String name, boolean resolve)方法:
- name - 类的二进制名称
- resolve - 如果该参数为 true,则分析这个类
- protected synchronized Class<?> loadClass(String name, boolean resolve)
- throws ClassNotFoundException
- {
-
-
- Class c = findLoadedClass(name);
- if (c == null) {
- try {
- if (parent != null) {
- c = parent.loadClass(name, false);
- } else {
- c = findBootstrapClass0(name);
- }
- } catch (ClassNotFoundException e) {
-
-
- c = findClass(name);
- }
- }
- if (resolve) {
- resolveClass(c);
- }
- return c;
- }
protected synchronized Class<?> loadClass(String name, boolean resolve)
throws ClassNotFoundException
{
// First, check if the class has already been loaded
//JVM 规范规定ClassLoader可以在缓存保留它所加载的Class,如果一个Class已经被加载过,则直接从缓存中获取
Class c = findLoadedClass(name);
if (c == null) {
try {
if (parent != null) {
c = parent.loadClass(name, false);
} else {
c = findBootstrapClass0(name);
}
} catch (ClassNotFoundException e) {
// If still not found, then invoke findClass in order
// to find the class.
c = findClass(name);
}
}
if (resolve) {
resolveClass(c);
}
return c;
}
如果ClassLoader并没有加载这个class,则调用findBootstrapClass0:
- private Class findBootstrapClass0(String name)
- throws ClassNotFoundException
- {
- check();
- if (!checkName(name))
- throw new ClassNotFoundException(name);
- return findBootstrapClass(name);
- }
private Class findBootstrapClass0(String name)
throws ClassNotFoundException
{
check();
if (!checkName(name))
throw new ClassNotFoundException(name);
return findBootstrapClass(name);
}
该方法会调用check()方法来判断这个类是否已经初始化,并且通过checkName(name)来判断由name指定的这个类是否存在
最后调用findBootstrapClass(name):
- private native Class findBootstrapClass(String name)
- throws ClassNotFoundException;
private native Class findBootstrapClass(String name)
throws ClassNotFoundException;
而这个findBootstrapClass方法是一个native方法,这是我们的root loader,这个载入方法并非是由JAVA所写,而是C++写的,它会最终调用JVM中的原生findBootstrapClass方法来完成类的加载。
如果上面两个都找不到,则使用findClass(name)来查找指定类名的Class:
- protected Class<?> findClass(String name) throws ClassNotFoundException {
- throw new ClassNotFoundException(name);
- }
protected Class<?> findClass(String name) throws ClassNotFoundException {
throw new ClassNotFoundException(name);
}
JDK5.0中的说明: 使用指定的二进制名称查找类。此方法应该被类加载器的实现重写,该实现按照委托模型来加载类。在通过父类加载器检查所请求的类后,此方法将被 loadClass 方法调用。默认实现抛出一个 ClassNotFoundException。
所以,我们在自定义类中,只需要重写findClass()即可。
MyClassLoader类:
- public class MyClassLoader extends ClassLoader {
- private String fileName;
-
- public MyClassLoader(String fileName) {
- this.fileName = fileName;
- }
-
- protected Class<?> findClass(String className) throws ClassNotFoundException {
- Class clazz = this.findLoadedClass(className);
- if (null == clazz) {
- try {
- String classFile = getClassFile(className);
- FileInputStream fis = new FileInputStream(classFile);
- FileChannel fileC = fis.getChannel();
- ByteArrayOutputStream baos = new
- ByteArrayOutputStream();
- WritableByteChannel outC = Channels.newChannel(baos);
- ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
- while (true) {
- int i = fileC.read(buffer);
- if (i == 0 || i == -1) {
- break;
- }
- buffer.flip();
- outC.write(buffer);
- buffer.clear();
- }
- fis.close();
- byte[] bytes = baos.toByteArray();
-
- clazz = defineClass(className, bytes, 0, bytes.length);
- } catch (FileNotFoundException e) {
- e.printStackTrace();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- return clazz;
- }
- private byte[] loadClassBytes(String className) throws
- ClassNotFoundException {
- try {
- String classFile = getClassFile(className);
- FileInputStream fis = new FileInputStream(classFile);
- FileChannel fileC = fis.getChannel();
- ByteArrayOutputStream baos = new
- ByteArrayOutputStream();
- WritableByteChannel outC = Channels.newChannel(baos);
- ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
- while (true) {
- int i = fileC.read(buffer);
- if (i == 0 || i == -1) {
- break;
- }
- buffer.flip();
- outC.write(buffer);
- buffer.clear();
- }
- fis.close();
- return baos.toByteArray();
- } catch (IOException fnfe) {
- throw new ClassNotFoundException(className);
- }
- }
- private String getClassFile(String name) {
- StringBuffer sb = new StringBuffer(fileName);
- name = name.replace('.', File.separatorChar) + ".class";
- sb.append(File.separator + name);
- return sb.toString();
- }
- }
public class MyClassLoader extends ClassLoader {
private String fileName;
public MyClassLoader(String fileName) {
this.fileName = fileName;
}
protected Class<?> findClass(String className) throws ClassNotFoundException {
Class clazz = this.findLoadedClass(className);
if (null == clazz) {
try {
String classFile = getClassFile(className);
FileInputStream fis = new FileInputStream(classFile);
FileChannel fileC = fis.getChannel();
ByteArrayOutputStream baos = new
ByteArrayOutputStream();
WritableByteChannel outC = Channels.newChannel(baos);
ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
while (true) {
int i = fileC.read(buffer);
if (i == 0 || i == -1) {
break;
}
buffer.flip();
outC.write(buffer);
buffer.clear();
}
fis.close();
byte[] bytes = baos.toByteArray();
clazz = defineClass(className, bytes, 0, bytes.length);
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
return clazz;
}
private byte[] loadClassBytes(String className) throws
ClassNotFoundException {
try {
String classFile = getClassFile(className);
FileInputStream fis = new FileInputStream(classFile);
FileChannel fileC = fis.getChannel();
ByteArrayOutputStream baos = new
ByteArrayOutputStream();
WritableByteChannel outC = Channels.newChannel(baos);
ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
while (true) {
int i = fileC.read(buffer);
if (i == 0 || i == -1) {
break;
}
buffer.flip();
outC.write(buffer);
buffer.clear();
}
fis.close();
return baos.toByteArray();
} catch (IOException fnfe) {
throw new ClassNotFoundException(className);
}
}
private String getClassFile(String name) {
StringBuffer sb = new StringBuffer(fileName);
name = name.replace('.', File.separatorChar) + ".class";
sb.append(File.separator + name);
return sb.toString();
}
}
该类中通过调用defineClass(String name, byte[] b, int off, int len)方法来定义一个类:
- protected final Class<?> defineClass(String name, byte[] b, int off, int len)
- throws ClassFormatError
- {
- return defineClass(name, b, off, len, null);
- }
protected final Class<?> defineClass(String name, byte[] b, int off, int len)
throws ClassFormatError
{
return defineClass(name, b, off, len, null);
}
注:MyClassLoader加载类时有一个局限,必需指定
.class文件,而不能指定
.jar文件。MainClassLoader类:
- public class MainClassLoader {
- public static void main(String[] args) {
- try {
- MyClassLoader tc = new MyClassLoader("F:\\OpenLib\\");
- Class c = tc.findClass("Test");
- c.newInstance();
- } catch (ClassNotFoundException e) {
- e.printStackTrace();
- } catch (IllegalAccessException e) {
- e.printStackTrace();
- } catch (InstantiationException e) {
- e.printStackTrace();
- }
- }
- }
public class MainClassLoader {
public static void main(String[] args) {
try {
MyClassLoader tc = new MyClassLoader("F:\\OpenLib\\");
Class c = tc.findClass("Test");
c.newInstance();
} catch (ClassNotFoundException e) {
e.printStackTrace();
} catch (IllegalAccessException e) {
e.printStackTrace();
} catch (InstantiationException e) {
e.printStackTrace();
}
}
}
最后是一个简单的Test测试类:
- public class Test
- {
- public Test() {
- System.out.println("Test");
- }
- public static void main(String[] args) {
- System.out.println("Hello World");
- }
- }
public class Test
{
public Test() {
System.out.println("Test");
}
public static void main(String[] args) {
System.out.println("Hello World");
}
}
最近使用Mina开发一个Java的NIO服务端程序,因此也特意学习了Apache的这个Mina框架。
首先,Mina是个什么东西?看下官方网站(http://mina.apache.org/)对它的解释:
Apache的Mina(Multipurpose Infrastructure Networked Applications)是一个网络应用框架,可以帮助用户开发高性能和高扩展性的网络应用程序;它提供了一个抽象的、事件驱动的异步API,使Java NIO在各种传输协议(如TCP/IP,UDP/IP协议等)下快速高效开发。
Apache Mina也称为:
- NIO框架
- 客户端/服务端框架(典型的C/S架构)
- 网络套接字(networking socket)类库
- 事件驱动的异步API(注意:在JDK7中也新增了异步API)
总之:我们简单理解它是一个封装底层IO操作,提供高级操作API的通讯框架!
在Mina的官网、以及网上都有比较丰富的文档了,这里我就稍微简单说一下Mina的结构和示例代码。
因为Mina2.X改进了Mina的代码结构和包结构,降低了使用的复杂性和增强了健壮性,所以使得API发生了比较大的改变,有许多地方已经和Mina1.x不兼容了。
这里使用的是Mina2.0.4
1.Mina的结构
Mina的通信流程大致如上图所示,各个组件功能有:
(1.) IoService:这个接口在一个线程上负责套接字的建立,拥有自己的Selector,监
听是否有连接被建立。
(Mina底层使用JAVA NIO, 因此它是典型的使用Reactor模式架构的,采用事件驱动编程 , Mina运行用户自定义线程模型,可以是单线程、多线程、线程池等 ,
跟JAVA Socket不一样, Mina是非阻塞的Socket,它内部已经保证了对各个连接(session)的业务和数据的隔离,采用轮询机制为各个session分配CPU资源,
所以,你就不需要再去考虑不同Socket连接需要用不同的线程去操纵的问题了。)
(2.) IoProcessor:这个接口在另一个线程上负责检查是否有数据在通道上读写,也就是
说它也拥有自己的Selector,这是与我们使用JAVA NIO 编码时的一个不同之处,
通常在JAVA NIO 编码中,我们都是使用一个Selector,也就是不区分IoService
与 IoProcessor 两个功能接口。另外,IoProcessor 负责调用注册在IoService 上
的过滤器,并在过滤器链之后调用IoHandler。
(3.) IoFilter:这个接口定义一组拦截器,这些拦截器可以包括日志输出、黑名单过滤、
数据的编码(write 方向)与解码(read 方向)等功能,其中数据的encode 与 decode
是最为重要的、也是你在使用Mina 时最主要关注的地方。
(4.) IoHandler:这个接口负责编写业务逻辑,也就是接收、发送数据的地方。
2. Mina编程的大致过程.
2.1 总体流程
建立服务器端的资源: 包括 Acceptor的建立,之后为Acceptor配置相应的Filter(可以是Mina自带的Filter或者自定义的Filter),
之后再配置相应基于事件驱动的处理业务逻辑的IoHandler.
建立客户端的资源: Mina采用了统一的编程模型,所以建立客户端的过程和建立服务器端的过程大致上是相似的,不过这里建立的是Connector.
2.2 示例程序。(使用jar包为 mina-core-2.0.4.jar)
下面通过一个简单的示例程序来进一步理解Mina的运行机制。
该程序实现简单的即时通讯功能。 即,多个客户端可以同时脸上服务器,并进行类似于聊天室一样的通信。
2.2.1 建立自定义的TextLineCodecFacotry
为了了解Mina的代码功能以及运行机制,我们模拟实现了类似Mina自带TextLineCodecFactory。
该CodecFactory功能是: 配合ProtocolCodecFilter 进行对底层数据(binary二进制数据流)和高层数据(特定类型的数据对象信息,例如String)之间的转换。
这里实现了一个断行读取功能,即遇到'\n'的时候,就认为是一个String Line , 将这段数据流封装成String,之后再交给下一个Filter或者Handler处理。
CodecFactory是一个工厂方法,底层通过一个Decoder和Encoder来提供对数据进行解、编码的操作,载体是IoBuffer。
(解码:将二进制数据转换成高层数据(对象) 编码:将高层数据(对象)转换成二进制数据流) )
1)建立Decoder (MyTextLineDecoder)
实现了ProtocolDecoder接口
package com.mai.mina.diyCodecFilter;
import java.nio.charset.Charset;
import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolDecoderOutput;
public class MyTextLineDecoder implements ProtocolDecoder{
Charset charset = Charset.forName("UTF-8");
IoBuffer buf = IoBuffer.allocate(100).setAutoExpand(true);
@Override
public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput output)
throws Exception {
// TODO Auto-generated method stub
while(in.hasRemaining()){
byte b = in.get();
if(b == '\n'){
buf.flip();
byte[] bytes = new byte[buf.limit()];
buf.get(bytes);
String message = new String(bytes,charset);
buf = IoBuffer.allocate(100).setAutoExpand(true);
output.write(message);
}else{
buf.put(b);
}
}
}
@Override
public void dispose(IoSession arg0) throws Exception {
// TODO Auto-generated method stub
}
@Override
public void finishDecode(IoSession arg0, ProtocolDecoderOutput arg1)
throws Exception {
// TODO Auto-generated method stub
}
}
2)建立Encoder (MyTextLineEncoder)
实现了ProtocolEncoder接口
package com.mai.mina.diyCodecFilter;
import java.nio.charset.Charset;
import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolEncoder;
import org.apache.mina.filter.codec.ProtocolEncoderOutput;
import org.apache.mina.filter.codec.textline.LineDelimiter;
public class MyTextLineEncoder implements ProtocolEncoder{
Charset charset = Charset.forName("UTF-8");
@Override
public void dispose(IoSession session) throws Exception {
// TODO Auto-generated method stub
}
@Override
public void encode(IoSession session, Object message, ProtocolEncoderOutput output)
throws Exception {
// TODO Auto-generated method stub
IoBuffer buf = IoBuffer.allocate(100).setAutoExpand(true);
buf.putString(message.toString(), charset.newEncoder());
buf.putString(LineDelimiter.DEFAULT.getValue(), charset.newEncoder());
buf.flip();
output.write(buf);
}
}
3)建立MyTextLineCodecFactory
实现了ProtocolCodecFactory接口
package com.mai.mina.diyCodecFilter;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFactory;
import org.apache.mina.filter.codec.ProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolEncoder;
public class MyTextLineCodecFactory implements ProtocolCodecFactory{
@Override
public ProtocolDecoder getDecoder(IoSession arg0) throws Exception {
// TODO Auto-generated method stub
return new MyTextLineDecoder();
}
@Override
public ProtocolEncoder getEncoder(IoSession arg0) throws Exception {
// TODO Auto-generated method stub
return new MyTextLineEncoder();
}
}
2.2.2 建立服务器端资源(包括Acceptor的配置、Handler建立)
1). 建立自定义IoHandler(MyServerHandleDemo1)
实现了IoHandler接口。
package com.mai.mina.diyChat;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Collection;
import java.util.Date;
import java.util.logging.Logger;
import org.apache.mina.core.future.CloseFuture;
import org.apache.mina.core.future.IoFuture;
import org.apache.mina.core.future.IoFutureListener;
import org.apache.mina.core.service.IoHandler;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
public class MyServerHandleDemo1 implements IoHandler{
private Logger logger = Logger.getLogger(this.getClass().getName());
@Override
public void exceptionCaught(IoSession session, Throwable arg1)
throws Exception {
// TODO Auto-generated method stub
logger.warning("服务器启动发生异常,have a exception : " + arg1.getMessage());
}
@Override
public void messageReceived(IoSession session, Object message) throws Exception {
// TODO Auto-generated method stub
String messageStr = message.toString();
DateFormat format = new SimpleDateFormat("yyyy-MM-dd H:m:s");
String dateStr = format.format(new Date());
logger.info(messageStr + "\t" + dateStr);
Collection<IoSession> sessions = session.getService().getManagedSessions().values();
for(IoSession tempSession : sessions){
tempSession.write(messageStr + "\t" + dateStr);
}
}
@Override
public void messageSent(IoSession session, Object message) throws Exception {
// TODO Auto-generated method stub
logger.info("服务器成功发送信息: " + message.toString());
}
@Override
public void sessionClosed(IoSession session) throws Exception {
// TODO Auto-generated method stub
logger.info("there is a session closed");
CloseFuture future = session.close(true);
future.addListener(new IoFutureListener(){
public void operationComplete(IoFuture future){
if(future instanceof CloseFuture){
((CloseFuture)future).setClosed();
logger.info("have do the future set to closed");
}
}
});
}
@Override
public void sessionCreated(IoSession session) throws Exception {
// TODO Auto-generated method stub
logger.info("there is a session created");
session.write("welcome to the chat room");
}
@Override
public void sessionIdle(IoSession session, IdleStatus arg1) throws Exception {
// TODO Auto-generated method stub
logger.info(session.getId() + "(SesssionID) is idle in the satate-->" + arg1);
}
@Override
public void sessionOpened(IoSession arg0) throws Exception {
// TODO Auto-generated method stub
}
}
2).建立Acceptor ,同时也充当Server端的启动类 (SimpleMinaServer)
package com.mai.mina.diyChat;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.charset.Charset;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.codec.textline.LineDelimiter;
import org.apache.mina.filter.codec.textline.TextLineCodecFactory;
import org.apache.mina.filter.logging.LogLevel;
import org.apache.mina.filter.logging.LoggingFilter;
import org.apache.mina.transport.socket.SocketAcceptor;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
public class SimpleMinaServer {
SocketAcceptor acceptor = null;
SimpleMinaServer(){
acceptor = new NioSocketAcceptor();
}
public boolean bind(){
acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(
new MyTextLineCodecFactory()); //配置CodecFactory
LoggingFilter log = new LoggingFilter();
log.setMessageReceivedLogLevel(LogLevel.INFO);
acceptor.getFilterChain().addLast("logger", log);
acceptor.setHandler(new MyServerHandleDemo1()); //配置handler
acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 30);
try {
acceptor.bind(new InetSocketAddress(8888));
return true;
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
return false;
}
}
public static void main(String args[]){
SimpleMinaServer server = new SimpleMinaServer();
if(!server.bind()){
System.out.println("服务器启动失败");
}else{
System.out.println("服务器启动成功");
}
}
}
2.2.3 建立Client端资源:
1)自定义IoHandler(MyClientHandleDemo1)
实现IoHandler接口
package com.mai.mina.diyChat;
import java.util.logging.Logger;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IoSession;
public class MyClientHandleDemo1 extends IoHandlerAdapter{
private ChatPanel messagePanel = null;
private Logger logger = Logger.getLogger(this.getClass().getName());
MyClientHandleDemo1(){
}
MyClientHandleDemo1(ChatPanel messagePanel){
this.messagePanel = messagePanel;
}
public void messageReceived(IoSession session, Object message) throws Exception {
// TODO Auto-generated method stub
String messageStr = message.toString();
logger.info("receive a message is : " + messageStr);
if(messagePanel != null)
messagePanel.showMsg(messageStr);
}
public void messageSent(IoSession session , Object message) throws Exception{
logger.info("客户端发了一个信息:" + message.toString());
}
}
2) 建立Connector (SimpleMinaClient)
View Code
package com.mai.mina.diyChat;
import java.net.InetSocketAddress;
import java.nio.charset.Charset;
import org.apache.mina.core.future.CloseFuture;
import org.apache.mina.core.future.ConnectFuture;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.codec.textline.LineDelimiter;
import org.apache.mina.filter.codec.textline.TextLineCodecFactory;
import org.apache.mina.filter.logging.LogLevel;
import org.apache.mina.filter.logging.LoggingFilter;
import org.apache.mina.transport.socket.SocketConnector;
import org.apache.mina.transport.socket.nio.NioSocketConnector;
public class SimpleMinaClient {
public SocketConnector connector = null;
public ConnectFuture future;
public IoSession session = null;
private ChatPanel messagePanel = null;
SimpleMinaClient(){
}
SimpleMinaClient(ChatPanel messagePanel){
this.messagePanel = messagePanel;
}
boolean connect(){
try{
connector = new NioSocketConnector();
connector.setConnectTimeoutMillis(3000);
connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(
new MyTextLineCodecFactory());
LoggingFilter log = new LoggingFilter();
log.setMessageReceivedLogLevel(LogLevel.INFO);
connector.getFilterChain().addLast("logger", log);
connector.setHandler(new MyClientHandleDemo1(messagePanel));
future = connector.connect(new InetSocketAddress("127.0.0.1" , 8888));
future.awaitUninterruptibly();
session = future.getSession();
return true;
}catch(Exception e){
e.printStackTrace();
return false;
}
}
public void setAttribute(Object key , Object value){
session.setAttribute(key, value);
}
void sentMsg(String message){
session.write(message);
}
boolean close(){
CloseFuture future = session.getCloseFuture();
future.awaitUninterruptibly(1000);
connector.dispose();
return true;
}
public SocketConnector getConnector() {
return connector;
}
public IoSession getSession() {
return session;
}
/**
* @param args
*/
public static void main(String[] args) {
// TODO Auto-generated method stub
SimpleMinaClient client = new SimpleMinaClient();
if(client.connect()){
client.sentMsg("hello , sever !");
client.close();
}
}
}
到这里,基本的Mina通信基础就建立好了。
接下来实现一个客户端的GUI界面,方便实际功能的建立和信息交互的演示。
2.2.4 Client Gui界面的建立。(ChatPanel -通过使用SimpleMinaClient来提供实际通信功能)
View Code
package com.mai.mina.diyChat;
import java.awt.BorderLayout;
import java.awt.event.ActionEvent;
import java.awt.event.ActionListener;
import java.awt.event.WindowAdapter;
import java.awt.event.WindowEvent;
import javax.swing.JButton;
import javax.swing.JFrame;
import javax.swing.JLabel;
import javax.swing.JPanel;
import javax.swing.JScrollPane;
import javax.swing.JTextArea;
import javax.swing.JTextField;
import org.apache.commons.lang.math.RandomUtils;
public class ChatPanel extends javax.swing.JPanel {
private JPanel northPanel;
private JLabel headLabel;
private JScrollPane jScrollPane1;
private JScrollPane jScrollPane2;
private JButton exitB;
private JButton clearMsgB;
private JButton sentB;
private JButton connectB;
private JTextArea messageText;
private JTextField nameText;
private JLabel nameLabel;
private JTextArea messageArea;
private JPanel southPanel;
private SimpleMinaClient client = null;
private boolean connected = false;
private String username = null;
{
//Set Look & Feel
try {
javax.swing.UIManager.setLookAndFeel("com.sun.java.swing.plaf.windows.WindowsLookAndFeel");
} catch(Exception e) {
e.printStackTrace();
}
}
public void connect(){
if(client.connect()){
username = nameText.getText().trim();
if(username == null || "".equals(username)){
username = "游客" + RandomUtils.nextInt(1000);
nameText.setText(username);
}
connected = true;
dealUIWithFlag();
}else{
connected = false;
dealUIWithFlag();
showMsg("连接服务器失败。。。");
}
}
public void showMsg(String msg){
messageArea.append(msg);
messageArea.append("\n");
messageArea.selectAll();
messageArea.lostFocus(null, this);
}
public void sentMsg(){
String message = username + ":" + messageText.getText();
client.sentMsg(message);
messageText.setText("");
messageText.requestFocus();
}
public void dealUIWithFlag(){
if(connected){
nameText.setEnabled(false);
connectB.setEnabled(false);
sentB.setEnabled(true);
clearMsgB.setEnabled(true);
exitB.setEnabled(true);
}else{
nameText.setEnabled(true);
connectB.setEnabled(true);
sentB.setEnabled(false);
clearMsgB.setEnabled(false);
exitB.setEnabled(false);
}
}
public void closeTheClient(){
if(client.close()){
showMsg("连接已断开...");
connected = false;
dealUIWithFlag();
}else{
showMsg("无法断开连接...");
}
}
public static void main(String[] args) {
JFrame frame = new JFrame();
frame.getContentPane().add(new ChatPanel());
frame.addWindowListener(new WindowAdapter(){
public void windowClosing(WindowEvent e){
System.exit(0);
}
});
frame.pack();
frame.setLocationByPlatform(true);
frame.setVisible(true);
}
public ChatPanel() {
super();
client = new SimpleMinaClient(this);
initGUI();
dealUIWithFlag();
}
private void initGUI() {
try {
this.setPreferredSize(new java.awt.Dimension(400, 339));
this.setLayout(null);
{
northPanel = new JPanel();
BorderLayout northPanelLayout = new BorderLayout();
northPanel.setLayout(northPanelLayout);
this.add(northPanel);
northPanel.setBounds(0, 0, 400, 188);
{
headLabel = new JLabel();
northPanel.add(headLabel, BorderLayout.NORTH);
headLabel.setText("\u6b22\u8fce\u4f7f\u7528 (\u6d4b\u8bd5Ip:port --> 127.0.0.1:8888)");
headLabel.setPreferredSize(new java.awt.Dimension(397, 19));
}
{
jScrollPane1 = new JScrollPane();
northPanel.add(jScrollPane1, BorderLayout.CENTER);
jScrollPane1.setPreferredSize(new java.awt.Dimension(400, 169));
{
messageArea = new JTextArea();
jScrollPane1.setViewportView(messageArea);
messageArea.setPreferredSize(new java.awt.Dimension(398, 145));
messageArea.setEditable(false);
messageArea.setLineWrap(true);
messageArea.setWrapStyleWord(true);
}
}
}
{
southPanel = new JPanel();
this.add(southPanel);
southPanel.setBounds(0, 194, 400, 145);
southPanel.setLayout(null);
{
nameLabel = new JLabel();
southPanel.add(nameLabel);
nameLabel.setText("\u6635\u79f0:");
nameLabel.setBounds(10, 12, 35, 15);
}
{
nameText = new JTextField();
southPanel.add(nameText);
nameText.setText("\u6e38\u5ba2");
nameText.setBounds(45, 9, 96, 21);
}
{
jScrollPane2 = new JScrollPane();
southPanel.add(jScrollPane2);
jScrollPane2.setBounds(15, 37, 364, 69);
{
messageText = new JTextArea();
jScrollPane2.setViewportView(messageText);
messageText.setBounds(101, 72, 362, 75);
messageText.setPreferredSize(new java.awt.Dimension(362, 54));
messageText.setLineWrap(true);
messageText.setWrapStyleWord(true);
}
}
{
connectB = new JButton();
southPanel.add(connectB);
connectB.setText("\u8fde\u63a5\u670d\u52a1\u5668");
connectB.setBounds(179, 8, 93, 23);
connectB.addActionListener(new ActionListener() {
public void actionPerformed(ActionEvent evt) {
System.out.println("connectB.actionPerformed, event="+evt);
//TODO add your code for connectB.actionPerformed
connect();
}
});
}
{
sentB = new JButton();
southPanel.add(sentB);
sentB.setText("\u53d1\u9001");
sentB.setBounds(261, 116, 57, 23);
sentB.addActionListener(new ActionListener() {
public void actionPerformed(ActionEvent evt) {
System.out.println("sentB.actionPerformed, event="+evt);
//TODO add your code for sentB.actionPerformed
sentMsg();
}
});
}
{
clearMsgB = new JButton();
southPanel.add(clearMsgB);
clearMsgB.setText("\u6e05\u7a7a");
clearMsgB.setBounds(324, 116, 57, 23);
clearMsgB.addActionListener(new ActionListener() {
public void actionPerformed(ActionEvent evt) {
System.out.println("clearMsgB.actionPerformed, event="+evt);
//TODO add your code for clearMsgB.actionPerformed
messageText.setText("");
}
});
}
{
exitB = new JButton();
southPanel.add(exitB);
exitB.setText("\u6ce8\u9500");
exitB.setBounds(282, 8, 57, 23);
exitB.addActionListener(new ActionListener() {
public void actionPerformed(ActionEvent evt) {
System.out.println("exitB.actionPerformed, event="+evt);
//TODO add your code for exitB.actionPerformed
closeTheClient();
}
});
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
3. 运行结果
首先启动服务器端,即运行SimpleMinaServer类 , 启动成功时会在控制台中打印出“服务器启动成功"
接下来运行客户端ChatPanel。
note: 上面只是一个简单的信息交互,其实使用Mina比较常用的还是在自定义协议处理这块。
所以,比较应该注重学习的是Filter这块。有时间大家可以去看看源码。
Cron表达式是一个字符串,字符串以5或6个空格隔开,分开工6或7个域,每一个域代表一个含义,Cron有如下两种语法
格式:
Seconds Minutes Hours DayofMonth Month DayofWeek Year 或
Seconds Minutes Hours DayofMonth Month DayofWeek
每一个域可出现的字符如下:
代码
Seconds:可出现,- * / 四个字符,有效范围为0-59的整数
Minutes:可出现,- * / 四个字符,有效范围为0-59的整数
Hours:可出现,- * / 四个字符,有效范围为0-23的整数
DayofMonth:可出现,- * / ? L W C八个字符,有效范围为0-31的整数
Month:可出现,- * / 四个字符,有效范围为1-12的整数或JAN-DEc
DayofWeek:可出现,- * / ? L C #四个字符,有效范围为1-7的整数或SUN-SAT两个范围。1表示星期天,2表示星期一, 依次类推
Year:可出现,- * / 四个字符,有效范围为1970-2099年
每一个域都使用数字,但还可以出现如下特殊字符,它们的含义是:
代码
(1)*:表示匹配该域的任意值,假如在Minutes域使用*,即表示每分钟都会触发事件。
(2)?:只能用在DayofMonth和DayofWeek两个域。它也匹配域的任意值,但实际不会。因为DayofMonth和DayofWeek会相互影响。例如想在每月的20日触发调度,不管20日到底是星期几,则只能使用如下写法: 13 13 15 20 * ?,其中最后一位只能用?,而不能使用*,如果使用*表示不管星期几都会触发,实际上并不是这样。
(3)-:表示范围,例如在Minutes域使用5-20,表示从5分到20分钟每分钟触发一次
(4)/:表示起始时间开始触发,然后每隔固定时间触发一次,例如在Minutes域使用5/20,则意味着5分钟触发一次,而25,45等分别触发一次.
(5),:表示列出枚举值值。例如:在Minutes域使用5,20,则意味着在5和20分每分钟触发一次。
(6)L:表示最后,只能出现在DayofWeek和DayofMonth域,如果在DayofWeek域使用5L,意味着在最后的一个星期四触发。
(7)W:表示有效工作日(周一到周五),只能出现在DayofMonth域,系统将在离指定日期的最近的有效工作日触发事件。例如:在DayofMonth使用5W,如果5日是星期六,则将在最近的工作日:星期五,即4日触发。如果5日是星期天,则在6日触发;如果5日在星期一到星期五中的一天,则就在5日触发。另外一点,W的最近寻找不会跨过月份
(8)LW:这两个字符可以连用,表示在某个月最后一个工作日,即最后一个星期五。
(9)#:用于确定每个月第几个星期几,只能出现在DayofMonth域。例如在4#2,表示某月的第二个星期三。
举几个例子:
代码
0 0 2 1 * ? * 表示在每月的1日的凌晨2点调度任务
0 15 10 ? * MON-FRI 表示周一到周五每天上午10:15执行作业
0 15 10 ? 6L 2002-2006 表示200-2006年的每个月的最后一个星期五上午10:15执行作业
91linux
一个cron表达式有至少6个(也可能7个)有空格分隔的时间元素。
按顺序依次为
秒(0~59)
分钟(0~59)
小时(0~23)
天(月)(0~31,但是你需要考虑你月的天数)
月(0~11)
天(星期)(1~7 1=SUN 或 SUN,MON,TUE,WED,THU,FRI,SAT)
7.年份(1970-2099)
其中每个元素可以是一个值(如6),一个连续区间(9-12),一个间隔时间(8-18/4)(/表示每隔4小时),一个列表(1,3,5),通配符。由于"月份中的日期"和"星期中的日期"这两个元素互斥的,必须要对其中一个设置?.
0 0 10,14,16 * * ? 每天上午10点,下午2点,4点
0 0/30 9-17 * * ? 朝九晚五工作时间内每半小时
0 0 12 ? * WED 表示每个星期三中午12点
"0 0 12 * * ?" 每天中午12点触发
"0 15 10 ? * *" 每天上午10:15触发
"0 15 10 * * ?" 每天上午10:15触发
"0 15 10 * * ? *" 每天上午10:15触发
"0 15 10 * * ? 2005" 2005年的每天上午10:15触发
"0 * 14 * * ?" 在每天下午2点到下午2:59期间的每1分钟触发
"0 0/5 14 * * ?" 在每天下午2点到下午2:55期间的每5分钟触发
"0 0/5 14,18 * * ?" 在每天下午2点到2:55期间和下午6点到6:55期间的每5分钟触发
"0 0-5 14 * * ?" 在每天下午2点到下午2:05期间的每1分钟触发
"0 10,44 14 ? 3 WED" 每年三月的星期三的下午2:10和2:44触发
"0 15 10 ? * MON-FRI" 周一至周五的上午10:15触发
"0 15 10 15 * ?" 每月15日上午10:15触发
"0 15 10 L * ?" 每月最后一日的上午10:15触发
"0 15 10 ? * 6L" 每月的最后一个星期五上午10:15触发
"0 15 10 ? * 6L 2002-2005" 2002年至2005年的每月的最后一个星期五上午10:15触发
"0 15 10 ? * 6#3" 每月的第三个星期五上午10:15触发
有些子表达式能包含一些范围或列表
例如:子表达式(天(星期))可以为 “MON-FRI”,“MON,WED,FRI”,“MON-WED,SAT”
“*”字符代表所有可能的值
因此,“*”在子表达式(月)里表示每个月的含义,“*”在子表达式(天(星期))表示星期的每一天
“/”字符用来指定数值的增量
例如:在子表达式(分钟)里的“0/15”表示从第0分钟开始,每15分钟
在子表达式(分钟)里的“3/20”表示从第3分钟开始,每20分钟(它和“3,23,43”)的含义一样
“?”字符仅被用于天(月)和天(星期)两个子表达式,表示不指定值
当2个子表达式其中之一被指定了值以后,为了避免冲突,需要将另一个子表达式的值设为“?”
“L” 字符仅被用于天(月)和天(星期)两个子表达式,它是单词“last”的缩写
但是它在两个子表达式里的含义是不同的。
在天(月)子表达式中,“L”表示一个月的最后一天
在天(星期)自表达式中,“L”表示一个星期的最后一天,也就是SAT
如果在“L”前有具体的内容,它就具有其他的含义了
例如:“6L”表示这个月的倒数第6天,“FRIL”表示这个月的最一个星期五
注意:在使用“L”参数时,不要指定列表或范围,因为这会导致问题
字段 允许值 允许的特殊字符
秒 0-59 , - * /
分 0-59 , - * /
小时 0-23 , - * /
日期 1-31 , - * ? / L W C
月份 1-12 或者 JAN-DEC , - * /
星期 1-7 或者 SUN-SAT , - * ? / L C #
年(可选) 留空, 1970-2099 , - * /
<!--
注意:日和星期是任先其一
?:代表可有可无
*:代表每一年
秒 分 时 日 月 星期几 年
0 0 0 10 12 ? 2009 //代表:2009年12月10日0点0分0秒执行(星期几:'?'代表忽略)
0 0 0 10 12 ? * //代表:每年12月10日0点0分0秒执行
0 0 0 10 * ? //代表:每月10日0点0分0秒执行
0 0 1 1 * ? //代表:每月1号1点0分0秒执行
0 0 1 1 3,6,9 ? //代表:3月 6月 9月,1号1点0分0秒执行
0 0 1 1 2-5 ?
-->
摘要: 通过这几天的看书和学习,对Lucene有了更进一步的认识,所以总结一下这些天的学习成果把Lucene的学习心得也学出来。
1 Lucene的认识
提到Lucene很多人都知道这个开源的搜索工具,其魅力也是很大的。它让我们对搜索引擎的认识不在那么神秘,也不会在觉得百度和goo...
阅读全文
摘要: Lucene是apache组织的一个用java实现全文搜索引擎的开源项目。 其功能非常的强大,api也很简单。总得来说用Lucene来进行建立 和搜索和操作数据库是差不多的(有点像),Document可以看作是 数据库的一行记录,Field可以看作是数据库的字段。用lucene实 现搜索引擎就像用JDBC实现连接数据库一样简单。
Lucene2.0,它与以前广泛应用和介绍的Lucene 1.4...
阅读全文