-
Notifications
You must be signed in to change notification settings - Fork 53
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
NXDRIVE-2711: Show that upload is still alive for very large files (Sourcery refactored) #4228
Changes from all commits
b90806e
ea94e57
273e923
915f44d
d7308ca
d82ddbb
cf30aaa
3382d47
9664a78
70481e6
7524f02
bd016cd
2968e81
7a400a7
900dab5
643f317
ab433bb
894e6a6
3fda071
2de2bf4
1c921c4
8d90b25
7cb3f27
e8501e3
ae9a1a8
345078c
c21d4f1
893d1aa
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -357,11 +357,9 @@ | |
self.dao.update_upload(transfer) | ||
transfer.is_dirty = False | ||
|
||
# Handle status changes every time a chunk is sent | ||
_transfer = self.get_upload( | ||
if _transfer := self.get_upload( | ||
doc_pair=transfer.doc_pair, path=transfer.path | ||
) | ||
if _transfer: | ||
): | ||
self._handle_transfer_status(_transfer) | ||
else: | ||
uploader.upload() | ||
|
@@ -416,7 +414,7 @@ | |
self._set_transfer_status(transfer, TransferStatus.ONGOING) | ||
raise exc | ||
|
||
def link_blob_to_doc( # type: ignore[return] | ||
def link_blob_to_doc( | ||
self, | ||
command: str, | ||
transfer: Upload, | ||
|
@@ -451,15 +449,19 @@ | |
kwargs["headers"] = headers | ||
try: | ||
doc_type = kwargs.get("doc_type", "") | ||
if transfer.is_direct_transfer and doc_type and doc_type != "": | ||
res = self._transfer_docType_file(transfer, headers, doc_type) | ||
else: | ||
res = self._transfer_autoType_file(command, blob, kwargs) | ||
|
||
return res | ||
return ( | ||
self._transfer_docType_file(transfer, headers, doc_type) | ||
if transfer.is_direct_transfer and doc_type and doc_type != "" | ||
else self._transfer_autoType_file(command, blob, kwargs) | ||
) | ||
except Exception as exc: | ||
err = f"Error while linking blob to doc: {exc!r}" | ||
log.warning(err) | ||
action.finalizing_status = "Error" | ||
if "TCPKeepAliveHTTPSConnectionPool" not in str(exc): | ||
transfer.request_uid = str(uuid4()) | ||
self.dao.update_upload_requestid(transfer) | ||
raise exc | ||
finally: | ||
action.finish_action() | ||
|
||
|
@@ -498,11 +500,10 @@ | |
data=content, | ||
ssl_verify=self.verification_needed, | ||
) | ||
res = self.remote.fetch( | ||
return self.remote.fetch( | ||
f"{self.remote.client.api_path}/path{transfer.remote_parent_path}", | ||
headers=headers, | ||
) | ||
return res | ||
Comment on lines
-501
to
-505
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Function
|
||
|
||
@staticmethod | ||
def _complete_upload(transfer: Upload, blob: FileBlob, /) -> None: | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -690,7 +690,7 @@ | |
"UPDATE States SET processor = 0 WHERE processor = ?", (processor_id,) | ||
) | ||
log.debug(f"Released processor {processor_id}") | ||
return bool(c.rowcount > 0) | ||
return c.rowcount > 0 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Function
|
||
|
||
def acquire_processor(self, thread_id: int, row_id: int, /) -> bool: | ||
with self.lock: | ||
|
@@ -702,7 +702,7 @@ | |
" AND processor IN (0, ?)", | ||
(thread_id, row_id, thread_id), | ||
) | ||
return bool(c.rowcount == 1) | ||
return c.rowcount == 1 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Function
|
||
|
||
def _reinit_states(self, cursor: Cursor, /) -> None: | ||
cursor.execute("DROP TABLE States") | ||
|
@@ -748,7 +748,7 @@ | |
c.execute(f"{update} WHERE id = ?", ("remotely_deleted", doc_pair.id)) | ||
if doc_pair.folderish: | ||
c.execute( | ||
update + " " + self._get_recursive_remote_condition(doc_pair), | ||
f"{update} {self._get_recursive_remote_condition(doc_pair)}", | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Function
|
||
("parent_remotely_deleted",), | ||
) | ||
# Only queue parent | ||
|
@@ -781,16 +781,7 @@ | |
) -> int: | ||
digest = None | ||
if not info.folderish: | ||
if is_large_file(info.size): | ||
# We can't compute the digest of big files now as it will | ||
# be done later when the entire file is fully copied. | ||
# For instance, on my machine (32GB RAM, 8 cores, Intel NUC) | ||
# it takes 23 minutes for 100 GB and 7 minute for 50 GB. | ||
# This is way too much effort to compute it several times. | ||
digest = UNACCESSIBLE_HASH | ||
else: | ||
digest = info.get_digest() | ||
|
||
digest = UNACCESSIBLE_HASH if is_large_file(info.size) else info.get_digest() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Function
This removes the following comments ( why? ):
|
||
with self.lock: | ||
c = self._get_write_connection().cursor() | ||
pair_state = PAIR_STATES[("created", "unknown")] | ||
|
@@ -1406,7 +1397,7 @@ | |
condition = self._get_recursive_remote_condition(doc_pair) | ||
else: | ||
condition = self._get_recursive_condition(doc_pair) | ||
c.execute("DELETE FROM States " + condition) | ||
c.execute(f"DELETE FROM States {condition}") | ||
Comment on lines
-1409
to
+1400
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Function
|
||
|
||
def remove_state_children( | ||
self, doc_pair: DocPair, /, *, remote_recursion: bool = False | ||
|
@@ -1417,7 +1408,7 @@ | |
condition = self._get_recursive_remote_condition(doc_pair) | ||
else: | ||
condition = self._get_recursive_condition(doc_pair) | ||
c.execute("DELETE FROM States " + condition) | ||
c.execute(f"DELETE FROM States {condition}") | ||
Comment on lines
-1420
to
+1411
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Function
|
||
|
||
def get_state_from_local(self, path: Path, /) -> Optional[DocPair]: | ||
c = self._get_read_connection().cursor() | ||
|
@@ -1485,15 +1476,10 @@ | |
def queue_children(self, row: DocPair, /) -> None: | ||
with self.lock: | ||
c = self._get_write_connection().cursor() | ||
children: List[DocPair] = c.execute( | ||
"SELECT *" | ||
" FROM States" | ||
" WHERE remote_parent_ref = ?" | ||
" OR local_parent_path = ?" | ||
" AND " + self._get_to_sync_condition(), | ||
if children := c.execute( | ||
f"SELECT * FROM States WHERE remote_parent_ref = ? OR local_parent_path = ? AND {self._get_to_sync_condition()}", | ||
(row.remote_ref, row.local_path), | ||
).fetchall() | ||
if children: | ||
).fetchall(): | ||
Comment on lines
-1488
to
+1482
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Function
|
||
log.info(f"Queuing {len(children)} children of {row}") | ||
for child in children: | ||
self._queue_pair_state(child.id, child.folderish, child.pair_state) | ||
|
@@ -1663,7 +1649,7 @@ | |
version, | ||
), | ||
) | ||
result = bool(c.rowcount == 1) | ||
result = c.rowcount == 1 | ||
Comment on lines
-1666
to
+1652
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Function
|
||
|
||
# Retry without version for folder | ||
if not result and row.folderish: | ||
|
@@ -1694,7 +1680,7 @@ | |
row.remote_parent_ref, | ||
), | ||
) | ||
result = bool(c.rowcount == 1) | ||
result = c.rowcount == 1 | ||
|
||
if not result: | ||
log.debug(f"Was not able to synchronize state: {row!r}") | ||
|
@@ -1869,7 +1855,7 @@ | |
row = c.execute( | ||
"SELECT COUNT(path) FROM RemoteScan WHERE path = ? LIMIT 1", (path,) | ||
).fetchone() | ||
return bool(row[0] > 0) | ||
return row[0] > 0 | ||
Comment on lines
-1872
to
+1858
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Function
|
||
|
||
def is_filter(self, path: str, /) -> bool: | ||
path = self._clean_filter_path(path) | ||
|
@@ -2381,6 +2367,13 @@ | |
sql = "UPDATE Uploads SET batch = ? WHERE uid = ?" | ||
c.execute(sql, (json.dumps(batch), upload.uid)) | ||
|
||
def update_upload_requestid(self, upload: Upload, /) -> None: | ||
"""In case of error during linking, update request_uid for upload""" | ||
with self.lock: | ||
c = self._get_write_connection().cursor() | ||
sql = "UPDATE Uploads SET request_uid = ? WHERE uid = ?" | ||
c.execute(sql, (upload.request_uid, upload.uid)) | ||
|
||
def pause_transfer( | ||
self, | ||
nature: str, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -54,8 +54,7 @@ def get_current_action(*, thread_id: int = None) -> Optional["Action"]: | |
|
||
@staticmethod | ||
def finish_action() -> None: | ||
action = Action.actions.pop(current_thread_id(), None) | ||
if action: | ||
if action := Action.actions.pop(current_thread_id(), None): | ||
action.finish() | ||
|
||
def finish(self) -> None: | ||
|
@@ -69,9 +68,7 @@ def export(self) -> Dict[str, Any]: | |
} | ||
|
||
def __repr__(self) -> str: | ||
if not self.progress: | ||
return str(self.type) | ||
return f"{self.type}({self.progress}%)" | ||
return f"{self.type}({self.progress}%)" if self.progress else str(self.type) | ||
Comment on lines
-72
to
+71
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Function
|
||
|
||
|
||
class IdleAction(Action): | ||
|
@@ -131,8 +128,7 @@ def _connect_reporter(self, reporter: Optional[QApplication], /) -> None: | |
return | ||
|
||
for evt in ("started", "progressing", "done"): | ||
signal = getattr(reporter, f"action_{evt}", None) | ||
if signal: | ||
if signal := getattr(reporter, f"action_{evt}", None): | ||
Comment on lines
-134
to
+131
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Function
|
||
getattr(self, evt).connect(signal) | ||
|
||
@property | ||
|
@@ -149,6 +145,15 @@ def progress(self, value: float, /) -> None: | |
|
||
self.progressing.emit(self) | ||
|
||
@property | ||
def finalizing_status(self) -> str: | ||
return self._finalizing_status | ||
|
||
@finalizing_status.setter | ||
def finalizing_status(self, value: str, /) -> None: | ||
self._finalizing_status = value | ||
self.progressing.emit(self) | ||
|
||
def get_percent(self) -> float: | ||
if self.size < 0 or (self.empty and not self.uploaded): | ||
return 0.0 | ||
|
@@ -257,6 +262,13 @@ def __init__( | |
doc_pair=doc_pair, | ||
) | ||
self.progress = size | ||
self.finalizing_status = "" | ||
|
||
def export(self) -> Dict[str, Any]: | ||
return { | ||
**super().export(), | ||
"finalizing_status": self.finalizing_status, | ||
} | ||
|
||
|
||
def tooltip(doing: str): # type: ignore | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Function
BaseUploader.upload_chunks
refactored with the following changes:use-named-expression
)This removes the following comments ( why? ):