개발은 어렵지는 않지만 시간이 오래 걸린다.

특히나 많은 검증을 위해 테스트를 해야되고 이에 대해 오류가 생기거나, 다양한 경우에 대한 대비가 필요하다..


아래 스크립트는 Replication 도중에 Sync가 안맞아 잘못된 데이터나 데이터가 없는 경우에 대해서 확인하여 추가하여 준다.

(오히려 Slave에서 데이가 있으며...Master에서 없는 경우는 해당 사항이 없다.....ㅠ)


로직은 간단하다.


1. file(ini파일로 지정) 를 읽어 해당 파일 내의 테이블 명을 읽는다.(ini 파일 작성 형ㅌㅐ : 테이블1 (줄바꿈) 테이블2 (줄바꿈) ...으로 구분한다.) 

2. 테이블 정보를 가지고  Master에 접속하여 해당 테이블에 대한 PK를 확인 및 테이블 컬럼들을 받아 온다.

3. 다시 해당 테이블 정보를 가지고 full scan한다.

4. Slave에 접속하여 마스터에 대한 테이블 정보( PK)를 가지고 데이터 여부를 확인한다.

5. 해당 데이터가 있으면 Update를 없으면 Insert를 한다.


스크립트 내용은 아래와 같다...스크립트 개발 시 참고하시기를 ....ㅠ

(해당 스크립트에 대한 검증없이 적용 하셨다가 사고가 나도 책임 못집니다........ㅠㅠ 검증의 검증을 하시기를...)

 




#coding: utf-8


import pymysql
import os
import time

now = time.localtime()
todate = "%04d%02d%02d %02d:%02d:%02d" % (now.tm_year, now.tm_mon, now.tm_mday, now.tm_hour, now.tm_min, now.tm_sec)

print "======================================================="
print " Start Time is %s " % todate
print "======================================================="

##### Use Korean Character #######
import sys
reload(sys)
sys.setdefaultencoding('utf-8')
##################################
# Master Info
srv_Master = '마스터IP'

# DBName
srv_DBName = '디비명!!'

# slave1 / slave2 / ...
lst_srv = ['슬레이브IP1','슬레이브IP2',....]
#lst_table = [] -> file 에서 읽어서 사용
#File 형태는 확장자가 ini 이며, 내부에는 한 라인당 하나의 테이블명으로 지정
_filepath = 'INI파일이 저장되어 있는 위치(경로) /home/path...'

def fn_ConnectDB(srv, strDBName) :

db = pymysql.connect(host=srv, port=3306, user='접속유저명', passwd='접속패스워드', db='접속디비명', autocommit=True, use_unicode=True, charset="utf8")

dbCursor = db.cursor()

return dbCursor


def fn_TruncateTable(tmpTable, strDBName) :
for srv in lst_srv: # 2servers
##check Data in Slave DB
curSlave = fn_ConnectDB(srv, srv_DBName)

sqlTruncate = "TRUNCATE TABLE %s.%s" % (strDBName, tmpTable)

print "Nothing Key in table : %s.%s" % (strDBName, tmpTable)
print sqlTruncate

#check curSlave.execute(sqlTruncate)
curSlave.close()


def fn_GetTableInfo(curMaster, strDBName, tmpTable) :
#PK확인
sqlPK = "select COLUMN_NAME, TABLE_NAME, TABLE_SCHEMA " \
" from INFORMATION_SCHEMA.COLUMNS " \
"where " \
" TABLE_NAME='%s' " \
" and TABLE_SCHEMA='%s' " \
" and COLUMN_KEY = 'PRI'" % (tmpTable, strDBName)
curMaster.execute(sqlPK)
#pkResult = curMaster.fetchall()
pkResult = curMaster.fetchall()

#if pkResult == None :
if curMaster.rowcount == 0 :
print 'Nothing Key!!!'
#truncate 진행 -> Key가 없을 경우 truncate 진행
fn_TruncateTable(tmpTable, strDBName)

