#!/usr/bin/env python
# -*- coding: utf-8 -*-
#===============================================================================
#
# Copyright (c) 2015 Letv.com, Inc. All Rights Reserved
#
# python2.6版本安装:
# yum install python-futures
# yum install python-qpid
# yum install supervisor 由supervisor来管理进程,当进程挂掉后自动监控并重启
# author: david
# date: 2015/10/14 16:42:02
#
#===============================================================================
from qpid.messaging import *
import time, sys
import urllib2
import hashlib
import json
import traceback
from concurrent.futures import *
import logging
import logging.handlers
# Log directory and file locations.
LOG_DIR = "/home/ldw/logs/geturl/online"
# File recording the ids whose cache flush failed (one id per line), for replay.
LOG_MID_FAIL=LOG_DIR + "/update_fail.log"
# Main application log file (rotated daily by init_logger()).
LOG_FILE_NAME=LOG_DIR + "/geturl_update.log"
# default test broker
QPID_QUEUE_NAME_TEST = "ldw.update_v.queue"
SERVERS_TEST = ['10.3.3.3']
# NOTE(review): '10.1.1.1.' has a trailing dot -- confirm it is intended.
SERVERS_PRO = ['10.1.1.1.', '10.2.2.2']
BROKER_TEST = "xyz/xyz@10.11.1.1:5672"
BROKER_PRO = "xyz/xyz@10.11.1.2:5672"
# Fallback broker URLs for qpid auto-reconnect (empty string = none configured).
RECONNECTION_URLS = ['']
# Active settings; init_env() rebinds these when running against PRO.
SERVERS = SERVERS_TEST
BROKER = BROKER_TEST
QUEUE_NAME = QPID_QUEUE_NAME_TEST
# Size of the worker pool used by qpid_receive().
THREAD_COUNT = 16
# Current local time, formatted for log output.
def getNowTime():
    """Return the current local time as a 'YYYY-MM-DD HH:MM:SS' string."""
    return time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
# Record an id that failed processing, for later replay.
def fail2log(id):
    """Append *id* (one per line) to LOG_MID_FAIL.

    The with-statement closes the file handle automatically.
    """
    with open(LOG_MID_FAIL, 'a') as fh:
        fh.write(str(id) + '\n')
# Build the module logger; could be factored out and shared.
def init_logger():
    """Create and return the module logger writing to a daily-rotated file.

    Bug fix: the original called logging.getLogger("__name__") with the
    literal string "__name__", not the module's __name__, so any module
    pasting this helper would silently share one misnamed logger.
    """
    logging.basicConfig()
    logger = logging.getLogger(__name__)
    logger.setLevel(logging.INFO)
    # when='D', interval=1: rotate the file once a day; backupCount=7 keeps
    # seven old files (0 would mean never delete).
    logger_fh = logging.handlers.TimedRotatingFileHandler(LOG_FILE_NAME, 'D', 1, 7)
    logger_fh.suffix = "%Y%m%d-%H%M.log"
    logger_fh.setFormatter(logging.Formatter("%(asctime)s - %(levelname)s - %(message)s"))
    logger.addHandler(logger_fh)
    # basicConfig() gave the root logger a stderr handler; stop propagation
    # so every record is not emitted twice.
    logger.propagate = False
    return logger
# Module-level logger shared by all functions below.
_logger = init_logger()
# Initialize environment-dependent globals; `global` is required to rebind
# a module-level name from inside a function.
def init_env(env):
    """Select the broker configuration for *env* ('TEST' or 'PRO').

    Bug fix: the original tested `env != 'PRO'`, which assigned the
    production broker in every environment EXCEPT production (and left
    PRO on the test broker). The test is now `env == 'PRO'` so TEST
    keeps the default test settings.
    """
    global BROKER
    # SERVERS / QUEUE_NAME would be switched the same way for PRO.
    if env == 'PRO':
        BROKER = BROKER_PRO
    _logger.info('env:%s,broker:%s,queue_name:%s,reconnnection_urls:%s, servers: %s, thread_pool_count:%s'
        % (env, BROKER, QUEUE_NAME, str(RECONNECTION_URLS), str(SERVERS), str(THREAD_COUNT)))
# Business logic: ask one cache server to flush the cached entry for *id*.
def process(id, ip):
    """Send a signed delete request to server *ip*; log failures.

    Bug fix: the original built the URL from the literal string "ip"
    ("http://ip/test?sig=..."), so the target server was never contacted.
    The URL now interpolates the *ip* argument.
    """
    key = 'xyz'
    action = 'delete'
    tm = str(int(time.time()))
    # Request signature: md5 over timestamp + action + shared secret.
    sig = hashlib.md5(tm + action + key).hexdigest()
    url = "http://" + ip + "/test?sig=" + sig
    try:
        req = urllib2.Request(url)
        # Pin the Host header if requests must go through a shared VIP:
        # req.add_header('Host', 'www.ldw.com')
        response = urllib2.urlopen(req, timeout=1)
        result = response.read()
        # The backend reports failure by embedding "400" in the body.
        if result.find("400") >= 0:
            fail2log(id)
            _logger.error('flush cache fail, url :%s' % (url))
        else:
            _logger.info('flush cache OK, IP :%s' % (ip))
    except Exception:
        fail2log(id)
        _logger.error('flush cbase cache error, url: %s' % (url))
# Open the qpid broker connection and attach a receiver to the work queue.
def qpid_get_conn():
    """Return a (connection, session, receiver) triple for QUEUE_NAME on BROKER."""
    connection = Connection(BROKER,
                            heartbeat=6,
                            reconnect=True,
                            reconnect_limit=60,
                            reconnect_interval=4,
                            reconnect_urls=RECONNECTION_URLS)
    connection.open()
    session = connection.session()
    receiver = session.receiver(QUEUE_NAME)
    return connection, session, receiver
# receive message of qpid queue, multithread implements
def qpid_receive(rec):
    """Consume messages from *rec* forever, fanning each server flush out
    to a thread pool of THREAD_COUNT workers.

    Bug fixes vs. the original:
      * process() was being CALLED inline and its (None) return value was
        passed to submit(); the callable and its args are now submitted,
        so the flush really runs on the pool.
      * the fetch-error handler referenced `mid` and `message`, both
        unbound when fetch() itself raises (a NameError would have masked
        the real error); the handler no longer assumes they exist.
      * dead `pass` statements before the real handler bodies removed.
    """
    try:
        epool = ThreadPoolExecutor(max_workers=THREAD_COUNT)
        while True:
            mid = None
            try:
                message = rec.fetch()
                data = json.loads(message.content)
                _logger.info(' receive message:%s' % (message.content))
                mid = data['mid']
                for ip in SERVERS:
                    try:
                        # Submit callable + args; do NOT call process() here.
                        epool.submit(process, mid, ip)
                    except Exception:
                        fail2log(mid)
                        exstr = traceback.format_exc()
                        _logger.error(' thread pool process message error:%s' % (exstr))
            except Exception:
                # mid is only known once json decoding succeeded.
                if mid is not None:
                    fail2log(mid)
                _logger.error(' fetch message error, msg:%s' % (traceback.format_exc()))
            # NOTE(review): `sess` is the session created in __main__; this
            # relies on that module-level global being set first -- confirm.
            sess.acknowledge()  # message ack
    except Exception:
        exstr = traceback.format_exc()
        _logger.error(' start receive message error:%s' % (exstr))
if __name__ == '__main__':
    # Usage: python <script> TEST|PRO
    if len(sys.argv) < 2:
        print("Usage: \"python input args error\"")
        sys.exit()
    env = sys.argv[1]
    if env != 'TEST' and env != 'PRO':
        print("Usage: \"input evn args error\"")
        sys.exit()
    try:
        init_env(env)
        conn, sess, rec = qpid_get_conn()
        # Blocks in the receive loop until it exits.
        qpid_receive(rec)
        # Fix: close in reverse acquisition order (receiver, session,
        # connection) instead of connection-first.
        for c in [rec, sess, conn]:
            if c:
                c.close()
    except Exception:
        exstr = traceback.format_exc()
        _logger.error('init qpid connection message error:%s' % (exstr))