반응형

Sharding의 정의

  • 같은 테이블 스키마를 가진 데이터를 다수의 데이터베이스에 분산하여 저장하는 방법
  • Application Level 에서도 가능 (RDBMS 에서 Sharding) 하지만, Databas Level에서도 가능 (ex-MongoDB / Redis 등)
  • Horizontal Partitioning (수평파티션) 이라고도 함

 

Sharding  적용

  • 프로그래밍, 운영적인 복잡도는 증가하고 높아지는 것을 뜻함
  • 가능하면 Sharding을 피하거나 지연시킬 수 있는 방법
    • Sacle Up
      • 하드스펙을 올리는 것
    • Read 부하가 크면
      • Replication 을 두어 Read 분산 처리
    • Table의 특정 컬럼만 사용 빈도수가 높다면
      • Vertically Partition(수직 파티션)을 진행
      • Data를 Hot, Warm, Cold data로 분리하여 처리
        • Memory DB를 활용
        • 테이블의 데이터 건수를 줄이는 것

Sharding 방식

  • Range Sharding (range based sharding)
    • key (shard key) 값에 따라서 range 를 나눠서 데이터 를 분배하는 방식
    • 비교적 간단하게 sharding이 가능
    • 증설에 재정렬 비용이 들지 않음
    • Shard key 의 선택이 중요
      • Shard key에 따라 일부 데이터가 몰릴 수 있음(hotspots)
      • 트래픽이 저조한 DB가 존재
  • Hash Sharding (hash based sharding)
    • key를 받아 해당 데이터를 hash 함수 결과로 분배
    • Hotspot를 방지하고 균등하게 분배
    • 재분배를 해야하는 경우(삭제 또는 추가) 전체 데이터를 다시 hash value를 이용하여 분배 (Migration 에 어려움)
  • Directory Based Sharding
    • shard내 어떤 데이터가 존재하는 지 추적할 수 있는 lookup table이 존재
    • range based sharding 과 비슷하지만, 특별한 기준에 의해 shard를 나눈 것이라, 동적으로 shard를 구성 가능
      • range나 hash 모두 정적인 반면, 해당 sharding은 유연성 있게 임의로 나누는 것이라 유연성을 갖춤
    • 쿼리 할 때 모두 lookup table 를 참조하기 때문에 lookup table이 문제를 일으킬 소지를 보유
      • lookup table이 hot spot 가능성
      • lookup table이 손상되면 문제 발생

 

MongoDB  Sharding

 

  • 분산 처리를 통한 효율성 향상이 가장 큰 목적이므로 3대 이상의 샤드 서버로 구축을 권장(최소 2대)
  • 싱글 노드 운영보다 최소 20~30% 추가 메모리 요구 (MongoS와 OpLog, Balancer 프로세스가 사용하게 될 추가 메모리 고려)
  • 샤드 시스템에 구축되는 config 서버는 최소 3대 이상 활성화를 권장.
    • Config 서버는 샤드 시스템 구축과 관련된 메타 데이터를 저장 관리하며 빠른 검색을 위한 인덱스 정보를 저장, 관리하고 있기 때문
    • 샤드 서버와는 별도의 서버에 구축이 원칙
    • Config 서버는 샤드 서버보다 저사양의 시스템으로 구축 가능

Config Server

 

  • Config 서버는 샤딩 시스템의 필수 구조
  • 최소 1대가 요구되며 장애로 인해 서비스가 중지되는 것을 피하기 위해 추가로 Config 서버 설정이 필요.(HA 구성 필요-PSS-필수)
  • Config 서버는 각 샤드 서버에 데이터들이 어떻게 분산 저장되어 있는지에 대한 Meta Data가 저장 (Shard 정보를 저장 관리)
    • Shard Meta 정보
      • MongoS가 처리하는 Chunk 단위로 된 chunk 리스트와 chunk들을의 range 정보를 보유
    • 분산 Lock
      • MongoS들 간의 config 서버와의 데이터 통신 동기화를 위해 도입
      • 샤딩을 수행할 연산들에 대해 분산 락을 사용
        • 여러개의 mongos가 동시에 동일한 chunk에 대한 작업을 시도하는 등의 이슈를 방지하기 위함
        • 작업을 수행하기 전 config server의 locks collection 의  lock을 획득 후에만 작업 가능
repl_conf:PRIMARY> db.locks.find() 
{ "_id" : "config", "state" : 0, "process" : "ConfigServer", "ts" : ObjectId("5d6b7f15a9f5ecd49052a36f"), "when" : ISODate("2019-09-01T08:19:33.165Z"), "who" : "ConfigServer:conn164", "why" : "createCollection" } 
{ "_id" : "config.system.sessions", "state" : 0, "process" : "ConfigServer", "ts" : ObjectId("5d6b7f15a9f5ecd49052a376"), "when" : ISODate("2019-09-01T08:19:33.172Z"), "who" : "ConfigServer:conn164", "why" : "createCollection" } 
{ "_id" : "testdb", "state" : 0, "process" : "ConfigServer", "ts" : ObjectId("5d62889d3ed72a6b6729a5ca"), "when" : ISODate("2019-08-25T13:09:49.728Z"), "who" : "ConfigServer:conn24", "why" : "shardCollection" } 
{ "_id" : "testdb.testCollection2", "state" : 0, "process" : "ConfigServer", "ts" : ObjectId("5d62889d3ed72a6b6729a5d1"), "when" : ISODate("2019-08-25T13:09:49.738Z"), "who" : "ConfigServer:conn24", "why" : "shardCollection" } 
{ "_id" : "test.testCollection2", "state" : 0, "process" : "ConfigServer", "ts" : ObjectId("5d6288ab3ed72a6b6729a65a"), "when" : ISODate("2019-08-25T13:10:03.834Z"), "who" : "ConfigServer:conn24", "why" : "shardCollection" }

 

      • Lock 역할
        • 밸런서(balancer)의 연산
        • Collection 의 분할(split)
        • Collection 이관(migration)

          • LockPinger : 해당 쓰레드는 30초 주기로 config 서버와 통신
    • 복제 집합 정보 : MongoS가 관리, 접속해야 하는 Mongo Shard 정보
  • MongoS가 데이터를 쓰고/읽기 작업을 수행할 때 Config 서버는 MongoS를 통해 데이터를 동기화-수집 진행

 

MongoS

 

  • 데이터를 Shard 서버로 분배해 주는 프로세스 (Router-Balancer)
    • Data를 분산하다 보면 작업의 일관성을 위하여 Lock을 사용
    • 이때 Chunk Size를 적절하게 설계하지 못하면 Migration 때문에 성능 저하 현상이 발생 가능성
    • DB 사용량이 적은 시간대 Balancer를 동작시키고 그 외 시간에는 끄는 방법도 성능 향상의 방법
  • 하나 이상의 프로세스가 활성화  가능(여러대의 MongoS를 운영 가능)
  • Application Server에서 실행 가능 (Application에서 직접적으로 접속하는 주체이며,독립적인 서버로 동작 가능하며,  Application 서버 내에서도 API 형태로 실행 가능)
    • MongoS를 Application Server 서버 local 에 설치하는 것을 추천
      (application server 가 별도의 라우터를 네트워크 공유 안하고, Local에서 직접 접근하기 때문에 효율성 증가. 별도의 서버를 구축 하지 않아서 서버 비용 절감. 단, 관리 포인트로 인한 문제점도 존재)

  • Config 서버로부터 Meta-Data를 Caching
    • read, write 작업시 해당 샤드를 찾을 수 있도록 캐쉬할 수 있는 기능을 제공
root@7d536b10b886:/# mongos --configdb config-replica-set/mongo1:27019,mongo2:27019 --bind_ip 0.0.0.0

 

  • MongoS가 실행될 때 Config 서버를 등록
    • Config 서버와 연결되면 샤딩 정책을 포함한 메타 정보를 연결된 모든 Config 서버에 전송
  • MongoS는 Config Server 와 연결하게 되는데, Config Server가 여러 대인 경우 여러 대 중 하나라도 연결이 안되면 MongoS 는 연결 실패로 MongoS가 실행되지 않음
  • MongoS 내에서는 데이터를 저장하지 않으며, 다른 MongoS간 연결이 없기 때문에 데이터 동기화를 위해 Config 서버를 이용
  • MongoS 의 쿼리 클러스터 라우팅 방법
    1. 쿼리를 보내야 하는 샤드 리스트를 결정
    2. 대상되는 샤드에 커서를 설정
    3. Target Shard에 보낸 결과를 받아 데이터를 병합하고 해당 결과를 Client로 Return
    4. Mongo3.6에서는, 집계 쿼리의 경우 각 Shard에서 작업하는 것이 아닌, Mongos에서 결과를 받아 merge 후 작업하여 리턴하는 형태로 변경
    5. MongoS에서 Pipline을 운영할 수 없는 2가지 경우
      1. 분할 파이프라인의 병합 부분에 Primary shard 에서 동작해야하는 부분이 포함되어 있는 경우
        • $lookup 집계가 실행중인 Shard Collection 과 동일한 Database 내에 있는 unshared collection과 진행 된다면, 병합은 Primary shard에서 실행해야 함
      2. 분할 파이프라인의 $group과 같은 Temporary data를 Disk에 기록해야 하는 경우가 포함된 경우, 또한 Client는 allowDiskUse를 True 지정했을 경우 Mongos를 사용할 수 없음
      3. 이런 경우, Merged 파이프라인은 Primary shard에서 하지 않고, 병합 대상인 샤드들 중 무작위로 선택된 샤드에서 실행
      • Shard cluster 에서 Aggregation 쿼리가 어떻게 동작하고 싶은지 알고 싶으면, explain:true 로 설정하여 aggregation을 실행하여 확인 가능
      • mergeType은 병합 단계에서 ("primaryShard", "anyShard", or "mongos") 로 보여주며,  분할 파이프라인은 개별샤드에서 실행된 작업을 리턴.
      • https://docs.mongodb.com/manual/core/sharded-cluster-query-router/ 참고

 

 

Chunk

  • collection을 여러 조각으로 파티션하고 각 조각을 여러 샤드 서버에 분산해서 저장하는데, 이 데이터 조각을 Chunk라고 함
  • chunk는 각 샤드서버에 균등하게 저장되어야 좋은 성능을 낼 수 있음
  • 균등하게 저장하기 위해 큰 청크를 작은 청크로 Split 하고 청크가 많은 샤드에서는 적은 샤드로 Chunk Migration 을 수행
  • 청크 사이즈는 Default 64Mb 이며 size를  변경도 가능 (또는 100,000 row)
    • Chunk size 변경 시 유의사항
      1. Chunk size를 작게하면 빈번한 마이그레이션이 동작하여 성능은 저하가 발생할 수 있으나, 데이터를 고르게 분배할 수 있는 효과를 볼 수 있다. (추가로 mongos에서 추가 비용이 발생)
      2. 청크 사이즈를 크게하면 마이그레이션 빈도는 줄어들어 네트워크나 mongos 에서 오버헤드 측면에서 효율적.그러나 잠재적으로 분포의 불균형이 발생 가능성
      3. 청크 사이즈는 마이그레이션할 청크 내 document 수와 비례(청크가 클수록 저장되는 document 수가 증가)
      4. 청크 사이즈는 기존 Collection을 분할할 때 최대 컬렉션 크기에 따라 영향. 샤딩 이후 청크 사이즈는 컬렉션 크기에 영향 없음
  • Chunk Split
    • 샤드 내 Chunk의 사이즈가 너무 커지는 것을 막기 위해 split 이 발생
    • 청크가 지정된 청크 사이즈를 초과하거나, 청크내의 문서 수가 마이그레이션할 청크당 최대 문서 수를 초과할 경우 발생
    • 이 때 split는 샤드 키 값을 기준으로 진행
    • Insert 또는 update 시 split 이 발생
    • split 이 발생하면 메타 데이터 변경이 발생하며, 데이터의 균등함을 가져온다
    • Split 을 한다고 해도 청크가 샤드에 고르게 분포되지 않을 수 있음
      • 이때 밸런서가 여러 샤드에 존재하는 청크를 재분배
      • 클러스터 밸런서 참고
  • Chunk Migration
    • 여러 Shard 로 나누어진 청크를 샤드간 균등하게 분배하기 위하여 Migration 을 진행
      1. Balance 프로세스가 moveChunk 명령을 Source 샤드로 명령(Chunk Migration이 필요한 Shard=Source Shard)
      2. Source Shard는 moveChunk 명령으로 이동 시작
        • 이동하는 Chunk는 라우팅 되어 동작하며, 경로에 대한 내역은 Source Shard에 저장(수신에 대한 내역)
      3. Target Shard는 해당 Chunk 관련한 Index를 생성(build)
      4. Target Shard는 Chunk 내의 Document 요청을 하고, 해당 데이터 사본을 수신 시작
      5. Chunk에서 최종(원본) Docuemnt를 수신한 후 Migration 간 변경된 내역이 있는지 확인하기 위하여 다시 한번 Target Shard는 동기화 프로세스를 시작
      6. 완전히 동기화가 완료되면, Config Server에 연결하여 Cluster Meta 데이터를 청크의 새 위치로 업데이트 진행(mongoS가 관여 하지 않을까....의견)
      7. Source Shard가 Meta 정보를 업데이트를 완료하고 Chunk에 열린 커서가 없으면 Source Shard는 Migration 대상을 삭제 진행
    • 샤드에서 여러 청크를 마이그레이션 하기 위해서는 밸런서는 한 번에 하나씩 청크를 마이그레이션 진행
      • 3.4에서부터는 병렬 청크 마이그레이션을 수행 가능. 단, 샤드가 한 번에 최대 하나만 참여하지만, 샤드가 n 개인 여러개의 샤딩된 클러스터의 경우 최대 샤드 n개/2 만큼 동시 Migration을 진행 가능
        • 클러스터 내 Chunk Migration은 한번만 가능했지만, 3.4부터는 Cluster 내 최대 n/2 개의 Chunk Migration은 가능(단 하나의 샤드당 하나의 Chunk Migration만 가능)
    • 하지만, 밸런서는 청크를 이동 후 이동한 청크를 삭제에 대해서(삭제단계)는 기다리지 않고 다음 마이그레이션 진행 (비동기식 청크 마이그레이션 삭제)
    • attemptToBalanceJumboChunks 라는 밸런서 설정을 하면, 마이그레이션 하기 너무 큰 청크는 밸런서가 이동 시키지 않음
    • Migration 조건
      • 수동
        • 대량 Insert 중 데이터를 배포해야 하는 경우 제한적으로 사용
        • 수동 마이그레이션 참조
      • 자동
        • 밸런서 프로세스가 Collection 내 Chunk들이 파편화가 발생하여 고르게 분포 되지 않았다고 판단 될 때 Chunk를 이동 시킴
        • 측정 임계값
          • 하나의 샤드 서번에 8개의 Chunk가 발생하면 다른 서버로 Migration이 발생하는데 20개 미만의 Chunk가 발생하면 평균 2번 정도의 Migration 이 발생
          • Migration이 빈번하게 발생하면 Chunk를 이동하기 위한 작업이 수시로 발생하기 때문에 성능 지연현상을 발생 시킬 수 있음
          • 적절한 빈도의 Migration이 발생되기 위해서는 적절한 Chunk Size를 할당이 필요

