posts - 167,  comments - 30,  trackbacks - 0
#!/usr/bin/env python
#
 -*- coding: utf-8 -*-
#
===============================================================================
#
#
 Copyright (c) 2015 Letv.com, Inc. All Rights Reserved
#
#
 python2.6版本安装:
#
           yum install python-futures
#
           yum install python-qpid
#
           yum install supervisor  由supervisor来管理进程,当进程挂掉后自动监控并重启
#
 author: david
#
 date: 2015/10/14 16:42:02
#
#
===============================================================================
from qpid.messaging import *
import time, sys
import urllib2
import hashlib
import json
import traceback
from concurrent.futures import *
import logging
import logging.handlers

LOG_DIR = "/home/ldw/logs/geturl/online"
LOG_MID_FAIL=LOG_DIR + "/update_fail.log"
LOG_FILE_NAME=LOG_DIR + "/geturl_update.log"

# default test broker
QPID_QUEUE_NAME_TEST = "ldw.update_v.queue"
SERVERS_TEST = ['10.3.3.3']
SERVERS_PRO = ['10.1.1.1.''10.2.2.2']
BROKER_TEST = "xyz/xyz@10.11.1.1:5672"
BROKER_PRO = "xyz/xyz@10.11.1.2:5672"
RECONNECTION_URLS = ['']
SERVERS = SERVERS_TEST
BROKER = BROKER_TEST
QUEUE_NAME = QPID_QUEUE_NAME_TEST
THREAD_COUNT = 16

# 获取当前时间
def getNowTime():
    ISOTIMEFORMAT='%Y-%m-%d %H:%M:%S'
    return time.strftime(ISOTIMEFORMAT, time.localtime())

# 记录处理失败日志
def fail2log(id):
    # 使用with关键字,自动关闭文件流
    with  open(LOG_MID_FAIL,'a') as log:
        log.write('%s\n'%(id))

# 记录日志, 可以提出来共用
def init_logger():
    logging.basicConfig()
    logger = logging.getLogger("__name__")
    logger.setLevel(logging.INFO)
    when:S:Seconds M:Minutes D:Days H:Hours interval:  backupCount:0 not deleted
    # 1天更换一次文件日志,7为保留日志文件个数
    logger_fh = logging.handlers.TimedRotatingFileHandler(LOG_FILE_NAME, 'D', 1, 7)
    logger_fh.suffix = "%Y%m%d-%H%M.log"
    logger_fh.setFormatter(logging.Formatter("%(asctime)s - %(levelname)s - %(message)s"))
    logger.addHandler(logger_fh)
    return logger
_logger = init_logger()

# 初始化环境变量,当在局部函数中修改全局变量时,使用global关键字
def init_env(env):
    global BROKER
    # 其他变量类似处理
    if env != 'PRO':
        BROKER = BROKER_PRO
    _logger.info('env:%s,broker:%s,queue_name:%s,reconnnection_urls:%s, servers: %s, thread_pool_count:%s'
                 % (env,BROKER,QUEUE_NAME,str(RECONNECTION_URLS),str(SERVERS), str(THREAD_COUNT)))

# 处理业务逻辑
def process(id, ip):
    key = 'xyz';
    action = 'delete';
    tm = str(int(time.time()))
    sig = hashlib.md5(tm+action+key).hexdigest()
    url = "http://ip/test?sig="+sig
    try:
        req = urllib2.Request(url)
        # 绑定Host
        # req.add_header('Host', 'www.ldw.com')
        response = urllib2.urlopen(req, timeout = 1)
        result = response.read()
        if result.find("400")>=0:
            fail2log(id)
            _logger.error('flush cache fail, url :%s' % (url))
        else:
            _logger.info('flush cache OK, IP :%s' % (ip))
    except Exception:
        fail2log(id)
        _logger.error('flush cbase cache error, url: %s' % (url))

# init qpid connection
def qpid_get_conn():
    conn = Connection(BROKER, heartbeat=6, reconnect=True, reconnect_limit=60, reconnect_interval=4, reconnect_urls=RECONNECTION_URLS)
    conn.open()
    sess = conn.session()
    rec = sess.receiver(QUEUE_NAME)
    return conn, sess, rec

# receive message of qpid queue, multithread implements
def qpid_receive(rec):
    try:
        epool = ThreadPoolExecutor(max_workers=THREAD_COUNT)
        while True:
            try:
                message = rec.fetch()
                data = json.loads(message.content)
                _logger.info(' receive message:%s' % (message.content))
                mid = data['mid']
                
                for ip in SERVERS:
                    try:
                        epool.submit(process(mid, ip))
                    except Exception,e:
                        pass
                        fail2log(mid)
                        exstr = traceback.format_exc()
                        _logger.error(' thread pool process message error:%s' % (exstr))
            except Exception, e:
                pass
                fail2log(mid)
                _logger.error(' fetch message error, msg:%s' % (message.content))
            sess.acknowledge() # message ack
    except Exception, e:
        pass
        exstr = traceback.format_exc()
        _logger.error(' start receive message error:%s' % (exstr))

if __name__ ==   '__main__':
    if(len(sys.argv) < 2):
        print "Usage: \"python input args error\""
        sys.exit()
    env = sys.argv[1]
    if env != 'TEST' and env != 'PRO':
        print "Usage: \"input evn args error\""
        sys.exit()

    try:
        init_env(env)
        conn, sess, rec =  qpid_get_conn()
        qpid_receive(rec)
        for c in [ conn, sess, rec]:
             if c: c.close()
    except Exception, e:
        pass
        exstr = traceback.format_exc()
        _logger.error('init qpid connection message error:%s' % (exstr))
posted on 2015-10-30 20:20 David1228 阅读(2872) 评论(0)  编辑  收藏 所属分类: 动态语言Python

只有注册用户登录后才能发表评论。


网站导航:
 

<2015年10月>
27282930123
45678910
11121314151617
18192021222324
25262728293031
1234567

常用链接

留言簿(4)

随笔分类

随笔档案

文章档案

新闻分类

新闻档案

相册

收藏夹

Java

Linux知识相关

Spring相关

云计算/Linux/虚拟化技术/

友情博客

多线程并发编程

开源技术

持久层技术相关

搜索

  •  

积分与排名

  • 积分 - 356943
  • 排名 - 154

最新评论

阅读排行榜

评论排行榜