Source code for src.albums_database

"""
Create and manipulate a relational database for holding album data.
"""
import csv
import logging.config
import os
import traceback
from datetime import datetime
from time import time

import sqlalchemy
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Date, Float, Integer, String
from sqlalchemy.orm import sessionmaker
from flask_sqlalchemy import SQLAlchemy

from src import load_data

logger = logging.getLogger(__name__)
Base = declarative_base()


[docs]class Albums(Base): """Create a data model for the database to capture albums.""" __tablename__ = "albums" id = Column(Integer(), primary_key=True) album = Column(String(100), nullable=False) artist = Column(String(100)) reviewauthor = Column(String(50), nullable=False) score = Column(Float(), nullable=False) releaseyear = Column(Integer()) reviewdate = Column(Date()) recordlabel = Column(String(100)) genre = Column(String(50)) danceability = Column(Float()) energy = Column(Float()) key = Column(Float()) loudness = Column(Float()) speechiness = Column(Float()) acousticness = Column(Float()) instrumentalness = Column(Float()) liveness = Column(Float()) valence = Column(Float()) tempo = Column(Float()) def __repr__(self): return "Album(%r, %r)" % (self.album, self.artist)
[docs]def create_db(engine_string: str) -> None: """Create database from provided engine string.""" # The Base.metadata object collects and manages Table operations engine = sqlalchemy.create_engine(engine_string) try: Base.metadata.create_all(engine) except sqlalchemy.exc.OperationalError: logger.error(""" Could not create database! Check your connection string and confirm that you: 1. Are on the Northwestern VPN, and 2. Have database permissions to create tables. """) else: logger.info("Database created")
[docs]def delete_db(engine_string: str) -> None: """Delete database from provided engine string.""" # The Base.metadata object collects and manages Table operations engine = sqlalchemy.create_engine(engine_string) try: Base.metadata.drop_all(engine) except sqlalchemy.exc.OperationalError: logger.error(""" Could not delete database! Check your connection string and confirm that you: 1. Are on the Northwestern VPN, and 2. Have database permissions to delete tables. """) else: logger.info("Database deleted")
[docs]class AlbumManager: """Manages Flask <-> SQLAlchemy connection and adds data to database.""" def __init__(self, app=None, engine_string=None): """ Create a SQLAlchemy session. A Session establishes and maintains the conversation with the database, and holds the ORM-mapped objects which can be queried. More info: https://docs.sqlalchemy.org/en/14/orm/session_basics.html Args: app (Flask, optional): Flask app. Defaults to None. engine_string (str, optional): Engine string. Defaults to None. Raises: ValueError: If neither an app nor an engine string is provided. """ # Regardless of input form, we want a session object for future use if app: self.database = SQLAlchemy(app) self.session = self.database.session elif engine_string: engine = sqlalchemy.create_engine(engine_string) Session = sessionmaker(bind=engine) self.session = Session() else: raise ValueError("Need either an engine string or a Flask app to initialize") def __repr__(self): return "AlbumManager(%r)" % self.session
[docs] def close(self) -> None: """ Close the current SQLAlchemy session. Returns: None """ self.session.close()
[docs] def add_album( self, album: str, artist: str, reviewauthor: str, score: float, releaseyear: int, reviewdate: datetime, recordlabel: str, genre: str, danceability: float, energy: float, key: float, loudness: float, speechiness: float, acousticness: float, instrumentalness: float, liveness: float, valence: float, tempo: float, ) -> None: """ Seed an existing database with additional albums. Args: album (str): Album title artist (str): Artist reviewauthor (str): Name of reviewing author score (float): Pitchfork rating releaseyear (int): Album release year reviewdate (str): Album review date (%B %d %Y) recordlabel (str): Album's record label(s) genre (str): Album genre danceability (float): Spotify danceability score energy (float): Spotify energy score key (float): Spotify key score loudness (float): Spotify loudness score speechiness (float): Spotify speechiness score acousticness (float): Spotify acousticness score instrumentalness (float): Spotify instrumentalness score liveness (float): Spotify liveness score valence (float): Spotify valence score tempo (float): Spotify tempo score Returns: None """ try: # The original dataset provides dates in form (for example) "June 9 2021" reviewdate = datetime.strptime(reviewdate, "%B %d %Y").date() except ValueError: traceback.print_exc() logger.error("Failed to parse the given reviewdate \"%s\". Aborting.", reviewdate) else: # Add to database session = self.session new_album = Albums( album=album, artist=artist, reviewauthor=reviewauthor, score=score, releaseyear=releaseyear, reviewdate=reviewdate, recordlabel=recordlabel, genre=genre, danceability=danceability, energy=energy, key=key, loudness=loudness, speechiness=speechiness, acousticness=acousticness, instrumentalness=instrumentalness, liveness=liveness, valence=valence, tempo=tempo ) session.add(new_album) try: session.commit() except sqlalchemy.exc.OperationalError: traceback.print_exc() logger.error( """Could not find table. Rolling back transaction. Please check your connection string and ensure that you are connected to the Northwestern VPN.""" ) session.rollback() else: logger.info("%s added to database", album)
[docs] def ingest_dataset(self, file_or_path: str) -> None: """ Add entries from a CSV file to the database. Args: file_or_path (str): Location of dataset to load into database Returns: None Raises: `ValueError` from `parse_s3` if the provided S3 path is invalid. """ session = self.session # If the referenced filepath is in S3, `open()` cannot access -- save a local copy. # Put the local copy in the same place it would have gone inside S3. # If a local copy exists already, just use that instead if file_or_path.startswith("s3://"): # First confirm that the s3 path is valid try: _, s3path = load_data.parse_s3(file_or_path) except ValueError: logger.error("Error: Invalid S3 path!") raise # Error is due to user input, so bubble up to user local_path = s3path if not os.path.exists(local_path): load_data.download_file_from_s3(local_path=local_path, s3path=file_or_path) logger.debug("Downloaded a copy of the file to %s", local_path) else: logger.debug("Using existing local copy of dataset at %s", local_path) else: local_path = file_or_path start_time = time() albums = [] try: with open(local_path, "r", encoding="utf-8") as file: reader = csv.DictReader(file) for row in reader: try: # Convert reviewdate field to datetime row["reviewdate"] = datetime.strptime(row["reviewdate"], "%B %d %Y").date() except ValueError: # Actual date string doesn't match the given format # (likely has been parsed before during cleaning and is now in ISO format) row["reviewdate"] = datetime.strptime(row["reviewdate"], "%Y-%m-%d").date() albums.append(Albums(**row)) except FileNotFoundError: logger.error("Could not find file %s to ingest", local_path) raise try: session.add_all(albums) session.commit() except sqlalchemy.exc.OperationalError: traceback.print_exc() logger.error( """Could not find table. Rolling back transaction. Please check your connection string and ensure that you are connected to the Northwestern VPN.""" ) session.rollback() else: logger.info( "Contents of %s added to database. Time taken: %0.4fs", file_or_path, time() - start_time )