小方的Java博客
BlogJava
::
首页
::
新随笔
::
联系
::
聚合
::
管理
::
27 随笔 :: 17 文章 :: 115 评论 :: 0 Trackbacks
<
2013年3月
>
日
一
二
三
四
五
六
24
25
26
27
28
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
1
2
3
4
5
6
公告
常用链接
我的随笔
我的文章
我的评论
我的参与
最新评论
留言簿
(6)
给我留言
查看公开留言
查看私人留言
随笔分类
(24)
AJAX(9)
(rss)
JAVA API 自学(2)
(rss)
其他技术(6)
(rss)
开发经验点滴(1)
(rss)
程序人生(3)
(rss)
非技术(3)
(rss)
随笔档案
(27)
2008年12月 (1)
2008年8月 (1)
2007年6月 (1)
2007年3月 (1)
2007年2月 (3)
2006年10月 (3)
2006年9月 (2)
2006年8月 (4)
2006年7月 (7)
2006年6月 (4)
文章分类
(17)
Hibernate(1)
(rss)
Html & Script(1)
(rss)
J2se 基础(3)
(rss)
J2se 高级(2)
(rss)
Java 学习方法(1)
(rss)
java 设计模式(2)
(rss)
Jsp(3)
(rss)
Servlet(1)
(rss)
Struts(2)
(rss)
其它(1)
(rss)
文章档案
(17)
2006年6月 (1)
2006年2月 (15)
2005年11月 (1)
相册
other
top
work.input_hint
连接专用
收藏夹
(1)
oo(1)
(rss)
BlogJava 朋友
中东大哥
在路上
快乐笛子
爱勤劳的鱼
最新随笔
1. DWR2.0的调用前拦截
2. [整理]sql语句一些实用技巧for oracle
3. [原创]毕业设计答辩之后感想
4. [整理]JavaScript的混合继承方法
5. [hibernate3]lab sysem项目一些建议
6. [整理]JavaScript最流行的2种定义类的方式
7. [原创/整理]首次发布DWR中文文档.PDF
8. [转载]"软件外包",中国软件行业何去何从?
9. [转载]与大学生谈软件外包
10. [原创]一个javascript的时钟
搜索
积分与排名
积分 - 74645
排名 - 740
最新评论
1. re: [原创]毕业设计答辩之后感想
同感!!!!!!!!!!!!!!
--wordcount
2. re: java多线程设计模式
高质量文章
--明明的JavaBlog
3. re: [整理] java.util.Calendar
评论内容较长,点击标题查看
--杨正益
4. re: [原创]公司实习中的领悟
看了你的,我才发现我做错了很多。我现在只是要专业知识而已。而已英语我从来就没有放在心上。看了你的我决定要好好的看英语了
--花花公子
5. re: [原创]DWR2.0的调用前拦截
在你这个应用里面怎么可以获得请求的URL呀,我想判断一下用户是否对某一个资源是否有访问权限的时候怎么样才能知道他是访问的那个资源呢
--小简
6. re: [原创]DWR2.0的调用前拦截
@bln13fb
这是我们公司自己的类,这里只是个范例,你自己有自己的类
--方佳玮
7. re: [原创/整理]首次发布DWR中文文档.PDF
支持您的事业!!!
--陈庚阳
8. re: [整理]SQL语句学习手册实例版
找的您好辛苦!您用心了!
--youyouday
9. re: [原创/整理]首次发布DWR中文文档.PDF
http://www.pusuo.net
--www.pusuo.net
10. re: [原创]DWR2.0的调用前拦截
ISessionContainer
SecurityFactory
如何导入
--bln13fb
阅读排行榜
1. [原创/整理]首次发布DWR中文文档.PDF(7609)
2. [整理]JavaScript最流行的2种定义类的方式(6155)
3. [整理]SQL语句学习手册实例版(5630)
4. [整理]Google Web Toolkit 和 googlipse(GWT eclipse 插件)开发ajax(4972)
5. [原创] 用GWT做的输入前提示(3488)
6. [整理] java.lang.reflect(2977)
7. DWR2.0的调用前拦截(2890)
8. [整理]sql语句一些实用技巧for oracle(2755)
9. [整理]介绍一个GWT的网站(2314)
10. [整理] java.util.Calendar(1849)
评论排行榜
1. [原创/整理]首次发布DWR中文文档.PDF(49)
2. [原创] 用GWT做的输入前提示(10)
3. [整理]SQL语句学习手册实例版(8)
4. [整理]sql语句一些实用技巧for oracle(6)
5. [原创]毕业设计答辩之后感想(6)
6. [原创]ajax in action 第9章 j2ee版 动态双组合功能(6)
7. [整理]介绍一个GWT的网站(4)
8. [原创]公司实习中的领悟(4)
9. [整理]什么才是最好处理中文方法(4)
10. [整理]Google Web Toolkit 和 googlipse(GWT eclipse 插件)开发ajax(3)
java多线程设计模式
java语言已经内置了多线程支持,所有实现Runnable接口的类都可被启动一个新线程,新线程会执行该实例的run()方法,当run()方法执行完毕后,线程就结束了。一旦一个线程执行完毕,这个实例就不能再重新启动,只能重新生成一个新实例,再启动一个新线程。
Thread类是实现了Runnable接口的一个实例,它代表一个线程的实例,并且,启动线程的唯一方法就是通过Thread类的start()实例方法:
Thread t
=
new
Thread();
t.start();
start()方法是一个native方法,它将启动一个新线程,并执行run()方法。Thread类默认的run()方法什么也不做就退出了。注意:直接调用run()方法并不会启动一个新线程,它和调用一个普通的java方法没有什么区别。
因此,有两个方法可以实现自己的线程:
方法1:自己的类extend Thread,并复写run()方法,就可以启动新线程并执行自己定义的run()方法。例如:
public
class
MyThread
extends
Thread
{
public
run()
{
System.out.println(
"
MyThread.run()
"
);
}
}
在合适的地方启动线程:
new
MyThread().start();
方法2:如果自己的类已经extends另一个类,就无法直接extends Thread,此时,必须实现一个Runnable接口:
public
class
MyThread
extends
OtherClass
implements
Runnable
{
public
run()
{
System.out.println(
"
MyThread.run()
"
);
}
}
为了启动MyThread,需要首先实例化一个Thread,并传入自己的MyThread实例:
MyThread myt
=
new
MyThread();
Thread t
=
new
Thread(myt);
t.start();
事实上,当传入一个Runnable target参数给Thread后,Thread的run()方法就会调用target.run(),参考JDK源代码:
public
void
run()
{
if
(target
!=
null
)
{
target.run();
}
}
线程还有一些Name, ThreadGroup, isDaemon等设置,由于和线程设计模式关联很少,这里就不多说了。
由于同一进程内的多个线程共享内存空间,在Java中,就是共享实例,当多个线程试图同时修改某个实例的内容时,就会造成冲突,因此,线程必须实现共享互斥,使多线程同步。
最简单的同步是将一个方法标记为synchronized,对同一个实例来说,任一时刻只能有一个synchronized方法在执行。当一个方法正在执行某个synchronized方法时,其他线程如果想要执行这个实例的任意一个synchronized方法,都必须等待当前执行 synchronized方法的线程退出此方法后,才能依次执行。
但是,非synchronized方法不受影响,不管当前有没有执行synchronized方法,非synchronized方法都可以被多个线程同时执行。
此外,必须注意,只有同一实例的synchronized方法同一时间只能被一个线程执行,不同实例的synchronized方法是可以并发的。例如,
class
A定义了synchronized方法sync(),则不同实例a1.sync()和a2.sync()可以同时由两个线程来执行。
多线程同步的实现最终依赖锁机制。我们可以想象某一共享资源是一间屋子,每个人都是一个线程。当A希望进入房间时,他必须获得门锁,一旦A获得门锁,他进去后就立刻将门锁上,于是B,C,D
就不得不在门外等待,直到A释放锁出来后,B,C,D
中的某一人抢到了该锁(具体抢法依赖于 JVM的实现,可以先到先得,也可以随机挑选),然后进屋又将门锁上。这样,任一时刻最多有一人在屋内(使用共享资源)。
Java语言规范内置了对多线程的支持。对于Java程序来说,每一个对象实例都有一把“锁”,一旦某个线程获得了该锁,别的线程如果希望获得该锁,只能等待这个线程释放锁之后。获得锁的方法只有一个,就是synchronized关键字。例如:
public
class
SharedResource
{
private
int
count
=
0
;
public
int
getCount()
{
return
count; }
public
synchronized
void
setCount(
int
count)
{
this
.count
=
count; }
}
同步方法public
synchronized
void
setCount(
int
count)
{
this
.count
=
count; }
事实上相当于:
public
void
setCount(
int
count)
{
synchronized
(
this
)
{
//
在此获得this锁
this
.count
=
count;
}
//
在此释放this锁
}
红色部分表示需要同步的代码段,该区域为“危险区域”,如果两个以上的线程同时执行,会引发冲突,因此,要更改SharedResource的内部状态,必须先获得SharedResource实例的锁。
退出synchronized块时,线程拥有的锁自动释放,于是,别的线程又可以获取该锁了。
为了提高性能,不一定要锁定this,例如,SharedResource有两个独立变化的变量:
public
class
SharedResouce
{
private
int
a
=
0
;
private
int
b
=
0
;
public
synchronized
void
setA(
int
a)
{
this
.a
=
a; }
public
synchronized
void
setB(
int
b)
{
this
.b
=
b; }
}
若同步整个方法,则setA()的时候无法setB(),setB()时无法setA()。为了提高性能,可以使用不同对象的锁:
public
class
SharedResouce
{
private
int
a
=
0
;
private
int
b
=
0
;
private
Object sync_a
=
new
Object();
private
Object sync_b
=
new
Object();
public
void
setA(
int
a)
{
synchronized
(sync_a)
{
this
.a
=
a;
}
}
public
synchronized
void
setB(
int
b)
{
synchronized
(sync_b)
{
this
.b
=
b;
}
}
}
通常,多线程之间需要协调工作。例如,浏览器的一个显示图片的线程displayThread想要执行显示图片的任务,必须等待下载线程 downloadThread将该图片下载完毕。如果图片还没有下载完,displayThread可以暂停,当downloadThread完成了任务后,再通知displayThread“图片准备完毕,可以显示了”,这时,displayThread继续执行。
以上逻辑简单的说就是:如果条件不满足,则等待。当条件满足时,等待该条件的线程将被唤醒。在Java中,这个机制的实现依赖于wait
/
notify。等待机制与锁机制是密切关联的。例如:
synchronized
(obj)
{
while
(
!
condition)
{
obj.wait();
}
obj.doSomething();
}
当线程A获得了obj锁后,发现条件condition不满足,无法继续下一处理,于是线程A就wait()。
在另一线程B中,如果B更改了某些条件,使得线程A的condition条件满足了,就可以唤醒线程A:
synchronized
(obj)
{
condition
=
true
;
obj.notify();
}
需要注意的概念是:
# 调用obj的wait(), notify()方法前,必须获得obj锁,也就是必须写在synchronized(obj)
{
}
代码段内。
# 调用obj.wait()后,线程A就释放了obj的锁,否则线程B无法获得obj锁,也就无法在synchronized(obj)
{
}
代码段内唤醒A。
# 当obj.wait()方法返回后,线程A需要再次获得obj锁,才能继续执行。
# 如果A1,A2,A3都在obj.wait(),则B调用obj.notify()只能唤醒A1,A2,A3中的一个(具体哪一个由JVM决定)。
# obj.notifyAll()则能全部唤醒A1,A2,A3,但是要继续执行obj.wait()的下一条语句,必须获得obj锁,因此,A1,A2,A3只有一个有机会获得锁继续执行,例如A1,其余的需要等待A1释放obj锁之后才能继续执行。
# 当B调用obj.notify
/
notifyAll的时候,B正持有obj锁,因此,A1,A2,A3虽被唤醒,但是仍无法获得obj锁。直到B退出synchronized块,释放obj锁后,A1,A2,A3中的一个才有机会获得锁继续执行。
前面讲了wait
/
notify机制,Thread还有一个sleep()静态方法,它也能使线程暂停一段时间。sleep与wait的不同点是: sleep并不释放锁,并且sleep的暂停和wait暂停是不一样的。obj.wait会使线程进入obj对象的等待集合中并等待唤醒。
但是wait()和sleep()都可以通过interrupt()方法打断线程的暂停状态,从而使线程立刻抛出InterruptedException。
如果线程A希望立即结束线程B,则可以对线程B对应的Thread实例调用interrupt方法。如果此刻线程B正在 wait
/
sleep
/
join,则线程B会立刻抛出InterruptedException,在catch()
{}
中直接return即可安全地结束线程。
需要注意的是,InterruptedException是线程自己从内部抛出的,并不是interrupt()方法抛出的。对某一线程调用 interrupt()时,如果该线程正在执行普通的代码,那么该线程根本就不会抛出InterruptedException。但是,一旦该线程进入到 wait()
/
sleep()
/
join()后,就会立刻抛出InterruptedException。
GuardedSuspention模式主要思想是:
当条件不满足时,线程等待,直到条件满足时,等待该条件的线程被唤醒。
我们设计一个客户端线程和一个服务器线程,客户端线程不断发送请求给服务器线程,服务器线程不断处理请求。当请求队列为空时,服务器线程就必须等待,直到客户端发送了请求。
先定义一个请求队列:Queue
package
com.crackj2ee.thread;
import
java.util.
*
;
public
class
Queue
{
private
List queue
=
new
LinkedList();
public
synchronized
Request getRequest()
{
while
(queue.size()
==
0
)
{
try
{
this
.wait();
}
catch
(InterruptedException ie)
{
return
null
;
}
}
return
(Request)queue.remove(
0
);
}
public
synchronized
void
putRequest(Request request)
{
queue.add(request);
this
.notifyAll();
}
}
蓝色部分就是服务器线程的等待条件,而客户端线程在放入了一个request后,就使服务器线程等待条件满足,于是唤醒服务器线程。
客户端线程:ClientThread
package
com.crackj2ee.thread;
public
class
ClientThread
extends
Thread
{
private
Queue queue;
private
String clientName;
public
ClientThread(Queue queue, String clientName)
{
this
.queue
=
queue;
this
.clientName
=
clientName;
}
public
String toString()
{
return
"
[ClientThread-
"
+
clientName
+
"
]
"
;
}
public
void
run()
{
for
(
int
i
=
0
; i
<
100
; i
++
)
{
Request request
=
new
Request(
""
+
(
long
)(Math.random()
*
10000
));
System.out.println(
this
+
"
send request:
"
+
request);
queue.putRequest(request);
try
{
Thread.sleep((
long
)(Math.random()
*
10000
+
1000
));
}
catch
(InterruptedException ie)
{
}
}
System.out.println(
this
+
"
shutdown.
"
);
}
}
服务器线程:ServerThread
package
com.crackj2ee.thread;
public
class
ServerThread
extends
Thread
{
private
boolean
stop
=
false
;
private
Queue queue;
public
ServerThread(Queue queue)
{
this
.queue
=
queue;
}
public
void
shutdown()
{
stop
=
true
;
this
.interrupt();
try
{
this
.join();
}
catch
(InterruptedException ie)
{}
}
public
void
run()
{
while
(
!
stop)
{
Request request
=
queue.getRequest();
System.out.println(
"
[ServerThread] handle request:
"
+
request);
try
{
Thread.sleep(
2000
);
}
catch
(InterruptedException ie)
{}
}
System.out.println(
"
[ServerThread] shutdown.
"
);
}
}
服务器线程在红色部分可能会阻塞,也就是说,Queue.getRequest是一个阻塞方法。这和java标准库的许多IO方法类似。
最后,写一个Main来启动他们:
package
com.crackj2ee.thread;
public
class
Main
{
public
static
void
main(String[] args)
{
Queue queue
=
new
Queue();
ServerThread server
=
new
ServerThread(queue);
server.start();
ClientThread[] clients
=
new
ClientThread[
5
];
for
(
int
i
=
0
; i
<
clients.length; i
++
)
{
clients[i]
=
new
ClientThread(queue,
""
+
i);
clients[i].start();
}
try
{
Thread.sleep(
100000
);
}
catch
(InterruptedException ie)
{}
server.shutdown();
}
}
我们启动了5个客户端线程和一个服务器线程,运行结果如下:
[ClientThread
-
0
] send request: Request
-
4984
[ServerThread] handle request: Request
-
4984
[ClientThread
-
1
] send request: Request
-
2020
[ClientThread
-
2
] send request: Request
-
8980
[ClientThread
-
3
] send request: Request
-
5044
[ClientThread
-
4
] send request: Request
-
548
[ClientThread
-
4
] send request: Request
-
6832
[ServerThread] handle request: Request
-
2020
[ServerThread] handle request: Request
-
8980
[ServerThread] handle request: Request
-
5044
[ServerThread] handle request: Request
-
548
[ClientThread
-
4
] send request: Request
-
1681
[ClientThread
-
0
] send request: Request
-
7859
[ClientThread
-
3
] send request: Request
-
3926
[ServerThread] handle request: Request
-
6832
[ClientThread
-
2
] send request: Request
-
9906
可以观察到ServerThread处理来自不同客户端的请求。
思考
Q: 服务器线程的wait条件while(queue.size()
==
0
)能否换成if(queue.size()
==
0
)
?
A: 在这个例子中可以,因为服务器线程只有一个。但是,如果服务器线程有多个(例如Web应用程序有多个线程处理并发请求,这非常普遍),就会造成严重问题。
Q: 能否用sleep(
1000
)代替wait()
?
A: 绝对不可以。sleep()不会释放锁,因此sleep期间别的线程根本没有办法调用getRequest()和putRequest(),导致所有相关线程都被阻塞。
Q: (Request)queue.remove(
0
)可以放到synchronized()
{}
块外面吗?
A: 不可以。因为while()是测试queue,remove()是使用queue,两者是一个原子操作,不能放在synchronized外面。
总结
多线程设计看似简单,实际上必须非常仔细地考虑各种锁定
/
同步的条件,稍不小心,就可能出错。并且,当线程较少时,很可能发现不了问题,一旦问题出现又难以调试。
所幸的是,已有一些被验证过的模式可以供我们使用,我们会继续介绍一些常用的多线程设计模式。
前面谈了多线程应用程序能极大地改善用户相应。例如对于一个Web应用程序,每当一个用户请求服务器连接时,服务器就可以启动一个新线程为用户服务。
然而,创建和销毁线程本身就有一定的开销,如果频繁创建和销毁线程,CPU和内存开销就不可忽略,垃圾收集器还必须负担更多的工作。因此,线程池就是为了避免频繁创建和销毁线程。
每当服务器接受了一个新的请求后,服务器就从线程池中挑选一个等待的线程并执行请求处理。处理完毕后,线程并不结束,而是转为阻塞状态再次被放入线程池中。这样就避免了频繁创建和销毁线程。
Worker Pattern实现了类似线程池的功能。首先定义Task接口:
package
com.crackj2ee.thread;
public
interface
Task
{
void
execute();
}
线程将负责执行execute()方法。注意到任务是由子类通过实现execute()方法实现的,线程本身并不知道自己执行的任务。它只负责运行一个耗时的execute()方法。
具体任务由子类实现,我们定义了一个CalculateTask和一个TimerTask:
//
CalculateTask.java
package
com.crackj2ee.thread;
public
class
CalculateTask
implements
Task
{
private
static
int
count
=
0
;
private
int
num
=
count;
public
CalculateTask()
{
count
++
;
}
public
void
execute()
{
System.out.println(
"
[CalculateTask
"
+
num
+
"
] start
"
);
try
{
Thread.sleep(
3000
);
}
catch
(InterruptedException ie)
{}
System.out.println(
"
[CalculateTask
"
+
num
+
"
] done.
"
);
}
}
//
TimerTask.java
package
com.crackj2ee.thread;
public
class
TimerTask
implements
Task
{
private
static
int
count
=
0
;
private
int
num
=
count;
public
TimerTask()
{
count
++
;
}
public
void
execute()
{
System.out.println(
"
[TimerTask
"
+
num
+
"
] start
"
);
try
{
Thread.sleep(
2000
);
}
catch
(InterruptedException ie)
{}
System.out.println(
"
[TimerTask
"
+
num
+
"
] done.
"
);
}
}
以上任务均简单的sleep若干秒。
TaskQueue实现了一个队列,客户端可以将请求放入队列,服务器线程可以从队列中取出任务:
package
com.crackj2ee.thread;
import
java.util.
*
;
public
class
TaskQueue
{
private
List queue
=
new
LinkedList();
public
synchronized
Task getTask()
{
while
(queue.size()
==
0
)
{
try
{
this
.wait();
}
catch
(InterruptedException ie)
{
return
null
;
}
}
return
(Task)queue.remove(
0
);
}
public
synchronized
void
putTask(Task task)
{
queue.add(task);
this
.notifyAll();
}
}
终于到了真正的WorkerThread,这是真正执行任务的服务器线程:
package
com.crackj2ee.thread;
public
class
WorkerThread
extends
Thread
{
private
static
int
count
=
0
;
private
boolean
busy
=
false
;
private
boolean
stop
=
false
;
private
TaskQueue queue;
public
WorkerThread(ThreadGroup group, TaskQueue queue)
{
super
(group,
"
worker-
"
+
count);
count
++
;
this
.queue
=
queue;
}
public
void
shutdown()
{
stop
=
true
;
this
.interrupt();
try
{
this
.join();
}
catch
(InterruptedException ie)
{}
}
public
boolean
isIdle()
{
return
!
busy;
}
public
void
run()
{
System.out.println(getName()
+
"
start.
"
);
while
(
!
stop)
{
Task task
=
queue.getTask();
if
(task
!=
null
)
{
busy
=
true
;
task.execute();
busy
=
false
;
}
}
System.out.println(getName()
+
"
end.
"
);
}
}
前面已经讲过,queue.getTask()是一个阻塞方法,服务器线程可能在此wait()一段时间。此外,WorkerThread还有一个shutdown方法,用于安全结束线程。
最后是ThreadPool,负责管理所有的服务器线程,还可以动态增加和减少线程数:
package
com.crackj2ee.thread;
import
java.util.
*
;
public
class
ThreadPool
extends
ThreadGroup
{
private
List threads
=
new
LinkedList();
private
TaskQueue queue;
public
ThreadPool(TaskQueue queue)
{
super
(
"
Thread-Pool
"
);
this
.queue
=
queue;
}
public
synchronized
void
addWorkerThread()
{
Thread t
=
new
WorkerThread(
this
, queue);
threads.add(t);
t.start();
}
public
synchronized
void
removeWorkerThread()
{
if
(threads.size()
>
0
)
{
WorkerThread t
=
(WorkerThread)threads.remove(
0
);
t.shutdown();
}
}
public
synchronized
void
currentStatus()
{
System.out.println(
"
-----------------------------------------------
"
);
System.out.println(
"
Thread count =
"
+
threads.size());
Iterator it
=
threads.iterator();
while
(it.hasNext())
{
WorkerThread t
=
(WorkerThread)it.next();
System.out.println(t.getName()
+
"
:
"
+
(t.isIdle()
?
"
idle
"
:
"
busy
"
));
}
System.out.println(
"
-----------------------------------------------
"
);
}
}
currentStatus()方法是为了方便调试,打印出所有线程的当前状态。
最后,Main负责完成main()方法:
package
com.crackj2ee.thread;
public
class
Main
{
public
static
void
main(String[] args)
{
TaskQueue queue
=
new
TaskQueue();
ThreadPool pool
=
new
ThreadPool(queue);
for
(
int
i
=
0
; i
<
10
; i
++
)
{
queue.putTask(
new
CalculateTask());
queue.putTask(
new
TimerTask());
}
pool.addWorkerThread();
pool.addWorkerThread();
doSleep(
8000
);
pool.currentStatus();
pool.addWorkerThread();
pool.addWorkerThread();
pool.addWorkerThread();
pool.addWorkerThread();
pool.addWorkerThread();
doSleep(
5000
);
pool.currentStatus();
}
private
static
void
doSleep(
long
ms)
{
try
{
Thread.sleep(ms);
}
catch
(InterruptedException ie)
{}
}
}
main()一开始放入了20个Task,然后动态添加了一些服务线程,并定期打印线程状态,运行结果如下:
worker
-
0
start.
[CalculateTask
0
] start
worker
-
1
start.
[TimerTask
0
] start
[TimerTask
0
] done.
[CalculateTask
1
] start
[CalculateTask
0
] done.
[TimerTask
1
] start
[CalculateTask
1
] done.
[CalculateTask
2
] start
[TimerTask
1
] done.
[TimerTask
2
] start
[TimerTask
2
] done.
[CalculateTask
3
] start
-----------------------------------------------
Thread count
=
2
worker
-
0
: busy
worker
-
1
: busy
-----------------------------------------------
[CalculateTask
2
] done.
[TimerTask
3
] start
worker
-
2
start.
[CalculateTask
4
] start
worker
-
3
start.
[TimerTask
4
] start
worker
-
4
start.
[CalculateTask
5
] start
worker
-
5
start.
[TimerTask
5
] start
worker
-
6
start.
[CalculateTask
6
] start
[CalculateTask
3
] done.
[TimerTask
6
] start
[TimerTask
3
] done.
[CalculateTask
7
] start
[TimerTask
4
] done.
[TimerTask
7
] start
[TimerTask
5
] done.
[CalculateTask
8
] start
[CalculateTask
4
] done.
[TimerTask
8
] start
[CalculateTask
5
] done.
[CalculateTask
9
] start
[CalculateTask
6
] done.
[TimerTask
9
] start
[TimerTask
6
] done.
[TimerTask
7
] done.
-----------------------------------------------
Thread count
=
7
worker
-
0
: idle
worker
-
1
: busy
worker
-
2
: busy
worker
-
3
: idle
worker
-
4
: busy
worker
-
5
: busy
worker
-
6
: busy
-----------------------------------------------
[CalculateTask
7
] done.
[CalculateTask
8
] done.
[TimerTask
8
] done.
[TimerTask
9
] done.
[CalculateTask
9
] done.
仔细观察:一开始只有两个服务器线程,因此线程状态都是忙,后来线程数增多,6个线程中的两个状态变成idle,说明处于wait()状态。
思考:本例的线程调度算法其实根本没有,因为这个应用是围绕TaskQueue设计的,不是以Thread Pool为中心设计的。因此,Task调度取决于TaskQueue的getTask()方法,你可以改进这个方法,例如使用优先队列,使优先级高的任务先被执行。
如果所有的服务器线程都处于busy状态,则说明任务繁忙,TaskQueue的队列越来越长,最终会导致服务器内存耗尽。因此,可以限制 TaskQueue的等待任务数,超过最大长度就拒绝处理。许多Web服务器在用户请求繁忙时就会拒绝用户:HTTP
503
SERVICE UNAVAILABLE
多线程读写同一个对象的数据是很普遍的,通常,要避免读写冲突,必须保证任何时候仅有一个线程在写入,有线程正在读取的时候,写入操作就必须等待。简单说,就是要避免“写
-
写”冲突和“读
-
写”冲突。但是同时读是允许的,因为“读
-
读”不冲突,而且很安全。
要实现以上的ReadWriteLock,简单的使用synchronized就不行,我们必须自己设计一个ReadWriteLock类,在读之前,必须先获得“读锁”,写之前,必须先获得“写锁”。举例说明:
DataHandler对象保存了一个可读写的char[]数组:
package
com.crackj2ee.thread;
public
class
DataHandler
{
//
store data:
private
char
[] buffer
=
"
AAAAAAAAAA
"
.toCharArray();
private
char
[] doRead()
{
char
[] ret
=
new
char
[buffer.length];
for
(
int
i
=
0
; i
<
buffer.length; i
++
)
{
ret[i]
=
buffer[i];
sleep(
3
);
}
return
ret;
}
private
void
doWrite(
char
[] data)
{
if
(data
!=
null
)
{
buffer
=
new
char
[data.length];
for
(
int
i
=
0
; i
<
buffer.length; i
++
)
{
buffer[i]
=
data[i];
sleep(
10
);
}
}
}
private
void
sleep(
int
ms)
{
try
{
Thread.sleep(ms);
}
catch
(InterruptedException ie)
{}
}
}
doRead()和doWrite()方法是非线程安全的读写方法。为了演示,加入了sleep(),并设置读的速度大约是写的3倍,这符合通常的情况。
为了让多线程能安全读写,我们设计了一个ReadWriteLock:
package
com.crackj2ee.thread;
public
class
ReadWriteLock
{
private
int
readingThreads
=
0
;
private
int
writingThreads
=
0
;
private
int
waitingThreads
=
0
;
//
waiting for write
private
boolean
preferWrite
=
true
;
public
synchronized
void
readLock()
throws
InterruptedException
{
while
(writingThreads
>
0
||
(preferWrite
&&
waitingThreads
>
0
))
this
.wait();
readingThreads
++
;
}
public
synchronized
void
readUnlock()
{
readingThreads
--
;
preferWrite
=
true
;
notifyAll();
}
public
synchronized
void
writeLock()
throws
InterruptedException
{
waitingThreads
++
;
try
{
while
(readingThreads
>
0
||
writingThreads
>
0
)
this
.wait();
}
finally
{
waitingThreads
--
;
}
writingThreads
++
;
}
public
synchronized
void
writeUnlock()
{
writingThreads
--
;
preferWrite
=
false
;
notifyAll();
}
}
readLock()用于获得读锁,readUnlock()释放读锁,writeLock()和writeUnlock()一样。由于锁用完必须释放,因此,必须保证lock和unlock匹配。我们修改DataHandler,加入ReadWriteLock:
package
com.crackj2ee.thread;
public
class
DataHandler
{
//
store data:
private
char
[] buffer
=
"
AAAAAAAAAA
"
.toCharArray();
//
lock:
private
ReadWriteLock lock
=
new
ReadWriteLock();
public
char
[] read(String name)
throws
InterruptedException
{
System.out.println(name
+
"
waiting for read
"
);
lock.readLock();
try
{
char
[] data
=
doRead();
System.out.println(name
+
"
reads data:
"
+
new
String(data));
return
data;
}
finally
{
lock.readUnlock();
}
}
public
void
write(String name,
char
[] data)
throws
InterruptedException
{
System.out.println(name
+
"
waiting for write
"
);
lock.writeLock();
try
{
System.out.println(name
+
"
wrote data:
"
+
new
String(data));
doWrite(data);
}
finally
{
lock.writeUnlock();
}
}
private
char
[] doRead()
{
char
[] ret
=
new
char
[buffer.length];
for
(
int
i
=
0
; i
<
buffer.length; i
++
)
{
ret[i]
=
buffer[i];
sleep(
3
);
}
return
ret;
}
private
void
doWrite(
char
[] data)
{
if
(data
!=
null
)
{
buffer
=
new
char
[data.length];
for
(
int
i
=
0
; i
<
buffer.length; i
++
)
{
buffer[i]
=
data[i];
sleep(
10
);
}
}
}
private
void
sleep(
int
ms)
{
try
{
Thread.sleep(ms);
}
catch
(InterruptedException ie)
{}
}
}
public方法read()和write()完全封装了底层的ReadWriteLock,因此,多线程可以安全地调用这两个方法:
//
ReadingThread不断读取数据:
package
com.crackj2ee.thread;
public
class
ReadingThread
extends
Thread
{
private
DataHandler handler;
public
ReadingThread(DataHandler handler)
{
this
.handler
=
handler;
}
public
void
run()
{
for
(;;)
{
try
{
char
[] data
=
handler.read(getName());
Thread.sleep((
long
)(Math.random()
*
1000
+
100
));
}
catch
(InterruptedException ie)
{
break
;
}
}
}
}
//
WritingThread不断写入数据,每次写入的都是10个相同的字符:
package
com.crackj2ee.thread;
public
class
WritingThread
extends
Thread
{
private
DataHandler handler;
public
WritingThread(DataHandler handler)
{
this
.handler
=
handler;
}
public
void
run()
{
char
[] data
=
new
char
[
10
];
for
(;;)
{
try
{
fill(data);
handler.write(getName(), data);
Thread.sleep((
long
)(Math.random()
*
1000
+
100
));
}
catch
(InterruptedException ie)
{
break
;
}
}
}
//
产生一个A-Z随机字符,填入char[10]:
private
void
fill(
char
[] data)
{
char
c
=
(
char
)(Math.random()
*
26
+
'
A
'
);
for
(
int
i
=
0
; i
<
data.length; i
++
)
data[i]
=
c;
}
}
最后Main负责启动这些线程:
package
com.crackj2ee.thread;
public
class
Main
{
public
static
void
main(String[] args)
{
DataHandler handler
=
new
DataHandler();
Thread[] ts
=
new
Thread[]
{
new
ReadingThread(handler),
new
ReadingThread(handler),
new
ReadingThread(handler),
new
ReadingThread(handler),
new
ReadingThread(handler),
new
WritingThread(handler),
new
WritingThread(handler)
}
;
for
(
int
i
=
0
; i
<
ts.length; i
++
)
{
ts[i].start();
}
}
}
我们启动了5个读线程和2个写线程,运行结果如下:
Thread
-
0
waiting
for
read
Thread
-
1
waiting
for
read
Thread
-
2
waiting
for
read
Thread
-
3
waiting
for
read
Thread
-
4
waiting
for
read
Thread
-
5
waiting
for
write
Thread
-
6
waiting
for
write
Thread
-
4
reads data: AAAAAAAAAA
Thread
-
3
reads data: AAAAAAAAAA
Thread
-
2
reads data: AAAAAAAAAA
Thread
-
1
reads data: AAAAAAAAAA
Thread
-
0
reads data: AAAAAAAAAA
Thread
-
5
wrote data: EEEEEEEEEE
Thread
-
6
wrote data: MMMMMMMMMM
Thread
-
1
waiting
for
read
Thread
-
4
waiting
for
read
Thread
-
1
reads data: MMMMMMMMMM
Thread
-
4
reads data: MMMMMMMMMM
Thread
-
2
waiting
for
read
Thread
-
2
reads data: MMMMMMMMMM
Thread
-
0
waiting
for
read
Thread
-
0
reads data: MMMMMMMMMM
Thread
-
4
waiting
for
read
Thread
-
4
reads data: MMMMMMMMMM
Thread
-
2
waiting
for
read
Thread
-
5
waiting
for
write
Thread
-
2
reads data: MMMMMMMMMM
Thread
-
5
wrote data: GGGGGGGGGG
Thread
-
6
waiting
for
write
Thread
-
6
wrote data: AAAAAAAAAA
Thread
-
3
waiting
for
read
Thread
-
3
reads data: AAAAAAAAAA
可以看到,每次读
/
写都是完整的原子操作,因为我们每次写入的都是10个相同字符。并且,每次读出的都是最近一次写入的内容。
如果去掉ReadWriteLock:
package
com.crackj2ee.thread;
public
class
DataHandler
{
//
store data:
private
char
[] buffer
=
"
AAAAAAAAAA
"
.toCharArray();
public
char
[] read(String name)
throws
InterruptedException
{
char
[] data
=
doRead();
System.out.println(name
+
"
reads data:
"
+
new
String(data));
return
data;
}
public
void
write(String name,
char
[] data)
throws
InterruptedException
{
System.out.println(name
+
"
wrote data:
"
+
new
String(data));
doWrite(data);
}
private
char
[] doRead()
{
char
[] ret
=
new
char
[
10
];
for
(
int
i
=
0
; i
<
10
; i
++
)
{
ret[i]
=
buffer[i];
sleep(
3
);
}
return
ret;
}
private
void
doWrite(
char
[] data)
{
for
(
int
i
=
0
; i
<
10
; i
++
)
{
buffer[i]
=
data[i];
sleep(
10
);
}
}
private
void
sleep(
int
ms)
{
try
{
Thread.sleep(ms);
}
catch
(InterruptedException ie)
{}
}
}
运行结果如下:
Thread
-
5
wrote data: AAAAAAAAAA
Thread
-
6
wrote data: MMMMMMMMMM
Thread
-
0
reads data: AAAAAAAAAA
Thread
-
1
reads data: AAAAAAAAAA
Thread
-
2
reads data: AAAAAAAAAA
Thread
-
3
reads data: AAAAAAAAAA
Thread
-
4
reads data: AAAAAAAAAA
Thread
-
2
reads data: MAAAAAAAAA
Thread
-
3
reads data: MAAAAAAAAA
Thread
-
5
wrote data: CCCCCCCCCC
Thread
-
1
reads data: MAAAAAAAAA
Thread
-
0
reads data: MAAAAAAAAA
Thread
-
4
reads data: MAAAAAAAAA
Thread
-
6
wrote data: EEEEEEEEEE
Thread
-
3
reads data: EEEEECCCCC
Thread
-
4
reads data: EEEEEEEEEC
Thread
-
1
reads data: EEEEEEEEEE
可以看到在Thread
-
6写入EEEEEEEEEE的过程中,3个线程读取的内容是不同的。
思考
java的synchronized提供了最底层的物理锁,要在synchronized的基础上,实现自己的逻辑锁,就必须仔细设计ReadWriteLock。
Q: lock.readLock()为什么不放入try
{ }
内?
A: 因为readLock()会抛出InterruptedException,导致readingThreads
++
不执行,而readUnlock()在
finally
{ }
中,导致readingThreads
--
执行,从而使readingThread状态出错。writeLock()也是类似的。
Q: preferWrite有用吗?
A: 如果去掉preferWrite,线程安全不受影响。但是,如果读取线程很多,上一个线程还没有读取完,下一个线程又开始读了,就导致写入线程长时间无法获得writeLock;如果写入线程等待的很多,一个接一个写,也会导致读取线程长时间无法获得readLock。preferWrite的作用是让读
/
写交替执行,避免由于读线程繁忙导致写无法进行和由于写线程繁忙导致读无法进行。
Q: notifyAll()换成notify()行不行?
A: 不可以。由于preferWrite的存在,如果一个线程刚读取完毕,此时preferWrite
=
true
,再notify(),若恰好唤醒的是一个读线程,则while(writingThreads
>
0
||
(preferWrite
&&
waitingThreads
>
0
))可能为true导致该读线程继续等待,而等待写入的线程也处于wait()中,结果所有线程都处于wait ()状态,谁也无法唤醒谁。因此,notifyAll()比notify()要来得安全。程序验证notify()带来的死锁:
Thread
-
0
waiting
for
read
Thread
-
1
waiting
for
read
Thread
-
2
waiting
for
read
Thread
-
3
waiting
for
read
Thread
-
4
waiting
for
read
Thread
-
5
waiting
for
write
Thread
-
6
waiting
for
write
Thread
-
0
reads data: AAAAAAAAAA
Thread
-
4
reads data: AAAAAAAAAA
Thread
-
3
reads data: AAAAAAAAAA
Thread
-
2
reads data: AAAAAAAAAA
Thread
-
1
reads data: AAAAAAAAAA
Thread
-
5
wrote data: CCCCCCCCCC
Thread
-
2
waiting
for
read
Thread
-
1
waiting
for
read
Thread
-
3
waiting
for
read
Thread
-
0
waiting
for
read
Thread
-
4
waiting
for
read
Thread
-
6
wrote data: LLLLLLLLLL
Thread
-
5
waiting
for
write
Thread
-
6
waiting
for
write
Thread
-
2
reads data: LLLLLLLLLL
Thread
-
2
waiting
for
read
(运行到此不动了)
注意到这种死锁是由于所有线程都在等待别的线程唤醒自己,结果都无法醒过来。这和两个线程希望获得对方已有的锁造成死锁不同。因此多线程设计的难度远远高于单线程应用。
posted on 2006-02-01 12:34
方佳玮
阅读(2673)
评论(1)
编辑
收藏
所属分类:
J2se 基础
评论
#
re: java多线程设计模式
2013-03-05 23:54
明明的JavaBlog
高质量文章
回复
更多评论
新用户注册
刷新评论列表
只有注册用户
登录
后才能发表评论。
网站导航:
博客园
IT新闻
知识库
C++博客
博问
管理
相关文章:
一篇不错的讲解Java异常的文章
java多线程设计模式
中文字符串长度计算
Powered by:
BlogJava
Copyright © 方佳玮