# 아래처럼 group concat 하면 PK가 무조건 앞에 위치하게
sqlColumns = "SELECT GROUP_CONCAT(COLUMN_NAME) " \
"FROM INFORMATION_SCHEMA.COLUMNS " \
"WHERE " \
" TABLE_NAME = '%s' " \
" AND TABLE_SCHEMA='%s' " % (tmpTable, strDBName)
curMaster.execute(sqlColumns)
# pkResult = curMaster.fetchall()
colResult = curMaster.fetchall()

return(pkResult, colResult)

def fn_WorkingData(tmpMasterData, tmpTable, tmpInfo) :
for srv in lst_srv: # 2servers
##check Data in Slave DB
curSlave = fn_ConnectDB(srv, srv_DBName)
print "==================================================================================="
for tmpData in tmpMasterData :
tmpPK = ''
#curMaster[0]은 무조건 PK값임 / tmpInfo[0] - pk / tmpInfo[1] - all columns
if len(tmpInfo[0]) > 1 :
cntPk = 0
intcolPK = 0
#for colPK in tmpInfo[0][cntPk][0] :
for intcolPK in range(0,len(tmpInfo[0])) :
colPK = tmpInfo[0][intcolPK][0]
if cntPk <> (len(tmpInfo[0])-1) :
tmpPK += colPK + """= \"""" + str(tmpData[cntPk]) + """\" AND """
else :
tmpPK += colPK + """= \"""" + str(tmpData[cntPk]) + """\" """
cntPk += 1
print "Server PK Info : %s" % tmpPK
elif len(tmpInfo[0]) == 1 :
tmpPK = str(tmpInfo[0][0][0]) + """= \"""" + str(tmpData[0]) + """\" """
else :
#PK 가 없을경우 무조건 insert로 취급
tmpPK = " 1=0 "

sqlCheckData = "SELECT 1 FROM %s.%s " \
"WHERE " \
" %s " % (srv_DBName, tmpTable, tmpPK)
curSlave.execute(sqlCheckData)

tmpCheckCnt = curSlave.rowcount

#tmpColName 은 Insert에서 사용하기 위함 ColName 은 Update에서 사용하기 위함
tmpColName = tmpInfo[1][0]
ColName = tmpColName[0].split(',')

if tmpCheckCnt == 0 :
# 데이터 생성을 위한 초기화 작업
j = 0
strData = ""
for i in tmpData:
if j == 0:
strData = """(\"""" + str(i) + """\","""
elif j == (len(tmpData) - 1):
strData += """\"""" + str(i) + """\")"""
else:
strData += """\"""" + str(i) + """\","""
j += 1

#값이 없으니 Insert 진행
sqlInsertSlave = """INSERT INTO %s.%s (%s) VALUES %s""" % (srv_DBName, tmpTable, tmpColName[0], strData)

print ("Insert Query : %s") % sqlInsertSlave

#check 필요 curSlave.execute(sqlInsertSlave)

else :
#값이 있기 때문에 Update 진행
strData = ""
#ColName = tmpInfo[1].split(',')
for i in range(0,len(tmpData)) :
if i == (len(tmpData)-1):
strData += str(ColName[i]) + """ = \"""" + str(tmpData[i]) +"""\""""
else:
strData += str(ColName[i]) + """ = \"""" + str(tmpData[i]) + """\","""

sqlUpdateSlave = """UPDATE %s.%s SET %s WHERE %s""" % (srv_DBName, tmpTable, strData, tmpPK)

print ("Update Query : %s") % sqlUpdateSlave

#Check 필요 curSlave.execute(sqlUpdateSlave)
print "==================================================================================="
def fn_main(curMaster, tmpTable) :

# for tmpTable in lst_table : #60 tables
#tmpInfo[0] = pk column, tmpInfo[1] = All Column(col1,col2...)

tmpInfo = fn_GetTableInfo(curMaster, srv_DBName, tmpTable)

