( 참고 : “FastCampus, 데이터 엔지니어링 올인원” )
[ Data Engineering ]
DynamoDB에 데이터 넣기
Python을 통해, 지속적으로 DynamoDB에 데이터를 넣을 것!
이때 사용하는 패키지 이름 : boto3
1. Basic Setup
import sys
import os
import boto3
2. DynamoDB에 Connect
Connect하는 함수 :
- ` boto3.resource()`
try:
dynamodb = boto3.resource('dynamodb',
region_name='ap-northeast-2',
endpoint_url='http://dynamodb.ap-northeast-2.amazonaws.com')
except:
logging.error('Connect Fail')
sys.exit(1)
3. 가져올 Data 살펴보기
우선, 우리의 목표를 다시 생각해보자.
MySQL DB의 artists 테이블에, 여러 artists들의 ID를 저장해놨다. 해당 ID를 사용하여, spotify api를 request해서 top track을 받아온 뒤, 이것을 DynamoDB에 저장할 것이다.
특정 Table을 불러오기 :
dynamodb.Table('테이블명')
table = dynamodb.Table('top_tracks')
(이전에 저장했든 MySQL DB에서) Artists ID 불러오기 &
DynamoDB의 ‘top_tracks’ 테이블에, 해당 artists의 top track을 담기
cursor.execute('SELECT id FROM artists')
countries = ['US', 'CA']
for country in countries:
for (artist_id, ) in cursor.fetchall():
URL = "https://api.spotify.com/v1/artists/{}/top-tracks".format(artist_id)
params = {
'country': country
}
r = requests.get(URL, params=params, headers=headers)
raw = json.loads(r.text)
for track in raw['tracks']:
data = {
'artist_id': artist_id,
'country': country
}
data.update(track)
table.put_item(Item=data)
- 아래와 같이 data들이 잘 담긴것을 확인할 수 있다!
4. Code Summary
def main():
try:
dynamodb = boto3.resource('dynamodb', region_name='ap-northeast-2', endpoint_url='http://dynamodb.ap-northeast-2.amazonaws.com')
except:
logging.error('could not connect to dynamodb')
sys.exit(1)
try:
conn = pymysql.connect(host, user=username, passwd=password, db=database, port=port, use_unicode=True, charset='utf8')
cursor = conn.cursor()
except:
logging.error("could not connect to rds")
sys.exit(1)
headers = get_headers(client_id, client_secret)
table = dynamodb.Table('top_tracks')
cursor.execute('SELECT id FROM artists')
countries = ['US', 'CA']
for country in countries:
for (artist_id, ) in cursor.fetchall():
URL = "https://api.spotify.com/v1/artists/{}/top-tracks".format(artist_id)
params = {
'country': country
}
r = requests.get(URL, params=params, headers=headers)
raw = json.loads(r.text)
for track in raw['tracks']:
data = {
'artist_id': artist_id,
'country': country
}
data.update(track)
table.put_item(Item=data)