在springside3.*中的showcase案例中,有一个把log4j的日志存入数据库的演示,下面是我对这个案例的学习笔记。
1、我们首先来看下log4j相关日志的配置:
#Async Database Appender (Store business message)
log4j.appender.DB=org.springside.examples.showcase.log.appender.QueueAppender
log4j.appender.DB.QueueName=dblog
#Demo level with Async Database appender
log4j.logger.DBLogExample=INFO,Console,DB
log4j.additivity.DBLogExample=false
其中org.springside.examples.showcase.log.appender.QueueAppender就是对ssLog4j日志的一个扩展,而日志的event(里面是日志的内容)是存放在一个BlockingQueue中,当有多个日志需要分别存入不同的地方时,就根据QueryName来区分。
2、接下来看一下org.springside.examples.showcase.log.appender.QueueAppender里面的内容:
/** *//**
* Copyright (c) 2005-2009 springside.org.cn
*
* Licensed under the Apache License, Version 2.0 (the "License");
*
* $Id: QueueAppender.java 1189 2010-09-01 17:24:12Z calvinxiu $
*/
package org.springside.examples.showcase.log.appender;
import java.util.concurrent.BlockingQueue;
import org.apache.log4j.helpers.LogLog;
import org.apache.log4j.spi.LoggingEvent;
import org.springside.examples.showcase.queue.QueuesHolder;
/** *//**
* 轻量级的Log4j异步Appender.
*
* 将所有消息放入QueueManager所管理的Blocking Queue中.
*
* @see QueuesHolder
*
* @author calvin
*/
public class QueueAppender extends org.apache.log4j.AppenderSkeleton {
protected String queueName;
protected BlockingQueue<LoggingEvent> queue;
/** *//**
* AppenderSkeleton回调函数, 事件到达时将时间放入Queue.
*/
@Override
public void append(LoggingEvent event) {
if (queue == null) {
queue = QueuesHolder.getQueue(queueName);
}
boolean sucess = queue.offer(event);
if (sucess) {
LogLog.debug("put event to queue success:" + new LoggingEventWrapper(event).convertToString());
} else {
LogLog.error("Put event to queue fail:" + new LoggingEventWrapper(event).convertToString());
}
}
/** *//**
* AppenderSkeleton回调函数,关闭Logger时的清理动作.
*/
public void close() {
}
/** *//**
* AppenderSkeleton回调函数, 设置是否需要定义Layout.
*/
public boolean requiresLayout() {
return false;
}
/** *//**
* Log4j根据getter/setter从log4j.properties中注入同名参数.
*/
public String getQueueName() {
return queueName;
}
/** *//**
* @see #getQueueName()
*/
public void setQueueName(String queueName) {
this.queueName = queueName;
}
}
这是对Log4j扩展的标准做法,继承abstract class AppenderSkeleton,实现它的abstract protected void append(LoggingEvent event) 方法。
而这个方法的实现很简单,就是根据queueName从queuesHolder中取出一个BlockingQueue<LoggingEvent>,然后把LoggerEvent塞到这个BlockingQueue中去,关于queuesHolder,下面会讲到。到这为止,log4j的活就完成了,下面的都是concurrent的事了。
3、让我们转到spring的配置文件中,看看springside是如何接收下面的工作,下面是applicationContext-log.xml的片段:
<!-- 消息Queue管理器-->
<bean class="org.springside.examples.showcase.queue.QueuesHolder">
<property name="queueSize" value="1000" />
</bean>
<!-- 读出Queue中日志消息写入数据库的任务 -->
<bean id="jdbcLogWriter" class="org.springside.examples.showcase.log.appender.JdbcLogWriter">
<property name="queueName" value="dblog" />
<property name="batchSize" value="10" />
<property name="sql">
<value>
insert into SS_LOG(THREAD_NAME,LOGGER_NAME,LOG_TIME,LEVEL,MESSAGE)
values(:thread_name,:logger_name,:log_time,:level,:message)
</value>
</property>
</bean>
我们先从简单的下手,先看QueuesHolder:
private static ConcurrentMap<String, BlockingQueue> queueMap = new MapMaker().concurrencyLevel(32).makeMap();//消息队列
/** *//**
* 根据queueName获得消息队列的静态函数.
* 如消息队列还不存在, 会自动进行创建.
*/
public static <T> BlockingQueue<T> getQueue(String queueName) {
BlockingQueue queue = queueMap.get(queueName);
if (queue == null) {
BlockingQueue newQueue = new LinkedBlockingQueue(queueSize);
//如果之前消息队列还不存在,放入新队列并返回Null.否则返回之前的值.
queue = queueMap.putIfAbsent(queueName, newQueue);
if (queue == null) {
queue = newQueue;
}
}
return queue;
}
其实这个类很简单,就是一个map,key就是上面log4j配置文件中的queueName,value就是一个BlockingQueue,这样就可以存放多个日志queue,做不同的处理。
4、下面这个是重头戏,先把JdbcLogWriter的代码全贴出来:
/** *//**
* Copyright (c) 2005-2009 springside.org.cn
*
* Licensed under the Apache License, Version 2.0 (the "License");
*
* $Id: JdbcAppenderTask.java 353 2009-08-22 09:33:28Z calvinxiu
*/
package org.springside.examples.showcase.log.appender;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.Resource;
import javax.sql.DataSource;
import org.apache.log4j.spi.LoggingEvent;
import org.springframework.dao.DataAccessException;
import org.springframework.dao.DataAccessResourceFailureException;
import org.springframework.jdbc.core.namedparam.SqlParameterSource;
import org.springframework.jdbc.core.namedparam.SqlParameterSourceUtils;
import org.springframework.jdbc.core.simple.SimpleJdbcTemplate;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionCallbackWithoutResult;
import org.springframework.transaction.support.TransactionTemplate;
import org.springside.examples.showcase.queue.BlockingConsumer;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
/** *//**
* 将Queue中的log4j event写入数据库的消费者任务.
*
* 即时阻塞的读取Queue中的事件,达到缓存上限后使用Jdbc批量写入模式.
* 如需换为定时读取模式,继承于PeriodConsumer稍加改造即可.
*
* @see BlockingConsumer
*
* @author calvin
*/
public class JdbcLogWriter extends BlockingConsumer {
protected String sql;
protected int batchSize = 10;
protected List<LoggingEvent> eventsBuffer = Lists.newArrayList();
protected SimpleJdbcTemplate jdbcTemplate;
protected TransactionTemplate transactionTemplate;
/** *//**
* 带Named Parameter的insert sql.
*
* Named Parameter的名称见AppenderUtils中的常量定义.
*/
public void setSql(String sql) {
this.sql = sql;
}
/** *//**
* 批量读取事件数量, 默认为10.
*/
public void setBatchSize(int batchSize) {
this.batchSize = batchSize;
}
/** *//**
* 根据注入的DataSource创建jdbcTemplate.
*/
@Resource
public void setDataSource(DataSource dataSource) {
jdbcTemplate = new SimpleJdbcTemplate(dataSource);
}
/** *//**
* 根据注入的PlatformTransactionManager创建transactionTemplate.
*/
@Resource
public void setDefaultTransactionManager(PlatformTransactionManager defaultTransactionManager) {
transactionTemplate = new TransactionTemplate(defaultTransactionManager);
}
/** *//**
* 消息处理函数,将消息放入buffer,当buffer达到batchSize时执行批量更新函数.
*/
@Override
protected void processMessage(Object message) {
LoggingEvent event = (LoggingEvent) message;
eventsBuffer.add(event);
if (logger.isDebugEnabled()) {
logger.debug("get event: {}", new LoggingEventWrapper(event).convertToString());
}
//已到达BufferSize则执行批量插入操作
if (eventsBuffer.size() >= batchSize) {
updateBatch();
}
}
/** *//**
* 将Buffer中的事件列表批量插入数据库.
*/
@SuppressWarnings("unchecked")
public void updateBatch() {
try {
//分析事件列表, 转换为jdbc批处理参数.
int i = 0;
Map[] paramMapArray = new HashMap[eventsBuffer.size()];
for (LoggingEvent event : eventsBuffer) {
paramMapArray[i++] = parseEvent(event);
}
final SqlParameterSource[] batchParams = SqlParameterSourceUtils.createBatch(paramMapArray);
//执行批量插入,如果失败调用失败处理函数.
transactionTemplate.execute(new TransactionCallbackWithoutResult() {
@Override
protected void doInTransactionWithoutResult(TransactionStatus status) {
try {
jdbcTemplate.batchUpdate(getActualSql(), batchParams);
if (logger.isDebugEnabled()) {
for (LoggingEvent event : eventsBuffer) {
logger.debug("saved event: {}", new LoggingEventWrapper(event).convertToString());
}
}
} catch (DataAccessException e) {
status.setRollbackOnly();
handleDataAccessException(e, eventsBuffer);
}
}
});
//清除已完成的Buffer
eventsBuffer.clear();
} catch (Exception e) {
logger.error("批量提交任务时发生错误.", e);
}
}
/** *//**
* 退出清理函数,完成buffer中未完成的消息.
*/
@Override
protected void clean() {
if (!eventsBuffer.isEmpty()) {
updateBatch();
}
logger.debug("cleaned task {}", this);
}
/** *//**
* 分析Event, 建立Parameter Map, 用于绑定sql中的Named Parameter.
*/
protected Map<String, Object> parseEvent(LoggingEvent event) {
Map<String, Object> parameterMap = Maps.newHashMap();
LoggingEventWrapper eventWrapper = new LoggingEventWrapper(event);
parameterMap.put("thread_name", eventWrapper.getThreadName());
parameterMap.put("logger_name", eventWrapper.getLoggerName());
parameterMap.put("log_time", eventWrapper.getDate());
parameterMap.put("level", eventWrapper.getLevel());
parameterMap.put("message", eventWrapper.getMessage());
return parameterMap;
}
/** *//**
* 可被子类重载的数据访问错误处理函数,如将出错的事件持久化到文件.
*/
protected void handleDataAccessException(DataAccessException e, List<LoggingEvent> errorEventBatch) {
if (e instanceof DataAccessResourceFailureException) {
logger.error("database connection error", e);
} else {
logger.error("other database error", e);
}
for (LoggingEvent event : errorEventBatch) {
logger.error("event insert to database error, ignore it: "
+ new LoggingEventWrapper(event).convertToString(), e);
}
}
/** *//**
* 可被子类重载的sql提供函数,可对sql语句进行特殊处理,如日志表的表名可带日期后缀 LOG_2009_02_31.
*/
protected String getActualSql() {
return sql;
}
}
这个类的作用有
1)当没有处理的日志在1000以内时,不停执行日志的处理(设置在QueuesHolder中),超过1000,就报错(见QueueAppender的append方法).
2)每次都把一条日志放到buffer中,达到10条时开始批量的把日志入数据库,条数和入库的sql都写在上面的spring配置文件中。
可以看到,主要的方法就是processMessage。那么,这个processMessage方法是在哪里被调用的呢?
在上面的JdbcLogWriter类的代码中可以看到,它继承自BlockingConsumer,我们看看BlockingConsumer里面有些什么:
/** *//**
* Copyright (c) 2005-2009 springside.org.cn
*
* Licensed under the Apache License, Version 2.0 (the "License");
*
* $Id$
*/
package org.springside.examples.showcase.queue;
/** *//**
* 采用即时阻塞读取Queue中消息策略的Consumer.
*/
public abstract class BlockingConsumer extends QueueConsumer {
/** *//**
* 线程执行函数,阻塞获取消息并调用processMessage()进行处理.
*/
public void run() {
//循环阻塞获取消息直到线程被中断.
try {
while (!Thread.currentThread().isInterrupted()) {
Object message = queue.take();
processMessage(message);
}
} catch (InterruptedException e) {
// Ignore.
} finally {
//退出线程前调用清理函数.
clean();
}
}
/** *//**
* 消息处理函数.
*/
protected abstract void processMessage(Object message);
/** *//**
* 退出清理函数.
*/
protected abstract void clean();
}
很明显,BlockingConsumer肯定是继承自Thread类或者实现于Runnable接口的线程类,在线程启动的时候processMessage方法被调用。当我们需要别的需要处理日志内容时,就可以继承BlockingConsumer写自己的processMessage来处理日志了。
5、下面,让我们看看这个线程类是怎么启动的吧。看一下BlockingConsumer就知道,它其实还继承于另外一个类QueueConsumer:
/** *//**
* Copyright (c) 2005-2009 springside.org.cn
*
* Licensed under the Apache License, Version 2.0 (the "License");
*
* $Id$
*/
package org.springside.examples.showcase.queue;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springside.modules.utils.ThreadUtils;
import org.springside.modules.utils.ThreadUtils.CustomizableThreadFactory;
/** *//**
* 单线程消费Queue中消息的任务基类.
*
* 定义了QueueConsumer的启动关闭流程.
*
* TODO:支持多线程执行.
*
* @see QueuesHolder
*
* @author calvin
*/
@SuppressWarnings("unchecked")
public abstract class QueueConsumer implements Runnable {
protected Logger logger = LoggerFactory.getLogger(getClass());
protected String queueName;
protected int shutdownTimeout = Integer.MAX_VALUE;
protected boolean persistence = true;
protected String persistencePath = System.getProperty("java.io.tmpdir") + File.separator + "queue";
protected Object persistenceLock = new Object(); //用于在backup与restore间等待的锁.
protected BlockingQueue queue;
protected ExecutorService executor;
/** *//**
* 任务所消费的队列名称.
*/
public void setQueueName(String queueName) {
this.queueName = queueName;
}
/** *//**
* 停止任务时最多等待的时间, 单位为毫秒.
*/
public void setShutdownTimeout(int shutdownTimeout) {
this.shutdownTimeout = shutdownTimeout;
}
/** *//**
* 在JVM关闭时是否需要持久化未完成的消息到文件.
*/
public void setPersistence(boolean persistence) {
this.persistence = persistence;
}
/** *//**
* 系统关闭时将队列中未处理的消息持久化到文件的目录,默认为系统临时文件夹下的queue目录.
*/
public void setPersistencePath(String persistencePath) {
this.persistencePath = persistencePath;
}
/** *//**
* 任务初始化函数.
*/
@PostConstruct
public void start() throws IOException, ClassNotFoundException, InterruptedException {
queue = QueuesHolder.getQueue(queueName);
if (persistence) {
synchronized (persistenceLock) {
restoreQueue();
}
}
executor = Executors.newSingleThreadExecutor(new CustomizableThreadFactory("Queue Consumer-" + queueName));
executor.execute(this);
}
/** *//**
* 任务结束函数.
*/
@PreDestroy
public void stop() throws IOException {
try {
ThreadUtils.normalShutdown(executor, shutdownTimeout, TimeUnit.MILLISECONDS);
} finally {
if (persistence) {
synchronized (persistenceLock) {
backupQueue();
}
}
}
}
/** *//**
* 保存队列中的消息到文件.
*/
protected void backupQueue() throws IOException {
List list = new ArrayList();
queue.drainTo(list);
if (!list.isEmpty()) {
ObjectOutputStream oos = null;
try {
File file = new File(getPersistenceDir(), getPersistenceFileName());
oos = new ObjectOutputStream(new BufferedOutputStream(new FileOutputStream(file)));
for (Object message : list) {
oos.writeObject(message);
}
logger.info("队列{}已持久化{}个消息到{}", new Object[] { queueName, list.size(), file.getAbsolutePath() });
} finally {
if (oos != null) {
oos.close();
}
}
} else {
logger.debug("队列{}为空,不需要持久化 .", queueName);
}
}
/** *//**
* 载入持久化文件中的消息到队列.
*/
protected void restoreQueue() throws ClassNotFoundException, IOException, InterruptedException {
ObjectInputStream ois = null;
File file = new File(getPersistenceDir(), getPersistenceFileName());
if (file.exists()) {
try {
ois = new ObjectInputStream(new BufferedInputStream(new FileInputStream(file)));
int count = 0;
while (true) {
try {
Object message = ois.readObject();
queue.put(message);
count++;
} catch (EOFException e) {
break;
}
}
logger.info("队列{}已从{}中恢复{}个消息.", new Object[] { queueName, file.getAbsolutePath(), count });
} finally {
if (ois != null) {
ois.close();
}
}
file.delete();
} else {
logger.debug("队列{}的持久化文件{}不存在", queueName, file.getAbsolutePath());
}
}
/** *//**
* 获取持久化文件路径.
* 持久化文件默认路径为java.io.tmpdir/queue/队列名.
* 如果java.io.tmpdir/queue/目录不存在,会进行创建.
*/
protected File getPersistenceDir() {
File parentDir = new File(persistencePath + File.separator);
if (!parentDir.exists()) {
parentDir.mkdirs();
}
return parentDir;
}
/** *//**
* 获取持久化文件的名称,默认为queueName,可重载.
*/
protected String getPersistenceFileName() {
return queueName;
}
}
这里终于可以确信JdbcLogWriter是一个实现了Runnable的线程类了。我们先略过那些保存日志到文件的方法,关注它的启动方法start()。在start方法中,用到了concurrent包的Executors来执行线程任务。所以整个的过程是这样的:
1、spring随应用启动,创建QueuesHolder静态类用于存放多种queueName的日志queue;创建JdbcLogWriter开始启动线程,不停循环处理日志。
2、log4j随应用启动,并产生日志,把日志存到queue中(使用offer方法)。
3、JdbcLogWriter不停的把日志从queue中移出来(使用take方法)。
3、每当有10条日志生成,JdbcLogWriter的updateBatch方法就把日志批量入库,这个工作在processMesage方法里面。
这就是springside日志入库的整个过程了,兹以为记。
4、
我的微博
http://t.sina.com.cn/1401900445