月挂夜中央

懒惰程序员

常用链接

统计

最新评论

springside3.*中log4j和java.util.concurrent的结合使用

        在springside3.*中的showcase案例中,有一个把log4j的日志存入数据库的演示,下面是我对这个案例的学习笔记。
1、我们首先来看下log4j相关日志的配置:
#Async Database Appender (Store business message)
log4j.appender.DB
=org.springside.examples.showcase.log.appender.QueueAppender
log4j.appender.DB.QueueName
=dblog

#Demo level with Async Database appender 
log4j.logger.DBLogExample
=INFO,Console,DB
log4j.additivity.DBLogExample
=false

其中org.springside.examples.showcase.log.appender.QueueAppender就是对ssLog4j日志的一个扩展,而日志的event(里面是日志的内容)是存放在一个BlockingQueue中,当有多个日志需要分别存入不同的地方时,就根据QueryName来区分。
2、接下来看一下org.springside.examples.showcase.log.appender.QueueAppender里面的内容:
/**
 * Copyright (c) 2005-2009 springside.org.cn
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * 
 * $Id: QueueAppender.java 1189 2010-09-01 17:24:12Z calvinxiu $
 
*/

package org.springside.examples.showcase.log.appender;

import java.util.concurrent.BlockingQueue;

import org.apache.log4j.helpers.LogLog;
import org.apache.log4j.spi.LoggingEvent;
import org.springside.examples.showcase.queue.QueuesHolder;

/**
 * 轻量级的Log4j异步Appender.
 * 
 * 将所有消息放入QueueManager所管理的Blocking Queue中.
 * 
 * 
@see QueuesHolder
 * 
 * 
@author calvin
 
*/

public class QueueAppender extends org.apache.log4j.AppenderSkeleton {

    
protected String queueName;

    
protected BlockingQueue<LoggingEvent> queue;

    
/**
     * AppenderSkeleton回调函数, 事件到达时将时间放入Queue.
     
*/

