Skip to content

Commit

Permalink
Bump dart_flutter_team lints to latest across packages (#1679)
Browse files Browse the repository at this point in the history
  • Loading branch information
kevmoo authored Dec 14, 2024
1 parent febccb9 commit 71c9cc1
Show file tree
Hide file tree
Showing 22 changed files with 360 additions and 268 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/bazel_worker.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ jobs:
- uses: dart-lang/setup-dart@e630b99d28a3b71860378cafdc2a067c71107f94
with:
sdk: ${{ matrix.sdk }}
- run: dart pub get
- run: "dart format --output=none --set-exit-if-changed ."
if: ${{ matrix.sdk == dev }}
- name: Test
run: ./tool/travis.sh
19 changes: 11 additions & 8 deletions pkgs/bazel_worker/benchmark/benchmark.dart
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,7 @@ Future<void> main() async {
var path = 'blaze-bin/some/path/to/a/file/that/is/an/input/$i';
workRequest
..arguments.add('--input=$path')
..inputs.add(Input(
path: '',
digest: List.filled(70, 0x11),
));
..inputs.add(Input(path: '', digest: List.filled(70, 0x11)));
}

// Serialize it.
Expand All @@ -24,14 +21,20 @@ Future<void> main() async {
print('Request has $length requestBytes.');

// Add the length in front base 128 encoded as in the worker protocol.
requestBytes =
Uint8List.fromList(requestBytes.toList()..insertAll(0, _varInt(length)));
requestBytes = Uint8List.fromList(
requestBytes.toList()..insertAll(0, _varInt(length)),
);

// Split into 10000 byte chunks.
var lists = <Uint8List>[];
for (var i = 0; i < requestBytes.length; i += 10000) {
lists.add(Uint8List.sublistView(
requestBytes, i, min(i + 10000, requestBytes.length)));
lists.add(
Uint8List.sublistView(
requestBytes,
i,
min(i + 10000, requestBytes.length),
),
);
}

// Time `AsyncMessageGrouper` and deserialization.
Expand Down
5 changes: 4 additions & 1 deletion pkgs/bazel_worker/e2e_test/bin/async_worker_in_isolate.dart
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ import 'package:e2e_test/forwards_to_isolate_async_worker.dart';
Future main(List<String> args, [SendPort? message]) async {
var receivePort = ReceivePort();
await Isolate.spawnUri(
Uri.file('async_worker.dart'), [], receivePort.sendPort);
Uri.file('async_worker.dart'),
[],
receivePort.sendPort,
);

var worker = await ForwardsToIsolateAsyncWorker.create(receivePort);
await worker.run();
Expand Down
5 changes: 1 addition & 4 deletions pkgs/bazel_worker/e2e_test/lib/async_worker.dart
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,6 @@ class ExampleAsyncWorker extends AsyncWorkerLoop {

@override
Future<WorkResponse> performRequest(WorkRequest request) async {
return WorkResponse(
exitCode: 0,
output: request.arguments.join('\n'),
);
return WorkResponse(exitCode: 0, output: request.arguments.join('\n'));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@ class ForwardsToIsolateAsyncWorker extends AsyncWorkerLoop {
final IsolateDriverConnection _isolateDriverConnection;

static Future<ForwardsToIsolateAsyncWorker> create(
ReceivePort receivePort) async {
ReceivePort receivePort,
) async {
return ForwardsToIsolateAsyncWorker(
await IsolateDriverConnection.create(receivePort));
await IsolateDriverConnection.create(receivePort),
);
}

ForwardsToIsolateAsyncWorker(this._isolateDriverConnection);
Expand Down
2 changes: 1 addition & 1 deletion pkgs/bazel_worker/e2e_test/pubspec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,6 @@ dependencies:

dev_dependencies:
cli_util: ^0.4.2
dart_flutter_team_lints: ^1.0.0
dart_flutter_team_lints: ^3.0.0
path: ^1.8.0
test: ^1.16.0
18 changes: 11 additions & 7 deletions pkgs/bazel_worker/e2e_test/test/e2e_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,18 @@ import 'package:test/test.dart';

void main() {
var dart = p.join(sdkPath, 'bin', 'dart');
runE2eTestForWorker('sync worker',
() => Process.start(dart, [p.join('bin', 'sync_worker.dart')]));
runE2eTestForWorker('async worker',
() => Process.start(dart, [p.join('bin', 'async_worker.dart')]));
runE2eTestForWorker(
'async worker in isolate',
() =>
Process.start(dart, [p.join('bin', 'async_worker_in_isolate.dart')]));
'sync worker',
() => Process.start(dart, [p.join('bin', 'sync_worker.dart')]),
);
runE2eTestForWorker(
'async worker',
() => Process.start(dart, [p.join('bin', 'async_worker.dart')]),
);
runE2eTestForWorker(
'async worker in isolate',
() => Process.start(dart, [p.join('bin', 'async_worker_in_isolate.dart')]),
);
}

void runE2eTestForWorker(String groupName, SpawnWorker spawnWorker) {
Expand Down
12 changes: 8 additions & 4 deletions pkgs/bazel_worker/example/client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,14 @@ import 'package:bazel_worker/driver.dart';
void main() async {
var scratchSpace = await Directory.systemTemp.createTemp();
var driver = BazelWorkerDriver(
() => Process.start(Platform.resolvedExecutable,
[Platform.script.resolve('worker.dart').toFilePath()],
workingDirectory: scratchSpace.path),
maxWorkers: 4);
() => Process.start(
Platform.resolvedExecutable,
[
Platform.script.resolve('worker.dart').toFilePath(),
],
workingDirectory: scratchSpace.path),
maxWorkers: 4,
);
var response = await driver.doWork(WorkRequest(arguments: ['foo']));
if (response.exitCode != EXIT_CODE_OK) {
print('Worker request failed');
Expand Down
17 changes: 11 additions & 6 deletions pkgs/bazel_worker/lib/src/async_message_grouper.dart
Original file line number Diff line number Diff line change
Expand Up @@ -86,13 +86,18 @@ class AsyncMessageGrouper implements MessageGrouper {
// Copy as much as possible from the input buffer. Limit is the
// smaller of the remaining length to fill in the message and the
// remaining length in the buffer.
var lengthToCopy = min(_message.length - _messagePos,
_inputBuffer.length - _inputBufferPos);
var lengthToCopy = min(
_message.length - _messagePos,
_inputBuffer.length - _inputBufferPos,
);
_message.setRange(
_messagePos,
_messagePos + lengthToCopy,
_inputBuffer.sublist(
_inputBufferPos, _inputBufferPos + lengthToCopy));
_messagePos,
_messagePos + lengthToCopy,
_inputBuffer.sublist(
_inputBufferPos,
_inputBufferPos + lengthToCopy,
),
);
_messagePos += lengthToCopy;
_inputBufferPos += lengthToCopy;

Expand Down
104 changes: 58 additions & 46 deletions pkgs/bazel_worker/lib/src/driver/driver.dart
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,12 @@ class BazelWorkerDriver {
/// Factory method that spawns a worker process.
final SpawnWorker _spawnWorker;

BazelWorkerDriver(this._spawnWorker,
{int? maxIdleWorkers, int? maxWorkers, int? maxRetries})
: _maxIdleWorkers = maxIdleWorkers ?? 4,
BazelWorkerDriver(
this._spawnWorker, {
int? maxIdleWorkers,
int? maxWorkers,
int? maxRetries,
}) : _maxIdleWorkers = maxIdleWorkers ?? 4,
_maxWorkers = maxWorkers ?? 4,
_maxRetries = maxRetries ?? 4;

Expand All @@ -56,8 +59,10 @@ class BazelWorkerDriver {
/// [request] has been actually sent to the worker. This allows the caller
/// to determine when actual work is being done versus just waiting for an
/// available worker.
Future<WorkResponse> doWork(WorkRequest request,
{void Function(Future<WorkResponse?>)? trackWork}) {
Future<WorkResponse> doWork(
WorkRequest request, {
void Function(Future<WorkResponse?>)? trackWork,
}) {
var attempt = _WorkAttempt(request, trackWork: trackWork);
_workQueue.add(attempt);
_runWorkQueue();
Expand All @@ -69,9 +74,11 @@ class BazelWorkerDriver {
for (var worker in _readyWorkers.toList()) {
_killWorker(worker);
}
await Future.wait(_spawningWorkers.map((worker) async {
_killWorker(await worker);
}));
await Future.wait(
_spawningWorkers.map((worker) async {
_killWorker(await worker);
}),
);
}

/// Runs as many items in [_workQueue] as possible given the number of
Expand All @@ -88,8 +95,10 @@ class BazelWorkerDriver {
if (_workQueue.isEmpty) return;
if (_numWorkers == _maxWorkers && _idleWorkers.isEmpty) return;
if (_numWorkers > _maxWorkers) {
throw StateError('Internal error, created to many workers. Please '
'file a bug at https://github.com/dart-lang/bazel_worker/issues/new');
throw StateError(
'Internal error, created to many workers. Please '
'file a bug at https://github.com/dart-lang/bazel_worker/issues/new',
);
}

// At this point we definitely want to run a task, we just need to decide
Expand Down Expand Up @@ -137,48 +146,51 @@ class BazelWorkerDriver {
void _runWorker(Process worker, _WorkAttempt attempt) {
var rescheduled = false;

runZonedGuarded(() async {
var connection = _workerConnections[worker]!;
runZonedGuarded(
() async {
var connection = _workerConnections[worker]!;

connection.writeRequest(attempt.request);
var responseFuture = connection.readResponse();
if (attempt.trackWork != null) {
attempt.trackWork!(responseFuture);
}
var response = await responseFuture;

// It is possible for us to complete with an error response due to an
// unhandled async error before we get here.
if (!attempt.responseCompleter.isCompleted) {
if (response.exitCode == EXIT_CODE_BROKEN_PIPE) {
connection.writeRequest(attempt.request);
var responseFuture = connection.readResponse();
if (attempt.trackWork != null) {
attempt.trackWork!(responseFuture);
}
var response = await responseFuture;

// It is possible for us to complete with an error response due to an
// unhandled async error before we get here.
if (!attempt.responseCompleter.isCompleted) {
if (response.exitCode == EXIT_CODE_BROKEN_PIPE) {
rescheduled = _tryReschedule(attempt);
if (rescheduled) return;
stderr.writeln('Failed to run request ${attempt.request}');
response = WorkResponse(
exitCode: EXIT_CODE_ERROR,
output:
'Invalid response from worker, this probably means it wrote '
'invalid output or died.',
);
}
attempt.responseCompleter.complete(response);
_cleanUp(worker);
}
},
(e, s) {
// Note that we don't need to do additional cleanup here on failures. If
// the worker dies that is already handled in a generic fashion, we just
// need to make sure we complete with a valid response.
if (!attempt.responseCompleter.isCompleted) {
rescheduled = _tryReschedule(attempt);
if (rescheduled) return;
stderr.writeln('Failed to run request ${attempt.request}');
response = WorkResponse(
var response = WorkResponse(
exitCode: EXIT_CODE_ERROR,
output:
'Invalid response from worker, this probably means it wrote '
'invalid output or died.',
output: 'Error running worker:\n$e\n$s',
);
attempt.responseCompleter.complete(response);
_cleanUp(worker);
}
attempt.responseCompleter.complete(response);
_cleanUp(worker);
}
}, (e, s) {
// Note that we don't need to do additional cleanup here on failures. If
// the worker dies that is already handled in a generic fashion, we just
// need to make sure we complete with a valid response.
if (!attempt.responseCompleter.isCompleted) {
rescheduled = _tryReschedule(attempt);
if (rescheduled) return;
var response = WorkResponse(
exitCode: EXIT_CODE_ERROR,
output: 'Error running worker:\n$e\n$s',
);
attempt.responseCompleter.complete(response);
_cleanUp(worker);
}
});
},
);
}

/// Performs post-work cleanup for [worker].
Expand Down
11 changes: 7 additions & 4 deletions pkgs/bazel_worker/lib/src/driver/driver_connection.dart
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,16 @@ class StdDriverConnection implements DriverConnection {

Future<void> get done => _messageGrouper.done;

StdDriverConnection(
{Stream<List<int>>? inputStream, StreamSink<List<int>>? outputStream})
: _messageGrouper = AsyncMessageGrouper(inputStream ?? stdin),
StdDriverConnection({
Stream<List<int>>? inputStream,
StreamSink<List<int>>? outputStream,
}) : _messageGrouper = AsyncMessageGrouper(inputStream ?? stdin),
_outputStream = outputStream ?? stdout;

factory StdDriverConnection.forWorker(Process worker) => StdDriverConnection(
inputStream: worker.stdout, outputStream: worker.stdin);
inputStream: worker.stdout,
outputStream: worker.stdin,
);

/// Note: This will attempts to recover from invalid proto messages by parsing
/// them as strings. This is a common error case for workers (they print a
Expand Down
5 changes: 3 additions & 2 deletions pkgs/bazel_worker/lib/src/utils.dart
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@ List<int> protoToDelimitedBuffer(GeneratedMessage message) {
var delimiterBuffer = CodedBufferWriter();
delimiterBuffer.writeInt32NoTag(messageBuffer.lengthInBytes);

var result =
Uint8List(messageBuffer.lengthInBytes + delimiterBuffer.lengthInBytes);
var result = Uint8List(
messageBuffer.lengthInBytes + delimiterBuffer.lengthInBytes,
);

delimiterBuffer.writeTo(result);
messageBuffer.writeTo(result, delimiterBuffer.lengthInBytes);
Expand Down
20 changes: 10 additions & 10 deletions pkgs/bazel_worker/lib/src/worker/async_worker_loop.dart
Original file line number Diff line number Diff line change
Expand Up @@ -32,20 +32,20 @@ abstract class AsyncWorkerLoop implements WorkerLoop {
var request = await connection.readRequest();
if (request == null) break;
var printMessages = StringBuffer();
response = await runZoned(() => performRequest(request),
zoneSpecification:
ZoneSpecification(print: (self, parent, zone, message) {
printMessages.writeln();
printMessages.write(message);
}));
response = await runZoned(
() => performRequest(request),
zoneSpecification: ZoneSpecification(
print: (self, parent, zone, message) {
printMessages.writeln();
printMessages.write(message);
},
),
);
if (printMessages.isNotEmpty) {
response.output = '${response.output}$printMessages';
}
} catch (e, s) {
response = WorkResponse(
exitCode: EXIT_CODE_ERROR,
output: '$e\n$s',
);
response = WorkResponse(exitCode: EXIT_CODE_ERROR, output: '$e\n$s');
}

connection.writeResponse(response);
Expand Down
19 changes: 10 additions & 9 deletions pkgs/bazel_worker/lib/src/worker/sync_worker_loop.dart
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,20 @@ abstract class SyncWorkerLoop implements WorkerLoop {
var request = connection.readRequest();
if (request == null) break;
var printMessages = StringBuffer();
response = runZoned(() => performRequest(request), zoneSpecification:
ZoneSpecification(print: (self, parent, zone, message) {
printMessages.writeln();
printMessages.write(message);
}));
response = runZoned(
() => performRequest(request),
zoneSpecification: ZoneSpecification(
print: (self, parent, zone, message) {
printMessages.writeln();
printMessages.write(message);
},
),
);
if (printMessages.isNotEmpty) {
response.output = '${response.output}$printMessages';
}
} catch (e, s) {
response = WorkResponse(
exitCode: EXIT_CODE_ERROR,
output: '$e\n$s',
);
response = WorkResponse(exitCode: EXIT_CODE_ERROR, output: '$e\n$s');
}

connection.writeResponse(response);
Expand Down
Loading

0 comments on commit 71c9cc1

Please sign in to comment.