diff options
author | Erik Johnston <erik@matrix.org> | 2018-05-21 17:43:03 +0100 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2018-05-31 10:18:40 +0100 |
commit | 76879578ea01a74df2666d8ba15155500ee49d6f (patch) | |
tree | 7a48e3bd21b56d241a8cccc0755644ce6213f6c0 | |
parent | Implement pagination using chunks (diff) | |
download | synapse-76879578ea01a74df2666d8ba15155500ee49d6f.tar.xz |
Fix clamp leave and disable backfill
-rw-r--r-- | synapse/handlers/message.py | 22 | ||||
-rw-r--r-- | synapse/storage/stream.py | 42 |
2 files changed, 47 insertions, 17 deletions
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index c350c93c7e..ca9c48cabc 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -211,29 +211,17 @@ class MessageHandler(BaseHandler): ) if source_config.direction == 'b': - # if we're going backwards, we might need to backfill. This - # requires that we have a topo token. - if room_token.topological: - max_topo = room_token.topological - else: - max_topo = yield self.store.get_max_topological_token( - room_id, room_token.stream - ) - if membership == Membership.LEAVE: # If they have left the room then clamp the token to be before # they left the room, to save the effort of loading from the # database. + leave_token = yield self.store.get_topological_token_for_event( - member_event_id + member_event_id, + ) + source_config.from_key = yield self.store.clamp_token_before( + room_id, source_config.from_key, leave_token, ) - leave_token = RoomStreamToken.parse(leave_token) - if leave_token.topological < max_topo: - source_config.from_key = str(leave_token) - - yield self.hs.get_handlers().federation_handler.maybe_backfill( - room_id, max_topo - ) events, next_key, extremities = yield self.store.paginate_room_events( room_id=room_id, diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 6655bb76f1..65919ac7be 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -905,6 +905,48 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): defer.returnValue((events, token, extremities)) + def clamp_token_before(self, room_id, token, clamp_to): + token = RoomStreamToken.parse(token) + clamp_to = RoomStreamToken.parse(clamp_to) + + def clamp_token_before_txn(txn, token): + if not token.topological: + sql = """ + SELECT chunk_id, topological_ordering FROM events + WHERE room_id = ? AND stream_ordering <= ? + ORDER BY stream_ordering DESC + """ + txn.execute(sql, (room_id, token.stream,)) + row = txn.fetchone() + if not row: + return str(token) + + chunk_id, topo = row + token = RoomStreamToken(chunk_id, topo, token.stream) + + if token.chunk == clamp_to.chunk: + if token.topological < clamp_to.topological: + return str(token) + else: + return str(clamp_to) + + sql = "SELECT rationale FROM chunk_linearized WHERE chunk_id = ?" + + txn.execute(sql, (token.chunk,)) + token_order, = txn.fetchone() + + txn.execute(sql, (clamp_to.chunk,)) + clamp_order, = txn.fetchone() + + if token_order < clamp_order: + return str(token) + else: + return str(clamp_to) + + return self.runInteraction( + "clamp_token_before", clamp_token_before_txn, token + ) + class StreamStore(StreamWorkerStore): def get_room_max_stream_ordering(self): |