AWS가 확실히 문서나 피드백 수집이나 활용이나..훨씬 사용자 친화적이다. 파이프라인 만드는 것도 쉽다. 유튜브에서 활용사례 등 자료 찾기도 좋다. 괜히 1등하는게 아니다.
최근에 인터뷰 준비를 하면서 클라우드 서비스로 데이터 레이크 - 데이터 웨어하우스 를 구축하는 부분을 많이 조사했다. 조사하면서 AWS가 상당히 데이터플랫폼 구축에 친화적인 듯 하여 한번 호다닥 만들어 보기로 했다.
조사하면서 오 이런 것도 됨?하거나 해보면 좋은 것들은 다 다뤄볼 예정. 데이터는 크게 신경쓰지 않고 더미로 던질 것이다.
- kinesis로 받은 데이터를 s3나 RDS에 집어넣기
- RDS의 트랜잭션 로그를 가져와서 실시간으로 s3에 집어넣기
- 간단한 데이터 레이크 → 웨어하우스 플랫폼 구축해보기
등등..
개인적으로 클라우드 서비스에서 제일 중요하게 다룰 건 정책이라 생각한다. 권한을 따로 관리해주는 팀이나 그룹이 있으면 편하겠지만 대부분은 구축하는 사람이 이런 설정을 다 해줘야 하니까. 정책설명도 구글보다 잘 되어있는 것 같다.
그 외에 서비스 가격 계산도 예제도 잘 나와있고 구글보다 훨 나은 거 같은데..? AWS를 잘 쓰다 GCP로 옮기면 GCP 적응이 쉬울 것 같다.
이 글에서는 AWS Kinesis를 이용해 stream data 주고 받는 작업과 관련 개념들을 찾아볼 것이다.
가격 계산
큰 트래픽은 내가 (돈을) 감당을 못 하니 대충 초당 로그 2개 정도 받고 레코드 크기는 최대 5KB정도로 조사했다. 어차피 테스트이니 데이터 보존은 1일.
Kinesis Data Streams 인스턴스 생성
IAM 정책 설정과 사용자 생성
사용자 stream-test를 만들고 사용자에게 바로 정책을 연결시키고 → access key 내역 가져오기.
그룹을 만든 뒤에 그룹에 정책을 적용할 수도 있다. 그룹을 나중에 사용하기 위해 data-platform이라는 그룹을 만들어두었다. 그룹의 구성원에게 기본적으로 적용할 정책을 안 정해서 정책은 설정하지 않음.
역할이 5개, 정책이 3개나 있어서 뭔가 봤더니 이전에 sageMaker를 사용해보면서 만들었던 정책들이..안 한게 아니다. 기억이 안 날 뿐;;; 회사에서 라벨링 툴 알아본다고 만들었다.
S3 버킷 준비
정책 설정. 후에 사용할 예정.
python package 세팅
pip install kinesis-python boto3
kinesis-python을 설치했지만 결과적으로 boto3로 kinesis 클라이언트 작업을 할 수 있어서 boto3로 작업했다.
Producer
윈도우에서 테스트중이기 때문에 환경변수 AWS_SHARED_CREDENTIALS_FILE
에 credentials 경로를 넣어줬다.
현재 kinesis의 node수는 3개이다.
data를 넣을 때 partition_key를 “string”으로 고정하고 넣어서 특정 샤드에만 데이터를 넣었다.
from pprint import pprint
from datetime import datetime
import boto3
from botocore.config import Config
import utils_
utils_.set_boto3_config()
my_config = Config(
signature_version='v4',
retries={
'max_attempts': 10,
'mode': 'standard'
}
)
client = boto3.client('kinesis', config=my_config)
print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
msg = f'hello kinesis at {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}'
print(msg)
response = client.put_record(
StreamName='stream-test',
Data=bytes(msg, 'utf-8'),
PartitionKey='string'#,
#ExplicitHashKey='string'#,
#SequenceNumberForOrdering='string'
)
pprint(response)
Consumer
그냥 덜렁 get_records
만 불러오면 안 된다.
- describe_stream에서 스트림에서 shard정보를 가져온다.
- 위의 테스트에서는 특정 샤드에만 데이터를 넣었으므로 그 샤드의 Id를 가져온다.
- 샤드ID로 shard_iterator(문자열)를 가져온다.
- get_records에 shard_iterator를 param으로 념겨서 record 리스트를 받아온다.
from time import sleep
from pprint import pprint
import boto3
import utils_
utils_.set_boto3_config() # 커스텀 함수
client = boto3.client('kinesis')
response = client.describe_stream(StreamName='stream-test')
pprint(response)
# for shard in response['StreamDescription']['Shards']:
my_shard_id = response['StreamDescription']['Shards'][2]['ShardId']
pprint(f'my_shard_id: {my_shard_id}')
shard_iterator = client.get_shard_iterator(StreamName='stream-test',
ShardId=my_shard_id,
ShardIteratorType='TRIM_HORIZON')#,
#Timestamp=datetime(2022, 2, 27))
# print(shard_iterator)
my_shard_iterator = shard_iterator['ShardIterator']
record_response = client.get_records(ShardIterator=my_shard_iterator, Limit=10)
print('record_response:')
pprint(record_response)
records = record_response['Records']
if len(records) > 0:
for x in records:
print('data: {}'.format(x['Data']))
print('===============')
while 'NextShardIterator' in record_response:
record_response = client.get_records(ShardIterator=record_response['NextShardIterator'],
Limit=10)
records = record_response['Records']
if len(records) > 0:
for x in records:
print('data: {}'.format(x['Data']))
print('===============')
sleep(1)
무슨 단위로 데이터를 묶어서 가져오는거지.
# consumer가 받은 모든 데이터
data: b'hello kinesis'
===============
data: b'hello kinesis'
===============
data: b'hello kinesis'
data: b'hello kinesis'
data: b'hello kinesis'
===============
data: b'hello kinesis 2'
data: b'hello kinesis at 2022-02-28 12:32:21.482718'
data: b'hello kinesis at 2022-02-28 12:32:31.596383'
data: b'hello kinesis at 2022-02-28 12:32:42'
===============
consumer의 shardIteratorType 설정
TRIM_HORIZON
을 설정했는데도 과거 데이터가 안 읽히길래 찾아봤는데, 단순히 한 번의 NextShardIterator로 모든 적제된 stream data가 읽히질 않는다.
위의 코드처럼 NextShardIterator로 데이터를 계속 업데이트하면서 보관중인 steam data에 도달하길 기다려야 한다. 읽는데 시간이 걸려서 기다려야 한다.
ShardIteratorType 설명
- LATEST는 consumer가 시작되고 나서 들어오는 데이터가 읽힌다. 보관되어 있는 데이터는 읽히지 않음.
https://www.slideshare.net/frodriguezolivera
기타
- 순서가 필요한 데이터는 하나의 샤드에 넣어야 한다. partition_key를 동일하게 해서 넣어야 한다. partition(문자열) →MD5해시로 샤드에 매핑된다.
- data size는 최대 1MB
- Sequence Number는 샤드 내 순서 보장을 위한 번호
- 데이터 처리 성능을 위한 shard merge, split
참조
- 이미지 출처(슬라이드쉐어)
- Amazon Kinesis Stream 개념 (다른 분 정리)
- Amazon Kinesis (다른 분 정리)