Batch improvement #37
base: master
Conversation
…s configurable. Also moved the SqlAlchemyDB to the setup() method for better connection pool handling
Thanks @chishankar-work for this PR (and sorry for being late to review it). I have just a few minor comments.
P.S. you might want to check #38 if you will be running with SQLAlchemy 1.4.
    def process(self, element):
        assert isinstance(element, dict)
        self.records.append(element)
        self.record_counter = self.record_counter + 1
        self._db.write_record(self.table_config, element)

Shouldn't we remove this call now?
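If it is removed, `process()` would just buffer the element and defer the actual write to `commit_records()`; a minimal sketch, combining this hunk with the batching check shown further down:

```python
def process(self, element):
    assert isinstance(element, dict)
    # Buffer only; the write and commit happen in commit_records()
    self.records.append(element)
    self.record_counter += 1
    if self.record_counter > self.max_batch_size:
        self.commit_records()
```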
    if (self.record_counter > self.max_batch_size):
        self.commit_records()

The parentheses are redundant.
    def commit_records(self):
        if self.record_counter() == 0:

`self.record_counter()` should be `self.record_counter`.
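A minimal sketch of the corrected method, assuming the buffer fields created in `start_bundle`; the exact flush call shown is an assumption, not confirmed project API:

```python
def commit_records(self):
    # record_counter is a plain int attribute, not a callable
    if self.record_counter == 0:
        return
    # Assumed flush path: one execute for the buffered records,
    # one commit, then reset the buffer for the next batch.
    self._db.write_record(self.table_config, self.records)
    self.records = []
    self.record_counter = 0
```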
    -def __init__(self, source_config, table_config, *args, **kwargs):
    +def __init__(self, source_config, table_config, max_batch_size=1000, *args, **kwargs):

Let's add this to the class documentation and describe the new behavior.
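For instance, the transform's docstring could gain an entry along these lines (illustrative wording only, not the author's; the surrounding docstring text is assumed):

```python
import apache_beam as beam

class Write(beam.PTransform):
    """Writes elements of the input PCollection to a relational database table.

    Args:
        source_config: configuration of the target database.
        table_config: configuration of the target table.
        max_batch_size (int): number of records to buffer before they are
            committed to the database in a single transaction. Defaults to
            1000. Any remainder is flushed when the bundle finishes.
    """
```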
    @@ -358,7 +361,6 @@ def write_record(self, session, create_insert_f, record_dict):
             record=record_dict
         )
         session.execute(insert_stmt)

@mohaseeb Correct. I ended up creating my own batch-insert mod based on your project, and the only thing I changed was removing the assumption that `record` is a dict: SQLAlchemy, when generating an insert statement, supports both a single row (a dict) and multiple rows (a list of dicts).
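For illustration, both forms go through `Insert.values()`; a small self-contained sketch, not code from this repo:

```python
from sqlalchemy import Column, Integer, MetaData, String, Table, create_engine

engine = create_engine("sqlite://")  # in-memory database, just for the demo
metadata = MetaData()
months = Table(
    "months", metadata,
    Column("name", String, primary_key=True),
    Column("num", Integer),
)
metadata.create_all(engine)

with engine.begin() as conn:  # commits on exit
    # A single row: one dict
    conn.execute(months.insert().values({"name": "Jan", "num": 1}))
    # Multiple rows: a list of dicts, rendered as a multi-row VALUES clause
    conn.execute(months.insert().values([
        {"name": "Feb", "num": 2},
        {"name": "Mar", "num": 3},
    ]))
```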
The idea is to emulate the way JDBCIO writes to SQL.

When calling `write_record`, it does the `.execute()` and then the `.commit()` in sequence, writing and committing each record to disk one at a time. The proposed change allows for more effective batching while better managing connection pooling to CSQL:

- Removed the `session.commit()` from `write_record`, which is called on every element in the `_WriteToRelationalDBFn` ParDo. Instead, we just call `.execute()` on each record and then commit them to disk all at once.
- Instead of building the engine at the start of each bundle, moved `self._db = SqlAlchemyDB(self.source_config)` to the `.setup()` method, so it's only created once per DoFn instance and handles connection pooling for the sessions that are opened and closed at the start and finish of each bundle (see the lifecycle sketch after this list).
- Handled the `.commit()` logic in the DoFn. In `start_bundle`, create a `record_counter = 0` and `records = []`. This will allow us to build the commits up to size and ensure that they don't get too big.
- In cases where the bundles are small or divide unevenly, leaving a chunk with fewer than 1000 records, we can call `commit_records` directly in `finish_bundle()` to take care of the remaining elements in the bundle and flush the buffer.
- Made `max_batch_size` a configurable value with a default of 1000. The user can change it easily by doing something such as the sketch below:
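A sketch of what such a call could look like; `relational_db.Write` is this project's public transform, while `source_config`, `table_config`, and the value 500 are placeholder assumptions:

```python
from beam_nuggets.io import relational_db

# Hypothetical override of the new default batch size (1000):
write_to_db = relational_db.Write(
    source_config=source_config,  # assumed: an existing SourceConfiguration
    table_config=table_config,    # assumed: an existing TableConfiguration
    max_batch_size=500,           # commit every 500 buffered records
)
```

For reference, the bundle lifecycle described in the bullets above could be sketched like this (illustrative only; `SqlAlchemyDB` is this project's internal wrapper, and `process`/`commit_records` are as sketched in the review comments above):

```python
import apache_beam as beam

class _WriteToRelationalDBFn(beam.DoFn):
    def setup(self):
        # Engine/session factory built once per DoFn instance, so the
        # connection pool is reused across bundles instead of rebuilt.
        self._db = SqlAlchemyDB(self.source_config)

    def start_bundle(self):
        # Fresh buffer for each bundle.
        self.records = []
        self.record_counter = 0

    def finish_bundle(self):
        # Flush whatever is left over when the bundle ends.
        self.commit_records()
```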