안녕하세요.


AWS 상에서 운영을 하다 보니 Cloudwatch를 통하여 모니터링을 많이하게 되었습니다.

더군다나 EC2 위에 올린 MongoDB 모니터링이 필요하여 고민하던 도중 cloudwatch를 이용하기로 결정하고 python으로 개발 하였습니다.


MongoDB의 모니터링은 여러 방법이 많지만,

저는 mongostat / mongotop 을 이용하기로 하였습니다.


MongoDB에 대해서는 아키텍처와 지난번 Real MongoDB 세미나 참석하여 얻은 배움이 전부라 쉽지 않았습니다.


참고로 모니터링 관련 자료는 아래를 보고 툴 대신에 mongostat / top 을 하기로 한 것입니다.

https://docs.mongodb.com/manual/administration/monitoring/


해당 스크립트는 아래 github 스크립트를 custom 한 것입니다.(저작권 관련해서는 어떻게 되는지 모르겠네요..ㅠ)



Mongotop 의 경우도 개발한 mongostat를 커스텀하여 개발한 것입니다.

수집하는 데이터는 기본적으로 현재는 query/ insert / delete / update 관련이며, mongotop 의 경우 주요 테이블을 list하여 수집하였으며, 수집 데이터는 total time / readlock time / writelock time / queries time 을 수집하는 형태로 하였습니다.

또한 1초 단위로 수집한 시간을 뺀 것을 표시하였습니다. (표시 데이터 = 현재 수집한 시간 상태 - 1초전 수집한 데이터)

1회용 수집이기 때문에 crontab 에 등록해서 주기적으로 수집하면 됩니다.

혹여나 문의 사항 있으시면 댓글 또는 happy8510@gmail.com 으로 연락 주시거나 facebook 통해서 연락 주시면 도움 드릴 수 있는 부분에 대해서 도움 드리겠습니다.(개발자가 아니라서 개발 문의 사항은 한계가 있습니다..ㅠㅠ)

cloudwatch에 올리기 위해서는 미리 aws-config 설정 하시기 바랍니다.




* mongostat.py

#!/usr/bin/env python
import argparse
#hyungi unicode incoding
import commands
import datetime, os, time, sys, random
import boto3
 
from pymongo import MongoClient
from pymongo.errors import OperationFailure
from pymongo.errors import ConnectionFailure
from pymongo.errors import ServerSelectionTimeoutError
 
# Version number
PYTHON_MONGOSTAT_VERSION = "0.0.1"
 
# Not authorized error
MONGO2_NOT_AUTH = "unauthorized"
MONGO3_NOT_AUTH = "not authorized"
 
# Authentication failure message
MONGO3_AUTH_FAILUR = "Authentication failed"
 
cloudwatch = boto3.client('cloudwatch')
 
