diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 72ebac047f..db89491b46 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -83,21 +83,44 @@ class MessageHandler(BaseHandler):
Returns:
dict: Pagination API results
"""
- yield self.auth.check_joined_room(room_id, user_id)
+ member_event = yield self.auth.check_user_was_in_room(room_id, user_id)
data_source = self.hs.get_event_sources().sources["room"]
- if not pagin_config.from_token:
+ if pagin_config.from_token:
+ room_token = pagin_config.from_token.room_key
+ else:
pagin_config.from_token = (
yield self.hs.get_event_sources().get_current_token(
direction='b'
)
)
+ room_token = pagin_config.from_token.room_key
- room_token = RoomStreamToken.parse(pagin_config.from_token.room_key)
+ room_token = RoomStreamToken.parse(room_token)
if room_token.topological is None:
raise SynapseError(400, "Invalid token")
+ pagin_config.from_token = pagin_config.from_token.copy_and_replace(
+ "room_key", str(room_token)
+ )
+
+ source_config = pagin_config.get_source_config("room")
+
+ if member_event.membership == Membership.LEAVE:
+ # If they have left the room then clamp the token to be before
+ # they left the room
+ leave_token = yield self.store.get_topological_token_for_event(
+ member_event.event_id
+ )
+ leave_token = RoomStreamToken.parse(leave_token)
+ if leave_token.topological < room_token.topological:
+ source_config.from_key = str(leave_token)
+
+ if source_config.direction == "f":
+ if source_config.to_key is None:
+ source_config.to_key = str(leave_token)
+
yield self.hs.get_handlers().federation_handler.maybe_backfill(
room_id, room_token.topological
)
@@ -105,7 +128,7 @@ class MessageHandler(BaseHandler):
user = UserID.from_string(user_id)
events, next_key = yield data_source.get_pagination_rows(
- user, pagin_config.get_source_config("room"), room_id
+ user, source_config, room_id
)
next_token = pagin_config.from_token.copy_and_replace(
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 5763c462af..3cab06fdef 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -386,7 +386,24 @@ class StreamStore(SQLBaseStore):
table="events",
keyvalues={"event_id": event_id},
retcol="stream_ordering",
- ).addCallback(lambda stream_ordering: "s%d" % (stream_ordering,))
+ ).addCallback(lambda row: "s%d" % (row,))
+
+ def get_topological_token_for_event(self, event_id):
+ """The stream token for an event
+ Args:
+ event_id(str): The id of the event to look up a stream token for.
+ Raises:
+ StoreError if the event wasn't in the database.
+ Returns:
+ A deferred "t%d-%d" topological token.
+ """
+ return self._simple_select_one(
+ table="events",
+ keyvalues={"event_id": event_id},
+ retcols=("stream_ordering", "topological_ordering"),
+ ).addCallback(lambda row: "t%d-%d" % (
+ row["topological_ordering"], row["stream_ordering"],)
+ )
def _get_max_topological_txn(self, txn):
txn.execute(
|