diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py
index 80e17f0fb0..c52e726661 100644
--- a/synapse/events/__init__.py
+++ b/synapse/events/__init__.py
@@ -42,7 +42,7 @@ from unpaddedbase64 import encode_base64
from synapse.api.constants import RelationTypes
from synapse.api.room_versions import EventFormatVersions, RoomVersion, RoomVersions
-from synapse.types import JsonDict, RoomStreamToken, StrCollection
+from synapse.types import JsonDict, StrCollection
from synapse.util.caches import intern_dict
from synapse.util.frozenutils import freeze
from synapse.util.stringutils import strtobool
@@ -211,13 +211,6 @@ class _EventInternalMetadata:
device_id: DictProperty[str] = DictProperty("device_id")
"""The device ID of the user who sent this event, if any."""
- # XXX: These are set by StreamWorkerStore._set_before_and_after.
- # I'm pretty sure that these are never persisted to the database, so shouldn't
- # be here
- before: DictProperty[RoomStreamToken] = DictProperty("before")
- after: DictProperty[RoomStreamToken] = DictProperty("after")
- order: DictProperty[Tuple[int, int]] = DictProperty("order")
-
def get_dict(self) -> JsonDict:
return dict(self._dict)
diff --git a/synapse/handlers/admin.py b/synapse/handlers/admin.py
index 9a4af3c45f..db80345b94 100644
--- a/synapse/handlers/admin.py
+++ b/synapse/handlers/admin.py
@@ -208,7 +208,12 @@ class AdminHandler:
if not events:
break
- from_key = events[-1].internal_metadata.after
+ last_event = events[-1]
+ assert last_event.internal_metadata.stream_ordering
+ from_key = RoomStreamToken(
+ stream=last_event.internal_metadata.stream_ordering,
+ topological=last_event.depth,
+ )
events = await filter_events_for_client(
self._storage_controllers, user_id, events
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index e78e598d5e..41b00a5cf7 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -1742,13 +1742,19 @@ class RoomEventSource(EventSource[RoomStreamToken, EventBase]):
events = list(room_events)
events.extend(e for evs, _ in room_to_events.values() for e in evs)
- events.sort(key=lambda e: e.internal_metadata.order)
+ # We know stream_ordering must be not None here, as its been
+ # persisted, but mypy doesn't know that
+ events.sort(key=lambda e: cast(int, e.internal_metadata.stream_ordering))
if limit:
events[:] = events[:limit]
if events:
- end_key = events[-1].internal_metadata.after
+ last_event = events[-1]
+ assert last_event.internal_metadata.stream_ordering
+ end_key = RoomStreamToken(
+ stream=last_event.internal_metadata.stream_ordering,
+ )
else:
end_key = to_key
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 1152c0158f..0385c04bc2 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -601,7 +601,10 @@ class SyncHandler:
if not limited or block_all_timeline:
prev_batch_token = upto_token
if recents:
- room_key = recents[0].internal_metadata.before
+ assert recents[0].internal_metadata.stream_ordering
+ room_key = RoomStreamToken(
+ stream=recents[0].internal_metadata.stream_ordering - 1
+ )
prev_batch_token = upto_token.copy_and_replace(
StreamKeyType.ROOM, room_key
)
@@ -689,7 +692,10 @@ class SyncHandler:
if len(recents) > timeline_limit:
limited = True
recents = recents[-timeline_limit:]
- room_key = recents[0].internal_metadata.before
+ assert recents[0].internal_metadata.stream_ordering
+ room_key = RoomStreamToken(
+ stream=recents[0].internal_metadata.stream_ordering - 1
+ )
prev_batch_token = upto_token.copy_and_replace(StreamKeyType.ROOM, room_key)
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index 1b2a65bed2..aeeb74b46d 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -705,8 +705,6 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
[r.event_id for r in rows], get_prev_content=True
)
- self._set_before_and_after(ret, rows, topo_order=False)
-
if order.lower() == "desc":
ret.reverse()
@@ -793,8 +791,6 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
[r.event_id for r in rows], get_prev_content=True
)
- self._set_before_and_after(ret, rows, topo_order=False)
-
return ret
async def get_recent_events_for_room(
@@ -820,8 +816,6 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
[r.event_id for r in rows], get_prev_content=True
)
- self._set_before_and_after(events, rows)
-
return events, token
async def get_recent_event_ids_for_room(
@@ -1094,31 +1088,6 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
# `[(None,)]`
return rows[0][0] if rows[0][0] is not None else 0
- @staticmethod
- def _set_before_and_after(
- events: List[EventBase], rows: List[_EventDictReturn], topo_order: bool = True
- ) -> None:
- """Inserts ordering information to events' internal metadata from
- the DB rows.
-
- Args:
- events
- rows
- topo_order: Whether the events were ordered topologically or by stream
- ordering. If true then all rows should have a non null
- topological_ordering.
- """
- for event, row in zip(events, rows):
- stream = row.stream_ordering
- if topo_order and row.topological_ordering:
- topo: Optional[int] = row.topological_ordering
- else:
- topo = None
- internal = event.internal_metadata
- internal.before = RoomStreamToken(topological=topo, stream=stream - 1)
- internal.after = RoomStreamToken(topological=topo, stream=stream)
- internal.order = (int(topo) if topo else 0, int(stream))
-
async def get_events_around(
self,
room_id: str,
@@ -1559,8 +1528,6 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
[r.event_id for r in rows], get_prev_content=True
)
- self._set_before_and_after(events, rows)
-
return events, token
@cached()
|