diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index c350c93c7e..ca9c48cabc 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -211,29 +211,17 @@ class MessageHandler(BaseHandler):
)
if source_config.direction == 'b':
- # if we're going backwards, we might need to backfill. This
- # requires that we have a topo token.
- if room_token.topological:
- max_topo = room_token.topological
- else:
- max_topo = yield self.store.get_max_topological_token(
- room_id, room_token.stream
- )
-
if membership == Membership.LEAVE:
# If they have left the room then clamp the token to be before
# they left the room, to save the effort of loading from the
# database.
+
leave_token = yield self.store.get_topological_token_for_event(
- member_event_id
+ member_event_id,
+ )
+ source_config.from_key = yield self.store.clamp_token_before(
+ room_id, source_config.from_key, leave_token,
)
- leave_token = RoomStreamToken.parse(leave_token)
- if leave_token.topological < max_topo:
- source_config.from_key = str(leave_token)
-
- yield self.hs.get_handlers().federation_handler.maybe_backfill(
- room_id, max_topo
- )
events, next_key, extremities = yield self.store.paginate_room_events(
room_id=room_id,
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 6655bb76f1..65919ac7be 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -905,6 +905,48 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
defer.returnValue((events, token, extremities))
+ def clamp_token_before(self, room_id, token, clamp_to):
+ token = RoomStreamToken.parse(token)
+ clamp_to = RoomStreamToken.parse(clamp_to)
+
+ def clamp_token_before_txn(txn, token):
+ if not token.topological:
+ sql = """
+ SELECT chunk_id, topological_ordering FROM events
+ WHERE room_id = ? AND stream_ordering <= ?
+ ORDER BY stream_ordering DESC
+ """
+ txn.execute(sql, (room_id, token.stream,))
+ row = txn.fetchone()
+ if not row:
+ return str(token)
+
+ chunk_id, topo = row
+ token = RoomStreamToken(chunk_id, topo, token.stream)
+
+ if token.chunk == clamp_to.chunk:
+ if token.topological < clamp_to.topological:
+ return str(token)
+ else:
+ return str(clamp_to)
+
+ sql = "SELECT rationale FROM chunk_linearized WHERE chunk_id = ?"
+
+ txn.execute(sql, (token.chunk,))
+ token_order, = txn.fetchone()
+
+ txn.execute(sql, (clamp_to.chunk,))
+ clamp_order, = txn.fetchone()
+
+ if token_order < clamp_order:
+ return str(token)
+ else:
+ return str(clamp_to)
+
+ return self.runInteraction(
+ "clamp_token_before", clamp_token_before_txn, token
+ )
+
class StreamStore(StreamWorkerStore):
def get_room_max_stream_ordering(self):
|