fix import timeouts on increasing datasets by precomputing batch size #5231

Open · wants to merge 1 commit into base: develop
8 changes: 0 additions & 8 deletions fiftyone/core/dataset.py
@@ -3392,10 +3392,6 @@ def _add_samples_batch(
if sample.media_type == fom.VIDEO:
sample.frames.save()

if batcher is not None and batcher.manual_backpressure:
# @todo can we infer content size from insert_many() above?
batcher.apply_backpressure(dicts)

return [str(d["_id"]) for d in dicts]

def _upsert_samples(
@@ -3459,10 +3455,6 @@ def _upsert_samples_batch(
if sample.media_type == fom.VIDEO:
sample.frames.save()

if batcher is not None and batcher.manual_backpressure:
# @todo can we infer content size from bulk_write() above?
batcher.apply_backpressure(dicts)

def _make_dict(
self,
sample,
8 changes: 0 additions & 8 deletions fiftyone/core/odm/database.py
@@ -809,9 +806,6 @@ def insert_documents(docs, coll, ordered=False, progress=None, num_docs=None):
batch = list(batch)
coll.insert_many(batch, ordered=ordered)
ids.extend(b["_id"] for b in batch)
if batcher.manual_backpressure:
# @todo can we infer content size from insert_many() above?
batcher.apply_backpressure(batch)

except BulkWriteError as bwe:
msg = bwe.details["writeErrors"][0]["errmsg"]
@@ -838,11 +835,6 @@ def bulk_write(ops, coll, ordered=False, progress=False):
for batch in batcher:
batch = list(batch)
coll.bulk_write(batch, ordered=ordered)
if batcher.manual_backpressure:
# @todo can we infer content size from bulk_write() above?
# @todo do we need a more accurate measure of size here?
content_size = sum(len(str(b)) for b in batch)
batcher.apply_backpressure(content_size)

except BulkWriteError as bwe:
msg = bwe.details["writeErrors"][0]["errmsg"]
117 changes: 113 additions & 4 deletions fiftyone/core/utils.py
@@ -1552,6 +1552,117 @@ def _compute_batch_size(self):
return self.batch_size


class ContentSizeBatcher(Batcher):
"""Class for iterating over the elements of an iterable with a dynamic
batch size to achieve a desired content size.

The batch sizes emitted when iterating over this object are dynamically
scaled such that the total content size of the batch is as close as
possible to a specified target size.

This batcher does not require backpressure feedback because it calculates
the total size of the iterable object before batching.

This class is often used in conjunction with a :class:`ProgressBar` to keep
the user apprised of the status of a long-running task.

Example usage::

import fiftyone.core.utils as fou

elements = range(int(1e7))

batcher = fou.ContentSizeBatcher(
elements,
target_size=2**20,
progress=True
)

with batcher:
for batch in batcher:
print("batch size: %d" % len(batch))

Args:
iterable: an iterable to batch over. Note that this batcher consumes the
iterable upfront in order to compute batch sizes, so ``None`` is not
supported
target_size (1048576): the target batch BSON content size, in bytes
min_batch_size (1): the minimum allowed batch size
max_batch_size (None): an optional maximum allowed batch size
return_views (False): whether to return each batch as a
:class:`fiftyone.core.view.DatasetView`. Only applicable when the
iterable is a :class:`fiftyone.core.collections.SampleCollection`
progress (False): whether to render a progress bar tracking the
consumption of the batches (True/False), use the default value
``fiftyone.config.show_progress_bars`` (None), or a progress
callback function to invoke instead
total (None): the length of ``iterable``. Only applicable when
``progress=True``. If not provided, it is computed via
``len(iterable)``, if possible
"""

def __init__(
self,
iterable,
target_size=2**20,
min_batch_size=1,
max_batch_size=None,
return_views=False,
progress=False,
total=None,
):
iterable, iterable_copy = itertools.tee(iterable)
super().__init__(
iterable, return_views=return_views, progress=progress, total=total
)
self.batch_sizes = self._compute_batch_sizes(
target_size, min_batch_size, max_batch_size, iterable_copy
)
self.curr_batch = 0

def _compute_batch_sizes(
Contributor:
This is not ideal because it requires iterating over the entire iterator before any writes start happening.

A large-scale add_samples() call could take 1 hour or more to complete. With this implementation, it will take an hour just to compute safe batch sizes, during which time there will be no progress bar or indication to the user of what's happening, and only then will it start adding samples to the dataset for the next hour.

We used dynamic batching with backpressure to try to find a compromise where writes will start immediately while also tuning the batch size to maximize throughput.

Perhaps there's a way to keep the greedy batching but add a safeguard to split unexpectedly large batches into multiple writes?
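For illustration, one form such a safeguard could take is a helper that splits any greedy batch whose serialized size exceeds a hard cap before writing; this is a hypothetical sketch, not code from this PR:

    from bson import json_util

    def split_oversized_batch(batch, max_bytes=2**20):
        """Yields sub-batches of ``batch`` whose serialized size stays under
        ``max_bytes``; a single document larger than the cap is yielded alone.
        """
        sub_batch, sub_size = [], 0
        for doc in batch:
            doc_size = len(json_util.dumps(doc))
            if sub_batch and sub_size + doc_size > max_bytes:
                yield sub_batch
                sub_batch, sub_size = [], 0

            sub_batch.append(doc)
            sub_size += doc_size

        if sub_batch:
            yield sub_batch

    # e.g. in insert_documents(), write each sub-batch separately:
    # for sub_batch in split_oversized_batch(batch):
    #     coll.insert_many(sub_batch, ordered=ordered)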

Contributor Author:
I think an alternative is that we can override the next() implementation for just this batcher and build the batch as we go, so instead of doing it upfront we can just iterate and build a list up to the max size and then return that.

We probably want to refactor a bit as well, because currently we have two content size batchers: one for backpressure, where we don't know the size of the items before batching (I think it's just a bunch of ops and not actually looking at the items), and another for when we have the items and can calculate their sizes.

Contributor:
"currently we have two content size batchers"

What are you referring to? We currently have three batchers:
https://github.com/voxel51/fiftyone-teams/blob/318d3abb38b478f557180bb6aed4f25681cc7c64/fiftyone/core/utils.py#L1579-L1606

  • LatencyDynamicBatcher: targets a specific latency between calls to generate the next batch
  • ContentSizeDynamicBatcher: targets a specific content size in each batch
  • StaticBatcher: simple fixed batch size

Latency and content size are correlated heuristics, and both would encounter problems if there are wildly different Sample sizes in a collection.

Contributor (@brimoor, Dec 14, 2024):
I guess you're referring to latency and content.

The latency batcher is the default for OSS because it is effectively parameterized by the frame rate on the progress bars you want to see (and the assumption that the database is a big boi and will take as much content in 0.2 seconds as you can throw at it, for example).

The content batcher is the default for Teams when there's an API connection involved, because the assumption is that the API gateway has a hard limit on request size, so we want to be certain not to exceed, say, 1 MB.

https://github.com/voxel51/fiftyone-teams/blob/318d3abb38b478f557180bb6aed4f25681cc7c64/fiftyone/core/config.py#L171
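For reference, the config fields under discussion can be sketched as follows (the "size" value and batcher_target_size_bytes appear in this diff; the "latency" value for the OSS default is assumed):

    import fiftyone as fo

    # OSS default: dynamic batching that targets a fixed latency per batch
    fo.config.default_batcher = "latency"  # assumed value

    # Teams/API default: batching that targets a fixed content size per batch,
    # e.g. to stay under an API gateway's request-size limit
    fo.config.default_batcher = "size"
    fo.config.batcher_target_size_bytes = 2**20  # ~1 MB per batch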

Contributor Author (@CamronStaley, Dec 14, 2024):
The batcher in this PR, "ContentSizeBatcher", is separate from ContentSizeDynamicBatcher because it doesn't use backpressure and has a different use case where we can view content size upfront, which (keep me honest @swheaton) isn't always the case. That's what I meant by specifically two "content size batchers"; I know there are other batchers as well.

I am proposing a couple of things:

  1. rename both of the content size batchers so that it's clear one is used for computing content size before writing the batch and the other is for estimating it based on the previous batch's results
  2. make it so this new batcher we have added computes batch size every time next() is called so that we don't have to do it all upfront (this would require overriding the inherited next method in its current state, so maybe a refactor would be good)

Or, if I am wrong about there being two use cases (computing content size before running the batch vs. computing it after running the batch), consolidate these into one method that just does number 2 above instead of precomputing the entire iterable.
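For illustration, a minimal sketch of option 2 (computing each batch lazily in next() rather than precomputing all batch sizes) might look like the following; it is hypothetical, not part of this PR, and it ignores return_views and progress handling but mirrors the PR's size estimate:

    from bson import json_util

    class LazyContentSizeBatcher:
        """Builds each batch on demand, closing it once the serialized content
        size reaches ``target_size`` or the batch reaches ``max_batch_size``.
        """

        def __init__(self, iterable, target_size=2**20, max_batch_size=None):
            self._iter = iter(iterable)
            self.target_size = target_size
            self.max_batch_size = max_batch_size

        def __iter__(self):
            return self

        def __next__(self):
            batch, content_size = [], 0
            for obj in self._iter:
                batch.append(obj)
                try:
                    content_size += len(json_util.dumps(obj))
                except Exception:
                    content_size += len(str(obj))

                if content_size >= self.target_size or (
                    self.max_batch_size is not None
                    and len(batch) >= self.max_batch_size
                ):
                    return batch

            if batch:
                return batch

            raise StopIteration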

self, target_size, min_batch_size, max_batch_size, iterable
):
batch_sizes = []
curr_batch_size = 0
curr_batch_content_size = 0

for obj in iterable:
try:
curr_batch_content_size += len(
json_util.dumps(self._make_dict(obj))
Contributor:
Couldn't this extra serialization potentially be very expensive?

Contributor Author:
We currently do this already in _upsert_samples_batch, so we are basically doubling how much we are doing it in that one method. In most other cases (such as bulk_write and insert_documents) it's the same total computation, since we are just moving it from post-computing for each batch to precomputing for the entire collection.

Contributor Author:
Though I think it might be possible to refactor _upsert_samples_batch so that we just pass the dicts as the iterable to begin with, instead of calculating them twice.
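On the serialization-cost question above, a rough way to gauge the overhead of the extra json_util.dumps() pass (hypothetical micro-benchmark; the document shapes and counts are illustrative):

    import timeit

    from bson import json_util

    # 10k small sample-like dicts; real samples with labels will be larger
    docs = [
        {"filepath": f"{i}.jpg", "tags": [], "metadata": None}
        for i in range(10_000)
    ]

    secs = timeit.timeit(
        lambda: sum(len(json_util.dumps(d)) for d in docs), number=1
    )
    print(f"sized {len(docs)} docs in {secs:.3f}s")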

)
except Exception:
curr_batch_content_size += len(str(obj))
curr_batch_size += 1
if curr_batch_size >= min_batch_size and (
curr_batch_content_size >= target_size
or curr_batch_size == max_batch_size
):
batch_sizes.append(curr_batch_size)
curr_batch_size = 0
curr_batch_content_size = 0

if curr_batch_size:
batch_sizes.append(curr_batch_size)

return batch_sizes

def _make_dict(self, obj):
return (
obj.to_mongo_dict(include_id=True)
if hasattr(obj, "to_mongo_dict")
else obj
)

def _compute_batch_size(self):
size = 1
if self.curr_batch < len(self.batch_sizes):
size = self.batch_sizes[self.curr_batch]
self.curr_batch += 1
return size

brimoor marked this conversation as resolved.

def get_default_batcher(iterable, progress=False, total=None):
"""Returns a :class:`Batcher` over ``iterable`` using defaults from your
FiftyOne config.
@@ -1588,11 +1699,9 @@ def get_default_batcher(iterable, progress=False, total=None):
)
elif default_batcher == "size":
target_content_size = fo.config.batcher_target_size_bytes
return ContentSizeDynamicBatcher(
iterable,
return ContentSizeBatcher(
iterable=iterable,
target_size=target_content_size,
init_batch_size=1,
max_batch_beta=8.0,
max_batch_size=100000,
progress=progress,
total=total,
49 changes: 45 additions & 4 deletions tests/unittests/utils_tests.py
@@ -12,6 +12,7 @@
from unittest.mock import MagicMock, patch

from bson import ObjectId
from bson import json_util
import numpy as np

import fiftyone as fo
@@ -59,10 +60,7 @@ def test_get_default_batcher(self):
target_size,
):
batcher = fou.get_default_batcher(iterable)
self.assertTrue(
isinstance(batcher, fou.ContentSizeDynamicBatcher)
)
self.assertEqual(batcher.target_measurement, target_size)
self.assertTrue(isinstance(batcher, fou.ContentSizeBatcher))

with patch.object(fo.config, "default_batcher", "invalid"):
self.assertRaises(ValueError, fou.get_default_batcher, iterable)
@@ -84,6 +82,49 @@ def test_static_batcher_covered(self):
batches = [batch for batch in batcher]
self.assertListEqual(batches, [iterable])

def test_content_size_batcher(self):
n = 10
samples = [fo.Sample(filepath=f"{i}.jpg") for i in range(n)]

# Test min batch size same as total size
batcher = fou.ContentSizeBatcher(iter(samples), min_batch_size=n)
expected = [n]
self.assertListEqual(expected, batcher.batch_sizes)
self.assertEqual(n, sum(batcher.batch_sizes))

# Test max batch size same as min_size and less than target
batcher = fou.ContentSizeBatcher(iter(samples), max_batch_size=1)
expected = [1] * n
self.assertListEqual(expected, batcher.batch_sizes)
self.assertEqual(n, sum(batcher.batch_sizes))

# Test default case
batcher = fou.ContentSizeBatcher(iter(samples))
expected = [n]
self.assertListEqual(expected, batcher.batch_sizes)
self.assertEqual(n, sum(batcher.batch_sizes))

# Test target smaller than min
batcher = fou.ContentSizeBatcher(iter(samples), target_size=1)
expected = [1] * n
self.assertListEqual(expected, batcher.batch_sizes)

# Test target size half of total
total_size = len(
json_util.dumps(
[sample.to_mongo_dict(include_id=True) for sample in samples]
)
)
target_size = (
total_size // 2 - 100
) # offset because the items slightly differ in size
expected = [n // 2] * 2
batcher = fou.ContentSizeBatcher(
iter(samples), target_size=target_size
)
self.assertListEqual(expected, batcher.batch_sizes)
self.assertEqual(n, sum(batcher.batch_sizes))

def test_static_batcher_perfect_boundary(self):
iterable = list(range(200))
batcher = fou.StaticBatcher(iterable, batch_size=100, progress=False)