diff --git a/changelog.d/11643.misc b/changelog.d/11643.misc
new file mode 100644
index 0000000000..1c3b3071f6
--- /dev/null
+++ b/changelog.d/11643.misc
@@ -0,0 +1 @@
+Remove redundant `get_current_events_token` method.
diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index 9917613298..d08e48da58 100644
--- a/synapse/handlers/federation_event.py
+++ b/synapse/handlers/federation_event.py
@@ -1838,7 +1838,7 @@ class FederationEventHandler:
The stream ID after which all events have been persisted.
"""
if not event_and_contexts:
- return self._store.get_current_events_token()
+ return self._store.get_room_max_stream_ordering()
instance = self._config.worker.events_shard_config.get_instance(room_id)
if instance != self._instance_name:
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 454d06c973..c781fefb1b 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -729,7 +729,7 @@ class PresenceHandler(BasePresenceHandler):
# Presence is best effort and quickly heals itself, so lets just always
# stream from the current state when we restart.
- self._event_pos = self.store.get_current_events_token()
+ self._event_pos = self.store.get_room_max_stream_ordering()
self._event_processing = False
async def _on_shutdown(self) -> None:
diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index 50e7379e83..0f08372694 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -80,12 +80,3 @@ class SlavedEventStore(
min_curr_state_delta_id,
prefilled_cache=curr_state_delta_prefill,
)
-
- # Cached functions can't be accessed through a class instance so we need
- # to reach inside the __dict__ to extract them.
-
- def get_room_max_stream_ordering(self):
- return self._stream_id_gen.get_current_token()
-
- def get_room_min_stream_ordering(self):
- return self._backfill_id_gen.get_current_token()
diff --git a/synapse/storage/databases/main/__init__.py b/synapse/storage/databases/main/__init__.py
index a594223fc6..f024761ba7 100644
--- a/synapse/storage/databases/main/__init__.py
+++ b/synapse/storage/databases/main/__init__.py
@@ -68,7 +68,7 @@ from .session import SessionStore
from .signatures import SignatureStore
from .state import StateStore
from .stats import StatsStore
-from .stream import StreamStore
+from .stream import StreamWorkerStore
from .tags import TagsStore
from .transactions import TransactionWorkerStore
from .ui_auth import UIAuthStore
@@ -87,7 +87,7 @@ class DataStore(
RoomStore,
RoomBatchStore,
RegistrationStore,
- StreamStore,
+ StreamWorkerStore,
ProfileStore,
PresenceStore,
TransactionWorkerStore,
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index c7b660ac5a..8d4287045a 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -1383,10 +1383,6 @@ class EventsWorkerStore(SQLBaseStore):
return {"v1": complexity_v1}
- def get_current_events_token(self) -> int:
- """The current maximum token that events have reached"""
- return self._stream_id_gen.get_current_token()
-
async def get_all_new_forward_event_rows(
self, instance_name: str, last_id: int, current_id: int, limit: int
) -> List[Tuple[int, str, str, str, str, str, str, str, str]]:
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index b0642ca69f..319464b1fa 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -34,7 +34,7 @@ what sort order was used:
- topological tokems: "t%d-%d", where the integers map to the topological
and stream ordering columns respectively.
"""
-import abc
+
import logging
from typing import TYPE_CHECKING, Collection, Dict, List, Optional, Set, Tuple
@@ -336,12 +336,7 @@ def filter_to_clause(event_filter: Optional[Filter]) -> Tuple[str, List[str]]:
return " AND ".join(clauses), args
-class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta):
- """This is an abstract base class where subclasses must implement
- `get_room_max_stream_ordering` and `get_room_min_stream_ordering`
- which can be called in the initializer.
- """
-
+class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
def __init__(
self,
database: DatabasePool,
@@ -379,13 +374,22 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta):
self._stream_order_on_start = self.get_room_max_stream_ordering()
- @abc.abstractmethod
def get_room_max_stream_ordering(self) -> int:
- raise NotImplementedError()
+ """Get the stream_ordering of regular events that we have committed up to
+
+ Returns the maximum stream id such that all stream ids less than or
+ equal to it have been successfully persisted.
+ """
+ return self._stream_id_gen.get_current_token()
- @abc.abstractmethod
def get_room_min_stream_ordering(self) -> int:
- raise NotImplementedError()
+ """Get the stream_ordering of backfilled events that we have committed up to
+
+ Backfilled events use *negative* stream orderings, so this returns the
+ minimum negative stream id such that all stream ids greater than or
+ equal to it have been successfully persisted.
+ """
+ return self._backfill_id_gen.get_current_token()
def get_room_max_token(self) -> RoomStreamToken:
"""Get a `RoomStreamToken` that marks the current maximum persisted
@@ -1351,11 +1355,3 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta):
retcol="instance_name",
desc="get_name_from_instance_id",
)
-
-
-class StreamStore(StreamWorkerStore):
- def get_room_max_stream_ordering(self) -> int:
- return self._stream_id_gen.get_current_token()
-
- def get_room_min_stream_ordering(self) -> int:
- return self._backfill_id_gen.get_current_token()
|