Skip to content

Commit

Permalink
Added respect for downloading output streams' status
Browse files Browse the repository at this point in the history
Improved documentation
Minor example app improvements
  • Loading branch information
JaffaKetchup committed Jan 1, 2025
1 parent ee08199 commit 012db03
Show file tree
Hide file tree
Showing 7 changed files with 230 additions and 157 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,17 @@ class LoadingBehaviourSelector extends StatelessWidget {
Selector<GeneralProvider, BrowseLoadingStrategy>(
selector: (context, provider) => provider.loadingStrategy,
builder: (context, loadingStrategy, _) => Column(
crossAxisAlignment: CrossAxisAlignment.start,
mainAxisSize: MainAxisSize.min,
children: [
Padding(
padding: const EdgeInsets.only(left: 18),
child: Text(
'Preferred Loading Strategy',
style: Theme.of(context).textTheme.labelMedium,
),
),
const SizedBox(height: 4),
SizedBox(
width: double.infinity,
child: SegmentedButton(
Expand All @@ -28,12 +37,12 @@ class LoadingBehaviourSelector extends StatelessWidget {
ButtonSegment(
value: BrowseLoadingStrategy.cacheFirst,
icon: Icon(Icons.storage_rounded),
label: Text('Cache'),
label: Text('Cache First'),
),
ButtonSegment(
value: BrowseLoadingStrategy.onlineFirst,
icon: Icon(Icons.public_rounded),
label: Text('Network'),
label: Text('Online First'),
),
],
selected: {loadingStrategy},
Expand All @@ -46,31 +55,6 @@ class LoadingBehaviourSelector extends StatelessWidget {
),
),
const SizedBox(height: 6),
/*Selector<GeneralProvider, bool>(
selector: (context, provider) =>
provider.behaviourUpdateFromNetwork,
builder: (context, behaviourUpdateFromNetwork, _) => Row(
children: [
const SizedBox(width: 8),
const Text('Update cache when network used'),
const Spacer(),
Switch.adaptive(
value: cacheBehavior != null && behaviourUpdateFromNetwork,
onChanged: cacheBehavior == null
? null
: (value) => context
.read<GeneralProvider>()
.behaviourUpdateFromNetwork = value,
thumbIcon: WidgetStateProperty.resolveWith(
(states) => states.contains(WidgetState.selected)
? const Icon(Icons.edit)
: const Icon(Icons.edit_off),
),
),
const SizedBox(width: 8),
],
),
),*/
],
),
);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import 'package:flutter/material.dart';
import 'package:flutter_map_tile_caching/flutter_map_tile_caching.dart';
import 'package:provider/provider.dart';

import '../../../../../../../../../shared/misc/internal_store_read_write_behaviour.dart';
Expand All @@ -21,11 +22,15 @@ class _UnspecifiedTileState extends State<UnspecifiedTile> {
@override
Widget build(BuildContext context) {
final isAllUnselectedDisabled = context
.select<GeneralProvider, InternalBrowseStoreStrategy?>(
(provider) => provider.currentStores['(unspecified)'],
)
?.toBrowseStoreStrategy() ==
null;
.select<GeneralProvider, InternalBrowseStoreStrategy?>(
(p) => p.currentStores['(unspecified)'],
)
?.toBrowseStoreStrategy() ==
null ||
context.select<GeneralProvider, BrowseLoadingStrategy>(
(p) => p.loadingStrategy,
) ==
BrowseLoadingStrategy.onlineFirst;

return RepaintBoundary(
child: Material(
Expand Down
4 changes: 3 additions & 1 deletion example/lib/src/shared/state/download_provider.dart
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,14 @@ class DownloadingProvider extends ChangeNotifier {

_rawTileEventsStream = downloadStreams.tileEvents.asBroadcastStream();

bool isFirstEvent = true;
downloadStreams.downloadProgress.listen(
(evt) {
// Focus on initial event
if (evt.attemptedTilesCount == 0) {
if (isFirstEvent) {
_isFocused = true;
focused.complete();
isFirstEvent = false;
}

// Update stored value
Expand Down
4 changes: 4 additions & 0 deletions lib/src/bulk_download/internal/control_cmds.dart
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,8 @@ enum _DownloadManagerControlCmd {
cancel,
resume,
pause,
startEmittingDownloadProgress,
stopEmittingDownloadProgress,
startEmittingTileEvents,
stopEmittingTileEvents,
}
150 changes: 90 additions & 60 deletions lib/src/bulk_download/internal/manager.dart
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ Future<void> _downloadManager(

// Setup two-way communications with root
final rootReceivePort = ReceivePort();
void sendToMain(Object? m) => input.sendPort.send(m);
void sendToRoot(Object? m) => input.sendPort.send(m);

// Setup cancel, pause, and resume handling
Iterable<Completer<void>> generateThreadPausedStates() => Iterable.generate(
Expand All @@ -184,48 +184,75 @@ Future<void> _downloadManager(
final threadPausedStates = generateThreadPausedStates().toList();
final cancelSignal = Completer<void>();
var pauseResumeSignal = Completer<void>()..complete();

// Setup efficient output handling
bool shouldEmitDownloadProgress = false;
bool shouldEmitTileEvents = false;

// Setup progress report fallback
Timer? fallbackProgressEmitter;
void emitLastDownloadProgressUpdated() => sendToRoot(
lastDownloadProgress = lastDownloadProgress._updateWithoutTile(
elapsedDuration: downloadDuration.elapsed,
tilesPerSecond: getCurrentTPS(registerNewTPS: false),
),
);
void restartFallbackProgressEmitter() {
if (input.maxReportInterval case final interval?) {
fallbackProgressEmitter = Timer.periodic(
interval,
(_) => emitLastDownloadProgressUpdated(),
);
}
emitLastDownloadProgressUpdated();
}

// Listen to the root comms port
rootReceivePort.listen(
(cmd) async {
if (cmd is! _DownloadManagerControlCmd) {
throw UnsupportedError('Recieved unknown control cmd: $cmd');
}

switch (cmd) {
case _DownloadManagerControlCmd.cancel:
try {
cancelSignal.complete();
// If the signal is already complete, that's fine
// ignore: avoid_catching_errors, empty_catches
} on StateError {}
if (!cancelSignal.isCompleted) cancelSignal.complete();
// We might recieve it more than once if the root requests cancellation
// whilst we already are cancelling it
case _DownloadManagerControlCmd.pause:
if (!pauseResumeSignal.isCompleted) {
// We might recieve it more than once if the root requests pausing
// whilst we already are pausing it
break;
}

pauseResumeSignal = Completer<void>();
threadPausedStates.setAll(0, generateThreadPausedStates());
await Future.wait(threadPausedStates.map((e) => e.future));

downloadDuration.stop();
sendToMain(_DownloadManagerControlCmd.pause);
fallbackProgressEmitter?.cancel();
if (shouldEmitDownloadProgress) emitLastDownloadProgressUpdated();

sendToRoot(_DownloadManagerControlCmd.pause);
case _DownloadManagerControlCmd.resume:
pauseResumeSignal.complete();
if (shouldEmitDownloadProgress) restartFallbackProgressEmitter();
downloadDuration.start();
default:
throw UnimplementedError('Recieved unknown control cmd: $cmd');
pauseResumeSignal.complete();
case _DownloadManagerControlCmd.startEmittingDownloadProgress:
shouldEmitDownloadProgress = true;
restartFallbackProgressEmitter();
case _DownloadManagerControlCmd.stopEmittingDownloadProgress:
shouldEmitDownloadProgress = false;
fallbackProgressEmitter?.cancel();
case _DownloadManagerControlCmd.startEmittingTileEvents:
shouldEmitTileEvents = true;
case _DownloadManagerControlCmd.stopEmittingTileEvents:
shouldEmitTileEvents = false;
}
},
);

// Setup progress report fallback
final fallbackReportTimer = input.maxReportInterval == null
? null
: Timer.periodic(
input.maxReportInterval!,
(_) {
if (lastDownloadProgress != initialDownloadProgress &&
pauseResumeSignal.isCompleted) {
sendToMain(
lastDownloadProgress = lastDownloadProgress._updateWithoutTile(
elapsedDuration: downloadDuration.elapsed,
tilesPerSecond: getCurrentTPS(registerNewTPS: false),
),
);
}
},
);

// Start recovery system (unless disabled)
if (input.recoveryId case final recoveryId?) {
await input.backend.initialise();
Expand All @@ -252,10 +279,11 @@ Future<void> _downloadManager(
final threadBackend = input.backend.duplicate();

// Now it's safe, start accepting communications from the root
sendToMain(rootReceivePort.sendPort);
sendToRoot(rootReceivePort.sendPort);

// Send an initial progress report to indicate the start of the download
sendToMain(initialDownloadProgress);
// if (shouldEmitDownloadProgress) sendToRoot(initialDownloadProgress);
// This is done implicitly on listening to the output, so is unnecessary

// Start download threads & wait for download to complete/cancelled
downloadDuration.start();
Expand Down Expand Up @@ -302,8 +330,8 @@ Future<void> _downloadManager(
(evt) async {
// Thread is sending tile data
if (evt is TileEvent) {
// Send event to user
sendToMain(evt);
// Send event to root if necessary
if (shouldEmitTileEvents) sendToRoot(evt);

// Queue tiles for retry if failed and not already a retry attempt
if (input.retryFailedRequestTiles &&
Expand Down Expand Up @@ -333,40 +361,43 @@ Future<void> _downloadManager(
}
}

final wasBufferFlushed =
evt is SuccessfulTileEvent && evt._wasBufferFlushed;

sendToMain(
lastDownloadProgress = lastDownloadProgress._updateWithTile(
bufferedTiles: evt is SuccessfulTileEvent
? (
count: threadBuffersTiles.reduce((a, b) => a + b),
size: threadBuffersSize.reduce((a, b) => a + b) /
1024,
)
: null,
newTileEvent: evt,
elapsedDuration: downloadDuration.elapsed,
tilesPerSecond: getCurrentTPS(registerNewTPS: true),
),
// Update download progress and send to root if necessary
lastDownloadProgress = lastDownloadProgress._updateWithTile(
bufferedTiles: evt is SuccessfulTileEvent
? (
count: threadBuffersTiles.reduce((a, b) => a + b),
size:
threadBuffersSize.reduce((a, b) => a + b) / 1024,
)
: null,
newTileEvent: evt,
elapsedDuration: downloadDuration.elapsed,
tilesPerSecond: getCurrentTPS(registerNewTPS: true),
);
if (shouldEmitDownloadProgress) {
sendToRoot(lastDownloadProgress);
}

// For efficiency, only update recovery when the buffer is
// cleaned
// We don't want to update recovery to a tile that isn't cached
// (only buffered), because they'll be lost in the events
// recovery is designed to recover from
if (wasBufferFlushed) updateRecovery();
if (evt is SuccessfulTileEvent && evt._wasBufferFlushed) {
updateRecovery();
}
} else {
// We do not need to care about buffering, which makes updates
// much easier
sendToMain(
lastDownloadProgress = lastDownloadProgress._updateWithTile(
newTileEvent: evt,
elapsedDuration: downloadDuration.elapsed,
tilesPerSecond: getCurrentTPS(registerNewTPS: true),
),

lastDownloadProgress = lastDownloadProgress._updateWithTile(
newTileEvent: evt,
elapsedDuration: downloadDuration.elapsed,
tilesPerSecond: getCurrentTPS(registerNewTPS: true),
);
if (shouldEmitDownloadProgress) {
sendToRoot(lastDownloadProgress);
}

updateRecovery();
}
Expand Down Expand Up @@ -433,14 +464,13 @@ Future<void> _downloadManager(

// Send final progress update
downloadDuration.stop();
sendToMain(
lastDownloadProgress = lastDownloadProgress._updateToComplete(
elapsedDuration: downloadDuration.elapsed,
),
lastDownloadProgress = lastDownloadProgress._updateToComplete(
elapsedDuration: downloadDuration.elapsed,
);
if (shouldEmitDownloadProgress) sendToRoot(lastDownloadProgress);

// Cleanup resources and shutdown
fallbackReportTimer?.cancel();
fallbackProgressEmitter?.cancel();
rootReceivePort.close();
if (input.recoveryId != null) await input.backend.uninitialise();
tileIsolate.kill(priority: Isolate.immediate);
Expand Down
25 changes: 10 additions & 15 deletions lib/src/providers/tile_provider/tile_provider.dart
Original file line number Diff line number Diff line change
Expand Up @@ -176,25 +176,17 @@ class FMTCTileProvider extends TileProvider {

/// Method used to create a tile's storage-suitable UID from it's real URL
///
/// For more information, check the
/// [online documentation](https://fmtc.jaffaketchup.dev/basic-usage/integrating-with-a-map#ensure-tiles-are-resilient-to-url-changes).
///
/// The input string is the tile's URL. The output string should be a unique
/// string to that tile that will remain as stable as necessary if parts of
/// the URL not directly related to the tile image change.
///
/// For more information, see:
/// <https://fmtc.jaffaketchup.dev/flutter_map-integration/url-transformer>.
///
/// [urlTransformerOmitKeyValues] may be used as a transformer to omit entire
/// key-value pairs from a URL where the key matches one of the specified
/// keys.
///
/// > [!IMPORTANT]
/// > The callback will be passed to a different isolate: therefore, avoid
/// > using any external state that may not be properly captured or cannot be
/// > copied to an isolate spawned with [Isolate.spawn] (see [SendPort.send]).
///
/// _Internally, the storage-suitable UID is usually referred to as the tile
/// URL (with distinction inferred)._
///
/// By default, the output string is the input string - that is, the
/// storage-suitable UID is the tile's real URL.
final UrlTransformer? urlTransformer;
Expand Down Expand Up @@ -419,10 +411,13 @@ class FMTCTileProvider extends TileProvider {
return mutableUrl;
}

/// If [stores] contains `null`, returns `null`, otherwise returns all
/// non-null names (which cannot be empty)
List<String>? _getSpecifiedStoresOrNull() =>
otherStoresStrategy != null ? null : stores.keys.toList();
// TODO: This does not work correctly. Needs a complex system like writing.
List<String>? _getSpecifiedStoresOrNull() => otherStoresStrategy != null
? null
: /*stores.keys.toList()*/ stores.entries
.where((e) => e.value != null)
.map((e) => e.key)
.toList();

@override
bool operator ==(Object other) =>
Expand Down
Loading

0 comments on commit 012db03

Please sign in to comment.