올해 1월부터 진행 한 MongoDB Study에 대해서 정리해 봅니다.

함께하는 멤버들의 블로그 입니다.

 

루이스(본인) : hyunki1019.tistory.com/

라라님 : rastalion.me/

닷닷님 : 닷닷이의 DB공간 : 네이버 블로그 (naver.com)

재혁님 : marsettler.com/

 

그 외 분들은 다른 자료로 공유를 하시기 때문에 그 분들의 내용은 추후에 업로드 해보도록 하겠습니다.

주로 4.x (최신 버전) 으로 진행 하고 있습니다.

반응형

[원본] 아래 블로그와 동일한 작업이며, 단지 소스 오픈 여부 입니다. 아래 작성자님 무한 감사 드립니다.

https://brunch.co.kr/@alden/53

  • 더불어 문서에 앞서 ST Unitas 황선규님에게 감사 인사 드립니다. 대부분의 코딩 수정을 도와주셨습니다.
    (거의 다 작성해 주셨습니다 ㅎㅎ)

목적

  • Cloudwatch 상에서 확인할 수 있는 그래프를 Slack에서 이미지로 확인
  • 가독성이 높아져서 현상 확인하기 쉬움
  • 그래프를 확인하기 위해 AWS에 접속하는 불편 감수

 

동작방식

  • Lambda 에서 미리 셋팅한 Cloudwatch 정보를 이용하여(json) 현재 상태를 이미지화 하여 Slack으로 전송
  • Python 3.7로 개발

 

필요한 모듈

  • boto3
  • certifi
  • chardet
  • idna
  • json
  • requests

 

사전지식

  • Lambda 를 이용하기 위해서는 python에서 사용하는 모든 모듈을 파일로 보유해야함(모두 하나의 zip으로 압축해서 업로드)
    (Lambda에 대한 사전 지식이 있으면 괜찮지만, 처음 접하기에 쉽지 않음 - pip로 local로 필요한 모듈을 설치 후 해당 모듈의 폴더 전체가 필요)
    ex) C:\Users\접속한계정\AppData\Local\Programs\Python\Python37\Lib\site-packages 에 설치한 모듈 존재

  • Lambda 테스트를 위해서는 핸들러의 명칭에 존재하는 함수가 반드시 명시되어 있어야 하며, 함수의 Default 핸들러는 수정하면 안됨

  • 권한 : boto3.client('cloudwatch').get_metric_widget_image 을 사용하기 위해서는 Cloudwatch에 접근할 수 있는 권한과 함께 "cloudwatch:GetMetricWidgetImage" 이라는 권한도 필요
    Lambda를 생성하는 계정에 모든 권한이 존재 하더라도, Lambda 하단의 역할에서 존재 여부를 체크 필요
    IAM 권한에서 있는지 여부 체크 필요 (아래는 관련 모든 권한을 부여함)

    {

        "Version": "2012-10-17",

        "Statement": [

            {

                "Sid": "VisualEditor0",

                "Effect": "Allow",

                "Action": [

                    "cloudwatch:DescribeAlarmHistory",

                    "cloudwatch:GetDashboard",

                    "cloudwatch:GetMetricData",

                    "cloudwatch:DescribeAlarmsForMetric",

                    "cloudwatch:DescribeAlarms",

                    "cloudwatch:GetMetricStatistics",

                    "cloudwatch:GetMetricWidgetImage"

                ],

                "Resource": "*"

            }

        ]

    }

필요한 Cloudwatch 정보

  1. 이미지로 보고자 하는 Cloudwatch 지표를 확인하여 저장
    아래 샘플은 Cloudwatch의 지표 중 하나인 전체 EC2 CPU 정보


    소스를 선택하면 아래의 정보를 확인할 수 있다.

     

     

    {
    "view": "timeSeries",
    "stacked": false,
    "metrics": [
    [ "AWS/EC2", "CPUUtilization" ]
    ],
    "title": "전체 EC2 CPU",
    "width": 2310,
    "height": 250,
    "start": "-PT3H",
    "end": "P0D",
    "timezone": "+0900"
    }


    여기서 가장 중요한 것은 metrics 정보를 잘 저장하면 된다.

  2. Slack 으로 전달하기 위해서 token 정보 필요
    Slack bot을 미리 생성하였기 때문에 해당 Slack bot을 이용하였으며, channels 만 변경 한다면 동일 Bot을 이용해도 문제 없을 것이라고 예상
    아래에서 channels 에서 필요한 곳으로 변경 하자.

    slack_params = {

                "filename":"test.png",

                "token":"SLACK BOT의 token 값을 넣어 주시면 됩니다",

                "channels":['#aws-db-slowquery-noti <-와 같이 채널 명칭을 넣어주세요.']

            }

전체 소스

 

Sample_python

import json

import boto3

import requests

 

cloudwatch = boto3.client('cloudwatch')

metric_sources = [

{

        "metrics":

        [ "AWS/EC2", "CPUUtilization"

        ],

        "view": "timeSeries",

        "stacked": False,

        "region": "ap-northeast-2",

        "timezone": "+0900"

    }

]

 

def metric(event, context):

    for metric_data in metric_sources :

        metric_data = json.dumps(metric_data)

         

        #print(metric_data)

        image_data = cloudwatch.get_metric_widget_image(MetricWidget=metric_data)

    #    print(image_data)

         

        slack_params = {

            "filename":"test.png",

            "token":"SLACK BOT의 token 값을 넣어 주시면 됩니다",

            "channels":['#aws-db-slowquery-noti <-와 같이 채널 명칭을 넣어주세요.']

        }

        image = {'file': image_data['MetricWidgetImage']}

         

        requests.post("https://slack.com/api/files.upload", params=slack_params, files=image)

 

        

- 직접 테스트한 내용

 

 

 

lambda_python

import json

import boto3

import requests

 

cloudwatch = boto3.client('cloudwatch')

metric_sources = [

    {

        "metrics": [

                       [ "LogMetrics", "MongoDB-queries time Value", "MongoDB-Primary-Collections", "queries time millisecond", { "stat": "Sum" } ],
            [ ".", "MongoDB-queries time Value", ".", ".", { "stat": "Sum" } ],
            [ ".", "MongoDB-queries time Value", ".", ".", { "stat": "Sum" } ],
            [ ".", "MongoDB-queries time Value", ".", ".", { "stat": "Sum" } ],
            [ ".", "MongoDB-queries time Value", ".", ".", { "stat": "Sum" } ],
            [ ".", "MongoDB-readLock time Value", ".", "readLock time millisecond", { "stat": "Sum" } ],
            [ ".", "MongoDB-readLock time Value", ".", ".", { "stat": "Sum" } ],
            [ ".", "MongoDB-readLock time Value", ".", ".", { "stat": "Sum" } ],
            [ ".", "MongoDB-readLock time Value", ".", ".", { "stat": "Sum" } ],
            [ ".", "MongoDB-users-readLock time Value", ".", ".", { "stat": "Sum" } ],
            [ ".", "MongoDB-total time Value", ".", "total time millisecond", { "stat": "Sum" } ],
            [ ".", "MongoDB-total time Value", ".", ".", { "stat": "Sum" } ],
            [ ".", "MongoDB-total time Value", ".", ".", { "stat": "Sum" } ],
            [ ".", "MongoDB-total time Value", ".", ".", { "stat": "Sum" } ],
            [ ".", "MongoDB-users-total time Value", ".", ".", { "stat": "Sum" } ],
            [ ".", "MongoDB-writeLock time Value", ".", "writeLock time millisecond", { "stat": "Sum" } ],
            [ ".", "MongoDB-writeLock time Value", ".", ".", { "stat": "Sum" } ],
            [ ".", "MongoDB-writeLock time Value", ".", ".", { "stat": "Sum" } ],
            [ ".", "MongoDB-users-writeLock time Value", ".", ".", { "stat": "Sum" } ],
            [ ".", "MongoDB-writeLock time Value", ".", ".", { "stat": "Sum" } ]

        ],

        "view": "timeSeries",

        "stacked": False,

        "region": "ap-northeast-2",

        "timezone": "+0900"

    },

{

        "metrics": [

        [ "AWS/RDS", "CPUUtilization", "DBInstanceIdentifier", "rds-m1" ],

        [ "...", "rds-s1" ],

        [ "...", "rds-s2" ],

        [ "...", "rds-s3" ],

        [ "...", "rds-s4" ],

        [ "...", "rds-s5" ]

        ],

        "view": "timeSeries",

        "stacked": False,

        "region": "ap-northeast-2",

        "timezone": "+0900"

    }

]

 

def metric(event, context):

    for metric_data in metric_sources :

        metric_data = json.dumps(metric_data)

         

        #print(metric_data)

        image_data = cloudwatch.get_metric_widget_image(MetricWidget=metric_data)

    #    print(image_data)

         

        slack_params = {

            "filename":"test.png",

            "token":"SLACK BOT의 token 값을 넣어 주시면 됩니다",

            "channels":['#aws-db-slowquery-noti <-와 같이 채널 명칭을 넣어주세요.']

        }

        image = {'file': image_data['MetricWidgetImage']}

         

        requests.post("https://slack.com/api/files.upload", params=slack_params, files=image)

 

        