int_Start = 0

#for tCol in range(0, len(tmpInfo[1][0]))

tmpAllCol = tmpInfo[1][0]
#print tmpAllCol -> (u'CODEID,CGID,CG_CID,CODENAME,CODEMEMO,SORTS,USEYN,RDATE',) 이렇게 표현되어 tmpAllCol[0]으로 변환
while True :
#만건씩 잘라서 진행
sqlMasterTable = "SELECT %s " \
"FROM %s.%s " \
"limit %d, 10000" % (tmpAllCol[0], srv_DBName, tmpTable, int_Start)
print ("Limit Query : %s" % sqlMasterTable)

curMaster.execute(sqlMasterTable)

int_Start += 10000
tmpMasterData = curMaster.fetchall()

fn_WorkingData(tmpMasterData, tmpTable, tmpInfo)

if curMaster.rowcount < 10000 :
total_count = int_Start+curMaster.rowcount
print "Last data in Table %s (last select count %d/%d)" % (tmpTable, curMaster.rowcount, total_count)
print "==================================================================================="
break

def fn_read_log(_filename) :
f = open(_filename, 'r')
t_line = f.readlines()
f.close()

f_line = []
#중간에 \n 에 대해서 삭제 후에 진행
for i in t_line:
f_line.append(i.strip())

return f_line

if __name__ == "__main__":

curMaster = fn_ConnectDB(srv_Master, srv_DBName)

filenames = os.listdir(_filepath)
for filename in filenames :
full_filename = os.path.join(_filepath, filename)
ext = os.path.splitext(full_filename)[-1]

if ext == '.ini':
print "Talbes File : " + full_filename
lst_table = fn_read_log(full_filename)
print "Table List : " + str(lst_table)

for aTable in lst_table :
print "Table Name : " + aTable

if aTable <> '' :
fn_main(curMaster, aTable)

now = time.localtime()
finishtime = "%04d%02d%02d %02d:%02d:%02d" % (now.tm_year, now.tm_mon, now.tm_mday, now.tm_hour, now.tm_min, now.tm_sec)

print "======================================================="
print " Start Time is %s " % todate
print " End Time is %s " % finishtime

print "======================================================="


반응형


설명을 제대로 못하는 지식은 내가 제대로 이해하지 못했거나 모르는 지식이다.



1. INNER JOIN
  - 두 테이블간의 조인 조건을 만족하는 ROW만 리턴
  - 일반적인 조인 으로 이해하면 됨
  - 아래 코딩 내용을 보게 되면 이해가 쉽게 될 것 같다.
FOR ( record1 IN TABLE1) { // 드라이빙 테이블 (join을 주도하는 테이블)
  FOR (reocrd2 IN TABLE2) { // 드리븐 테이블 (join에서 끌려가는 테이블)
  IF ( record1.join_column == record2.join_column) {
  join_record_found(record1.*, record2.*);
  } else {
  join_record_notfound( ); //outer join과 달리 만약에 매칭되는게 없다면 더이상 찾지 않는다
  }
  }
}

 
2. OUTER JOIN
  - LEFT/RIGHT/FULL 형태의 OUTER JOIN 이 있음
  - LEFT OUTER JOIN의 경우 조인문 왼쪽에 있는 테이블의 모든 결과를 가져온 후 오른쪽 테이블의 데이터를 매칭하며, 매칭되는 데이터가 없는 경우 NULL 매칭
  - RIGHT OUTER JOIN은 LEFT 조인의 반대  - FULL OUTER JOIN은 일반적으로 사용할 일이 없으며, DB에 따라 지원하지 않음

FOR (record1 IN TABLE1) {
FOr (record2 IN TABLE2) {
IF ( record1.join_column == record2.join_column) {
join_record_found(record1.*, record2.*);
} else {
join_record_found(record1.*, NULL);// Inner Join과 다른 부분. 매칭이 안된다고 하더라도 드라이빙 테이블 데이터(매칭안된 데이터) 와 null로 같이 매칭
}
}
}

