summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--synapse/handlers/message.py22
-rw-r--r--synapse/storage/stream.py76
2 files changed, 81 insertions, 17 deletions
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index c350c93c7e..ca9c48cabc 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -211,29 +211,17 @@ class MessageHandler(BaseHandler):
             )
 
             if source_config.direction == 'b':
-                # if we're going backwards, we might need to backfill. This
-                # requires that we have a topo token.
-                if room_token.topological:
-                    max_topo = room_token.topological
-                else:
-                    max_topo = yield self.store.get_max_topological_token(
-                        room_id, room_token.stream
-                    )
-
                 if membership == Membership.LEAVE:
                     # If they have left the room then clamp the token to be before
                     # they left the room, to save the effort of loading from the
                     # database.
+
                     leave_token = yield self.store.get_topological_token_for_event(
-                        member_event_id
+                        member_event_id,
+                    )
+                    source_config.from_key = yield self.store.clamp_token_before(
+                        room_id, source_config.from_key, leave_token,
                     )
-                    leave_token = RoomStreamToken.parse(leave_token)
-                    if leave_token.topological < max_topo:
-                        source_config.from_key = str(leave_token)
-
-                yield self.hs.get_handlers().federation_handler.maybe_backfill(
-                    room_id, max_topo
-                )
 
             events, next_key, extremities = yield self.store.paginate_room_events(
                 room_id=room_id,
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 0d32a3a498..00da329924 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -915,6 +915,82 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
 
         defer.returnValue((events, token, extremities))
 
+    def clamp_token_before(self, room_id, token_str, clamp_to_token_str):
+        """For a given room returns the given token if its before
+        clamp_to, otherwise returns clamp_to.
+
+        Args:
+            room_id (str)
+            token_str (str)
+            clamp_to_token_str(str): Must be topological token
+
+        Returns:
+            Deferred[str]
+        """
+
+        token = RoomStreamToken.parse(token_str)
+        clamp_to_token = RoomStreamToken.parse(clamp_to_token_str)
+
+        def clamp_token_before_txn(txn, token):
+            # If we're given a stream ordering, convert to topological token
+            if not token.chunk:
+                row = self._simple_select_one_txn(
+                    txn,
+                    table="events",
+                    keyvalues={
+                        "stream_ordering": token.stream,
+                    },
+                    retcols=("chunk_id", "topological_ordering", "stream_ordering",),
+                )
+                token = RoomStreamToken(*row)
+
+            # If both tokens have chunk_ids, we can use that.
+            if token.chunk and clamp_to_token.chunk:
+                if token.chunk == clamp_to_token.chunk:
+                    if token.topological < clamp_to_token.topological:
+                        return token_str
+                    else:
+                        return clamp_to_token_str
+
+                table = ChunkDBOrderedListStore(
+                    txn, room_id, self.clock,
+                )
+
+                if table.is_before(token.chunk, clamp_to_token.chunk):
+                    return token_str
+                else:
+                    return clamp_to_token_str
+
+            # Ok, so we're dealing with events that haven't been chunked yet,
+            # lets just cheat and fallback to depth.
+
+            token_depth = self._simple_select_one_onecol_txn(
+                txn,
+                table="events",
+                keyvalues={
+                    "stream_ordering": token.stream,
+                },
+                retcol="depth",
+            )
+
+            clamp_depth = self._simple_select_one_onecol_txn(
+                txn,
+                table="events",
+                keyvalues={
+                    "stream_ordering": clamp_to_token.stream,
+                },
+                retcol="depth",
+            )
+
+            if token_depth < clamp_depth:
+                return token_str
+            else:
+                return clamp_to_token_str
+
+        return self.runInteraction(
+            "clamp_token_before", clamp_token_before_txn, token
+        )
+
 
 class StreamStore(StreamWorkerStore):
     def get_room_max_stream_ordering(self):