Skip to content

Commit

Permalink
feat: implement option 'delete_rows' of argument 'if_exists' in 'Data…
Browse files Browse the repository at this point in the history
…Frame.to_sql' API.
  • Loading branch information
gmcrocetti committed Nov 20, 2024
1 parent 6a7685f commit 33cd0d6
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 12 deletions.
1 change: 1 addition & 0 deletions doc/source/whatsnew/v3.0.0.rst
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ Other enhancements
- :meth:`Series.map` can now accept kwargs to pass on to func (:issue:`59814`)
- :meth:`pandas.concat` will raise a ``ValueError`` when ``ignore_index=True`` and ``keys`` is not ``None`` (:issue:`59274`)
- :meth:`str.get_dummies` now accepts a ``dtype`` parameter to specify the dtype of the resulting DataFrame (:issue:`47872`)
- Add ``"delete_rows"`` option to ``if_exists`` argument in :meth:`DataFrame.to_sql` deleting all records of the table before inserting data (:issue:`37210`).
- Multiplying two :class:`DateOffset` objects will now raise a ``TypeError`` instead of a ``RecursionError`` (:issue:`59442`)
- Restore support for reading Stata 104-format and enable reading 103-format dta files (:issue:`58554`)
- Support passing a :class:`Iterable[Hashable]` input to :meth:`DataFrame.drop_duplicates` (:issue:`59237`)
Expand Down
49 changes: 38 additions & 11 deletions pandas/io/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -738,7 +738,7 @@ def to_sql(
name: str,
con,
schema: str | None = None,
if_exists: Literal["fail", "replace", "append"] = "fail",
if_exists: Literal["fail", "replace", "append", "delete_rows"] = "fail",
index: bool = True,
index_label: IndexLabel | None = None,
chunksize: int | None = None,
Expand All @@ -764,10 +764,11 @@ def to_sql(
schema : str, optional
Name of SQL schema in database to write to (if database flavor
supports this). If None, use default schema (default).
if_exists : {'fail', 'replace', 'append'}, default 'fail'
if_exists : {'fail', 'replace', 'append', 'delete_rows'}, default 'fail'
- fail: If table exists, do nothing.
- replace: If table exists, drop it, recreate it, and insert data.
- append: If table exists, insert data. Create if does not exist.
- delete_rows: If a table exists, delete all records and insert data.
index : bool, default True
Write DataFrame index as a column.
index_label : str or sequence, optional
Expand Down Expand Up @@ -818,7 +819,7 @@ def to_sql(
`sqlite3 <https://docs.python.org/3/library/sqlite3.html#sqlite3.Cursor.rowcount>`__ or
`SQLAlchemy <https://docs.sqlalchemy.org/en/14/core/connections.html#sqlalchemy.engine.BaseCursorResult.rowcount>`__
""" # noqa: E501
if if_exists not in ("fail", "replace", "append"):
if if_exists not in ("fail", "replace", "append", "delete_rows"):
raise ValueError(f"'{if_exists}' is not valid for if_exists")

if isinstance(frame, Series):
Expand Down Expand Up @@ -926,7 +927,7 @@ def __init__(
pandas_sql_engine,
frame=None,
index: bool | str | list[str] | None = True,
if_exists: Literal["fail", "replace", "append"] = "fail",
if_exists: Literal["fail", "replace", "append", "delete_rows"] = "fail",
prefix: str = "pandas",
index_label=None,
schema=None,
Expand Down Expand Up @@ -974,11 +975,13 @@ def create(self) -> None:
if self.exists():
if self.if_exists == "fail":
raise ValueError(f"Table '{self.name}' already exists.")
if self.if_exists == "replace":
elif self.if_exists == "replace":
self.pd_sql.drop_table(self.name, self.schema)
self._execute_create()
elif self.if_exists == "append":
pass
elif self.if_exists == "delete_rows":
self.pd_sql.delete_rows(self.name, self.schema)
else:
raise ValueError(f"'{self.if_exists}' is not valid for if_exists")
else:
Expand Down Expand Up @@ -1480,7 +1483,7 @@ def to_sql(
self,
frame,
name: str,
if_exists: Literal["fail", "replace", "append"] = "fail",
if_exists: Literal["fail", "replace", "append", "delete_rows"] = "fail",
index: bool = True,
index_label=None,
schema=None,
Expand Down Expand Up @@ -1866,7 +1869,7 @@ def prep_table(
self,
frame,
name: str,
if_exists: Literal["fail", "replace", "append"] = "fail",
if_exists: Literal["fail", "replace", "append", "delete_rows"] = "fail",
index: bool | str | list[str] | None = True,
index_label=None,
schema=None,
Expand Down Expand Up @@ -1943,7 +1946,7 @@ def to_sql(
self,
frame,
name: str,
if_exists: Literal["fail", "replace", "append"] = "fail",
if_exists: Literal["fail", "replace", "append", "delete_rows"] = "fail",
index: bool = True,
index_label=None,
schema: str | None = None,
Expand All @@ -1961,10 +1964,11 @@ def to_sql(
frame : DataFrame
name : string
Name of SQL table.
if_exists : {'fail', 'replace', 'append'}, default 'fail'
if_exists : {'fail', 'replace', 'append', 'delete_rows'}, default 'fail'
- fail: If table exists, do nothing.
- replace: If table exists, drop it, recreate it, and insert data.
- append: If table exists, insert data. Create if does not exist.
- delete_rows: If a table exists, delete all records and insert data.
index : boolean, default True
Write DataFrame index as a column.
index_label : string or sequence, default None
Expand Down Expand Up @@ -2061,6 +2065,18 @@ def drop_table(self, table_name: str, schema: str | None = None) -> None:
self.get_table(table_name, schema).drop(bind=self.con)
self.meta.clear()

def delete_rows(self, table_name: str, schema: str | None = None) -> None:
schema = schema or self.meta.schema
if self.has_table(table_name, schema):
self.meta.reflect(
bind=self.con, only=[table_name], schema=schema, views=True
)
with self.run_transaction() as con:
table = self.get_table(table_name, schema)
con.execute(table.delete())

self.meta.clear()

def _create_sql_schema(
self,
frame: DataFrame,
Expand Down Expand Up @@ -2296,7 +2312,7 @@ def to_sql(
self,
frame,
name: str,
if_exists: Literal["fail", "replace", "append"] = "fail",
if_exists: Literal["fail", "replace", "append", "delete_rows"] = "fail",
index: bool = True,
index_label=None,
schema: str | None = None,
Expand All @@ -2318,6 +2334,7 @@ def to_sql(
- fail: If table exists, do nothing.
- replace: If table exists, drop it, recreate it, and insert data.
- append: If table exists, insert data. Create if does not exist.
- delete_rows: If a table exists, delete all records and insert data.
index : boolean, default True
Write DataFrame index as a column.
index_label : string or sequence, default None
Expand All @@ -2335,6 +2352,7 @@ def to_sql(
engine : {'auto', 'sqlalchemy'}, default 'auto'
Raises NotImplementedError if not set to 'auto'
"""

if index_label:
raise NotImplementedError(
"'index_label' is not implemented for ADBC drivers"
Expand Down Expand Up @@ -2368,6 +2386,10 @@ def to_sql(
cur.execute(f"DROP TABLE {table_name}")
elif if_exists == "append":
mode = "append"
elif if_exists == "delete_rows":
mode = "append"
with self.con.cursor() as cur:
cur.execute(f"DELETE FROM {table_name}")

import pyarrow as pa

Expand Down Expand Up @@ -2769,10 +2791,11 @@ def to_sql(
frame: DataFrame
name: string
Name of SQL table.
if_exists: {'fail', 'replace', 'append'}, default 'fail'
if_exists: {'fail', 'replace', 'append', 'delete_rows'}, default 'fail'
fail: If table exists, do nothing.
replace: If table exists, drop it, recreate it, and insert data.
append: If table exists, insert data. Create if it does not exist.
delete_rows: If a table exists, delete all records and insert data.
index : bool, default True
Write DataFrame index as a column
index_label : string or sequence, default None
Expand Down Expand Up @@ -2848,6 +2871,10 @@ def drop_table(self, name: str, schema: str | None = None) -> None:
drop_sql = f"DROP TABLE {_get_valid_sqlite_name(name)}"
self.execute(drop_sql)

def delete_rows(self, name: str, schema: str | None = None) -> None:
delete_sql = f"DELETE FROM {_get_valid_sqlite_name(name)}"
self.execute(delete_sql)

def _create_sql_schema(
self,
frame,
Expand Down
18 changes: 17 additions & 1 deletion pandas/tests/io/test_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -1068,7 +1068,9 @@ def test_to_sql(conn, method, test_frame1, request):


@pytest.mark.parametrize("conn", all_connectable)
@pytest.mark.parametrize("mode, num_row_coef", [("replace", 1), ("append", 2)])
@pytest.mark.parametrize(
"mode, num_row_coef", [("replace", 1), ("append", 2), ("delete_rows", 1)]
)
def test_to_sql_exist(conn, mode, num_row_coef, test_frame1, request):
conn = request.getfixturevalue(conn)
with pandasSQL_builder(conn, need_transaction=True) as pandasSQL:
Expand Down Expand Up @@ -2698,6 +2700,20 @@ def test_drop_table(conn, request):
assert not insp.has_table("temp_frame")


@pytest.mark.parametrize("conn", sqlalchemy_connectable + ["sqlite_buildin"])
def test_delete_rows_success(conn, test_frame1, request):
table_name = "temp_frame"
conn = request.getfixturevalue(conn)
pandasSQL = pandasSQL_builder(conn)
with pandasSQL.run_transaction():
assert (
pandasSQL.to_sql(test_frame1, table_name, if_exists="replace")
== test_frame1.shape[0]
)
assert pandasSQL.delete_rows(table_name) is None
assert count_rows(conn, table_name) == 0


@pytest.mark.parametrize("conn", all_connectable)
def test_roundtrip(conn, request, test_frame1):
if conn == "sqlite_str":
Expand Down

0 comments on commit 33cd0d6

Please sign in to comment.