diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index ea9264e751..9f773aefa7 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -74,6 +74,8 @@ from synapse.storage.databases.main.events_worker import EventRedactBehaviour
from synapse.types import (
JsonDict,
MutableStateMap,
+ PersistedEventPosition,
+ RoomStreamToken,
StateMap,
UserID,
get_domain_from_id,
@@ -2956,7 +2958,7 @@ class FederationHandler(BaseHandler):
)
return result["max_stream_id"]
else:
- max_stream_id = await self.storage.persistence.persist_events(
+ max_stream_token = await self.storage.persistence.persist_events(
event_and_contexts, backfilled=backfilled
)
@@ -2967,12 +2969,12 @@ class FederationHandler(BaseHandler):
if not backfilled: # Never notify for backfilled events
for event, _ in event_and_contexts:
- await self._notify_persisted_event(event, max_stream_id)
+ await self._notify_persisted_event(event, max_stream_token)
- return max_stream_id
+ return max_stream_token.stream
async def _notify_persisted_event(
- self, event: EventBase, max_stream_id: int
+ self, event: EventBase, max_stream_token: RoomStreamToken
) -> None:
"""Checks to see if notifier/pushers should be notified about the
event or not.
@@ -2998,9 +3000,11 @@ class FederationHandler(BaseHandler):
elif event.internal_metadata.is_outlier():
return
- event_stream_id = event.internal_metadata.stream_ordering
+ event_pos = PersistedEventPosition(
+ self._instance_name, event.internal_metadata.stream_ordering
+ )
self.notifier.on_new_room_event(
- event, event_stream_id, max_stream_id, extra_users=extra_users
+ event, event_pos, max_stream_token, extra_users=extra_users
)
async def _clean_room_for_join(self, room_id: str) -> None:
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 6ee559fd1d..ee271e85e5 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -1138,7 +1138,7 @@ class EventCreationHandler:
if prev_state_ids:
raise AuthError(403, "Changing the room create event is forbidden")
- event_stream_id, max_stream_id = await self.storage.persistence.persist_event(
+ event_pos, max_stream_token = await self.storage.persistence.persist_event(
event, context=context
)
@@ -1149,7 +1149,7 @@ class EventCreationHandler:
def _notify():
try:
self.notifier.on_new_room_event(
- event, event_stream_id, max_stream_id, extra_users=extra_users
+ event, event_pos, max_stream_token, extra_users=extra_users
)
except Exception:
logger.exception("Error notifying about new room event")
@@ -1161,7 +1161,7 @@ class EventCreationHandler:
# matters as sometimes presence code can take a while.
run_in_background(self._bump_active_time, requester.user)
- return event_stream_id
+ return event_pos.stream
async def _bump_active_time(self, user: UserID) -> None:
try:
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 9b3a4f638b..e948efef2e 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -967,7 +967,7 @@ class SyncHandler:
raise NotImplementedError()
else:
joined_room_ids = await self.get_rooms_for_user_at(
- user_id, now_token.room_stream_id
+ user_id, now_token.room_key
)
sync_result_builder = SyncResultBuilder(
sync_config,
@@ -1916,7 +1916,7 @@ class SyncHandler:
raise Exception("Unrecognized rtype: %r", room_builder.rtype)
async def get_rooms_for_user_at(
- self, user_id: str, stream_ordering: int
+ self, user_id: str, room_key: RoomStreamToken
) -> FrozenSet[str]:
"""Get set of joined rooms for a user at the given stream ordering.
@@ -1942,15 +1942,15 @@ class SyncHandler:
# If the membership's stream ordering is after the given stream
# ordering, we need to go and work out if the user was in the room
# before.
- for room_id, membership_stream_ordering in joined_rooms:
- if membership_stream_ordering <= stream_ordering:
+ for room_id, event_pos in joined_rooms:
+ if not event_pos.persisted_after(room_key):
joined_room_ids.add(room_id)
continue
logger.info("User joined room after current token: %s", room_id)
extrems = await self.store.get_forward_extremeties_for_room(
- room_id, stream_ordering
+ room_id, event_pos.stream
)
users_in_room = await self.state.get_current_users_in_room(room_id, extrems)
if user_id in users_in_room:
|