Skip to content

Commit

Permalink
'Refactored by Sourcery'
Browse files Browse the repository at this point in the history
  • Loading branch information
Sourcery AI committed Oct 26, 2023
1 parent 73b744c commit 3418755
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 83 deletions.
147 changes: 67 additions & 80 deletions nxdrive/gui/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,19 +119,19 @@ def _export_formatted_state(
@pyqtSlot(str, int, result=list)
def get_last_files(self, uid: str, number: int, /) -> List[Dict[str, Any]]:
"""Return the last files transferred (see EngineDAO)."""
engine = self._manager.engines.get(uid)
if not engine:
if engine := self._manager.engines.get(uid):
return [s.export() for s in engine.dao.get_last_files(number)]
else:
return []
return [s.export() for s in engine.dao.get_last_files(number)]

@pyqtSlot(str, result=int)
def get_last_files_count(self, uid: str, /) -> int:
"""Return the count of the last files transferred (see EngineDAO)."""
count = 0
engine = self._manager.engines.get(uid)
if engine:
count = engine.dao.get_last_files_count(duration=60)
return count
return (
engine.dao.get_last_files_count(duration=60)
if (engine := self._manager.engines.get(uid))
else 0
)

@pyqtSlot(QUrl, result=str)
def to_local_file(self, url: QUrl, /) -> str:
Expand Down Expand Up @@ -217,8 +217,7 @@ def get_completed_sessions_items(self, dao: EngineDAO, /) -> List[Dict[str, Any]
@pyqtSlot(str, result=int)
def get_active_sessions_count(self, uid: str, /) -> int:
"""Return the count of active sessions items."""
engine = self._manager.engines.get(uid)
if engine:
if engine := self._manager.engines.get(uid):
return engine.dao.get_count(
f"status IN ({TransferStatus.ONGOING.value}, {TransferStatus.PAUSED.value})",
table="Sessions",
Expand All @@ -228,8 +227,7 @@ def get_active_sessions_count(self, uid: str, /) -> int:
@pyqtSlot(str, result=int)
def get_completed_sessions_count(self, uid: str, /) -> int:
"""Return the count of completed sessions items."""
engine = self._manager.engines.get(uid)
if engine:
if engine := self._manager.engines.get(uid):
return engine.dao.get_count(
f"status IN ({TransferStatus.CANCELLED.value}, {TransferStatus.DONE.value})",
table="Sessions",
Expand All @@ -249,12 +247,12 @@ def pause_transfer(
) -> None:
"""Pause a given transfer. *nature* is either downloads or upload."""
log.info(f"Pausing {nature} {transfer_uid} for engine {engine_uid!r}")
engine = self._manager.engines.get(engine_uid)
if not engine:
if engine := self._manager.engines.get(engine_uid):
engine.dao.pause_transfer(
nature, transfer_uid, progress, is_direct_transfer=is_direct_transfer
)
else:
return
engine.dao.pause_transfer(
nature, transfer_uid, progress, is_direct_transfer=is_direct_transfer
)

@pyqtSlot(str, str, int, bool)
def resume_transfer(
Expand All @@ -268,77 +266,79 @@ def resume_transfer(
) -> None:
"""Resume a given transfer. *nature* is either downloads or upload."""
log.info(f"Resume {nature} {uid} for engine {engine_uid!r}")
engine = self._manager.engines.get(engine_uid)
if not engine:
if engine := self._manager.engines.get(engine_uid):
engine.resume_transfer(nature, uid, is_direct_transfer=is_direct_transfer)
else:
return
engine.resume_transfer(nature, uid, is_direct_transfer=is_direct_transfer)

@pyqtSlot(str, int)
def resume_session(self, engine_uid: str, uid: int, /) -> None:
"""Resume a given session and it's transfers."""
log.info(f"Resume session {uid} for engine {engine_uid!r}")
engine = self._manager.engines.get(engine_uid)
if not engine:
if engine := self._manager.engines.get(engine_uid):
engine.resume_session(uid)
else:
return
engine.resume_session(uid)

@pyqtSlot(str, int)
def pause_session(self, engine_uid: str, uid: int, /) -> None:
"""Pause a given session and it's transfers."""
log.info(f"Pausing session {uid} for engine {engine_uid!r}")
engine = self._manager.engines.get(engine_uid)
if not engine:
if engine := self._manager.engines.get(engine_uid):
engine.dao.pause_session(uid)
else:
return
engine.dao.pause_session(uid)

def cancel_session(self, engine_uid: str, uid: int, /) -> None:
"""Cancel a given session and it's transfers."""
log.info(f"Cancelling session {uid} for engine {engine_uid!r}")
engine = self._manager.engines.get(engine_uid)
if not engine:
if engine := self._manager.engines.get(engine_uid):
engine.cancel_session(uid)
else:
return
engine.cancel_session(uid)

@pyqtSlot(str, str)
def show_metadata(self, uid: str, ref: str, /) -> None:
self.application.hide_systray()
engine = self._manager.engines.get(uid)
if engine:
if engine := self._manager.engines.get(uid):
path = engine.local.abspath(Path(ref))
self.application.show_metadata(path)

@pyqtSlot(str, result=list)
def get_unsynchronizeds(self, uid: str, /) -> List[Dict[str, Any]]:
result = []
engine = self._manager.engines.get(uid)
if engine:
for conflict in engine.dao.get_unsynchronizeds():
result.append(self._export_formatted_state(uid, state=conflict))
if engine := self._manager.engines.get(uid):
result.extend(
self._export_formatted_state(uid, state=conflict)
for conflict in engine.dao.get_unsynchronizeds()
)
return result

@pyqtSlot(str, result=list)
def get_conflicts(self, uid: str, /) -> List[Dict[str, Any]]:
result = []
engine = self._manager.engines.get(uid)
if engine:
for conflict in engine.get_conflicts():
result.append(self._export_formatted_state(uid, state=conflict))
if engine := self._manager.engines.get(uid):
result.extend(
self._export_formatted_state(uid, state=conflict)
for conflict in engine.get_conflicts()
)
return result

@pyqtSlot(str, result=list)
def get_errors(self, uid: str, /) -> List[Dict[str, Any]]:
result = []
engine = self._manager.engines.get(uid)
if engine:
for error in engine.dao.get_errors():
result.append(self._export_formatted_state(uid, state=error))
if engine := self._manager.engines.get(uid):
result.extend(
self._export_formatted_state(uid, state=error)
for error in engine.dao.get_errors()
)
return result

@pyqtSlot(result=list)
def get_features_list(self) -> List[List[str]]:
"""Return the list of declared features with their value, title and translation key."""
result = []
for feature in vars(Feature).keys():
for feature in vars(Feature):
title = feature.replace("_", " ").title()
translation_key = f"FEATURE_{feature.upper()}"
result.append([title, feature, translation_key])
Expand All @@ -350,7 +350,7 @@ def generate_report(self) -> str:
return str(self._manager.generate_report())
except Exception as e:
log.exception("Report error")
return "[ERROR] " + str(e)
return f"[ERROR] {str(e)}"

@pyqtSlot(str, str, result=bool)
def generate_csv(self, session_id: str, engine_uid: str) -> bool:
Expand Down Expand Up @@ -383,12 +383,11 @@ def open_direct_transfer(self, uid: str, /) -> None:
def open_server_folders(self, uid: str, /) -> None:
"""Hide the systray and show the server folders dialog."""
self.application.hide_systray()
engine = self._manager.engines.get(uid)
if not engine:
if engine := self._manager.engines.get(uid):
self.application.show_server_folders(engine, None)
else:
return

self.application.show_server_folders(engine, None)

@pyqtSlot(str, result=str)
def get_hostname_from_url(self, url: str, /) -> str:
urlp = urlparse(url)
Expand All @@ -397,8 +396,7 @@ def get_hostname_from_url(self, url: str, /) -> str:
@pyqtSlot(str)
def open_remote_server(self, uid: str, /) -> None:
self.application.hide_systray()
engine = self._manager.engines.get(uid)
if engine:
if engine := self._manager.engines.get(uid):
engine.open_remote()

@pyqtSlot(str)
Expand All @@ -415,11 +413,9 @@ def open_local(self, uid: str, path: str, /) -> None:
filepath = Path(force_decode(path).lstrip("/"))
if not uid:
self._manager.open_local_file(filepath)
else:
engine = self._manager.engines.get(uid)
if engine:
filepath = engine.local.abspath(filepath)
self._manager.open_local_file(filepath)
elif engine := self._manager.engines.get(uid):
filepath = engine.local.abspath(filepath)
self._manager.open_local_file(filepath)

@pyqtSlot()
def open_help(self) -> None:
Expand Down Expand Up @@ -448,8 +444,7 @@ def open_document(self, engine_uid: str, doc_pair_id: int, /) -> None:
@pyqtSlot(str)
def show_conflicts_resolution(self, uid: str, /) -> None:
self.application.hide_systray()
engine = self._manager.engines.get(uid)
if engine:
if engine := self._manager.engines.get(uid):
self.application.show_conflicts_resolution(engine)

@pyqtSlot(str)
Expand Down Expand Up @@ -576,7 +571,7 @@ def get_disk_space_info_to_width(
def _balance_percents(self, result: Dict[str, float], /) -> Dict[str, float]:
"""Return an altered version of the dict in which no value is under a minimum threshold."""

result = {k: v for k, v in sorted(result.items(), key=lambda item: item[1])}
result = dict(sorted(result.items(), key=lambda item: item[1]))
keys = list(result)
min_threshold = 10
data = 0.0
Expand Down Expand Up @@ -638,8 +633,7 @@ def unbind_server(self, uid: str, purge: bool, /) -> None:

@pyqtSlot(str)
def filters_dialog(self, uid: str, /) -> None:
engine = self._manager.engines.get(uid)
if engine:
if engine := self._manager.engines.get(uid):
self.application.show_filters(engine)

def _bind_server(
Expand Down Expand Up @@ -873,8 +867,7 @@ def set_proxy_settings(self, config: str, url: str, pac_url: str, /) -> bool:
self.setMessage.emit("PROXY_NO_PAC_FILE", "error")
return False

error = self._manager.set_proxy(proxy)
if error:
if error := self._manager.set_proxy(proxy):
self.setMessage.emit(error, "error")
return False

Expand Down Expand Up @@ -1048,44 +1041,39 @@ def is_paused(self) -> bool:

@pyqtSlot(str, result=int)
def get_syncing_count(self, uid: str, /) -> int:
count = 0
engine = self._manager.engines.get(uid)
if engine:
count = engine.dao.get_syncing_count()
return count
return (
engine.dao.get_syncing_count()
if (engine := self._manager.engines.get(uid))
else 0
)

# Conflicts section

@pyqtSlot(str, int)
def resolve_with_local(self, uid: str, state_id: int, /) -> None:
engine = self._manager.engines.get(uid)
if engine:
if engine := self._manager.engines.get(uid):
engine.resolve_with_local(state_id)

@pyqtSlot(str, int)
def resolve_with_remote(self, uid: str, state_id: int, /) -> None:
engine = self._manager.engines.get(uid)
if engine:
if engine := self._manager.engines.get(uid):
engine.resolve_with_remote(state_id)

@pyqtSlot(str, int)
def retry_pair(self, uid: str, state_id: int, /) -> None:
engine = self._manager.engines.get(uid)
if engine:
if engine := self._manager.engines.get(uid):
engine.retry_pair(state_id)

@pyqtSlot(str, int, str)
def ignore_pair(self, uid: str, state_id: int, reason: str, /) -> None:
engine = self._manager.engines.get(uid)
if engine:
if engine := self._manager.engines.get(uid):
engine.ignore_pair(state_id, reason)

@pyqtSlot(str, str, str)
def open_remote(self, uid: str, remote_ref: str, remote_name: str, /) -> None:
log.info(f"Should open {remote_name!r} ({remote_ref!r})")
try:
engine = self._manager.engines.get(uid)
if engine:
if engine := self._manager.engines.get(uid):
engine.open_edit(remote_ref, remote_name)
except OSError:
log.exception("Remote open error")
Expand All @@ -1096,8 +1084,7 @@ def open_remote_document(
) -> None:
log.info(f"Should open remote document {remote_path!r} ({remote_ref!r})")
try:
engine = self._manager.engines.get(uid)
if engine:
if engine := self._manager.engines.get(uid):
url = engine.get_metadata_url(remote_ref)
engine.open_remote(url=url)
except OSError:
Expand Down
4 changes: 1 addition & 3 deletions tests/functional/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ def func(val):
with patch.object(manager, "check_local_folder_available", new=func):
url = nuxeo_url()
returned_val = drive_api.web_authentication(
url + "/login.jsp?requestedUrl=ui%2F",
"/dummy-path",
True,
f"{url}/login.jsp?requestedUrl=ui%2F", "/dummy-path", True
)
assert not returned_val

0 comments on commit 3418755

Please sign in to comment.