class MongoInstance():
    'Class for mongodb instance'
 
    def __init__(self, host, port, username, password):
        'Initialize the mongodb instance information and create connection to it.'
 
        self.host = host
        self.port = port
        self.username = username
        self.password = password
        self.stats_info = {}
        selfurl='mongodb://%s:%s@%s:%s/admin' % (self.username, self.password, self.host,self.port)
 
        # Create connection to mongodb server
        try:
            #url change
            #self.connection = MongoClient(self.host, self.port)
            self.connection = MongoClient(selfurl)
 
        except ConnectionFailure:
            print "Connection error: create connection to mongodb instance failed."
            exit(1)
 
        # Get the mongodb version
        try:
            server_info = self.connection.server_info()
            self.version = server_info['version']
        except ServerSelectionTimeoutError:
            print "Timeout error: get server information of mongodb instance timeout."
 
        return
 
 
    def try_stats_command(self):
        'Try to execute the serverStatus command to see if authentication required.'
 
        # Execute the serverStatus command at first and handle possible exceptions
        errmsg = server_status = server_status2 = {}
        sleep = 1
        admin = self.connection.admin
        try:
            server_status = admin.command({"serverStatus":1})
            time.sleep(sleep)
            server_status2= admin.command({"serverStatus":1})
           # server_status = ','.joint(tmp_server_status)
        except OperationFailure, op_failure:
            errmsg = op_failure.details
        except:
            print "Execution error: get server status of mongodb instance failed."
            exit(1)
 
        #print server_status
        print 'errmsg :' + str(errmsg)
 
        # Check to see if the mongodb server enables authentication
        if errmsg != {}:
            if errmsg['errmsg'].find(MONGO2_NOT_AUTH) == -1 and errmsg['errmsg'].find(MONGO3_NOT_AUTH) == -1:
                print "Execution error: %s." % errmsg['errmsg']
                exit(1)
            else:
                # Authenticate with the given username and password
                try:
                    admin.authenticate(self.username, self.password)
                except OperationFailure, op_failure:
                    print "Execution error: authenticate to mongodb instance failed."
                    exit(1)
                # Try to execute the serverStatus command again
                try:
                    server_status = admin.command({"serverStatus":1})
                    time.sleep(sleep)
                    server_status2= admin.command({"serverStatus":1})
                except OperationFailure, op_failure:
                    print "Execution error: %s." % op_failure.details['errmsg']
                    exit(1)
 
        thetime = datetime.datetime.now().strftime("%d-%m-%Y.%H:%M:%S")
        cmd = "cat /proc/loadavg"
        out = commands.getstatusoutput(cmd)
        load = out[1].split()[0]
 
        pq = 0
        pi = 0
        pu = 0
        pd = 0
        pgm = 0
 
        = 0
        = 0
        = 0
        = 0
        gm= 0
        glativeW = 0
        glativeR = 0
        # hyungi
        #lok = round(float(server_status[u'globalLock'][u'ratio']),2)
        res = int(server_status[u'mem'][u'resident'])
        vir = int(server_status[u'mem'][u'virtual'])
        mapd = int(server_status[u'mem'][u'mapped'])
 
        #past "sleep" ago status
        pq = int(server_status[u'opcounters'][u'query'])
        pi = int(server_status[u'opcounters'][u'insert'])
        pu = int(server_status[u'opcounters'][u'update'])
        pd = int(server_status[u'opcounters'][u'delete'])
        pgm = int(server_status[u'opcounters'][u'getmore'])
        pcon = int(server_status[u'connections'][u'current'])
 
        #now status
        = int(server_status2[u'opcounters'][u'query'])
        = int(server_status2[u'opcounters'][u'insert'])
        = int(server_status2[u'opcounters'][u'update'])
        = int(server_status2[u'opcounters'][u'delete'])
        gm = int(server_status[u'opcounters'][u'getmore'])
        con = int(server_status2[u'connections'][u'current'])
 
        glactiveW = int(server_status[u'globalLock'][u'activeClients'][u'writers'])
        glactiveR = int(server_status[u'globalLock'][u'activeClients'][u'readers'])
 
        template="%12s%22s%12s%12s%12s%12s%12s%12s%12s%12s%12s%12s%12s%12s"
        header=('hostname''time''insert''query''update',  \
                'delete''getmore','active con',  'resident', \
                'virtual','mapped','load''Act Writer''Act Reader')
 
        cloudwatch.put_metric_data(
            MetricData=[
                {
                    'MetricName''MongoDB-Insert Value',
                    'Dimensions': [
                        {
                            'Name''MongoDB-Primary',
                            'Value''Insert'
                        },
                    ],
                    'Unit''None',
                    'Value': (i-pi)/sleep
                },
                {
                    'MetricName''MongoDB-Query Value',
                    'Dimensions': [
                        {
                            'Name''MongoDB-Primary',
                            'Value''Query'
                        },
                    ],
                    'Unit''None',
                    'Value': (q-pq)/sleep
                },
                {
                    'MetricName''MongoDB-Query Value',
                    'Dimensions': [
                        {
                            'Name''MongoDB-Primary',
                            'Value''Delete'
                        },
                    ],
                    'Unit''None',
                    'Value': (d-pd)/sleep
                },
                {
                    'MetricName''MongoDB-Query Value',
                    'Dimensions': [
                        {
                            'Name''MongoDB-Primary',
                            'Value''Update'
                        },
                    ],
                    'Unit''None',
                    'Value': (u-pu)/sleep
                },
            ],
            Namespace='LogMetrics'
        )
 
        server_statusstr="hostname, thetime, (i-pi)/sleep, (q-pq)/sleep, (u-pu)/sleep, (d-pd)/sleep, (gm-pgm)/sleep, con, res, vir, mapd, load, glactiveW, glactiveR"
 
        print template % header
        print template % (eval(server_statusstr))
 
