以前の記事でJP1のジョブの実行時間を測定する方法を記事にしましたが、今回はそのデータをPythonで読み込みSQL ServerにINSERTしようと思います。pythonでSQL ServerにテキストファイルのデータをINSERTする方法はいろいろありますが、今回はSQLServerには、テキストファイルはpandasを使って読み込み、SQL Serverにはpyodbcで接続して取り込みを行おうと思います。
取得したテキストファイルは以下の記事を参考にしてください。
JP1の実行時間の結果を読み込んでSQLServerにINSERTするサンプル
#PythonでSQL ServerにInsertする。
from myfunc import db_util as db
import datetime as dt
import pandas as pd
import openpyxl as px
import glob
import os
import shutil
#実行結果ファイルが格納されている場所(任意)
f_path = r'\\ホスト名\共有フォルダ\JP1結果'
#実行結果が格納されているフォルダの拡張子を指定
filepath = os.path.join(f_path,"*.txt")
#取込完了したファイルの移動先
move_path = r'\\ホスト名\共有フォルダ\JP1結果\old'
#取り込み対象のファイルリストを作成
read_files = glob.glob(filepath)
#取込対象ファイルの件数を取得
filecount = len(glob.glob1(f_path,"*.txt"))
#取込対象がない場合は、
if filecount == 0 :
print("取込対象がありません。")
#oldフォルダがなければ作る
if not os.path.exists(move_path):
os.makedirs(move_path)
#すべて取り込む
for i_file_name in read_files:
#取込対象ファイルのパス
join_path = os.path.join(f_path,i_file_name)
#ファイルが存在すれば続行
if os.path.isfile(join_path):
#処理対象のファイル
print(join_path + 'を処理します。')
#読み取りファイル
readfile = join_path
#ジョブ実行結果をpandasで読み込む(スペース区切り)
# s_date(ジョブ実行開始日)
# s_sec(ジョブ実行開始時間)
# e_date(ジョブ実行終了日)
# e_sec(ジョブ実行終了時間)
# ret(リターンコード)ジョブのみリターンコードが入る
# jobname(ジョブのパス)
df = pd.read_csv(readfile,sep=' ',encoding = 'shift-jis',names = ['s_date','s_sec','e_date','e_sec','ret','jobname'])
# ジョブ実行開始時間が****/**/**でないものが対象
df = df[~(df['s_date'] == '****/**/**')]
# ジョブ実行終了時間が****/**/**でないものが対象
df = df[~(df['e_date'] == '****/**/**')]
# 実行日と実行時間が空白で区切られているので結合する
df['ジョブ実行開始時間']= df['s_date'] + ' ' + df['s_sec']
df['ジョブ実行終了時間']= df['e_date'] + ' ' + df['e_sec']
df['リターンコード'] = df['ret']
#不要な列は削除して以下の項目をそのままテーブルに突っ込む
# ジョブ実行開始時間
# ジョブ実行終了時間
# リターンコード
# ジョブ名
df = df.drop(columns=['s_date','s_sec','e_date','e_sec','ret','jobname'])
#datetime型に変換
df['ジョブ実行終了時間']=pd.to_datetime(df['ジョブ実行終了時間'])
df['ジョブ実行開始時間']=pd.to_datetime(df['ジョブ実行開始時間'])
#処理実行時間を秒数で算出
df['処理時間'] = (df['ジョブ実行終了時間'] - df['ジョブ実行開始時間']).astype('timedelta64[s]')
#リターンコードが'***'でないものは、ジョブなので詳細にINSERT
df_dtl = df[~(df['リターンコード'] == '***')]
#リターンコードが'***'の場合は、対象とする。
df = df[(df['リターンコード'] == '***')]
#SQL ServerのDBに接続(共通化して別ファイルに関数化)
con = db.login_ReportDB()
cursor = con.cursor()
try:
#JOB_RESPONSE用(df)
for Index,row in df.iterrows():
cursor.execute("INSERT INTO JOB_RESPONSE (ジョブ実行開始時間,ジョブ実行終了時間,処理時間,リターンコード,ジョブ名) values (?,?,?,?,?)",
row.ジョブ実行開始時間,row.ジョブ実行終了時間,row.処理時間,row.リターンコード,row.ジョブ名)
#con.commit()
#print(join_path + 'のテーブル登録が完了しました。')
#JOB_RESPONSE_DETAIL用(df_dtl)
for Index,row in df_dtl.iterrows():
cursor.execute("INSERT INTO JOB_RESPONSE_DETAIL (ジョブ実行開始時間,ジョブ実行終了時間,処理時間,リターンコード,ジョブ名) values (?,?,?,?,?)",
row.ジョブ実行開始時間,row.ジョブ実行終了時間,row.処理時間,row.リターンコード,row.ジョブ名)
con.commit()
cursor.close
con.close
print(join_path + 'のテーブル登録が完了しました。')
filename = os.path.basename(join_path)
m_filename = os.path.join(move_path,filename)
if os.path.isfile(m_filename):
os.remove(m_filename)
shutil.move(join_path,move_path)
except Exception as e:
con.rollback()
cursor.close
con.close
print(e)
print(join_path + 'のテーブル登録に失敗しました。処理を中断します')
break
サンプルの説明
ファイルレイアウト
ジョブ実行開始日△ジョブ開始実行時間△ジョブ実行終了日△ジョブ実行終了時間△リターンコード△ジョブネットフルパス
△は、スペース。
pandasのread_csvをでスペース区切りで読み込む
テキストファイルの読み込みは、pandasのread_csvメソッドを利用すると簡単に取り込めます。read_csvメソッドは、csv形式のカンマ区切るでなくともsep=’ ‘でスペース区切りにすることで読み込むことが可能です。
ファイルの件数はlenで取得
len(取得するフォルダパス)でファイルの件数を取得することができます。ファイルの存在確認などでよく使うので便利です。
df = df[~(df[‘列名’] == ”)]はでないもの
~は、でないものという意味を表します。上記のPGでは、リターンコードの値でジョブの実行時間とジョブネットの時間に切り分けできますし、ジョブ実行時間が未実行状態の場合は、ジョブ実行開始日の値が****/**/**になっているものなどがあるので除外するために使用しています。
変数dfにセットされたファイルの読み込み結果をINSERT
私が保守しているシステムの環境はSQLServerなので今回はpyodbcを利用してSQL Serverに接続してファイルを1行ずつ読み込みINSERTしています。
INSERTがエラーになる場合を想定してrollbackも記載する。
基本的にデータの仕様が正しければ、エラーになることはありませんが、当然想定外のデータが読み込まれることもあると思います。その場合を考慮して必ず例外のエラーが発生した場合はrollbackされるように例外処理は書いておきましょう。rollbackされずにエラーになるとSQLのINSERT処理がCOMIIT(確定)もRollback(元に戻す)もできない状態になり、トランザクションが確定しない状態になるので注意してください。
まとめ
- テキストファイルの読み込みはpandasのread_csvが便利
- SQL Serverへの接続はpyodbcを使う。
- INSERTなどの登録/更新処理は必ず例外処理を書く。
コメント