summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/handlers/federation_event.py2
-rw-r--r--synapse/handlers/presence.py2
-rw-r--r--synapse/replication/slave/storage/events.py9
-rw-r--r--synapse/storage/databases/main/__init__.py4
-rw-r--r--synapse/storage/databases/main/events_worker.py4
-rw-r--r--synapse/storage/databases/main/stream.py34
6 files changed, 19 insertions, 36 deletions
diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index 9917613298..d08e48da58 100644
--- a/synapse/handlers/federation_event.py
+++ b/synapse/handlers/federation_event.py
@@ -1838,7 +1838,7 @@ class FederationEventHandler:
             The stream ID after which all events have been persisted.
         """
         if not event_and_contexts:
-            return self._store.get_current_events_token()
+            return self._store.get_room_max_stream_ordering()
 
         instance = self._config.worker.events_shard_config.get_instance(room_id)
         if instance != self._instance_name:
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 454d06c973..c781fefb1b 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -729,7 +729,7 @@ class PresenceHandler(BasePresenceHandler):
 
         # Presence is best effort and quickly heals itself, so lets just always
         # stream from the current state when we restart.
-        self._event_pos = self.store.get_current_events_token()
+        self._event_pos = self.store.get_room_max_stream_ordering()
         self._event_processing = False
 
     async def _on_shutdown(self) -> None:
diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index 50e7379e83..0f08372694 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -80,12 +80,3 @@ class SlavedEventStore(
             min_curr_state_delta_id,
             prefilled_cache=curr_state_delta_prefill,
         )
-
-    # Cached functions can't be accessed through a class instance so we need
-    # to reach inside the __dict__ to extract them.
-
-    def get_room_max_stream_ordering(self):
-        return self._stream_id_gen.get_current_token()
-
-    def get_room_min_stream_ordering(self):
-        return self._backfill_id_gen.get_current_token()
diff --git a/synapse/storage/databases/main/__init__.py b/synapse/storage/databases/main/__init__.py
index a594223fc6..f024761ba7 100644
--- a/synapse/storage/databases/main/__init__.py
+++ b/synapse/storage/databases/main/__init__.py
@@ -68,7 +68,7 @@ from .session import SessionStore
 from .signatures import SignatureStore
 from .state import StateStore
 from .stats import StatsStore
-from .stream import StreamStore
+from .stream import StreamWorkerStore
 from .tags import TagsStore
 from .transactions import TransactionWorkerStore
 from .ui_auth import UIAuthStore
@@ -87,7 +87,7 @@ class DataStore(
     RoomStore,
     RoomBatchStore,
     RegistrationStore,
-    StreamStore,
+    StreamWorkerStore,
     ProfileStore,
     PresenceStore,
     TransactionWorkerStore,
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index c7b660ac5a..8d4287045a 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -1383,10 +1383,6 @@ class EventsWorkerStore(SQLBaseStore):
 
         return {"v1": complexity_v1}
 
-    def get_current_events_token(self) -> int:
-        """The current maximum token that events have reached"""
-        return self._stream_id_gen.get_current_token()
-
     async def get_all_new_forward_event_rows(
         self, instance_name: str, last_id: int, current_id: int, limit: int
     ) -> List[Tuple[int, str, str, str, str, str, str, str, str]]:
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index b0642ca69f..319464b1fa 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -34,7 +34,7 @@ what sort order was used:
     - topological tokems: "t%d-%d", where the integers map to the topological
       and stream ordering columns respectively.
 """
-import abc
+
 import logging
 from typing import TYPE_CHECKING, Collection, Dict, List, Optional, Set, Tuple
 
@@ -336,12 +336,7 @@ def filter_to_clause(event_filter: Optional[Filter]) -> Tuple[str, List[str]]:
     return " AND ".join(clauses), args
 
 
-class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta):
-    """This is an abstract base class where subclasses must implement
-    `get_room_max_stream_ordering` and `get_room_min_stream_ordering`
-    which can be called in the initializer.
-    """
-
+class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
     def __init__(
         self,
         database: DatabasePool,
@@ -379,13 +374,22 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta):
 
         self._stream_order_on_start = self.get_room_max_stream_ordering()
 
-    @abc.abstractmethod
     def get_room_max_stream_ordering(self) -> int:
-        raise NotImplementedError()
+        """Get the stream_ordering of regular events that we have committed up to
+
+        Returns the maximum stream id such that all stream ids less than or
+        equal to it have been successfully persisted.
+        """
+        return self._stream_id_gen.get_current_token()
 
-    @abc.abstractmethod
     def get_room_min_stream_ordering(self) -> int:
-        raise NotImplementedError()
+        """Get the stream_ordering of backfilled events that we have committed up to
+
+        Backfilled events use *negative* stream orderings, so this returns the
+        minimum negative stream id such that all stream ids greater than or
+        equal to it have been successfully persisted.
+        """
+        return self._backfill_id_gen.get_current_token()
 
     def get_room_max_token(self) -> RoomStreamToken:
         """Get a `RoomStreamToken` that marks the current maximum persisted
@@ -1351,11 +1355,3 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta):
             retcol="instance_name",
             desc="get_name_from_instance_id",
         )
-
-
-class StreamStore(StreamWorkerStore):
-    def get_room_max_stream_ordering(self) -> int:
-        return self._stream_id_gen.get_current_token()
-
-    def get_room_min_stream_ordering(self) -> int:
-        return self._backfill_id_gen.get_current_token()