Merge branch 'feature/index'

This commit is contained in:
Fabio Scotto di Santolo
2024-11-01 11:18:02 +01:00
6 changed files with 161 additions and 39 deletions

View File

@@ -1,5 +1,6 @@
import logging import logging
import os import os
import re
from typing import Any from typing import Any
import magic import magic
@@ -16,4 +17,4 @@ def scan_folder(src: str):
def accepted_file_type(file: Any) -> bool:
    """Return True when libmagic reports an ``audio/*`` MIME type for *file*.

    :param file: path of the file to inspect, handed to ``magic.from_file``.
    :return: True for any MIME type starting with ``audio/``, False otherwise.
    """
    mime = magic.from_file(file, mime=True)
    logger.info(f"MIME {mime} for file {file}")
    # 'audio/*' used as a regular expression means "audio" followed by zero or
    # more slashes, searched anywhere in the string; a prefix test expresses
    # the intended "any audio subtype" check exactly.
    return mime.startswith('audio/')

51
main.py
View File

@@ -7,7 +7,9 @@ from pathlib import Path
from typing import Any from typing import Any
from files.util import scan_folder, accepted_file_type from files.util import scan_folder, accepted_file_type
from service.provider import search_song from models.track import TrackInfo
from service import index
from service.provider import search_song, SongRecognizeError
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
def arg_parser():
    """Parse the command-line options of the organizer.

    Options:
      * ``--src`` / ``-s``: source folder to scan.
      * ``--dest`` / ``-d``: destination folder.
      * ``--no-index`` / ``-i``: disables indexing; may be given bare or with
        an explicit boolean value (``--no-index false``).

    :return: the ``argparse.Namespace`` with ``src``, ``dest`` and ``no_index``.
    """

    def _as_bool(value: str) -> bool:
        # ``type=bool`` would turn every non-empty string (even "False") into
        # True, so the textual value is interpreted explicitly instead.
        lowered = value.strip().lower()
        if lowered in ("1", "true", "yes", "y", "on"):
            return True
        if lowered in ("", "0", "false", "no", "n", "off"):
            return False
        raise argparse.ArgumentTypeError(f"invalid boolean value: {value!r}")

    parser = argparse.ArgumentParser(description="Organize music from source folder to destination folder using Shazam")
    parser.add_argument("--src", "-s", metavar="SRC", type=str, help="source folder to scan")
    parser.add_argument("--dest", "-d", metavar="DEST", type=str, help="where it copy files")
    # nargs="?" + const=True lets a bare ``--no-index`` act as a flag while
    # still accepting the older ``--no-index True`` spelling.
    parser.add_argument("--no-index", "-i", type=_as_bool, nargs="?", const=True, default=False,
                        help="disable music indexed")
    return parser.parse_args()
async def main(options) -> Any:
    """Scan ``options.src`` and copy each accepted audio file under ``options.dest``.

    For every file yielded by ``scan_folder``:
      * files rejected by ``accepted_file_type`` are logged and skipped;
      * files already known to the index (``index.duplicated``) are skipped;
      * otherwise the song is looked up with ``search_song`` and copied to
        ``<dest>/<artist>/<released> - <album>/<track number> - <title><ext>``;
      * unless ``options.no_index`` is set, the copy is registered through
        ``index.add_track``;
      * when ``search_song`` raises ``SongRecognizeError`` the file is copied
        unchanged into ``<dest>/Unknown``.

    :param options: parsed arguments exposing ``src``, ``dest`` and ``no_index``.
    """
    for song_file in scan_folder(options.src):
        if not accepted_file_type(song_file):
            logger.info(f"Skipped file {song_file}")
            continue

        logger.info(f"Found file {song_file}")
        try:
            if index.duplicated(song_file):
                logger.info(f"{song_file} already collected")
                continue

            track_info: TrackInfo = await search_song(song_file)
            # Use the module logger (not the root ``logging`` module) so this
            # record is attributed like every other message in this file.
            logger.info(f"Recognize file {song_file}:\n{track_info}")

            album_path: str = os.path.join(options.dest, track_info.artist,
                                           f"{track_info.album.released} - {track_info.album.name}")
            # exist_ok=True makes a separate existence check unnecessary.
            Path(album_path).mkdir(mode=0o755, parents=True, exist_ok=True)

            # Keep the source extension: any audio/* file is accepted, so the
            # copy must not be relabelled as .mp3. Fall back to the previous
            # ".mp3" suffix only when the source name has no extension.
            extension = os.path.splitext(song_file)[1] or ".mp3"
            destination_path: str = os.path.join(
                album_path, f"{track_info.track_number} - {track_info.title}{extension}")
            logger.debug(f"Copy {os.path.basename(song_file)} in {destination_path}")
            shutil.copy(song_file, destination_path)

            if not options.no_index and index.add_track(track_info=track_info, path=destination_path):
                logger.info(f"Added {destination_path} to index")
        except SongRecognizeError as e:
            logger.error(f"Error: {e}")
            unknown_dir = os.path.join(options.dest, "Unknown")
            Path(unknown_dir).mkdir(mode=0o755, parents=True, exist_ok=True)
            shutil.copy(song_file, os.path.join(unknown_dir, os.path.basename(song_file)))
