-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathspotify_transformation_load_function.py
99 lines (89 loc) · 4.3 KB
/
spotify_transformation_load_function.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
import json
import boto3
import pandas as pd
from datetime import datetime
from io import StringIO
def album(data):
    """Build one flat album record for every entry in ``data['items']``.

    Each entry is read as ``entry['track']['album']``; this assumes the
    Spotify playlist-tracks response shape (TODO confirm against the raw
    dumps).  Records keep the key order album_name, album_release_date,
    album_total_tracks, album_url, album_id, which later becomes the CSV
    column order.  Duplicates are NOT removed here: an album that appears
    on several entries yields several identical records.

    Args:
        data: Parsed JSON document with an ``'items'`` list.

    Returns:
        A list of dicts, one per entry, in input order.

    Raises:
        KeyError: if any expected key is missing from an entry.
    """
    def to_record(entry):
        info = entry['track']['album']
        return {
            'album_name': info['name'],
            'album_release_date': info['release_date'],
            'album_total_tracks': info['total_tracks'],
            'album_url': info['external_urls']['spotify'],
            'album_id': info['id'],
        }

    return [to_record(entry) for entry in data['items']]
def song(data):
    """Build one flat song record for every entry in ``data['items']``.

    Fields come from ``entry['track']`` plus ``entry['added_at']``; this
    assumes the Spotify playlist-tracks response shape (TODO confirm).
    Records keep the key order song_id, song_name, song_duration, song_url,
    song_popularity, song_added, album_id, artist_id (the CSV column order).

    NOTE(review): ``artist_id`` is taken from the first artist of the
    track's *album* (``track['album']['artists'][0]``), not from the
    track's own ``artists`` list; confirm that is the intended foreign key.

    Args:
        data: Parsed JSON document with an ``'items'`` list.

    Returns:
        A list of dicts, one per entry, in input order.

    Raises:
        KeyError / IndexError: if an expected key is missing or the album
        has no artists.
    """
    def to_record(entry):
        track = entry['track']
        return {
            'song_id': track['id'],
            'song_name': track['name'],
            'song_duration': track['duration_ms'],
            'song_url': track['external_urls']['spotify'],
            'song_popularity': track['popularity'],
            'song_added': entry['added_at'],
            'album_id': track['album']['id'],
            'artist_id': track['album']['artists'][0]['id'],
        }

    return [to_record(entry) for entry in data['items']]
def artist(data):
    """Build one artist record per artist credited on each entry's track.

    For every entry in ``data['items']`` that has a ``'track'`` key, every
    element of ``entry['track']['artists']`` yields a record with keys
    artist_id, artist_name, external_url (in that order; this becomes the
    CSV column order).  Entries without a ``'track'`` key are skipped.
    Duplicates are NOT removed here.

    Args:
        data: Parsed JSON document with an ``'items'`` list; assumes the
            Spotify playlist-tracks response shape (TODO confirm).

    Returns:
        A list of dicts in input order.
    """
    artist_list = []
    for row in data['items']:
        # Direct membership test instead of scanning every (key, value) pair.
        if 'track' not in row:
            continue
        # Named ``performer`` so the loop no longer shadows this function.
        for performer in row['track']['artists']:
            artist_list.append({
                'artist_id': performer['id'],
                'artist_name': performer['name'],
                # NOTE(review): filled from 'href', whereas the album and song
                # URL columns use external_urls['spotify']; confirm intent.
                'external_url': performer['href'],
            })
    return artist_list
def _load_raw_json(s3, bucket, prefix):
    """Download and parse every JSON object stored under ``prefix``.

    Returns two parallel lists: the parsed JSON documents and the S3 keys
    they were read from (the keys are needed later to archive the files).
    """
    payloads = []
    keys = []
    # A single list call returns at most 1000 keys and omits 'Contents' when
    # nothing matches, so walk every page and default to an empty list.
    paginator = s3.get_paginator('list_objects_v2')
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        for obj in page.get('Contents', []):
            file_key = obj['Key']
            print(file_key)
            if file_key.split('.')[-1] == "json":
                response = s3.get_object(Bucket=bucket, Key=file_key)
                payloads.append(json.loads(response['Body'].read()))
                keys.append(file_key)
    return payloads, keys


def _put_csv(s3, bucket, key, df):
    """Serialize ``df`` as CSV (without the index) and upload it to bucket/key."""
    buffer = StringIO()
    df.to_csv(buffer, index=False)
    s3.put_object(Bucket=bucket, Key=key, Body=buffer.getvalue())


def _archive_raw_files(s3, bucket, keys):
    """Move each key into raw_data/processed/ (copy, then delete the original)."""
    for key in keys:
        copy_source = {'Bucket': bucket, 'Key': key}
        s3.copy(copy_source, bucket, 'raw_data/processed/' + key.split("/")[-1])
        s3.delete_object(Bucket=bucket, Key=key)


def lambda_handler(event, context):
    """Transform raw Spotify JSON dumps into album/song/artist CSVs in S3.

    Reads every ``.json`` object under ``raw_data/to_processed/`` in the
    bucket and flattens all of them with ``album``, ``song`` and ``artist``.
    Each table is de-duplicated on its id column and written as one
    timestamped CSV under ``transformed_data/``. The raw files are then
    moved to ``raw_data/processed/``.

    ``event`` and ``context`` are the standard Lambda arguments and are not
    read. Returns ``None``.
    """
    s3 = boto3.client('s3')
    bucket = "spotify-etl-project-ishaan"
    prefix = "raw_data/to_processed/"

    spotify_data, spotify_keys = _load_raw_json(s3, bucket, prefix)
    if not spotify_data:
        # Nothing to transform; do not write empty CSVs.
        print("No JSON files found under " + prefix)
        return None

    # Accumulate rows from ALL files. Reassigning the lists per file would
    # keep only the last file's rows while every file is still archived.
    album_list, song_list, artist_list = [], [], []
    for data in spotify_data:
        album_list.extend(album(data))
        song_list.extend(song(data))
        artist_list.extend(artist(data))

    # One timestamp per run so the three output files share the same suffix.
    run_stamp = str(datetime.now())

    # Each table is guarded: an empty record list gives a column-less frame,
    # on which drop_duplicates(subset=[...]) raises KeyError.
    if album_list:
        album_df = pd.DataFrame.from_dict(album_list).drop_duplicates(subset=['album_id'])
        # NOTE(review): if release dates mix formats (e.g. year-only and full
        # dates), pandas 2.x format inference may reject them; confirm.
        album_df['album_release_date'] = pd.to_datetime(album_df['album_release_date'])
        album_key = "transformed_data/album_data/album_transformed_" + run_stamp + ".csv"
        _put_csv(s3, bucket, album_key, album_df)

    if song_list:
        # Deduplicate on the id, like the album and artist tables: distinct
        # songs can share a title.
        song_df = pd.DataFrame.from_dict(song_list).drop_duplicates(subset=['song_id'])
        song_df['song_added'] = pd.to_datetime(song_df['song_added'])
        song_key = "transformed_data/song_data/song_transformed_" + run_stamp + ".csv"
        _put_csv(s3, bucket, song_key, song_df)

    if artist_list:
        artist_df = pd.DataFrame.from_dict(artist_list).drop_duplicates(subset=['artist_id'])
        artist_key = "transformed_data/artist_data/artist_transformed_" + run_stamp + ".csv"
        _put_csv(s3, bucket, artist_key, artist_df)

    # Archive only after every upload above succeeded; an exception earlier
    # leaves the raw files in to_processed/ so the run can be retried.
    _archive_raw_files(s3, bucket, spotify_keys)