summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2020-09-24 13:24:17 +0100
committerGitHub <noreply@github.com>2020-09-24 13:24:17 +0100
commitac11fcbbb8ccfeb4c72b5aae9faef28469109277 (patch)
treee35d6b0e50558eb0d83389bb40f8179c4c254aab /synapse/handlers
parentMark the shadow_banned column as boolean in synapse_port_db. (#8386) (diff)
downloadsynapse-ac11fcbbb8ccfeb4c72b5aae9faef28469109277.tar.xz
Add EventStreamPosition type (#8388)
The idea is to remove some of the places we pass around `int`, where it can represent one of two things:

1. the position of an event in the stream; or
2. a token that partitions the stream, used as part of the stream tokens.

The valid operations are then:

1. did a position happen before or after a token;
2. get all events that happened before or after a token; and
3. get all events between two tokens.

(Note that we don't want to allow other operations as we want to change the tokens to be vector clocks rather than simple ints)
Diffstat (limited to 'synapse/handlers')
-rw-r--r--synapse/handlers/federation.py16
-rw-r--r--synapse/handlers/message.py6
-rw-r--r--synapse/handlers/sync.py10
3 files changed, 18 insertions, 14 deletions
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index ea9264e751..9f773aefa7 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -74,6 +74,8 @@ from synapse.storage.databases.main.events_worker import EventRedactBehaviour
 from synapse.types import (
     JsonDict,
     MutableStateMap,
+    PersistedEventPosition,
+    RoomStreamToken,
     StateMap,
     UserID,
     get_domain_from_id,
@@ -2956,7 +2958,7 @@ class FederationHandler(BaseHandler):
             )
             return result["max_stream_id"]
         else:
-            max_stream_id = await self.storage.persistence.persist_events(
+            max_stream_token = await self.storage.persistence.persist_events(
                 event_and_contexts, backfilled=backfilled
             )
 
@@ -2967,12 +2969,12 @@ class FederationHandler(BaseHandler):
 
             if not backfilled:  # Never notify for backfilled events
                 for event, _ in event_and_contexts:
-                    await self._notify_persisted_event(event, max_stream_id)
+                    await self._notify_persisted_event(event, max_stream_token)
 
-            return max_stream_id
+            return max_stream_token.stream
 
     async def _notify_persisted_event(
-        self, event: EventBase, max_stream_id: int
+        self, event: EventBase, max_stream_token: RoomStreamToken
     ) -> None:
         """Checks to see if notifier/pushers should be notified about the
         event or not.
@@ -2998,9 +3000,11 @@ class FederationHandler(BaseHandler):
         elif event.internal_metadata.is_outlier():
             return
 
-        event_stream_id = event.internal_metadata.stream_ordering
+        event_pos = PersistedEventPosition(
+            self._instance_name, event.internal_metadata.stream_ordering
+        )
         self.notifier.on_new_room_event(
-            event, event_stream_id, max_stream_id, extra_users=extra_users
+            event, event_pos, max_stream_token, extra_users=extra_users
         )
 
     async def _clean_room_for_join(self, room_id: str) -> None:
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 6ee559fd1d..ee271e85e5 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -1138,7 +1138,7 @@ class EventCreationHandler:
             if prev_state_ids:
                 raise AuthError(403, "Changing the room create event is forbidden")
 
-        event_stream_id, max_stream_id = await self.storage.persistence.persist_event(
+        event_pos, max_stream_token = await self.storage.persistence.persist_event(
             event, context=context
         )
 
@@ -1149,7 +1149,7 @@ class EventCreationHandler:
         def _notify():
             try:
                 self.notifier.on_new_room_event(
-                    event, event_stream_id, max_stream_id, extra_users=extra_users
+                    event, event_pos, max_stream_token, extra_users=extra_users
                 )
             except Exception:
                 logger.exception("Error notifying about new room event")
@@ -1161,7 +1161,7 @@ class EventCreationHandler:
             # matters as sometimes presence code can take a while.
             run_in_background(self._bump_active_time, requester.user)
 
-        return event_stream_id
+        return event_pos.stream
 
     async def _bump_active_time(self, user: UserID) -> None:
         try:
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 9b3a4f638b..e948efef2e 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -967,7 +967,7 @@ class SyncHandler:
             raise NotImplementedError()
         else:
             joined_room_ids = await self.get_rooms_for_user_at(
-                user_id, now_token.room_stream_id
+                user_id, now_token.room_key
             )
         sync_result_builder = SyncResultBuilder(
             sync_config,
@@ -1916,7 +1916,7 @@ class SyncHandler:
             raise Exception("Unrecognized rtype: %r", room_builder.rtype)
 
     async def get_rooms_for_user_at(
-        self, user_id: str, stream_ordering: int
+        self, user_id: str, room_key: RoomStreamToken
     ) -> FrozenSet[str]:
         """Get set of joined rooms for a user at the given stream ordering.
 
@@ -1942,15 +1942,15 @@ class SyncHandler:
         # If the membership's stream ordering is after the given stream
         # ordering, we need to go and work out if the user was in the room
         # before.
-        for room_id, membership_stream_ordering in joined_rooms:
-            if membership_stream_ordering <= stream_ordering:
+        for room_id, event_pos in joined_rooms:
+            if not event_pos.persisted_after(room_key):
                 joined_room_ids.add(room_id)
                 continue
 
             logger.info("User joined room after current token: %s", room_id)
 
             extrems = await self.store.get_forward_extremeties_for_room(
-                room_id, stream_ordering
+                room_id, event_pos.stream
             )
             users_in_room = await self.state.get_current_users_in_room(room_id, extrems)
             if user_id in users_in_room: