Skip to content

Commit

Permalink
wip - trying out new solution
Browse files Browse the repository at this point in the history
  • Loading branch information
gmcrocetti committed Jan 9, 2025
1 parent 86fde24 commit 0fccb84
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 25 deletions.
35 changes: 23 additions & 12 deletions pandas/io/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -1006,7 +1006,7 @@ def _execute_insert(self, conn, keys: list[str], data_iter) -> int:
Each item contains a list of values to be inserted
"""
data = [dict(zip(keys, row)) for row in data_iter]
result = conn.execute(self.table.insert(), data)
result = self.pd_sql.execute(self.table.insert(), data)
return result.rowcount

def _execute_insert_multi(self, conn, keys: list[str], data_iter) -> int:
Expand All @@ -1023,7 +1023,7 @@ def _execute_insert_multi(self, conn, keys: list[str], data_iter) -> int:

data = [dict(zip(keys, row)) for row in data_iter]
stmt = insert(self.table).values(data)
result = conn.execute(stmt)
result = self.pd_sql.execute(stmt)
return result.rowcount

def insert_data(self) -> tuple[list[str], list[np.ndarray]]:
Expand Down Expand Up @@ -1662,8 +1662,14 @@ def execute(self, sql: str | Select | TextClause, params=None):
"""Simple passthrough to SQLAlchemy connectable"""
args = [] if params is None else [params]
if isinstance(sql, str):
return self.con.exec_driver_sql(sql, *args)
return self.con.execute(sql, *args)
try:
return self.con.exec_driver_sql(sql, *args)
except Exception as exc:
raise DatabaseError("foo") from exc
try:
return self.con.execute(sql, *args)
except Exception as exc:
raise DatabaseError from exc

def read_table(
self,
Expand Down Expand Up @@ -2077,9 +2083,9 @@ def delete_rows(self, table_name: str, schema: str | None = None) -> None:
self.meta.reflect(
bind=self.con, only=[table_name], schema=schema, views=True
)
with self.run_transaction() as con:
with self.run_transaction():
table = self.get_table(table_name, schema)
con.execute(table.delete())
self.execute(table.delete())

self.meta.clear()

Expand Down Expand Up @@ -2403,9 +2409,12 @@ def to_sql(
raise ValueError("datatypes not supported") from exc

with self.con.cursor() as cur:
total_inserted = cur.adbc_ingest(
table_name=name, data=tbl, mode=mode, db_schema_name=schema
)
try:
total_inserted = cur.adbc_ingest(
table_name=name, data=tbl, mode=mode, db_schema_name=schema
)
except Exception as exc:
raise DatabaseError("foo") from exc

self.con.commit()
return total_inserted
Expand All @@ -2431,8 +2440,7 @@ def has_table(self, name: str, schema: str | None = None) -> bool:
def delete_rows(self, name: str, schema: str | None = None) -> None:
table_name = f"{schema}.{name}" if schema else name
if self.has_table(name, schema):
with self.con.cursor() as cur:
cur.execute(f"DELETE FROM {table_name}")
self.execute(f"DELETE FROM {table_name}").close()

def _create_sql_schema(
self,
Expand Down Expand Up @@ -2553,7 +2561,10 @@ def insert_statement(self, *, num_rows: int) -> str:

def _execute_insert(self, conn, keys, data_iter) -> int:
data_list = list(data_iter)
conn.executemany(self.insert_statement(num_rows=1), data_list)
try:
conn.executemany(self.insert_statement(num_rows=1), data_list)
except Exception as exc:
raise DatabaseError("foo") from exc
return conn.rowcount

def _execute_insert_multi(self, conn, keys, data_iter) -> int:
Expand Down
17 changes: 4 additions & 13 deletions pandas/tests/io/test_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -2718,7 +2718,6 @@ def test_delete_rows_success(conn_name, test_frame1, request):

@pytest.mark.parametrize("conn_name", all_connectable)
def test_delete_rows_is_atomic(conn_name, request):
adbc_driver_manager = pytest.importorskip("adbc_driver_manager")
sqlalchemy = pytest.importorskip("sqlalchemy")

table_name = "temp_frame"
Expand All @@ -2737,20 +2736,11 @@ def test_delete_rows_is_atomic(conn_name, request):
with pandasSQL.run_transaction() as cur:
cur.execute(table_stmt)

if conn_name != "sqlite_buildin" and "adbc" not in conn_name:
expected_exception = sqlalchemy.exc.IntegrityError
elif "adbc" in conn_name and "sqlite" in conn_name:
expected_exception = adbc_driver_manager.InternalError
elif "adbc" in conn_name and "postgres" in conn_name:
expected_exception = adbc_driver_manager.ProgrammingError
elif conn_name == "sqlite_buildin":
expected_exception = sqlite3.IntegrityError

with pandasSQL.run_transaction():
pandasSQL.to_sql(original_df, table_name, if_exists="append", index=False)

# inserting duplicated values in a UNIQUE constraint column
with pytest.raises(expected_exception):
with pytest.raises(pd.errors.DatabaseError):
with pandasSQL.run_transaction():
pandasSQL.to_sql(
replacing_df, table_name, if_exists="delete_rows", index=False
Expand Down Expand Up @@ -3473,8 +3463,9 @@ def test_to_sql_with_negative_npinf(conn, request, input):
mark = pytest.mark.xfail(reason="GH 36465")
request.applymarker(mark)

msg = "inf cannot be used with MySQL"
with pytest.raises(ValueError, match=msg):
# TODO: Fix me with the correct message
# msg = "inf cannot be used with MySQL"
with pytest.raises(pd.errors.DatabaseError):
df.to_sql(name="foobar", con=conn, index=False)
else:
assert df.to_sql(name="foobar", con=conn, index=False) == 1
Expand Down

0 comments on commit 0fccb84

Please sign in to comment.