Designing a multiprocess Python framework: antigen presentation prediction for millions of antigen variants

The goal here is to write a program that acts as a mediator for parallel computation. (Of course, if you already know the full computational details and the data involved, frameworks purpose-built for distributed data processing such as Hadoop or Spark may be worth considering.) This is a hand-rolled exercise in building a framework to speed up data processing with Python, and a way to get familiar with how Python handles threads and processes under the hood. In the end I went with a multiprocess design, with sqlite3 as intermediate storage, calling netMHC to run the prediction for each single FASTA record.

Here are some lessons learned and pitfalls I fell into along the way:
1. Understand the characteristics of the sqlite3 database
SQLite's lightweight nature makes it a great fit for storing data in a processing prototype, but its limited transaction rate has to be accounted for in your code, or it becomes a major bottleneck. SQLite can in fact support multi-threaded access, but that requires modifying the source configuration when compiling and installing SQLite, which is difficult for anyone not familiar with its internals.
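To illustrate the transaction-rate point: committing once per row forces a disk sync for every INSERT, while grouping rows into a single transaction amortizes that cost. A minimal sketch (the table and rows are illustrative, not the script's real schema):

```python
import sqlite3

conn = sqlite3.connect(":memory:")  # in-memory DB just for illustration
c = conn.cursor()
c.execute("CREATE TABLE peptides (seq TEXT, affinity REAL)")

rows = [("ACDEFGHI", 120.5), ("ACDEFGH", 88.0), ("ACDEFG", 45.2)]

# Slow pattern: one transaction (and, on a real file, one disk sync) per row:
#   for row in rows:
#       c.execute("INSERT INTO peptides VALUES (?, ?)", row)
#       conn.commit()

# Fast pattern: batch all rows into a single transaction.
c.executemany("INSERT INTO peptides VALUES (?, ?)", rows)
conn.commit()

count = c.execute("SELECT COUNT(*) FROM peptides").fetchone()[0]
```

This is the same `executemany` + single `commit` pattern the script below uses in `update_insert_database`.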

2. Characteristics of the Python language
Python's core design includes a global interpreter lock (GIL), so when all of your code is pure Python, multithreading buys you little (multiprocessing sidesteps the GIL, but pays for it in process startup and inter-process communication costs). If instead you treat Python as the controlling program and use threads or processes to drive the heavy computation in small external programs written in other languages (shell scripts, Java, and so on), the results are quite good.
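A sketch of that pattern: Python threads merely wait on I/O while the real work runs in external processes, and the GIL is released during each subprocess call, so the workers genuinely overlap. Here `echo` is a stand-in for something like the netMHC shell script:

```python
import subprocess
from concurrent.futures import ThreadPoolExecutor

def run_external(arg):
    # The GIL is released while this thread waits on the child process,
    # so many of these run concurrently even though they are launched from Python.
    result = subprocess.run(["echo", arg], capture_output=True, text=True, check=True)
    return result.stdout.strip()

with ThreadPoolExecutor(max_workers=4) as pool:
    outputs = list(pool.map(run_external, ["job0", "job1", "job2"]))
```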

3. Understand the implementation details of temp files in Python 3
When using Python 3's tempfile module, keep the underlying mechanism in mind. Otherwise, when you spin up thousands of threads all using temp files, name collisions can still happen, and when they do the consequences are serious.
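The safe pattern is to let tempfile *create* the file, not just name it: `NamedTemporaryFile` and `mkstemp` open the file with `O_EXCL`, so the OS guarantees uniqueness, whereas hand-built names (e.g. based on `time.time()`) can collide under many concurrent workers. A quick check:

```python
import os
import tempfile

# Let tempfile create the files: each is opened with O_EXCL, so the OS
# guarantees a unique name even under heavy concurrency.
files = [tempfile.NamedTemporaryFile(mode='w+', suffix='.fasta', delete=False)
         for _ in range(100)]
names = [f.name for f in files]
all_unique = len(set(names)) == len(names)

# clean up
for f in files:
    f.close()
    os.unlink(f.name)
```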

4. Understand the differences between processes and threads and how to handle them
Processes and threads are used with very similar syntax in Python; the real differences lie in how the underlying resources are isolated or shared. Neither is universally better. It depends on where your computation's bottleneck is: waiting on the network, heavy resource consumption, and so on. Many of the low-level details tie back to how well you understand your own computation, and it genuinely takes experience. Falling into these pits is what made me appreciate how valuable the people who built frameworks like Hadoop and Spark really are.

5. Understand the difference between multiprocessing.Queue and queue.Queue
During testing I kept finding that a few processes would inexplicably sit idle without picking up the next task. It turned out to be a detail of the queue module. Consider a function like the following:

def doJob(*args):
    que = args[0]
    conn = args[1]
    process_name = multiprocessing.current_process().name
    while que.qsize() > 0:
        try:
            job = que.get(timeout=5)
            job_num = que.qsize()
            job.do(conn, process_name, job_num)
        except queue.Empty:
            print("Queue is empty")
    print("".join(["*** ", process_name, " did a good job, let it take a break ***"]))

In the code above, que.qsize() is used as the signal for a thread/process to stop asking the queue for more work. But under multiple processes or threads, qsize() can be greater than 0 at the moment of the while check, and an instant later the remaining items may already have been taken by other workers. When such a worker then calls queue.get(), the default behavior is to block until an item appears, unless the timeout parameter is set. Setting it lets these workers that accidentally slipped into limbo return to the fold!
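An alternative that avoids the qsize race entirely is the sentinel ("poison pill") pattern: the producer enqueues one sentinel per worker, and each worker exits when it dequeues a sentinel, so neither a timeout nor a qsize() check is needed. A sketch with threads (the same shape works with multiprocessing.Queue; job values here are illustrative):

```python
import queue
import threading

SENTINEL = None
NUM_WORKERS = 4

def worker(que, results):
    while True:
        job = que.get()   # blocks; no qsize race, no timeout needed
        if job is SENTINEL:
            break         # exactly one sentinel per worker
        results.append(job * 2)

que = queue.Queue()
results = []
for job in range(10):
    que.put(job)
for _ in range(NUM_WORKERS):
    que.put(SENTINEL)     # one pill per worker

threads = [threading.Thread(target=worker, args=(que, results))
           for _ in range(NUM_WORKERS)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```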

import subprocess
import tempfile
import re
import sqlite3
import os
import time
import queue
import threading
import multiprocessing

def createdatabase(choose_HLA_type_super):
    """
    create the database and table with the HLA type
    """
    database_name = "{database}_netMHC.db".format(database=re.sub('-','_',choose_HLA_type_super))

    conn = sqlite3.connect(database_name)
    c = conn.cursor()

    # Create one table per peptide length (8 - 14)
    for length in range(8, 15):
        c.execute('''CREATE TABLE Neoantigen_{length}
                 (pos integer, HLA text, peptide text, Core text, Offset integer, I_pos integer, I_len integer,
                  D_pos integer, D_len integer, iCore text, Identity text, log50k real, Affinity real, Rank real, BindLevel integer)
                  '''.format(length=length))

    # Save (commit) the changes and close the connection
    conn.commit()
    conn.close()

# string 14 - 8
def peptideshorten(peptide):
    """Prepare the peptides for 14 - 8 lengths affinity predict
    Args:
        seq
    Returns:
        A Generator gave the trim peptides from 14 - 8
    Raises:
    """
    i = 0
    yield peptide
    while i < 6:
        i += 1
        yield peptide[i:-i]

def stringtrim(peptide):
    """Trim the string to remove spaces
    Args:
        seq
    Returns:
        A string without spaces
    Raises:
    """
    return re.sub(r'\s','',peptide)

# Input with single peptide per time
def netMHCcalculate(single_peptide_name, single_comment, single_peptide, predict_list, choose_HLA_type_super):
    #print("Begin to predict with netMHC!")
    while True:
        try:
            single_peptide_window = next(single_peptide)
            peptide_seq_len = len(single_peptide_window)
            peptide_len = str(int((peptide_seq_len + 1)/2))
            one_record = ''.join(['>',single_peptide_name,' ',single_comment,'\n',single_peptide_window])

            #print(''.join(['The whole peptides length: ', str(peptide_seq_len)]))

            #print(''.join(['The Peptide Length for prediction: ', peptide_len]))

            #print(one_record)

            f = tempfile.NamedTemporaryFile(mode='w+',delete=True, suffix='.fasta')
            f.write(one_record)
            f.flush()  # flush the buffered record so netMHC can read it from disk

            predict_list.append(subprocess.check_output(['sh', 'netMHC_one.sh', choose_HLA_type_super, peptide_len, f.name]))

            f.close()

        except StopIteration:
            break

def multplerecords_netMHCcalculate(multiple_records_iter, choose_HLA_type_super):
    i = 0
    predict_list = []
    while True:
        try:
            single_record_fasta = ''.join(next(multiple_records_iter))
            f = tempfile.NamedTemporaryFile(mode='w+',delete=True, suffix='.fasta', prefix=''.join(['tmp',str(round(time.time()))]))
            f.write(single_record_fasta)
            f.flush()  # flush the buffered record so netMHC can read it from disk
            predict_list.append(subprocess.check_output(['sh', 'netMHC_one.sh', choose_HLA_type_super, str(14-i), f.name]))
            f.close()
            i += 1
        except StopIteration:
            break
    return(predict_list)

def get_single_row_predict(predict_iter):
    single_row_predict = []
    for predict in predict_iter:
        # one netMHC output blob per record; keep only the non-empty lines
        for single_result in predict.decode().split('\n'):
            if len(single_result) == 0:
                continue
            single_row_predict.append(single_result)
    return single_row_predict

# Separate the conn to reduce the frequency of opening new connections
def update_insert_database(single_row_predict_iter, conn):
    print("Begin to insert result into database")
    c = conn.cursor()
    # One batch of rows per peptide length, inserted as a single transaction each
    transactions = {length: [] for length in range(8, 15)}

    for single_one_row_predict in single_row_predict_iter:
        (D_POS, D_HLA, D_Peptide, D_Core, D_Offset, D_I_pos, D_I_len, D_D_pos,
         D_D_len, D_iCore, D_Identity, D_log50k, D_Affinity, D_Rank,
         D_BindLevel) = single_one_row_predict.split(',')
        row = (int(D_POS), stringtrim(D_HLA), stringtrim(D_Peptide), stringtrim(D_Core),
               int(D_Offset), int(D_I_pos), int(D_I_len), int(D_D_pos), int(D_D_len),
               stringtrim(D_iCore), stringtrim(D_Identity), float(D_log50k),
               float(D_Affinity), float(D_Rank), float(D_BindLevel))
        peptide_len = len(stringtrim(D_Peptide))
        if peptide_len in transactions:
            transactions[peptide_len].append(row)

    for length, rows in transactions.items():
        c.executemany("INSERT INTO Neoantigen_{length} VALUES ({marks})".format(
            length=length, marks=', '.join(['?'] * 15)), rows)
        conn.commit()


#conn.close()
class Job:
    def __init__(self, sub_name_list, sub_comment_list, sub_seq_list, multiple_fasta_num, HLA_type):
        self.sub_seq_list = sub_seq_list
        self.sub_name_list = sub_name_list
        self.sub_comment_list = sub_comment_list
        self.HLA_type = HLA_type
        self.multiple_fasta_num = multiple_fasta_num
    def do(self, conn, process_name, job_num):
        print(''.join([process_name,": Begin ",str(job_num),"_th Job!"]))
        multiple_records = create_multiple_records(self.sub_name_list, self.sub_comment_list, self.sub_seq_list, self.multiple_fasta_num)

        print(''.join(['_',process_name,"_Finished creating multiple records!"]))
        multiple_records_iter = iter(multiple_records)

        print(''.join(['_',process_name,"_create multiple record iterator"]))
        # run netMHC on each record for peptide lengths 8 - 14 and a single HLA type

        predict_list = multplerecords_netMHCcalculate(multiple_records_iter, self.HLA_type)
        print(''.join(['_',process_name,"_Finish netMHCcalculate"]))

        predict_iter = iter(predict_list)
        print(''.join(['_',process_name,"_Created predict_list iterator"]))
        # turn calculate result into single string for per sequence affinity prediction

        single_row_predict = get_single_row_predict(predict_iter)
        print(''.join(['_',process_name,"_Created single row predict data"]))

        single_row_predict_iter = iter(single_row_predict)
        # insert the data into database
        print(''.join(['_',process_name,"_Prepare to insert into database"]))

        update_insert_database(single_row_predict_iter, conn)
        print(''.join(["_",process_name," job calculation finished!"]))

def doJob(*args):
    que = args[0]
    conn = args[1]
    process_name = multiprocessing.current_process().name
    while que.qsize() > 0:
        try:
            job = que.get(timeout=5)
            job_num = que.qsize()
            job.do(conn, process_name, job_num)
        except queue.Empty:
            # qsize() was stale: another worker drained the queue first
            print("Queue is empty")
    print(''.join(["*** ", process_name, " did a good job, let it take a break ***"]))

def seperateIterate(iter_item, multiple_fasta_num):
    i = 0
    sub_item_list = []
    while i < multiple_fasta_num:
        sub_item_list.append(next(iter_item))
        i += 1
    return sub_item_list
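seperateIterate above re-implements chunking by hand (and raises StopIteration mid-list on a short final block); the standard library's itertools.islice does the same job and handles the tail gracefully. A sketch of the equivalent pattern:

```python
from itertools import islice

def chunk(iter_item, size):
    """Yield lists of up to `size` items until the iterator is exhausted."""
    while True:
        block = list(islice(iter_item, size))
        if not block:
            return  # iterator exhausted
        yield block

chunks = list(chunk(iter(range(7)), 3))
```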

HLA_type_super=["HLA-A0101","HLA-A0201","HLA-A0301","HLA-A2402","HLA-A2601","HLA-B0702","HLA-B0801","HLA-B2705","HLA-B3901","HLA-B4001","HLA-B5801","HLA-B1501"]

HLA_type_all_ABC=["HLA-A0101","HLA-A0201","HLA-A0202","HLA-A0203","HLA-A0205","HLA-A0206","HLA-A0207","HLA-A0211","HLA-A0212","HLA-A0216","HLA-A0217","HLA-A0219","HLA-A0250","HLA-A0301","HLA-A1101","HLA-A2301","HLA-A2402","HLA-A2403","HLA-A2501","HLA-A2601","HLA-A2602","HLA-A2603","HLA-A2902","HLA-A3001","HLA-A3002","HLA-A3101","HLA-A3201","HLA-A3207","HLA-A3215","HLA-A3301","HLA-A6601","HLA-A6801","HLA-A6802","HLA-A6823","HLA-A6901","HLA-A8001","HLA-B4403","HLA-B4501","HLA-B4601","HLA-B4801","HLA-B5101","HLA-B5301","HLA-B5401","HLA-B5701","HLA-B5801","HLA-B8301","HLA-B0702","HLA-B0801","HLA-B0802","HLA-B0803","HLA-B1402","HLA-B1501","HLA-B1502","HLA-B1503","HLA-B1509","HLA-B1517","HLA-B1801","HLA-B2705","HLA-B2720","HLA-B3501","HLA-B3503","HLA-B3801","HLA-B3901","HLA-B4001","HLA-B4002","HLA-B4013","HLA-B4201","HLA-B4402","HLA-B5802","HLA-B7301","HLA-C0303","HLA-C0401","HLA-C0501","HLA-C0602","HLA-C0701","HLA-C0702","HLA-C0802","HLA-C1203","HLA-C1402","HLA-C1502","HLA-E0101"]

result_title = ['pos', 'HLA', 'peptide', 'core', 'offset','I_pos','I_len', 'D_pos','D_len', 'iCore', 'Identity', 'log50k', 'Affinity','Rank','BindLevel']

# Parse the fasta into program
tabule_fasta = subprocess.check_output(['sh', 'TurnTabule.sh', 'cosmic_neoantigen_tmp.fasta'])
tabule_fasta_list = tabule_fasta.decode().split('\n')

# Split each single record into name, comment, seq
name_list = []
comment_list = []
seq_list = []
for fasta in tabule_fasta_list:
    try:
        name, comment, seq = fasta.split('\t')
        name_list.append(name)
        comment_list.append(comment)
        seq_list.append(seq)
    except ValueError:
        print(fasta.split('\t'))
choose_HLA_type_super = HLA_type_super[0]

# create a separate database for each process
database_create_list = []
process_num = 100

for i in range(process_num):
    database_create_list.append(''.join([choose_HLA_type_super,"_",str(i)]))
# create the database
for database_name in database_create_list:
    createdatabase(database_name)

database_name_list = []
for i in range(process_num):
    database_name_list.append(''.join([re.sub('-','_',choose_HLA_type_super),'_',str(i),"_netMHC.db"]))

database_conn_list = []
for i in range(process_num):
    conn = sqlite3.connect(database_name_list[i])
    database_conn_list.append(conn)

# use iterator to decrease memory use
iter_name = iter(name_list)
iter_comment = iter(comment_list)
iter_seq = iter(seq_list)
# switched to one shared multiprocessing.Queue instead of one queue per worker
que_list = []
work_num = 3573
multiple_fasta_num = 1000
i = 0
que = multiprocessing.Queue()
while i < work_num:
    try:
        sub_name_list = seperateIterate(iter_name, multiple_fasta_num)
        sub_comment_list = seperateIterate(iter_comment, multiple_fasta_num)
        sub_seq_list = seperateIterate(iter_seq, multiple_fasta_num)
        que.put(Job(sub_name_list, sub_comment_list, sub_seq_list, multiple_fasta_num, choose_HLA_type_super))
        i += 1
    except StopIteration:
        break

start_time  = time.time()
process_list = []
for i in range(process_num):
    p = multiprocessing.Process(target=doJob, name=''.join(['Process_',str(i)]), args=(que, database_conn_list[i]))
    process_list.append(p)
    print(p)
    p.start()

for p in process_list:
    p.join()
print(''.join(["done! took ", str(time.time()-start_time), " sec"]))
