Commit

optimize cache update by removing locks in hot path
azuline committed Oct 30, 2023
1 parent 23b39a0 commit 8b4d331
Showing 1 changed file with 116 additions and 118 deletions.
234 changes: 116 additions & 118 deletions rose/cache.py
@@ -392,19 +392,19 @@ def _update_cache_for_releases_executor(
     # First, call readdir on every release directory. We store the results in a map of
     # Path Basename -> (Release ID if exists, filenames).
     dir_scan_start = time.time()
-    dir_tree: list[tuple[Path, str | None, list[str]]] = []
+    dir_tree: list[tuple[Path, str | None, list[Path]]] = []
     release_uuids: list[str] = []
     for rd in release_dirs:
         release_id = None
-        files: list[str] = []
+        files: list[Path] = []
         if not rd.is_dir():
             logger.debug(f"Skipping scanning {rd} because it is not a directory")
             continue
-        for root, _, fx in os.walk(str(rd)):
-            for f in fx:
-                if m := STORED_DATA_FILE_REGEX.match(f):
+        for root, _, subfiles in os.walk(str(rd)):
+            for sf in subfiles:
+                if m := STORED_DATA_FILE_REGEX.match(sf):
                     release_id = m[1]
-                files.append(os.path.join(root, f))
+                files.append(Path(root) / sf)
         dir_tree.append((rd.resolve(), release_id, files))
         if release_id is not None:
             release_uuids.append(release_id)
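
The scan now threads pathlib.Path objects through the whole pipeline instead of joined strings, which is what lets later hunks use .suffix and .name. A minimal, self-contained sketch of the new shape; the regex here is a stand-in for rose's STORED_DATA_FILE_REGEX and assumes datafiles are named like ".rose.<id>.toml":

import os
import re
from pathlib import Path

# Stand-in pattern; assumption: datafiles look like ".rose.<id>.toml".
STORED_DATA_FILE_REGEX = re.compile(r"\.rose\.([^.]+)\.toml")

def scan_release_dir(rd: Path) -> tuple[str | None, list[Path]]:
    """Walk one release directory, returning (release ID if present, every file path)."""
    release_id: str | None = None
    files: list[Path] = []
    for root, _, subfiles in os.walk(str(rd)):
        for sf in subfiles:
            if m := STORED_DATA_FILE_REGEX.match(sf):
                release_id = m[1]
            # Joining with Path keeps Path objects flowing downstream, so later
            # steps can use .suffix and .name instead of os.path string helpers.
            files.append(Path(root) / sf)
    return release_id, files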
@@ -577,19 +577,17 @@ def _update_cache_for_releases_executor(
         # Check to see if we should even process the directory. If the directory does not have
         # any tracks, skip it. And if it does not have any tracks, but is in the cache, remove
         # it from the cache.
-        try:
-            first_audio_file = Path(
-                next(
-                    f
-                    for f in files
-                    if any(f.lower().endswith(ext) for ext in SUPPORTED_AUDIO_EXTENSIONS)
-                )
-            )
-        except StopIteration:
-            logger.debug(f"Did not find any audio files in release {source_path}, skipping")
+        first_audio_file: Path | None = None
+        for f in files:
+            if f.suffix.lower() in SUPPORTED_AUDIO_EXTENSIONS:
+                first_audio_file = f
+                break
+        else:
+            logger.debug(f"Scheduling cache deletion for empty directory release {source_path}")
             upd_delete_source_paths.append(str(source_path))
             continue
+        assert first_audio_file is not None
 
         # This value is used to track whether to update the database for this release. If this
         # is False at the end of this loop body, we can save a database update call.
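
The generator-plus-StopIteration lookup becomes a plain loop with for/else: the else branch runs only when the loop finishes without hitting break, which is exactly the "no audio file found" case. A runnable sketch, with a hypothetical extension set:

from pathlib import Path

# Hypothetical set; rose's actual SUPPORTED_AUDIO_EXTENSIONS may differ.
SUPPORTED_AUDIO_EXTENSIONS = {".mp3", ".flac", ".ogg", ".opus", ".m4a"}

files = [Path("cover.jpg"), Path("01 Intro.FLAC")]
for f in files:
    if f.suffix.lower() in SUPPORTED_AUDIO_EXTENSIONS:
        first_audio_file = f
        break
else:
    # Runs only if the loop completed without `break`, i.e. no audio file found.
    first_audio_file = None

assert first_audio_file == Path("01 Intro.FLAC")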
@@ -677,37 +675,34 @@ def _update_cache_for_releases_executor(
             logger.debug(f"Datafile changed for release {source_path}, updating")
             release_dirty = True
             release.datafile_mtime = datafile_mtime
-            with lock(c, release_lock_name(preexisting_release_id)):
-                with datafile_path.open("rb") as fp:
-                    diskdata = tomllib.load(fp)
-                datafile = StoredDataFile(
-                    new=diskdata.get("new", True),
-                    added_at=diskdata.get(
-                        "added_at",
-                        datetime.now().astimezone().replace(microsecond=0).isoformat(),
-                    ),
-                )
-                release.new = datafile.new
-                release.added_at = datafile.added_at
-                new_resolved_data = asdict(datafile)
-                logger.debug(f"Updating values in stored data file for release {source_path}")
-                if new_resolved_data != diskdata:
-                    with datafile_path.open("wb") as fp:
-                        tomli_w.dump(new_resolved_data, fp)
+            # For performance reasons (!!), don't acquire a lock here. However, acquire a lock
+            # if we are to write to the file. We won't worry about lost writes here.
+            with datafile_path.open("rb") as fp:
+                diskdata = tomllib.load(fp)
+            datafile = StoredDataFile(
+                new=diskdata.get("new", True),
+                added_at=diskdata.get(
+                    "added_at",
+                    datetime.now().astimezone().replace(microsecond=0).isoformat(),
+                ),
+            )
+            release.new = datafile.new
+            release.added_at = datafile.added_at
+            # And then write the data back to disk if it changed. This allows us to update
+            # datafiles to contain newer default values.
+            new_resolved_data = asdict(datafile)
+            if new_resolved_data != diskdata:
+                logger.debug(
+                    f"Updating values in stored data file for release {source_path}"
+                )
+                lockname = release_lock_name(preexisting_release_id)
+                with lock(c, lockname), datafile_path.open("wb") as fp:
+                    tomli_w.dump(new_resolved_data, fp)
 
         # Handle cover art change.
-        try:
-            cover = next(
-                Path(f).resolve()
-                for f in files
-                if os.path.basename(f).lower() in c.valid_cover_arts
-            )
-        except StopIteration:  # No cover art in directory.
-            cover = None
+        cover = None
+        for f in files:
+            if f.name.lower() in c.valid_cover_arts:
+                cover = f
+                break
         if cover != release.cover_image_path:
             logger.debug(f"Cover art file for release {source_path} updated to path {cover}")
             release.cover_image_path = cover
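
This hunk is the heart of the commit: the datafile read no longer takes the per-release lock, and only the comparatively rare rewrite does, accepting an occasional lost write. A self-contained sketch of that pattern; the threading.Lock registry below is an in-process stand-in, since the diff only shows the call shape lock(c, name) of rose's own cross-process primitive:

import contextlib
import threading
import tomllib
import tomli_w
from pathlib import Path

# Hypothetical in-process stand-in for rose's lock(c, name) context manager.
_locks: dict[str, threading.Lock] = {}

@contextlib.contextmanager
def lock(name: str):
    lk = _locks.setdefault(name, threading.Lock())
    with lk:
        yield

def refresh_datafile(datafile_path: Path, release_id: str) -> dict:
    # Hot path: lock-free read. A stale or lost read is tolerable; the next
    # cache update re-reads and repairs the file.
    with datafile_path.open("rb") as fp:
        diskdata = tomllib.load(fp)
    resolved = {"new": diskdata.get("new", True), **diskdata}
    if resolved != diskdata:
        # Cold path: take the per-release lock only when rewriting.
        with lock(f"release-{release_id}"), datafile_path.open("wb") as fp:
            tomli_w.dump(resolved, fp)
    return resolved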
@@ -737,14 +732,12 @@ def _update_cache_for_releases_executor(
         # tags.
         pulled_release_tags = False
         for f in files:
-            if not any(
-                os.path.basename(f).lower().endswith(ext) for ext in SUPPORTED_AUDIO_EXTENSIONS
-            ):
+            if f.suffix.lower() not in SUPPORTED_AUDIO_EXTENSIONS:
                 continue
 
-            cached_track = cached_tracks.get(f, None)
+            cached_track = cached_tracks.get(str(f), None)
             with contextlib.suppress(KeyError):
-                unknown_cached_tracks.remove(f)
+                unknown_cached_tracks.remove(str(f))
 
             track_mtime = str(os.stat(f).st_mtime)
             # Skip re-read if we can reuse a cached entry.
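
One subtlety of the new membership test: Path.suffix keeps the leading dot, so it only works if SUPPORTED_AUDIO_EXTENSIONS stores dotted, lowercased extensions (assumed in the illustrative set below):

from pathlib import Path

# Illustrative set; entries must be dotted and lowercase for this check to work.
SUPPORTED_AUDIO_EXTENSIONS = {".mp3", ".flac", ".ogg", ".opus", ".m4a"}

assert Path("01 Intro.FLAC").suffix.lower() in SUPPORTED_AUDIO_EXTENSIONS
assert Path("cover.jpg").suffix.lower() not in SUPPORTED_AUDIO_EXTENSIONS
# The old string-based code needed endswith(); plain strings have no .suffix:
assert "01 intro.flac".endswith(tuple(SUPPORTED_AUDIO_EXTENSIONS))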
@@ -870,10 +863,12 @@ def _update_cache_for_releases_executor(
             # a directory being written file-by-file and being processed in a half-written state.
             track_id = tags.id
             if not track_id or not tags.release_id:
-                with lock(c, release_lock_name(release.id)):
-                    tags.id = tags.id or str(uuid6.uuid7())
-                    tags.release_id = release.id
-                    tags.flush()
+                # This is our first time reading this track in the system, so no concurrent processes
+                # should be reading/writing this file. We can avoid locking. And if we have two
+                # concurrent first-time cache updates, other places will have issues too.
+                tags.id = tags.id or str(uuid6.uuid7())
+                tags.release_id = release.id
+                tags.flush()
                 # And refresh the mtime because we've just written to the file.
                 track_id = tags.id
                 track_mtime = str(os.stat(f).st_mtime)
@@ -910,51 +905,53 @@ def _update_cache_for_releases_executor(
 
         # Now calculate whether this release is multidisc, and then assign virtual_filenames and
        # formatted_release_positions for each track that lacks one.
-        multidisc = len({t.disc_number for t in tracks}) > 1
-        if release.multidisc != multidisc:
-            logger.debug(f"Release multidisc change detected for {source_path}, updating")
-            release_dirty = True
-            release.multidisc = multidisc
-        # Use this set to avoid name collisions.
-        seen_track_names: set[str] = set()
-        for i, t in enumerate(tracks):
-            formatted_release_position = ""
-            if multidisc and t.disc_number:
-                formatted_release_position += f"{t.disc_number:0>2}-"
-            if t.track_number:
-                formatted_release_position += f"{t.track_number:0>2}"
-            if formatted_release_position != t.formatted_release_position:
-                logger.debug(
-                    f"Track formatted release position change detected for {t.source_path}, "
-                    "updating"
-                )
-                tracks[i].formatted_release_position = formatted_release_position
-                track_ids_to_insert.add(t.id)
-
-            virtual_filename = ""
-            virtual_filename += f"{t.formatted_artists} - "
-            virtual_filename += t.title or "Unknown Title"
-            virtual_filename += t.source_path.suffix
-            virtual_filename = _sanitize_filename(virtual_filename)
-            # And in case of a name collision, add an extra number at the end. Iterate to find
-            # the first unused number.
-            original_virtual_filename = virtual_filename
-            collision_no = 2
-            while True:
-                if virtual_filename not in seen_track_names:
-                    break
-                # Write the collision number before the file extension.
-                povf = Path(original_virtual_filename)
-                virtual_filename = f"{povf.stem} [{collision_no}]{povf.suffix}"
-                collision_no += 1
-            seen_track_names.add(virtual_filename)
-            if virtual_filename != t.virtual_filename:
-                logger.debug(
-                    f"Track virtual filename change detected for {t.source_path}, updating"
-                )
-                tracks[i].virtual_filename = virtual_filename
-                track_ids_to_insert.add(t.id)
-                upd_playlist_track_filenames[t.id] = virtual_filename
+        # Only recompute this if any tracks have changed. Otherwise, save CPU cycles.
+        if track_ids_to_insert:
+            multidisc = len({t.disc_number for t in tracks}) > 1
+            if release.multidisc != multidisc:
+                logger.debug(f"Release multidisc change detected for {source_path}, updating")
+                release_dirty = True
+                release.multidisc = multidisc
+            # Use this set to avoid name collisions.
+            seen_track_names: set[str] = set()
+            for i, t in enumerate(tracks):
+                formatted_release_position = ""
+                if release.multidisc and t.disc_number:
+                    formatted_release_position += f"{t.disc_number:0>2}-"
+                if t.track_number:
+                    formatted_release_position += f"{t.track_number:0>2}"
+                if formatted_release_position != t.formatted_release_position:
+                    logger.debug(
+                        f"Track formatted release position change detected for {t.source_path}, "
+                        "updating"
+                    )
+                    tracks[i].formatted_release_position = formatted_release_position
+                    track_ids_to_insert.add(t.id)
+
+                virtual_filename = ""
+                virtual_filename += f"{t.formatted_artists} - "
+                virtual_filename += t.title or "Unknown Title"
+                virtual_filename += t.source_path.suffix
+                virtual_filename = _sanitize_filename(virtual_filename)
+                # And in case of a name collision, add an extra number at the end. Iterate to find
+                # the first unused number.
+                original_virtual_filename = virtual_filename
+                collision_no = 2
+                while True:
+                    if virtual_filename not in seen_track_names:
+                        break
+                    # Write the collision number before the file extension.
+                    povf = Path(original_virtual_filename)
+                    virtual_filename = f"{povf.stem} [{collision_no}]{povf.suffix}"
+                    collision_no += 1
+                seen_track_names.add(virtual_filename)
+                if virtual_filename != t.virtual_filename:
+                    logger.debug(
+                        f"Track virtual filename change detected for {t.source_path}, updating"
+                    )
+                    tracks[i].virtual_filename = virtual_filename
+                    track_ids_to_insert.add(t.id)
+                    upd_playlist_track_filenames[t.id] = virtual_filename
 
         # Schedule database executions.
         if unknown_cached_tracks or release_dirty or track_ids_to_insert:
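
Besides the new if track_ids_to_insert: guard that skips the recompute when no track changed, this hunk carries the collision-numbering scheme for virtual filenames. Isolated as a function, assuming _sanitize_filename has already run on the input:

from pathlib import Path

def dedupe_filename(name: str, seen: set[str]) -> str:
    # Mirrors the collision loop above: append " [2]", " [3]", ... before the
    # file extension until the candidate name is unused.
    candidate, n = name, 2
    while candidate in seen:
        p = Path(name)
        candidate = f"{p.stem} [{n}]{p.suffix}"
        n += 1
    seen.add(candidate)
    return candidate

seen: set[str] = set()
assert dedupe_filename("Artist - Title.flac", seen) == "Artist - Title.flac"
assert dedupe_filename("Artist - Title.flac", seen) == "Artist - Title [2].flac"
assert dedupe_filename("Artist - Title.flac", seen) == "Artist - Title [3].flac"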
@@ -991,29 +988,30 @@ def _update_cache_for_releases_executor(
                     [release.id, art.name, _sanitize_filename(art.name), art.role, art.alias]
                 )
 
-        for track in tracks:
-            if track.id not in track_ids_to_insert:
-                continue
-            logger.debug(f"Scheduling upsert for dirty track in database: {track.source_path}")
-            upd_track_args.append(
-                [
-                    track.id,
-                    str(track.source_path),
-                    track.source_mtime,
-                    track.virtual_filename,
-                    track.title,
-                    track.release_id,
-                    track.track_number,
-                    track.disc_number,
-                    track.formatted_release_position,
-                    track.duration_seconds,
-                    track.formatted_artists,
-                ]
-            )
-            for art in track.artists:
-                upd_track_artist_args.append(
-                    [track.id, art.name, _sanitize_filename(art.name), art.role, art.alias]
-                )
+        if track_ids_to_insert:
+            for track in tracks:
+                if track.id not in track_ids_to_insert:
+                    continue
+                logger.debug(f"Scheduling upsert for dirty track in database: {track.source_path}")
+                upd_track_args.append(
+                    [
+                        track.id,
+                        str(track.source_path),
+                        track.source_mtime,
+                        track.virtual_filename,
+                        track.title,
+                        track.release_id,
+                        track.track_number,
+                        track.disc_number,
+                        track.formatted_release_position,
+                        track.duration_seconds,
+                        track.formatted_artists,
+                    ]
+                )
+                for art in track.artists:
+                    upd_track_artist_args.append(
+                        [track.id, art.name, _sanitize_filename(art.name), art.role, art.alias]
+                    )
     logger.debug(f"Release update scheduling loop time {time.time() - loop_start=}")
 
     exec_start = time.time()
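
The upd_* lists suggest the exec phase that follows flushes rows in bulk rather than issuing per-track statements. A hypothetical executemany flush for the eleven-column track rows; the real SQL and schema live elsewhere in rose/cache.py and may differ:

import sqlite3

def flush_track_upserts(conn: sqlite3.Connection, upd_track_args: list[list]) -> None:
    # One batched statement for every dirty track scheduled above.
    conn.executemany(
        """
        INSERT INTO tracks (id, source_path, source_mtime, virtual_filename, title,
                            release_id, track_number, disc_number,
                            formatted_release_position, duration_seconds, formatted_artists)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        ON CONFLICT (id) DO UPDATE SET
            source_mtime = excluded.source_mtime,
            virtual_filename = excluded.virtual_filename
        """,
        upd_track_args,
    )
    conn.commit()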