dev-hyungi-cloudwatch-screenshot.zip
0.87MB

 

 - 전체 소스에 대한 zip파일 (해당 파일을 Lambda에 업로드 하면 필요한 모듈을 한번에 upload

 

 

출처

 https://brunch.co.kr/@alden/53

반응형

'AWS' 카테고리의 다른 글

[Amazon MemoryDB for Redis] 정리 및 비교  (0) 2021.12.22
[Amazon MemoryDB for Redis] 정리 및 비교  (0) 2021.12.22

안녕하세요.

RDS Slow query 를 수집하기 위해서 알아보던 도중(예전에 김종열 팀장님 도움으로 python으로 수집하던 소스는 있지만, UI로 보여주고 싶은 마음에) ELK 를 이용하면 제가 원하는 화면을 보여줄 수 있을 것 같아서 시작하게 되었습니다.

 

EC2에 ELK를 설치 후 Filebeat 를 이용하면 가능했지만, 아래와 같은 문제점이 있었습니다.

 

1. Slowquery를 다운 받아 진행하게 된다면, 실시간이 아닌 배치 형태로 진행이라 실시간으로 보기 원하는 부분에 대해 니즈에 부합

2. ELK 에 대해 유지보수 필요

3. 다운 받은 Slowquery 에 대해 관리 필요

 

등으로 인해 AWS ELK를 찾아보게 되었습니다.

 

다행히 AWS ELK는 정말 만들기 쉽더군요. (ec2에서 ELK도 어렵운건 아니지만..)

이후 아래 영상을 통해 cloudwatch상에 올라가 있는 slowquery를 ELK로 연동하는 것도 쉽게 할 수 있었습니다.

 

Amazon Elastcsearch Service 소개 및 활용 방법 (윤석찬)

2016년 월간 웨비나 2월 - Amazon Elastcsearch Service 소개 및 활용 방법 (윤석찬)

www.slideshare.net

위의 연동을 간단하게 이미지로 정리하면 아래와 같습니다.

빨간색 네모로 표신한 정도가 됩니다.

 

RDS Slow query 발생 -> Cloudwatch Log -> Lambda(AWS제공) -> Elasticsearch Service(ES) -> Kibana

이제 아래는 Cloudwatch Slow log를 ELK 로 연동하는 설정 부분입니다.

 

1. Cloudwatch 의 로그 목록 선택 후 (slow query 선택 후 Elasticsearch Service 측 스트림 선택)

2. 생성한  ES 선택(Elasticsearch Service)

3. 로그 형식 및 필터 구성 시 기타로 설정
   기타로 선택하는 이유는 여기서 필터패턴을 하게 되면 Slowquery의 여러 형태로 인해 원치 않은 데이터가 다른 형태로 필터가 되는 현상 발생
    또한, cloudwatch 로그 형식 필터에서 아직까지 정규화 표현식을 제공하지 않으므로 slowquery를 필터 적용하기에는 맞지 않음(띄어쓰기 필터 등만 지원)

 

4. 이후 별다른 설정 없이 진행(스트리밍 시작)

- 구독되고 있는 것을 확인 가능(여기서 중요한 것은 모든 cloudwatch 를 이용해서 ES 로 보내는 작업-비단 Slowquery 뿐만 아니라 모든 작업/ 은 동일한 Lambda호출/ 2번의 캡쳐 화면을 보면 Lambda함수에 대한 내용이 있음)

 

하지만 Cloudwatch에서 제공하는 필터는 RDS slowquery 형태에 맞지 않습니다. 더군다나 쿼리들마다 형태가 다르다 보니 정영화 되지 않은 데이터를 일괄적인 형태로 보여지는 것에 대해서는 한계가 있더군요.

 

더불어 여러 RDS에서 들어오는 slowquery를 AWS에서 제공하는 하나의 lambda(node.js) 에서만 처리가 되니 어느 DB에서 생긴 slowquery인지 확인이 어렵더군요.

 

다행히, AWS Support 에 문의 하였더니 Lambda 수정의 방법과 기존 고려했던 export 받은 후 처리 2가지 방법을 제시하였으며, Lambda의 경우 친절하게 수정한 코더도 주시더군요!!!

 

하지만, Lambda 수정은 제 몫이기에 처음으로 접해보는 node.js를 잡고 끙끙대기 시작했습니다.

console.log 를 이용한 일일이 로그 찍어가며 테스트하며 수정을 했습니다.

 

아래는 수정한 Lambda입니다. 개발자 분들이 보시면 얼마나 욕할지...ㅠㅠ하지만 저는 개발자가 아니기에...당당히 소스를 공유 합니다.

수정하셔도 되고 그대로 적용하셔도 됩니다.(무슨 근자감...)

 

수정 부분은 2군데 입니다.

1. 여기는 rds의 네임을 cwl 뒤에 작성해 줌으로서 구분이 가능해 집니다.(AWS Jihyun B. 님이 제공)

        // index name format: cwl-YYYY.MM.DD Slowquery modify 1
/*       
        var indexName = [
            'cwl-' + timestamp.getUTCFullYear(),              // year
            ('0' + (timestamp.getUTCMonth() + 1)).slice(-2),  // month
            ('0' + timestamp.getUTCDate()).slice(-2)          // day
        ].join('.');
*/
        var indexName = [
            'cwl' + payload.logGroup.toLowerCase().split('/').join('-') + '-' + timestamp.getUTCFullYear(),              // log group + year
            ('0' + (timestamp.getUTCMonth() + 1)).slice(-2),  // month
            ('0' + timestamp.getUTCDate()).slice(-2)          // day
        ].join('.');

2. 여기는 정규표현식을 이용하여 slow query 를 파싱 및 필터를 적용한 후 맵핑하는 곳입니다. 참고로 저는 node.js 는 처음이며, 정규표현식도 모릅니다..ㅠ그래서 하드 코딩을 했는데...동적으로 코딩 하기에는 머리가 안돌아가서 그냥 포기했습니다. 이쁘게 사용하실 분은 수정하셔도 됩니다. 

 

message는 slow 쿼리의 전문이 들어오는 곳이며, extractedFields 는 맵핑하는 형태가 들어오는 곳인데, 우리가 필터를 기타로 한 후 작성안하고 넘기기 때문에 if 문에 걸리지 않고 else 로 빠지는 것입니다.

 

function buildSource(message, extractedFields) {

...
if 문 끝에 else로 추가
...

//Slowquery add
    else {
        console.log('Slow query Regular expression')
 
        var qualityRegex = /User@Host: ([^&@]+) /igm;
        var ipRegex = /\d+\.\d+\.\d+\.\d+/igm;
        var querytimeReg = /Query_time: ([^& ]+)/igm;
        var locktimeReg = /Lock_time: ([^& ]+)/igm;
        var querytimeReg = /Query_time: ([^& ]+)/igm;
        var rowsentReg = /Rows_sent: ([\d^&]+)/igm;
        var rowexaminedReg = /Rows_examined: ([\d^&]+)/igm;
        var slowqueryReg = /select ([^&;]+)|update ([^&;]+)|delete ([^&;]+)/igm;
 
        var userhost, ip, querytime, querylock, rowsent, rowexaminge, slowquery ='';
 
        var matches, qualities = [];
        var source = {};
         
        while (matches = qualityRegex.exec(message)) {
            qualities.push(decodeURIComponent(matches[1]));
        }
         
        userhost = qualities[0];
        ip = ipRegex.exec(message)[0];
 
         
        matches, qualities = [];
         
        while (matches = querytimeReg.exec(message)) {
            qualities.push(decodeURIComponent(matches[1]));
        }
        querytime = qualities[0];
         
         
        matches, qualities = [];
         
        while (matches = locktimeReg.exec(message)) {
            qualities.push(decodeURIComponent(matches[1]));
        }
         
        querylock = qualities[0];
         
        matches, qualities = [];
         
        while (matches = rowsentReg.exec(message)) {
            qualities.push(decodeURIComponent(matches[1]));
        }
        rowsent = qualities[0];
         
        matches, qualities = [];
         
        while (matches = rowexaminedReg.exec(message)) {
            qualities.push(decodeURIComponent(matches[1]));
        }
        rowexamined = qualities[0];
         
        slowquery = slowqueryReg.exec(message)[0];
 
        console.log(userhost);
        console.log(ip);
        console.log(querytime);
        console.log(querylock);
        console.log(rowsent);
        console.log('hyungi rowexaminge: ',rowexamined);
        console.log('hyungi query :',slowquery);
         
        source['User@Host'] = userhost;
        source['IP'] = ip;
        source['Query_time'] = 1 * querytime;
        source['Lock_time'] = 1 * querylock;
        source['Rows_sent'] = 1 * rowsent;
        source['Rows_examined'] = 1 * rowexamined;
        source['Query'] = slowquery;
 
        console.log('Slow query Filter complete : ', source)
         
        return source;
    }

 

아래는 전체 Lambda 소스 입니다.( LogsToElasticsearch_st-elk-slowquery )

// v1.1.2
var https = require('https');
var zlib = require('zlib');
var crypto = require('crypto');
 
var endpoint = 'search-st-elk-slowquery-edn6wa74e2h3zchmep6lbu2xky.ap-northeast-2.es.amazonaws.com';
 
// Set this to true if you want to debug why data isn't making it to
// your Elasticsearch cluster. This will enable logging of failed items
// to CloudWatch Logs.
var logFailedResponses = false;
 
exports.handler = function(input, context) {
    // decode input from base64
    var zippedInput = new Buffer.from(input.awslogs.data, 'base64');
     
    // decompress the input
    zlib.gunzip(zippedInput, function(error, buffer) {
        if (error) { context.fail(error); return; }
 
        // parse the input from JSON
        var awslogsData = JSON.parse(buffer.toString('utf8'));
 
        // transform the input to Elasticsearch documents
        var elasticsearchBulkData = transform(awslogsData);
 
        // skip control messages
        if (!elasticsearchBulkData) {
            console.log('Received a control message');
            context.succeed('Control message handled successfully');
            return;
        }
 
        // post documents to the Amazon Elasticsearch Service
        post(elasticsearchBulkData, function(error, success, statusCode, failedItems) {
            console.log('Response: ' + JSON.stringify({
                "statusCode": statusCode
            }));
 
            if (error) {
                logFailure(error, failedItems);
                context.fail(JSON.stringify(error));
            } else {
                console.log('Success: ' + JSON.stringify(success));
                context.succeed('Success');
            }
        });
    });
};
 
function transform(payload) {
    if (payload.messageType === 'CONTROL_MESSAGE') {
        return null;
    }
    var bulkRequestBody = '';
 
    payload.logEvents.forEach(function(logEvent) {
        var timestamp = new Date(1 * logEvent.timestamp);
 
        // index name format: cwl-YYYY.MM.DD Slowquery modify 1
/*       
        var indexName = [
            'cwl-' + timestamp.getUTCFullYear(),              // year
            ('0' + (timestamp.getUTCMonth() + 1)).slice(-2),  // month
            ('0' + timestamp.getUTCDate()).slice(-2)          // day
        ].join('.');
*/
        var indexName = [
            'cwl' + payload.logGroup.toLowerCase().split('/').join('-') + '-' + timestamp.getUTCFullYear(),              // log group + year
            ('0' + (timestamp.getUTCMonth() + 1)).slice(-2),  // month
            ('0' + timestamp.getUTCDate()).slice(-2)          // day
        ].join('.');
         
        var source = buildSource(logEvent.message, logEvent.extractedFields);
        source['@id'] = logEvent.id;
        source['@timestamp'] = new Date(1 * logEvent.timestamp).toISOString();
        source['@message'] = logEvent.message;
        source['@owner'] = payload.owner;
        source['@log_group'] = payload.logGroup;
        source['@log_stream'] = payload.logStream;
 
        var action = { "index": {} };
        action.index._index = indexName;
        action.index._type = payload.logGroup;
        action.index._id = logEvent.id;
         
        bulkRequestBody += [
            JSON.stringify(action),
            JSON.stringify(source),
        ].join('\n') + '\n';
    });
    return bulkRequestBody;
}
 
function buildSource(message, extractedFields) {
 
    if (extractedFields) {
        var source = {};
 
        for (var key in extractedFields) {
            if (extractedFields.hasOwnProperty(key) && extractedFields[key]) {
                var value = extractedFields[key];
 
                if (isNumeric(value)) {
                    source[key] = 1 * value;
                    continue;
                }
 
                jsonSubString = extractJson(value);
                if (jsonSubString !== null) {
                    source['$' + key] = JSON.parse(jsonSubString);
                }
 
                source[key] = value;
            }
        }
        return source;
    }
//Slowquery add
    else {
        console.log('Slow query Regular expression')
 
        var qualityRegex = /User@Host: ([^&@]+) /igm;
        var ipRegex = /\d+\.\d+\.\d+\.\d+/igm;
        var querytimeReg = /Query_time: ([^& ]+)/igm;
        var locktimeReg = /Lock_time: ([^& ]+)/igm;
        var querytimeReg = /Query_time: ([^& ]+)/igm;
        var rowsentReg = /Rows_sent: ([\d^&]+)/igm;
        var rowexaminedReg = /Rows_examined: ([\d^&]+)/igm;
        var slowqueryReg = /select ([^&;]+)|update ([^&;]+)|delete ([^&;]+)/igm;
 
        var userhost, ip, querytime, querylock, rowsent, rowexaminge, slowquery ='';
 
        var matches, qualities = [];
        var source = {};
         
        while (matches = qualityRegex.exec(message)) {
            qualities.push(decodeURIComponent(matches[1]));
        }
         
        userhost = qualities[0];
        ip = ipRegex.exec(message)[0];
 
         
        matches, qualities = [];
         
        while (matches = querytimeReg.exec(message)) {
            qualities.push(decodeURIComponent(matches[1]));
        }
        querytime = qualities[0];
         
         
        matches, qualities = [];
         
        while (matches = locktimeReg.exec(message)) {
            qualities.push(decodeURIComponent(matches[1]));
        }
         
        querylock = qualities[0];
         
        matches, qualities = [];
         
        while (matches = rowsentReg.exec(message)) {
            qualities.push(decodeURIComponent(matches[1]));
        }
        rowsent = qualities[0];
         
        matches, qualities = [];
         
        while (matches = rowexaminedReg.exec(message)) {
            qualities.push(decodeURIComponent(matches[1]));
        }
        rowexamined = qualities[0];
         
        slowquery = slowqueryReg.exec(message)[0];
 
        console.log(userhost);
        console.log(ip);
        console.log(querytime);
        console.log(querylock);
        console.log(rowsent);
        console.log('hyungi rowexaminge: ',rowexamined);
        console.log('hyungi query :',slowquery);
         
        source['User@Host'] = userhost;
        source['IP'] = ip;
        source['Query_time'] = 1 * querytime;
        source['Lock_time'] = 1 * querylock;
        source['Rows_sent'] = 1 * rowsent;
        source['Rows_examined'] = 1 * rowexamined;
        source['Query'] = slowquery;
 
        console.log('Slow query Filter complete : ', source)
         
        return source;
    }
 
    jsonSubString = extractJson(message);
    if (jsonSubString !== null) {
        return JSON.parse(jsonSubString);
    }
 
    return {};
}
 
function extractJson(message) {
    var jsonStart = message.indexOf('{');
    if (jsonStart < 0) return null;
    var jsonSubString = message.substring(jsonStart);
    return isValidJson(jsonSubString) ? jsonSubString : null;
}
 
function isValidJson(message) {
    try {
        JSON.parse(message);
    } catch (e) { return false; }
    return true;
}
 
function isNumeric(n) {
    return !isNaN(parseFloat(n)) && isFinite(n);
}
 
function post(body, callback) {
    var requestParams = buildRequest(endpoint, body);
 
    var request = https.request(requestParams, function(response) {
        var responseBody = '';
        response.on('data', function(chunk) {
            responseBody += chunk;
        });
 
        response.on('end', function() {
            var info = JSON.parse(responseBody);
            var failedItems;
            var success;
            var error;
             
            if (response.statusCode >= 200 && response.statusCode < 299) {
                failedItems = info.items.filter(function(x) {
                    return x.index.status >= 300;
                });
 
                success = {
                    "attemptedItems": info.items.length,
                    "successfulItems": info.items.length - failedItems.length,
                    "failedItems": failedItems.length
                };
            }
 
            if (response.statusCode !== 200 || info.errors === true) {
                // prevents logging of failed entries, but allows logging
                // of other errors such as access restrictions
                delete info.items;
                error = {
                    statusCode: response.statusCode,
                    responseBody: info
                };
            }
 
            callback(error, success, response.statusCode, failedItems);
        });
    }).on('error', function(e) {
        callback(e);
    });
    request.end(requestParams.body);
}
 
function buildRequest(endpoint, body) {
    var endpointParts = endpoint.match(/^([^\.]+)\.?([^\.]*)\.?([^\.]*)\.amazonaws\.com$/);
    var region = endpointParts[2];
    var service = endpointParts[3];
    var datetime = (new Date()).toISOString().replace(/[:\-]|\.\d{3}/g, '');
    var date = datetime.substr(0, 8);
    var kDate = hmac('AWS4' + process.env.AWS_SECRET_ACCESS_KEY, date);
    var kRegion = hmac(kDate, region);
    var kService = hmac(kRegion, service);
    var kSigning = hmac(kService, 'aws4_request');
     
    var request = {
        host: endpoint,
        method: 'POST',
        path: '/_bulk',
        body: body,
        headers: {
            'Content-Type': 'application/json',
            'Host': endpoint,
            'Content-Length': Buffer.byteLength(body),
            'X-Amz-Security-Token': process.env.AWS_SESSION_TOKEN,
            'X-Amz-Date': datetime
        }
    };
 
    var canonicalHeaders = Object.keys(request.headers)
        .sort(function(a, b) { return a.toLowerCase() < b.toLowerCase() ? -1 : 1; })
        .map(function(k) { return k.toLowerCase() + ':' + request.headers[k]; })
        .join('\n');
 
    var signedHeaders = Object.keys(request.headers)
        .map(function(k) { return k.toLowerCase(); })
        .sort()
        .join(';');
 
    var canonicalString = [
        request.method,
        request.path, '',
        canonicalHeaders, '',
        signedHeaders,
        hash(request.body, 'hex'),
    ].join('\n');
 
    var credentialString = [ date, region, service, 'aws4_request' ].join('/');
 
    var stringToSign = [
        'AWS4-HMAC-SHA256',
        datetime,
        credentialString,
        hash(canonicalString, 'hex')
    ] .join('\n');
 
    request.headers.Authorization = [
        'AWS4-HMAC-SHA256 Credential=' + process.env.AWS_ACCESS_KEY_ID + '/' + credentialString,
        'SignedHeaders=' + signedHeaders,
        'Signature=' + hmac(kSigning, stringToSign, 'hex')
    ].join(', ');
 
    return request;
}
 
function hmac(key, str, encoding) {
    return crypto.createHmac('sha256', key).update(str, 'utf8').digest(encoding);
}
 
function hash(str, encoding) {
    return crypto.createHash('sha256').update(str, 'utf8').digest(encoding);
}
 
function logFailure(error, failedItems) {
    if (logFailedResponses) {
        console.log('Error: ' + JSON.stringify(error, null, 2));
 
        if (failedItems && failedItems.length > 0) {
            console.log("Failed Items: " +
                JSON.stringify(failedItems, null, 2));
        }
    }
}

 

이렇게 하면  Elastisearch 에서 index 가 생성 되었는지 체크하면 됩니다.

 

- Slowquery 가 발생해야 생성이 되므로 Lamdba 까지 수정이 완료 되었다면 제대로 들어 오는지 체크를 위해 DB에서 select sleep(30); 으로 슬로우 쿼리 생성 진행
- Lamdba 가 호출 되어서 진행 되었는지 Cloudwatch 통해서 확인 가능(수정한 Lamdba 로그가 생성 되었는지 체크도 가능

 

이렇게 하여 RDS 모든 서비스에 대해 Slow쿼리를 Kibana통해서 확인이 되네요.

뿌듯하네요! 이제 Slowquery 모니터링 합시다.

 

 

참고

 

- node.js 정규화표현식

https://developer.mozilla.org/ko/docs/Web/JavaScript/Guide/%EC%A0%95%EA%B7%9C%EC%8B%9D

 

정규 표현식

정규 표현식은 문자열에 나타는 특정 문자 조합과 대응시키기 위해 사용되는 패턴입니다. 자바스크립트에서, 정규 표현식 또한 객체입니다.  이 패턴들은 RegExp의 exec 메소드와 test 메소드  ,그리고 String의  match메소드 , replace메소드 , search메소드 ,  split 메소드와 함께 쓰입니다 . 이 장에서는 자바스크립트의 정규식에 대하여 설명합니다.

developer.mozilla.org

- node.js 를 직접 코딩하여 정규표현식 통해서 확인이 가능한지 간단하게 테스트 할 수 있습니다.

https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/RegExp

 

RegExp

The RegExp constructor creates a regular expression object for matching text with a pattern.

developer.mozilla.org

- node.js 정규표현식
https://stackoverflow.com/questions/7954022/javascript-regular-expression-multiple-match

 

Javascript Regular Expression multiple match

I'm trying to use javascript to do a regular expression on a url (window.location.href) that has query string parameters and cannot figure out how to do it. In my case, there is a query string para...

stackoverflow.com

- 정규표현식 테스트
https://regexr.com/

 

RegExr: Learn, Build, & Test RegEx

RegExr is an online tool to learn, build, & test Regular Expressions (RegEx / RegExp).

regexr.com

 

반응형

안녕하세요.


AWS 상에서 운영을 하다 보니 Cloudwatch를 통하여 모니터링을 많이하게 되었습니다.

더군다나 EC2 위에 올린 MongoDB 모니터링이 필요하여 고민하던 도중 cloudwatch를 이용하기로 결정하고 python으로 개발 하였습니다.


MongoDB의 모니터링은 여러 방법이 많지만,

저는 mongostat / mongotop 을 이용하기로 하였습니다.


MongoDB에 대해서는 아키텍처와 지난번 Real MongoDB 세미나 참석하여 얻은 배움이 전부라 쉽지 않았습니다.


참고로 모니터링 관련 자료는 아래를 보고 툴 대신에 mongostat / top 을 하기로 한 것입니다.

https://docs.mongodb.com/manual/administration/monitoring/


해당 스크립트는 아래 github 스크립트를 custom 한 것입니다.(저작권 관련해서는 어떻게 되는지 모르겠네요..ㅠ)



Mongotop 의 경우도 개발한 mongostat를 커스텀하여 개발한 것입니다.

수집하는 데이터는 기본적으로 현재는 query/ insert / delete / update 관련이며, mongotop 의 경우 주요 테이블을 list하여 수집하였으며, 수집 데이터는 total time / readlock time / writelock time / queries time 을 수집하는 형태로 하였습니다.

또한 1초 단위로 수집한 시간을 뺀 것을 표시하였습니다. (표시 데이터 = 현재 수집한 시간 상태 - 1초전 수집한 데이터)

1회용 수집이기 때문에 crontab 에 등록해서 주기적으로 수집하면 됩니다.

혹여나 문의 사항 있으시면 댓글 또는 happy8510@gmail.com 으로 연락 주시거나 facebook 통해서 연락 주시면 도움 드릴 수 있는 부분에 대해서 도움 드리겠습니다.(개발자가 아니라서 개발 문의 사항은 한계가 있습니다..ㅠㅠ)

cloudwatch에 올리기 위해서는 미리 aws-config 설정 하시기 바랍니다.




* mongostat.py

#!/usr/bin/env python
import argparse
#hyungi unicode incoding
import commands
import datetime, os, time, sys, random
import boto3
 
from pymongo import MongoClient
from pymongo.errors import OperationFailure
from pymongo.errors import ConnectionFailure
from pymongo.errors import ServerSelectionTimeoutError
 
# Version number
PYTHON_MONGOSTAT_VERSION = "0.0.1"
 
# Not authorized error
MONGO2_NOT_AUTH = "unauthorized"
MONGO3_NOT_AUTH = "not authorized"
 
# Authentication failure message
MONGO3_AUTH_FAILUR = "Authentication failed"
 
cloudwatch = boto3.client('cloudwatch')
 
class MongoInstance():
    'Class for mongodb instance'
 
    def __init__(self, host, port, username, password):
        'Initialize the mongodb instance information and create connection to it.'
 
        self.host = host
        self.port = port
        self.username = username
        self.password = password
        self.stats_info = {}
        selfurl='mongodb://%s:%s@%s:%s/admin' % (self.username, self.password, self.host,self.port)
 
        # Create connection to mongodb server
        try:
            #url change
            #self.connection = MongoClient(self.host, self.port)
            self.connection = MongoClient(selfurl)
 
        except ConnectionFailure:
            print "Connection error: create connection to mongodb instance failed."
            exit(1)
 
        # Get the mongodb version
        try:
            server_info = self.connection.server_info()
            self.version = server_info['version']
        except ServerSelectionTimeoutError:
            print "Timeout error: get server information of mongodb instance timeout."
 
        return
 
 
    def try_stats_command(self):
        'Try to execute the serverStatus command to see if authentication required.'
 
        # Execute the serverStatus command at first and handle possible exceptions
        errmsg = server_status = server_status2 = {}
        sleep = 1
        admin = self.connection.admin
        try:
            server_status = admin.command({"serverStatus":1})
            time.sleep(sleep)
            server_status2= admin.command({"serverStatus":1})
           # server_status = ','.joint(tmp_server_status)
        except OperationFailure, op_failure:
            errmsg = op_failure.details
        except:
            print "Execution error: get server status of mongodb instance failed."
            exit(1)
 
        #print server_status
        print 'errmsg :' + str(errmsg)
 
        # Check to see if the mongodb server enables authentication
        if errmsg != {}:
            if errmsg['errmsg'].find(MONGO2_NOT_AUTH) == -1 and errmsg['errmsg'].find(MONGO3_NOT_AUTH) == -1:
                print "Execution error: %s." % errmsg['errmsg']
                exit(1)
            else:
                # Authenticate with the given username and password
                try:
                    admin.authenticate(self.username, self.password)
                except OperationFailure, op_failure:
                    print "Execution error: authenticate to mongodb instance failed."
                    exit(1)
                # Try to execute the serverStatus command again
                try:
                    server_status = admin.command({"serverStatus":1})
                    time.sleep(sleep)
                    server_status2= admin.command({"serverStatus":1})
                except OperationFailure, op_failure:
                    print "Execution error: %s." % op_failure.details['errmsg']
                    exit(1)
 
        thetime = datetime.datetime.now().strftime("%d-%m-%Y.%H:%M:%S")
        cmd = "cat /proc/loadavg"
        out = commands.getstatusoutput(cmd)
        load = out[1].split()[0]
 
        pq = 0
        pi = 0
        pu = 0
        pd = 0
        pgm = 0
 
        = 0
        = 0
        = 0
        = 0
        gm= 0
        glativeW = 0
        glativeR = 0
        # hyungi
        #lok = round(float(server_status[u'globalLock'][u'ratio']),2)
        res = int(server_status[u'mem'][u'resident'])
        vir = int(server_status[u'mem'][u'virtual'])
        mapd = int(server_status[u'mem'][u'mapped'])
 
        #past "sleep" ago status
        pq = int(server_status[u'opcounters'][u'query'])
        pi = int(server_status[u'opcounters'][u'insert'])
        pu = int(server_status[u'opcounters'][u'update'])
        pd = int(server_status[u'opcounters'][u'delete'])
        pgm = int(server_status[u'opcounters'][u'getmore'])
        pcon = int(server_status[u'connections'][u'current'])
 
        #now status
        = int(server_status2[u'opcounters'][u'query'])
        = int(server_status2[u'opcounters'][u'insert'])
        = int(server_status2[u'opcounters'][u'update'])
        = int(server_status2[u'opcounters'][u'delete'])
        gm = int(server_status[u'opcounters'][u'getmore'])
        con = int(server_status2[u'connections'][u'current'])
 
        glactiveW = int(server_status[u'globalLock'][u'activeClients'][u'writers'])
        glactiveR = int(server_status[u'globalLock'][u'activeClients'][u'readers'])
 
        template="%12s%22s%12s%12s%12s%12s%12s%12s%12s%12s%12s%12s%12s%12s"
        header=('hostname''time''insert''query''update',  \
                'delete''getmore','active con',  'resident', \
                'virtual','mapped','load''Act Writer''Act Reader')
 
        cloudwatch.put_metric_data(
            MetricData=[
                {
                    'MetricName''MongoDB-Insert Value',
                    'Dimensions': [
                        {
                            'Name''MongoDB-Primary',
                            'Value''Insert'
                        },
                    ],
                    'Unit''None',
                    'Value': (i-pi)/sleep
                },
                {
                    'MetricName''MongoDB-Query Value',
                    'Dimensions': [
                        {
                            'Name''MongoDB-Primary',
                            'Value''Query'
                        },
                    ],
                    'Unit''None',
                    'Value': (q-pq)/sleep
                },
                {
                    'MetricName''MongoDB-Query Value',
                    'Dimensions': [
                        {
                            'Name''MongoDB-Primary',
                            'Value''Delete'
                        },
                    ],
                    'Unit''None',
                    'Value': (d-pd)/sleep
                },
                {
                    'MetricName''MongoDB-Query Value',
                    'Dimensions': [
                        {
                            'Name''MongoDB-Primary',
                            'Value''Update'
                        },
                    ],
                    'Unit''None',
                    'Value': (u-pu)/sleep
                },
            ],
            Namespace='LogMetrics'
        )
 
        server_statusstr="hostname, thetime, (i-pi)/sleep, (q-pq)/sleep, (u-pu)/sleep, (d-pd)/sleep, (gm-pgm)/sleep, con, res, vir, mapd, load, glactiveW, glactiveR"
 
        print template % header
        print template % (eval(server_statusstr))
 
def mongostat_arg_check(args):
    'Check the given arguments to make sure they are valid.'
 
    # Make sure the rowcount not negative integer
    if args.rowcount and args.rowcount < 0:
        return False"number of stats line to print can not be negative."
 
    # Make sure both username and password should be given, or neither
    if args.username and not args.password:
        return False"only username given, without password."
    if not args.username and args.password:
        return False"only password given, without username."
 
    # Make sure the hostname is valid
    if args.host:
        hostinfo = args.host.split(':')
        if len(hostinfo) > 2:
            return False"invalid mongodb host, only HOSTNAME of HOSTNAME:PORT acceptable."
        if len(hostinfo) == 2:
            try:
                port = int(hostinfo[1])
                if args.port and args.port != port:
                    return False"ports given by port option and host option not match."
            except ValueError:
                return False"invalid mongodb host, the port part not integer."
 
    return TrueNone
 
def mongostat_start(host, port, username, password, rowcount, noheaders, json):
    'Start monitor the mongodb server status and output stats one time per second.'
 
    # Create mongodb instance and make sure we can execute the serverStatus command correctly
    mongo_instance = MongoInstance(host, port, username, password)
    mongo_instance.try_stats_command()
    # print mongo_instance.host, mongo_instance.port, mongo_instance.version, mongo_instance.storage_engine
 
if __name__ == '__main__':
    # Default configurations
    hostname, username, password    = '호스트정보''유저''비밀번호'
    port, rowcount                  = 포트, 0
    noheaders, json                 = FalseFalse
 
    # Define a argument parser for all possible options
    parser = argparse.ArgumentParser(description="Monitor basic MongoDB server statistics.")
    parser.add_argument("--version"help="print the tool version and exit", action="store_true")
    parser.add_argument("--host"help="mongodb host to connect to")
    parser.add_argument("--port"help="server port (can also use --host HOSTNAME:PORT)"type=int)
    parser.add_argument("-u""--username"help="username for authentication")
    parser.add_argument("-p""--password"help="password for authentication")
    parser.add_argument("--noheaders"help="don't output column names", action="store_true")
    parser.add_argument("-n""--rowcount"help="number of stats lines to print (0 for indefinite)"type=int)
    parser.add_argument("--json"help="output as JSON rather than a formatted table", action="store_true")
 
    # Parse all the given options and make sure they are valid
    arguments = parser.parse_args()
    if arguments.version:
        print "Python mongostat version: %s" % PYTHON_MONGOSTAT_VERSION
        exit(0)
    ok, errmsg = mongostat_arg_check(arguments)
    if ok == False:
        print "Argument error: %s" % errmsg
        exit(1)
 
    # Get the given arguments
    if arguments.host:
        hostinfo = arguments.host.split(':')
        hostname = hostinfo[0]
        if len(hostinfo) == 2:
            port = int(hostinfo[1])
    if arguments.port:
        port = arguments.port
    if arguments.username:
        # We make sure username and password must both given or neither in mongostat_arg_check
        username = arguments.username
        password = arguments.password
    if arguments.rowcount:
        rowcount = arguments.rowcount
    if arguments.noheaders:
        noheaders = True
    if arguments.json:
        json = True
 
    # Start the mongostat
    mongostat_start(hostname, port, username, password, rowcount, noheaders, json)


* mongotop.py


#!/usr/bin/env python
import argparse
#hyungi unicode incoding
import commands
import datetime, os, time, sys, random
import boto3
 
from pymongo import MongoClient
from pymongo.errors import OperationFailure
from pymongo.errors import ConnectionFailure
from pymongo.errors import ServerSelectionTimeoutError
 
# Version number
PYTHON_MONGOSTAT_VERSION = "0.0.1"
 
# Not authorized error
MONGO2_NOT_AUTH = "unauthorized"
MONGO3_NOT_AUTH = "not authorized"
 
# Authentication failure message
MONGO3_AUTH_FAILUR = "Authentication failed"
 
cloudwatch = boto3.client('cloudwatch')
#Hyungi / 5 collections select
lstCollection = ['collectionname1','collectionname2','collectionname3','collectionname4','collectionname5']
 
class MongoInstance():
    'Class for mongodb instance'
 
    def __init__(self, host, port, username, password):
        'Initialize the mongodb instance information and create connection to it.'
 
        self.host = host
        self.port = port
        self.username = username
        self.password = password
        self.stats_info = {}
        selfurl='mongodb://%s:%s@%s:%s/admin' % (self.username, self.password, self.host,self.port)
 
        # Create connection to mongodb server
        try:
            #hyungi
            #url change
            #self.connection = MongoClient(self.host, self.port)
            self.connection = MongoClient(selfurl)
 
        except ConnectionFailure:
            print "Connection error: create connection to mongodb instance failed."
            exit(1)
 
        # Get the mongodb version
        try:
            server_info = self.connection.server_info()
            self.version = server_info['version']
        except ServerSelectionTimeoutError:
            print "Timeout error: get server information of mongodb instance timeout."
 
        return
 
 
    def try_stats_command(self):
        'Try to execute the serverStatus command to see if authentication required.'
 
        # Execute the serverStatus command at first and handle possible exceptions
        errmsg = server_status = server_status2 = {}
        sleep = 1
        admin = self.connection.admin
        try:
            server_status = admin.command({"top":1})
            time.sleep(sleep)
            server_status2= admin.command({"top":1})
            #server_status2= admin.command({"serverStatus":1})
        except OperationFailure, op_failure:
            errmsg = op_failure.details
        except:
            print "Execution error: get server status of mongodb instance failed."
            exit(1)
 
        #print server_status
        print 'errmsg :' + str(errmsg)
 
        # Check to see if the mongodb server enables authentication
        if errmsg != {}:
            if errmsg['errmsg'].find(MONGO2_NOT_AUTH) == -1 and errmsg['errmsg'].find(MONGO3_NOT_AUTH) == -1:
                print "Execution error: %s." % errmsg['errmsg']
                exit(1)
            else:
                # Authenticate with the given username and password
                try:
                    admin.authenticate(self.username, self.password)
                except OperationFailure, op_failure:
                    print "Execution error: authenticate to mongodb instance failed."
                    exit(1)
                # Try to execute the serverStatus command again
                try:
                    server_status = admin.command({"top":1})
                    time.sleep(sleep)
                    server_status2= admin.command({"top":1})
                except OperationFailure, op_failure:
                    print "Execution error: %s." % op_failure.details['errmsg']
                    exit(1)
 
        thetime = datetime.datetime.now().strftime("%d-%m-%Y.%H:%M:%S")
        cmd = "cat /proc/loadavg"
        out = commands.getstatusoutput(cmd)
        load = out[1].split()[0]
 
        for strCollist in lstCollection :
            tmpName = 'rocketchat.%s' % strCollist
            print tmpName
            ptotaltime = int(server_status[u'totals'][tmpName][u'total'][u'time'])
            totaltime = int(server_status2[u'totals'][tmpName][u'total'][u'time'])
 
            prelock = int(server_status[u'totals'][tmpName][u'readLock'][u'time'])
            relock = int(server_status2[u'totals'][tmpName][u'readLock'][u'time'])
 
            pwrlock = int(server_status[u'totals'][tmpName][u'writeLock'][u'time'])
            wrlock = int(server_status2[u'totals'][tmpName][u'writeLock'][u'time'])
 
            pquery = int(server_status[u'totals'][tmpName][u'queries'][u'time'])
            query = int(server_status2[u'totals'][tmpName][u'queries'][u'time'])
 
            strMetric_total_Name='MongoDB-%s-%s Value' % (strCollist,'total time')
            strMetric_read_lock_Name='MongoDB-%s-%s Value' % (strCollist,'readLock time')
            strMetric_write_lock_Name='MongoDB-%s-%s Value' % (strCollist,'writeLock time')
            strMetric_query_Name='MongoDB-%s-%s Value' % (strCollist,'queries time')
 
            cloudwatch.put_metric_data(
                MetricData=[
                    {
                        'MetricName': strMetric_total_Name,
                        'Dimensions': [
                            {
                                'Name''MongoDB-Primary-Collections',
                                'Value''total time millisecond'
                            },
                        ],
                        'Unit''None',
                        'Value': (totaltime-ptotaltime)/1000
                    },
                    {
                        'MetricName': strMetric_read_lock_Name,
                        'Dimensions': [
                            {
                                'Name''MongoDB-Primary-Collections',
                                'Value''readLock time millisecond'
                            },
                        ],
                        'Unit''None',
                        'Value': (relock-prelock)/1000
                    },
                    {
                        'MetricName': strMetric_write_lock_Name,
                        'Dimensions': [
                            {
                                'Name''MongoDB-Primary-Collections',
                                'Value''writeLock time millisecond'
                            },
                        ],
                        'Unit''None',
                        'Value': (wrlock-pwrlock)/1000
                    },
                    {
                        'MetricName': strMetric_query_Name,
                        'Dimensions': [
                            {
                                'Name''MongoDB-Primary-Collections',
                                'Value''queries time millisecond'
                            },
                        ],
                        'Unit''None',
                        'Value': (query-pquery)/1000
                    },
                ],
                Namespace='LogMetrics'
            )
 
            template="%12s%12s%12s%12s"
            header=('totime''relock''wrlock''query')
 
            server_statusstr="(totaltime-ptotaltime)/1000, (relock-prelock)/1000, (wrlock-pwrlock)/1000, (query-pquery)/1000"
 
            print template % header
            print template % (eval(server_statusstr))
 
def mongostat_arg_check(args):
    'Check the given arguments to make sure they are valid.'
 
    # Make sure the rowcount not negative integer
    if args.rowcount and args.rowcount < 0:
        return False"number of stats line to print can not be negative."
 
    # Make sure both username and password should be given, or neither
    if args.username and not args.password:
        return False"only username given, without password."
    if not args.username and args.password:
        return False"only password given, without username."
 
    # Make sure the hostname is valid
    if args.host:
        hostinfo = args.host.split(':')
        if len(hostinfo) > 2:
            return False"invalid mongodb host, only HOSTNAME of HOSTNAME:PORT acceptable."
        if len(hostinfo) == 2:
            try:
                port = int(hostinfo[1])
                if args.port and args.port != port:
                    return False"ports given by port option and host option not match."
            except ValueError:
                return False"invalid mongodb host, the port part not integer."
 
    return TrueNone
 
def mongostat_start(host, port, username, password, rowcount, noheaders, json):
    'Start monitor the mongodb server status and output stats one time per second.'
 
    # Create mongodb instance and make sure we can execute the serverStatus command correctly
    mongo_instance = MongoInstance(host, port, username, password)
    mongo_instance.try_stats_command()
    # print mongo_instance.host, mongo_instance.port, mongo_instance.version, mongo_instance.storage_engine
 
if __name__ == '__main__':
    # Default configurations
    hostname, username, password    = '접속ip''유저명''비밀번호'
    port, rowcount                  = 포트, 0
    noheaders, json                 = FalseFalse
 
    # Define a argument parser for all possible options
    parser = argparse.ArgumentParser(description="Monitor basic MongoDB server statistics.")
    parser.add_argument("--version"help="print the tool version and exit", action="store_true")
    parser.add_argument("--host"help="mongodb host to connect to")
    parser.add_argument("--port"help="server port (can also use --host HOSTNAME:PORT)"type=int)
    parser.add_argument("-u""--username"help="username for authentication")
    parser.add_argument("-p""--password"help="password for authentication")
    parser.add_argument("--noheaders"help="don't output column names", action="store_true")
    parser.add_argument("-n""--rowcount"help="number of stats lines to print (0 for indefinite)"type=int)
    parser.add_argument("--json"help="output as JSON rather than a formatted table", action="store_true")
 
    # Parse all the given options and make sure they are valid
    arguments = parser.parse_args()
    if arguments.version:
        print "Python mongostat version: %s" % PYTHON_MONGOSTAT_VERSION
        exit(0)
    ok, errmsg = mongostat_arg_check(arguments)
    if ok == False:
        print "Argument error: %s" % errmsg
        exit(1)
 
    # Get the given arguments
    if arguments.host:
        hostinfo = arguments.host.split(':')
        hostname = hostinfo[0]
        if len(hostinfo) == 2:
            port = int(hostinfo[1])
    if arguments.port:
        port = arguments.port
    if arguments.username:
        # We make sure username and password must both given or neither in mongostat_arg_check
        username = arguments.username
        password = arguments.password
    if arguments.rowcount:
        rowcount = arguments.rowcount
    if arguments.noheaders:
        noheaders = True
    if arguments.json:
        json = True
 
    # Start the mongostat
    mongostat_start(hostname, port, username, password, rowcount, noheaders, json)

반응형




해당 내용은 [Amazon Redshift 아키텍처 및 모범사례::김민성::AWS Summit Seoul 2018] 에서 발표한 동영상을 보며 정리한 내용입니다.
자세한 내용은 아래 링크 가셔서 동영상과 ppt 를 보시면 더 쉬울듯 하며,
저는 공부의 목적으로 보며 정리한 내용입니다.

발표내용
  • 리더 노드는 각 slice 로 실행계획에 맞춰 데이터 수집을 하라고 보내고 마지막 slice 가 끝날때까지 기다렸다가 끝난 데이터를 수집하여 제공
  • slice 는 redshift 내 각 클러스터의 cpu 즉 작업을 실질적으로 하는 모듈로 이해하면 될듯

Distkey 형태
  1. 분산키 : key 를 가지고 각 slice 로 저장(slice 는 cpu당 개수와 동일한 개수를 가지고 있음) - group by 할 때 / key를 가지고 join 하는 경우 좋음
    - 각 노드에 균등하게 데이터 분산이 필요
    - 가장 큰 테이블에서 조인에 활용되는 컬럼
    - group by 조건에서 사용되는 컬럼
    - 높은 Cardinality를 보유한 컬럼
    ★ 아래와 같은 현상이 발생하면 slice1번이 끝날때까지 기다려야 하기에 데이터 분산이 불균형이 생기면 이에 대한 기다림이 길어지기에 문제가 발생할 수 있음을 인지하고 있어야 한다
    - Equality filter에서 활용되는 컬럼 (Equal 를 사용하면 하나의 slice로 고정이 되어 버리기 때문에 좋지 않음)
    - 데이터의 몰림을 유발하는 경우
    ★ 분산키는 어떻게 사용 해야 되는가!
    - 가장 큰 Dimension table의 pk와 fact table의 foreign키를 disk key 로 선택
    - 나머지 조인이 되는 조건의 Dimension table(작은 테이블)은 distribution all 를 검토 -> 모든 node들에 해당 테이블에 대한 데이터가 존재하는 형태(경우에 따라 다르지만 보통 300만건 이하의 경우 데이터 분산 타입을 all 로 선택해도 무방)
    이렇게 하면 성능적으로 가장 좋은 형태로 제공
    Ⅰ 가장 자주 활용되는 쿼리 확인(https://github.com/awslabs/amazon-redshift-utils/tree/master/src/AdminScripts/top-queries.sql 활용)
    Ⅱ 조인 또는 group by 에 활용되며 고른 데이터 분포 및 높은 Cardinality 보유한 컬럼을 선택
    Ⅲ 그게 아니면-위와 같이 최고의 방법을 찾기 힘들다면.. (Fact table은 Even으로 Dimension Table-기준 테이블 은 All로)
  2. 전체 : 상품 테이블과 같이 개수는 적으나 모든 join이나 그런것에 필요한 경우 각 slice 에 모두 저장 (데이터 storge 가 증가)
  3. 균등 : round robin (Even)
  • explain 하였을 경우 redshift 에서 네트워크를 통한 데이터 이동 관련 내용 (데이터 이동은 가장 비용이 비싼 operation)
DS_DIST*
INFO (아래로 갈수록 퍼포먼스가 좋지 않음)
DS_DIST_NONE
Preferred, no data transfer between nodes
DS_DIST_ALL_NONE
One table has DISTSTYLE ALL, no data transfer between nodes
DS_DIST_ALL_INNER
Inner table is being sent to the Single Node
DS_DIST_INNER
Inner table is being redistributed in an outer join
DS_DIST_OUTER
Outer table is being redistributed in an outer join
DS_BCAST_INNER
Inner table is being broadcast to all nodes.
DS_BCAST_BOTH
Both tables are being broadcast to all nodex.

  • 리더 노드가 각 slice 로 
성능 향상
1. 보다 높은 하드웨어 스펙
2. 병렬화의 달성
3. 네트워크를 통한 트레픽의 최소화


하지만 결국은 최고의 성능을 내기 위해서는 "스토리지 I/O의 최소화"

  • 모든 블록을 1Mb형태로 나눠서 저장하는데..
  • Zone Map : 각 Block 에 대한 최대값 및 최소값에 대한 정보 저장 > 그렇기 때문에 정렬 키를 이용하면 필요한 Block 만 읽기 때문에 정렬 키가 중요


정렬 키 (sortkey)
  • 정렬 키가 지정된 테이블의 경우 데이터 로드 시 정렬 순서대로 디스크에 저장
  • 정렬 키의 종류
    • 정렬 키 : 테이블의 단일 컬럼의 값 기준으로 데이터 정렬하여 저장
    • 복합 정렬키(Compound Sort Key) :
      • 최대 6개의 컬럼까지 활용하여 정렬 키를 지정
      • 선언한 순서에 따라 순차적으로 정렬되어 저장 -> 가장 먼저 선언한 키에 대한 높은 가중치
      • 조인 및 Group by, order by 효과적이며, Merge 조인에 효과
    • 인터리브 정렬키 (Interleaved Sort Key)
      • 최대 8개의 컬럼까지 활용하여 정렬키를 지정
      • 정렬 키에서 각 열의 하위 집합에 똑같은 가중치 부여 -> 복합 정렬키는 앞의 데이터가 가중치가 높다면, 인터리브 정렬키는 뒤의 키에 대해서도 동일하게 가중치를 부여(데이터의 양이 증가??)
      • AD-Hoc 형태의 쿼리에서 높은 성능을 제공
ex) Product 테이블에 sort key로 type, color, size를 지정

복합정렬키
인터리브정렬키
where type = 's' and color = 'B' and size = 'xl'
정렬을 기반한 가장 우수한 성능
순서 등에 관계 없이 일관성 있는 성능 제공
where type = 's' and color = 'B'
약간 효과적
where type = 's' and size = 'xl'
제한적으로 성능에 기여

쿼리 분석

파일 분산 및 병렬 로딩
  • Slice 단위로 작업을 처리를 하지만, Slice는 파일 하나당 하나의 slice가 할당되기에 반드시 파일을 여러개로 나눈 후 메타데이터를 지정하여 작업 진행을 권장
아래 표 참조 (노드당 100Mbyte sec)
매니페스트 파일
Copy 커맨드를 통한 실행
{
   "entries":[
      {"url":"s3://mybucket-alpha/client.aa", "mandatory":true},
      {"url":"s3://mybucket-alpha/client.ab", "mandatory":true},
      {"url":"s3://mybucket-alpha/client.ac", "mandatory":true},
      {"url":"s3://mybucket-alpha/client.ad", "mandatory":true}
   ]
}
copy customer
from 's3://mybucket/cust.manifest'
iam_role 'arn:aws:iam::111111111:role/MyRedshiftRole'
manifest;

압축
  • 압축을 기반으로 디스크 사용량 및 I/O 더욱 줄일 수 있음
  • Copy 커멘트 사용 시 10만 행을 우선 적제 후 압축 옵션 평가 (COMPUDATE on)
  • "Analyze Compression"  커멘트를 통해 현재 압축(인코딩)에 대한 평가 후 "alter table" 로 변경
  • 2~4배의 효과

기타
  • Vacuum
    • 변경되거나 삭제된 블럭에 대한 정리(플래그만 남겨놓고 디스크에는 남아져 있음) - vocuum을 이용하여 정리를 해줌
    • 정렬키에 맞춰 데이터 정렬(재정렬을 해주기 위해 vocuum을 사용) , rename을 해도 됨
    • 옵션 : full, sort only, delete only, reindex (인터리브 정렬키)
  • 통계 업데이트
    • 통계 업데이트 자동 실행 조건
      • create table as select 
      • select into
      • copy (with compupdate on)
    • 통계 업데이트를 실행해줘야 하는 경우
      • 5% 이상 데이터 변경된 경우
      • 외례키를 추가하거나 삭제한 경우
      • Vacuum 실행 후 
  • 제약 조건 (not null, primary, foreign, unique)은 쿼리 옵티마이저가 활용 (plan  만들 때 참고하기 때문)


중요
  1. 노드 크기의 선택 조건
  2. 적절한 분산키의 선택 방안
  3. 적절한 정렬키의 선택 방안
  4. 데이터 적제 방안
  5. 레드쉬프트 스펙트럼에서의 파티셔닝 및 파일 포멧

Redshift Spectrum (redshift와 별개!!!)
  • 아래는 참고로 하고 제대로 공부는 후에 하는 형태로 하자. redshift와 별개로 필요시 알고 있으면 활용방안이 있을 듯
  • s3의 수천개 이상의 노드들을 활용하여 쿼리 실행


수천개의 s3 내의 csv 파일을 읽어 들여 조인 진행

  • 파티셔닝 및 Columnar 파일 포맷 (ORC, Parquet) 사용
    • 파티셔닝 컬럼의 조건
      • 필터 및 조인의 조건이 되는 컬럼
      • 비지니스 유닛
      • 비지니스 그룹
      • 날짜 및 시간
  • 파일의 갯수는 Redsfhit 의 Slice의 수량 이상
  • 파일의 크기는 64 mb 이상을 권고
  • 각 파일은 크기를 권고











반응형

제목이 좀 자극적일 수 있으나, 

말 그데로 AWS DMS를 이용하여 mysql -> redshift 로 적재하는 가운데 테이블 카운트가 현저히 차이나는데도 불구하고 완료 되었다고 나오는 현상에 대해서 해결한 방안을 공유해 봅니다.


작업 방법

- AWS 의 DMS를 이용하여 Mysql 5.6의 데이터를 Redshift 로 적재

- 물론 CDC 도 진행하도록 설정



증상

  • 특정 DB의 테이블들을 DMS 하는 과정에서 전체 데이터의 10%도 되지 않는 데이터만 적재 후 적재가 완료 되었다고 status 상에서 확인
  • 해당 테이블들을 버전이 다른 replication instance 를 이용하여 진행 시 특정 버전(2.4.3)에서만 문제 없이 적재 및 CDC가 되는 현상을 확인
    - 상위 버전인 2.4.4 / 3.1.2 에서는 증상과 같은 현상이 발생

 

확인사항

  • Replication instance Engine 별로 테스트 진행
  • 하나의 테이블만 가지고 테스트 진행

 

조치사항

  1. Replication instance 를 2.4.3으로 지정하여 해당 instance로 복제 진행
  2. 상위 버전의 Replication instance를 사용하는 경우 Source Endpoint에 Resumefetchforxrows=0 를 지정한 후 진행하여 가능


√ Resumefetchforxrows 

  • MySQL 테이블 내 데이터가 많을 경우 마지막 데이터까지 가져온 이후에도 정해진 수 만큼 Fetching을 계속하도록 하는 옵션
    ex) Resumefetchforxrows = 1500 이면, 1500 row까지 fetching을 시도
  • Resumefetchforxrows=0으로 설정함으로써 DMS가 테이블내의 데이터를 의도적으로 끝까지 Fetching 시도 하도록 설정
  • Replication Engine 2.4.4 버전에서부터 Resumefetchforxrows 옵션을 값을 고정하면서 문제가 발생한 것으로 예상(AWS 답변)
    -> 2,4,4버전 이상을 사용하는 경우 Resumefetchforxrows 값을 0으로 고정해서 사용한 것을 권장



해결할 수 있었던 건 다행히 회사에서 aws와 계약을 맺은 상태라 AWS 엔지니어를 통해서 해결책을 받은 것이며,

모든 테스트들을 AWS 엔지니어가 재현이 불가하여 직접 다 테스트한 결과입니다.


부디 많은 분들께서 저와 같은 삽질을 피하시기 바랍니다.


또한 수정이 필요한 내용이 있다면 과감히 알려주시면 감사하겠습니다.

반응형

불과 1년전 딥런닝 관련하여 관심을 가지면서 해당 스터디에서 만나 뵙던 분 중 한분인 이재석님이란 분이 계셨다.


비록 2번정도의 스터디 이후 개인적인 일로 더이상 참여를 못했지만 그 때의 인연을 지금까지 페북에서 이어 나갈 수 있었다.


그 때 당시에 python 도 제대로 이해 못하셨던 분이었지만 꾸준히 하시던걸 보며 감명 받았는데, 어느새 챗봇을 만들어 세미나까지 연다고 하시니,

참석을 안할 수가 없었다.(정말 존경하는 분 중의 한분이 되셨다....많이 본받을 분이라고 생각한다)


DBA로써 다양한 지표에 대해서 알고 싶을때나 장애 발생 시 또는 모니터링을 위해 챗봇을 하면 좋다고 막연하게 생각하던 도중이라 더욱더 참가하고 싶었다.


여기의 포스트에는 당시의 ppt 내용 일부와 내가 한 결과물에 대해서 공유만 하려고 한다.

왜냐하면 이 모든게 그냥 소스를 가져와서 나의 카카오봇과 연동한 것 뿐이기 때문이다.



챗봇과 관련한 구글 영상




챗봇은 그리 대단한게 아니지만...만들면 만들수록 대단한거고,

우리가 만드는것은 대단한게 아니라 필요에 의해서 편의를 위해 만드는거라는 내용이 인상 깊었다(-이재석 님)


테스트를 위해 필요한 건 python / flask / ngrok 으로 카카오 봇을 만들수 있었다.


그리고 에코봇을 만들수 있는 소스

https://github.com/software-engineer-mj/chat-bot-seminar


심화로 구글 번역을 이용한 번역봇? 소스


https://github.com/Bricoler/chatbot-seminar


난 여기까지 운이 좋게 테스트를 할 수 있었다.


에코봇


아래는 번역봇이다.


세미나 ppt 는 아무래도 저작권이 있다고 생각하기에 공유는 못하고 깃헙은 이미 공유되어 있는거라 공유해 봤다.


이제 시작이다. 내가 필요한거를 어떻게 공유를 하고 좋을지에 대해서 하나씩 설계해 나갈 예정이다.


그리고 다시 한번 이재석님 감사합니다.

반응형

공부를 한다고 하지만..어느 순간되면 자만에 빠지기 마련이다.
내가 그런 케이스이다..

Hash index 와 B-Tree index에 대해서 제대로 몰라 진땀을 흘렸다...


다시 공부하다가 이렇게 좋은 경험담 덕분에 제대로 이해했다..


다시 한번 느낀다...공부에는 끊이 없다. 學不可以已 (학불가이이)


좋은 내용은 아래를 참고하자..

http://gywn.net/2015/01/innodb-adaptive-hash-index/


비슷한 내용이다(동일하다..)

http://tech.kakao.com/2016/04/07/innodb-adaptive-hash-index/


반응형

'MySQL' 카테고리의 다른 글

[Percona] pt-query-digest 사용 방법  (0) 2021.10.28
[MySQL] ARCHIVE Engine  (0) 2017.07.19
[펌][MySQL] CockroachDB in Comparison  (0) 2017.07.03
[펌] [MyISAM] myisamchk 사용하기  (0) 2016.11.25
[MySQL] auto_increment duplicate entry for key 1  (0) 2016.11.14

한번 더 말씀드리지만 저는 redis에 대해서 거의 모릅니다...

공부해 가는 단계지만 그래도 한글로 된 내용이 거의 없어서 공유해 봅니다.

(제가 못 찾은게 맞는듯 싶습니다.)


각설하고 이제 공유 진행하겠습니다.




AWS ec2 에 올린 redis4.0을 백업과 모니터링 문제로 고민하다가

Elasticache 로 옮기는 것에 대해 고민하게 되었다.


정확하게는 내가 아닌 다른 팀에 의해서.


redis의 r도 모르는 내가 직접 찾아보면서 관련 내용은 좀 있어 보이나,

제대로 이해하기도 힘들었고, 한글로 된 내용도 없어서 애를 좀 먹었다.


아래 github 자료의 도움으로 무사히 테스트까지 완료 할 수 있었다.


https://github.com/p/redis-dump-load


참고로 해당 스크립트는 python으로 개발 되었으며 나는 오로지 가지고 사용하기만 했다..ㅠ


내용은 아래와 같다.

1. redis에서 dump를 받아 해당 내용을 json 으로 추출

2. json 파일로 추출된 내용을 Elasticache로 Load 진행

>> json 파일가지고 커스터 마이징을 한다면 다른 DB로 Import도 가능할 것으로 예상


사용방법은 잘 나와있지만, 공유 차원으로...


Export (현재 redis에서 실행하는데, -d 옵션은 해당 추출하고자 하는 DB 를 뜻함)

$ python redisdl.py -H localhost -d 1 > susun_dump.json


Import (Elasticache 로 진행 / H옵션은 elasticache로 다이렉트 접속이 안되기 때문에 ec2에서 elasticache로 접속할 때 해당 주소host  / d옵션으로 2번 DB에 import 하였으며, -l 옵션을 사용하여 import 한다고 명령)

$ python redisdl.py -H apn2.cache.amazonaws.com -d 2 -l < susun_dump.json


내 과제는 python 커스터 마이징 하는것이 나의 또다른 과제..


반응형

Redis를 기존에는 개발자들이 관리 하였지만,

이제는 DBA가 관리하는게 더 수월할 것 같아서 관리하기에 앞서 선행 학습중이다.


이리저리 접속 진행해 보았지만,

관리자는 콘솔에서도 작업이 기본!!!이라고 생각하기에


콘솔로 접속 후 데이터 확인을 하기 시작했다.


기본 명령어도 모르니 확인하는건 불가능....


여러모로 찾다가 노하우를 찾아 공유해 본다.


여러 redis 블로그를 보면 기본적으로 사용 방법만 나와있지,

어떻게 확인하는지는 못찾아서 꺼이꺼이 찾아서 공유해 본다.


1. select 로 db??? 선택

(rdbms는 db를 생성하지만 redis는 정해져 있다. 1 ~ 15까지 / select 1 또는 select 15 이런식으로 선택)


2. RANDOMKEY 를 사용하여 또는 KEYS * 를 사용하여 해당 KEY들 검색


 - keys * 를 하게 되면 모든 키가 검색된다.

 - 10만개의 데이터를 테스트로 저장했기에 만건이 표시된다.

..

중략...

..


3. 확인하고 싶은 KEY 의 type 확인을 위해 type 키내용 확인

- string으로 저장했기에...(단순히 set 으로 INSERT 하였다)


4. 해당 Key 타입에 맞는 명령어를 찾아 내용확이

- 명령어를 모른다면 아래 참조 사이트에서 찾아보자..


명령어 참고 사이트 : http://redisgate.kr/redis/command/commands.php


하아....정말 가급적 관리자가 아니라면 RedisDesktopManager 를 설치해서 간편하게 확인하는 것이 편할 듯 싶다.



반응형

+ Recent posts