Added file's index
This commit is contained in:
@@ -0,0 +1,37 @@
|
||||
import logging
|
||||
import os
|
||||
import sqlite3
|
||||
from contextlib import closing
|
||||
from pathlib import Path
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
__CACHE_PATH: str = f"{os.path.expandvars("$HOME")}/.cache/morg"
|
||||
__INDEX_PATH: str = f"{__CACHE_PATH}/index.db"
|
||||
|
||||
|
||||
def initialize():
|
||||
print("Initialize database")
|
||||
if not os.path.exists(__CACHE_PATH):
|
||||
Path(__CACHE_PATH).mkdir(mode=0o755, parents=True, exist_ok=True)
|
||||
|
||||
# Initialize database tables if not exists
|
||||
try:
|
||||
with closing(sqlite3.connect(__INDEX_PATH)) as conn:
|
||||
with closing(conn.cursor()) as cursor:
|
||||
cursor.execute("""
|
||||
create table if not exists songs (
|
||||
artist TEXT,
|
||||
title TEXT,
|
||||
album TEXT,
|
||||
released INT,
|
||||
path TEXT,
|
||||
fingerprint TEXT
|
||||
)
|
||||
""")
|
||||
except sqlite3.OperationalError as e:
|
||||
print("Error: ", e)
|
||||
logger.error("Error: ", e)
|
||||
|
||||
|
||||
initialize()
|
||||
|
||||
42
service/index.py
Normal file
42
service/index.py
Normal file
@@ -0,0 +1,42 @@
|
||||
import hashlib
|
||||
import logging
|
||||
import sqlite3
|
||||
from contextlib import closing
|
||||
|
||||
from models.track import TrackInfo
|
||||
from service import __INDEX_PATH
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def __fingerprint(file: str) -> str:
|
||||
with open(file, "rb", buffering=0) as f:
|
||||
return hashlib.file_digest(f, 'sha256').hexdigest()
|
||||
|
||||
|
||||
def duplicated(path: str) -> bool:
|
||||
try:
|
||||
with closing(sqlite3.connect(database=__INDEX_PATH)) as conn:
|
||||
with closing(conn.cursor()) as cursor:
|
||||
rows = cursor.execute("select 1 from songs where fingerprint = ?", (__fingerprint(path),)).fetchall()
|
||||
return True if len(rows) > 0 else False
|
||||
except sqlite3.OperationalError as e:
|
||||
logger.error("Database error:", e)
|
||||
|
||||
|
||||
def add_track(track_info: TrackInfo, path: str) -> bool:
|
||||
if duplicated(path):
|
||||
logger.warning(f"File {path} is duplicated")
|
||||
return False
|
||||
|
||||
try:
|
||||
with closing(sqlite3.connect(database=__INDEX_PATH)) as conn:
|
||||
with closing(conn.cursor()) as cursor:
|
||||
cursor.execute("insert into songs values (?, ?, ?, ?, ?, ?)",
|
||||
(track_info.artist, track_info.title, track_info.album.name, track_info.album.released,
|
||||
path, __fingerprint(path)))
|
||||
conn.commit()
|
||||
except sqlite3.OperationalError as e:
|
||||
logger.error("Database error:", e)
|
||||
|
||||
return True
|
||||
@@ -1,6 +1,8 @@
|
||||
import itertools
|
||||
import json
|
||||
import logging
|
||||
import os.path
|
||||
import re
|
||||
from typing import Any
|
||||
|
||||
from api import shazam
|
||||
@@ -9,11 +11,16 @@ from models.track import TrackInfo, Album
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class SongRecognizeError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
async def __extract_data(data: dict[str, Any]) -> dict[str, Any]:
|
||||
def album_field(x: str, lst: list) -> str | int:
|
||||
def album_field(x: str, lst: list) -> str | int | None:
|
||||
for elem in lst:
|
||||
if x == elem["title"]:
|
||||
return elem["text"]
|
||||
return None
|
||||
|
||||
def seek_track(x: str, metadata: dict[str, Any]) -> dict[str, Any]:
|
||||
tts = metadata["data"][0]["relationships"]["tracks"]["data"]
|
||||
@@ -22,28 +29,47 @@ async def __extract_data(data: dict[str, Any]) -> dict[str, Any]:
|
||||
return t["attributes"]
|
||||
return {}
|
||||
|
||||
def sanitize(s: str) -> str:
|
||||
return re.sub(f'{os.path.sep}', repl='-', string=s)
|
||||
|
||||
logger.debug(json.dumps(data, indent=2))
|
||||
track_data = data["track"]
|
||||
track_attrs = list(itertools.chain(*[x["metadata"] for x in track_data["sections"] if "metadata" in x]))
|
||||
album_attrs = seek_track(
|
||||
x=track_data["title"],
|
||||
metadata=await shazam.album(album_id=int(track_data['albumadamid']))
|
||||
)
|
||||
album_id = track_data.get('albumadamid')
|
||||
if album_id:
|
||||
album_attrs = seek_track(
|
||||
x=track_data["title"],
|
||||
metadata=await shazam.album(album_id=int(album_id))
|
||||
)
|
||||
|
||||
return {
|
||||
"title": track_data["title"],
|
||||
"artist": track_data["subtitle"],
|
||||
"track_number": album_attrs["trackNumber"],
|
||||
"album": {
|
||||
"id": int(track_data['albumadamid']),
|
||||
"name": album_field("Album", track_attrs),
|
||||
"released": album_field("Released", track_attrs)
|
||||
return {
|
||||
"title": sanitize(track_data["title"]),
|
||||
"artist": sanitize(track_data["subtitle"]),
|
||||
"track_number": album_attrs["trackNumber"],
|
||||
"album": {
|
||||
"id": int(album_id),
|
||||
"name": sanitize(album_field("Album", track_attrs)),
|
||||
"released": album_field("Released", track_attrs)
|
||||
}
|
||||
}
|
||||
else:
|
||||
return {
|
||||
"title": sanitize(track_data["title"]),
|
||||
"artist": sanitize(track_data["subtitle"]),
|
||||
"track_number": 0,
|
||||
"album": {
|
||||
"id": 0,
|
||||
"name": "Unknown",
|
||||
"released": 0
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
async def search_song(song_file: str) -> TrackInfo:
|
||||
raw_data: dict[str, Any] = await shazam.recognize(song_file)
|
||||
if "track" not in raw_data:
|
||||
raise SongRecognizeError(f"Not found {song_file}")
|
||||
|
||||
track_data = await __extract_data(data=raw_data)
|
||||
album_data = track_data["album"]
|
||||
return TrackInfo(
|
||||
|
||||
Reference in New Issue
Block a user