반응형

개발은 어렵지는 않지만 시간이 오래 걸린다.

특히나 많은 검증을 위해 테스트를 해야되고 이에 대해 오류가 생기거나, 다양한 경우에 대한 대비가 필요하다..


아래 스크립트는 Replication 도중에 Sync가 안맞아 잘못된 데이터나 데이터가 없는 경우에 대해서 확인하여 추가하여 준다.

(오히려 Slave에서 데이가 있으며...Master에서 없는 경우는 해당 사항이 없다.....ㅠ)


로직은 간단하다.


1. file(ini파일로 지정) 를 읽어 해당 파일 내의 테이블 명을 읽는다.(ini 파일 작성 형ㅌㅐ : 테이블1 (줄바꿈) 테이블2 (줄바꿈) ...으로 구분한다.) 

2. 테이블 정보를 가지고  Master에 접속하여 해당 테이블에 대한 PK를 확인 및 테이블 컬럼들을 받아 온다.

3. 다시 해당 테이블 정보를 가지고 full scan한다.

4. Slave에 접속하여 마스터에 대한 테이블 정보( PK)를 가지고 데이터 여부를 확인한다.

5. 해당 데이터가 있으면 Update를 없으면 Insert를 한다.


스크립트 내용은 아래와 같다...스크립트 개발 시 참고하시기를 ....ㅠ

(해당 스크립트에 대한 검증없이 적용 하셨다가 사고가 나도 책임 못집니다........ㅠㅠ 검증의 검증을 하시기를...)

 




#coding: utf-8


import pymysql
import os
import time

now = time.localtime()
todate = "%04d%02d%02d %02d:%02d:%02d" % (now.tm_year, now.tm_mon, now.tm_mday, now.tm_hour, now.tm_min, now.tm_sec)

print "======================================================="
print " Start Time is %s " % todate
print "======================================================="

##### Use Korean Character #######
import sys
reload(sys)
sys.setdefaultencoding('utf-8')
##################################
# Master Info
srv_Master = '마스터IP'

# DBName
srv_DBName = '디비명!!'

# slave1 / slave2 / ...
lst_srv = ['슬레이브IP1','슬레이브IP2',....]
#lst_table = [] -> file 에서 읽어서 사용
#File 형태는 확장자가 ini 이며, 내부에는 한 라인당 하나의 테이블명으로 지정
_filepath = 'INI파일이 저장되어 있는 위치(경로) /home/path...'

def fn_ConnectDB(srv, strDBName) :

db = pymysql.connect(host=srv, port=3306, user='접속유저명', passwd='접속패스워드', db='접속디비명', autocommit=True, use_unicode=True, charset="utf8")

dbCursor = db.cursor()

return dbCursor


def fn_TruncateTable(tmpTable, strDBName) :
for srv in lst_srv: # 2servers
##check Data in Slave DB
curSlave = fn_ConnectDB(srv, srv_DBName)

sqlTruncate = "TRUNCATE TABLE %s.%s" % (strDBName, tmpTable)

print "Nothing Key in table : %s.%s" % (strDBName, tmpTable)
print sqlTruncate

#check curSlave.execute(sqlTruncate)
curSlave.close()


def fn_GetTableInfo(curMaster, strDBName, tmpTable) :
#PK확인
sqlPK = "select COLUMN_NAME, TABLE_NAME, TABLE_SCHEMA " \
" from INFORMATION_SCHEMA.COLUMNS " \
"where " \
" TABLE_NAME='%s' " \
" and TABLE_SCHEMA='%s' " \
" and COLUMN_KEY = 'PRI'" % (tmpTable, strDBName)
curMaster.execute(sqlPK)
#pkResult = curMaster.fetchall()
pkResult = curMaster.fetchall()

#if pkResult == None :
if curMaster.rowcount == 0 :
print 'Nothing Key!!!'
#truncate 진행 -> Key가 없을 경우 truncate 진행
fn_TruncateTable(tmpTable, strDBName)

# 아래처럼 group concat 하면 PK가 무조건 앞에 위치하게
sqlColumns = "SELECT GROUP_CONCAT(COLUMN_NAME) " \
"FROM INFORMATION_SCHEMA.COLUMNS " \
"WHERE " \
" TABLE_NAME = '%s' " \
" AND TABLE_SCHEMA='%s' " % (tmpTable, strDBName)
curMaster.execute(sqlColumns)
# pkResult = curMaster.fetchall()
colResult = curMaster.fetchall()

return(pkResult, colResult)

