diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index c350c93c7e..ca9c48cabc 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -211,29 +211,17 @@ class MessageHandler(BaseHandler):
)
if source_config.direction == 'b':
- # if we're going backwards, we might need to backfill. This
- # requires that we have a topo token.
- if room_token.topological:
- max_topo = room_token.topological
- else:
- max_topo = yield self.store.get_max_topological_token(
- room_id, room_token.stream
- )
-
if membership == Membership.LEAVE:
# If they have left the room then clamp the token to be before
# they left the room, to save the effort of loading from the
# database.
+
leave_token = yield self.store.get_topological_token_for_event(
- member_event_id
+ member_event_id,
+ )
+ source_config.from_key = yield self.store.clamp_token_before(
+ room_id, source_config.from_key, leave_token,
)
- leave_token = RoomStreamToken.parse(leave_token)
- if leave_token.topological < max_topo:
- source_config.from_key = str(leave_token)
-
- yield self.hs.get_handlers().federation_handler.maybe_backfill(
- room_id, max_topo
- )
events, next_key, extremities = yield self.store.paginate_room_events(
room_id=room_id,
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 0d32a3a498..00da329924 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -915,6 +915,82 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
defer.returnValue((events, token, extremities))
+ def clamp_token_before(self, room_id, token_str, clamp_to_token_str):
+ """For a given room returns the given token if its before
+ clamp_to, otherwise returns clamp_to.
+
+ Args:
+ room_id (str)
+ token_str (str)
+ clamp_to_token_str(str): Must be topological token
+
+ Returns:
+ Deferred[str]
+ """
+
+ token = RoomStreamToken.parse(token_str)
+ clamp_to_token = RoomStreamToken.parse(clamp_to_token_str)
+
+ def clamp_token_before_txn(txn, token):
+ # If we're given a stream ordering, convert to topological token
+ if not token.chunk:
+ row = self._simple_select_one_txn(
+ txn,
+ table="events",
+ keyvalues={
+ "stream_ordering": token.stream,
+ },
+ retcols=("chunk_id", "topological_ordering", "stream_ordering",),
+ )
+ token = RoomStreamToken(*row)
+
+ # If both tokens have chunk_ids, we can use that.
+ if token.chunk and clamp_to_token.chunk:
+ if token.chunk == clamp_to_token.chunk:
+ if token.topological < clamp_to_token.topological:
+ return token_str
+ else:
+ return clamp_to_token_str
+
+ table = ChunkDBOrderedListStore(
+ txn, room_id, self.clock,
+ )
+
+ if table.is_before(token.chunk, clamp_to_token.chunk):
+ return token_str
+ else:
+ return clamp_to_token_str
+
+ # Ok, so we're dealing with events that haven't been chunked yet,
+ # lets just cheat and fallback to depth.
+
+ token_depth = self._simple_select_one_onecol_txn(
+ txn,
+ table="events",
+ keyvalues={
+ "stream_ordering": token.stream,
+ },
+ retcol="depth",
+ )
+
+ clamp_depth = self._simple_select_one_onecol_txn(
+ txn,
+ table="events",
+ keyvalues={
+ "stream_ordering": clamp_to_token.stream,
+ },
+ retcol="depth",
+ )
+
+ if token_depth < clamp_depth:
+ return token_str
+ else:
+ return clamp_to_token_str
+
+ return self.runInteraction(
+ "clamp_token_before", clamp_token_before_txn, token
+ )
+
class StreamStore(StreamWorkerStore):
def get_room_max_stream_ordering(self):
|