Skip to content

Commit

Permalink
log error in case of exception
Browse files Browse the repository at this point in the history
If an exception occurs while processing a repo, log the error and
move on to the next repo. An exception should not crash the actor.

Signed-off-by: Shivdeep Singh <[email protected]>
  • Loading branch information
shivdeep-singh-ibm committed Sep 24, 2024
1 parent 2b0c7a7 commit 6960272
Showing 1 changed file with 14 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,21 +41,24 @@ def _default_mapper_func(self, table, file_name):
]

def process(self, repo: str, files: List[str]):
repo_table = self._read_table_for_group(self.repo_column_name, repo, files)
if len(repo_table) == 0:
# not processing empty table
return
try:
repo_table = self._read_table_for_group(self.repo_column_name, repo, files)
if len(repo_table) == 0:
# not processing empty table
return

def sanitize_path(repo_name):
return repo_name.replace("/", "%2F")
def sanitize_path(repo_name):
return repo_name.replace("/", "%2F")

repo = sanitize_path(repo)
tables = self.table_mapper(repo_table, repo)
repo = sanitize_path(repo)
tables = self.table_mapper(repo_table, repo)

for out_table, filename in tables:
for out_table, filename in tables:

self.logger.info(f"Write {filename}, tables: {len(out_table)}")
self._write_parquet(out_table, filename)
self.logger.info(f"Write {filename}, tables: {len(out_table)}")
self._write_parquet(out_table, filename)
except Exception as e:
self.logger.error(f"Failed processing repo: {repo}. {e}")

def _write_parquet(self, table, repo_name):
# since we already know the repo
Expand Down

0 comments on commit 6960272

Please sign in to comment.