Chunk 수

Chunk Migration 수

1 ~20개 미만

2

21 ~ 80 개 미만

4

80개 이상

8

 

Sharded Cluster Balancer

https://docs.mongodb.com/manual/core/sharding-balancer-administration/#sharding-internals-balancing

  • 각 Shard의 Chunk 수를 모니터링하는 백그라운드 프로세스
  • 샤드의 청크 수가 Migration 임계치 값에 도달 하면 밸런서가 샤드간에 Chunk를 자동으로 Migration하고 샤드 당 동일한 수의 청크를 유지
    • 샤드된 컬렉션의 청크를 모든 샤드 컬렉션에 균등하게 재분배하는 역할(밸러서는 Default로 enable)
    • 샤드간 청크가 균등하게 유지될 때까지 밸런서가 동작 (Migration 는 위의 Chunk Migration 동작을 참고)
  • 밸런싱 절차는 사용자 및 어플리케이션 계층에서 투명하게 동작하지만, Migration이 진행되는 동안에는 부하가 발생
  • 밸런서는 Config server Replica set에서 Primary에서 동작
  • 유지보수를 위해 밸런서를 비활성화도 가능하며 수행 시간을 설정도 가능
  • balancing 작업이 02:00 에 자동으로 수행
#Config 서버에서 동작을 확인 가능

repl_conf:PRIMARY> db.settings.update (

... {_id:"balancer"},

... {$set : {activeWindow: {start: "02:00", stop : "06:00" } } },

... {upsert : true } )

WriteResult({ "nMatched" : 0, "nUpserted" : 1, "nModified" : 0, "_id" : "balanver" })

repl_conf:PRIMARY> db.settings.find()

{ "_id" : "chunksize", "value" : 64 }

{ "_id" : "balancer", "stopped" : false }

{ "_id" : "autosplit", "enabled" : true }

{ "_id" : "balanver", "activeWindow" : { "start" : "02:00", "stop" : "06:00" } }
  • 샤드 추가 및 제거
    • 샤드를 추가하게 되면 새로운 샤드에는 Chunk가 없기 때문에 불균형이 발생
    • 클러스터에서 샤드를 제거하면 상주하는 청크가 클러스터 전체로 재분배가 일어나므로 샤드 추가와 같이 불균형이 발생
    • 샤드를 추가하거나 삭제 모두 마이그레이션 하기 시작하면 시간이 소요
    • 효율적인 방안은 좀 더 조사가 필요

 

Sharding System 주의점

  1. 하나의 Shard 서버에 데이터가 집중되고 균등 분산이 안 되는 경우
    • Shard Key로 설정된 필드의 Cardinality가 낮은 경우에 Chunk Size는 반드시 64Mb 단위로 분할되는 것이 아님
      • 때로는 64Mb보다 훨씬 큰 크기의 Chunk 크기로 생성되기도 함
      • Default로 8개 정도의 Chunk가 발생하면 Migration이 발생하기 때문에 다른 서버로 Chunk를 이동하는 횟수는 줄어들게 되고 자연스럽게 하나의 샤드 서버에 만 데이터 집중되는 현상 발생
    • 적절한 Shard Key를 선택하지 못한 경우 발생하는 문제점(Shard Key의 중요성)
    • 혹여나 균등 분산이 안된다고 판단되면 Chunk Size를 줄이는 것을 추천(Migration 빈도수가 높아 져 균등하게 분활은 되나 성능 저하 발생)
  2. 특정 Shard 서버에 IO 트래픽이 증가하는 경우
    • MongoDB의 샤드 서버는 동일한 Shard Key를 가진 데이터들을 같은 샤드 서버에 저장하기 위해 Split 과 Migration 수행
    • Shard 서버의 IO 트래픽이 증가하는 이유는 너무 낮은 Cardinality 를 가진 Field를 설정 때문
    • 데이터가 집중적으로 저장되어 있는 Chunk를 Hot Chunk라고 하며, 특정 서버에 집중되어 있을 때 상대적으로 서버 IO 트래픽이 증가
  3. 샤드 클러스터의 밸런스가 균등하지 않는 경우
    • 데이터를 입력할 때 로드 밸런싱이 적절하게 수행되었지만 사용자의 필요에 따라 특정 서버의 데이터를 삭제 또는 다른 저장 공간으로 이동했다면 Balancer가 깨지게 됨
    • Shard key 의 낮은 Cadinality
    • 하나의 Collection에 대량의 Insert 되는 것 보다 분산 저장되는 속도가 늦는 경우 밸런스 불균형 발생
    • Insert 되는 속도에 비해 Chunk Migration이 빈약한 네트워크 대역폭으로 인해 빠르게 이동되지 못하는 경우 발생
    • 하루 일과 중 Peak 시간에 빈번한 Migration이 발생하게 되면 시스템 자원의 효율성이 떨어지게 되어 성능 지연 발생
      • 피크 시간에 Chunk Migration을 중지하고 유휴 시간에 작업 될 수 있도록 Balance 설정
  4. 과도한 Chunk Migration이 클러스트 동작을 멈추는 경우
    • 빈번한 Chunk Migration이 일시적으로 Cluster 서버 전체의 성능 지연 문제를 유발
    • 빈번한 Migration 회수를 줄일 수 없다면 유휴 시간에 작업 될 수 있도록 Balance 설정
    • 불필요하게 큰 Chunk Size는 네트워크 트래픽을 증가시키고 시스템 자원의 효율성을 저하 시키는 원인이 될 수 있으므로 Chunk size를 줄이는 것 고려
    • Shard 서버의 밸런싱이 적절하지 않다는 것은 Shard 서버의 수가 처리하려는 데이터 발생 양에 비해 부족하기 때문이므로 샤드 서버 대수 증설 고려
  5. 쓰기 성능이 지연되고 빠른 검색이 안 되는 경우
    • 초당 몇 만 건 이상의 데이터들이 동시에 저장되기 위해 Collection의 크기가 중요
    • 하나의 Collection은 여러개의 익스텐트 구조로 생성되는데, 익스텐트 사이즈가 작게 생성되어 있으면 잦은 익스텐트 할당으로 인해 불필요한 대기 시간이 발생
    • 충분한 익스텐트 크기로 생성(createCollection 을 이용하여 Collection을 명시적으로 생성하면서 size를 조정 가능)
    • 빠른 쓰기 성능이 요구되는 경우 Rich Docuemnt 구조로 설계하는 것이 유리  (Data Model Design 참고 /  Embedded Document도 고려)
    • 메모리 부족으로 인한 성능 저하로 메모리 증설도 고려
  • Shard Key 의 중요성,(1~5) Balancer는 유휴 시간대에 작업 추천(3), 적절한 Shard 수(4), 메모리도 체크(5)

참고

반응형
반응형
  • 저희 회사에서 가장 많이 사용하는 구성이라 기존에 직접 설치 메뉴얼 만든 것을 바탕으로 재 구성 해 보았습니다.
  • 현재 대부분 3.4 버전을 많이 사용하고 있으며, 현재 구성하는 것들은 4.2 버전을 사용하고 있습니다.
  • 로그성 DB 위주로 사용하다 보니, 변경을 하더라도 문제 없이 동작을 하였기에 4.2 로 올리고 있는 중입니다.'
    1. Replica Set 이란?
      • 2초마다 Heartbeat하여  Primary와 Secondary 서로 서버 상태를 체크
      • Primary
      • Secondary
        • Readonly - Read 에 대한 분산
        • Primary Server의 데이터를 복제하며, HA 용도
      • Primary 선출 방식
        • 남은 Secondary 노드 들이 vote 투표를 통해 Primary 선출을 진행
        • 선출 방식 중 우선 순위에 의한 선출 방식의 경우 데이터의 동기화 수준 상관없이 우선 순위(priority)에 의해서 선출되는 방식
          • priority 값이 높을 수록 Primary 로 선출이 된다.
        • Arbiter 선출 방식은 데이터를 가지고 있지 않지만, 투표에 참여가 가능한 Arbiter를 이용하여, 선출 시에만 사용되는 서버 (비용 절감 효과)
          • DBA 관리 측면에서 어떤 서버가 Primary가 되는지 알기 쉬움
          • 실제로 구성 시  rs.initiate 명령어(하단 구성에서 찾을 수 있음) 에서 각 node 들에 대해 우선 순위-가중치-priority 에 값을 적용
      • Oplog복제 방식
        • DML 내용들은 모두 OpLog에 가장 먼저 작성
        • Primary 에 먼저 데이터를 적용 후 Oplog에 저장
        • Secondary (각 member들) 에서는 Oplog를 가져와 자신의 DB에 저장
        • oplogSizeMB 옵션으로 크기를 지정할 수 있으며, 복제 중에는 replSetResizeOplog 명령(재시작 필요 없음)을 사용하여 크기를 변경 가능
        • 기존 oplog 구성된 최대 크기에 도달했을 경우 생성이 되지만, v4.4 이후 부터는 시간을 지정 가능하여, 구성 시간이 오래되었을 경우 별도 파일로 생성 가능
        • Slow Oplog  Application
          • v4.2 (4.0.6) 부터 Replica 에서 Oplog를 적용하는 시간이 slow log 시간(slowOpSampleRate) 보다 큰 경우 oplog 항목을 기록 가능
    2. 구성
      • AWS EC2를 사용하기 때문에 그에 맞춰져 있습니다.
      • 구성은 P-S-A 로 진행 하였습니다. (맨 하단에 위치)
      • 인증은 내부 인증을 사용하기 위해, key 파일을 생성하여 진행 하였습니다. (Update Replica Set to Keyfile Authentication 참고)
    3. 레플리카 셋 배포
      1. 추가
        • rs.add({host:"아비터host:port", priority:1, arbiterOnly:true})
        • rs.add({host:"192.168.0.2:27017", priority:1, arbiterOnly:true})
      2. Secondary 추가
        • rs.add({host:"SecondaryHost:port", priority:1})
        • rs.add({host:"192.168.0.2:27017", priority:1})
      3. 확인하는 방법
        • rs.conf()
        • rs.status()
      4. 하나의 서버에 여러개 아비터 띄우는 방법
        1. 아비터 서버에 해당 port / data /log 만 바꿔서 하나씩 mongodb 띄우기
        2. Primary에서 위의 add 명령어 사용해서 추가
        3. 확인

 

 

 


P-S-A 구성하는 방법

  1. Download 및 기본 셋팅  (Primary, Secondary 모두 동일하게 구성)
#mongodb 를 다운 및 압축 풀고 지정 경로에 저장
#필요 시 OS 유저 생성 (user add mongod)
$sudo groupadd mongod
$sudo useradd -g mongod mongod

