summary refs log tree commit diff
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2018-05-21 17:43:03 +0100
committerErik Johnston <erik@matrix.org>2018-05-31 10:18:40 +0100
commit76879578ea01a74df2666d8ba15155500ee49d6f (patch)
tree7a48e3bd21b56d241a8cccc0755644ce6213f6c0
parentImplement pagination using chunks (diff)
downloadsynapse-76879578ea01a74df2666d8ba15155500ee49d6f.tar.xz
Fix clamp leave and disable backfill
-rw-r--r--synapse/handlers/message.py22
-rw-r--r--synapse/storage/stream.py42
2 files changed, 47 insertions, 17 deletions
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index c350c93c7e..ca9c48cabc 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -211,29 +211,17 @@ class MessageHandler(BaseHandler):
             )
 
             if source_config.direction == 'b':
-                # if we're going backwards, we might need to backfill. This
-                # requires that we have a topo token.
-                if room_token.topological:
-                    max_topo = room_token.topological
-                else:
-                    max_topo = yield self.store.get_max_topological_token(
-                        room_id, room_token.stream
-                    )
-
                 if membership == Membership.LEAVE:
                     # If they have left the room then clamp the token to be before
                     # they left the room, to save the effort of loading from the
                     # database.
+
                     leave_token = yield self.store.get_topological_token_for_event(
-                        member_event_id
+                        member_event_id,
+                    )
+                    source_config.from_key = yield self.store.clamp_token_before(
+                        room_id, source_config.from_key, leave_token,
                     )
-                    leave_token = RoomStreamToken.parse(leave_token)
-                    if leave_token.topological < max_topo:
-                        source_config.from_key = str(leave_token)
-
-                yield self.hs.get_handlers().federation_handler.maybe_backfill(
-                    room_id, max_topo
-                )
 
             events, next_key, extremities = yield self.store.paginate_room_events(
                 room_id=room_id,
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 6655bb76f1..65919ac7be 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -905,6 +905,48 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
 
         defer.returnValue((events, token, extremities))
 
+    def clamp_token_before(self, room_id, token, clamp_to):
+        token = RoomStreamToken.parse(token)
+        clamp_to = RoomStreamToken.parse(clamp_to)
+
+        def clamp_token_before_txn(txn, token):
+            if not token.topological:
+                sql = """
+                    SELECT chunk_id, topological_ordering FROM events
+                    WHERE room_id = ? AND stream_ordering <= ?
+                    ORDER BY stream_ordering DESC
+                """
+                txn.execute(sql, (room_id, token.stream,))
+                row = txn.fetchone()
+                if not row:
+                    return str(token)
+
+                chunk_id, topo = row
+                token = RoomStreamToken(chunk_id, topo, token.stream)
+
+            if token.chunk == clamp_to.chunk:
+                if token.topological < clamp_to.topological:
+                    return str(token)
+                else:
+                    return str(clamp_to)
+
+            sql = "SELECT rationale FROM chunk_linearized WHERE chunk_id = ?"
+
+            txn.execute(sql, (token.chunk,))
+            token_order, = txn.fetchone()
+
+            txn.execute(sql, (clamp_to.chunk,))
+            clamp_order, = txn.fetchone()
+
+            if token_order < clamp_order:
+                return str(token)
+            else:
+                return str(clamp_to)
+
+        return self.runInteraction(
+            "clamp_token_before", clamp_token_before_txn, token
+        )
+
 
 class StreamStore(StreamWorkerStore):
     def get_room_max_stream_ordering(self):