
Batch improvement #37

Open
wants to merge 2 commits into master

Conversation


@chishankar-work chishankar-work commented Mar 10, 2021

The idea is to emulate the way JDBCIO writes to SQL.

Currently, calling write_record does the .execute() and then the .commit() in sequence, writing and committing each record to disk one at a time. The proposed change allows for more effective batching while better managing connection pooling to CSQL.

  • Removed the session.commit() from write_record, which is called on every element in the _WriteToRelationalDBFn ParDo. Instead, we just call .execute() on each record and then commit them all to disk at once.

  • Instead of building the engine at the start of each bundle, moved self._db = SqlAlchemyDB(self.source_config) to the .setup() method, so it's created only once for the object and handles connection pooling for the sessions that are opened and closed at the start and finish of each bundle.

  • Handled the .commit() logic in the DoFn. In start_bundle, initialize record_counter = 0 and records = []. This lets us accumulate records up to the batch size and ensures the commits don't get too big.

  • In cases where the bundles are small or divide unevenly, leaving a chunk with fewer than 1000 records, we call commit_records directly in finish_bundle() to take care of the remaining elements in the bundle and flush the buffer.

  • Made max_batch_size a configurable value with a default of 1000. The user can change it easily with something like:

relational_db.Write(
    source_config=source_config,
    table_config=table_config,
    max_batch_size=1500
)
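Putting the bullet points together, the batching lifecycle described above can be sketched roughly as follows. This is only a sketch: SqlAlchemyDB and the session are replaced by a stand-in, and any names beyond those mentioned in the PR description are assumptions, not the project's actual implementation.

```python
class FakeSession:
    """Stand-in for a SQLAlchemy session: counts executes and commits."""
    def __init__(self):
        self.executed = 0
        self.commits = 0

    def execute(self, stmt):
        self.executed += 1

    def commit(self):
        self.commits += 1


class WriteToRelationalDBFn:
    """Mimics the DoFn lifecycle Beam applies to _WriteToRelationalDBFn."""
    def __init__(self, max_batch_size=1000):
        self.max_batch_size = max_batch_size

    def setup(self):
        # In the PR, SqlAlchemyDB(source_config) is built once here, so the
        # engine (and its connection pool) outlives individual bundles.
        self.session = FakeSession()

    def start_bundle(self):
        self.record_counter = 0
        self.records = []

    def process(self, element):
        assert isinstance(element, dict)
        self.records.append(element)
        self.record_counter += 1
        self.session.execute(element)  # .execute() per record, no commit yet
        if self.record_counter > self.max_batch_size:
            self.commit_records()

    def finish_bundle(self):
        # Flush whatever partial batch remains in the buffer.
        self.commit_records()

    def commit_records(self):
        if self.record_counter == 0:
            return
        self.session.commit()  # one commit for the whole batch
        self.record_counter = 0
        self.records = []


# 2500 records with max_batch_size=1000: every record is executed, but only
# a handful of commits happen, versus one commit per record before the change.
fn = WriteToRelationalDBFn(max_batch_size=1000)
fn.setup()
fn.start_bundle()
for i in range(2500):
    fn.process({"id": i})
fn.finish_bundle()
print(fn.session.executed, fn.session.commits)  # 2500 executes, 3 commits
```

With the old behavior the same run would have issued 2500 commits; here the two full batches plus the finish_bundle flush produce only three.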

… configurable. Also moved the SqlAlchemyDB to the setup() method for better connection pool handling
@chishankar-work (Author)

GH-36

@mohaseeb (Owner) left a comment

Thanks @chishankar-work for this PR (and sorry for being late to review it). I have just a few minor comments.

P.S. you might want to check #38 if you will be running with SQLAlchemy 1.4.


def process(self, element):
    assert isinstance(element, dict)
    self.records.append(element)
    self.record_counter = self.record_counter + 1
    self._db.write_record(self.table_config, element)
@mohaseeb (Owner):

Shouldn't we remove this call now?

self._db.write_record(self.table_config, element)

if (self.record_counter > self.max_batch_size):
@mohaseeb (Owner):

The parentheses are redundant.

        self.commit_records()

def commit_records(self):
    if self.record_counter() == 0:
@mohaseeb (Owner):

self.record_counter() should be self.record_counter (it's an attribute, not a method).

)

"""

-    def __init__(self, source_config, table_config, *args, **kwargs):
+    def __init__(self, source_config, table_config, max_batch_size=1000, *args, **kwargs):
@mohaseeb (Owner):

Let's add this to the class documentation and describe the new behavior.
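A documentation note along these lines might work (the wording below is only a sketch based on the behavior described in this PR, not the maintainers' text, and the class body is elided):

```python
class Write:
    """Writes each element of a PCollection to a relational database.

    Records are buffered and committed in batches rather than with one
    commit per record. max_batch_size (default 1000) controls how many
    records accumulate before a commit is issued; any remainder is
    flushed when the bundle finishes.
    """
```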

@@ -358,7 +361,6 @@ def write_record(self, session, create_insert_f, record_dict):
        record=record_dict
    )
    session.execute(insert_stmt)
@mohaseeb (Owner) commented Apr 21, 2021:

Not sure about SQLAlchemy's behavior here. If the behavior is sending each stmt directly to the DB, then another potential improvement could be to create a single insert stmt for the whole batch (using, e.g., something like this or this), significantly reducing the number of DB calls.


@mohaseeb Correct. I ended up creating my own batch-insert mod based on your project; the only thing I changed was removing the assumption that record is a dict. SQLAlchemy, when generating an insert statement, supports both a single row (a dict) and multiple rows (a list of dicts).
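The single-statement idea can be sketched with the stdlib's sqlite3 as a stand-in for SQLAlchemy. The batch_insert helper and the table below are hypothetical; in the real project this would go through SQLAlchemy's insert(), which, as noted above, accepts a list of dicts as well as a single dict.

```python
import sqlite3

def batch_insert(conn, table, records):
    """Insert all records (a list of dicts with identical keys) in ONE
    multi-row INSERT statement, instead of one execute() per record."""
    if not records:
        return
    cols = list(records[0])
    # Build "(?, ?), (?, ?), ..." -- one value group per record.
    placeholders = ", ".join(
        "(" + ", ".join("?" for _ in cols) + ")" for _ in records
    )
    sql = f"INSERT INTO {table} ({', '.join(cols)}) VALUES {placeholders}"
    params = [rec[c] for rec in records for c in cols]
    conn.execute(sql, params)  # one DB call for the whole batch
    conn.commit()              # one commit for the whole batch

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE months (name TEXT, num INTEGER)")
batch_insert(conn, "months", [{"name": "Jan", "num": 1},
                              {"name": "Feb", "num": 2}])
print(conn.execute("SELECT COUNT(*) FROM months").fetchone()[0])  # 2
```

The key point is the reviewer's: a batch of N records costs one statement and one commit rather than N of each.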