$ wget https://fastdl.mongodb.org/linux/mongodb-linux-x86_64-amazon2-4.2.6.tgz
$ tar -xvf mongodb-linux-x86_64-amazon2-4.2.6.tgz
$ mkdir /usr/local/mongodb
$ mv ./mongodb-linux-x86_64-amazon2-4.2.6/* /usr/local/mongodb
$ chown -R mongod:mongod /usr/local/mongodb

# data & log path 생성
$ mkdir -p /data/mongo/data
$ mkdir -p /data/mongo/log
$ chown -R mongod:mongod /data

#mongod 유저로 변경해서 아래내용 모두 진행
# add path
$ vi ~/.bash_profile

    # .bash_profile 
    # path에 아래 내용 추가 수정 진행, 저장

    mongodb=/usr/local/mongodb/bin
    PATH=$PATH:$HOME/.local/bin:$HOME/bin:$mongodb
 
# profile 적용
$  . ~/.bash_profile

# 존재 한다면 config 파일 백업
$ cp /etc/mongod.conf ~/mongod.conf.bak

 

  1. 공통 config 파일 설정 및 key 파일 생성 (Primary, Secondary 모두 동일하게 구성)
#Primary 내용

#key 파일 생성 (mongodb 간 접속을 진행 할 때 필요)

# 해당 key파일은 Secondary에 복사



$ sudo openssl rand -base64 756 > /usr/local/mongodb/mongo_repl.key

$ sudo chown mongod.mongod /usr/local/mongodb/mongo_repl.key

$ sudo chmod 600 /usr/local/mongodb/mongo_repl.key



# 아래 내용 복사

$ cat /usr/local/mongodb/mongo_repl.key

$ chmod 400 /usr/local/mongodb/mongo_repl.key
# Secondary DB 에서 위의 Primary DB에서 생성한 mongo_repl.key 의 내용을 복사 해서 동일한 위치에 붙여 넣고 저장 진행

# Secondary OS

# 복사한 Primary의 key 내용을 붙여 넣기 후 저장

$ vi /usr/local/mongodb/mongo_repl.key

$ chmod 400 /usr/local/mongodb/mongo_repl.key

 

  • Config 설정
# /etc/mongod.conf 수정 

$ sudo touch /etc/mongod.conf

$ sudo chown mongod.mongod /etc/mongod.conf

$ vi /etc/mongod.conf



#기존 내용 삭제 및 해당 내용으로 대체



systemLog:

   destination: file

   path: /data/mongo/log/mongo.log

   logAppend: true

   logRotate: rename



storage:

   engine: wiredTiger

   directoryPerDB: true

   wiredTiger:

      engineConfig:

         journalCompressor: snappy

      collectionConfig:

         blockCompressor: snappy

      indexConfig:

         prefixCompression: true

   dbPath: /data/mongo/data

   journal:

      enabled: true



processManagement:

   fork: true



#replication:

#   replSetName: "replSet"

  

net:

   bindIp: 0.0.0.0

   port: 27017



#아래 내용은 replica 구성이 완료 되면 아래 주석을 풀어주도록 하자

#replication:

#   replSetName: "replSet"

#security:

#  authorization: enabled

#  keyFile: /usr/local/mongodb/mongo_repl.key

 참고로 systemlog 의 quiet 는 권장하지 않는다.

문제 발생 시 에러 내역 트랙킹 하기 힘들기 때문. 

 

systemLog.quiet

Type: boolean

Run mongos or mongod in a quiet mode that attempts to limit the amount of output.

systemLog.quiet is not recommended for production systems as it may make tracking problems during particular connections much more difficult.

 

failIndexKeyTooLong : Index Key중 너무 큰게 있으면 저장되는 것을 fail 시켜라고 하는 파라미터

 

옵션 정리

http://blog.naver.com/PostView.nhn?blogId=theswice&logNo=220763046938&redirect=Dlog&widgetTypeCall=true

 

  1. Primary, Secondary  DB Startup 진행 & User 생성
#Primary, Secondary  모두 진행

#서비스 등록

$ sudo vi /etc/systemd/system/mongodb.service



[Service]

User=mongod

ExecStart=/usr/local/mongodb/bin/mongod --config /etc/mongod.conf



# sudo에 mongod 유저 등록

$ visudo -f /etc/sudoers



#아래 내용 찾아 하단에 추가

## Allow root to run any commands anywhere

root    ALL=(ALL)       ALL

mongod  ALL=(ALL)       ALL

 

  • MongoDB Startup
# Primary, Secondary , Arbiter 모두 진행

# Arbiter의 경우 이미 실행되어 있을 수 있으므로 mongodb가 Start가 되어있는지만 확인

# MongoDB 시작

$ sudo systemctl start mongodb



# MongoDB 상태 확인

$ ps -ef | grep mongod

 

  • MongoDB User 생성
# User 생성

# Primary에서만 진행 



$ mongo



// 인증 DB가 admin

// root 계정 생성 및 게임서버 유저, bi 등에서 생성되는 유저 생성 진행

mongo> use admin

mongo> db.createUser(

    {    

        user: "LKadmin",

        pwd: "test!23",

        roles:[

            { role: "root", db: "admin" }

        ]

    }

)

 

 

  • Replica DB 구성
# Primary 에서만 진행

# ------------------------------------------------------------------------------------------------------

replica 설정 초기화, IP 및 Port 확인 필요!!

# ------------------------------------------------------------------------------------------------------

# replica 설정 초기화



> rs.initiate(

   {

      _id: "replSet",

      version: 1,

      members: [

         { _id: 0, host : "10.95.171.77:27017", priority:3 },

         { _id: 1, host : "10.95.171.78:27017", priority:2 },

         { _id: 2, host : "10.95.171.79:27017", priority:1, arbiterOnly:true }

      ]

   }

)

> rs.status() # stateStr 값이 primary 여야 함

> rs.conf()

> exit

# Primary, Secondary Config 에서 아래 주석내용 풀어준 후 리스타트 진행

# vi /etc/mongod.conf



replication:

   replSetName: "replSet"



security:

  authorization: enabled

  keyFile: /usr/local/mongodb/mongo_repl.key



$  sudo systemctl restart mongodb



# Primary 접속하여 아래와 같이 나오는지 체크

# Primary 접속하여 아래와 같이 나오지 않는다면, replica 설정 초기화를 다시 진행해도 된다.(굳이 mongodb 리스타트 또는 config 주석을 변경 하지 않아도 된다.)



$ mongo admin -u LKadmin -p  

replSet:PRIMARY>



# secondary 접속 테스트

$ mongo admin -u LKadmin -p  



#아래와 같이 나오는지 체크

replSet:SECONDARY>

replSet:SECONDARY> rs.slaveOk()

replSet:SECONDARY> rs.status()

{

        "set" : "replSet",

        "date" : ISODate("2020-05-27T06:05:42.413Z"),

        "myState" : 2,

        "term" : NumberLong(1),

        "syncingTo" : "10.95.171.77:27017",

        "syncSourceHost" : "10.95.171.77:27017",

        "syncSourceId" : 0,

        "heartbeatIntervalMillis" : NumberLong(2000),

        "majorityVoteCount" : 2,

        "writeMajorityCount" : 2,

        "optimes" : {

                "lastCommittedOpTime" : {

                        "ts" : Timestamp(1590559533, 1),

                        "t" : NumberLong(1)

                },

                "lastCommittedWallTime" : ISODate("2020-05-27T06:05:33.008Z"),

                "readConcernMajorityOpTime" : {

                        "ts" : Timestamp(1590559533, 1),

                        "t" : NumberLong(1)

                },

                "readConcernMajorityWallTime" : ISODate("2020-05-27T06:05:33.008Z"),

                "appliedOpTime" : {

                        "ts" : Timestamp(1590559533, 1),

                        "t" : NumberLong(1)

                },

                "durableOpTime" : {

                        "ts" : Timestamp(1590559533, 1),

                        "t" : NumberLong(1)

                },

                "lastAppliedWallTime" : ISODate("2020-05-27T06:05:33.008Z"),

                "lastDurableWallTime" : ISODate("2020-05-27T06:05:33.008Z")

        },

        "lastStableRecoveryTimestamp" : Timestamp(1590559493, 1),

        "lastStableCheckpointTimestamp" : Timestamp(1590559493, 1),

        "members" : [

                {

                        "_id" : 0,

                        "name" : "10.95.171.77:27017",

                        "health" : 1,

                        "state" : 1,

                        "stateStr" : "PRIMARY",

                        "uptime" : 16200,

                        "optime" : {

                                "ts" : Timestamp(1590559533, 1),

                                "t" : NumberLong(1)

                        },

                        "optimeDurable" : {

                                "ts" : Timestamp(1590559533, 1),

                                "t" : NumberLong(1)

                        },

                        "optimeDate" : ISODate("2020-05-27T06:05:33Z"),

                        "optimeDurableDate" : ISODate("2020-05-27T06:05:33Z"),

                        "lastHeartbeat" : ISODate("2020-05-27T06:05:41.267Z"),

                        "lastHeartbeatRecv" : ISODate("2020-05-27T06:05:40.496Z"),

                        "pingMs" : NumberLong(0),

                        "lastHeartbeatMessage" : "",

                        "syncingTo" : "",

                        "syncSourceHost" : "",

                        "syncSourceId" : -1,

                        "infoMessage" : "",

                        "electionTime" : Timestamp(1590543352, 1),

                        "electionDate" : ISODate("2020-05-27T01:35:52Z"),

                        "configVersion" : 1

                },

                {

                        "_id" : 1,

                        "name" : "10.95.171.78:27017",

                        "health" : 1,

                        "state" : 2,

                        "stateStr" : "SECONDARY",

                        "uptime" : 17820,

                        "optime" : {

                                "ts" : Timestamp(1590559533, 1),

                                "t" : NumberLong(1)

                        },

                        "optimeDate" : ISODate("2020-05-27T06:05:33Z"),

                        "syncingTo" : "10.95.171.77:27017",

                        "syncSourceHost" : "10.95.171.77:27017",

                        "syncSourceId" : 0,

                        "infoMessage" : "",

                        "configVersion" : 1,

                        "self" : true,

                        "lastHeartbeatMessage" : ""

                },

                {

                        "_id" : 2,

                        "name" : "10.95.171.79:27017",

                        "health" : 1,

                        "state" : 7,

                        "stateStr" : "ARBITER",

                        "uptime" : 16200,

                        "lastHeartbeat" : ISODate("2020-05-27T06:05:41.266Z"),

                        "lastHeartbeatRecv" : ISODate("2020-05-27T06:05:42.157Z"),

                        "pingMs" : NumberLong(0),

                        "lastHeartbeatMessage" : "",

                        "syncingTo" : "",

                        "syncSourceHost" : "",

                        "syncSourceId" : -1,

                        "infoMessage" : "",

                        "configVersion" : 1

                }

        ],

        "ok" : 1,

        "$clusterTime" : {

                "clusterTime" : Timestamp(1590559533, 1),

                "signature" : {

                        "hash" : BinData(0,"W5JmQNO1ECoqd0kldQ26bHHzspk="),

                        "keyId" : NumberLong("6831331679710216195")

                }

        },

        "operationTime" : Timestamp(1590559533, 1)

}

 

 

완료 한 번씩 로그 체크는 진행

반응형
반응형

[MongoDB] 웨비나 - mongodb 에 대한 8가지 오해

  1. 작년 10월 에  MongoDB 웨비나에서 소개된 자료를 정리한 것이 있어서 직접 다시 집중적으로 정리하였습니다.
  • 소개

https://www.mongodb.com/presentations/korea-top-misunderstanding-of-mongodb

  • 비디오

https://vimeo.com/468475015

  • PPT

https://view.highspot.com/viewer/5f882ff5a2e3a96b5627dc39

 

[ WiredTiger Engine ]

 

  • 2014 년 Wired Tiger 업체를 인수
  • 3.0 에서 MMAPv1 -> WT 으로 변경 진행 (MMAPv1 4.0 이하에서는 사용 가능 / 4.2에서는 제거)\
  • 본질적으로 Transaction Storage Engine 이며, Read & Write 에 우수(MMAPv1 은 Insert 에 최적화된 Engine)
  • 4.0 에서 부터는 분산 Multi Transaction 을 지원
  • 특징
    • WiredTiger에서 압축은 기본 옵션
    • v4.2 부터 총 3개의 압축 옵션을 제공 (snappy, zlib, zstd)
      • 호환성 성능상 아직 이슈가 있어서 snappy 이용
      • 압축률은 zstd(zlib와 대등) 가 더 좋으나, 성능은 snappy 와 zlib 사이로 판단
    • 인덱스는 prefix 압축을 사용
      • 데이터는 WT의 Cache 상태에서는 non 압축 상태 즉, 데이터를 저장 시 Document로 bson 형태로 저장되는데, bson 은 non압축상태로 WT cache 에 존재하며 Disk 와 os file cache 에는 압축 상태로 저장.
      • 만약 find 하는 경우 먼저 cache 의 non 압축 상태에서 먼저 확인 후, Disk 의 압축 상태에서 찾아봐 압축을 해제하며 cache 에 적재
      • prefix 압축이란?
      • 그런데 Index 는 메모리 상 뿐만 아니라 디스크에도 prefix 압축된 상태로 사용(메모리의 효율성을 증대 가능)
    • 삭제된 공간을 재활용 못하기 때문에 Disk size가 무한대로 증가하는 과거 버전과 달리, 현재는 재활용 가능
    • 그래서, collection 에서 사용되지 않는 공간을 확인 가능하며, WT 에서 수동으로 compaction 을 할 수 있음
    • DDL 작업을 제외하고는 4.4 부터 온라인 Compaction 할 수 있음
      • 기존에는 Database level lock 을 잡는것과 달리, 4.4 부터는 CRUD 작업에 대해서는 Lock을 잡지 않음.
      • 그래서 운영간 Primary에서도 가능
    • 초기 버전에는 Global -> Database -> collection Level Lock 을 잡았음 (몇 백개의 Collection이 존재하는 과거 운영)
    • 현재는 Document Level Locking
    • Write no longer block all other writes
    • More CPU cores, instead of faster CPU speeds (하드웨어의 자원을 최대로 사용 가능 - 성능 향상 가능 - 고 사양 서버를 사용 가능하며 유리)
    • Avoid need to "micro-shard"
    • WT에서는 메모리 Cache size , Check point Interval time 등을 조정 가능 (파라메터 튜닝 가능)
    1. 압축
    2. 온라인 Compaction
    3. 향상된 Concurrency
    4. 최신 하드웨어 활용
    5.  조정 가능한 설정 파라메터
  • MMAPv1 와 비교한 레퍼런스 (amadeus , book my show, FaceIT 등의 글로벌 업체)
    • 스토리지 공간을 80% 절감
    • 최대 5~10 배 향상된 성능

[암호화]

  • 3.6 부터 Binding 을 Localhost 가 Default 로 되어 있어서, 설치 시 Localhost 에서만 접속 가능
  • 아래 내역들에 대해 Community 와 Enterprise 등 모두 사용 가능

 

[Check Point]

  • 스토리지 상의 일관된 데이터 페이지 셋으로, 내구성 / 원자성을 지닌 디스크의 데이터 스냅 샷이라고 정의
  • 쉽게 말하면, 데이터를 DML 하는 가운데 메모리 Page상으로 존재하는데, 매 1분 또는 메모리 Dirty 영역이 일정 임계치 이상이 되는 경우 해당 Page들을 Disk로 저장

 

[Journaling]

  • 체크 포인트 간의 발생하는 모든 데이터 수정 사항을 신속하게 유지할 수 있는 write-ahead 로그 (journal)
    • write-ahead 로그 : 디스크에 쓰기 전에 한번 더 별도의 공간에 저장 (파일시스템 내에 저널파일이-100mb 단위/ 존재하며 체크 포인트 발생하면 모두 삭제 됨)
    • 데이터 뿐만 아니라 인덱스 에서 발생하는 로그들 모두 기록
  • 저널링 버퍼가 존재하는데, 저널링 버퍼에서 저널링 파일로 저장되는 시간이 1oms  (10ms 의 유실 가능성 존재)
  • 체크 포인트 간에 장애가 발생하면 저널을 사용하여 마지막 체크 포인트 이후 수정 된 모든 데이터를 적용

* Journaling 없이도 MongoDB는 손실 없이 마지막 체크 포인트로 복구 가능하나, 마지막 체크 포인트 이후의 변경 사항을 복구하려면 저널링을 활성화해야 가능

 

[Write Concerns]

  • Write 작업을 위해 MongoDB cluster에서 Client Application Ack 요청을 제어
  • Write 작업을 할때 Primary 에 적용 하고 응답을 받고 Session 을 종료 할 것인지, Secondary 까지 적용이 완료 된 후 응답을 받아 종료를 할것인지, Primary 에서 조차도 응답을 안받고 작업을 종료 할 것인지에 대한 Level 제어
  • 또한 wtimeout 을 적용하여 해당 시간 보다 오래 소요되는 경우 Error return (단위 ms)
  • Write Concern Option
    • 1 : Written to Primary - Default
    • 2 : written to Primary and at least one Secondary
    • majority : Written to majority of nodes
  • ex) 과반수 노드가 write 된 후에 ack 를 달라고 하는데, 단 5초 이상 소요 시 error return
  • db.products.insert({name : "Louis.Kim"}, {writeConcern:{w:"majority", wtimeout:5000}})

[Multi-Document Distributed ACID Transactions]

  • Single/ Multi Document, Shard  등 Transaction 지원
  • RDBMS와 동일한 형태
    • Transaction Default Timeout 60초
    • 다중 구문, 유사한 문법
    • 어플리케이션에 쉽게 추가 가능
    • 여러 컬렉션에 대한 다중 구문 트랜잭션
  • ACID 보장
    • All or nothing execution
    • Snaphost isolaction ( Serializable Snapshot Isolaction의 공동 발명자인 Michael Cahill 포함한 Wired Tiger 회사 인수-와 연관 )

Causal Consistency

반응형
반응형

몽고DB란?

  • 크로스 플랫폼 Document 지향 DBMS
  • 빠른 속도와 확장성
    • NoSQL (비관계) DBMS
      • 기존 R(관계)DBMS의 규칙을 포기하는 대신, 뛰어난 확장성이나 성능을 발전시키는 방향
    • 대표적인 NoSQL DBMS

종류

관련 언어

Key-Value Store

Redis, Dynamo

Column 지향 언어

HBase, Cassandra

Document 지향 언어

MongoDB

Graph Database

Neo4J

  • 친숙함
    • JSON과 유사항 형태로 저장하는 방식
    • Javascript를 활용할 수 있음
    • Document 지향 스토어 (Schemaless)
    • 다양한 인덱스를 지원

 

  • 쉽고 빠른 분산 컴퓨팅 환경 구성 (신뢰성-Reliability / 확장성-Scalability)
    • Replication(HA) , Sharding(분산 저장) 제공하여 쉽게 구성이 가능

신뢰성(Reliability)

  • 서버 장애에도 서비스는 계속 동작
    • Primary 와 Secondry로 구성된 ReplicaSet 구조로 고가용성을 지원

확장성(Scalability)

  • 데이터와 트래픽 증가에 따라 수평확장(scale-out) 가능
    • 데이터를 샤딩하여 수평확장(scale-out) 할 수 있음

유연성(Flexibility)

  • 여러가지 형태의 데이터를 손쉽게 저장
    • 서비스 요구사항에 맞춰 다양한 종류의 데이터가 추가되어도 스키마 변경 과정 없이 필요한 데이터를 바로 저장하고 읽을 수 있음

Index 지원
(Index Support)

  • 다양한 조건으로 빠른 데이터 검색
    • 다양한 인덱스 지원 다양한 용도로 사용이 가능

MongoDB 구조

  • MongoDB에 여러 개의 Database 가 존재
    • create 문이 존재 하지 않으며, 원하는 DB명을 생성이 아닌 선택 후 document를 insert하면 생성 됨
  • 하나의 Database 내에 여러 개의 Collection 이 존재
    • DB와 동일하게 별도의 create 문이 존재 하지 않으며, document를 생성하면 자동으로 collection이 생성
  • 하나의 Collection에는 여러 Document가 존재
    • 실질적인 데이터
  • BSON
    • Binary JSON 구조
    • 필드 뒤에 값이 오는 형태
    • 필드와 필드는 쉼표로 구분
    • Document 하나는 중괄호로 감싸져 존재
    • 필드에는 문자열만 들어가며, 값에는 배열, 숫자, 3차원 위치 좌표, 또 다른 필드와 값을 가진 오프젝트도 값으로 가질 수 있음 (유연성-Flexibility)
    • 그 외 다른 정보를 모두 표현할 수 있도록 다양한 값의 형식을 지원
      • 그 중 중요한 ObjectID 만 정리
      • ObjectID
        • Document 생성 시 PK로 "_id" 필드가 자동으로 생성
        • _id 는 서로 겹치지 않는 ObjectID 타입으로 값을 할당
        • 동시에 생성되어도 서로 다른 값이 생성되어 유일 값
        • ObjectID 값은 "유닉스시간+기기id+프로세스id+카운터" 로 구성
          • 앞에 4byte는 유닉스 시간
          • 다음 3byte는 기기의 id 값
          • 다음 2byte는 프로세스 id 값
          • 마지막 3byte는 랜덤 값부터 시작하는 카운터로 구성
        • ObjectId.getTimestamp() 하면 생성된 시점을 알아낼 수 있음. (ObjectID를 이용하면 시간 range 로 검색이 가능)
> ObjectId("507c7f79bcf86cd7994f6c0e").getTimestamp()
ISODate("2012-10-15T21:26:17Z")

use database1            // database 선택
db.collection1.insert({              // collection.명령어 document
    "name" : "Rouis.Kim"   // field : value
    , "age" : 37           // field : integer value
    , "role" : "DBA"       // field : string value
    , "exp" : [{"Dev" : 5, "DBA" : 6}, {"Game": 3, "Ecommercail":2, "Enginner":3,"etc":3}]  // field : Object value
    , "loc" : [37.504997827623406, 127.02381255144938] // field : geomentry
    , "etc" : ["2 daughters","프로이직러"] // field : array value
})
// document가 생성되지만, collection 이 존재하지 않으면 collection, database가 존재 하지않으면 database 까지 생성 됨
// schemaless (스키마 유연성)

언제 어떻게 써야 할까?

  • Schema가 자주 바뀌는 환경
    • Document 지향 스토어 방식
      • 스키마의 변경이 자유로운 편
      • 추가될 기능이 확실하지 않고 상황에 다라 App에 저장될 자료가 지속적으로 달라지는 환경(개발 방법론으로 개발 하는 곳)
      • 애자일 개발 방법론 / 나선형 모델에 최적 (미주 추가)
  • 분산 컴퓨팅 환경
    • 샤딩과 복제를 지원하기 때문에 여러 대의 컴퓨터에 구성이 용이하기 때문에 빠른 분산 컴퓨팅 환경이 구축 필요한 경우

 

RDBMS

NoSQL

Schema

  • 명확한 데이터 구조 (Join 등으로 복잡성 증가, 스키마 수정 시  비용 발생)
  • 데이터 무결성 보장
  • 수평적 확장이 힘듦(수직적 확장)

Schemaless

  • 자유롭게 데이터를 관리(중복 데이터 저장 가능하여 데이터 관리 필요)
  • Data에 대한 규격화된 결과 얻기가 힘듦
  • Join이라는 개념이 없으며, Join 가능은 하나 힘ㄷ름
  • 수평적 확장이 쉬움

ACID / BASE 관련 좋은 글

https://embian.wordpress.com/2013/06/27/nosql-2/

https://velog.io/@qoik11/MongoDB-3rk4z5de3r

 

버전 관리

  • MongoDB는 두 번째 자리 숫자가 짝수인 버전이 안정화 버전이며, 홀수인 버전이 개발 중인 버전
  • 서비스를 하는 것이라면 두 번째 자리가 짝수인 버전을 사용하는 것 추천
  • Postgresql 도 비슷한 것으로...

 


애자일 개발 방법론

  • 신속한 반복 작업을 통해 실제 작동 가능한 소프트웨어를 개발하여 지속적으로 제공하기 위한 소프트웨어 개발 방식
  • 지속적으로 요구 사항을 반영하며, 의사소통이 많으며 지속적인 테스트 가 특징
  • 명세화 부족하며 잦은 변경으로 테스트 증가. 개발자 위주라 사업 관리 부분 미흡(유지보수 비용 증가)

나선형 모델

  • 시스템 개발 시 위험을 최소화 하기 위해 점진적으로 완벽한 시스템으로 개발해 나가는 나선형 모델로 ,계속 목표설정-위험분석-개발검증-고객평가를 계속하며 점진적으로 개발
  • 리스크를 최소화 하기 위하여 "위험 분석 단계"를 추가하여 위험 부담이 큰 대형 시스템 구축에 적합.
  • 프로젝트 기간이 길며, 관리가 어려움. 위험 관리에서는 해당 전문가가 필요
반응형
반응형

올해 1월부터 진행 한 MongoDB Study에 대해서 정리해 봅니다.

함께하는 멤버들의 블로그 입니다.

 

루이스(본인) : hyunki1019.tistory.com/

라라님 : rastalion.me/

닷닷님 : 닷닷이의 DB공간 : 네이버 블로그 (naver.com)

재혁님 : marsettler.com/

 

그 외 분들은 다른 자료로 공유를 하시기 때문에 그 분들의 내용은 추후에 업로드 해보도록 하겠습니다.

주로 4.x (최신 버전) 으로 진행 하고 있습니다.

반응형
반응형

[원본] 아래 블로그와 동일한 작업이며, 단지 소스 오픈 여부 입니다. 아래 작성자님 무한 감사 드립니다.

https://brunch.co.kr/@alden/53

  • 더불어 문서에 앞서 ST Unitas 황선규님에게 감사 인사 드립니다. 대부분의 코딩 수정을 도와주셨습니다.
    (거의 다 작성해 주셨습니다 ㅎㅎ)

목적

  • Cloudwatch 상에서 확인할 수 있는 그래프를 Slack에서 이미지로 확인
  • 가독성이 높아져서 현상 확인하기 쉬움
  • 그래프를 확인하기 위해 AWS에 접속하는 불편 감수

 

동작방식

  • Lambda 에서 미리 셋팅한 Cloudwatch 정보를 이용하여(json) 현재 상태를 이미지화 하여 Slack으로 전송
  • Python 3.7로 개발

 

필요한 모듈

  • boto3
  • certifi
  • chardet
  • idna
  • json
  • requests

 

사전지식

  • Lambda 를 이용하기 위해서는 python에서 사용하는 모든 모듈을 파일로 보유해야함(모두 하나의 zip으로 압축해서 업로드)
    (Lambda에 대한 사전 지식이 있으면 괜찮지만, 처음 접하기에 쉽지 않음 - pip로 local로 필요한 모듈을 설치 후 해당 모듈의 폴더 전체가 필요)
    ex) C:\Users\접속한계정\AppData\Local\Programs\Python\Python37\Lib\site-packages 에 설치한 모듈 존재

  • Lambda 테스트를 위해서는 핸들러의 명칭에 존재하는 함수가 반드시 명시되어 있어야 하며, 함수의 Default 핸들러는 수정하면 안됨

  • 권한 : boto3.client('cloudwatch').get_metric_widget_image 을 사용하기 위해서는 Cloudwatch에 접근할 수 있는 권한과 함께 "cloudwatch:GetMetricWidgetImage" 이라는 권한도 필요
    Lambda를 생성하는 계정에 모든 권한이 존재 하더라도, Lambda 하단의 역할에서 존재 여부를 체크 필요
    IAM 권한에서 있는지 여부 체크 필요 (아래는 관련 모든 권한을 부여함)

    {

        "Version": "2012-10-17",

        "Statement": [

            {

                "Sid": "VisualEditor0",

                "Effect": "Allow",

                "Action": [

                    "cloudwatch:DescribeAlarmHistory",

                    "cloudwatch:GetDashboard",

                    "cloudwatch:GetMetricData",

                    "cloudwatch:DescribeAlarmsForMetric",

                    "cloudwatch:DescribeAlarms",

                    "cloudwatch:GetMetricStatistics",

                    "cloudwatch:GetMetricWidgetImage"

                ],

                "Resource": "*"

            }

        ]

    }

필요한 Cloudwatch 정보

  1. 이미지로 보고자 하는 Cloudwatch 지표를 확인하여 저장
    아래 샘플은 Cloudwatch의 지표 중 하나인 전체 EC2 CPU 정보


    소스를 선택하면 아래의 정보를 확인할 수 있다.

     

     

    {
    "view": "timeSeries",
    "stacked": false,
    "metrics": [
    [ "AWS/EC2", "CPUUtilization" ]
    ],
    "title": "전체 EC2 CPU",
    "width": 2310,
    "height": 250,
    "start": "-PT3H",
    "end": "P0D",
    "timezone": "+0900"
    }


    여기서 가장 중요한 것은 metrics 정보를 잘 저장하면 된다.

  2. Slack 으로 전달하기 위해서 token 정보 필요
    Slack bot을 미리 생성하였기 때문에 해당 Slack bot을 이용하였으며, channels 만 변경 한다면 동일 Bot을 이용해도 문제 없을 것이라고 예상
    아래에서 channels 에서 필요한 곳으로 변경 하자.

    slack_params = {

                "filename":"test.png",

                "token":"SLACK BOT의 token 값을 넣어 주시면 됩니다",

                "channels":['#aws-db-slowquery-noti <-와 같이 채널 명칭을 넣어주세요.']

            }

전체 소스

 

Sample_python

import json

import boto3

import requests

 

cloudwatch = boto3.client('cloudwatch')

metric_sources = [

{

        "metrics":

        [ "AWS/EC2", "CPUUtilization"

        ],

        "view": "timeSeries",

        "stacked": False,

        "region": "ap-northeast-2",

        "timezone": "+0900"

    }

]

 

def metric(event, context):

    for metric_data in metric_sources :

        metric_data = json.dumps(metric_data)

         

        #print(metric_data)

        image_data = cloudwatch.get_metric_widget_image(MetricWidget=metric_data)

    #    print(image_data)

         

        slack_params = {

            "filename":"test.png",

            "token":"SLACK BOT의 token 값을 넣어 주시면 됩니다",

            "channels":['#aws-db-slowquery-noti <-와 같이 채널 명칭을 넣어주세요.']

        }

        image = {'file': image_data['MetricWidgetImage']}

         

        requests.post("https://slack.com/api/files.upload", params=slack_params, files=image)

 

        

- 직접 테스트한 내용

 

 

 

lambda_python

import json

import boto3

import requests

 

cloudwatch = boto3.client('cloudwatch')

metric_sources = [

    {

        "metrics": [

                       [ "LogMetrics", "MongoDB-queries time Value", "MongoDB-Primary-Collections", "queries time millisecond", { "stat": "Sum" } ],
            [ ".", "MongoDB-queries time Value", ".", ".", { "stat": "Sum" } ],
            [ ".", "MongoDB-queries time Value", ".", ".", { "stat": "Sum" } ],
            [ ".", "MongoDB-queries time Value", ".", ".", { "stat": "Sum" } ],
            [ ".", "MongoDB-queries time Value", ".", ".", { "stat": "Sum" } ],
            [ ".", "MongoDB-readLock time Value", ".", "readLock time millisecond", { "stat": "Sum" } ],
            [ ".", "MongoDB-readLock time Value", ".", ".", { "stat": "Sum" } ],
            [ ".", "MongoDB-readLock time Value", ".", ".", { "stat": "Sum" } ],
            [ ".", "MongoDB-readLock time Value", ".", ".", { "stat": "Sum" } ],
            [ ".", "MongoDB-users-readLock time Value", ".", ".", { "stat": "Sum" } ],
            [ ".", "MongoDB-total time Value", ".", "total time millisecond", { "stat": "Sum" } ],
            [ ".", "MongoDB-total time Value", ".", ".", { "stat": "Sum" } ],
            [ ".", "MongoDB-total time Value", ".", ".", { "stat": "Sum" } ],
            [ ".", "MongoDB-total time Value", ".", ".", { "stat": "Sum" } ],
            [ ".", "MongoDB-users-total time Value", ".", ".", { "stat": "Sum" } ],
            [ ".", "MongoDB-writeLock time Value", ".", "writeLock time millisecond", { "stat": "Sum" } ],
            [ ".", "MongoDB-writeLock time Value", ".", ".", { "stat": "Sum" } ],
            [ ".", "MongoDB-writeLock time Value", ".", ".", { "stat": "Sum" } ],
            [ ".", "MongoDB-users-writeLock time Value", ".", ".", { "stat": "Sum" } ],
            [ ".", "MongoDB-writeLock time Value", ".", ".", { "stat": "Sum" } ]

        ],

        "view": "timeSeries",

        "stacked": False,

        "region": "ap-northeast-2",

        "timezone": "+0900"

    },

{

        "metrics": [

        [ "AWS/RDS", "CPUUtilization", "DBInstanceIdentifier", "rds-m1" ],

        [ "...", "rds-s1" ],

        [ "...", "rds-s2" ],

        [ "...", "rds-s3" ],

        [ "...", "rds-s4" ],

        [ "...", "rds-s5" ]

        ],

        "view": "timeSeries",

        "stacked": False,

        "region": "ap-northeast-2",

        "timezone": "+0900"

    }

]

 

def metric(event, context):

    for metric_data in metric_sources :

        metric_data = json.dumps(metric_data)

         

        #print(metric_data)

        image_data = cloudwatch.get_metric_widget_image(MetricWidget=metric_data)

    #    print(image_data)

         

        slack_params = {

            "filename":"test.png",

            "token":"SLACK BOT의 token 값을 넣어 주시면 됩니다",

            "channels":['#aws-db-slowquery-noti <-와 같이 채널 명칭을 넣어주세요.']

        }

        image = {'file': image_data['MetricWidgetImage']}

         

        requests.post("https://slack.com/api/files.upload", params=slack_params, files=image)

 

        

dev-hyungi-cloudwatch-screenshot.zip
0.87MB

 

 - 전체 소스에 대한 zip파일 (해당 파일을 Lambda에 업로드 하면 필요한 모듈을 한번에 upload

 

 

출처

 https://brunch.co.kr/@alden/53

반응형

'AWS' 카테고리의 다른 글

[Amazon MemoryDB for Redis] 정리 및 비교  (0) 2021.12.22
[Amazon MemoryDB for Redis] 정리 및 비교  (0) 2021.12.22
반응형

안녕하세요.

RDS Slow query 를 수집하기 위해서 알아보던 도중(예전에 김종열 팀장님 도움으로 python으로 수집하던 소스는 있지만, UI로 보여주고 싶은 마음에) ELK 를 이용하면 제가 원하는 화면을 보여줄 수 있을 것 같아서 시작하게 되었습니다.

 

EC2에 ELK를 설치 후 Filebeat 를 이용하면 가능했지만, 아래와 같은 문제점이 있었습니다.

 

1. Slowquery를 다운 받아 진행하게 된다면, 실시간이 아닌 배치 형태로 진행이라 실시간으로 보기 원하는 부분에 대해 니즈에 부합

2. ELK 에 대해 유지보수 필요

3. 다운 받은 Slowquery 에 대해 관리 필요

 

등으로 인해 AWS ELK를 찾아보게 되었습니다.

 

다행히 AWS ELK는 정말 만들기 쉽더군요. (ec2에서 ELK도 어렵운건 아니지만..)

이후 아래 영상을 통해 cloudwatch상에 올라가 있는 slowquery를 ELK로 연동하는 것도 쉽게 할 수 있었습니다.

 

Amazon Elastcsearch Service 소개 및 활용 방법 (윤석찬)

2016년 월간 웨비나 2월 - Amazon Elastcsearch Service 소개 및 활용 방법 (윤석찬)

www.slideshare.net

위의 연동을 간단하게 이미지로 정리하면 아래와 같습니다.

빨간색 네모로 표신한 정도가 됩니다.

 

RDS Slow query 발생 -> Cloudwatch Log -> Lambda(AWS제공) -> Elasticsearch Service(ES) -> Kibana

이제 아래는 Cloudwatch Slow log를 ELK 로 연동하는 설정 부분입니다.

 

1. Cloudwatch 의 로그 목록 선택 후 (slow query 선택 후 Elasticsearch Service 측 스트림 선택)

2. 생성한  ES 선택(Elasticsearch Service)

3. 로그 형식 및 필터 구성 시 기타로 설정
   기타로 선택하는 이유는 여기서 필터패턴을 하게 되면 Slowquery의 여러 형태로 인해 원치 않은 데이터가 다른 형태로 필터가 되는 현상 발생
    또한, cloudwatch 로그 형식 필터에서 아직까지 정규화 표현식을 제공하지 않으므로 slowquery를 필터 적용하기에는 맞지 않음(띄어쓰기 필터 등만 지원)

 

4. 이후 별다른 설정 없이 진행(스트리밍 시작)

- 구독되고 있는 것을 확인 가능(여기서 중요한 것은 모든 cloudwatch 를 이용해서 ES 로 보내는 작업-비단 Slowquery 뿐만 아니라 모든 작업/ 은 동일한 Lambda호출/ 2번의 캡쳐 화면을 보면 Lambda함수에 대한 내용이 있음)

 

하지만 Cloudwatch에서 제공하는 필터는 RDS slowquery 형태에 맞지 않습니다. 더군다나 쿼리들마다 형태가 다르다 보니 정영화 되지 않은 데이터를 일괄적인 형태로 보여지는 것에 대해서는 한계가 있더군요.

 

더불어 여러 RDS에서 들어오는 slowquery를 AWS에서 제공하는 하나의 lambda(node.js) 에서만 처리가 되니 어느 DB에서 생긴 slowquery인지 확인이 어렵더군요.

 

다행히, AWS Support 에 문의 하였더니 Lambda 수정의 방법과 기존 고려했던 export 받은 후 처리 2가지 방법을 제시하였으며, Lambda의 경우 친절하게 수정한 코더도 주시더군요!!!

 

하지만, Lambda 수정은 제 몫이기에 처음으로 접해보는 node.js를 잡고 끙끙대기 시작했습니다.

console.log 를 이용한 일일이 로그 찍어가며 테스트하며 수정을 했습니다.

 

아래는 수정한 Lambda입니다. 개발자 분들이 보시면 얼마나 욕할지...ㅠㅠ하지만 저는 개발자가 아니기에...당당히 소스를 공유 합니다.

수정하셔도 되고 그대로 적용하셔도 됩니다.(무슨 근자감...)

 

수정 부분은 2군데 입니다.

1. 여기는 rds의 네임을 cwl 뒤에 작성해 줌으로서 구분이 가능해 집니다.(AWS Jihyun B. 님이 제공)

        // index name format: cwl-YYYY.MM.DD Slowquery modify 1
/*       
        var indexName = [
            'cwl-' + timestamp.getUTCFullYear(),              // year
            ('0' + (timestamp.getUTCMonth() + 1)).slice(-2),  // month
            ('0' + timestamp.getUTCDate()).slice(-2)          // day
        ].join('.');
*/
        var indexName = [
            'cwl' + payload.logGroup.toLowerCase().split('/').join('-') + '-' + timestamp.getUTCFullYear(),              // log group + year
            ('0' + (timestamp.getUTCMonth() + 1)).slice(-2),  // month
            ('0' + timestamp.getUTCDate()).slice(-2)          // day
        ].join('.');

