diff --git a/pkgs/async/CHANGELOG.md b/pkgs/async/CHANGELOG.md index 06ac7d11..244ce15c 100644 --- a/pkgs/async/CHANGELOG.md +++ b/pkgs/async/CHANGELOG.md @@ -1,3 +1,7 @@ +## 2.13.0 + +* Add `FutureGroup.addCancelable()`. + ## 2.12.0 - Require Dart 3.4. diff --git a/pkgs/async/lib/src/future_group.dart b/pkgs/async/lib/src/future_group.dart index daf985d3..bd0783b5 100644 --- a/pkgs/async/lib/src/future_group.dart +++ b/pkgs/async/lib/src/future_group.dart @@ -4,6 +4,12 @@ import 'dart:async'; +import 'cancelable_operation.dart'; + +/// A sentinel object indicating that a member of a [FutureGroup] was canceled +/// rather than completing normally. +const _canceledResult = Object(); + /// A collection of futures waits until all added [Future]s complete. /// /// Futures are added to the group with [add]. Once you're finished adding @@ -61,12 +67,21 @@ class FutureGroup implements Sink> { /// The values emitted by the futures that have been added to the group, in /// the order they were added. /// - /// The slots for futures that haven't completed yet are `null`. - final _values = []; + /// This is type `Object?` rather than `T?` so it can contain + /// [_canceledResult]. The slots for futures that haven't completed yet are + /// `null`. + final _values = []; /// Wait for [task] to complete. @override - void add(Future task) { + void add(Future task) => + addCancelable(CancelableOperation.fromFuture(task)); + + /// Wait for [task] to complete. + /// + /// If [task] is canceled, it's removed from the group without adding a value + /// to [future]. + void addCancelable(CancelableOperation task) { if (_closed) throw StateError('The FutureGroup is closed.'); // Ensure that future values are put into [values] in the same order they're @@ -76,11 +91,11 @@ class FutureGroup implements Sink> { _values.add(null); _pending++; - task.then((value) { + task.valueOrCancellation().then((value) { if (_completer.isCompleted) return null; _pending--; - _values[index] = value; + _values[index] = task.isCanceled ? _canceledResult : value; if (_pending != 0) return null; var onIdleController = _onIdleController; @@ -88,7 +103,10 @@ class FutureGroup implements Sink> { if (!_closed) return null; if (onIdleController != null) onIdleController.close(); - _completer.complete(_values.whereType().toList()); + _completer.complete([ + for (var value in _values) + if (value != _canceledResult && value is T) value + ]); }).catchError((Object error, StackTrace stackTrace) { if (_completer.isCompleted) return null; _completer.completeError(error, stackTrace); diff --git a/pkgs/async/pubspec.yaml b/pkgs/async/pubspec.yaml index be05c855..db2d84f4 100644 --- a/pkgs/async/pubspec.yaml +++ b/pkgs/async/pubspec.yaml @@ -1,5 +1,5 @@ name: async -version: 2.12.0 +version: 2.13.0 description: Utility functions and classes related to the 'dart:async' library. repository: https://github.com/dart-lang/core/tree/main/pkgs/async issue_tracker: https://github.com/dart-lang/core/issues?q=is%3Aissue+is%3Aopen+label%3Apackage%3Aasync diff --git a/pkgs/async/test/future_group_test.dart b/pkgs/async/test/future_group_test.dart index 9729c066..ad0ed074 100644 --- a/pkgs/async/test/future_group_test.dart +++ b/pkgs/async/test/future_group_test.dart @@ -4,6 +4,7 @@ import 'dart:async'; +import 'package:async/src/cancelable_operation.dart'; import 'package:async/src/future_group.dart'; import 'package:test/test.dart'; @@ -92,6 +93,22 @@ void main() { expect(completed, isTrue); }); + test('a canceled operation doesn\'t block completion', () { + var completer1 = Completer(); + var completer2 = CancelableCompleter(); + var completer3 = Completer(); + + futureGroup.add(completer1.future); + futureGroup.addCancelable(completer2.operation); + futureGroup.add(completer3.future); + futureGroup.close(); + + completer3.complete(3); + completer2.operation.cancel(); + completer1.complete(1); + expect(futureGroup.future, completion(equals([1, 3]))); + }); + test('completes to the values of the futures in order of addition', () { var completer1 = Completer(); var completer2 = Completer();