summary refs log tree commit diff
path: root/synapse/storage/persist_events.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2020-09-24 13:24:17 +0100
committerGitHub <noreply@github.com>2020-09-24 13:24:17 +0100
commitac11fcbbb8ccfeb4c72b5aae9faef28469109277 (patch)
treee35d6b0e50558eb0d83389bb40f8179c4c254aab /synapse/storage/persist_events.py
parentMark the shadow_banned column as boolean in synapse_port_db. (#8386) (diff)
downloadsynapse-ac11fcbbb8ccfeb4c72b5aae9faef28469109277.tar.xz
Add EventStreamPosition type (#8388)
The idea is to remove some of the places we pass around `int`, where it can represent one of two things:

1. the position of an event in the stream; or
2. a token that partitions the stream, used as part of the stream tokens.

The valid operations are then:

1. did a position happen before or after a token;
2. get all events that happened before or after a token; and
3. get all events between two tokens.

(Note that we don't want to allow other operations as we want to change the tokens to be vector clocks rather than simple ints)
Diffstat (limited to 'synapse/storage/persist_events.py')
-rw-r--r--synapse/storage/persist_events.py14
1 files changed, 9 insertions, 5 deletions
diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py
index d89f6ed128..603cd7d825 100644
--- a/synapse/storage/persist_events.py
+++ b/synapse/storage/persist_events.py
@@ -31,7 +31,7 @@ from synapse.logging.context import PreserveLoggingContext, make_deferred_yielda
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.storage.databases import Databases
 from synapse.storage.databases.main.events import DeltaState
-from synapse.types import Collection, StateMap
+from synapse.types import Collection, PersistedEventPosition, RoomStreamToken, StateMap
 from synapse.util.async_helpers import ObservableDeferred
 from synapse.util.metrics import Measure
 
@@ -190,6 +190,7 @@ class EventsPersistenceStorage:
         self.persist_events_store = stores.persist_events
 
         self._clock = hs.get_clock()
+        self._instance_name = hs.get_instance_name()
         self.is_mine_id = hs.is_mine_id
         self._event_persist_queue = _EventPeristenceQueue()
         self._state_resolution_handler = hs.get_state_resolution_handler()
@@ -198,7 +199,7 @@ class EventsPersistenceStorage:
         self,
         events_and_contexts: List[Tuple[EventBase, EventContext]],
         backfilled: bool = False,
-    ) -> int:
+    ) -> RoomStreamToken:
         """
         Write events to the database
         Args:
@@ -228,11 +229,11 @@ class EventsPersistenceStorage:
             defer.gatherResults(deferreds, consumeErrors=True)
         )
 
-        return self.main_store.get_current_events_token()
+        return RoomStreamToken(None, self.main_store.get_current_events_token())
 
     async def persist_event(
         self, event: EventBase, context: EventContext, backfilled: bool = False
-    ) -> Tuple[int, int]:
+    ) -> Tuple[PersistedEventPosition, RoomStreamToken]:
         """
         Returns:
             The stream ordering of `event`, and the stream ordering of the
@@ -247,7 +248,10 @@ class EventsPersistenceStorage:
         await make_deferred_yieldable(deferred)
 
         max_persisted_id = self.main_store.get_current_events_token()
-        return (event.internal_metadata.stream_ordering, max_persisted_id)
+        event_stream_id = event.internal_metadata.stream_ordering
+
+        pos = PersistedEventPosition(self._instance_name, event_stream_id)
+        return pos, RoomStreamToken(None, max_persisted_id)
 
     def _maybe_start_persisting(self, room_id: str):
         async def persisting_queue(item):