2. 여기는 정규표현식을 이용하여 slow query 를 파싱 및 필터를 적용한 후 맵핑하는 곳입니다. 참고로 저는 node.js 는 처음이며, 정규표현식도 모릅니다..ㅠ그래서 하드 코딩을 했는데...동적으로 코딩 하기에는 머리가 안돌아가서 그냥 포기했습니다. 이쁘게 사용하실 분은 수정하셔도 됩니다. 

 

message는 slow 쿼리의 전문이 들어오는 곳이며, extractedFields 는 맵핑하는 형태가 들어오는 곳인데, 우리가 필터를 기타로 한 후 작성안하고 넘기기 때문에 if 문에 걸리지 않고 else 로 빠지는 것입니다.

 

function buildSource(message, extractedFields) {

...
if 문 끝에 else로 추가
...

//Slowquery add
    else {
        console.log('Slow query Regular expression')
 
        var qualityRegex = /User@Host: ([^&@]+) /igm;
        var ipRegex = /\d+\.\d+\.\d+\.\d+/igm;
        var querytimeReg = /Query_time: ([^& ]+)/igm;
        var locktimeReg = /Lock_time: ([^& ]+)/igm;
        var querytimeReg = /Query_time: ([^& ]+)/igm;
        var rowsentReg = /Rows_sent: ([\d^&]+)/igm;
        var rowexaminedReg = /Rows_examined: ([\d^&]+)/igm;
        var slowqueryReg = /select ([^&;]+)|update ([^&;]+)|delete ([^&;]+)/igm;
 
        var userhost, ip, querytime, querylock, rowsent, rowexaminge, slowquery ='';
 
        var matches, qualities = [];
        var source = {};
         
        while (matches = qualityRegex.exec(message)) {
            qualities.push(decodeURIComponent(matches[1]));
        }
         
        userhost = qualities[0];
        ip = ipRegex.exec(message)[0];
 
         
        matches, qualities = [];
         
        while (matches = querytimeReg.exec(message)) {
            qualities.push(decodeURIComponent(matches[1]));
        }
        querytime = qualities[0];
         
         
        matches, qualities = [];
         
        while (matches = locktimeReg.exec(message)) {
            qualities.push(decodeURIComponent(matches[1]));
        }
         
        querylock = qualities[0];
         
        matches, qualities = [];
         
        while (matches = rowsentReg.exec(message)) {
            qualities.push(decodeURIComponent(matches[1]));
        }
        rowsent = qualities[0];
         
        matches, qualities = [];
         
        while (matches = rowexaminedReg.exec(message)) {
            qualities.push(decodeURIComponent(matches[1]));
        }
        rowexamined = qualities[0];
         
        slowquery = slowqueryReg.exec(message)[0];
 
        console.log(userhost);
        console.log(ip);
        console.log(querytime);
        console.log(querylock);
        console.log(rowsent);
        console.log('hyungi rowexaminge: ',rowexamined);
        console.log('hyungi query :',slowquery);
         
        source['User@Host'] = userhost;
        source['IP'] = ip;
        source['Query_time'] = 1 * querytime;
        source['Lock_time'] = 1 * querylock;
        source['Rows_sent'] = 1 * rowsent;
        source['Rows_examined'] = 1 * rowexamined;
        source['Query'] = slowquery;
 
        console.log('Slow query Filter complete : ', source)
         
        return source;
    }

 

아래는 전체 Lambda 소스 입니다.( LogsToElasticsearch_st-elk-slowquery )

// v1.1.2
var https = require('https');
var zlib = require('zlib');
var crypto = require('crypto');
 
var endpoint = 'search-st-elk-slowquery-edn6wa74e2h3zchmep6lbu2xky.ap-northeast-2.es.amazonaws.com';
 
// Set this to true if you want to debug why data isn't making it to
// your Elasticsearch cluster. This will enable logging of failed items
// to CloudWatch Logs.
var logFailedResponses = false;
 
exports.handler = function(input, context) {
    // decode input from base64
    var zippedInput = new Buffer.from(input.awslogs.data, 'base64');
     
    // decompress the input
    zlib.gunzip(zippedInput, function(error, buffer) {
        if (error) { context.fail(error); return; }
 
        // parse the input from JSON
        var awslogsData = JSON.parse(buffer.toString('utf8'));
 
        // transform the input to Elasticsearch documents
        var elasticsearchBulkData = transform(awslogsData);
 
        // skip control messages
        if (!elasticsearchBulkData) {
            console.log('Received a control message');
            context.succeed('Control message handled successfully');
            return;
        }
 
        // post documents to the Amazon Elasticsearch Service
        post(elasticsearchBulkData, function(error, success, statusCode, failedItems) {
            console.log('Response: ' + JSON.stringify({
                "statusCode": statusCode
            }));
 
            if (error) {
                logFailure(error, failedItems);
                context.fail(JSON.stringify(error));
            } else {
                console.log('Success: ' + JSON.stringify(success));
                context.succeed('Success');
            }
        });
    });
};
 
function transform(payload) {
    if (payload.messageType === 'CONTROL_MESSAGE') {
        return null;
    }
    var bulkRequestBody = '';
 
    payload.logEvents.forEach(function(logEvent) {
        var timestamp = new Date(1 * logEvent.timestamp);
 
        // index name format: cwl-YYYY.MM.DD Slowquery modify 1
/*       
        var indexName = [
            'cwl-' + timestamp.getUTCFullYear(),              // year
            ('0' + (timestamp.getUTCMonth() + 1)).slice(-2),  // month
            ('0' + timestamp.getUTCDate()).slice(-2)          // day
        ].join('.');
*/
        var indexName = [
            'cwl' + payload.logGroup.toLowerCase().split('/').join('-') + '-' + timestamp.getUTCFullYear(),              // log group + year
            ('0' + (timestamp.getUTCMonth() + 1)).slice(-2),  // month
            ('0' + timestamp.getUTCDate()).slice(-2)          // day
        ].join('.');
         
        var source = buildSource(logEvent.message, logEvent.extractedFields);
        source['@id'] = logEvent.id;
        source['@timestamp'] = new Date(1 * logEvent.timestamp).toISOString();
        source['@message'] = logEvent.message;
        source['@owner'] = payload.owner;
        source['@log_group'] = payload.logGroup;
        source['@log_stream'] = payload.logStream;
 
        var action = { "index": {} };
        action.index._index = indexName;
        action.index._type = payload.logGroup;
        action.index._id = logEvent.id;
         
        bulkRequestBody += [
            JSON.stringify(action),
            JSON.stringify(source),
        ].join('\n') + '\n';
    });
    return bulkRequestBody;
}
 
function buildSource(message, extractedFields) {
 
    if (extractedFields) {
        var source = {};
 
        for (var key in extractedFields) {
            if (extractedFields.hasOwnProperty(key) && extractedFields[key]) {
                var value = extractedFields[key];
 
                if (isNumeric(value)) {
                    source[key] = 1 * value;
                    continue;
                }
 
                jsonSubString = extractJson(value);
                if (jsonSubString !== null) {
                    source['$' + key] = JSON.parse(jsonSubString);
                }
 
                source[key] = value;
            }
        }
        return source;
    }
//Slowquery add
    else {
        console.log('Slow query Regular expression')
 
        var qualityRegex = /User@Host: ([^&@]+) /igm;
        var ipRegex = /\d+\.\d+\.\d+\.\d+/igm;
        var querytimeReg = /Query_time: ([^& ]+)/igm;
        var locktimeReg = /Lock_time: ([^& ]+)/igm;
        var querytimeReg = /Query_time: ([^& ]+)/igm;
        var rowsentReg = /Rows_sent: ([\d^&]+)/igm;
        var rowexaminedReg = /Rows_examined: ([\d^&]+)/igm;
        var slowqueryReg = /select ([^&;]+)|update ([^&;]+)|delete ([^&;]+)/igm;
 
        var userhost, ip, querytime, querylock, rowsent, rowexaminge, slowquery ='';
 
        var matches, qualities = [];
        var source = {};
         
        while (matches = qualityRegex.exec(message)) {
            qualities.push(decodeURIComponent(matches[1]));
        }
         
        userhost = qualities[0];
        ip = ipRegex.exec(message)[0];
 
         
        matches, qualities = [];
         
        while (matches = querytimeReg.exec(message)) {
            qualities.push(decodeURIComponent(matches[1]));
        }
        querytime = qualities[0];
         
         
        matches, qualities = [];
         
        while (matches = locktimeReg.exec(message)) {
            qualities.push(decodeURIComponent(matches[1]));
        }
         
        querylock = qualities[0];
         
        matches, qualities = [];
         
        while (matches = rowsentReg.exec(message)) {
            qualities.push(decodeURIComponent(matches[1]));
        }
        rowsent = qualities[0];
         
        matches, qualities = [];
         
        while (matches = rowexaminedReg.exec(message)) {
            qualities.push(decodeURIComponent(matches[1]));
        }
        rowexamined = qualities[0];
         
        slowquery = slowqueryReg.exec(message)[0];
 
        console.log(userhost);
        console.log(ip);
        console.log(querytime);
        console.log(querylock);
        console.log(rowsent);
        console.log('hyungi rowexaminge: ',rowexamined);
        console.log('hyungi query :',slowquery);
         
        source['User@Host'] = userhost;
        source['IP'] = ip;
        source['Query_time'] = 1 * querytime;
        source['Lock_time'] = 1 * querylock;
        source['Rows_sent'] = 1 * rowsent;
        source['Rows_examined'] = 1 * rowexamined;
        source['Query'] = slowquery;
 
        console.log('Slow query Filter complete : ', source)
         
        return source;
    }
 
    jsonSubString = extractJson(message);
    if (jsonSubString !== null) {
        return JSON.parse(jsonSubString);
    }
 
    return {};
}
 
function extractJson(message) {
    var jsonStart = message.indexOf('{');
    if (jsonStart < 0) return null;
    var jsonSubString = message.substring(jsonStart);
    return isValidJson(jsonSubString) ? jsonSubString : null;
}
 
function isValidJson(message) {
    try {
        JSON.parse(message);
    } catch (e) { return false; }
    return true;
}
 
function isNumeric(n) {
    return !isNaN(parseFloat(n)) && isFinite(n);
}
 
function post(body, callback) {
    var requestParams = buildRequest(endpoint, body);
 
    var request = https.request(requestParams, function(response) {
        var responseBody = '';
        response.on('data', function(chunk) {
            responseBody += chunk;
        });
 
        response.on('end', function() {
            var info = JSON.parse(responseBody);
            var failedItems;
            var success;
            var error;
             
            if (response.statusCode >= 200 && response.statusCode < 299) {
                failedItems = info.items.filter(function(x) {
                    return x.index.status >= 300;
                });
 
                success = {
                    "attemptedItems": info.items.length,
                    "successfulItems": info.items.length - failedItems.length,
                    "failedItems": failedItems.length
                };
            }
 
            if (response.statusCode !== 200 || info.errors === true) {
                // prevents logging of failed entries, but allows logging
                // of other errors such as access restrictions
                delete info.items;
                error = {
                    statusCode: response.statusCode,
                    responseBody: info
                };
            }
 
            callback(error, success, response.statusCode, failedItems);
        });
    }).on('error', function(e) {
        callback(e);
    });
    request.end(requestParams.body);
}
 
function buildRequest(endpoint, body) {
    var endpointParts = endpoint.match(/^([^\.]+)\.?([^\.]*)\.?([^\.]*)\.amazonaws\.com$/);
    var region = endpointParts[2];
    var service = endpointParts[3];
    var datetime = (new Date()).toISOString().replace(/[:\-]|\.\d{3}/g, '');
    var date = datetime.substr(0, 8);
    var kDate = hmac('AWS4' + process.env.AWS_SECRET_ACCESS_KEY, date);
    var kRegion = hmac(kDate, region);
    var kService = hmac(kRegion, service);
    var kSigning = hmac(kService, 'aws4_request');
     
    var request = {
        host: endpoint,
        method: 'POST',
        path: '/_bulk',
        body: body,
        headers: {
            'Content-Type': 'application/json',
            'Host': endpoint,
            'Content-Length': Buffer.byteLength(body),
            'X-Amz-Security-Token': process.env.AWS_SESSION_TOKEN,
            'X-Amz-Date': datetime
        }
    };
 
    var canonicalHeaders = Object.keys(request.headers)
        .sort(function(a, b) { return a.toLowerCase() < b.toLowerCase() ? -1 : 1; })
        .map(function(k) { return k.toLowerCase() + ':' + request.headers[k]; })
        .join('\n');
 
    var signedHeaders = Object.keys(request.headers)
        .map(function(k) { return k.toLowerCase(); })
        .sort()
        .join(';');
 
    var canonicalString = [
        request.method,
        request.path, '',
        canonicalHeaders, '',
        signedHeaders,
        hash(request.body, 'hex'),
    ].join('\n');
 
    var credentialString = [ date, region, service, 'aws4_request' ].join('/');
 
    var stringToSign = [
        'AWS4-HMAC-SHA256',
        datetime,
        credentialString,
        hash(canonicalString, 'hex')
    ] .join('\n');
 
    request.headers.Authorization = [
        'AWS4-HMAC-SHA256 Credential=' + process.env.AWS_ACCESS_KEY_ID + '/' + credentialString,
        'SignedHeaders=' + signedHeaders,
        'Signature=' + hmac(kSigning, stringToSign, 'hex')
    ].join(', ');
 
    return request;
}
 
function hmac(key, str, encoding) {
    return crypto.createHmac('sha256', key).update(str, 'utf8').digest(encoding);
}
 
function hash(str, encoding) {
    return crypto.createHash('sha256').update(str, 'utf8').digest(encoding);
}
 
function logFailure(error, failedItems) {
    if (logFailedResponses) {
        console.log('Error: ' + JSON.stringify(error, null, 2));
 
        if (failedItems && failedItems.length > 0) {
            console.log("Failed Items: " +
                JSON.stringify(failedItems, null, 2));
        }
    }
}

 

이렇게 하면  Elastisearch 에서 index 가 생성 되었는지 체크하면 됩니다.

 

- Slowquery 가 발생해야 생성이 되므로 Lamdba 까지 수정이 완료 되었다면 제대로 들어 오는지 체크를 위해 DB에서 select sleep(30); 으로 슬로우 쿼리 생성 진행
- Lamdba 가 호출 되어서 진행 되었는지 Cloudwatch 통해서 확인 가능(수정한 Lamdba 로그가 생성 되었는지 체크도 가능

 

이렇게 하여 RDS 모든 서비스에 대해 Slow쿼리를 Kibana통해서 확인이 되네요.

뿌듯하네요! 이제 Slowquery 모니터링 합시다.

 

 

참고

 

- node.js 정규화표현식

https://developer.mozilla.org/ko/docs/Web/JavaScript/Guide/%EC%A0%95%EA%B7%9C%EC%8B%9D

 

정규 표현식

정규 표현식은 문자열에 나타는 특정 문자 조합과 대응시키기 위해 사용되는 패턴입니다. 자바스크립트에서, 정규 표현식 또한 객체입니다.  이 패턴들은 RegExp의 exec 메소드와 test 메소드  ,그리고 String의  match메소드 , replace메소드 , search메소드 ,  split 메소드와 함께 쓰입니다 . 이 장에서는 자바스크립트의 정규식에 대하여 설명합니다.

developer.mozilla.org

- node.js 를 직접 코딩하여 정규표현식 통해서 확인이 가능한지 간단하게 테스트 할 수 있습니다.

https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/RegExp

 

RegExp

The RegExp constructor creates a regular expression object for matching text with a pattern.

developer.mozilla.org

- node.js 정규표현식
https://stackoverflow.com/questions/7954022/javascript-regular-expression-multiple-match

 

Javascript Regular Expression multiple match

I'm trying to use javascript to do a regular expression on a url (window.location.href) that has query string parameters and cannot figure out how to do it. In my case, there is a query string para...

stackoverflow.com

- 정규표현식 테스트
https://regexr.com/

 

RegExr: Learn, Build, & Test RegEx

RegExr is an online tool to learn, build, & test Regular Expressions (RegEx / RegExp).

regexr.com

 

반응형
반응형

안녕하세요.


AWS 상에서 운영을 하다 보니 Cloudwatch를 통하여 모니터링을 많이하게 되었습니다.

더군다나 EC2 위에 올린 MongoDB 모니터링이 필요하여 고민하던 도중 cloudwatch를 이용하기로 결정하고 python으로 개발 하였습니다.


MongoDB의 모니터링은 여러 방법이 많지만,

저는 mongostat / mongotop 을 이용하기로 하였습니다.


MongoDB에 대해서는 아키텍처와 지난번 Real MongoDB 세미나 참석하여 얻은 배움이 전부라 쉽지 않았습니다.


참고로 모니터링 관련 자료는 아래를 보고 툴 대신에 mongostat / top 을 하기로 한 것입니다.

https://docs.mongodb.com/manual/administration/monitoring/


해당 스크립트는 아래 github 스크립트를 custom 한 것입니다.(저작권 관련해서는 어떻게 되는지 모르겠네요..ㅠ)



Mongotop 의 경우도 개발한 mongostat를 커스텀하여 개발한 것입니다.

수집하는 데이터는 기본적으로 현재는 query/ insert / delete / update 관련이며, mongotop 의 경우 주요 테이블을 list하여 수집하였으며, 수집 데이터는 total time / readlock time / writelock time / queries time 을 수집하는 형태로 하였습니다.

또한 1초 단위로 수집한 시간을 뺀 것을 표시하였습니다. (표시 데이터 = 현재 수집한 시간 상태 - 1초전 수집한 데이터)

1회용 수집이기 때문에 crontab 에 등록해서 주기적으로 수집하면 됩니다.

혹여나 문의 사항 있으시면 댓글 또는 happy8510@gmail.com 으로 연락 주시거나 facebook 통해서 연락 주시면 도움 드릴 수 있는 부분에 대해서 도움 드리겠습니다.(개발자가 아니라서 개발 문의 사항은 한계가 있습니다..ㅠㅠ)

cloudwatch에 올리기 위해서는 미리 aws-config 설정 하시기 바랍니다.




* mongostat.py

#!/usr/bin/env python
import argparse
#hyungi unicode incoding
import commands
import datetime, os, time, sys, random
import boto3
 
from pymongo import MongoClient
from pymongo.errors import OperationFailure
from pymongo.errors import ConnectionFailure
from pymongo.errors import ServerSelectionTimeoutError
 
# Version number
PYTHON_MONGOSTAT_VERSION = "0.0.1"
 
# Not authorized error
MONGO2_NOT_AUTH = "unauthorized"
MONGO3_NOT_AUTH = "not authorized"
 
# Authentication failure message
MONGO3_AUTH_FAILUR = "Authentication failed"
 
cloudwatch = boto3.client('cloudwatch')
 
class MongoInstance():
    'Class for mongodb instance'
 
    def __init__(self, host, port, username, password):
        'Initialize the mongodb instance information and create connection to it.'
 
        self.host = host
        self.port = port
        self.username = username
        self.password = password
        self.stats_info = {}
        selfurl='mongodb://%s:%s@%s:%s/admin' % (self.username, self.password, self.host,self.port)
 
        # Create connection to mongodb server
        try:
            #url change
            #self.connection = MongoClient(self.host, self.port)
            self.connection = MongoClient(selfurl)
 
        except ConnectionFailure:
            print "Connection error: create connection to mongodb instance failed."
            exit(1)
 
        # Get the mongodb version
        try:
            server_info = self.connection.server_info()
            self.version = server_info['version']
        except ServerSelectionTimeoutError:
            print "Timeout error: get server information of mongodb instance timeout."
 
        return
 
 
    def try_stats_command(self):
        'Try to execute the serverStatus command to see if authentication required.'
 
        # Execute the serverStatus command at first and handle possible exceptions
        errmsg = server_status = server_status2 = {}
        sleep = 1
        admin = self.connection.admin
        try:
            server_status = admin.command({"serverStatus":1})
            time.sleep(sleep)
            server_status2= admin.command({"serverStatus":1})
           # server_status = ','.joint(tmp_server_status)
        except OperationFailure, op_failure:
            errmsg = op_failure.details
        except:
            print "Execution error: get server status of mongodb instance failed."
            exit(1)
 
        #print server_status
        print 'errmsg :' + str(errmsg)
 
        # Check to see if the mongodb server enables authentication
        if errmsg != {}:
            if errmsg['errmsg'].find(MONGO2_NOT_AUTH) == -1 and errmsg['errmsg'].find(MONGO3_NOT_AUTH) == -1:
                print "Execution error: %s." % errmsg['errmsg']
                exit(1)
            else:
                # Authenticate with the given username and password
                try:
                    admin.authenticate(self.username, self.password)
                except OperationFailure, op_failure:
                    print "Execution error: authenticate to mongodb instance failed."
                    exit(1)
                # Try to execute the serverStatus command again
                try:
                    server_status = admin.command({"serverStatus":1})
                    time.sleep(sleep)
                    server_status2= admin.command({"serverStatus":1})
                except OperationFailure, op_failure:
                    print "Execution error: %s." % op_failure.details['errmsg']
                    exit(1)
 
        thetime = datetime.datetime.now().strftime("%d-%m-%Y.%H:%M:%S")
        cmd = "cat /proc/loadavg"
        out = commands.getstatusoutput(cmd)
        load = out[1].split()[0]
 
        pq = 0
        pi = 0
        pu = 0
        pd = 0
        pgm = 0
 
        = 0
        = 0
        = 0
        = 0
        gm= 0
        glativeW = 0
        glativeR = 0
        # hyungi
        #lok = round(float(server_status[u'globalLock'][u'ratio']),2)
        res = int(server_status[u'mem'][u'resident'])
        vir = int(server_status[u'mem'][u'virtual'])
        mapd = int(server_status[u'mem'][u'mapped'])
 
        #past "sleep" ago status
        pq = int(server_status[u'opcounters'][u'query'])
        pi = int(server_status[u'opcounters'][u'insert'])
        pu = int(server_status[u'opcounters'][u'update'])
        pd = int(server_status[u'opcounters'][u'delete'])
        pgm = int(server_status[u'opcounters'][u'getmore'])
        pcon = int(server_status[u'connections'][u'current'])
 
        #now status
        = int(server_status2[u'opcounters'][u'query'])
        = int(server_status2[u'opcounters'][u'insert'])
        = int(server_status2[u'opcounters'][u'update'])
        = int(server_status2[u'opcounters'][u'delete'])
        gm = int(server_status[u'opcounters'][u'getmore'])
        con = int(server_status2[u'connections'][u'current'])
 
        glactiveW = int(server_status[u'globalLock'][u'activeClients'][u'writers'])
        glactiveR = int(server_status[u'globalLock'][u'activeClients'][u'readers'])
 
        template="%12s%22s%12s%12s%12s%12s%12s%12s%12s%12s%12s%12s%12s%12s"
        header=('hostname''time''insert''query''update',  \
                'delete''getmore','active con',  'resident', \
                'virtual','mapped','load''Act Writer''Act Reader')
 
        cloudwatch.put_metric_data(
            MetricData=[
                {
                    'MetricName''MongoDB-Insert Value',
                    'Dimensions': [
                        {
                            'Name''MongoDB-Primary',
                            'Value''Insert'
                        },
                    ],
                    'Unit''None',
                    'Value': (i-pi)/sleep
                },
                {
                    'MetricName''MongoDB-Query Value',
                    'Dimensions': [
                        {
                            'Name''MongoDB-Primary',
                            'Value''Query'
                        },
                    ],
                    'Unit''None',
                    'Value': (q-pq)/sleep
                },
                {
                    'MetricName''MongoDB-Query Value',
                    'Dimensions': [
                        {
                            'Name''MongoDB-Primary',
                            'Value''Delete'
                        },
                    ],
                    'Unit''None',
                    'Value': (d-pd)/sleep
                },
                {
                    'MetricName''MongoDB-Query Value',
                    'Dimensions': [
                        {
                            'Name''MongoDB-Primary',
                            'Value''Update'
                        },
                    ],
                    'Unit''None',
                    'Value': (u-pu)/sleep
                },
            ],
            Namespace='LogMetrics'
        )
 
        server_statusstr="hostname, thetime, (i-pi)/sleep, (q-pq)/sleep, (u-pu)/sleep, (d-pd)/sleep, (gm-pgm)/sleep, con, res, vir, mapd, load, glactiveW, glactiveR"
 
        print template % header
        print template % (eval(server_statusstr))
 
def mongostat_arg_check(args):
    'Check the given arguments to make sure they are valid.'
 
    # Make sure the rowcount not negative integer
    if args.rowcount and args.rowcount < 0:
        return False"number of stats line to print can not be negative."
 
    # Make sure both username and password should be given, or neither
    if args.username and not args.password:
        return False"only username given, without password."
    if not args.username and args.password:
        return False"only password given, without username."
 
    # Make sure the hostname is valid
    if args.host:
        hostinfo = args.host.split(':')
        if len(hostinfo) > 2:
            return False"invalid mongodb host, only HOSTNAME of HOSTNAME:PORT acceptable."
        if len(hostinfo) == 2:
            try:
                port = int(hostinfo[1])
                if args.port and args.port != port:
                    return False"ports given by port option and host option not match."
            except ValueError:
                return False"invalid mongodb host, the port part not integer."
 
    return TrueNone
 
def mongostat_start(host, port, username, password, rowcount, noheaders, json):
    'Start monitor the mongodb server status and output stats one time per second.'
 
    # Create mongodb instance and make sure we can execute the serverStatus command correctly
    mongo_instance = MongoInstance(host, port, username, password)
    mongo_instance.try_stats_command()
    # print mongo_instance.host, mongo_instance.port, mongo_instance.version, mongo_instance.storage_engine
 
if __name__ == '__main__':
    # Default configurations
    hostname, username, password    = '호스트정보''유저''비밀번호'
    port, rowcount                  = 포트, 0
    noheaders, json                 = FalseFalse
 
    # Define a argument parser for all possible options
    parser = argparse.ArgumentParser(description="Monitor basic MongoDB server statistics.")
    parser.add_argument("--version"help="print the tool version and exit", action="store_true")
    parser.add_argument("--host"help="mongodb host to connect to")
    parser.add_argument("--port"help="server port (can also use --host HOSTNAME:PORT)"type=int)
    parser.add_argument("-u""--username"help="username for authentication")
    parser.add_argument("-p""--password"help="password for authentication")
    parser.add_argument("--noheaders"help="don't output column names", action="store_true")
    parser.add_argument("-n""--rowcount"help="number of stats lines to print (0 for indefinite)"type=int)
    parser.add_argument("--json"help="output as JSON rather than a formatted table", action="store_true")
 
    # Parse all the given options and make sure they are valid
    arguments = parser.parse_args()
    if arguments.version:
        print "Python mongostat version: %s" % PYTHON_MONGOSTAT_VERSION
        exit(0)
    ok, errmsg = mongostat_arg_check(arguments)
    if ok == False:
        print "Argument error: %s" % errmsg
        exit(1)
 
    # Get the given arguments
    if arguments.host:
        hostinfo = arguments.host.split(':')
        hostname = hostinfo[0]
        if len(hostinfo) == 2:
            port = int(hostinfo[1])
    if arguments.port:
        port = arguments.port
    if arguments.username:
        # We make sure username and password must both given or neither in mongostat_arg_check
        username = arguments.username
        password = arguments.password
    if arguments.rowcount:
        rowcount = arguments.rowcount
    if arguments.noheaders:
        noheaders = True
    if arguments.json:
        json = True
 
    # Start the mongostat
    mongostat_start(hostname, port, username, password, rowcount, noheaders, json)


* mongotop.py


#!/usr/bin/env python
import argparse
#hyungi unicode incoding
import commands
import datetime, os, time, sys, random
import boto3
 
from pymongo import MongoClient
from pymongo.errors import OperationFailure
from pymongo.errors import ConnectionFailure
from pymongo.errors import ServerSelectionTimeoutError
 
# Version number
PYTHON_MONGOSTAT_VERSION = "0.0.1"
 
# Not authorized error
MONGO2_NOT_AUTH = "unauthorized"
MONGO3_NOT_AUTH = "not authorized"
 
# Authentication failure message
MONGO3_AUTH_FAILUR = "Authentication failed"
 
cloudwatch = boto3.client('cloudwatch')
#Hyungi / 5 collections select
lstCollection = ['collectionname1','collectionname2','collectionname3','collectionname4','collectionname5']
 
class MongoInstance():
    'Class for mongodb instance'
 
    def __init__(self, host, port, username, password):
        'Initialize the mongodb instance information and create connection to it.'
 
        self.host = host
        self.port = port
        self.username = username
        self.password = password
        self.stats_info = {}
        selfurl='mongodb://%s:%s@%s:%s/admin' % (self.username, self.password, self.host,self.port)
 
        # Create connection to mongodb server
        try:
            #hyungi
            #url change
            #self.connection = MongoClient(self.host, self.port)
            self.connection = MongoClient(selfurl)
 
        except ConnectionFailure:
            print "Connection error: create connection to mongodb instance failed."
            exit(1)
 
        # Get the mongodb version
        try:
            server_info = self.connection.server_info()
            self.version = server_info['version']
        except ServerSelectionTimeoutError:
            print "Timeout error: get server information of mongodb instance timeout."
 
        return
 
 
    def try_stats_command(self):
        'Try to execute the serverStatus command to see if authentication required.'
 
        # Execute the serverStatus command at first and handle possible exceptions
        errmsg = server_status = server_status2 = {}
        sleep = 1
        admin = self.connection.admin
        try:
            server_status = admin.command({"top":1})
            time.sleep(sleep)
            server_status2= admin.command({"top":1})
            #server_status2= admin.command({"serverStatus":1})
        except OperationFailure, op_failure:
            errmsg = op_failure.details
        except:
            print "Execution error: get server status of mongodb instance failed."
            exit(1)
 
        #print server_status
        print 'errmsg :' + str(errmsg)
 
        # Check to see if the mongodb server enables authentication
        if errmsg != {}:
            if errmsg['errmsg'].find(MONGO2_NOT_AUTH) == -1 and errmsg['errmsg'].find(MONGO3_NOT_AUTH) == -1:
                print "Execution error: %s." % errmsg['errmsg']
                exit(1)
            else:
                # Authenticate with the given username and password
                try:
                    admin.authenticate(self.username, self.password)
                except OperationFailure, op_failure:
                    print "Execution error: authenticate to mongodb instance failed."
                    exit(1)
                # Try to execute the serverStatus command again
                try:
                    server_status = admin.command({"top":1})
                    time.sleep(sleep)
                    server_status2= admin.command({"top":1})
                except OperationFailure, op_failure:
                    print "Execution error: %s." % op_failure.details['errmsg']
                    exit(1)
 
        thetime = datetime.datetime.now().strftime("%d-%m-%Y.%H:%M:%S")
        cmd = "cat /proc/loadavg"
        out = commands.getstatusoutput(cmd)
        load = out[1].split()[0]
 
        for strCollist in lstCollection :
            tmpName = 'rocketchat.%s' % strCollist
            print tmpName
            ptotaltime = int(server_status[u'totals'][tmpName][u'total'][u'time'])
            totaltime = int(server_status2[u'totals'][tmpName][u'total'][u'time'])
 
            prelock = int(server_status[u'totals'][tmpName][u'readLock'][u'time'])
            relock = int(server_status2[u'totals'][tmpName][u'readLock'][u'time'])
 
            pwrlock = int(server_status[u'totals'][tmpName][u'writeLock'][u'time'])
            wrlock = int(server_status2[u'totals'][tmpName][u'writeLock'][u'time'])
 
            pquery = int(server_status[u'totals'][tmpName][u'queries'][u'time'])
            query = int(server_status2[u'totals'][tmpName][u'queries'][u'time'])
 
            strMetric_total_Name='MongoDB-%s-%s Value' % (strCollist,'total time')
            strMetric_read_lock_Name='MongoDB-%s-%s Value' % (strCollist,'readLock time')
            strMetric_write_lock_Name='MongoDB-%s-%s Value' % (strCollist,'writeLock time')
            strMetric_query_Name='MongoDB-%s-%s Value' % (strCollist,'queries time')
 
            cloudwatch.put_metric_data(
                MetricData=[
                    {
                        'MetricName': strMetric_total_Name,
                        'Dimensions': [
                            {
                                'Name''MongoDB-Primary-Collections',
                                'Value''total time millisecond'
                            },
                        ],
                        'Unit''None',
                        'Value': (totaltime-ptotaltime)/1000
                    },
                    {
                        'MetricName': strMetric_read_lock_Name,
                        'Dimensions': [
                            {
                                'Name''MongoDB-Primary-Collections',
                                'Value''readLock time millisecond'
                            },
                        ],
                        'Unit''None',
                        'Value': (relock-prelock)/1000
                    },
                    {
                        'MetricName': strMetric_write_lock_Name,
                        'Dimensions': [
                            {
                                'Name''MongoDB-Primary-Collections',
                                'Value''writeLock time millisecond'
                            },
                        ],
                        'Unit''None',
                        'Value': (wrlock-pwrlock)/1000
                    },
                    {
                        'MetricName': strMetric_query_Name,
                        'Dimensions': [
                            {
                                'Name''MongoDB-Primary-Collections',
                                'Value''queries time millisecond'
                            },
                        ],
                        'Unit''None',
                        'Value': (query-pquery)/1000
                    },
                ],
                Namespace='LogMetrics'
            )
 
            template="%12s%12s%12s%12s"
            header=('totime''relock''wrlock''query')
 
            server_statusstr="(totaltime-ptotaltime)/1000, (relock-prelock)/1000, (wrlock-pwrlock)/1000, (query-pquery)/1000"
 
            print template % header
            print template % (eval(server_statusstr))
 
def mongostat_arg_check(args):
    'Check the given arguments to make sure they are valid.'
 
    # Make sure the rowcount not negative integer
    if args.rowcount and args.rowcount < 0:
        return False"number of stats line to print can not be negative."
 
    # Make sure both username and password should be given, or neither
    if args.username and not args.password:
        return False"only username given, without password."
    if not args.username and args.password:
        return False"only password given, without username."
 
    # Make sure the hostname is valid
    if args.host:
        hostinfo = args.host.split(':')
        if len(hostinfo) > 2:
            return False"invalid mongodb host, only HOSTNAME of HOSTNAME:PORT acceptable."
        if len(hostinfo) == 2:
            try:
                port = int(hostinfo[1])
                if args.port and args.port != port:
                    return False"ports given by port option and host option not match."
            except ValueError:
                return False"invalid mongodb host, the port part not integer."
 
    return TrueNone
 
def mongostat_start(host, port, username, password, rowcount, noheaders, json):
    'Start monitor the mongodb server status and output stats one time per second.'
 
    # Create mongodb instance and make sure we can execute the serverStatus command correctly
    mongo_instance = MongoInstance(host, port, username, password)
    mongo_instance.try_stats_command()
    # print mongo_instance.host, mongo_instance.port, mongo_instance.version, mongo_instance.storage_engine
 
if __name__ == '__main__':
    # Default configurations
    hostname, username, password    = '접속ip''유저명''비밀번호'
    port, rowcount                  = 포트, 0
    noheaders, json                 = FalseFalse
 
    # Define a argument parser for all possible options
    parser = argparse.ArgumentParser(description="Monitor basic MongoDB server statistics.")
    parser.add_argument("--version"help="print the tool version and exit", action="store_true")
    parser.add_argument("--host"help="mongodb host to connect to")
    parser.add_argument("--port"help="server port (can also use --host HOSTNAME:PORT)"type=int)
    parser.add_argument("-u""--username"help="username for authentication")
    parser.add_argument("-p""--password"help="password for authentication")
    parser.add_argument("--noheaders"help="don't output column names", action="store_true")
    parser.add_argument("-n""--rowcount"help="number of stats lines to print (0 for indefinite)"type=int)
    parser.add_argument("--json"help="output as JSON rather than a formatted table", action="store_true")
 
    # Parse all the given options and make sure they are valid
    arguments = parser.parse_args()
    if arguments.version:
        print "Python mongostat version: %s" % PYTHON_MONGOSTAT_VERSION
        exit(0)
    ok, errmsg = mongostat_arg_check(arguments)
    if ok == False:
        print "Argument error: %s" % errmsg
        exit(1)
 
    # Get the given arguments
    if arguments.host:
        hostinfo = arguments.host.split(':')
        hostname = hostinfo[0]
        if len(hostinfo) == 2:
            port = int(hostinfo[1])
    if arguments.port:
        port = arguments.port
    if arguments.username:
        # We make sure username and password must both given or neither in mongostat_arg_check
        username = arguments.username
        password = arguments.password
    if arguments.rowcount:
        rowcount = arguments.rowcount
    if arguments.noheaders:
        noheaders = True
    if arguments.json:
        json = True
 
    # Start the mongostat
    mongostat_start(hostname, port, username, password, rowcount, noheaders, json)

반응형
반응형




해당 내용은 [Amazon Redshift 아키텍처 및 모범사례::김민성::AWS Summit Seoul 2018] 에서 발표한 동영상을 보며 정리한 내용입니다.
자세한 내용은 아래 링크 가셔서 동영상과 ppt 를 보시면 더 쉬울듯 하며,
저는 공부의 목적으로 보며 정리한 내용입니다.

발표내용
  • 리더 노드는 각 slice 로 실행계획에 맞춰 데이터 수집을 하라고 보내고 마지막 slice 가 끝날때까지 기다렸다가 끝난 데이터를 수집하여 제공
  • slice 는 redshift 내 각 클러스터의 cpu 즉 작업을 실질적으로 하는 모듈로 이해하면 될듯

Distkey 형태
  1. 분산키 : key 를 가지고 각 slice 로 저장(slice 는 cpu당 개수와 동일한 개수를 가지고 있음) - group by 할 때 / key를 가지고 join 하는 경우 좋음
    - 각 노드에 균등하게 데이터 분산이 필요
    - 가장 큰 테이블에서 조인에 활용되는 컬럼
    - group by 조건에서 사용되는 컬럼
    - 높은 Cardinality를 보유한 컬럼
    ★ 아래와 같은 현상이 발생하면 slice1번이 끝날때까지 기다려야 하기에 데이터 분산이 불균형이 생기면 이에 대한 기다림이 길어지기에 문제가 발생할 수 있음을 인지하고 있어야 한다
    - Equality filter에서 활용되는 컬럼 (Equal 를 사용하면 하나의 slice로 고정이 되어 버리기 때문에 좋지 않음)
    - 데이터의 몰림을 유발하는 경우
    ★ 분산키는 어떻게 사용 해야 되는가!
    - 가장 큰 Dimension table의 pk와 fact table의 foreign키를 disk key 로 선택
    - 나머지 조인이 되는 조건의 Dimension table(작은 테이블)은 distribution all 를 검토 -> 모든 node들에 해당 테이블에 대한 데이터가 존재하는 형태(경우에 따라 다르지만 보통 300만건 이하의 경우 데이터 분산 타입을 all 로 선택해도 무방)
    이렇게 하면 성능적으로 가장 좋은 형태로 제공
    Ⅰ 가장 자주 활용되는 쿼리 확인(https://github.com/awslabs/amazon-redshift-utils/tree/master/src/AdminScripts/top-queries.sql 활용)
    Ⅱ 조인 또는 group by 에 활용되며 고른 데이터 분포 및 높은 Cardinality 보유한 컬럼을 선택
    Ⅲ 그게 아니면-위와 같이 최고의 방법을 찾기 힘들다면.. (Fact table은 Even으로 Dimension Table-기준 테이블 은 All로)
  2. 전체 : 상품 테이블과 같이 개수는 적으나 모든 join이나 그런것에 필요한 경우 각 slice 에 모두 저장 (데이터 storge 가 증가)
  3. 균등 : round robin (Even)
  • explain 하였을 경우 redshift 에서 네트워크를 통한 데이터 이동 관련 내용 (데이터 이동은 가장 비용이 비싼 operation)
DS_DIST*
INFO (아래로 갈수록 퍼포먼스가 좋지 않음)
DS_DIST_NONE
Preferred, no data transfer between nodes
DS_DIST_ALL_NONE
One table has DISTSTYLE ALL, no data transfer between nodes
DS_DIST_ALL_INNER
Inner table is being sent to the Single Node
DS_DIST_INNER
Inner table is being redistributed in an outer join
DS_DIST_OUTER
Outer table is being redistributed in an outer join
DS_BCAST_INNER
Inner table is being broadcast to all nodes.
DS_BCAST_BOTH
Both tables are being broadcast to all nodex.

  • 리더 노드가 각 slice 로 
성능 향상
1. 보다 높은 하드웨어 스펙
2. 병렬화의 달성
3. 네트워크를 통한 트레픽의 최소화


하지만 결국은 최고의 성능을 내기 위해서는 "스토리지 I/O의 최소화"

  • 모든 블록을 1Mb형태로 나눠서 저장하는데..
  • Zone Map : 각 Block 에 대한 최대값 및 최소값에 대한 정보 저장 > 그렇기 때문에 정렬 키를 이용하면 필요한 Block 만 읽기 때문에 정렬 키가 중요


정렬 키 (sortkey)
  • 정렬 키가 지정된 테이블의 경우 데이터 로드 시 정렬 순서대로 디스크에 저장
  • 정렬 키의 종류
    • 정렬 키 : 테이블의 단일 컬럼의 값 기준으로 데이터 정렬하여 저장
    • 복합 정렬키(Compound Sort Key) :
      • 최대 6개의 컬럼까지 활용하여 정렬 키를 지정
      • 선언한 순서에 따라 순차적으로 정렬되어 저장 -> 가장 먼저 선언한 키에 대한 높은 가중치
      • 조인 및 Group by, order by 효과적이며, Merge 조인에 효과
    • 인터리브 정렬키 (Interleaved Sort Key)
      • 최대 8개의 컬럼까지 활용하여 정렬키를 지정
      • 정렬 키에서 각 열의 하위 집합에 똑같은 가중치 부여 -> 복합 정렬키는 앞의 데이터가 가중치가 높다면, 인터리브 정렬키는 뒤의 키에 대해서도 동일하게 가중치를 부여(데이터의 양이 증가??)
      • AD-Hoc 형태의 쿼리에서 높은 성능을 제공
ex) Product 테이블에 sort key로 type, color, size를 지정

복합정렬키
인터리브정렬키
where type = 's' and color = 'B' and size = 'xl'
정렬을 기반한 가장 우수한 성능
순서 등에 관계 없이 일관성 있는 성능 제공
where type = 's' and color = 'B'
약간 효과적
where type = 's' and size = 'xl'
제한적으로 성능에 기여

쿼리 분석

파일 분산 및 병렬 로딩
  • Slice 단위로 작업을 처리를 하지만, Slice는 파일 하나당 하나의 slice가 할당되기에 반드시 파일을 여러개로 나눈 후 메타데이터를 지정하여 작업 진행을 권장
아래 표 참조 (노드당 100Mbyte sec)
매니페스트 파일
Copy 커맨드를 통한 실행
{
   "entries":[
      {"url":"s3://mybucket-alpha/client.aa", "mandatory":true},
      {"url":"s3://mybucket-alpha/client.ab", "mandatory":true},
      {"url":"s3://mybucket-alpha/client.ac", "mandatory":true},
      {"url":"s3://mybucket-alpha/client.ad", "mandatory":true}
   ]
}
copy customer
from 's3://mybucket/cust.manifest'
iam_role 'arn:aws:iam::111111111:role/MyRedshiftRole'
manifest;

압축
  • 압축을 기반으로 디스크 사용량 및 I/O 더욱 줄일 수 있음
  • Copy 커멘트 사용 시 10만 행을 우선 적제 후 압축 옵션 평가 (COMPUDATE on)
  • "Analyze Compression"  커멘트를 통해 현재 압축(인코딩)에 대한 평가 후 "alter table" 로 변경
  • 2~4배의 효과

기타
  • Vacuum
    • 변경되거나 삭제된 블럭에 대한 정리(플래그만 남겨놓고 디스크에는 남아져 있음) - vocuum을 이용하여 정리를 해줌
    • 정렬키에 맞춰 데이터 정렬(재정렬을 해주기 위해 vocuum을 사용) , rename을 해도 됨
    • 옵션 : full, sort only, delete only, reindex (인터리브 정렬키)
  • 통계 업데이트
    • 통계 업데이트 자동 실행 조건
      • create table as select 
      • select into
      • copy (with compupdate on)
    • 통계 업데이트를 실행해줘야 하는 경우
      • 5% 이상 데이터 변경된 경우
      • 외례키를 추가하거나 삭제한 경우
      • Vacuum 실행 후 
  • 제약 조건 (not null, primary, foreign, unique)은 쿼리 옵티마이저가 활용 (plan  만들 때 참고하기 때문)


중요
  1. 노드 크기의 선택 조건
  2. 적절한 분산키의 선택 방안
  3. 적절한 정렬키의 선택 방안
  4. 데이터 적제 방안
  5. 레드쉬프트 스펙트럼에서의 파티셔닝 및 파일 포멧

Redshift Spectrum (redshift와 별개!!!)
  • 아래는 참고로 하고 제대로 공부는 후에 하는 형태로 하자. redshift와 별개로 필요시 알고 있으면 활용방안이 있을 듯
  • s3의 수천개 이상의 노드들을 활용하여 쿼리 실행


수천개의 s3 내의 csv 파일을 읽어 들여 조인 진행

  • 파티셔닝 및 Columnar 파일 포맷 (ORC, Parquet) 사용
    • 파티셔닝 컬럼의 조건
      • 필터 및 조인의 조건이 되는 컬럼
      • 비지니스 유닛
      • 비지니스 그룹
      • 날짜 및 시간
  • 파일의 갯수는 Redsfhit 의 Slice의 수량 이상
  • 파일의 크기는 64 mb 이상을 권고
  • 각 파일은 크기를 권고











반응형
반응형

제목이 좀 자극적일 수 있으나, 

말 그데로 AWS DMS를 이용하여 mysql -> redshift 로 적재하는 가운데 테이블 카운트가 현저히 차이나는데도 불구하고 완료 되었다고 나오는 현상에 대해서 해결한 방안을 공유해 봅니다.


작업 방법

- AWS 의 DMS를 이용하여 Mysql 5.6의 데이터를 Redshift 로 적재

- 물론 CDC 도 진행하도록 설정



증상

  • 특정 DB의 테이블들을 DMS 하는 과정에서 전체 데이터의 10%도 되지 않는 데이터만 적재 후 적재가 완료 되었다고 status 상에서 확인
  • 해당 테이블들을 버전이 다른 replication instance 를 이용하여 진행 시 특정 버전(2.4.3)에서만 문제 없이 적재 및 CDC가 되는 현상을 확인
    - 상위 버전인 2.4.4 / 3.1.2 에서는 증상과 같은 현상이 발생

 

확인사항

  • Replication instance Engine 별로 테스트 진행
  • 하나의 테이블만 가지고 테스트 진행

 

조치사항

  1. Replication instance 를 2.4.3으로 지정하여 해당 instance로 복제 진행
  2. 상위 버전의 Replication instance를 사용하는 경우 Source Endpoint에 Resumefetchforxrows=0 를 지정한 후 진행하여 가능


√ Resumefetchforxrows 

  • MySQL 테이블 내 데이터가 많을 경우 마지막 데이터까지 가져온 이후에도 정해진 수 만큼 Fetching을 계속하도록 하는 옵션
    ex) Resumefetchforxrows = 1500 이면, 1500 row까지 fetching을 시도
  • Resumefetchforxrows=0으로 설정함으로써 DMS가 테이블내의 데이터를 의도적으로 끝까지 Fetching 시도 하도록 설정
  • Replication Engine 2.4.4 버전에서부터 Resumefetchforxrows 옵션을 값을 고정하면서 문제가 발생한 것으로 예상(AWS 답변)
    -> 2,4,4버전 이상을 사용하는 경우 Resumefetchforxrows 값을 0으로 고정해서 사용한 것을 권장



해결할 수 있었던 건 다행히 회사에서 aws와 계약을 맺은 상태라 AWS 엔지니어를 통해서 해결책을 받은 것이며,

모든 테스트들을 AWS 엔지니어가 재현이 불가하여 직접 다 테스트한 결과입니다.


부디 많은 분들께서 저와 같은 삽질을 피하시기 바랍니다.


또한 수정이 필요한 내용이 있다면 과감히 알려주시면 감사하겠습니다.

반응형

+ Recent posts