Table of Contents
- Project background
- Improvement idea
- Observing the data's characteristics
- Multi-process handling
- Summary of the approach
- Data processing tips
Project background

During processing, field A had to be updated in the morning; in the afternoon, once the crawler team finished fetching the spec sheets and images, the image and spec-sheet fields had to be updated as well. With tens of millions of rows in a single table, deep pagination makes processing slower and slower:

select a, b, c from db.tb limit 10000 offset 9000000

Time is limited, though. Is there a better way to solve this kind of problem?
Improvement idea

Is there a key we can use to update the data without deep pagination?

Yes: the auto-increment id column.
Observing the data's characteristics

The table has an auto-increment id column as its primary key, so querying and updating rows through this indexed column is the ideal approach.

select a, b, c from db.tb where id = 9999999;
update db.tb set a = x where id = 9999999;
Multi-process handling

Each process handles the rows within a given id range. This avoids deep pagination and lets several processes work on the data at the same time, improving both query speed and processing speed.

Below is the task-allocation function I wrote, for reference:
def mission_handler(all_missions, worker_mission_size):
    """
    Compute the task list from the total number of tasks and the number of
    tasks per worker. Each list element is a (start id, end id) tuple.
    Example: 100 tasks in total with 40 tasks per worker gives the task list
    [(1, 40), (41, 80), (81, 100)].
    :param all_missions: total number of tasks
    :param worker_mission_size: maximum number of tasks per worker
    :return: [(start_id, end_id), (start_id, end_id), ...]
    """
    worker_mission_ids = []
    current_id = 0
    while current_id < all_missions:
        start_id = all_missions if current_id + 1 >= all_missions else current_id + 1
        end_id = all_missions if current_id + worker_mission_size >= all_missions else current_id + worker_mission_size
        if start_id == end_id:
            if worker_mission_ids[-1][1] == start_id:
                break
        worker_mission_ids.append((start_id, end_id))
        current_id += worker_mission_size
    return worker_mission_ids
Suppose the table's maximum id is 100 and we want each process to handle 40 ids. The task list will be:

>>> mission_handler(100, 40)
[(1, 40), (41, 80), (81, 100)]

Then:

process 1 only needs to handle rows with id between 1 and 40;
process 2 only needs to handle rows with id between 41 and 80;
process 3 only needs to handle rows with id between 81 and 100.
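Each worker's range then maps directly onto an indexed range scan, for example:

```sql
-- process 1: an indexed range scan instead of a deep offset
select a, b, c from db.tb where id between 1 and 40;
```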
from concurrent.futures import ProcessPoolExecutor

def main():
    # maximum value of the auto-increment id
    max_id = 30000000
    # number of rows handled by a single worker
    worker_mission_size = 1000000
    # split the id range and process it with multiple processes
    missions = mission_handler(max_id, worker_mission_size)
    workers = []
    executor = ProcessPoolExecutor()
    for idx, mission in enumerate(missions):
        start_id, end_id = mission
        workers.append(executor.submit(data_handler, start_id, end_id, idx))

def data_handler(start_id, end_id, worker_id):
    pass
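One caveat with the sketch above: the futures collected in workers are never waited on, so the main process may finish before the workers do. A minimal sketch of submitting and collecting results, assuming a trivial stand-in handler (demo_handler and run are illustrative names, not from the original):

```python
from concurrent.futures import ProcessPoolExecutor, as_completed

def demo_handler(start_id, end_id, worker_id):
    # stand-in for the real data_handler: just report the range it was given
    return 'worker_id({}): rows {}-{}'.format(worker_id, start_id, end_id)

def run(missions):
    # submit one task per (start_id, end_id) mission and wait for all of them
    results = []
    with ProcessPoolExecutor() as executor:
        futures = [executor.submit(demo_handler, s, e, i)
                   for i, (s, e) in enumerate(missions)]
        for fut in as_completed(futures):
            results.append(fut.result())
    return sorted(results)
```

Using the executor as a context manager guarantees that all submitted tasks complete before the pool shuts down.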
Summary of the approach

- Avoid deep pagination; query and update rows through the auto-increment id instead
- Use multiple processes to handle the data
Data processing tips

Record the ids of rows that were processed successfully and those that failed, so the failures can be followed up later:

# record the processing status in a separate table
insert into db.tb_handle_status(row_id, success) values (999, 0);
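The original does not give the status table's schema; a minimal sketch might look like this (the column types are assumptions):

```sql
-- assumed schema for the status table (not given above)
create table db.tb_handle_status (
    row_id  bigint unsigned not null primary key,
    success tinyint(1)      not null default 0
);

-- later, re-process only the failures
select row_id from db.tb_handle_status where success = 0;
```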
Catch exceptions inside the loop body, so one bad row does not terminate the program:

def data_handler(start_id, end_id, worker_id):
    # database connection; mysql() is a connection helper defined elsewhere
    conn, cursor = mysql()
    current_id = start_id
    try:
        while current_id <= end_id:
            try:
                # TODO data processing code
                pass
            except Exception as e:
                # TODO record the processing result
                # move on to the next row
                current_id += 1
                continue
            else:
                # no exception; continue with the next row
                current_id += 1
    except Exception as e:
        return 'worker_id({}): result({})'.format(worker_id, False)
    finally:
        # release the database resources
        cursor.close()
        conn.close()
    return 'worker_id({}): result({})'.format(worker_id, True)
Use batched commits when updating rows:

sql = """update db.tb set a=%s, b=%s where id=%s"""
values = [
    ('a_value', 'b_value', 9999),
    ('a_value', 'b_value', 9998),
    ...
]
# batched execution reduces network io and how often locks must be acquired
cursor.executemany(sql, values)
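With a million rows per worker, the values list itself should be flushed in chunks rather than accumulated in full. A minimal sketch of the slicing logic, where chunked and batch_size are illustrative names not found in the original code:

```python
def chunked(values, batch_size):
    # yield successive batch_size-sized slices of values
    for i in range(0, len(values), batch_size):
        yield values[i:i + batch_size]

# with a real cursor, each batch would then be flushed with something like:
#     cursor.executemany(sql, batch)
#     conn.commit()
```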
That concludes this walkthrough of processing tens of millions of rows in a single MySQL table.