if __name__ == '__main__':
    # NOTE(review): "/dev/stdout" is a POSIX path; on other platforms this
    # would create or fail on a regular file of that name — confirm the
    # supported platforms.
    logging.basicConfig(filename="/dev/stdout", level=logging.DEBUG)
    args = arg_parser()
    # asyncio.run() creates the event loop, runs main() to completion and
    # closes the loop afterwards, replacing the deprecated
    # get_event_loop()/run_until_complete() pair.
    asyncio.run(main(args))

View File

@@ -17,8 +17,7 @@ class Album:
return self.__released return self.__released
def __str__(self): def __str__(self):
return f"""Name: {self.__name} return f"{self.__released} - {self.__name}"
Released: {self.__released}"""
class TrackInfo: class TrackInfo:
@@ -45,7 +44,9 @@ class TrackInfo:
return self.__number return self.__number
def __str__(self) -> str: def __str__(self) -> str:
return f"""Artist: {self.__artist} return f"""
Title: {self.__title} Artist: {self.__artist}
Album: {self.__album} Title: {self.__title}
Track Number: {self.__number}""" Album: {self.__album}
Track Number: {self.__number}
"""

View File

@@ -0,0 +1,37 @@
import logging
import os
import sqlite3
from contextlib import closing
from pathlib import Path
logger = logging.getLogger(__name__)

# Per-user cache directory of the application. expanduser("~") is used instead
# of expandvars("$HOME") inside an f-string: reusing double quotes inside a
# double-quoted f-string is a syntax error before Python 3.12, and "$HOME" is
# left unexpanded when the variable is not set.
__CACHE_PATH: str = os.path.join(os.path.expanduser("~"), ".cache", "morg")
# SQLite database file stored inside the cache directory.
__INDEX_PATH: str = os.path.join(__CACHE_PATH, "index.db")
def initialize():
    """Create the cache directory and the ``songs`` table when they are missing.

    The directory ``__CACHE_PATH`` is created (with parents) and the SQLite
    database at ``__INDEX_PATH`` receives a ``songs`` table with the columns
    artist, title, album, released, path and fingerprint.

    ``sqlite3.OperationalError`` is logged instead of being raised.
    """
    logger.debug("Initialize database")
    # exist_ok=True already tolerates an existing directory, so no separate
    # os.path.exists() check is needed.
    Path(__CACHE_PATH).mkdir(mode=0o755, parents=True, exist_ok=True)
    # Initialize database tables if not exists
    try:
        with closing(sqlite3.connect(__INDEX_PATH)) as conn:
            with closing(conn.cursor()) as cursor:
                cursor.execute("""
                    create table if not exists songs (
                        artist TEXT,
                        title TEXT,
                        album TEXT,
                        released INT,
                        path TEXT,
                        fingerprint TEXT
                    )
                """)
            # closing() only closes the connection; commit explicitly so the
            # statement is persisted whatever the transaction mode is.
            conn.commit()
    except sqlite3.OperationalError as e:
        # The message needs a %s placeholder: passing `e` as a bare extra
        # argument makes the logging module fail while formatting the record.
        logger.error("Error: %s", e)
initialize()

42
service/index.py Normal file
View File

@@ -0,0 +1,42 @@
import hashlib
import logging
import sqlite3
from contextlib import closing
from models.track import TrackInfo
from service import __INDEX_PATH
logger = logging.getLogger(__name__)
def __fingerprint(file: str) -> str:
    """Return the hexadecimal SHA-256 digest of the content of *file*."""
    digest = hashlib.sha256()
    with open(file, "rb") as stream:
        # Feed the hash chunk by chunk so the whole file is never held in memory.
        for chunk in iter(lambda: stream.read(65536), b""):
            digest.update(chunk)
    return digest.hexdigest()
def duplicated(path: str) -> bool:
    """Tell whether a file with the same content is already in the index.

    The file at *path* is hashed with ``__fingerprint`` and the ``songs`` table
    is searched for a row carrying that fingerprint.

    :param path: file to look up.
    :return: True when a matching row exists; False when none exists or when
        the database raises ``sqlite3.OperationalError`` (the error is logged).
    """
    fingerprint = __fingerprint(path)
    try:
        with closing(sqlite3.connect(database=__INDEX_PATH)) as conn:
            with closing(conn.cursor()) as cursor:
                cursor.execute("select 1 from songs where fingerprint = ?", (fingerprint,))
                # One row is enough to answer the question.
                return cursor.fetchone() is not None
    except sqlite3.OperationalError as e:
        # %s placeholder required: a bare extra argument breaks log formatting.
        logger.error("Database error: %s", e)
        # Honour the declared bool return type instead of falling off the end
        # of the function (which would return None).
        return False
def add_track(track_info: TrackInfo, path: str) -> bool:
    """Insert a track into the ``songs`` table unless its content is already indexed.

    :param track_info: metadata of the track; ``artist``, ``title``,
        ``album.name`` and ``album.released`` are stored.
    :param path: location of the file; stored together with its fingerprint.
    :return: True when a row was inserted and committed; False when the file
        is a duplicate or the database raised ``sqlite3.OperationalError``
        (the error is logged).
    """
    if duplicated(path):
        logger.warning(f"File {path} is duplicated")
        return False

    row = (track_info.artist, track_info.title, track_info.album.name, track_info.album.released,
           path, __fingerprint(path))
    try:
        with closing(sqlite3.connect(database=__INDEX_PATH)) as conn:
            with closing(conn.cursor()) as cursor:
                cursor.execute("insert into songs values (?, ?, ?, ?, ?, ?)", row)
            conn.commit()
    except sqlite3.OperationalError as e:
        # %s placeholder required: a bare extra argument breaks log formatting.
        logger.error("Database error: %s", e)
        # Nothing was stored, so the caller must not be told the track was added.
        return False
    return True

View File

@@ -1,6 +1,8 @@
import itertools import itertools
import json import json
import logging import logging
import os.path
import re
from typing import Any from typing import Any
from api import shazam from api import shazam
@@ -9,11 +11,16 @@ from models.track import TrackInfo, Album
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class SongRecognizeError(Exception):
    """Raised when the recognition response contains no ``"track"`` entry."""
async def __extract_data(data: dict[str, Any]) -> dict[str, Any]: async def __extract_data(data: dict[str, Any]) -> dict[str, Any]:
def album_field(x: str, lst: list) -> str | int: def album_field(x: str, lst: list) -> str | int | None:
for elem in lst: for elem in lst:
if x == elem["title"]: if x == elem["title"]:
return elem["text"] return elem["text"]
return None
def seek_track(x: str, metadata: dict[str, Any]) -> dict[str, Any]: def seek_track(x: str, metadata: dict[str, Any]) -> dict[str, Any]:
tts = metadata["data"][0]["relationships"]["tracks"]["data"] tts = metadata["data"][0]["relationships"]["tracks"]["data"]
@@ -22,28 +29,47 @@ async def __extract_data(data: dict[str, Any]) -> dict[str, Any]:
return t["attributes"] return t["attributes"]
return {} return {}
def sanitize(s: str) -> str:
    """Replace path separators in *s* with '-' so it is safe as a file or folder name.

    A missing value (None) becomes "Unknown", the same placeholder used for
    tracks without album data; non-string values are converted with ``str``.
    """
    if s is None:
        return "Unknown"
    # Plain string replacement instead of re.sub: os.path.sep is not a valid
    # regular expression when it is a backslash.
    return str(s).replace(os.path.sep, '-')
logger.debug(json.dumps(data, indent=2)) logger.debug(json.dumps(data, indent=2))
track_data = data["track"] track_data = data["track"]
track_attrs = list(itertools.chain(*[x["metadata"] for x in track_data["sections"] if "metadata" in x])) track_attrs = list(itertools.chain(*[x["metadata"] for x in track_data["sections"] if "metadata" in x]))
album_attrs = seek_track( album_id = track_data.get('albumadamid')
x=track_data["title"], if album_id:
metadata=await shazam.album(album_id=int(track_data['albumadamid'])) album_attrs = seek_track(
) x=track_data["title"],
metadata=await shazam.album(album_id=int(album_id))
)
return { return {
"title": track_data["title"], "title": sanitize(track_data["title"]),
"artist": track_data["subtitle"], "artist": sanitize(track_data["subtitle"]),
"track_number": album_attrs["trackNumber"], "track_number": album_attrs["trackNumber"],
"album": { "album": {
"id": int(track_data['albumadamid']), "id": int(album_id),
"name": album_field("Album", track_attrs), "name": sanitize(album_field("Album", track_attrs)),
"released": album_field("Released", track_attrs) "released": album_field("Released", track_attrs)
}
}
else:
return {
"title": sanitize(track_data["title"]),
"artist": sanitize(track_data["subtitle"]),
"track_number": 0,
"album": {
"id": 0,
"name": "Unknown",
"released": 0
}
} }
}
async def search_song(song_file: str) -> TrackInfo: async def search_song(song_file: str) -> TrackInfo:
raw_data: dict[str, Any] = await shazam.recognize(song_file) raw_data: dict[str, Any] = await shazam.recognize(song_file)
if "track" not in raw_data:
raise SongRecognizeError(f"Not found {song_file}")
track_data = await __extract_data(data=raw_data) track_data = await __extract_data(data=raw_data)
album_data = track_data["album"] album_data = track_data["album"]
return TrackInfo( return TrackInfo(