需求描述
項目需求測試過程中,需要向Nginx服務器發(fā)送一些用例請求,然后查看對應的Nginx日志,判斷是否存在特征內(nèi)容,來判斷任務是否執(zhí)行成功。為了提升效率,需要將這一過程實現(xiàn)自動化。
實踐環(huán)境
Python 3.6.5
代碼設計與實現(xiàn)
#!/usr/bin/env python
# -*- coding:utf-8 -*-
'''
@CreateTime: 2021/06/26 9:05
@Author : shouke
'''
import time
import threading
import subprocess
from collections import deque
def collect_nginx_log():
global nginx_log_queue
global is_tasks_compete
global task_status
args = 'tail -0f /usr/local/openresty/nginx/logs/access.log'
while task_status != 'req_log_got':
with subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, universal_newlines = True) as proc:
log_for_req = ''
outs, errs = '', ''
try:
outs, errs = proc.communicate(timeout=2)
except subprocess.TimeoutExpired:
print('獲取nginx日志超時,正在重試')
proc.kill()
try:
outs, errs = proc.communicate(timeout=5)
except subprocess.TimeoutExpired:
print('獲取nginx日志超時,再次超時,停止重試')
break
finally:
for line in outs.split('\n'):
flag = '\"client_ip\":\"10.118.0.77\"' # 特征
if flag in line: # 查找包含特征內(nèi)容的日志
log_for_req += line
if task_status == 'req_finished':
nginx_log_queue.append(log_for_req)
task_status = 'req_log_got'
def run_tasks(task_list):
'''
運行任務
:param task_list 任務列表
'''
global nginx_log_queue
global is_tasks_compete
global task_status
for task in task_list:
thread = threading.Thread(target=collect_nginx_log,
name="collect_nginx_log")
thread.start()
time.sleep(1) # 執(zhí)行任務前,讓收集日志線程先做好準備
print('正在執(zhí)行任務:%s' % task.get('name'))
# 執(zhí)行Nginx任務請求
# ...
task_status = 'req_finished'
time_to_wait = 0.1
while task_status != 'req_log_got': # 請求觸發(fā)的nginx日志收集未完成
time.sleep(time_to_wait)
time_to_wait += 0.01
else:# 獲取到用例請求觸發(fā)的nginx日志
if nginx_log_queue:
nginx_log = nginx_log_queue.popleft()
task_status = 'req_ready'
# 解析日志
# do something here
# ...
else:
print('存儲請求日志的隊列為空')
# do something here
# ...
if __name__ == '__main__':
nginx_log_queue = deque()
is_tasks_compete = False # 所有任務是否執(zhí)行完成
task_status = 'req_ready' # req_ready,req_finished,req_log_got # 存放執(zhí)行次任務任務的一些狀態(tài)
print('###########################任務開始###########################')
tast_list = [{'name':'test_task', 'other':'...'}]
run_tasks(tast_list)
is_tasks_compete = True
current_active_thread_num = len(threading.enumerate())
while current_active_thread_num != 1:
time.sleep(2)
current_active_thread_num = len(threading.enumerate())
print('###########################任務完成###########################')
注意:
1、上述代碼為啥不一步到位,直接 tail -0f /usr/local/openresty/nginx/logs/access.log | grep "特征內(nèi)容"
呢?這是因為這樣做無法獲取到Nginx的日志
2、實踐時發(fā)現(xiàn),第一次執(zhí)行proc.communicate(timeout=2)
獲取日志時,總是無法獲取,會超時,需要二次獲取,并且timeout
設置太小時(實踐時嘗試過設置為1秒
),也會導致第二次執(zhí)行時無法獲取Nginx日志。
到此這篇關于Python 實時獲取任務請求對應的Nginx日志的文章就介紹到這了,更多相關Python獲取Nginx日志內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
您可能感興趣的文章:- python實現(xiàn)的分析并統(tǒng)計nginx日志數(shù)據(jù)功能示例
- python+pandas分析nginx日志的實例
- Python解析nginx日志文件
- python實現(xiàn)分析apache和nginx日志文件并輸出訪客ip列表的方法
- Python 分析Nginx訪問日志并保存到MySQL數(shù)據(jù)庫實例