def mongostat_arg_check(args):
    'Check the given arguments to make sure they are valid.'
 
    # Make sure the rowcount not negative integer
    if args.rowcount and args.rowcount < 0:
        return False"number of stats line to print can not be negative."
 
    # Make sure both username and password should be given, or neither
    if args.username and not args.password:
        return False"only username given, without password."
    if not args.username and args.password:
        return False"only password given, without username."
 
    # Make sure the hostname is valid
    if args.host:
        hostinfo = args.host.split(':')
        if len(hostinfo) > 2:
            return False"invalid mongodb host, only HOSTNAME of HOSTNAME:PORT acceptable."
        if len(hostinfo) == 2:
            try:
                port = int(hostinfo[1])
                if args.port and args.port != port:
                    return False"ports given by port option and host option not match."
            except ValueError:
                return False"invalid mongodb host, the port part not integer."
 
    return TrueNone
 
def mongostat_start(host, port, username, password, rowcount, noheaders, json):
    'Start monitor the mongodb server status and output stats one time per second.'
 
    # Create mongodb instance and make sure we can execute the serverStatus command correctly
    mongo_instance = MongoInstance(host, port, username, password)
    mongo_instance.try_stats_command()
    # print mongo_instance.host, mongo_instance.port, mongo_instance.version, mongo_instance.storage_engine
 
if __name__ == '__main__':
    # Default configurations
    hostname, username, password    = '호스트정보''유저''비밀번호'
    port, rowcount                  = 포트, 0
    noheaders, json                 = FalseFalse
 
    # Define a argument parser for all possible options
    parser = argparse.ArgumentParser(description="Monitor basic MongoDB server statistics.")
    parser.add_argument("--version"help="print the tool version and exit", action="store_true")
    parser.add_argument("--host"help="mongodb host to connect to")
    parser.add_argument("--port"help="server port (can also use --host HOSTNAME:PORT)"type=int)
    parser.add_argument("-u""--username"help="username for authentication")
    parser.add_argument("-p""--password"help="password for authentication")
    parser.add_argument("--noheaders"help="don't output column names", action="store_true")
    parser.add_argument("-n""--rowcount"help="number of stats lines to print (0 for indefinite)"type=int)
    parser.add_argument("--json"help="output as JSON rather than a formatted table", action="store_true")
 
    # Parse all the given options and make sure they are valid
    arguments = parser.parse_args()
    if arguments.version:
        print "Python mongostat version: %s" % PYTHON_MONGOSTAT_VERSION
        exit(0)
    ok, errmsg = mongostat_arg_check(arguments)
    if ok == False:
        print "Argument error: %s" % errmsg
        exit(1)
 
    # Get the given arguments
    if arguments.host:
        hostinfo = arguments.host.split(':')
        hostname = hostinfo[0]
        if len(hostinfo) == 2:
            port = int(hostinfo[1])
    if arguments.port:
        port = arguments.port
    if arguments.username:
        # We make sure username and password must both given or neither in mongostat_arg_check
        username = arguments.username
        password = arguments.password
    if arguments.rowcount:
        rowcount = arguments.rowcount
    if arguments.noheaders:
        noheaders = True
    if arguments.json:
        json = True
 
    # Start the mongostat
    mongostat_start(hostname, port, username, password, rowcount, noheaders, json)


* mongotop.py


#!/usr/bin/env python
import argparse
#hyungi unicode incoding
import commands
import datetime, os, time, sys, random
import boto3
 
from pymongo import MongoClient
from pymongo.errors import OperationFailure
from pymongo.errors import ConnectionFailure
from pymongo.errors import ServerSelectionTimeoutError
 
# Version number
PYTHON_MONGOSTAT_VERSION = "0.0.1"
 
# Not authorized error
MONGO2_NOT_AUTH = "unauthorized"
MONGO3_NOT_AUTH = "not authorized"
 
# Authentication failure message
MONGO3_AUTH_FAILUR = "Authentication failed"
 
