summary refs log tree commit diff
diff options
context:
space:
mode:
authorNick Mills-Barrett <nick@beeper.com>2022-07-21 12:51:30 +0200
committerGitHub <noreply@github.com>2022-07-21 11:51:30 +0100
commit190f49d8aba3b18bb9b9c2cd8352dc9b402d6bbf (patch)
treeda19b669036987ba26fb1948af66c3b6356a3e66
parentMerge branch 'master' into develop (diff)
downloadsynapse-190f49d8aba3b18bb9b9c2cd8352dc9b402d6bbf.tar.xz
Use cache store remove base slaved (#13329)
This comes from two identical definitions in each of the base stores, and means the base slaved store is now empty and can be removed.
-rw-r--r--changelog.d/13329.misc1
-rw-r--r--synapse/app/admin_cmd.py2
-rw-r--r--synapse/app/generic_worker.py2
-rw-r--r--synapse/replication/slave/storage/_base.py58
-rw-r--r--synapse/replication/slave/storage/account_data.py3
-rw-r--r--synapse/replication/slave/storage/deviceinbox.py3
-rw-r--r--synapse/replication/slave/storage/devices.py3
-rw-r--r--synapse/replication/slave/storage/directory.py4
-rw-r--r--synapse/replication/slave/storage/events.py3
-rw-r--r--synapse/replication/slave/storage/filtering.py5
-rw-r--r--synapse/replication/slave/storage/profile.py3
-rw-r--r--synapse/replication/slave/storage/pushers.py3
-rw-r--r--synapse/replication/slave/storage/receipts.py4
-rw-r--r--synapse/replication/slave/storage/registration.py4
-rw-r--r--synapse/storage/databases/main/__init__.py29
-rw-r--r--synapse/storage/databases/main/cache.py26
16 files changed, 39 insertions, 114 deletions
diff --git a/changelog.d/13329.misc b/changelog.d/13329.misc
new file mode 100644
index 0000000000..4df9a9f6d7
--- /dev/null
+++ b/changelog.d/13329.misc
@@ -0,0 +1 @@
+Remove old base slaved store and de-duplicate cache ID generators. Contributed by Nick @ Beeper (@fizzadar).
diff --git a/synapse/app/admin_cmd.py b/synapse/app/admin_cmd.py
index 87f82bd9a5..53ec33bcd1 100644
--- a/synapse/app/admin_cmd.py
+++ b/synapse/app/admin_cmd.py
@@ -28,7 +28,6 @@ from synapse.config.homeserver import HomeServerConfig
 from synapse.config.logger import setup_logging
 from synapse.events import EventBase
 from synapse.handlers.admin import ExfiltrationWriter
-from synapse.replication.slave.storage._base import BaseSlavedStore
 from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
 from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
 from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore
@@ -58,7 +57,6 @@ class AdminCmdSlavedStore(
     SlavedDeviceStore,
     SlavedPushRuleStore,
     SlavedEventStore,
-    BaseSlavedStore,
     RoomWorkerStore,
 ):
     def __init__(
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py
index 4a987fb759..0c16584abc 100644
--- a/synapse/app/generic_worker.py
+++ b/synapse/app/generic_worker.py
@@ -48,7 +48,6 @@ from synapse.http.site import SynapseRequest, SynapseSite
 from synapse.logging.context import LoggingContext
 from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
 from synapse.replication.http import REPLICATION_PREFIX, ReplicationRestResource
-from synapse.replication.slave.storage._base import BaseSlavedStore
 from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
 from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
 from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore
@@ -251,7 +250,6 @@ class GenericWorkerSlavedStore(
     TransactionWorkerStore,
     LockStore,
     SessionStore,
-    BaseSlavedStore,
 ):
     # Properties that multiple storage classes define. Tell mypy what the
     # expected type is.
diff --git a/synapse/replication/slave/storage/_base.py b/synapse/replication/slave/storage/_base.py
deleted file mode 100644
index 7644146dba..0000000000
--- a/synapse/replication/slave/storage/_base.py
+++ /dev/null
@@ -1,58 +0,0 @@
-# Copyright 2016 OpenMarket Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import logging
-from typing import TYPE_CHECKING, Optional
-
-from synapse.storage.database import DatabasePool, LoggingDatabaseConnection
-from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
-from synapse.storage.engines import PostgresEngine
-from synapse.storage.util.id_generators import MultiWriterIdGenerator
-
-if TYPE_CHECKING:
-    from synapse.server import HomeServer
-
-logger = logging.getLogger(__name__)
-
-
-class BaseSlavedStore(CacheInvalidationWorkerStore):
-    def __init__(
-        self,
-        database: DatabasePool,
-        db_conn: LoggingDatabaseConnection,
-        hs: "HomeServer",
-    ):
-        super().__init__(database, db_conn, hs)
-        if isinstance(self.database_engine, PostgresEngine):
-            self._cache_id_gen: Optional[
-                MultiWriterIdGenerator
-            ] = MultiWriterIdGenerator(
-                db_conn,
-                database,
-                stream_name="caches",
-                instance_name=hs.get_instance_name(),
-                tables=[
-                    (
-                        "cache_invalidation_stream_by_instance",
-                        "instance_name",
-                        "stream_id",
-                    )
-                ],
-                sequence_name="cache_invalidation_stream_seq",
-                writers=[],
-            )
-        else:
-            self._cache_id_gen = None
-
-        self.hs = hs
diff --git a/synapse/replication/slave/storage/account_data.py b/synapse/replication/slave/storage/account_data.py
index ee74ee7d85..57d3237981 100644
--- a/synapse/replication/slave/storage/account_data.py
+++ b/synapse/replication/slave/storage/account_data.py
@@ -13,10 +13,9 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from synapse.replication.slave.storage._base import BaseSlavedStore
 from synapse.storage.databases.main.account_data import AccountDataWorkerStore
 from synapse.storage.databases.main.tags import TagsWorkerStore
 
 
-class SlavedAccountDataStore(TagsWorkerStore, AccountDataWorkerStore, BaseSlavedStore):
+class SlavedAccountDataStore(TagsWorkerStore, AccountDataWorkerStore):
     pass
diff --git a/synapse/replication/slave/storage/deviceinbox.py b/synapse/replication/slave/storage/deviceinbox.py
index e940751084..df9e4d8f45 100644
--- a/synapse/replication/slave/storage/deviceinbox.py
+++ b/synapse/replication/slave/storage/deviceinbox.py
@@ -12,9 +12,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from synapse.replication.slave.storage._base import BaseSlavedStore
 from synapse.storage.databases.main.deviceinbox import DeviceInboxWorkerStore
 
 
-class SlavedDeviceInboxStore(DeviceInboxWorkerStore, BaseSlavedStore):
+class SlavedDeviceInboxStore(DeviceInboxWorkerStore):
     pass
diff --git a/synapse/replication/slave/storage/devices.py b/synapse/replication/slave/storage/devices.py
index a48cc02069..6fcade510a 100644
--- a/synapse/replication/slave/storage/devices.py
+++ b/synapse/replication/slave/storage/devices.py
@@ -14,7 +14,6 @@
 
 from typing import TYPE_CHECKING, Any, Iterable
 
-from synapse.replication.slave.storage._base import BaseSlavedStore
 from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
 from synapse.replication.tcp.streams._base import DeviceListsStream, UserSignatureStream
 from synapse.storage.database import DatabasePool, LoggingDatabaseConnection
@@ -24,7 +23,7 @@ if TYPE_CHECKING:
     from synapse.server import HomeServer
 
 
-class SlavedDeviceStore(DeviceWorkerStore, BaseSlavedStore):
+class SlavedDeviceStore(DeviceWorkerStore):
     def __init__(
         self,
         database: DatabasePool,
diff --git a/synapse/replication/slave/storage/directory.py b/synapse/replication/slave/storage/directory.py
index 71fde0c96c..ca716df3df 100644
--- a/synapse/replication/slave/storage/directory.py
+++ b/synapse/replication/slave/storage/directory.py
@@ -14,8 +14,6 @@
 
 from synapse.storage.databases.main.directory import DirectoryWorkerStore
 
-from ._base import BaseSlavedStore
 
-
-class DirectoryStore(DirectoryWorkerStore, BaseSlavedStore):
+class DirectoryStore(DirectoryWorkerStore):
     pass
diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index a72dad7464..fe47778cb1 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -29,8 +29,6 @@ from synapse.storage.databases.main.stream import StreamWorkerStore
 from synapse.storage.databases.main.user_erasure_store import UserErasureWorkerStore
 from synapse.util.caches.stream_change_cache import StreamChangeCache
 
-from ._base import BaseSlavedStore
-
 if TYPE_CHECKING:
     from synapse.server import HomeServer
 
@@ -56,7 +54,6 @@ class SlavedEventStore(
     EventsWorkerStore,
     UserErasureWorkerStore,
     RelationsWorkerStore,
-    BaseSlavedStore,
 ):
     def __init__(
         self,
diff --git a/synapse/replication/slave/storage/filtering.py b/synapse/replication/slave/storage/filtering.py
index 4d185e2b56..c52679cd60 100644
--- a/synapse/replication/slave/storage/filtering.py
+++ b/synapse/replication/slave/storage/filtering.py
@@ -14,16 +14,15 @@
 
 from typing import TYPE_CHECKING
 
+from synapse.storage._base import SQLBaseStore
 from synapse.storage.database import DatabasePool, LoggingDatabaseConnection
 from synapse.storage.databases.main.filtering import FilteringStore
 
-from ._base import BaseSlavedStore
-
 if TYPE_CHECKING:
     from synapse.server import HomeServer
 
 
-class SlavedFilteringStore(BaseSlavedStore):
+class SlavedFilteringStore(SQLBaseStore):
     def __init__(
         self,
         database: DatabasePool,
diff --git a/synapse/replication/slave/storage/profile.py b/synapse/replication/slave/storage/profile.py
index 99f4a22642..a774a2ff48 100644
--- a/synapse/replication/slave/storage/profile.py
+++ b/synapse/replication/slave/storage/profile.py
@@ -12,9 +12,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from synapse.replication.slave.storage._base import BaseSlavedStore
 from synapse.storage.databases.main.profile import ProfileWorkerStore
 
 
-class SlavedProfileStore(ProfileWorkerStore, BaseSlavedStore):
+class SlavedProfileStore(ProfileWorkerStore):
     pass
diff --git a/synapse/replication/slave/storage/pushers.py b/synapse/replication/slave/storage/pushers.py
index de642bba71..44ed20e424 100644
--- a/synapse/replication/slave/storage/pushers.py
+++ b/synapse/replication/slave/storage/pushers.py
@@ -18,14 +18,13 @@ from synapse.replication.tcp.streams import PushersStream
 from synapse.storage.database import DatabasePool, LoggingDatabaseConnection
 from synapse.storage.databases.main.pusher import PusherWorkerStore
 
-from ._base import BaseSlavedStore
 from ._slaved_id_tracker import SlavedIdTracker
 
 if TYPE_CHECKING:
     from synapse.server import HomeServer
 
 
-class SlavedPusherStore(PusherWorkerStore, BaseSlavedStore):
+class SlavedPusherStore(PusherWorkerStore):
     def __init__(
         self,
         database: DatabasePool,
diff --git a/synapse/replication/slave/storage/receipts.py b/synapse/replication/slave/storage/receipts.py
index 3826b87dec..407862a2b2 100644
--- a/synapse/replication/slave/storage/receipts.py
+++ b/synapse/replication/slave/storage/receipts.py
@@ -15,8 +15,6 @@
 
 from synapse.storage.databases.main.receipts import ReceiptsWorkerStore
 
-from ._base import BaseSlavedStore
 
-
-class SlavedReceiptsStore(ReceiptsWorkerStore, BaseSlavedStore):
+class SlavedReceiptsStore(ReceiptsWorkerStore):
     pass
diff --git a/synapse/replication/slave/storage/registration.py b/synapse/replication/slave/storage/registration.py
index 5dae35a960..52c593e59d 100644
--- a/synapse/replication/slave/storage/registration.py
+++ b/synapse/replication/slave/storage/registration.py
@@ -14,8 +14,6 @@
 
 from synapse.storage.databases.main.registration import RegistrationWorkerStore
 
-from ._base import BaseSlavedStore
 
-
-class SlavedRegistrationStore(RegistrationWorkerStore, BaseSlavedStore):
+class SlavedRegistrationStore(RegistrationWorkerStore):
     pass
diff --git a/synapse/storage/databases/main/__init__.py b/synapse/storage/databases/main/__init__.py
index a3d31d3737..4dccbb732a 100644
--- a/synapse/storage/databases/main/__init__.py
+++ b/synapse/storage/databases/main/__init__.py
@@ -24,9 +24,9 @@ from synapse.storage.database import (
     LoggingTransaction,
 )
 from synapse.storage.databases.main.stats import UserSortOrder
-from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine
+from synapse.storage.engines import BaseDatabaseEngine
 from synapse.storage.types import Cursor
-from synapse.storage.util.id_generators import MultiWriterIdGenerator, StreamIdGenerator
+from synapse.storage.util.id_generators import StreamIdGenerator
 from synapse.types import JsonDict, get_domain_from_id
 from synapse.util.caches.stream_change_cache import StreamChangeCache
 
@@ -149,31 +149,6 @@ class DataStore(
             ],
         )
 
-        self._cache_id_gen: Optional[MultiWriterIdGenerator]
-        if isinstance(self.database_engine, PostgresEngine):
-            # We set the `writers` to an empty list here as we don't care about
-            # missing updates over restarts, as we'll not have anything in our
-            # caches to invalidate. (This reduces the amount of writes to the DB
-            # that happen).
-            self._cache_id_gen = MultiWriterIdGenerator(
-                db_conn,
-                database,
-                stream_name="caches",
-                instance_name=hs.get_instance_name(),
-                tables=[
-                    (
-                        "cache_invalidation_stream_by_instance",
-                        "instance_name",
-                        "stream_id",
-                    )
-                ],
-                sequence_name="cache_invalidation_stream_seq",
-                writers=[],
-            )
-
-        else:
-            self._cache_id_gen = None
-
         super().__init__(database, db_conn, hs)
 
         events_max = self._stream_id_gen.get_current_token()
diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py
index 2367ddeea3..12e9a42382 100644
--- a/synapse/storage/databases/main/cache.py
+++ b/synapse/storage/databases/main/cache.py
@@ -32,6 +32,7 @@ from synapse.storage.database import (
     LoggingTransaction,
 )
 from synapse.storage.engines import PostgresEngine
+from synapse.storage.util.id_generators import MultiWriterIdGenerator
 from synapse.util.caches.descriptors import _CachedFunction
 from synapse.util.iterutils import batch_iter
 
@@ -65,6 +66,31 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
             psql_only=True,  # The table is only on postgres DBs.
         )
 
+        self._cache_id_gen: Optional[MultiWriterIdGenerator]
+        if isinstance(self.database_engine, PostgresEngine):
+            # We set the `writers` to an empty list here as we don't care about
+            # missing updates over restarts, as we'll not have anything in our
+            # caches to invalidate. (This reduces the amount of writes to the DB
+            # that happen).
+            self._cache_id_gen = MultiWriterIdGenerator(
+                db_conn,
+                database,
+                stream_name="caches",
+                instance_name=hs.get_instance_name(),
+                tables=[
+                    (
+                        "cache_invalidation_stream_by_instance",
+                        "instance_name",
+                        "stream_id",
+                    )
+                ],
+                sequence_name="cache_invalidation_stream_seq",
+                writers=[],
+            )
+
+        else:
+            self._cache_id_gen = None
+
     async def get_all_updated_caches(
         self, instance_name: str, last_id: int, current_id: int, limit: int
     ) -> Tuple[List[Tuple[int, tuple]], int, bool]: