summary refs log tree commit diff
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2020-09-10 16:10:01 +0100
committerErik Johnston <erik@matrix.org>2020-09-10 16:10:01 +0100
commitd9a29506b6b2e68aeafeff6ca2ef003792194448 (patch)
treee775c83dda4209e636afbb85c5de72b81ff15168
parentNewsfile (diff)
downloadsynapse-erikj/stream_position.tar.xz
-rw-r--r--synapse/handlers/federation.py16
-rw-r--r--synapse/handlers/message.py6
-rw-r--r--synapse/notifier.py55
-rw-r--r--synapse/replication/tcp/client.py12
-rw-r--r--synapse/storage/persist_events.py14
-rw-r--r--synapse/types.py15
6 files changed, 75 insertions, 43 deletions
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index c195eba830..41b8942f53 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -74,6 +74,8 @@ from synapse.storage.databases.main.events_worker import EventRedactBehaviour
 from synapse.types import (
     JsonDict,
     MutableStateMap,
+    PersistedEventPosition,
+    RoomStreamToken,
     StateMap,
     UserID,
     get_domain_from_id,
@@ -2891,7 +2893,7 @@ class FederationHandler(BaseHandler):
             )
             return result["max_stream_id"]
         else:
-            max_stream_id = await self.storage.persistence.persist_events(
+            max_stream_token = await self.storage.persistence.persist_events(
                 event_and_contexts, backfilled=backfilled
             )
 
@@ -2902,12 +2904,12 @@ class FederationHandler(BaseHandler):
 
             if not backfilled:  # Never notify for backfilled events
                 for event, _ in event_and_contexts:
-                    await self._notify_persisted_event(event, max_stream_id)
+                    await self._notify_persisted_event(event, max_stream_token)
 
-            return max_stream_id
+            return max_stream_token.stream
 
     async def _notify_persisted_event(
-        self, event: EventBase, max_stream_id: int
+        self, event: EventBase, max_stream_token: RoomStreamToken
     ) -> None:
         """Checks to see if notifier/pushers should be notified about the
         event or not.
@@ -2933,9 +2935,11 @@ class FederationHandler(BaseHandler):
         elif event.internal_metadata.is_outlier():
             return
 
-        event_stream_id = event.internal_metadata.stream_ordering
+        event_pos = PersistedEventPosition(
+            self._instance_name, event.internal_metadata.stream_ordering
+        )
         self.notifier.on_new_room_event(
-            event, event_stream_id, max_stream_id, extra_users=extra_users
+            event, event_pos, max_stream_token, extra_users=extra_users
         )
 
     async def _clean_room_for_join(self, room_id: str) -> None:
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index e54e2b322b..cca457883d 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -1136,7 +1136,7 @@ class EventCreationHandler:
             if prev_state_ids:
                 raise AuthError(403, "Changing the room create event is forbidden")
 
-        event_stream_id, max_stream_id = await self.storage.persistence.persist_event(
+        event_pos, max_stream_token = await self.storage.persistence.persist_event(
             event, context=context
         )
 
@@ -1147,7 +1147,7 @@ class EventCreationHandler:
         def _notify():
             try:
                 self.notifier.on_new_room_event(
-                    event, event_stream_id, max_stream_id, extra_users=extra_users
+                    event, event_pos, max_stream_token, extra_users=extra_users
                 )
             except Exception:
                 logger.exception("Error notifying about new room event")
@@ -1159,7 +1159,7 @@ class EventCreationHandler:
             # matters as sometimes presence code can take a while.
             run_in_background(self._bump_active_time, requester.user)
 
-        return event_stream_id
+        return event_pos.stream
 
     async def _bump_active_time(self, user: UserID) -> None:
         try:
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 12cd84b27b..8c9588b5ff 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -42,7 +42,13 @@ from synapse.logging.utils import log_function
 from synapse.metrics import LaterGauge
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.streams.config import PaginationConfig
-from synapse.types import Collection, RoomStreamToken, StreamToken, UserID
+from synapse.types import (
+    Collection,
+    PersistedEventPosition,
+    RoomStreamToken,
+    StreamToken,
+    UserID,
+)
 from synapse.util.async_helpers import ObservableDeferred, timeout_deferred
 from synapse.util.metrics import Measure
 from synapse.visibility import filter_events_for_client
@@ -189,7 +195,7 @@ class Notifier:
         self.store = hs.get_datastore()
         self.pending_new_room_events = (
             []
-        )  # type: List[Tuple[int, EventBase, Collection[UserID]]]
+        )  # type: List[Tuple[PersistedEventPosition, EventBase, Collection[UserID]]]
 
         # Called when there are new things to stream over replication
         self.replication_callbacks = []  # type: List[Callable[[], None]]
@@ -248,8 +254,8 @@ class Notifier:
     def on_new_room_event(
         self,
         event: EventBase,
-        room_stream_id: int,
-        max_room_stream_id: int,
+        event_pos: PersistedEventPosition,
+        max_room_stream_token: RoomStreamToken,
         extra_users: Collection[UserID] = [],
     ):
         """ Used by handlers to inform the notifier something has happened
@@ -263,16 +269,16 @@ class Notifier:
         until all previous events have been persisted before notifying
         the client streams.
         """
-        self.pending_new_room_events.append((room_stream_id, event, extra_users))
-        self._notify_pending_new_room_events(max_room_stream_id)
+        self.pending_new_room_events.append((event_pos, event, extra_users))
+        self._notify_pending_new_room_events(max_room_stream_token)
 
         self.notify_replication()
 
-    def _notify_pending_new_room_events(self, max_room_stream_id: int):
+    def _notify_pending_new_room_events(self, max_room_stream_token: RoomStreamToken):
         """Notify for the room events that were queued waiting for a previous
         event to be persisted.
         Args:
-            max_room_stream_id: The highest stream_id below which all
+            max_room_stream_token: The highest stream_id below which all
                 events have been persisted.
         """
         pending = self.pending_new_room_events
@@ -281,11 +287,9 @@ class Notifier:
         users = set()  # type: Set[UserID]
         rooms = set()  # type: Set[str]
 
-        for room_stream_id, event, extra_users in pending:
-            if room_stream_id > max_room_stream_id:
-                self.pending_new_room_events.append(
-                    (room_stream_id, event, extra_users)
-                )
+        for event_pos, event, extra_users in pending:
+            if event_pos.persisted_after(max_room_stream_token):
+                self.pending_new_room_events.append((event_pos, event, extra_users))
             else:
                 if (
                     event.type == EventTypes.Member
@@ -298,39 +302,38 @@ class Notifier:
 
         if users or rooms:
             self.on_new_event(
-                "room_key",
-                RoomStreamToken(None, max_room_stream_id),
-                users=users,
-                rooms=rooms,
+                "room_key", max_room_stream_token, users=users, rooms=rooms,
             )
-            self._on_updated_room_token(max_room_stream_id)
+            self._on_updated_room_token(max_room_stream_token)
 
-    def _on_updated_room_token(self, max_room_stream_id: int):
+    def _on_updated_room_token(self, max_room_stream_token: RoomStreamToken):
         """Poke services that might care that the room position has been
         updated.
         """
 
         # poke any interested application service.
         run_as_background_process(
-            "_notify_app_services", self._notify_app_services, max_room_stream_id
+            "_notify_app_services", self._notify_app_services, max_room_stream_token
         )
 
         run_as_background_process(
-            "_notify_pusher_pool", self._notify_pusher_pool, max_room_stream_id
+            "_notify_pusher_pool", self._notify_pusher_pool, max_room_stream_token
         )
 
         if self.federation_sender:
-            self.federation_sender.notify_new_events(max_room_stream_id)
+            self.federation_sender.notify_new_events(max_room_stream_token.stream)
 
-    async def _notify_app_services(self, max_room_stream_id: int):
+    async def _notify_app_services(self, max_room_stream_token: RoomStreamToken):
         try:
-            await self.appservice_handler.notify_interested_services(max_room_stream_id)
+            await self.appservice_handler.notify_interested_services(
+                max_room_stream_token.stream
+            )
         except Exception:
             logger.exception("Error notifying application services of event")
 
-    async def _notify_pusher_pool(self, max_room_stream_id: int):
+    async def _notify_pusher_pool(self, max_room_stream_token: RoomStreamToken):
         try:
-            await self._pusher_pool.on_new_notifications(max_room_stream_id)
+            await self._pusher_pool.on_new_notifications(max_room_stream_token.stream)
         except Exception:
             logger.exception("Error pusher pool of event")
 
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index e82b9e386f..55af3d41ea 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -29,7 +29,7 @@ from synapse.replication.tcp.streams.events import (
     EventsStreamEventRow,
     EventsStreamRow,
 )
-from synapse.types import UserID
+from synapse.types import PersistedEventPosition, RoomStreamToken, UserID
 from synapse.util.async_helpers import timeout_deferred
 from synapse.util.metrics import Measure
 
@@ -151,8 +151,14 @@ class ReplicationDataHandler:
                 extra_users = ()  # type: Tuple[UserID, ...]
                 if event.type == EventTypes.Member:
                     extra_users = (UserID.from_string(event.state_key),)
-                max_token = self.store.get_room_max_stream_ordering()
-                self.notifier.on_new_room_event(event, token, max_token, extra_users)
+
+                max_token = RoomStreamToken(
+                    None, self.store.get_room_max_stream_ordering()
+                )
+                event_pos = PersistedEventPosition(instance_name, token)
+                self.notifier.on_new_room_event(
+                    event, event_pos, max_token, extra_users
+                )
 
         # Notify any waiting deferreds. The list is ordered by position so we
         # just iterate through the list until we reach a position that is
diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py
index d89f6ed128..603cd7d825 100644
--- a/synapse/storage/persist_events.py
+++ b/synapse/storage/persist_events.py
@@ -31,7 +31,7 @@ from synapse.logging.context import PreserveLoggingContext, make_deferred_yielda
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.storage.databases import Databases
 from synapse.storage.databases.main.events import DeltaState
-from synapse.types import Collection, StateMap
+from synapse.types import Collection, PersistedEventPosition, RoomStreamToken, StateMap
 from synapse.util.async_helpers import ObservableDeferred
 from synapse.util.metrics import Measure
 
@@ -190,6 +190,7 @@ class EventsPersistenceStorage:
         self.persist_events_store = stores.persist_events
 
         self._clock = hs.get_clock()
+        self._instance_name = hs.get_instance_name()
         self.is_mine_id = hs.is_mine_id
         self._event_persist_queue = _EventPeristenceQueue()
         self._state_resolution_handler = hs.get_state_resolution_handler()
@@ -198,7 +199,7 @@ class EventsPersistenceStorage:
         self,
         events_and_contexts: List[Tuple[EventBase, EventContext]],
         backfilled: bool = False,
-    ) -> int:
+    ) -> RoomStreamToken:
         """
         Write events to the database
         Args:
@@ -228,11 +229,11 @@ class EventsPersistenceStorage:
             defer.gatherResults(deferreds, consumeErrors=True)
         )
 
-        return self.main_store.get_current_events_token()
+        return RoomStreamToken(None, self.main_store.get_current_events_token())
 
     async def persist_event(
         self, event: EventBase, context: EventContext, backfilled: bool = False
-    ) -> Tuple[int, int]:
+    ) -> Tuple[PersistedEventPosition, RoomStreamToken]:
         """
         Returns:
             The stream ordering of `event`, and the stream ordering of the
@@ -247,7 +248,10 @@ class EventsPersistenceStorage:
         await make_deferred_yieldable(deferred)
 
         max_persisted_id = self.main_store.get_current_events_token()
-        return (event.internal_metadata.stream_ordering, max_persisted_id)
+        event_stream_id = event.internal_metadata.stream_ordering
+
+        pos = PersistedEventPosition(self._instance_name, event_stream_id)
+        return pos, RoomStreamToken(None, max_persisted_id)
 
     def _maybe_start_persisting(self, room_id: str):
         async def persisting_queue(item):
diff --git a/synapse/types.py b/synapse/types.py
index 4087ffe7b3..fba17ead33 100644
--- a/synapse/types.py
+++ b/synapse/types.py
@@ -496,6 +496,21 @@ class StreamToken:
 StreamToken.START = StreamToken.from_string("s0_0")
 
 
+@attr.s(slots=True, frozen=True)
+class PersistedEventPosition:
+    """Position of a newly persisted event with instance that persisted it.
+
+    This can be used to test whether the event is persisted before or after a
+    RoomStreamToken.
+    """
+
+    instance_name = attr.ib(type=str)
+    stream = attr.ib(type=int)
+
+    def persisted_after(self, token: RoomStreamToken) -> bool:
+        return token.stream < self.stream
+
+
 class ThirdPartyInstanceID(
     namedtuple("ThirdPartyInstanceID", ("appservice_id", "network_id"))
 ):