cloudwatch = boto3.client('cloudwatch')
#Hyungi / 5 collections select
lstCollection = ['collectionname1','collectionname2','collectionname3','collectionname4','collectionname5']
 
class MongoInstance():
    'Class for mongodb instance'
 
    def __init__(self, host, port, username, password):
        'Initialize the mongodb instance information and create connection to it.'
 
        self.host = host
        self.port = port
        self.username = username
        self.password = password
        self.stats_info = {}
        selfurl='mongodb://%s:%s@%s:%s/admin' % (self.username, self.password, self.host,self.port)
 
        # Create connection to mongodb server
        try:
            #hyungi
            #url change
            #self.connection = MongoClient(self.host, self.port)
            self.connection = MongoClient(selfurl)
 
        except ConnectionFailure:
            print "Connection error: create connection to mongodb instance failed."
            exit(1)
 
        # Get the mongodb version
        try:
            server_info = self.connection.server_info()
            self.version = server_info['version']
        except ServerSelectionTimeoutError:
            print "Timeout error: get server information of mongodb instance timeout."
 
        return
 
 
    def try_stats_command(self):
        'Try to execute the serverStatus command to see if authentication required.'
 
        # Execute the serverStatus command at first and handle possible exceptions
        errmsg = server_status = server_status2 = {}
        sleep = 1
        admin = self.connection.admin
        try:
            server_status = admin.command({"top":1})
            time.sleep(sleep)
            server_status2= admin.command({"top":1})
            #server_status2= admin.command({"serverStatus":1})
        except OperationFailure, op_failure:
            errmsg = op_failure.details
        except:
            print "Execution error: get server status of mongodb instance failed."
            exit(1)
 
        #print server_status
        print 'errmsg :' + str(errmsg)
 
        # Check to see if the mongodb server enables authentication
        if errmsg != {}:
            if errmsg['errmsg'].find(MONGO2_NOT_AUTH) == -1 and errmsg['errmsg'].find(MONGO3_NOT_AUTH) == -1:
                print "Execution error: %s." % errmsg['errmsg']
                exit(1)
            else:
                # Authenticate with the given username and password
                try:
                    admin.authenticate(self.username, self.password)
                except OperationFailure, op_failure:
                    print "Execution error: authenticate to mongodb instance failed."
                    exit(1)
                # Try to execute the serverStatus command again
                try:
                    server_status = admin.command({"top":1})
                    time.sleep(sleep)
                    server_status2= admin.command({"top":1})
                except OperationFailure, op_failure:
                    print "Execution error: %s." % op_failure.details['errmsg']
                    exit(1)
 
        thetime = datetime.datetime.now().strftime("%d-%m-%Y.%H:%M:%S")
        cmd = "cat /proc/loadavg"
        out = commands.getstatusoutput(cmd)
        load = out[1].split()[0]
 
        for strCollist in lstCollection :
            tmpName = 'rocketchat.%s' % strCollist
            print tmpName
            ptotaltime = int(server_status[u'totals'][tmpName][u'total'][u'time'])
            totaltime = int(server_status2[u'totals'][tmpName][u'total'][u'time'])
 
            prelock = int(server_status[u'totals'][tmpName][u'readLock'][u'time'])
            relock = int(server_status2[u'totals'][tmpName][u'readLock'][u'time'])
 
            pwrlock = int(server_status[u'totals'][tmpName][u'writeLock'][u'time'])
            wrlock = int(server_status2[u'totals'][tmpName][u'writeLock'][u'time'])
 
            pquery = int(server_status[u'totals'][tmpName][u'queries'][u'time'])
            query = int(server_status2[u'totals'][tmpName][u'queries'][u'time'])
 
            strMetric_total_Name='MongoDB-%s-%s Value' % (strCollist,'total time')
            strMetric_read_lock_Name='MongoDB-%s-%s Value' % (strCollist,'readLock time')
            strMetric_write_lock_Name='MongoDB-%s-%s Value' % (strCollist,'writeLock time')
            strMetric_query_Name='MongoDB-%s-%s Value' % (strCollist,'queries time')
 
            cloudwatch.put_metric_data(
                MetricData=[
                    {
                        'MetricName': strMetric_total_Name,
                        'Dimensions': [
                            {
                                'Name''MongoDB-Primary-Collections',
                                'Value''total time millisecond'
                            },
                        ],
                        'Unit''None',
                        'Value': (totaltime-ptotaltime)/1000
                    },
                    {
                        'MetricName': strMetric_read_lock_Name,
                        'Dimensions': [
                            {
                                'Name''MongoDB-Primary-Collections',
                                'Value''readLock time millisecond'
                            },
                        ],
                        'Unit''None',
                        'Value': (relock-prelock)/1000
                    },
                    {
                        'MetricName': strMetric_write_lock_Name,
                        'Dimensions': [
                            {
                                'Name''MongoDB-Primary-Collections',
                                'Value''writeLock time millisecond'
                            },
                        ],
                        'Unit''None',
                        'Value': (wrlock-pwrlock)/1000
                    },
                    {
                        'MetricName': strMetric_query_Name,
                        'Dimensions': [
                            {
                                'Name''MongoDB-Primary-Collections',
                                'Value''queries time millisecond'
                            },
                        ],
                        'Unit''None',
                        'Value': (query-pquery)/1000
                    },
                ],
                Namespace='LogMetrics'
            )
 
            template="%12s%12s%12s%12s"
            header=('totime''relock''wrlock''query')
 
            server_statusstr="(totaltime-ptotaltime)/1000, (relock-prelock)/1000, (wrlock-pwrlock)/1000, (query-pquery)/1000"
 
            print template % header
            print template % (eval(server_statusstr))
 
def mongostat_arg_check(args):
    'Check the given arguments to make sure they are valid.'
 
    # Make sure the rowcount not negative integer
    if args.rowcount and args.rowcount < 0:
        return False"number of stats line to print can not be negative."
 
    # Make sure both username and password should be given, or neither
    if args.username and not args.password:
        return False"only username given, without password."
    if not args.username and args.password:
        return False"only password given, without username."
 
    # Make sure the hostname is valid
    if args.host:
        hostinfo = args.host.split(':')
        if len(hostinfo) > 2:
            return False"invalid mongodb host, only HOSTNAME of HOSTNAME:PORT acceptable."
        if len(hostinfo) == 2:
            try:
                port = int(hostinfo[1])
                if args.port and args.port != port:
                    return False"ports given by port option and host option not match."
            except ValueError:
                return False"invalid mongodb host, the port part not integer."
 
    return TrueNone
 
def mongostat_start(host, port, username, password, rowcount, noheaders, json):
    'Start monitor the mongodb server status and output stats one time per second.'
 
    # Create mongodb instance and make sure we can execute the serverStatus command correctly
    mongo_instance = MongoInstance(host, port, username, password)
    mongo_instance.try_stats_command()
    # print mongo_instance.host, mongo_instance.port, mongo_instance.version, mongo_instance.storage_engine
 
if __name__ == '__main__':
    # Default configurations
    hostname, username, password    = '접속ip''유저명''비밀번호'
    port, rowcount                  = 포트, 0
    noheaders, json                 = FalseFalse
 
    # Define a argument parser for all possible options
    parser = argparse.ArgumentParser(description="Monitor basic MongoDB server statistics.")
    parser.add_argument("--version"help="print the tool version and exit", action="store_true")
    parser.add_argument("--host"help="mongodb host to connect to")
    parser.add_argument("--port"help="server port (can also use --host HOSTNAME:PORT)"type=int)
    parser.add_argument("-u""--username"help="username for authentication")
    parser.add_argument("-p""--password"help="password for authentication")
    parser.add_argument("--noheaders"help="don't output column names", action="store_true")
    parser.add_argument("-n""--rowcount"help="number of stats lines to print (0 for indefinite)"type=int)
    parser.add_argument("--json"help="output as JSON rather than a formatted table", action="store_true")
 
    # Parse all the given options and make sure they are valid
    arguments = parser.parse_args()
    if arguments.version:
        print "Python mongostat version: %s" % PYTHON_MONGOSTAT_VERSION
        exit(0)
    ok, errmsg = mongostat_arg_check(arguments)
    if ok == False:
        print "Argument error: %s" % errmsg
        exit(1)
 
    # Get the given arguments
    if arguments.host:
        hostinfo = arguments.host.split(':')
        hostname = hostinfo[0]
        if len(hostinfo) == 2:
            port = int(hostinfo[1])
    if arguments.port:
        port = arguments.port
    if arguments.username:
        # We make sure username and password must both given or neither in mongostat_arg_check
        username = arguments.username
        password = arguments.password
    if arguments.rowcount:
        rowcount = arguments.rowcount
    if arguments.noheaders:
        noheaders = True
    if arguments.json:
        json = True
 
    # Start the mongostat
    mongostat_start(hostname, port, username, password, rowcount, noheaders, json)

