From 7469fa7585f9e520344d58947ea617fdfa6bfa62 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 5 Jan 2024 13:03:20 +0000 Subject: [PATCH] Simplify internal metadata class. (#16762) We remove these fields as they're just duplicating data the event already stores, and (for reasons :shushing_face:) I'd like to simplify the class to only store simple types. I'm not entirely convinced that we shouldn't instead add helper methods to the event class to generate stream tokens, but I don't really think that's where they belong either --- changelog.d/16762.misc | 1 + synapse/events/__init__.py | 9 +------ synapse/handlers/admin.py | 7 ++++- synapse/handlers/room.py | 10 +++++-- synapse/handlers/sync.py | 10 +++++-- synapse/storage/databases/main/stream.py | 33 ------------------------ 6 files changed, 24 insertions(+), 46 deletions(-) create mode 100644 changelog.d/16762.misc diff --git a/changelog.d/16762.misc b/changelog.d/16762.misc new file mode 100644 index 00000000000..c49dc2085e0 --- /dev/null +++ b/changelog.d/16762.misc @@ -0,0 +1 @@ +Simplify event internal metadata class. diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py index 80e17f0fb0d..c52e7266617 100644 --- a/synapse/events/__init__.py +++ b/synapse/events/__init__.py @@ -42,7 +42,7 @@ from synapse.api.constants import RelationTypes from synapse.api.room_versions import EventFormatVersions, RoomVersion, RoomVersions -from synapse.types import JsonDict, RoomStreamToken, StrCollection +from synapse.types import JsonDict, StrCollection from synapse.util.caches import intern_dict from synapse.util.frozenutils import freeze from synapse.util.stringutils import strtobool @@ -211,13 +211,6 @@ def __init__(self, internal_metadata_dict: JsonDict): device_id: DictProperty[str] = DictProperty("device_id") """The device ID of the user who sent this event, if any.""" - # XXX: These are set by StreamWorkerStore._set_before_and_after. - # I'm pretty sure that these are never persisted to the database, so shouldn't - # be here - before: DictProperty[RoomStreamToken] = DictProperty("before") - after: DictProperty[RoomStreamToken] = DictProperty("after") - order: DictProperty[Tuple[int, int]] = DictProperty("order") - def get_dict(self) -> JsonDict: return dict(self._dict) diff --git a/synapse/handlers/admin.py b/synapse/handlers/admin.py index 9a4af3c45fe..db80345b945 100644 --- a/synapse/handlers/admin.py +++ b/synapse/handlers/admin.py @@ -208,7 +208,12 @@ async def export_user_data(self, user_id: str, writer: "ExfiltrationWriter") -> if not events: break - from_key = events[-1].internal_metadata.after + last_event = events[-1] + assert last_event.internal_metadata.stream_ordering + from_key = RoomStreamToken( + stream=last_event.internal_metadata.stream_ordering, + topological=last_event.depth, + ) events = await filter_events_for_client( self._storage_controllers, user_id, events diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index e78e598d5eb..41b00a5cf7b 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -1742,13 +1742,19 @@ async def get_new_events( events = list(room_events) events.extend(e for evs, _ in room_to_events.values() for e in evs) - events.sort(key=lambda e: e.internal_metadata.order) + # We know stream_ordering must be not None here, as its been + # persisted, but mypy doesn't know that + events.sort(key=lambda e: cast(int, e.internal_metadata.stream_ordering)) if limit: events[:] = events[:limit] if events: - end_key = events[-1].internal_metadata.after + last_event = events[-1] + assert last_event.internal_metadata.stream_ordering + end_key = RoomStreamToken( + stream=last_event.internal_metadata.stream_ordering, + ) else: end_key = to_key diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 1152c0158f7..0385c04bc24 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -601,7 +601,10 @@ async def _load_filtered_recents( if not limited or block_all_timeline: prev_batch_token = upto_token if recents: - room_key = recents[0].internal_metadata.before + assert recents[0].internal_metadata.stream_ordering + room_key = RoomStreamToken( + stream=recents[0].internal_metadata.stream_ordering - 1 + ) prev_batch_token = upto_token.copy_and_replace( StreamKeyType.ROOM, room_key ) @@ -689,7 +692,10 @@ async def _load_filtered_recents( if len(recents) > timeline_limit: limited = True recents = recents[-timeline_limit:] - room_key = recents[0].internal_metadata.before + assert recents[0].internal_metadata.stream_ordering + room_key = RoomStreamToken( + stream=recents[0].internal_metadata.stream_ordering - 1 + ) prev_batch_token = upto_token.copy_and_replace(StreamKeyType.ROOM, room_key) diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index 1b2a65bed2b..aeeb74b46d5 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -705,8 +705,6 @@ def f(txn: LoggingTransaction) -> List[_EventDictReturn]: [r.event_id for r in rows], get_prev_content=True ) - self._set_before_and_after(ret, rows, topo_order=False) - if order.lower() == "desc": ret.reverse() @@ -793,8 +791,6 @@ def f(txn: LoggingTransaction) -> List[_EventDictReturn]: [r.event_id for r in rows], get_prev_content=True ) - self._set_before_and_after(ret, rows, topo_order=False) - return ret async def get_recent_events_for_room( @@ -820,8 +816,6 @@ async def get_recent_events_for_room( [r.event_id for r in rows], get_prev_content=True ) - self._set_before_and_after(events, rows) - return events, token async def get_recent_event_ids_for_room( @@ -1094,31 +1088,6 @@ def _get_max_topological_txn(self, txn: LoggingTransaction, room_id: str) -> int # `[(None,)]` return rows[0][0] if rows[0][0] is not None else 0 - @staticmethod - def _set_before_and_after( - events: List[EventBase], rows: List[_EventDictReturn], topo_order: bool = True - ) -> None: - """Inserts ordering information to events' internal metadata from - the DB rows. - - Args: - events - rows - topo_order: Whether the events were ordered topologically or by stream - ordering. If true then all rows should have a non null - topological_ordering. - """ - for event, row in zip(events, rows): - stream = row.stream_ordering - if topo_order and row.topological_ordering: - topo: Optional[int] = row.topological_ordering - else: - topo = None - internal = event.internal_metadata - internal.before = RoomStreamToken(topological=topo, stream=stream - 1) - internal.after = RoomStreamToken(topological=topo, stream=stream) - internal.order = (int(topo) if topo else 0, int(stream)) - async def get_events_around( self, room_id: str, @@ -1559,8 +1528,6 @@ async def paginate_room_events( [r.event_id for r in rows], get_prev_content=True ) - self._set_before_and_after(events, rows) - return events, token @cached()