본문 바로가기

D.S/DE

220227일 - Kinesis 로 stream data 처리해보기

728x90

 

 

AWS가 확실히 문서나 피드백 수집이나 활용이나..훨씬 사용자 친화적이다. 파이프라인 만드는 것도 쉽다. 유튜브에서 활용사례 등 자료 찾기도 좋다. 괜히 1등하는게 아니다.

 

최근에 인터뷰 준비를 하면서 클라우드 서비스로 데이터 레이크 - 데이터 웨어하우스 를 구축하는 부분을 많이 조사했다. 조사하면서 AWS가 상당히 데이터플랫폼 구축에 친화적인 듯 하여 한번 호다닥 만들어 보기로 했다.

 

조사하면서 오 이런 것도 됨?하거나 해보면 좋은 것들은 다 다뤄볼 예정. 데이터는 크게 신경쓰지 않고 더미로 던질 것이다.

 

  • kinesis로 받은 데이터를 s3나 RDS에 집어넣기
  • RDS의 트랜잭션 로그를 가져와서 실시간으로 s3에 집어넣기
  • 간단한 데이터 레이크 → 웨어하우스 플랫폼 구축해보기

등등..

 

개인적으로 클라우드 서비스에서 제일 중요하게 다룰 건 정책이라 생각한다. 권한을 따로 관리해주는 팀이나 그룹이 있으면 편하겠지만 대부분은 구축하는 사람이 이런 설정을 다 해줘야 하니까. 정책설명도 구글보다 잘 되어있는 것 같다.

그 외에 서비스 가격 계산도 예제도 잘 나와있고 구글보다 훨 나은 거 같은데..? AWS를 잘 쓰다 GCP로 옮기면 GCP 적응이 쉬울 것 같다.

 


 

이 글에서는 AWS Kinesis를 이용해 stream data 주고 받는 작업관련 개념들을 찾아볼 것이다.

 

가격 계산

큰 트래픽은 내가 (돈을) 감당을 못 하니 대충 초당 로그 2개 정도 받고 레코드 크기는 최대 5KB정도로 조사했다. 어차피 테스트이니 데이터 보존은 1일.

 

Kinesis Data Streams 인스턴스 생성

 

 

IAM 정책 설정과 사용자 생성

 

사용자 stream-test를 만들고 사용자에게 바로 정책을 연결시키고 → access key 내역 가져오기.

그룹을 만든 뒤에 그룹에 정책을 적용할 수도 있다. 그룹을 나중에 사용하기 위해 data-platform이라는 그룹을 만들어두었다. 그룹의 구성원에게 기본적으로 적용할 정책을 안 정해서 정책은 설정하지 않음.

역할이 5개, 정책이 3개나 있어서 뭔가 봤더니 이전에 sageMaker를 사용해보면서 만들었던 정책들이..안 한게 아니다. 기억이 안 날 뿐;;; 회사에서 라벨링 툴 알아본다고 만들었다.

 

 

S3 버킷 준비

정책 설정. 후에 사용할 예정.

 

python package 세팅

 


pip install kinesis-python boto3

 

kinesis-python을 설치했지만 결과적으로 boto3로 kinesis 클라이언트 작업을 할 수 있어서 boto3로 작업했다.

 

 

Producer

윈도우에서 테스트중이기 때문에 환경변수 AWS_SHARED_CREDENTIALS_FILE 에 credentials 경로를 넣어줬다.

현재 kinesis의 node수는 3개이다.

data를 넣을 때 partition_key를 “string”으로 고정하고 넣어서 특정 샤드에만 데이터를 넣었다.


from pprint import pprint
from datetime import datetime
import boto3
from botocore.config import Config
import utils_


utils_.set_boto3_config() 
my_config = Config(
    signature_version='v4',
    retries={
        'max_attempts': 10,
        'mode': 'standard'
    }
)

client = boto3.client('kinesis', config=my_config)

print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
msg = f'hello kinesis  at {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}'
print(msg)

response = client.put_record(
    StreamName='stream-test',
    Data=bytes(msg, 'utf-8'),
    PartitionKey='string'#,
    #ExplicitHashKey='string'#,
    #SequenceNumberForOrdering='string'
)
pprint(response)



 

Consumer

 

그냥 덜렁 get_records만 불러오면 안 된다.

  • describe_stream에서 스트림에서 shard정보를 가져온다.
  • 위의 테스트에서는 특정 샤드에만 데이터를 넣었으므로 그 샤드의 Id를 가져온다.
  • 샤드ID로 shard_iterator(문자열)를 가져온다.
  • get_records에 shard_iterator를 param으로 념겨서 record 리스트를 받아온다.


from time import sleep
from pprint import pprint
import boto3
import utils_

utils_.set_boto3_config() # 커스텀 함수
client = boto3.client('kinesis')

response = client.describe_stream(StreamName='stream-test')
pprint(response)

# for shard in response['StreamDescription']['Shards']:
my_shard_id = response['StreamDescription']['Shards'][2]['ShardId']

pprint(f'my_shard_id: {my_shard_id}')
shard_iterator = client.get_shard_iterator(StreamName='stream-test',
                                           ShardId=my_shard_id,
                                           ShardIteratorType='TRIM_HORIZON')#,
                                           #Timestamp=datetime(2022, 2, 27))

# print(shard_iterator)

my_shard_iterator = shard_iterator['ShardIterator']
record_response = client.get_records(ShardIterator=my_shard_iterator, Limit=10)
print('record_response:')
pprint(record_response)

records = record_response['Records']
if len(records) > 0:
    for x in records:
        print('data: {}'.format(x['Data']))
    print('===============')

while 'NextShardIterator' in record_response:
    record_response = client.get_records(ShardIterator=record_response['NextShardIterator'],
                                         Limit=10)

    records = record_response['Records']
    if len(records) > 0:
        for x in records:
            print('data: {}'.format(x['Data']))
        print('===============')

    sleep(1)

 

무슨 단위로 데이터를 묶어서 가져오는거지.


# consumer가 받은 모든 데이터

data: b'hello kinesis'
===============
data: b'hello kinesis'
===============
data: b'hello kinesis'
data: b'hello kinesis'
data: b'hello kinesis'
===============
data: b'hello kinesis 2'
data: b'hello kinesis  at 2022-02-28 12:32:21.482718'
data: b'hello kinesis  at 2022-02-28 12:32:31.596383'
data: b'hello kinesis  at 2022-02-28 12:32:42'
===============

 

consumer의 shardIteratorType 설정

 

TRIM_HORIZON 을 설정했는데도 과거 데이터가 안 읽히길래 찾아봤는데, 단순히 한 번의 NextShardIterator로 모든 적제된 stream data가 읽히질 않는다.

위의 코드처럼 NextShardIterator로 데이터를 계속 업데이트하면서 보관중인 steam data에 도달하길 기다려야 한다. 읽는데 시간이 걸려서 기다려야 한다.

 

 

 

ShardIteratorType 설명

  • LATEST는 consumer가 시작되고 나서 들어오는 데이터가 읽힌다. 보관되어 있는 데이터는 읽히지 않음.

 

https://www.slideshare.net/frodriguezolivera

 

기타

  • 순서가 필요한 데이터는 하나의 샤드에 넣어야 한다. partition_key를 동일하게 해서 넣어야 한다. partition(문자열) →MD5해시로 샤드에 매핑된다.
  • data size는 최대 1MB
  • Sequence Number는 샤드 내 순서 보장을 위한 번호
  • 데이터 처리 성능을 위한 shard merge, split

 

 

참조

 

 

 

 

 

 

반응형