반응형




해당 내용은 [Amazon Redshift 아키텍처 및 모범사례::김민성::AWS Summit Seoul 2018] 에서 발표한 동영상을 보며 정리한 내용입니다.
자세한 내용은 아래 링크 가셔서 동영상과 ppt 를 보시면 더 쉬울듯 하며,
저는 공부의 목적으로 보며 정리한 내용입니다.

발표내용
  • 리더 노드는 각 slice 로 실행계획에 맞춰 데이터 수집을 하라고 보내고 마지막 slice 가 끝날때까지 기다렸다가 끝난 데이터를 수집하여 제공
  • slice 는 redshift 내 각 클러스터의 cpu 즉 작업을 실질적으로 하는 모듈로 이해하면 될듯

Distkey 형태
  1. 분산키 : key 를 가지고 각 slice 로 저장(slice 는 cpu당 개수와 동일한 개수를 가지고 있음) - group by 할 때 / key를 가지고 join 하는 경우 좋음
    - 각 노드에 균등하게 데이터 분산이 필요
    - 가장 큰 테이블에서 조인에 활용되는 컬럼
    - group by 조건에서 사용되는 컬럼
    - 높은 Cardinality를 보유한 컬럼
    ★ 아래와 같은 현상이 발생하면 slice1번이 끝날때까지 기다려야 하기에 데이터 분산이 불균형이 생기면 이에 대한 기다림이 길어지기에 문제가 발생할 수 있음을 인지하고 있어야 한다
    - Equality filter에서 활용되는 컬럼 (Equal 를 사용하면 하나의 slice로 고정이 되어 버리기 때문에 좋지 않음)
    - 데이터의 몰림을 유발하는 경우
    ★ 분산키는 어떻게 사용 해야 되는가!
    - 가장 큰 Dimension table의 pk와 fact table의 foreign키를 disk key 로 선택
    - 나머지 조인이 되는 조건의 Dimension table(작은 테이블)은 distribution all 를 검토 -> 모든 node들에 해당 테이블에 대한 데이터가 존재하는 형태(경우에 따라 다르지만 보통 300만건 이하의 경우 데이터 분산 타입을 all 로 선택해도 무방)
    이렇게 하면 성능적으로 가장 좋은 형태로 제공
    Ⅰ 가장 자주 활용되는 쿼리 확인(https://github.com/awslabs/amazon-redshift-utils/tree/master/src/AdminScripts/top-queries.sql 활용)
    Ⅱ 조인 또는 group by 에 활용되며 고른 데이터 분포 및 높은 Cardinality 보유한 컬럼을 선택
    Ⅲ 그게 아니면-위와 같이 최고의 방법을 찾기 힘들다면.. (Fact table은 Even으로 Dimension Table-기준 테이블 은 All로)
  2. 전체 : 상품 테이블과 같이 개수는 적으나 모든 join이나 그런것에 필요한 경우 각 slice 에 모두 저장 (데이터 storge 가 증가)
  3. 균등 : round robin (Even)
  • explain 하였을 경우 redshift 에서 네트워크를 통한 데이터 이동 관련 내용 (데이터 이동은 가장 비용이 비싼 operation)
DS_DIST*
INFO (아래로 갈수록 퍼포먼스가 좋지 않음)
DS_DIST_NONE
Preferred, no data transfer between nodes
DS_DIST_ALL_NONE
One table has DISTSTYLE ALL, no data transfer between nodes
DS_DIST_ALL_INNER
Inner table is being sent to the Single Node
DS_DIST_INNER
Inner table is being redistributed in an outer join
DS_DIST_OUTER
Outer table is being redistributed in an outer join
DS_BCAST_INNER
Inner table is being broadcast to all nodes.
DS_BCAST_BOTH
Both tables are being broadcast to all nodex.

  • 리더 노드가 각 slice 로 
성능 향상
1. 보다 높은 하드웨어 스펙
2. 병렬화의 달성
3. 네트워크를 통한 트레픽의 최소화


하지만 결국은 최고의 성능을 내기 위해서는 "스토리지 I/O의 최소화"

  • 모든 블록을 1Mb형태로 나눠서 저장하는데..
  • Zone Map : 각 Block 에 대한 최대값 및 최소값에 대한 정보 저장 > 그렇기 때문에 정렬 키를 이용하면 필요한 Block 만 읽기 때문에 정렬 키가 중요


정렬 키 (sortkey)
  • 정렬 키가 지정된 테이블의 경우 데이터 로드 시 정렬 순서대로 디스크에 저장
  • 정렬 키의 종류
    • 정렬 키 : 테이블의 단일 컬럼의 값 기준으로 데이터 정렬하여 저장
    • 복합 정렬키(Compound Sort Key) :
      • 최대 6개의 컬럼까지 활용하여 정렬 키를 지정
      • 선언한 순서에 따라 순차적으로 정렬되어 저장 -> 가장 먼저 선언한 키에 대한 높은 가중치
      • 조인 및 Group by, order by 효과적이며, Merge 조인에 효과
    • 인터리브 정렬키 (Interleaved Sort Key)
      • 최대 8개의 컬럼까지 활용하여 정렬키를 지정
      • 정렬 키에서 각 열의 하위 집합에 똑같은 가중치 부여 -> 복합 정렬키는 앞의 데이터가 가중치가 높다면, 인터리브 정렬키는 뒤의 키에 대해서도 동일하게 가중치를 부여(데이터의 양이 증가??)
      • AD-Hoc 형태의 쿼리에서 높은 성능을 제공
ex) Product 테이블에 sort key로 type, color, size를 지정