def fn_WorkingData(tmpMasterData, tmpTable, tmpInfo) :
for srv in lst_srv: # 2servers
##check Data in Slave DB
curSlave = fn_ConnectDB(srv, srv_DBName)
print "==================================================================================="
for tmpData in tmpMasterData :
tmpPK = ''
#curMaster[0]은 무조건 PK값임 / tmpInfo[0] - pk / tmpInfo[1] - all columns
if len(tmpInfo[0]) > 1 :
cntPk = 0
intcolPK = 0
#for colPK in tmpInfo[0][cntPk][0] :
for intcolPK in range(0,len(tmpInfo[0])) :
colPK = tmpInfo[0][intcolPK][0]
if cntPk <> (len(tmpInfo[0])-1) :
tmpPK += colPK + """= \"""" + str(tmpData[cntPk]) + """\" AND """
else :
tmpPK += colPK + """= \"""" + str(tmpData[cntPk]) + """\" """
cntPk += 1
print "Server PK Info : %s" % tmpPK
elif len(tmpInfo[0]) == 1 :
tmpPK = str(tmpInfo[0][0][0]) + """= \"""" + str(tmpData[0]) + """\" """
else :
#PK 가 없을경우 무조건 insert로 취급
tmpPK = " 1=0 "

sqlCheckData = "SELECT 1 FROM %s.%s " \
"WHERE " \
" %s " % (srv_DBName, tmpTable, tmpPK)
curSlave.execute(sqlCheckData)

tmpCheckCnt = curSlave.rowcount

#tmpColName 은 Insert에서 사용하기 위함 ColName 은 Update에서 사용하기 위함
tmpColName = tmpInfo[1][0]
ColName = tmpColName[0].split(',')

if tmpCheckCnt == 0 :
# 데이터 생성을 위한 초기화 작업
j = 0
strData = ""
for i in tmpData:
if j == 0:
strData = """(\"""" + str(i) + """\","""
elif j == (len(tmpData) - 1):
strData += """\"""" + str(i) + """\")"""
else:
strData += """\"""" + str(i) + """\","""
j += 1

#값이 없으니 Insert 진행
sqlInsertSlave = """INSERT INTO %s.%s (%s) VALUES %s""" % (srv_DBName, tmpTable, tmpColName[0], strData)

print ("Insert Query : %s") % sqlInsertSlave

#check 필요 curSlave.execute(sqlInsertSlave)

else :
#값이 있기 때문에 Update 진행
strData = ""
#ColName = tmpInfo[1].split(',')
for i in range(0,len(tmpData)) :
if i == (len(tmpData)-1):
strData += str(ColName[i]) + """ = \"""" + str(tmpData[i]) +"""\""""
else:
strData += str(ColName[i]) + """ = \"""" + str(tmpData[i]) + """\","""

sqlUpdateSlave = """UPDATE %s.%s SET %s WHERE %s""" % (srv_DBName, tmpTable, strData, tmpPK)

print ("Update Query : %s") % sqlUpdateSlave

#Check 필요 curSlave.execute(sqlUpdateSlave)
print "==================================================================================="
def fn_main(curMaster, tmpTable) :

# for tmpTable in lst_table : #60 tables
#tmpInfo[0] = pk column, tmpInfo[1] = All Column(col1,col2...)

tmpInfo = fn_GetTableInfo(curMaster, srv_DBName, tmpTable)

int_Start = 0

#for tCol in range(0, len(tmpInfo[1][0]))

tmpAllCol = tmpInfo[1][0]
#print tmpAllCol -> (u'CODEID,CGID,CG_CID,CODENAME,CODEMEMO,SORTS,USEYN,RDATE',) 이렇게 표현되어 tmpAllCol[0]으로 변환
while True :
#만건씩 잘라서 진행
sqlMasterTable = "SELECT %s " \
"FROM %s.%s " \
"limit %d, 10000" % (tmpAllCol[0], srv_DBName, tmpTable, int_Start)
print ("Limit Query : %s" % sqlMasterTable)

curMaster.execute(sqlMasterTable)

int_Start += 10000
tmpMasterData = curMaster.fetchall()

fn_WorkingData(tmpMasterData, tmpTable, tmpInfo)

if curMaster.rowcount < 10000 :
total_count = int_Start+curMaster.rowcount
print "Last data in Table %s (last select count %d/%d)" % (tmpTable, curMaster.rowcount, total_count)
print "==================================================================================="
break

def fn_read_log(_filename) :
f = open(_filename, 'r')
t_line = f.readlines()
f.close()

f_line = []
#중간에 \n 에 대해서 삭제 후에 진행
for i in t_line:
f_line.append(i.strip())

return f_line

if __name__ == "__main__":

curMaster = fn_ConnectDB(srv_Master, srv_DBName)

filenames = os.listdir(_filepath)
for filename in filenames :
full_filename = os.path.join(_filepath, filename)
ext = os.path.splitext(full_filename)[-1]

if ext == '.ini':
print "Talbes File : " + full_filename
lst_table = fn_read_log(full_filename)
print "Table List : " + str(lst_table)

for aTable in lst_table :
print "Table Name : " + aTable

if aTable <> '' :
fn_main(curMaster, aTable)

now = time.localtime()
finishtime = "%04d%02d%02d %02d:%02d:%02d" % (now.tm_year, now.tm_mon, now.tm_mday, now.tm_hour, now.tm_min, now.tm_sec)

print "======================================================="
print " Start Time is %s " % todate
print " End Time is %s " % finishtime

print "======================================================="


반응형

+ Recent posts