def _str_to_bool(value: str) -> bool:
    """Convert a command-line token into a boolean.

    ``type=bool`` cannot be used with argparse because ``bool("False")`` is
    ``True``: every non-empty string is truthy.

    Raises:
        argparse.ArgumentTypeError: if *value* is not a recognized boolean word.
    """
    lowered = value.strip().lower()
    if lowered in {"1", "true", "yes", "y", "on"}:
        return True
    # The empty string stays falsy, as it was under ``type=bool``.
    if lowered in {"", "0", "false", "no", "n", "off"}:
        return False
    raise argparse.ArgumentTypeError(f"expected a boolean value, got {value!r}")


def arg_parser():
    """Parse the command line and return the resulting namespace.

    Returns:
        A namespace with ``src``, ``dest`` and ``no_index`` attributes.
        ``no_index`` is ``False`` by default, ``True`` for a bare
        ``--no-index``/``-i``, and the parsed boolean when an explicit value
        is given (``--no-index false``).
    """
    parser = argparse.ArgumentParser(description="Organize music from source folder to destination folder using Shazam")
    parser.add_argument("--src", "-s", metavar="SRC", type=str, help="source folder to scan")
    parser.add_argument("--dest", "-d", metavar="DEST", type=str, help="where it copy files")
    # nargs="?" + const=True lets the option act as a plain flag while still
    # accepting the explicit "--no-index <value>" form.
    parser.add_argument("--no-index", "-i", type=_str_to_bool, nargs="?", const=True, default=False,
                        help="disable music indexed")
    return parser.parse_args()


async def main(options) -> Any:
    """Copy every accepted audio file found under ``options.src`` into ``options.dest``.

    Recognized files are copied to
    ``<dest>/<artist>/<released> - <album>/<track number> - <title><ext>``.
    Files for which ``search_song`` raises ``SongRecognizeError`` are copied
    unchanged into ``<dest>/Unknown``.

    Args:
        options: parsed arguments exposing ``src``, ``dest`` and ``no_index``.
    """
    unknown_dir = os.path.join(options.dest, "Unknown")

    for song_file in scan_folder(options.src):
        if not accepted_file_type(song_file):
            logger.info(f"Skipped file {song_file}")
            continue

        logger.info(f"Found file {song_file}")
        try:
            # NOTE(review): the index is consulted even when --no-index is set;
            # only the insertion below honors the flag. Confirm this is intended.
            if index.duplicated(song_file):
                logger.info(f"{song_file} already collected")
                continue

            track_info: TrackInfo = await search_song(song_file)
            logger.info(f"Recognize file {song_file}:\n{track_info}")
            album_path: str = os.path.join(options.dest, track_info.artist,
                                           f"{track_info.album.released} - {track_info.album.name}")
            Path(album_path).mkdir(mode=0o755, parents=True, exist_ok=True)

            # Keep the source extension so a non-MP3 audio file is not
            # mislabeled; fall back to ".mp3" when the source has none.
            extension = os.path.splitext(song_file)[1] or ".mp3"
            destination_path: str = os.path.join(
                album_path, f"{track_info.track_number} - {track_info.title}{extension}")
            logger.debug(f"Copy {os.path.basename(song_file)} in {destination_path}")
            shutil.copy(song_file, destination_path)
            if not options.no_index and index.add_track(track_info=track_info, path=destination_path):
                logger.info(f"Added {destination_path} to index")

        except SongRecognizeError as e:
            logger.error(f"Error: {e}")
            Path(unknown_dir).mkdir(mode=0o755, parents=True, exist_ok=True)
            shutil.copy(song_file, os.path.join(unknown_dir, os.path.basename(song_file)))


if __name__ == '__main__':
    import sys

    # Log to the stdout stream object rather than the POSIX-only /dev/stdout path.
    logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)
    args = arg_parser()
    asyncio.run(main(args))
def initialize():
    """Create the cache directory and the ``songs`` table when they are missing.

    The directory is ``__CACHE_PATH`` and the SQLite database file is
    ``__INDEX_PATH``. An ``sqlite3.OperationalError`` raised while opening the
    database or creating the table is logged and swallowed, so this function
    does not raise for that failure.
    """
    logger.info("Initialize database")
    if not os.path.exists(__CACHE_PATH):
        Path(__CACHE_PATH).mkdir(mode=0o755, parents=True, exist_ok=True)

    # Initialize database tables if not exists
    try:
        with closing(sqlite3.connect(__INDEX_PATH)) as conn:
            with closing(conn.cursor()) as cursor:
                cursor.execute("""
                    create table if not exists songs (
                        artist TEXT,
                        title TEXT,
                        album TEXT,
                        released INT,
                        path TEXT,
                        fingerprint TEXT
                    )
                """)
    except sqlite3.OperationalError as e:
        # The exception needs a %s placeholder: an extra positional argument
        # without one makes logging report a formatting error and drop the message.
        logger.error("Error: %s", e)
def add_track(track_info: TrackInfo, path: str) -> bool:
    """Insert a track into the ``songs`` table unless the file is already indexed.

    Args:
        track_info: metadata of the track; ``artist``, ``title``,
            ``album.name`` and ``album.released`` are stored.
        path: location of the audio file; it is stored and fingerprinted.

    Returns:
        ``True`` only when a row was inserted and committed. ``False`` when
        ``duplicated(path)`` is truthy or when the database raises
        ``sqlite3.OperationalError``.
    """
    if duplicated(path):
        logger.warning(f"File {path} is duplicated")
        return False

    try:
        with closing(sqlite3.connect(database=__INDEX_PATH)) as conn:
            with closing(conn.cursor()) as cursor:
                cursor.execute("insert into songs values (?, ?, ?, ?, ?, ?)",
                               (track_info.artist, track_info.title, track_info.album.name,
                                track_info.album.released, path, __fingerprint(path)))
                conn.commit()
    except sqlite3.OperationalError as e:
        # Report the failure to the caller instead of falling through to the
        # success return; the message needs a %s placeholder for the exception.
        logger.error("Database error: %s", e)
        return False

    return True
seek_track( + x=track_data["title"], + metadata=await shazam.album(album_id=int(album_id)) + ) - return { - "title": track_data["title"], - "artist": track_data["subtitle"], - "track_number": album_attrs["trackNumber"], - "album": { - "id": int(track_data['albumadamid']), - "name": album_field("Album", track_attrs), - "released": album_field("Released", track_attrs) + return { + "title": sanitize(track_data["title"]), + "artist": sanitize(track_data["subtitle"]), + "track_number": album_attrs["trackNumber"], + "album": { + "id": int(album_id), + "name": sanitize(album_field("Album", track_attrs)), + "released": album_field("Released", track_attrs) + } + } + else: + return { + "title": sanitize(track_data["title"]), + "artist": sanitize(track_data["subtitle"]), + "track_number": 0, + "album": { + "id": 0, + "name": "Unknown", + "released": 0 + } } - } async def search_song(song_file: str) -> TrackInfo: raw_data: dict[str, Any] = await shazam.recognize(song_file) + if "track" not in raw_data: + raise SongRecognizeError(f"Not found {song_file}") + track_data = await __extract_data(data=raw_data) album_data = track_data["album"] return TrackInfo(