diff --git a/transforms/code/repo_level_ordering/ray/src/dpk_repo_level_order/internal/repo_grouper.py b/transforms/code/repo_level_ordering/ray/src/dpk_repo_level_order/internal/repo_grouper.py index 51447a2b3..c85d84d6a 100644 --- a/transforms/code/repo_level_ordering/ray/src/dpk_repo_level_order/internal/repo_grouper.py +++ b/transforms/code/repo_level_ordering/ray/src/dpk_repo_level_order/internal/repo_grouper.py @@ -41,21 +41,24 @@ def _default_mapper_func(self, table, file_name): ] def process(self, repo: str, files: List[str]): - repo_table = self._read_table_for_group(self.repo_column_name, repo, files) - if len(repo_table) == 0: - # not processing empty table - return + try: + repo_table = self._read_table_for_group(self.repo_column_name, repo, files) + if len(repo_table) == 0: + # not processing empty table + return - def sanitize_path(repo_name): - return repo_name.replace("/", "%2F") + def sanitize_path(repo_name): + return repo_name.replace("/", "%2F") - repo = sanitize_path(repo) - tables = self.table_mapper(repo_table, repo) + repo = sanitize_path(repo) + tables = self.table_mapper(repo_table, repo) - for out_table, filename in tables: + for out_table, filename in tables: - self.logger.info(f"Write {filename}, tables: {len(out_table)}") - self._write_parquet(out_table, filename) + self.logger.info(f"Write {filename}, tables: {len(out_table)}") + self._write_parquet(out_table, filename) + except Exception as e: + self.logger.error(f"Failed processing repo: {repo}. {e}") def _write_parquet(self, table, repo_name): # since we already know the repo