88 lines
3.4 KiB
Python
88 lines
3.4 KiB
Python
import argparse
|
|
import asyncio
|
|
import logging
|
|
import os
|
|
import shutil
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
from files.util import scan_folder, accepted_file_type
|
|
from models.track import TrackInfo
|
|
from service import index
|
|
from service.provider import search_song, SongRecognizeError
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class ConcurrentExecutionError(Exception):
    """Raised when a lock file already exists, i.e. another instance is running."""
|
|
|
|
|
|
def arg_parser(argv=None):
    """Parse the command-line options of the music organiser.

    Args:
        argv: Optional list of argument strings. When ``None`` (the default)
            argparse reads ``sys.argv[1:]``.

    Returns:
        The ``argparse.Namespace`` with ``src``, ``dest`` and ``no_index``.

    Raises:
        SystemExit: raised by argparse when the arguments are invalid or a
            required option is missing.
    """
    parser = argparse.ArgumentParser(description="Organize music from source folder to destination folder using Shazam")
    # Both folders are joined with the lock file name later on, so a missing
    # value would only fail further down with a TypeError; reject it here.
    parser.add_argument("--src", "-s", metavar="SRC", type=str, required=True, help="source folder to scan")
    parser.add_argument("--dest", "-d", metavar="DEST", type=str, required=True, help="where it copy files")
    # ``type=bool`` would turn every non-empty string (even "False") into True.
    # ``nargs="?"`` with ``const=True`` lets the option work as a bare flag
    # while still accepting an explicit value such as ``--no-index false``.
    parser.add_argument("--no-index", "-i", type=_parse_bool, nargs="?", const=True, default=False,
                        help="disable music indexed")
    return parser.parse_args(argv)


def _parse_bool(value):
    """Convert a command-line token such as ``true`` or ``0`` to a bool."""
    token = str(value).strip().lower()
    if token in ("1", "true", "t", "yes", "y", "on"):
        return True
    if token in ("", "0", "false", "f", "no", "n", "off"):
        return False
    raise argparse.ArgumentTypeError(f"expected a boolean value, got {value!r}")
|
|
|
|
|
|
def check_lock_file(path: str):
    """Create the lock file at ``path``, failing if it is already there.

    Args:
        path: Location of the lock file to create.

    Raises:
        ConcurrentExecutionError: if ``path`` already exists.
    """
    try:
        # Mode "x" creates the file exclusively: the existence check and the
        # creation are a single operation, so two instances starting at the
        # same time cannot both believe they obtained the lock.
        with open(path, "x"):
            pass
    except FileExistsError as err:
        raise ConcurrentExecutionError(
            f"{path} already exists! Another application instance is in execution."
        ) from err
|
|
|
|
|
|
async def main(options) -> Any:
    """Copy recognised songs from ``options.src`` into ``options.dest``.

    A lock file is created in both folders for the duration of the run.
    Every file yielded by ``scan_folder`` that passes ``accepted_file_type``
    and is not reported as duplicated by the index is looked up with
    ``search_song`` and copied to
    ``<dest>/<artist>/<released> - <album>/<track number> - <title>.mp3``.
    Files for which ``search_song`` raises ``SongRecognizeError`` are copied
    to ``<dest>/Unknown`` under their original name.

    Args:
        options: Parsed command-line options; ``src``, ``dest`` and
            ``no_index`` are read.

    Raises:
        ConcurrentExecutionError: propagated from ``check_lock_file`` when a
            lock file already exists in either folder.
    """
    lock_filename = ".sync_pymorg.lock"
    # Only the locks created by this run are recorded here. If acquiring a
    # lock fails because another instance holds it, the cleanup below must
    # not delete that instance's lock file.
    acquired_locks = []
    try:
        for folder in (options.src, options.dest):
            lock_path = os.path.join(folder, lock_filename)
            check_lock_file(lock_path)
            acquired_locks.append(lock_path)

        for song_file in scan_folder(options.src):
            if not accepted_file_type(song_file):
                logger.info(f"Skipped file {song_file}.")
                continue

            logger.info(f"Found file {song_file}.")
            try:
                if index.duplicated(song_file):
                    logger.info(f"{song_file} already collected.")
                    continue

                track_info: TrackInfo = await search_song(song_file)
                logger.info(f"Recognize file {song_file}:\n{track_info}")
                album_path = os.path.join(options.dest, track_info.artist,
                                          f"{track_info.album.released} - {track_info.album.name}")
                # exist_ok=True already tolerates an existing album folder.
                Path(album_path).mkdir(mode=0o755, parents=True, exist_ok=True)

                # NOTE(review): the ".mp3" suffix is hard-coded whatever the
                # source file's extension is - confirm this is intended.
                destination_path = os.path.join(album_path,
                                                f"{track_info.track_number} - {track_info.title}.mp3")
                logger.debug(f"Copy {os.path.basename(song_file)} in {destination_path}")
                shutil.copy(song_file, destination_path)
                if not options.no_index and index.add_track(track_info=track_info, path=destination_path):
                    logger.info(f"Added {destination_path} to index")

            except SongRecognizeError as e:
                logger.error(f"Error: {e}")
                # Unrecognised songs are kept, under their original file name.
                unknown_dir = os.path.join(options.dest, "Unknown")
                Path(unknown_dir).mkdir(mode=0o755, parents=True, exist_ok=True)
                shutil.copy(song_file, os.path.join(unknown_dir, os.path.basename(song_file)))
    finally:
        # Remove the lock files this run created.
        for lock_path in acquired_locks:
            if os.path.exists(lock_path):
                os.remove(lock_path)
|
|
|
|
|
|
if __name__ == '__main__':
    import sys

    # Log to standard output through a stream handler instead of opening the
    # "/dev/stdout" path as a file, which only exists on some platforms.
    logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)
    args = arg_parser()
    # asyncio.run creates a new event loop, runs main() to completion and
    # closes the loop afterwards.
    asyncio.run(main(args))
|