diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 22e19af17f..867fdbefb0 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -20,8 +20,9 @@ from synapse.api.errors import RoomError, SynapseError
from synapse.streams.config import PaginationConfig
from synapse.events.utils import serialize_event
from synapse.events.validator import EventValidator
+from synapse.util import unwrapFirstError
from synapse.util.logcontext import PreserveLoggingContext
-from synapse.types import UserID
+from synapse.types import UserID, RoomStreamToken
from ._base import BaseHandler
@@ -89,9 +90,19 @@ class MessageHandler(BaseHandler):
if not pagin_config.from_token:
pagin_config.from_token = (
- yield self.hs.get_event_sources().get_current_token()
+ yield self.hs.get_event_sources().get_current_token(
+ direction='b'
+ )
)
+ room_token = RoomStreamToken.parse(pagin_config.from_token.room_key)
+ if room_token.topological is None:
+ raise SynapseError(400, "Invalid token")
+
+ yield self.hs.get_handlers().federation_handler.maybe_backfill(
+ room_id, room_token.topological
+ )
+
user = UserID.from_string(user_id)
events, next_key = yield data_source.get_pagination_rows(
@@ -303,7 +314,7 @@ class MessageHandler(BaseHandler):
event.room_id
),
]
- )
+ ).addErrback(unwrapFirstError)
start_token = now_token.copy_and_replace("room_key", token[0])
end_token = now_token.copy_and_replace("room_key", token[1])
@@ -328,7 +339,7 @@ class MessageHandler(BaseHandler):
yield defer.gatherResults(
[handle_room(e) for e in room_list],
consumeErrors=True
- )
+ ).addErrback(unwrapFirstError)
ret = {
"rooms": rooms_ret,
|