복합정렬키
인터리브정렬키
where type = 's' and color = 'B' and size = 'xl'
정렬을 기반한 가장 우수한 성능
순서 등에 관계 없이 일관성 있는 성능 제공
where type = 's' and color = 'B'
약간 효과적
where type = 's' and size = 'xl'
제한적으로 성능에 기여

쿼리 분석

파일 분산 및 병렬 로딩
  • Slice 단위로 작업을 처리를 하지만, Slice는 파일 하나당 하나의 slice가 할당되기에 반드시 파일을 여러개로 나눈 후 메타데이터를 지정하여 작업 진행을 권장
아래 표 참조 (노드당 100Mbyte sec)
매니페스트 파일
Copy 커맨드를 통한 실행
{
   "entries":[
      {"url":"s3://mybucket-alpha/client.aa", "mandatory":true},
      {"url":"s3://mybucket-alpha/client.ab", "mandatory":true},
      {"url":"s3://mybucket-alpha/client.ac", "mandatory":true},
      {"url":"s3://mybucket-alpha/client.ad", "mandatory":true}
   ]
}
copy customer
from 's3://mybucket/cust.manifest'
iam_role 'arn:aws:iam::111111111:role/MyRedshiftRole'
manifest;

압축
  • 압축을 기반으로 디스크 사용량 및 I/O 더욱 줄일 수 있음
  • Copy 커멘트 사용 시 10만 행을 우선 적제 후 압축 옵션 평가 (COMPUDATE on)
  • "Analyze Compression"  커멘트를 통해 현재 압축(인코딩)에 대한 평가 후 "alter table" 로 변경
  • 2~4배의 효과

기타
  • Vacuum
    • 변경되거나 삭제된 블럭에 대한 정리(플래그만 남겨놓고 디스크에는 남아져 있음) - vocuum을 이용하여 정리를 해줌
    • 정렬키에 맞춰 데이터 정렬(재정렬을 해주기 위해 vocuum을 사용) , rename을 해도 됨
    • 옵션 : full, sort only, delete only, reindex (인터리브 정렬키)
  • 통계 업데이트
    • 통계 업데이트 자동 실행 조건
      • create table as select 
      • select into
      • copy (with compupdate on)
    • 통계 업데이트를 실행해줘야 하는 경우
      • 5% 이상 데이터 변경된 경우
      • 외례키를 추가하거나 삭제한 경우
      • Vacuum 실행 후 
  • 제약 조건 (not null, primary, foreign, unique)은 쿼리 옵티마이저가 활용 (plan  만들 때 참고하기 때문)


중요
  1. 노드 크기의 선택 조건
  2. 적절한 분산키의 선택 방안
  3. 적절한 정렬키의 선택 방안
  4. 데이터 적제 방안
  5. 레드쉬프트 스펙트럼에서의 파티셔닝 및 파일 포멧

Redshift Spectrum (redshift와 별개!!!)
  • 아래는 참고로 하고 제대로 공부는 후에 하는 형태로 하자. redshift와 별개로 필요시 알고 있으면 활용방안이 있을 듯
  • s3의 수천개 이상의 노드들을 활용하여 쿼리 실행


수천개의 s3 내의 csv 파일을 읽어 들여 조인 진행

  • 파티셔닝 및 Columnar 파일 포맷 (ORC, Parquet) 사용
    • 파티셔닝 컬럼의 조건
      • 필터 및 조인의 조건이 되는 컬럼
      • 비지니스 유닛
      • 비지니스 그룹
      • 날짜 및 시간
  • 파일의 갯수는 Redsfhit 의 Slice의 수량 이상
  • 파일의 크기는 64 mb 이상을 권고
  • 각 파일은 크기를 권고











반응형

제목이 좀 자극적일 수 있으나, 

말 그데로 AWS DMS를 이용하여 mysql -> redshift 로 적재하는 가운데 테이블 카운트가 현저히 차이나는데도 불구하고 완료 되었다고 나오는 현상에 대해서 해결한 방안을 공유해 봅니다.


작업 방법

- AWS 의 DMS를 이용하여 Mysql 5.6의 데이터를 Redshift 로 적재

- 물론 CDC 도 진행하도록 설정



증상

  • 특정 DB의 테이블들을 DMS 하는 과정에서 전체 데이터의 10%도 되지 않는 데이터만 적재 후 적재가 완료 되었다고 status 상에서 확인
  • 해당 테이블들을 버전이 다른 replication instance 를 이용하여 진행 시 특정 버전(2.4.3)에서만 문제 없이 적재 및 CDC가 되는 현상을 확인
    - 상위 버전인 2.4.4 / 3.1.2 에서는 증상과 같은 현상이 발생

 

확인사항

  • Replication instance Engine 별로 테스트 진행
  • 하나의 테이블만 가지고 테스트 진행

 

조치사항

  1. Replication instance 를 2.4.3으로 지정하여 해당 instance로 복제 진행
  2. 상위 버전의 Replication instance를 사용하는 경우 Source Endpoint에 Resumefetchforxrows=0 를 지정한 후 진행하여 가능


√ Resumefetchforxrows 

  • MySQL 테이블 내 데이터가 많을 경우 마지막 데이터까지 가져온 이후에도 정해진 수 만큼 Fetching을 계속하도록 하는 옵션
    ex) Resumefetchforxrows = 1500 이면, 1500 row까지 fetching을 시도
  • Resumefetchforxrows=0으로 설정함으로써 DMS가 테이블내의 데이터를 의도적으로 끝까지 Fetching 시도 하도록 설정
  • Replication Engine 2.4.4 버전에서부터 Resumefetchforxrows 옵션을 값을 고정하면서 문제가 발생한 것으로 예상(AWS 답변)
    -> 2,4,4버전 이상을 사용하는 경우 Resumefetchforxrows 값을 0으로 고정해서 사용한 것을 권장



해결할 수 있었던 건 다행히 회사에서 aws와 계약을 맺은 상태라 AWS 엔지니어를 통해서 해결책을 받은 것이며,

모든 테스트들을 AWS 엔지니어가 재현이 불가하여 직접 다 테스트한 결과입니다.


부디 많은 분들께서 저와 같은 삽질을 피하시기 바랍니다.


또한 수정이 필요한 내용이 있다면 과감히 알려주시면 감사하겠습니다.

반응형

+ Recent posts