summary refs log tree commit diff
diff options
context:
space:
mode:
authorRichard van der Hoff <1389908+richvdh@users.noreply.github.com>2022-01-04 16:10:27 +0000
committerGitHub <noreply@github.com>2022-01-04 16:10:27 +0000
commit2359ee3864a065229c80e3ff58faa981edd24558 (patch)
treeed211ee7846ccef031d5e1b114f9bf2d3ccdc441
parentBetter error messages from `get_create_event_for_room` (#11638) (diff)
downloadsynapse-2359ee3864a065229c80e3ff58faa981edd24558.tar.xz
Remove redundant `get_current_events_token` (#11643)
* Push `get_room_{min,max_stream_ordering}` into StreamStore

Both implementations of this are identical, so we may as well push it down and
get rid of the abstract base class nonsense.

* Remove redundant `StreamStore` class

This is empty now

* Remove redundant `get_current_events_token`

This was an exact duplicate of `get_room_max_stream_ordering`, so let's get rid
of it.

* newsfile
-rw-r--r--changelog.d/11643.misc1
-rw-r--r--synapse/handlers/federation_event.py2
-rw-r--r--synapse/handlers/presence.py2
-rw-r--r--synapse/replication/slave/storage/events.py9
-rw-r--r--synapse/storage/databases/main/__init__.py4
-rw-r--r--synapse/storage/databases/main/events_worker.py4
-rw-r--r--synapse/storage/databases/main/stream.py34
7 files changed, 20 insertions, 36 deletions
diff --git a/changelog.d/11643.misc b/changelog.d/11643.misc
new file mode 100644

index 0000000000..1c3b3071f6 --- /dev/null +++ b/changelog.d/11643.misc
@@ -0,0 +1 @@ +Remove redundant `get_current_events_token` method. diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index 9917613298..d08e48da58 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py
@@ -1838,7 +1838,7 @@ class FederationEventHandler: The stream ID after which all events have been persisted. """ if not event_and_contexts: - return self._store.get_current_events_token() + return self._store.get_room_max_stream_ordering() instance = self._config.worker.events_shard_config.get_instance(room_id) if instance != self._instance_name: diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 454d06c973..c781fefb1b 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py
@@ -729,7 +729,7 @@ class PresenceHandler(BasePresenceHandler): # Presence is best effort and quickly heals itself, so lets just always # stream from the current state when we restart. - self._event_pos = self.store.get_current_events_token() + self._event_pos = self.store.get_room_max_stream_ordering() self._event_processing = False async def _on_shutdown(self) -> None: diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index 50e7379e83..0f08372694 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py
@@ -80,12 +80,3 @@ class SlavedEventStore( min_curr_state_delta_id, prefilled_cache=curr_state_delta_prefill, ) - - # Cached functions can't be accessed through a class instance so we need - # to reach inside the __dict__ to extract them. - - def get_room_max_stream_ordering(self): - return self._stream_id_gen.get_current_token() - - def get_room_min_stream_ordering(self): - return self._backfill_id_gen.get_current_token() diff --git a/synapse/storage/databases/main/__init__.py b/synapse/storage/databases/main/__init__.py
index a594223fc6..f024761ba7 100644 --- a/synapse/storage/databases/main/__init__.py +++ b/synapse/storage/databases/main/__init__.py
@@ -68,7 +68,7 @@ from .session import SessionStore from .signatures import SignatureStore from .state import StateStore from .stats import StatsStore -from .stream import StreamStore +from .stream import StreamWorkerStore from .tags import TagsStore from .transactions import TransactionWorkerStore from .ui_auth import UIAuthStore @@ -87,7 +87,7 @@ class DataStore( RoomStore, RoomBatchStore, RegistrationStore, - StreamStore, + StreamWorkerStore, ProfileStore, PresenceStore, TransactionWorkerStore, diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index c7b660ac5a..8d4287045a 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py
@@ -1383,10 +1383,6 @@ class EventsWorkerStore(SQLBaseStore): return {"v1": complexity_v1} - def get_current_events_token(self) -> int: - """The current maximum token that events have reached""" - return self._stream_id_gen.get_current_token() - async def get_all_new_forward_event_rows( self, instance_name: str, last_id: int, current_id: int, limit: int ) -> List[Tuple[int, str, str, str, str, str, str, str, str]]: diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index b0642ca69f..319464b1fa 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py
@@ -34,7 +34,7 @@ what sort order was used: - topological tokems: "t%d-%d", where the integers map to the topological and stream ordering columns respectively. """ -import abc + import logging from typing import TYPE_CHECKING, Collection, Dict, List, Optional, Set, Tuple @@ -336,12 +336,7 @@ def filter_to_clause(event_filter: Optional[Filter]) -> Tuple[str, List[str]]: return " AND ".join(clauses), args -class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta): - """This is an abstract base class where subclasses must implement - `get_room_max_stream_ordering` and `get_room_min_stream_ordering` - which can be called in the initializer. - """ - +class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): def __init__( self, database: DatabasePool, @@ -379,13 +374,22 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta): self._stream_order_on_start = self.get_room_max_stream_ordering() - @abc.abstractmethod def get_room_max_stream_ordering(self) -> int: - raise NotImplementedError() + """Get the stream_ordering of regular events that we have committed up to + + Returns the maximum stream id such that all stream ids less than or + equal to it have been successfully persisted. + """ + return self._stream_id_gen.get_current_token() - @abc.abstractmethod def get_room_min_stream_ordering(self) -> int: - raise NotImplementedError() + """Get the stream_ordering of backfilled events that we have committed up to + + Backfilled events use *negative* stream orderings, so this returns the + minimum negative stream id such that all stream ids greater than or + equal to it have been successfully persisted. + """ + return self._backfill_id_gen.get_current_token() def get_room_max_token(self) -> RoomStreamToken: """Get a `RoomStreamToken` that marks the current maximum persisted @@ -1351,11 +1355,3 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta): retcol="instance_name", desc="get_name_from_instance_id", ) - - -class StreamStore(StreamWorkerStore): - def get_room_max_stream_ordering(self) -> int: - return self._stream_id_gen.get_current_token() - - def get_room_min_stream_ordering(self) -> int: - return self._backfill_id_gen.get_current_token()