extract のソースコード

import os
import zipfile
from pathlib import Path
import re
import codecs
import multiprocessing
from multiprocessing import Process

root_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))).replace('\\', '/')
data_path = root_dir + '/python/data'
download_path = root_dir + '/zip/download'
extract_path = root_dir + '/zip/extract'

[ドキュメント]def get_zip_dic(): """EDINETコードとダウンロードしたZIPファイルのリストの辞書を返す。 * 辞書のキー : EDINETコード * 辞書の値 : ダウンロードしたZIPファイルのリスト Returns: 辞書 """ dic = {} # ダウンロードのフォルダーにあるすべてのZIPファイルに対し for zip_path_obj in Path(download_path).glob("**/*.zip"): zip_path = str(zip_path_obj) # EDINETコードをファイル名から得る。 edinetCode = os.path.basename(zip_path).split('-')[0] if edinetCode in dic: # EDINETコードが辞書にある場合 dic[edinetCode].append(zip_path) else: # EDINETコードが辞書にない場合 dic[edinetCode] = [ zip_path ] return dic
[ドキュメント]def group_zip(cpu_count, cpu_id, dic): """CPUごとのサブプロセスの処理 EDINETコードをCPU数で割った余りがCPU-IDに等しければ処理をする。 Args: cpu_count(int): CPU数 cpu_id (int): CPU-ID (0, ..., CPU数 - 1) dic : EDINETコードとダウンロードしたZIPファイルのリストの辞書 """ xbrl = re.compile('XBRL/PublicDoc/jpcrp[-_0-9a-zA-Z]+\.xbrl') for edinetCode, zip_paths in dic.items(): assert edinetCode[0] == 'E' if int(edinetCode[1:]) % cpu_count != cpu_id: # EDINETコードをCPU数で割った余りがCPU-IDに等しくない場合 continue extract_sub_path = '%s/%s' % (extract_path, cpu_id) if not os.path.exists(extract_sub_path): # 抽出先のフォルダーがなければ作る。 os.makedirs(extract_sub_path) # 抽出先のZIPファイルのパス extract_zip_path = "%s/%s.zip" % (extract_sub_path, edinetCode) print(extract_zip_path) # 抽出先のZIPファイルを書き込みモードで開く。 with zipfile.ZipFile(extract_zip_path, 'w', compression=zipfile.ZIP_DEFLATED) as new_zip: # ダウンロードしたZIPファイルのリストに対し for zip_path in zip_paths: try: # ダウンロードしたZIPファイルを読み取りモードで開く。 with zipfile.ZipFile(zip_path) as zf: # ダウンロードしたZIPファイル内のXBRLファイルに対し for xbrl_file in [x for x in zf.namelist() if xbrl.match(x)]: # XBRLファイルのデータを読む。 with zf.open(xbrl_file) as f: xml_bin = f.read() # 抽出先のZIPファイルの中にXBRLファイルを書く。 file_name = xbrl_file.split('/')[-1] new_zip.writestr(file_name, xml_bin) break except zipfile.BadZipFile: print("\nBadZipFile : %s\n" % zip_path) continue
if __name__ == '__main__': dic = get_zip_dic() cpu_count = multiprocessing.cpu_count() process_list = [] # CPUごとにサブプロセスを作って並列処理をする。 for cpu_id in range(cpu_count): p = Process(target=group_zip, args=(cpu_count, cpu_id, dic)) process_list.append(p) p.start() # すべてのサブプロセスが終了するのを待つ。 for p in process_list: p.join()