    @Override
    
public void append(LoggingEvent event) {
        
if (queue == null{
            queue 
= QueuesHolder.getQueue(queueName);
        }


        
boolean sucess = queue.offer(event);

        
if (sucess) {
            LogLog.debug(
"put event to queue success:" + new LoggingEventWrapper(event).convertToString());

        }
 else {
            LogLog.error(
"Put event to queue fail:" + new LoggingEventWrapper(event).convertToString());
        }

    }


    
/**
     * AppenderSkeleton回调函数,关闭Logger时的清理动作.
     
*/

    
public void close() {
    }


    
/**
     * AppenderSkeleton回调函数, 设置是否需要定义Layout.
     
*/

    
public boolean requiresLayout() {
        
return false;
    }


    
/**
     * Log4j根据getter/setter从log4j.properties中注入同名参数.
     
*/

    
public String getQueueName() {
        
return queueName;
    }


    
/**
     * 
@see #getQueueName()
     
*/

    
public void setQueueName(String queueName) {
        
this.queueName = queueName;
    }

}

这是对Log4j扩展的标准做法,继承abstract class AppenderSkeleton,实现它的abstract  protected   void append(LoggingEvent event) 方法。
而这个方法的实现很简单,就是根据queueName从queuesHolder中取出一个BlockingQueue<LoggingEvent>,然后把LoggerEvent塞到这个BlockingQueue中去,关于queuesHolder,下面会讲到。到这为止,log4j的活就完成了,下面的都是concurrent的事了。
3、让我们转到spring的配置文件中,看看springside是如何接收下面的工作,下面是applicationContext-log.xml的片段:
    <!-- 消息Queue管理器-->
    
<bean class="org.springside.examples.showcase.queue.QueuesHolder">
        
<property name="queueSize" value="1000" />
    
</bean>

    
<!-- 读出Queue中日志消息写入数据库的任务 -->
    
<bean id="jdbcLogWriter" class="org.springside.examples.showcase.log.appender.JdbcLogWriter">
        
<property name="queueName" value="dblog" />
        
<property name="batchSize" value="10" />
        
<property name="sql">
            
<value>
                insert into SS_LOG(THREAD_NAME,LOGGER_NAME,LOG_TIME,LEVEL,MESSAGE)
                values(:thread_name,:logger_name,:log_time,:level,:message)
            
</value>
        
</property>
    
</bean>

我们先从简单的下手,先看QueuesHolder:
private static ConcurrentMap<String, BlockingQueue> queueMap = new MapMaker().concurrencyLevel(32).makeMap();//消息队列

/**
     * 根据queueName获得消息队列的静态函数.
     * 如消息队列还不存在, 会自动进行创建.
     
*/

    
public static <T> BlockingQueue<T> getQueue(String queueName) {
        BlockingQueue queue 
= queueMap.get(queueName);

        
if (queue == null{
            BlockingQueue newQueue 
= new LinkedBlockingQueue(queueSize);

            
//如果之前消息队列还不存在,放入新队列并返回Null.否则返回之前的值.
            queue = queueMap.putIfAbsent(queueName, newQueue);
            
if (queue == null{
                queue 
= newQueue;
            }

        }

        
return queue;
    }
其实这个类很简单,就是一个map,key就是上面log4j配置文件中的queueName,value就是一个BlockingQueue,这样就可以存放多个日志queue,做不同的处理。
4、下面这个是重头戏,先把JdbcLogWriter的代码全贴出来:
/**
 * Copyright (c) 2005-2009 springside.org.cn
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * 
 * $Id: JdbcAppenderTask.java 353 2009-08-22 09:33:28Z calvinxiu
 
*/

package org.springside.examples.showcase.log.appender;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import javax.annotation.Resource;
import javax.sql.DataSource;

import org.apache.log4j.spi.LoggingEvent;
import org.springframework.dao.DataAccessException;
import org.springframework.dao.DataAccessResourceFailureException;
import org.springframework.jdbc.core.namedparam.SqlParameterSource;
import org.springframework.jdbc.core.namedparam.SqlParameterSourceUtils;
import org.springframework.jdbc.core.simple.SimpleJdbcTemplate;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionCallbackWithoutResult;
import org.springframework.transaction.support.TransactionTemplate;
import org.springside.examples.showcase.queue.BlockingConsumer;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;

/**
 * 将Queue中的log4j event写入数据库的消费者任务.
 * 
 * 即时阻塞的读取Queue中的事件,达到缓存上限后使用Jdbc批量写入模式.
 * 如需换为定时读取模式,继承于PeriodConsumer稍加改造即可.
 * 
 * 
@see BlockingConsumer
 * 
 * 
@author calvin
 
*/

public class JdbcLogWriter extends BlockingConsumer {

    
protected String sql;
    
protected int batchSize = 10;

    
protected List<LoggingEvent> eventsBuffer = Lists.newArrayList();
    
protected SimpleJdbcTemplate jdbcTemplate;
    
protected TransactionTemplate transactionTemplate;

    
/**
     * 带Named Parameter的insert sql.
     * 
     * Named Parameter的名称见AppenderUtils中的常量定义.
     
*/

    
public void setSql(String sql) {
        
this.sql = sql;
    }


    
/**
     * 批量读取事件数量, 默认为10.
     
*/

    
public void setBatchSize(int batchSize) {
        
this.batchSize = batchSize;
    }


    
/**
     * 根据注入的DataSource创建jdbcTemplate.
     
*/

    @Resource
    
public void setDataSource(DataSource dataSource) {
        jdbcTemplate 
= new SimpleJdbcTemplate(dataSource);
    }


    
/**
     * 根据注入的PlatformTransactionManager创建transactionTemplate.
     
*/

    @Resource
    
public void setDefaultTransactionManager(PlatformTransactionManager defaultTransactionManager) {
        transactionTemplate 
= new TransactionTemplate(defaultTransactionManager);
    }


    
/**
     * 消息处理函数,将消息放入buffer,当buffer达到batchSize时执行批量更新函数.
     
*/

    @Override
    
protected void processMessage(Object message) {
        LoggingEvent event 
= (LoggingEvent) message;
        eventsBuffer.add(event);

        
if (logger.isDebugEnabled()) {
            logger.debug(
"get event: {}"new LoggingEventWrapper(event).convertToString());
        }


        
//已到达BufferSize则执行批量插入操作
        if (eventsBuffer.size() >= batchSize) {
            updateBatch();
        }

    }


    
/**
     * 将Buffer中的事件列表批量插入数据库.
     
*/

    @SuppressWarnings(
"unchecked")
    
public void updateBatch() {
        
try {
            
//分析事件列表, 转换为jdbc批处理参数.
            int i = 0;
            Map[] paramMapArray 
= new HashMap[eventsBuffer.size()];
            
for (LoggingEvent event : eventsBuffer) {
                paramMapArray[i
++= parseEvent(event);
            }

            
final SqlParameterSource[] batchParams = SqlParameterSourceUtils.createBatch(paramMapArray);

            
//执行批量插入,如果失败调用失败处理函数.
            transactionTemplate.execute(new TransactionCallbackWithoutResult() {
                @Override
                
protected void doInTransactionWithoutResult(TransactionStatus status) {
                    
try {
                        jdbcTemplate.batchUpdate(getActualSql(), batchParams);
                        
if (logger.isDebugEnabled()) {
                            
for (LoggingEvent event : eventsBuffer) {
                                logger.debug(
"saved event: {}"new LoggingEventWrapper(event).convertToString());
                            }

                        }

                    }
 catch (DataAccessException e) {
                        status.setRollbackOnly();
                        handleDataAccessException(e, eventsBuffer);
                    }

                }

            }
);

            
//清除已完成的Buffer
            eventsBuffer.clear();
        }
 catch (Exception e) {
            logger.error(
"批量提交任务时发生错误.", e);
        }

    }


    
/**
     * 退出清理函数,完成buffer中未完成的消息.
     
*/

    @Override
    
protected void clean() {
        
if (!eventsBuffer.isEmpty()) {
            updateBatch();
        }

        logger.debug(
"cleaned task {}"this);
    }


    
/**
     * 分析Event, 建立Parameter Map, 用于绑定sql中的Named Parameter.
     
*/

    
protected Map<String, Object> parseEvent(LoggingEvent event) {
        Map
<String, Object> parameterMap = Maps.newHashMap();
        LoggingEventWrapper eventWrapper 
= new LoggingEventWrapper(event);

        parameterMap.put(
"thread_name", eventWrapper.getThreadName());
        parameterMap.put(
"logger_name", eventWrapper.getLoggerName());
        parameterMap.put(
"log_time", eventWrapper.getDate());
        parameterMap.put(
"level", eventWrapper.getLevel());
        parameterMap.put(
"message", eventWrapper.getMessage());
        
return parameterMap;
    }


    
/**
     * 可被子类重载的数据访问错误处理函数,如将出错的事件持久化到文件.
     
*/

    
protected void handleDataAccessException(DataAccessException e, List<LoggingEvent> errorEventBatch) {
        
if (e instanceof DataAccessResourceFailureException) {
            logger.error(
"database connection error", e);
        }
 else {
            logger.error(
"other database error", e);
        }


        
for (LoggingEvent event : errorEventBatch) {
            logger.error(
"event insert to database error, ignore it: "
                    
+ new LoggingEventWrapper(event).convertToString(), e);
        }

    }


    
/**
     * 可被子类重载的sql提供函数,可对sql语句进行特殊处理,如日志表的表名可带日期后缀 LOG_2009_02_31.
     
*/

    
protected String getActualSql() {
        
return sql;
    }

}

这个类的作用有
1)当没有处理的日志在1000以内时,不停执行日志的处理(设置在QueuesHolder中),超过1000,就报错(见QueueAppender的append方法).
2)每次都把一条日志放到buffer中,达到10条时开始批量的把日志入数据库,条数和入库的sql都写在上面的spring配置文件中。
可以看到,主要的方法就是processMessage。那么,这个processMessage方法是在哪里被调用的呢?
在上面的JdbcLogWriter类的代码中可以看到,它继承自BlockingConsumer,我们看看BlockingConsumer里面有些什么:
/**
 * Copyright (c) 2005-2009 springside.org.cn
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * 
 * $Id$
 
*/

package org.springside.examples.showcase.queue;

/**
 * 采用即时阻塞读取Queue中消息策略的Consumer.
 
*/

public abstract class BlockingConsumer extends QueueConsumer {

    
/**
     * 线程执行函数,阻塞获取消息并调用processMessage()进行处理.
     
*/

    
public void run() {
        
//循环阻塞获取消息直到线程被中断.
        try {
            
while (!Thread.currentThread().isInterrupted()) {
                Object message 
= queue.take();
                processMessage(message);
            }

        }
 catch (InterruptedException e) {
            
// Ignore.
        }
 finally {
            
//退出线程前调用清理函数.
            clean();
        }

    }


    
/**
     * 消息处理函数.
     
*/

    
protected abstract void processMessage(Object message);

    
/**
     * 退出清理函数.
     
*/

    
protected abstract void clean();
}
很明显,BlockingConsumer肯定是继承自Thread类或者实现于Runnable接口的线程类,在线程启动的时候processMessage方法被调用。当我们需要别的需要处理日志内容时,就可以继承BlockingConsumer写自己的processMessage来处理日志了。
5、下面,让我们看看这个线程类是怎么启动的吧。看一下BlockingConsumer就知道,它其实还继承于另外一个类QueueConsumer:
/**
 * Copyright (c) 2005-2009 springside.org.cn
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * 
 * $Id$
 
*/

package org.springside.examples.showcase.queue;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springside.modules.utils.ThreadUtils;
import org.springside.modules.utils.ThreadUtils.CustomizableThreadFactory;

/**
 * 单线程消费Queue中消息的任务基类.
 * 
 * 定义了QueueConsumer的启动关闭流程.
 * 
 * TODO:支持多线程执行.
 * 
 * 
@see QueuesHolder
 * 
 * 
@author calvin
 
*/

@SuppressWarnings(
"unchecked")
public abstract class QueueConsumer implements Runnable {

    
protected Logger logger = LoggerFactory.getLogger(getClass());

    
protected String queueName;
    
protected int shutdownTimeout = Integer.MAX_VALUE;

    
protected boolean persistence = true;
    
protected String persistencePath = System.getProperty("java.io.tmpdir"+ File.separator + "queue";
    
protected Object persistenceLock = new Object(); //用于在backup与restore间等待的锁.

    
protected BlockingQueue queue;
    
protected ExecutorService executor;

    
/**
     * 任务所消费的队列名称.
     
*/

    
public void setQueueName(String queueName) {
        
this.queueName = queueName;
    }


    
/**
     * 停止任务时最多等待的时间, 单位为毫秒.
     
*/

    
public void setShutdownTimeout(int shutdownTimeout) {
        
this.shutdownTimeout = shutdownTimeout;
    }


    
/**
     * 在JVM关闭时是否需要持久化未完成的消息到文件.
     
*/

    
public void setPersistence(boolean persistence) {
        
this.persistence = persistence;
    }


    
/**
     * 系统关闭时将队列中未处理的消息持久化到文件的目录,默认为系统临时文件夹下的queue目录.
     
*/

    
public void setPersistencePath(String persistencePath) {
        
this.persistencePath = persistencePath;
    }


    
/**
     * 任务初始化函数.
     
*/

    @PostConstruct
    
public void start() throws IOException, ClassNotFoundException, InterruptedException {

        queue 
= QueuesHolder.getQueue(queueName);

        
if (persistence) {
            
synchronized (persistenceLock) {
                restoreQueue();
            }

        }


        executor 
= Executors.newSingleThreadExecutor(new CustomizableThreadFactory("Queue Consumer-" + queueName));
        executor.execute(
this);
    }


    
/**
     * 任务结束函数.
     
*/

    @PreDestroy
    
public void stop() throws IOException {
        
try {
            ThreadUtils.normalShutdown(executor, shutdownTimeout, TimeUnit.MILLISECONDS);
        }
 finally {
            
if (persistence) {
                
synchronized (persistenceLock) {
                    backupQueue();
                }

            }

        }


    }


    
/**
     * 保存队列中的消息到文件.
     
*/

    
protected void backupQueue() throws IOException {
        List list 
= new ArrayList();
        queue.drainTo(list);

        
if (!list.isEmpty()) {
            ObjectOutputStream oos 
= null;
            
try {
                File file 
= new File(getPersistenceDir(), getPersistenceFileName());
                oos 
= new ObjectOutputStream(new BufferedOutputStream(new FileOutputStream(file)));
                
for (Object message : list) {
                    oos.writeObject(message);
                }

                logger.info(
"队列{}已持久化{}个消息到{}"new Object[] { queueName, list.size(), file.getAbsolutePath() });
            }
 finally {
                
if (oos != null{
                    oos.close();
                }

            }

        }
 else {
            logger.debug(
"队列{}为空,不需要持久化 .", queueName);
        }

    }


    
/**
     * 载入持久化文件中的消息到队列.
     
*/

    
protected void restoreQueue() throws ClassNotFoundException, IOException, InterruptedException {
        ObjectInputStream ois 
= null;
        File file 
= new File(getPersistenceDir(), getPersistenceFileName());

        
if (file.exists()) {
            
try {
                ois 
= new ObjectInputStream(new BufferedInputStream(new FileInputStream(file)));
                
int count = 0;
                
while (true{
                    
try {
                        Object message 
= ois.readObject();
                        queue.put(message);
                        count
++;
                    }
 catch (EOFException e) {
                        
break;
                    }

                }

                logger.info(
"队列{}已从{}中恢复{}个消息."new Object[] { queueName, file.getAbsolutePath(), count });
            }
 finally {
                
if (ois != null{
                    ois.close();
                }

            }

            file.delete();
        }
 else {
            logger.debug(
"队列{}的持久化文件{}不存在", queueName, file.getAbsolutePath());
        }

    }


    
/**
     * 获取持久化文件路径.
     * 持久化文件默认路径为java.io.tmpdir/queue/队列名.
     * 如果java.io.tmpdir/queue/目录不存在,会进行创建.
     
*/

    
protected File getPersistenceDir() {
        File parentDir 
= new File(persistencePath + File.separator);
        
if (!parentDir.exists()) {
            parentDir.mkdirs();
        }

        
return parentDir;
    }


    
/**
     * 获取持久化文件的名称,默认为queueName,可重载.
     
*/

    
protected String getPersistenceFileName() {
        
return queueName;
    }

}

这里终于可以确信JdbcLogWriter是一个实现了Runnable的线程类了。我们先略过那些保存日志到文件的方法,关注它的启动方法start()。在start方法中,用到了concurrent包的Executors来执行线程任务。所以整个的过程是这样的:
1、spring随应用启动,创建QueuesHolder静态类用于存放多种queueName的日志queue;创建JdbcLogWriter开始启动线程,不停循环处理日志。
2、log4j随应用启动,并产生日志,把日志存到queue中(使用offer方法)。
3、JdbcLogWriter不停的把日志从queue中移出来(使用take方法)。
3、每当有10条日志生成,JdbcLogWriter的updateBatch方法就把日志批量入库,这个工作在processMesage方法里面。

这就是springside日志入库的整个过程了,兹以为记。
4、



我的微博 http://t.sina.com.cn/1401900445

posted on 2011-02-13 21:20 月挂夜中央 阅读(2202) 评论(0)  编辑  收藏 所属分类: java咖啡杯


只有注册用户登录后才能发表评论。


网站导航: