From 012db03e3a3a8fec27239460068720bfed2d17a2 Mon Sep 17 00:00:00 2001 From: JaffaKetchup Date: Wed, 1 Jan 2025 17:38:43 +0000 Subject: [PATCH] Added respect for downloading output streams' status Improved documentation Minor example app improvements --- .../loading_behaviour_selector.dart | 38 ++--- .../components/tiles/unspecified_tile.dart | 15 +- .../src/shared/state/download_provider.dart | 4 +- .../bulk_download/internal/control_cmds.dart | 4 + lib/src/bulk_download/internal/manager.dart | 150 ++++++++++------- .../tile_provider/tile_provider.dart | 25 ++- lib/src/store/download.dart | 151 ++++++++++++------ 7 files changed, 230 insertions(+), 157 deletions(-) diff --git a/example/lib/src/screens/main/secondary_view/contents/home/components/map_configurator/components/loading_behaviour_selector.dart b/example/lib/src/screens/main/secondary_view/contents/home/components/map_configurator/components/loading_behaviour_selector.dart index c80ebe7d..a4b453d4 100644 --- a/example/lib/src/screens/main/secondary_view/contents/home/components/map_configurator/components/loading_behaviour_selector.dart +++ b/example/lib/src/screens/main/secondary_view/contents/home/components/map_configurator/components/loading_behaviour_selector.dart @@ -14,8 +14,17 @@ class LoadingBehaviourSelector extends StatelessWidget { Selector( selector: (context, provider) => provider.loadingStrategy, builder: (context, loadingStrategy, _) => Column( + crossAxisAlignment: CrossAxisAlignment.start, mainAxisSize: MainAxisSize.min, children: [ + Padding( + padding: const EdgeInsets.only(left: 18), + child: Text( + 'Preferred Loading Strategy', + style: Theme.of(context).textTheme.labelMedium, + ), + ), + const SizedBox(height: 4), SizedBox( width: double.infinity, child: SegmentedButton( @@ -28,12 +37,12 @@ class LoadingBehaviourSelector extends StatelessWidget { ButtonSegment( value: BrowseLoadingStrategy.cacheFirst, icon: Icon(Icons.storage_rounded), - label: Text('Cache'), + label: Text('Cache First'), ), ButtonSegment( value: BrowseLoadingStrategy.onlineFirst, icon: Icon(Icons.public_rounded), - label: Text('Network'), + label: Text('Online First'), ), ], selected: {loadingStrategy}, @@ -46,31 +55,6 @@ class LoadingBehaviourSelector extends StatelessWidget { ), ), const SizedBox(height: 6), - /*Selector( - selector: (context, provider) => - provider.behaviourUpdateFromNetwork, - builder: (context, behaviourUpdateFromNetwork, _) => Row( - children: [ - const SizedBox(width: 8), - const Text('Update cache when network used'), - const Spacer(), - Switch.adaptive( - value: cacheBehavior != null && behaviourUpdateFromNetwork, - onChanged: cacheBehavior == null - ? null - : (value) => context - .read() - .behaviourUpdateFromNetwork = value, - thumbIcon: WidgetStateProperty.resolveWith( - (states) => states.contains(WidgetState.selected) - ? const Icon(Icons.edit) - : const Icon(Icons.edit_off), - ), - ), - const SizedBox(width: 8), - ], - ), - ),*/ ], ), ); diff --git a/example/lib/src/screens/main/secondary_view/contents/home/components/stores_list/components/tiles/unspecified_tile.dart b/example/lib/src/screens/main/secondary_view/contents/home/components/stores_list/components/tiles/unspecified_tile.dart index 16f6907f..006d88bb 100644 --- a/example/lib/src/screens/main/secondary_view/contents/home/components/stores_list/components/tiles/unspecified_tile.dart +++ b/example/lib/src/screens/main/secondary_view/contents/home/components/stores_list/components/tiles/unspecified_tile.dart @@ -1,4 +1,5 @@ import 'package:flutter/material.dart'; +import 'package:flutter_map_tile_caching/flutter_map_tile_caching.dart'; import 'package:provider/provider.dart'; import '../../../../../../../../../shared/misc/internal_store_read_write_behaviour.dart'; @@ -21,11 +22,15 @@ class _UnspecifiedTileState extends State { @override Widget build(BuildContext context) { final isAllUnselectedDisabled = context - .select( - (provider) => provider.currentStores['(unspecified)'], - ) - ?.toBrowseStoreStrategy() == - null; + .select( + (p) => p.currentStores['(unspecified)'], + ) + ?.toBrowseStoreStrategy() == + null || + context.select( + (p) => p.loadingStrategy, + ) == + BrowseLoadingStrategy.onlineFirst; return RepaintBoundary( child: Material( diff --git a/example/lib/src/shared/state/download_provider.dart b/example/lib/src/shared/state/download_provider.dart index 81535d64..281a8516 100644 --- a/example/lib/src/shared/state/download_provider.dart +++ b/example/lib/src/shared/state/download_provider.dart @@ -46,12 +46,14 @@ class DownloadingProvider extends ChangeNotifier { _rawTileEventsStream = downloadStreams.tileEvents.asBroadcastStream(); + bool isFirstEvent = true; downloadStreams.downloadProgress.listen( (evt) { // Focus on initial event - if (evt.attemptedTilesCount == 0) { + if (isFirstEvent) { _isFocused = true; focused.complete(); + isFirstEvent = false; } // Update stored value diff --git a/lib/src/bulk_download/internal/control_cmds.dart b/lib/src/bulk_download/internal/control_cmds.dart index 88bb4c95..90cea268 100644 --- a/lib/src/bulk_download/internal/control_cmds.dart +++ b/lib/src/bulk_download/internal/control_cmds.dart @@ -7,4 +7,8 @@ enum _DownloadManagerControlCmd { cancel, resume, pause, + startEmittingDownloadProgress, + stopEmittingDownloadProgress, + startEmittingTileEvents, + stopEmittingTileEvents, } diff --git a/lib/src/bulk_download/internal/manager.dart b/lib/src/bulk_download/internal/manager.dart index c4e77404..6687c95d 100644 --- a/lib/src/bulk_download/internal/manager.dart +++ b/lib/src/bulk_download/internal/manager.dart @@ -174,7 +174,7 @@ Future _downloadManager( // Setup two-way communications with root final rootReceivePort = ReceivePort(); - void sendToMain(Object? m) => input.sendPort.send(m); + void sendToRoot(Object? m) => input.sendPort.send(m); // Setup cancel, pause, and resume handling Iterable> generateThreadPausedStates() => Iterable.generate( @@ -184,48 +184,75 @@ Future _downloadManager( final threadPausedStates = generateThreadPausedStates().toList(); final cancelSignal = Completer(); var pauseResumeSignal = Completer()..complete(); + + // Setup efficient output handling + bool shouldEmitDownloadProgress = false; + bool shouldEmitTileEvents = false; + + // Setup progress report fallback + Timer? fallbackProgressEmitter; + void emitLastDownloadProgressUpdated() => sendToRoot( + lastDownloadProgress = lastDownloadProgress._updateWithoutTile( + elapsedDuration: downloadDuration.elapsed, + tilesPerSecond: getCurrentTPS(registerNewTPS: false), + ), + ); + void restartFallbackProgressEmitter() { + if (input.maxReportInterval case final interval?) { + fallbackProgressEmitter = Timer.periodic( + interval, + (_) => emitLastDownloadProgressUpdated(), + ); + } + emitLastDownloadProgressUpdated(); + } + + // Listen to the root comms port rootReceivePort.listen( (cmd) async { + if (cmd is! _DownloadManagerControlCmd) { + throw UnsupportedError('Recieved unknown control cmd: $cmd'); + } + switch (cmd) { case _DownloadManagerControlCmd.cancel: - try { - cancelSignal.complete(); - // If the signal is already complete, that's fine - // ignore: avoid_catching_errors, empty_catches - } on StateError {} + if (!cancelSignal.isCompleted) cancelSignal.complete(); + // We might recieve it more than once if the root requests cancellation + // whilst we already are cancelling it case _DownloadManagerControlCmd.pause: + if (!pauseResumeSignal.isCompleted) { + // We might recieve it more than once if the root requests pausing + // whilst we already are pausing it + break; + } + pauseResumeSignal = Completer(); threadPausedStates.setAll(0, generateThreadPausedStates()); await Future.wait(threadPausedStates.map((e) => e.future)); + downloadDuration.stop(); - sendToMain(_DownloadManagerControlCmd.pause); + fallbackProgressEmitter?.cancel(); + if (shouldEmitDownloadProgress) emitLastDownloadProgressUpdated(); + + sendToRoot(_DownloadManagerControlCmd.pause); case _DownloadManagerControlCmd.resume: - pauseResumeSignal.complete(); + if (shouldEmitDownloadProgress) restartFallbackProgressEmitter(); downloadDuration.start(); - default: - throw UnimplementedError('Recieved unknown control cmd: $cmd'); + pauseResumeSignal.complete(); + case _DownloadManagerControlCmd.startEmittingDownloadProgress: + shouldEmitDownloadProgress = true; + restartFallbackProgressEmitter(); + case _DownloadManagerControlCmd.stopEmittingDownloadProgress: + shouldEmitDownloadProgress = false; + fallbackProgressEmitter?.cancel(); + case _DownloadManagerControlCmd.startEmittingTileEvents: + shouldEmitTileEvents = true; + case _DownloadManagerControlCmd.stopEmittingTileEvents: + shouldEmitTileEvents = false; } }, ); - // Setup progress report fallback - final fallbackReportTimer = input.maxReportInterval == null - ? null - : Timer.periodic( - input.maxReportInterval!, - (_) { - if (lastDownloadProgress != initialDownloadProgress && - pauseResumeSignal.isCompleted) { - sendToMain( - lastDownloadProgress = lastDownloadProgress._updateWithoutTile( - elapsedDuration: downloadDuration.elapsed, - tilesPerSecond: getCurrentTPS(registerNewTPS: false), - ), - ); - } - }, - ); - // Start recovery system (unless disabled) if (input.recoveryId case final recoveryId?) { await input.backend.initialise(); @@ -252,10 +279,11 @@ Future _downloadManager( final threadBackend = input.backend.duplicate(); // Now it's safe, start accepting communications from the root - sendToMain(rootReceivePort.sendPort); + sendToRoot(rootReceivePort.sendPort); // Send an initial progress report to indicate the start of the download - sendToMain(initialDownloadProgress); + // if (shouldEmitDownloadProgress) sendToRoot(initialDownloadProgress); + // This is done implicitly on listening to the output, so is unnecessary // Start download threads & wait for download to complete/cancelled downloadDuration.start(); @@ -302,8 +330,8 @@ Future _downloadManager( (evt) async { // Thread is sending tile data if (evt is TileEvent) { - // Send event to user - sendToMain(evt); + // Send event to root if necessary + if (shouldEmitTileEvents) sendToRoot(evt); // Queue tiles for retry if failed and not already a retry attempt if (input.retryFailedRequestTiles && @@ -333,40 +361,43 @@ Future _downloadManager( } } - final wasBufferFlushed = - evt is SuccessfulTileEvent && evt._wasBufferFlushed; - - sendToMain( - lastDownloadProgress = lastDownloadProgress._updateWithTile( - bufferedTiles: evt is SuccessfulTileEvent - ? ( - count: threadBuffersTiles.reduce((a, b) => a + b), - size: threadBuffersSize.reduce((a, b) => a + b) / - 1024, - ) - : null, - newTileEvent: evt, - elapsedDuration: downloadDuration.elapsed, - tilesPerSecond: getCurrentTPS(registerNewTPS: true), - ), + // Update download progress and send to root if necessary + lastDownloadProgress = lastDownloadProgress._updateWithTile( + bufferedTiles: evt is SuccessfulTileEvent + ? ( + count: threadBuffersTiles.reduce((a, b) => a + b), + size: + threadBuffersSize.reduce((a, b) => a + b) / 1024, + ) + : null, + newTileEvent: evt, + elapsedDuration: downloadDuration.elapsed, + tilesPerSecond: getCurrentTPS(registerNewTPS: true), ); + if (shouldEmitDownloadProgress) { + sendToRoot(lastDownloadProgress); + } // For efficiency, only update recovery when the buffer is // cleaned // We don't want to update recovery to a tile that isn't cached // (only buffered), because they'll be lost in the events // recovery is designed to recover from - if (wasBufferFlushed) updateRecovery(); + if (evt is SuccessfulTileEvent && evt._wasBufferFlushed) { + updateRecovery(); + } } else { // We do not need to care about buffering, which makes updates // much easier - sendToMain( - lastDownloadProgress = lastDownloadProgress._updateWithTile( - newTileEvent: evt, - elapsedDuration: downloadDuration.elapsed, - tilesPerSecond: getCurrentTPS(registerNewTPS: true), - ), + + lastDownloadProgress = lastDownloadProgress._updateWithTile( + newTileEvent: evt, + elapsedDuration: downloadDuration.elapsed, + tilesPerSecond: getCurrentTPS(registerNewTPS: true), ); + if (shouldEmitDownloadProgress) { + sendToRoot(lastDownloadProgress); + } updateRecovery(); } @@ -433,14 +464,13 @@ Future _downloadManager( // Send final progress update downloadDuration.stop(); - sendToMain( - lastDownloadProgress = lastDownloadProgress._updateToComplete( - elapsedDuration: downloadDuration.elapsed, - ), + lastDownloadProgress = lastDownloadProgress._updateToComplete( + elapsedDuration: downloadDuration.elapsed, ); + if (shouldEmitDownloadProgress) sendToRoot(lastDownloadProgress); // Cleanup resources and shutdown - fallbackReportTimer?.cancel(); + fallbackProgressEmitter?.cancel(); rootReceivePort.close(); if (input.recoveryId != null) await input.backend.uninitialise(); tileIsolate.kill(priority: Isolate.immediate); diff --git a/lib/src/providers/tile_provider/tile_provider.dart b/lib/src/providers/tile_provider/tile_provider.dart index cb41f603..e75a7a7e 100644 --- a/lib/src/providers/tile_provider/tile_provider.dart +++ b/lib/src/providers/tile_provider/tile_provider.dart @@ -176,25 +176,17 @@ class FMTCTileProvider extends TileProvider { /// Method used to create a tile's storage-suitable UID from it's real URL /// + /// For more information, check the + /// [online documentation](https://fmtc.jaffaketchup.dev/basic-usage/integrating-with-a-map#ensure-tiles-are-resilient-to-url-changes). + /// /// The input string is the tile's URL. The output string should be a unique /// string to that tile that will remain as stable as necessary if parts of /// the URL not directly related to the tile image change. /// - /// For more information, see: - /// . - /// /// [urlTransformerOmitKeyValues] may be used as a transformer to omit entire /// key-value pairs from a URL where the key matches one of the specified /// keys. /// - /// > [!IMPORTANT] - /// > The callback will be passed to a different isolate: therefore, avoid - /// > using any external state that may not be properly captured or cannot be - /// > copied to an isolate spawned with [Isolate.spawn] (see [SendPort.send]). - /// - /// _Internally, the storage-suitable UID is usually referred to as the tile - /// URL (with distinction inferred)._ - /// /// By default, the output string is the input string - that is, the /// storage-suitable UID is the tile's real URL. final UrlTransformer? urlTransformer; @@ -419,10 +411,13 @@ class FMTCTileProvider extends TileProvider { return mutableUrl; } - /// If [stores] contains `null`, returns `null`, otherwise returns all - /// non-null names (which cannot be empty) - List? _getSpecifiedStoresOrNull() => - otherStoresStrategy != null ? null : stores.keys.toList(); + // TODO: This does not work correctly. Needs a complex system like writing. + List? _getSpecifiedStoresOrNull() => otherStoresStrategy != null + ? null + : /*stores.keys.toList()*/ stores.entries + .where((e) => e.value != null) + .map((e) => e.key) + .toList(); @override bool operator ==(Object other) => diff --git a/lib/src/store/download.dart b/lib/src/store/download.dart index 73f63b07..3486ad43 100644 --- a/lib/src/store/download.dart +++ b/lib/src/store/download.dart @@ -34,19 +34,10 @@ class StoreDownload { /// Download a specified [DownloadableRegion] in the foreground, with a /// recovery session by default /// - /// > [!TIP] - /// > To count the number of tiles in a region before starting a download, use - /// > [countTiles]. - /// - /// --- - /// - /// Outputs two non-broadcast streams. - /// - /// One emits [DownloadProgress]s which contain stats and info about the whole - /// download. - /// - /// One emits [TileEvent]s which contain info about the most recent tile - /// attempted only. + /// Outputs two non-broadcast streams. One emits [DownloadProgress]s which + /// contain stats and info about the whole download. The other emits + /// [TileEvent]s which contain info about the most recent tile attempted only. + /// They only emit events when listened to. /// /// The first stream (of [DownloadProgress]s) will emit events: /// * once per [TileEvent] emitted on the second stream @@ -57,16 +48,33 @@ class StoreDownload { /// complete and the first tile is being downloaded /// * additionally once at the end of the download after the last tile /// setting some final statistics (such as tiles per second to 0) + /// * additionally when pausing and resuming the download, as well as after + /// listening to the stream + /// + /// The completion/finish of the [DownloadProgress] stream implies the + /// completion of the download, even if the last + /// [DownloadProgress.percentageProgress] is not 100(%). /// - /// Once the stream of [DownloadProgress]s completes/finishes, the download - /// has stopped. + /// The second stream (of [TileEvent]s) will emit events for every tile + /// download attempt. /// - /// Neither output stream respects listen, pause, resume, or cancel events - /// when submitted through the stream subscription. - /// The download will start when this method is invoked, irrespective of - /// whether there are listeners. The download will continue irrespective of - /// listeners. The only control methods are via FMTC's [pause], [resume], and - /// [cancel] methods. + /// > [!IMPORTANT] + /// > + /// > An emitted [TileEvent] may refer to a tile for which an event has been + /// > emitted previously. + /// > + /// > This will be the case when [TileEvent.wasRetryAttempt] is `true`, which + /// > may occur only if [retryFailedRequestTiles] is enabled. + /// + /// Listening, pausing, resuming, or cancelling subscriptions to the output + /// streams will not start, pause, resume, or cancel the download. It will + /// only change the output stream. Not listening to a stream may improve the + /// efficiency of the download a negligible amount. + /// + /// To control the download itself, use [pause], [resume], and [cancel]. + /// + /// The download starts when this method is invoked: it does not wait for + /// listneners. /// /// --- /// @@ -93,7 +101,7 @@ class StoreDownload { /// > currently in the buffer. It will also increase the memory (RAM) /// > required. /// - /// > [!WARNING] + /// > [!IMPORTANT] /// > Skipping sea tiles will not reduce the number of downloads - tiles must /// > be downloaded to be compared against the sample sea tile. It is only /// > designed to reduce the storage capacity consumed. @@ -132,10 +140,24 @@ class StoreDownload { /// /// --- /// - /// For info about [urlTransformer], see [FMTCTileProvider.urlTransformer]. - /// If unspecified, and the [region]'s [DownloadableRegion.options] is an - /// [FMTCTileProvider], will default to that tile provider's `urlTransformer` - /// if specified. Otherwise, will default to the identity function. + /// For info about [urlTransformer], see [FMTCTileProvider.urlTransformer] and + /// the + /// [online documentation](https://fmtc.jaffaketchup.dev/basic-usage/integrating-with-a-map#ensure-tiles-are-resilient-to-url-changes). + /// + /// > [!WARNING] + /// > + /// > The callback will be passed to a different isolate: therefore, avoid + /// > using any external state that may not be properly captured or cannot be + /// > copied to an isolate spawned with [Isolate.spawn] (see [SendPort.send]). + /// > + /// > Ideally, the callback should be state-indepedent. + /// + /// If unspecified, and the [region]'s [DownloadableRegion.options] + /// [TileLayer.tileProvider] is a [FMTCTileProvider] with a defined + /// [FMTCTileProvider.urlTransformer], this will default to that transformer. + /// Otherwise, will default to the identity function. + /// + /// --- /// /// To set additional headers, set it via [TileProvider.headers] when /// constructing the [DownloadableRegion]. @@ -218,10 +240,59 @@ class StoreDownload { : Object.hash(instanceId, DateTime.timestamp().millisecondsSinceEpoch); if (!disableRecovery) FMTCRoot.recovery._downloadsOngoing.add(recoveryId!); + // Prepare send port completer + // We use a completer to ensure that the user's request is met as soon as + // possible and is not dropped if the download has not setup yet + final sendPortCompleter = Completer(); + // Prepare output streams - final tileEventsStreamController = StreamController(); - final downloadProgressStreamController = - StreamController(); + // The statuses of the output streams does not control the download itself, + // but for efficiency, we don't emit events that the user will not hear + // We do not filter in the main thread, for added efficiency, we instead + // make the decision directly at source, so copying between Isolates is + // avoided if unnecessary + // We treat listen & resume and cancel & pause as the same event + final downloadProgressStreamController = StreamController( + onListen: () async => (await sendPortCompleter.future) + .send(_DownloadManagerControlCmd.startEmittingDownloadProgress), + onResume: () async => (await sendPortCompleter.future) + .send(_DownloadManagerControlCmd.startEmittingDownloadProgress), + onPause: () async => (await sendPortCompleter.future) + .send(_DownloadManagerControlCmd.stopEmittingDownloadProgress), + onCancel: () async => (await sendPortCompleter.future) + .send(_DownloadManagerControlCmd.stopEmittingDownloadProgress), + ); + final tileEventsStreamController = StreamController( + onListen: () async => (await sendPortCompleter.future) + .send(_DownloadManagerControlCmd.startEmittingTileEvents), + onResume: () async => (await sendPortCompleter.future) + .send(_DownloadManagerControlCmd.startEmittingTileEvents), + onPause: () async => (await sendPortCompleter.future) + .send(_DownloadManagerControlCmd.stopEmittingTileEvents), + onCancel: () async => (await sendPortCompleter.future) + .send(_DownloadManagerControlCmd.stopEmittingTileEvents), + ); + + // Prepare control mechanisms + final cancelCompleter = Completer(); + Completer? pauseCompleter; + sendPortCompleter.future.then( + (sp) => instance + ..requestCancel = () { + sp.send(_DownloadManagerControlCmd.cancel); + return cancelCompleter.future; + } + ..requestPause = () { + sp.send(_DownloadManagerControlCmd.pause); + // Completed by handler above + return (pauseCompleter = Completer()).future + ..then((_) => instance.isPaused = true); + } + ..requestResume = () { + sp.send(_DownloadManagerControlCmd.resume); + instance.isPaused = false; + }, + ); () async { // Start download thread @@ -247,10 +318,6 @@ class StoreDownload { debugName: '[FMTC] Master Bulk Download Thread', ); - // Setup control mechanisms (completers) - final cancelCompleter = Completer(); - Completer? pauseCompleter; - await for (final evt in receivePort) { // Handle new download progress if (evt is DownloadProgress) { @@ -275,25 +342,11 @@ class StoreDownload { // Setup control mechanisms (senders) if (evt is SendPort) { - instance - ..requestCancel = () { - evt.send(_DownloadManagerControlCmd.cancel); - return cancelCompleter.future; - } - ..requestPause = () { - evt.send(_DownloadManagerControlCmd.pause); - // Completed by handler above - return (pauseCompleter = Completer()).future - ..then((_) => instance.isPaused = true); - } - ..requestResume = () { - evt.send(_DownloadManagerControlCmd.resume); - instance.isPaused = false; - }; + sendPortCompleter.complete(evt); continue; } - throw UnimplementedError('Unrecognised message'); + throw UnsupportedError('Unrecognised message: $evt'); } // Handle shutdown (both normal and cancellation)