diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index b73ad62147..82c8cb5f0c 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -16,7 +16,7 @@
from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership
-from synapse.api.errors import SynapseError, AuthError, Codes
+from synapse.api.errors import AuthError, Codes
from synapse.streams.config import PaginationConfig
from synapse.events.utils import serialize_event
from synapse.events.validator import EventValidator
@@ -119,9 +119,12 @@ class MessageHandler(BaseHandler):
if source_config.direction == 'b':
# if we're going backwards, we might need to backfill. This
# requires that we have a topo token.
- if room_token.topological is None:
- raise SynapseError(400, "Invalid token: cannot paginate "
- "backwards from a stream token")
+ if room_token.topological:
+ max_topo = room_token.topological
+ else:
+ max_topo = yield self.store.get_max_topological_token_for_stream_and_room(
+ room_id, room_token.stream
+ )
if membership == Membership.LEAVE:
# If they have left the room then clamp the token to be before
@@ -131,11 +134,11 @@ class MessageHandler(BaseHandler):
member_event_id
)
leave_token = RoomStreamToken.parse(leave_token)
- if leave_token.topological < room_token.topological:
+ if leave_token.topological < max_topo:
source_config.from_key = str(leave_token)
yield self.hs.get_handlers().federation_handler.maybe_backfill(
- room_id, room_token.topological
+ room_id, max_topo
)
events, next_key = yield data_source.get_pagination_rows(
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 28721e6994..5096b46864 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -234,10 +234,10 @@ class StreamStore(SQLBaseStore):
get_prev_content=True
)
- ret.reverse()
-
self._set_before_and_after(ret, rows, topo_order=False)
+ ret.reverse()
+
if rows:
key = "s%d" % min(r["stream_ordering"] for r in rows)
else:
@@ -570,6 +570,18 @@ class StreamStore(SQLBaseStore):
row["topological_ordering"], row["stream_ordering"],)
)
+ def get_max_topological_token_for_stream_and_room(self, room_id, stream_key):
+ sql = (
+ "SELECT max(topological_ordering) FROM events"
+ " WHERE room_id = ? AND stream_ordering < ?"
+ )
+ return self._execute(
+ "get_max_topological_token_for_stream_and_room", None,
+ sql, room_id, stream_key,
+ ).addCallback(
+ lambda r: r[0][0] if r else 0
+ )
+
def _get_max_topological_txn(self, txn):
txn.execute(
"SELECT MAX(topological_ordering) FROM events"
|