출처: http://dimdim.tistory.com/entry/SQL-JOIN-정리-Inner-Join-Outer-Join [딤딤이의 블로그]

출처 : Real Maria DB [위키북스]

반응형

마리아 DB에서 Galera Cluster를 설치하면서 개념이 필요해서 검색해서 공유해 본다.

추가적으로 정리가 되면 MariaDB에서 Galera Cluster 설치 하는 부분을 공유할 예정이다.


[원본] http://bcho.tistory.com/1062


MySQL Galera Replication


조대협 (http://bcho.tistory.com)


RDBMS 오픈소스 중에서 단연 가장 많이 사용되는 것은 MySQL인데, 근래에 웹 스케일이 커지면서, 단일 인스턴스로만 서비스가 불가능한 용량까지 가게 되서, 이 MySQL에 대한 클러스터링 스케일링에 대한 이슈가 많아졌다. 이에 Tungsten, MySQL Replication, NDB, Galera 등 다양한 클러스터링 방법이 있는데, 그중에서 갈레라 클러스터링 (Galera Clustering)에 대해서 간단하게 정리하고자 한다.


MySQL Replication


갈레라 클러스터링을 이해하기에 앞서서 먼저 가장 널리(그리고 쉽게) 사용되는 MySQL Replication 방식에 대해서 알아보자. MySQL Replication 방식은 Master/Slave 방식의 구성이 일반적이며, 이 구성의 경우 특정 노드는 쓰기를 담당하고 나머지 노드는 읽기를 담당하는 형태로 구성이 된다.

통상적으로 데이타 베이스 트랜젝션의 60~80%가 읽기 트렌젝션이기 때문에, 이러한 구조를 사용하더라도 충분히 성능의 향상을 기대할 수 있다.


다음 그림은 MySQL Replication 의 간단한 구조도 이다.

 




먼저 좌측의 Master Node에 쓰기 트렌젝션이 수행되면 Master node는 데이타를 저장하고, 트렌젝션에 대한 로그를 내부적으로 BIN LOG라는 파일에 저장한다. (시간 순서대로 수행한 업데이트 트렌젝션이 기록되어 있다.)


Slave Node에서는 이 BIN LOG를 복사해온다. 이 복사 작업을 IO Thread라는 스레드가 수행하는데, 이렇게 읽어온 내용은 Replay Log라는 파일에 기록이 된다. 이렇게 기록된 내용은 SQL Thread라는 스레드가 읽어서, 하나씩 수행을 해서  MySQL 데이타 파일에 기록을 한다.


쉽게 설명하면, insert 쿼리를 master node에서 실행했으면 그 쿼리가 master node의 bin log에 기록이 되고, 이 내용은 slave node에 복사가 된후에, slave node에서 같은 쿼리가 수행이 되서 복제가 반영되는 방식이다.


방식이 단순해서 신뢰도가 높은 반면, 단점으로는

  • 읽기와 쓰기 노드를 분리해야 하며,
  • 데이타 복제가 동기 방식이 아닌 비동기 방식으로 적용된다. 바꿔서 말하면, master node에 적용한 데이타 변경사항이 slave에 반영될때까지 일정 시간이 걸린다는 것으로, master와 slave node간의 순간적인 데이타 불일치성이 발생할 수 있다는 것이다.


Galera cluster


Galera cluster는 http://galeracluster.com/ 에서 제공되는 오픈소스로, 동기방식의 복제 구조를 사용하고 있다.

간단하게 구조를 살펴보자 아래 그림을 보면 




각각의 노드가 있을때, 아무 노드에나 쓰기나 업데이트가 발생하면, 모든 노드에 데이타를 복사를 완료하고 나서, 업데이트 내용이 파일에 저장된다. 아키텍쳐상의 구조를 보면, 위의 그림과 같이 각 MySQL 노드에는 WSREP 라는 모듈이 있다. 이 모듈은 데이타베이스에 복제를 위한 범용 모듈로 여기에 마치 드라이버처럼 Galera replication module을 연결해주면 데이타 변경이 있을때 마다, 이 Garela replication module이 다른 mysql node로 데이타를 복제한다.


약간 더 구체적인 구조를 살펴보면 노드간의 데이타 복제는 다음과 같은 흐름을 따르게 된다. 





노드에 트랜젝션이 발생하고 COMMIT이 실행이되면, 디스크에 내용을 쓰기 전에 다른 노드로 복제를 요청하고 다른 노드에 복제 요청이 접수되었을때, 해당 노드의 디스크에 실제로 데이타를 쓰게 된다.


이러한 특성으로, 전체 노드에 데이타가 항상 일관성있게 저장되고, 모든 노드가 마스터 노드로 작동을 하며, 특정 노드가 장애가 나더라도 서비스에 크게 문제가 없다. 

(MySQL Replication의 경우 마스터 노드가 장애가 나면 슬레이브 노드중 하나를 마스터로 승격을 해야하는 등 다소 운영 프로세스가 갈레라에 비해서는 복잡하다.)


상당히 좋아 보이는 구조이기는 한데, 반대로 가지는 단점도 만만하지 않다.


성능

먼저 성능적인 부분에서, 데이타를 디스크에 저장하기 전에, 다른 모든 노드에 데이타 복제 요청을 해야 하기 때문에, 비동기 방식의 MySQL Replication에 비해서, 쓰기 성능이 떨어지는 것으로 보인다.


장애 전파

이렇게 다른 노드에 복제 요청을 하는 클러스터 구조의 경우, 장애를 다른 노드로 전파 시킬 가능성이 높은데, 예전에 대표적인 웹 애플리케이션 서버인 웹로직의 경우 유사한 세션 클러스터링 구조를 사용했다. 

이 경우 복제를 요청했을때 복제 요청을 받은 노드가 장애 상황 특히 느려지거나 일시적으로 멈췄으때, 복제를 요청한 노드가 응답을 받지 못하고 대기하게 되고, 이 대기한 노드에 다른 노드가 복제를 또 요청하면, 같은 이유로 복제가 지연 되면서 클러스터를 타고 장애가 전파되는 현상을 야기하게 된다.

그래서 갈레라 클러스터의 경우 LOCK문제가 생기거나 슬로우 쿼리들이 많이 발생할때 장애를 전파시킬 수 있는 잠재적인 문제를 가지고 있다.


스케일링의 한계

갈레라가 모든 노드에 데이타를 복제하고 트렌젝션을 끝내는 만큼, 전체적인 노드수가 많아지게 되면, 복제를 하는데 그만큼 시간이 많이 걸림에 따라, 하나의 클러스터에서 유지할 수 있는 노드의 수가 한계가 있어져서, 횡적 스케일링의 한계가 올 수 있다. 


이런 단점에도 불구하고, 모든 노드에 읽기 쓰기가 가능한 멀티 마스터 구조와 모든 노드의 데이타를 일관적으로 유지 시켜준다는 장점과 쉬운 설정 방법으로 인하여 MySQL 클러스터를 구성한다면 한번쯤 검토해봐야 하는 솔루션이 아닌가 한다.


(아쉽게도 국내 사례는 그다지 많지 않은듯...)


몇가지 참고 사항

  • 갈레라 클러스터는 서로 다른 MySQL 버전간에도 클러스터로 묶을 수 있다.
  • 갈레라 클러스터에서 노드가 떨어졌다가 붙으면 일정 부분은 GTID (Global Transaction ID)를 이용하여, 데이타가 복제 되지 않은 델타 부분만 복제가 가능하지만, 시차가 오래되 버리면 풀 백업본을 가져다 엎어야 한다. (풀백업은 복구는 시간이 많이 걸림)


반응형

+ Recent posts