fix import timeouts on increasing datasets by precomputing batch size #5231
base: develop
@@ -1552,6 +1552,117 @@ def _compute_batch_size(self):
        return self.batch_size


class ContentSizeBatcher(Batcher):
    """Class for iterating over the elements of an iterable with a dynamic
    batch size to achieve a desired content size.

    The batch sizes emitted when iterating over this object are dynamically
    scaled such that the total content size of the batch is as close as
    possible to a specified target size.

    This batcher does not require backpressure feedback because it calculates
    the total content size of the iterable's elements before batching.

    This class is often used in conjunction with a :class:`ProgressBar` to
    keep the user apprised of the status of a long-running task.

    Example usage::

        import fiftyone.core.utils as fou

        elements = range(int(1e7))

        batcher = fou.ContentSizeBatcher(
            elements,
            target_size=2**20,
            progress=True
        )

        with batcher:
            for batch in batcher:
                print("batch size: %d" % len(batch))

    Args:
        iterable: an iterable to batch over. If ``None``, the result of
            ``next()`` will be a batch size instead of a batch, and the
            batcher is an infinite iterator
        target_size (1048576): the target batch BSON content size, in bytes
        min_batch_size (1): the minimum allowed batch size
        max_batch_size (None): an optional maximum allowed batch size
        return_views (False): whether to return each batch as a
            :class:`fiftyone.core.view.DatasetView`. Only applicable when the
            iterable is a :class:`fiftyone.core.collections.SampleCollection`
        progress (False): whether to render a progress bar tracking the
            consumption of the batches (True/False), use the default value
            ``fiftyone.config.show_progress_bars`` (None), or a progress
            callback function to invoke instead
        total (None): the length of ``iterable``. Only applicable when
            ``progress=True``. If not provided, it is computed via
            ``len(iterable)``, if possible
    """

    def __init__(
        self,
        iterable,
        target_size=2**20,
        min_batch_size=1,
        max_batch_size=None,
        return_views=False,
        progress=False,
        total=None,
    ):
        iterable, iterable_copy = itertools.tee(iterable)
        super().__init__(
            iterable, return_views=return_views, progress=progress, total=total
        )
        self.batch_sizes = self._compute_batch_sizes(
            target_size, min_batch_size, max_batch_size, iterable_copy
        )
        self.curr_batch = 0

    def _compute_batch_sizes(
        self, target_size, min_batch_size, max_batch_size, iterable
    ):
        batch_sizes = []
        curr_batch_size = 0
        curr_batch_content_size = 0

        for obj in iterable:
            try:
                curr_batch_content_size += len(
                    json_util.dumps(self._make_dict(obj))
                )
            except Exception:
                curr_batch_content_size += len(str(obj))
            curr_batch_size += 1
            if curr_batch_size >= min_batch_size and (
                curr_batch_content_size >= target_size
                or curr_batch_size == max_batch_size
            ):
                batch_sizes.append(curr_batch_size)
                curr_batch_size = 0
                curr_batch_content_size = 0

        if curr_batch_size:
            batch_sizes.append(curr_batch_size)

        return batch_sizes

    def _make_dict(self, obj):
        return (
            obj.to_mongo_dict(include_id=True)
            if hasattr(obj, "to_mongo_dict")
            else obj
        )

    def _compute_batch_size(self):
        size = 1
        if self.curr_batch < len(self.batch_sizes):
            size = self.batch_sizes[self.curr_batch]
            self.curr_batch += 1
        return size

Review thread on the `json_util.dumps(self._make_dict(obj))` line above:

Comment: Couldn't this extra serialization potentially be very expensive?

Reply: We currently do this already in `_upsert_samples_batch`, so we are basically doubling how much we do it in that one method. In most other cases (such as `bulk_write` and `insert_documents`) it's the same total computation, since we are just moving it from post-computing for each batch to precomputing for the entire collection.

Reply: Though I think it might be possible to refactor `_upsert_samples_batch` so that we just pass the dicts as an iterable to begin with, instead of calculating them twice.
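A minimal sketch of the refactor floated in that last reply, assuming the goal is to serialize each sample exactly once and share the result between batch sizing and writing. `iter_dicts` and `upsert_in_batches` are hypothetical names for illustration, not existing FiftyOne helpers:

```python
from bson import json_util


def iter_dicts(samples):
    """Serializes each sample once, yielding (dict, num_bytes) pairs that
    both the batch sizer and the writer can consume."""
    for sample in samples:
        d = sample.to_mongo_dict(include_id=True)
        yield d, len(json_util.dumps(d))


def upsert_in_batches(coll, samples, target_size=2**20):
    """Writes size-bounded batches without re-serializing any sample."""
    batch, batch_bytes = [], 0
    for d, num_bytes in iter_dicts(samples):
        batch.append(d)
        batch_bytes += num_bytes
        if batch_bytes >= target_size:
            coll.insert_many(batch)  # one write per size-bounded batch
            batch, batch_bytes = [], 0

    if batch:
        coll.insert_many(batch)
```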
def get_default_batcher(iterable, progress=False, total=None):
    """Returns a :class:`Batcher` over ``iterable`` using defaults from your
    FiftyOne config.
@@ -1588,11 +1699,9 @@ def get_default_batcher(iterable, progress=False, total=None):
         )
     elif default_batcher == "size":
         target_content_size = fo.config.batcher_target_size_bytes
-        return ContentSizeDynamicBatcher(
-            iterable,
+        return ContentSizeBatcher(
+            iterable=iterable,
             target_size=target_content_size,
-            init_batch_size=1,
-            max_batch_beta=8.0,
-            max_batch_size=100000,
             progress=progress,
             total=total,
Comment: This is not ideal because it requires iterating over the entire iterable before any writes start happening. A large-scale `add_samples()` call could take 1 hour or more to complete. With this implementation, it will take an hour just to compute safe batch sizes, during which time there will be no progress bar or other indication to the user of what's happening, and then it will start adding samples to the dataset for the next hour. We used dynamic batching with backpressure to find a compromise where writes start immediately while also tuning the batch size to maximize throughput.

Perhaps there's a way to keep the greedy batching but add a safeguard to split unexpectedly large batches into multiple writes?
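One hedged reading of that suggestion: leave the dynamic batcher untouched so writes start immediately, and add a write-time safeguard that splits any batch whose serialized size would exceed a hard cap. `split_oversized` is an illustrative helper, not existing FiftyOne code:

```python
from bson import json_util


def split_oversized(batch, max_bytes):
    """Yields chunks of `batch` whose serialized size stays under
    `max_bytes`, so no single write can exceed the cap."""
    chunk, chunk_bytes = [], 0
    for doc in batch:
        num_bytes = len(json_util.dumps(doc))
        # flush the current chunk if adding this doc would exceed the cap
        if chunk and chunk_bytes + num_bytes > max_bytes:
            yield chunk
            chunk, chunk_bytes = [], 0

        chunk.append(doc)
        chunk_bytes += num_bytes

    if chunk:
        yield chunk
```

The greedy batch sizes are preserved in the common case; only unexpectedly large batches pay for extra writes.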
Reply: I think an alternative is that we can override the `next()` implementation for just this batcher and build each batch as we go: instead of doing it all upfront, we just iterate and build a list up to the max size and then return that.

We probably want to refactor a bit as well, because we currently have two content size batchers: one for backpressure, where we don't know the size of the items before batching (I think it's just a bunch of ops, not actually looking at the items), and another for when we have the items and can calculate their sizes.
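A sketch of that alternative, written as a standalone iterator for clarity (the real change would presumably override the iteration logic on the `Batcher` subclass in this diff). Batches are built lazily up to the target content size, so the first write can start immediately and no `itertools.tee` copy of the iterable is needed:

```python
from bson import json_util


class StreamingContentSizeBatcher:
    """Sketch: accumulates items until the target content size (or max
    batch size) is reached, then emits the batch."""

    def __init__(self, iterable, target_size=2**20, max_batch_size=None):
        self._iter = iter(iterable)
        self.target_size = target_size
        self.max_batch_size = max_batch_size

    def __iter__(self):
        return self

    def __next__(self):
        batch, content_size = [], 0
        for obj in self._iter:
            batch.append(obj)
            try:
                content_size += len(json_util.dumps(obj))
            except Exception:
                content_size += len(str(obj))

            if content_size >= self.target_size or (
                self.max_batch_size is not None
                and len(batch) >= self.max_batch_size
            ):
                return batch

        # source exhausted; emit any partial final batch
        if batch:
            return batch

        raise StopIteration
```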
Reply: What are you referring to? We currently have three batchers:

https://github.com/voxel51/fiftyone-teams/blob/318d3abb38b478f557180bb6aed4f25681cc7c64/fiftyone/core/utils.py#L1579-L1606

- `LatencyDynamicBatcher`: targets a specific latency between calls to generate the next batch
- `ContentSizeDynamicBatcher`: targets a specific content size in each batch
- `StaticBatcher`: simple fixed batch size

Latency and content size are correlated heuristics, and both would encounter problems if there are wildly different `Sample` sizes in a collection.
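For reference, the backpressure pattern those dynamic batchers use looks roughly like this usage of `ContentSizeDynamicBatcher`, where the caller reports the actual content size of each batch back to the batcher after each write (an adapted sketch; check `fiftyone/core/utils.py` for the exact signature):

```python
import fiftyone.core.utils as fou

elements = range(int(1e7))

batcher = fou.ContentSizeDynamicBatcher(
    elements, target_size=2**20, max_batch_beta=8.0
)

for batch in batcher:
    # ... write the batch, measuring its actual content size ...
    batch_content_size = 8 * len(batch)  # placeholder measurement
    batcher.apply_backpressure(batch_content_size)
```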
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess you're referring to latency and content.
The latency batcher is the default for OSS because it is effectively parameterized by the frame rate on the progress bars you want to see (and the assumption that the database is a big boi and will take as much content in 0.2 seconds as you can throw at it, for example)
The content batcher is the default for Teams when there's an API connection involved because the assumption is that the API gateway has a hard limit on request size, so we want to be certain to not exceed say 1 MB).
https://github.com/voxel51/fiftyone-teams/blob/318d3abb38b478f557180bb6aed4f25681cc7c64/fiftyone/core/config.py#L171
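Those defaults are plain config settings; a minimal sketch of how they surface in Python (the values shown are the defaults referenced in this discussion and may differ in your install):

```python
import fiftyone as fo

# OSS default: latency-based batching tuned to progress-bar refresh rate
fo.config.default_batcher = "latency"
fo.config.batcher_target_latency = 0.2  # seconds between batches

# Teams/API default: content-size batching to respect request-size caps
fo.config.default_batcher = "size"
fo.config.batcher_target_size_bytes = 2**20  # ~1 MB per request
```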
Reply: The batcher in this PR, `ContentSizeBatcher`, is separate from `ContentSizeDynamicBatcher` because it doesn't use backpressure and has a different use case where we can view content size upfront, which, keep me honest @swheaton, isn't always the case. That's what I meant by specifically two "content size batchers"; I know there are other batchers as well.

I am proposing a couple of things: or, if I am wrong about there being two use cases (computing content sizes before batching vs. computing them after each batch), consolidate these into one method that just does number 2 above (building each batch as we go) instead of precomputing over the entire iterable.