📌 학습목표
1. 데이터 웨어하우스의 옵션
2. 데이터 레이크
3. 빅데이터 처리 프레임워크
4. ETL(데이터 파이프라인), ELT
5. 다양한 데이터 소스
6. Airflow
7. 데이터 플랫폼 발전 단계
8. 실리콘 밸리 회사들의 데이터 스택 트렌드
9. [실습] Google Colab으로 간단한 ETL 작성해보기
데이터 웨어하우스(Data Warehouse) 옵션
- 확장 가능성(Scalable)과 적정한 비용이 중요함
- 데이터 규모가 커지면서 '클라우드'가 대세
- 오픈소스 기반(Presto, Hive)을 사용하는 경우도 클라우드 버전 존재
- 데이터가 작다면 굳이 빅데이터 기반 데이터베이스 사용할 필요X
1. 고정비용 옵션
장점 : 비용이 고정되어있기 때문에 비용 관리 측면에서는 좋음
단점 : 사용하지 않아도 비용을 지불해야 함
예) AWS의 Redshift
2. 가변비용 옵션
장점 : 쓴만큼 돈을 지불하므로 효율적. 확장 가능(처리할 수 있는 데이터의 양이 더 큼)
단점 : 비용 관리 측면에서 예측 불가
예) Google Cloud의 BigQuery, Snowflake
※ Iceberg를 제외하고는 모두 SQL을 지원하는 '빅데이터 기반 데이터베이스'
1) AWS Redshift
- 2012년에 시작된 AWS 기반의 데이터웨어하우스로 PB 스케일 데이터 분산 처리 가능
- Postgresql과 호환되는 SQL로 처리 가능하게 해줌
- Python UDF (User Defined Function)의 작성을 통해 기능 확장 가능
- 초기 고정비용 모델로 시작 → 현재 가변비용 모델도 지원 (Redshift Serverless)
- 온디맨드 가격 이외에도 예약 가격 옵션도 지원
- CSV, JSON, Avro, Parquet 등과 같은 다양한 데이터 포맷을 지원
- AWS내의 다른 서비스들과 연동이 쉬움
- S3, DynamoDB, SageMaker(ML 모델의 실행도 지원)등등
- Redshift의 기능 확장을 위해 Redshift Spectrum, AWS Athena등의 서비스와 같이 사용 가능
- 배치 데이터 중심이지만 실시간 데이터 처리 지원
- 웹 콘솔 이외에도 API를 통한 관리/제어 가능
2) Snowflake
- 2014년에 클라우드 기반 데이터웨어하우스로 시작됨 (2020년 상장)
- 지금은 데이터 클라우드라고 부를 수 있을 정도로 발전
- 데이터 판매를 통한 매출을 가능하게 해주는 Data Sharing & Marketplace 제공
- ETL과 다양한 데이터 통합 기능 제공
- SQL 기반으로 빅데이터 저장, 처리, 분석을 가능하게 해줌
- UDF로 비구조화된 데이터 처리와 머신러닝 기능 제공
- CSV, JSON, Avro, Parquet 등과 같은 다양한 데이터 포맷을 지원
- S3, GC 클라우드 스토리지, Azure Blog Storage도 지원
- 배치 데이터 중심이지만 실시간 데이터 처리 지원
- 웹 콘솔 이외에도 API를 통한 관리/제어 가능
📌 Data Sharing & Data Marketplace
데이터 웨어하우스의 형태로 내부에서만 사용하는 것이 아니라, 외부의 기업, 개인 고객들에게도 판매할 수 있는 기능
3) Google Cloud BigQuery
- 2010년에 시작된 구글 클라우드의 데이터 웨어하우스 서비스
- 구글 클라우드의 대표적인 서비스
- BigQuery SQL이란 SQL 데이터 처리 가능 (Nested fields, repeated fields 지원)
- 가변 비용(기본)과 고정 비용(옵션) 지원
- CSV, JSON, AVro, Parquet 등과 같은 다양한 데이터 포맷을 지원
- 구글 클라우드 내의 다른 서비스들과 연동이 쉬움
- 클라우드 스토리지, 데이터플로우, AutoML 등등
- 배치 데이터 중심이지만 실시간 데이터 처리 지원
- 웹 콘솔 이외에도 API를 통한 관리/제어 가능
4) Apache Hive
- Facebook이 2008년에 시작한 아파치 오픈소스 프로젝트 (디스크 중심)
- 하둡 기반으로 동작하는 SQL 기반 데이터 웨어하우스 서비스
- HiveQL이라 부르는 SQL 지원
- MapReduce위에서 동작하는 버전과 Apache Tez를 실행 엔진으로 동작하는 버전 두 가지가 존재
- 다른 하둡 기반 오픈소스들과 연동이 쉬움 (Spark, HBase 등등)
- 자바나 파이썬으로 UDF 작성 가능
- CSV, JSON, Avro, Parquet 등과 같은 다양한 데이터 포맷을 지원
- 배치 빅데이터 프로세싱 시스템
- 데이터 파티셔닝과 버킷팅과 같은 최적화 작업 지원
- 빠른 처리속도 보다는 처리할 수 있는 데이터 양의 크기에 최적화
- 웹 UI와 커맨드라인 UI (CLI라고 부름) 두 가지를 지원
- 점점 Spark에 의해 밀리는 분위기임
5) Apache Presto
- Facebook이 2013년에 시작한 아파치 오픈소스 프로젝트 (메모리 중심)
- 다양한 데이터소스에 존재하는 데이터를 대상으로 SQL 실행 가능
- HDFS (Hadoop Distributed File System), S3, Cassandra, MySQL 등등
- PrestoSQL이란 부르는 SQL 지원
- CSV, JSON, AVro, ORC, Parquet 등과 같은 다양한 데이터 포맷을 지원
- 배치 빅데이터 프로세싱 시스템
- Hive와는 다르게 빠른 응답 속도에 좀더 최적화 (메모리 기반)
- 웹 UI와 커맨드라인 UI (CLI라고 부름) 두 가지를 지원
- AWS Athena가 바로 Presto를 기반으로 만들어짐
6) Apache Iceberg
- Netflix가 2018년에 시작한 아파치 오픈소스 프로젝트로 데이터 웨어하우스 기술이 아는
- 대용량 SCD (Slowly-Changing Datasets) 데이터를 다룰 수 있는 테이블 포맷
- HDFS, S3, Azure Blob Storage 등의 클라우드 스토리지 지원
- ACID 트랙잭션과 타임여행 (과거 버전으로 롤백과 변경 기록 유지 등등)
- 스키마 진화 (Schema Evolution) 지원을 통한 컬럼 제거와 추가 가능 (테이블 재작성 없이)
- 자바와 파이썬 API를 지원
- Spark, Flink, Hive, Hudi 등의 다른 Apache 시스템과 연동 가능
7) Apache Spark
- UC 버클리 AMPLab이 2013년에 시작한 Apache 오픈소스 프로젝트
- 빅데이터 처리 관련 종합선물세트
- 배치처리(APISQL), 실시간처리, 그래프처리, 머신러닝 기능 제공
- (이미 만들어져 있는)다양한 분산처리 시스템 지원
- 하둡(YARN), AWS EMR, Google Cloud Dataproc, Mesos, K8s 55
- (이미 만들어져 있는)다양한 파일시스템과 연동 가능
- HDFS, S3, Cassandra, HBase 등등
- CSV, JSON, Avro, ORC, Parquet 등과 같은 다양한 데이터 포맷을 지원
- 다양한 언어 지원: Java, Python, Scala, R
데이터 레이크 (Data Lake)
= 구조화 데이터 + 비구조화 데이터(로그파일)
- 보존 기한이 없는 모든 데이터를 원래 형태대로 보존하는 스토리지에 가까움
- DW보다 규모가 더 크고 경제적인 스토리지
- 보통 클라우드 스토리지가 됨(ex. AWS라면 S3가 대표적인 데이터 레이크)
- 데이터 레이크가 있는 환경에서 ETL/ELT 진행
빅데이터 처리 프레임워크
- 분산 환경 기반
- 1대 혹은 그 이상의 서버로 구성
- ①분산 파일 시스템(큰 데이터를 저장)과 ②분산 컴퓨팅 시스템(저장된 데이터를 읽어서 프로세싱 후 새로운 데이터를 생성)이 필요
- Fault Tolerance : 소수의 서버가 고장나도 동작해야함
- Replication Factor : 하나의 서버에만 저장하는 것이 아니라 비상시를 대비하여 여러 서버에 저장할 수 있도록 개수 지정(보통 3으로 지정)
- 확장이 용이해야함
- Scale Out과 Scale Up이 둘다 가능해야 함
- 용량을 증대하기 위해서 서버 추가
대표적인 빅데이터 프로세싱 시스템
- 1세대 : Hadoop(YARN) 기반 MApreduce, Hive/Presto
- 2세대 : Spark(SQL, DataFrame, Streaing, ML, Graph)
ETL (Extract, Transform, Load) = 데이터 파이프라인 = 데이터 웍플로우 = DAG
: 데이터 레이크와 데이터 웨어하우스 바깥 → 안으로 데이터를 가져오는 것 (Airflow로 구현)
Extract : 데이터를 데이터 소스에서 읽어내는 과정으로, 보통 API를 호출
Transform : 필요하다면 원본 데이터의 포맷을 원하는 형태로 변경시키는 과정(굳이 변환할 필요 X)
Load : 최종적으로 데이터 웨어하우스에 테이블로 집어 넣는 과정
- ETL이 실패했을 경우 이를 고쳐서 재실행하는 등의 과정을 적절히 스케줄링 및 관리하는 것이 중요 → ETL 스케줄러 혹은 Framework이 필요(ex. Airflow)
- Airflow에서는 DAG(Directed Acyclic Graph)라고 부름
- 데이터를 소스로부터 목적지로 복사하는 작업
- 이 작업은 보통 코딩 (파이썬 혹은 스칼라) 혹은 SQL을 통해 이뤄짐
- 대부분의 경우 목적지는 데이터 웨어하우스가 됨
- 데이터 소스의 예시 :
- Click stream, call data, ads performance data, transactions, sensor data, metadata 등
- 좀더 구체적인 예들: 프로덕션 데이터베이스, 로그 파일, API, 실시간 스트림 데이터 등
- 데이터 목적지의 예시 :
- 데이터 웨어하우스, 캐시 시스템 (Redis, Memcache), 프로덕션 데이터베이스, NoSQL, S3 등
✓ 데이터 파이프라인 종류
1. Raw Data ETL Jobs
: 외부와 내부 데이터 소스에서 데이터를 읽어다가 (많은 경우 API를 통하게 됨) → 적당한 데이터 포맷 변환 후 (데이터의 크기가 커지면 Spark등이 필요해짐) → 데이터 웨어하우스 로드
- 이 작업은 보통 데이터 엔지니어가 함
2. Summary/Report Jobs
: DW(혹은 DL)로부터 데이터를 읽어 다시 DW에 쓰는 ETL
- Raw Data를 읽고 조인해서 일종의 리포트 형식이나 요약 테이블을 다시 만드는 용도
- 특수한 형태로는 A/B Test 결과를 분석하는 데이터 파이프라인도 존재
- 데이터 분석가의 역할 : SQL (CTAS를 통해)만으로 요약 테이블 생성
- 데이터 엔지니어의 역할 : 데이터 분석가들이 편하게 작업할 수 있는 환경을 만들어 주는 것
→ Analytics Engineer (dbt)
3. Production Data Jobs
: DW로부터 데이터를 읽어 다른 Storage(많은 경우 프로덕션 환경)로 쓰는 ETL
- 요약 정보가 프로덕션 환경에서 성능 이유로 필요한 경우
- 혹은 머신러닝 모델에서 필요한 feature들을 미리 계산해두는 경우
- 이 경우 흔한 타켓 스토리지:
- CassandralHBase/DynamoDB와 같은 NoSQL
- MySQL과 같은 관계형 데이터베이스 (OLTP)
- Redis/Memcache와 같은 캐시
- ElasticSearch와 같은 검색엔진
- 이 경우 흔한 타켓 스토리지:
✓ 데이터 파이프라인 생성시 고려할 점
- 이상과 현실간의 괴리
(ex. 버그, 데이터 소스상의 이슈, 의존도에 대한 이해도 부족, 유지보수 비용의 증가)
<Best Practices (모범 사례)>
1. 가능하면 데이터가 작을 경우 매번 통채로 복사해서 테이블을 만들기 (Full Refresh)
- Incremental update만이 가능하다면, 대상 데이터소스가 갖춰야할 몇 가지 조건이 있음
- 데이터 소스가 프로덕션 데이터베이스 테이블이라면 다음 필드가 필요:
- created (데이터 업데이트 관점에서 필요하지는 않음)
- modified
- deleted
- 데이터 소스가 API라면 특정 날짜를 기준으로 새로 생성되거나 업데이트된 레코드들을 읽어올 수 있어야함
- 데이터 소스가 프로덕션 데이터베이스 테이블이라면 다음 필드가 필요:
2. 멱등성(Idempotency)을 보장해야 한다.
- 동일한 입력 데이터로 데이터 파이프라인을 다수 실행해도 최종 테이블의 내용이 달라지지 말아야함
- 예를 들면 중복 데이터가 생기지 말아야함
- 중요한 포인트는 critical point들이 모두 one atomic action으로 실행이 되어야 한다는 점
- SQL transaction이 꼭 필요한 기술
- SQL transaction이 꼭 필요한 기술
3. Backfill(과거 데이터를 다시 채우는 과정)이 쉬워야 한다.
- 실패한 데이터 파이프라인을 재실행하는 것이 쉬워야함
- Airflow는 특히 Backfill에 강점을 갖고있음
4. 데이터 파이프라인을 만들 때마다 입력과 출력을 명확히하고 문서화해야 한다.
- 비지니스 오너 명시: 누가 이 데이터를 요청했는지를 기록으로 남길 것
- 이게 나중에 데이터 카탈로그로 들어가서 데이터 디스커버리에 사용 가능함
- 데이터 리니지가 중요해짐 → 이걸 이해하지 못하면 온갖 종류의 사고 발생
5. 주기적으로 쓸모없는 데이터들을 삭제해야 한다.
- 사용되지 않는 테이블 및 데이터 파이프라인을 사전에 차단
- 필요한 데이터만 DW에 보관하고 과거 데이터를 DL(또는 스토리지)로 이동
6. 데이터 파이프라인 사고시 마다 사고 리포트(post-mortem)를 작성해야 한다.
- 목적 : 동일한(혹은 비슷한) 사고 재발 방지
- 사고 원인(root-cause)을 이해 및 방지하기 위한 액션 아이템들의 실행이 중요해짐
- 기술 부채의 정도를 이야기해주는 바로미터
7. 중요 데이터 파이프라인의 입력과 출력을 체크해야 한다.
- 아주 간단하게 입력 레코드의 수와 출력 레코드의 수가 몇개인지 체크하는 것부터 시작해보자
- 써머리 테이블을 만들어내고 Primary key가 존재한다면 Primary key uniqueness가 보장되는지 체크하는 것이 필요함
- 중복 레코드 체크
ELT
: 데이터 레이크와 데이터 웨어하우스 내부 데이터를 처리 및 조작해서 (요약된) 새로운 데이터를 만드는 프로세스 (DBT를 사용)
- 데이터가 다양해지고 커짐에 따라 데이터를 모두 조인해서 사용하는 것이 불가능해짐 → 주기적으로 요약 데이터를 만들어 사용하는 것이 효율적 → DBT 사용(Analytics Engineering)
- 데이터 분석가가 이를 수행한다.
- 요약 테이블을 만들어줌
- 데이터 레이크 사용
다양한 데이터 소스의 예
- 프로덕션 데이터베이스(웹/앱에서 사용하는 데이터베이스)의 데이터
- MySQL, Postgres 등
- 이메일 마케팅 데이터
- Mailchimp, HubSpot, SendGrid 등
- 크레딧카드 매출 데이터
- Stripe
- 서포트 티켓 데이터
- Zendesk, Kustomer 등
- 서포트 콜 데이터
- ChannelTalk, RingCentral, Talkdesk 등
- 세일즈 데이터
- Salesforce
- 사용자 이벤트 로그
- Amplitude, MixPanel, 웹서버로그 등
Airflow
: 다수의 ETL이 존재할 경우 이를 스케줄해주고 이들간의 의존관계(dependency)를 정의해주는 관리 및 운영 프레임웍(스케줄러)
- Backfill : 특정 ETL이 실패할 경우 이에 관한 에러 메세지를 받고 재실행해주는 기능
- Python3 기반 오픈소스 프로젝트
- AWS와 구글클라우드와 Azure에서도 지원
- Airflow에서는 ETL을 DAG라 부르며 웹 인터페이스를 통한 관리 기능 제공
- 구성 컴포넌트 : 스케줄러, 웹서버, 워커 (Worker)
데이터 플랫폼의 발전 단계
1. (초기) 데이터 웨어하우스 + ETL
2. (발전) 데이터의 양 증가
- Spark와 같은 빅데이터 처리 시스템 도입
- 데이터 레이크 도입 : 보통 로그 데이터와 같은 대용량 비구조화 데이터 대상
- 데이터 소스 → 데이터 파이프라인 → 데이터 웨어하우스
- 데이터 소스 → 데이터 파이프라인 → 데이터 레이크
- 데이터 레이크 → 데이터 파이프라인 → 데이터 웨어하우스
- 이때 Spark/Hadoop 등이 사용됨
- Hadoop: Hive/Presto등이 기반됨
✓ 데이터의 양이 커지기 때문에 데이터 레이크와 같은 경제적이고 규모가 더 큰 스토리지가 필요함
✓ 이러한 데이터를 프로세싱 하려면 대용량 분산 처리 시스템인 Spark, Hive/Presto의 도입이 필요함
3. (성숙) 데이터 활용 증대
- 현업단의 데이터 활용이 가속화
- ELT가 중요해짐에 따라 DBT 등의 Analytics Engineering 도입
- MLOps 등 머신러닝 관련 효율성을 높이려는 노력 증대
실리콘밸리 기업들의 데이터 스택 트렌드
[실습] Google Colab으로 간단한 ETL 작성해보기
실습 환경
- Redshift dc2.large
- 2 CPU, 15GB memory, 160GB SSD
- Host: 호스트주소.redshift.amazonaws.com
- Port: 포트번호Database: DB이름
- ID: 본인의 ID (이 ID로 스키마가 만들어져 있음)
- Password: 본인의 패스워드
실습 ETL 개요
- 웹 상에 존재하는 이름_성별.csv 파일을 다운로드한다.
- 다운로드한 데이터 소스를 원하는 포맷으로 변경한다.
- Redshift에 있는 테이블로 적재(복사)한다.
1단계. Redshift에 각자에게 할당된 schema밑에 테이블 생성
CREATE TABLE (본인의스키마).name_gender(
name varchar(32) primary key,
gender varchar(8)
);
※ Redshift와 같은 빅데이터 기반의 데이터 웨어하우스에서 Primary key는 그 유일성이 보장되지 않고 그냥 힌트일 뿐이다.
이를 보장하는 것은 개발자나 데이터 분석가의 책임이다.
Redshift 연결 함수 정의
import psycopg2
# Redshift connection 함수
def get_Redshift_connection():
host = "redshift호스트주소.redshift.amazonaws.com"
redshift_user = "ID"
redshift_pass = "PW"
port = 포트번호
dbname = "DB명"
conn = psycopg2.connect("dbname={dbname} user={user} host={host} password={password} port={port}".format(
dbname=dbname,
user=redshift_user,
password=redshift_pass,
host=host,
port=port
))
conn.set_session(autocommit=True) # autocommit=True의 의미 : SQL을 사용해서 테이블의 내용을 바꾸는 경우 그 내용이 바로바로 커밋된다(ex. 사용자가 테이블을 삭제하면 바로 삭제됨)
return conn.cursor()
2단계. 데이터 소스 불러오기
name과 gender 두 개의 필드가 존재한다.
3단계. Extract, Transform, Load 3개의 함수를 정의한 후 호출
1) Extract
URL을 인자로 받은 후, 해당 URL에 연결된 파일을 다운로드 받아서 리턴한다.
import requests
def extract(url):
f = requests.get(url) # URL을 읽어서(보안이 걸려있지 않은 url만 가능)
return (f.text) # 데이터 리턴(f.text : url이 가리키는 문서의 내용을 하나의 긴 문자열로 리턴)
2) Transform
`extract` 함수가 리턴한 데이터를 인자로 받은 후, 정리해서 name과 gender라는 필드를 갖는 리스트로 리턴한다.
def transform(data):
# data를 name, gender로 받기
# 리스트로 변환하여 리턴
lines = text.strip().split("\n") # 줄바꿈 중심으로 쪼개기
records = [] # 빈 리스트 생성
for l in lines[1:]: # 첫번째 줄은 헤더이므로 인덱스 0이 아닌 1부터 반복
(name, gender) = l.split(",") # l = "Keeyong,M" -> ['keeyong', 'M']
records.append([name, gender]) # records라는 리스트에 name, gender가 묶인 하나의 아이템도 리스트로 들어가게됨
return records
3) Load
`transform` 함수가 리턴한 리스트에 들어있는 name과 gender 필드를 앞서 만들었던 Redshift 테이블(1단계 참고)에 적재한다.
def load(records):
"""
records = [
[ "Keeyong", "M" ],
[ "Claire", "F" ],
...
]
"""
cur = get_Redshift_connection() # 1단계에서 만들었던 Redshift 연결 함수 호출
cur.execute("BEGIN;") # BEGIN으로 트랜젝션 열기
try:
cur.execute("""DROP TABLE IF EXISTS 본인스키마명.name_gender; # 이미 테이블이 있으면 제거
CREATE TABLE 본인스키마명.name_gender (
name varchar(32) primary key,
gender varchar(8)
);""")
for r in records:
name = r[0]
gender = r[1]
print(name, "-", gender)
sql = "INSERT INTO 본인스키마명.name_gender VALUES ('{n}', '{g}')".format(n=name, g=gender) # name - gender 한줄을 테이블에 적재
cur.execute(sql)
cur.execute("END") # 동일 코드 cur.execute("COMMIT"); conn.commit() # 무사히 마치면 트랜잭션 닫기
# 트랜잭션 사이에 에러가 발생할 경우 except 이후 구문으로 넘어가서 ROLLBACK을 통해 이전 상태로 돌아간다
except (Exception, psycopg2.DatabaseError) as error:
print(error)
cur.execute("ROLLBACK")
🤔 Full-Refresh 형태로 동작
: 매번 URL에 연결된 파일을 전부 읽어서 테이블에 처음부터 다시 적재하는 방식
즉, 테이블의 내용을 전부 날리고 하나씩 레코드를 적재하는 방식으로 동작함
► 이 과정에서 중간에 에러가 발생하면 테이블의 내용이 불완전하게 남기 때문에 SQL의 Transaction을 사용해서 `ROLLBACK`을 통해 원래의 온전했던 상태로 돌아갈 수 있도록 해야 한다.
► `ROLLBACK`의 기대 효과 : 데이터 정확성과 멱등성 보장