diff --git a/changelog.d/14375.misc b/changelog.d/14375.misc
new file mode 100644
index 0000000000..d0369b9b8c
--- /dev/null
+++ b/changelog.d/14375.misc
@@ -0,0 +1 @@
+Cleanup old worker datastore classes. Contributed by Nick @ Beeper (@fizzadar).
diff --git a/synapse/app/admin_cmd.py b/synapse/app/admin_cmd.py
index 3c8c00ea5b..165d1c5db0 100644
--- a/synapse/app/admin_cmd.py
+++ b/synapse/app/admin_cmd.py
@@ -28,10 +28,6 @@ from synapse.config.homeserver import HomeServerConfig
from synapse.config.logger import setup_logging
from synapse.events import EventBase
from synapse.handlers.admin import ExfiltrationWriter
-from synapse.replication.slave.storage.devices import SlavedDeviceStore
-from synapse.replication.slave.storage.events import SlavedEventStore
-from synapse.replication.slave.storage.filtering import SlavedFilteringStore
-from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore
from synapse.server import HomeServer
from synapse.storage.database import DatabasePool, LoggingDatabaseConnection
from synapse.storage.databases.main.account_data import AccountDataWorkerStore
@@ -40,10 +36,24 @@ from synapse.storage.databases.main.appservice import (
ApplicationServiceWorkerStore,
)
from synapse.storage.databases.main.deviceinbox import DeviceInboxWorkerStore
+from synapse.storage.databases.main.devices import DeviceWorkerStore
+from synapse.storage.databases.main.event_federation import EventFederationWorkerStore
+from synapse.storage.databases.main.event_push_actions import (
+ EventPushActionsWorkerStore,
+)
+from synapse.storage.databases.main.events_worker import EventsWorkerStore
+from synapse.storage.databases.main.filtering import FilteringWorkerStore
+from synapse.storage.databases.main.push_rule import PushRulesWorkerStore
from synapse.storage.databases.main.receipts import ReceiptsWorkerStore
from synapse.storage.databases.main.registration import RegistrationWorkerStore
+from synapse.storage.databases.main.relations import RelationsWorkerStore
from synapse.storage.databases.main.room import RoomWorkerStore
+from synapse.storage.databases.main.roommember import RoomMemberWorkerStore
+from synapse.storage.databases.main.signatures import SignatureWorkerStore
+from synapse.storage.databases.main.state import StateGroupWorkerStore
+from synapse.storage.databases.main.stream import StreamWorkerStore
from synapse.storage.databases.main.tags import TagsWorkerStore
+from synapse.storage.databases.main.user_erasure_store import UserErasureWorkerStore
from synapse.types import StateMap
from synapse.util import SYNAPSE_VERSION
from synapse.util.logcontext import LoggingContext
@@ -52,17 +62,25 @@ logger = logging.getLogger("synapse.app.admin_cmd")
class AdminCmdSlavedStore(
- SlavedFilteringStore,
- SlavedPushRuleStore,
- SlavedEventStore,
- SlavedDeviceStore,
+ FilteringWorkerStore,
+ DeviceWorkerStore,
TagsWorkerStore,
DeviceInboxWorkerStore,
AccountDataWorkerStore,
+ PushRulesWorkerStore,
ApplicationServiceTransactionWorkerStore,
ApplicationServiceWorkerStore,
- RegistrationWorkerStore,
+ RoomMemberWorkerStore,
+ RelationsWorkerStore,
+ EventFederationWorkerStore,
+ EventPushActionsWorkerStore,
+ StateGroupWorkerStore,
+ SignatureWorkerStore,
+ UserErasureWorkerStore,
ReceiptsWorkerStore,
+ StreamWorkerStore,
+ EventsWorkerStore,
+ RegistrationWorkerStore,
RoomWorkerStore,
):
def __init__(
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py
index cb5892f041..51446b49cd 100644
--- a/synapse/app/generic_worker.py
+++ b/synapse/app/generic_worker.py
@@ -48,12 +48,6 @@ from synapse.http.site import SynapseRequest, SynapseSite
from synapse.logging.context import LoggingContext
from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
from synapse.replication.http import REPLICATION_PREFIX, ReplicationRestResource
-from synapse.replication.slave.storage.devices import SlavedDeviceStore
-from synapse.replication.slave.storage.events import SlavedEventStore
-from synapse.replication.slave.storage.filtering import SlavedFilteringStore
-from synapse.replication.slave.storage.keys import SlavedKeyStore
-from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore
-from synapse.replication.slave.storage.pushers import SlavedPusherStore
from synapse.rest.admin import register_servlets_for_media_repo
from synapse.rest.client import (
account_data,
@@ -101,8 +95,16 @@ from synapse.storage.databases.main.appservice import (
from synapse.storage.databases.main.censor_events import CensorEventsStore
from synapse.storage.databases.main.client_ips import ClientIpWorkerStore
from synapse.storage.databases.main.deviceinbox import DeviceInboxWorkerStore
+from synapse.storage.databases.main.devices import DeviceWorkerStore
from synapse.storage.databases.main.directory import DirectoryWorkerStore
from synapse.storage.databases.main.e2e_room_keys import EndToEndRoomKeyStore
+from synapse.storage.databases.main.event_federation import EventFederationWorkerStore
+from synapse.storage.databases.main.event_push_actions import (
+ EventPushActionsWorkerStore,
+)
+from synapse.storage.databases.main.events_worker import EventsWorkerStore
+from synapse.storage.databases.main.filtering import FilteringWorkerStore
+from synapse.storage.databases.main.keys import KeyStore
from synapse.storage.databases.main.lock import LockStore
from synapse.storage.databases.main.media_repository import MediaRepositoryStore
from synapse.storage.databases.main.metrics import ServerMetricsStore
@@ -111,17 +113,25 @@ from synapse.storage.databases.main.monthly_active_users import (
)
from synapse.storage.databases.main.presence import PresenceStore
from synapse.storage.databases.main.profile import ProfileWorkerStore
+from synapse.storage.databases.main.push_rule import PushRulesWorkerStore
+from synapse.storage.databases.main.pusher import PusherWorkerStore
from synapse.storage.databases.main.receipts import ReceiptsWorkerStore
from synapse.storage.databases.main.registration import RegistrationWorkerStore
+from synapse.storage.databases.main.relations import RelationsWorkerStore
from synapse.storage.databases.main.room import RoomWorkerStore
from synapse.storage.databases.main.room_batch import RoomBatchStore
+from synapse.storage.databases.main.roommember import RoomMemberWorkerStore
from synapse.storage.databases.main.search import SearchStore
from synapse.storage.databases.main.session import SessionStore
+from synapse.storage.databases.main.signatures import SignatureWorkerStore
+from synapse.storage.databases.main.state import StateGroupWorkerStore
from synapse.storage.databases.main.stats import StatsStore
+from synapse.storage.databases.main.stream import StreamWorkerStore
from synapse.storage.databases.main.tags import TagsWorkerStore
from synapse.storage.databases.main.transactions import TransactionWorkerStore
from synapse.storage.databases.main.ui_auth import UIAuthWorkerStore
from synapse.storage.databases.main.user_directory import UserDirectoryStore
+from synapse.storage.databases.main.user_erasure_store import UserErasureWorkerStore
from synapse.types import JsonDict
from synapse.util import SYNAPSE_VERSION
from synapse.util.httpresourcetree import create_resource_tree
@@ -232,26 +242,36 @@ class GenericWorkerSlavedStore(
EndToEndRoomKeyStore,
PresenceStore,
DeviceInboxWorkerStore,
- SlavedDeviceStore,
- SlavedPushRuleStore,
+ DeviceWorkerStore,
TagsWorkerStore,
AccountDataWorkerStore,
- SlavedPusherStore,
CensorEventsStore,
ClientIpWorkerStore,
- SlavedEventStore,
- SlavedKeyStore,
+ # KeyStore isn't really safe to use from a worker, but for now we do so and hope that
+ # the races it creates aren't too bad.
+ KeyStore,
RoomWorkerStore,
RoomBatchStore,
DirectoryWorkerStore,
+ PushRulesWorkerStore,
ApplicationServiceTransactionWorkerStore,
ApplicationServiceWorkerStore,
ProfileWorkerStore,
- SlavedFilteringStore,
+ FilteringWorkerStore,
MonthlyActiveUsersWorkerStore,
MediaRepositoryStore,
ServerMetricsStore,
+ PusherWorkerStore,
+ RoomMemberWorkerStore,
+ RelationsWorkerStore,
+ EventFederationWorkerStore,
+ EventPushActionsWorkerStore,
+ StateGroupWorkerStore,
+ SignatureWorkerStore,
+ UserErasureWorkerStore,
ReceiptsWorkerStore,
+ StreamWorkerStore,
+ EventsWorkerStore,
RegistrationWorkerStore,
SearchStore,
TransactionWorkerStore,
diff --git a/synapse/replication/slave/storage/devices.py b/synapse/replication/slave/storage/devices.py
deleted file mode 100644
index 6fcade510a..0000000000
--- a/synapse/replication/slave/storage/devices.py
+++ /dev/null
@@ -1,79 +0,0 @@
-# Copyright 2016 OpenMarket Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from typing import TYPE_CHECKING, Any, Iterable
-
-from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
-from synapse.replication.tcp.streams._base import DeviceListsStream, UserSignatureStream
-from synapse.storage.database import DatabasePool, LoggingDatabaseConnection
-from synapse.storage.databases.main.devices import DeviceWorkerStore
-
-if TYPE_CHECKING:
- from synapse.server import HomeServer
-
-
-class SlavedDeviceStore(DeviceWorkerStore):
- def __init__(
- self,
- database: DatabasePool,
- db_conn: LoggingDatabaseConnection,
- hs: "HomeServer",
- ):
- self.hs = hs
-
- self._device_list_id_gen = SlavedIdTracker(
- db_conn,
- "device_lists_stream",
- "stream_id",
- extra_tables=[
- ("user_signature_stream", "stream_id"),
- ("device_lists_outbound_pokes", "stream_id"),
- ("device_lists_changes_in_room", "stream_id"),
- ],
- )
-
- super().__init__(database, db_conn, hs)
-
- def get_device_stream_token(self) -> int:
- return self._device_list_id_gen.get_current_token()
-
- def process_replication_rows(
- self, stream_name: str, instance_name: str, token: int, rows: Iterable[Any]
- ) -> None:
- if stream_name == DeviceListsStream.NAME:
- self._device_list_id_gen.advance(instance_name, token)
- self._invalidate_caches_for_devices(token, rows)
- elif stream_name == UserSignatureStream.NAME:
- self._device_list_id_gen.advance(instance_name, token)
- for row in rows:
- self._user_signature_stream_cache.entity_has_changed(row.user_id, token)
- return super().process_replication_rows(stream_name, instance_name, token, rows)
-
- def _invalidate_caches_for_devices(
- self, token: int, rows: Iterable[DeviceListsStream.DeviceListsStreamRow]
- ) -> None:
- for row in rows:
- # The entities are either user IDs (starting with '@') whose devices
- # have changed, or remote servers that we need to tell about
- # changes.
- if row.entity.startswith("@"):
- self._device_list_stream_cache.entity_has_changed(row.entity, token)
- self.get_cached_devices_for_user.invalidate((row.entity,))
- self._get_cached_user_device.invalidate((row.entity,))
- self.get_device_list_last_stream_id_for_remote.invalidate((row.entity,))
-
- else:
- self._device_list_federation_stream_cache.entity_has_changed(
- row.entity, token
- )
diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
deleted file mode 100644
index fe47778cb1..0000000000
--- a/synapse/replication/slave/storage/events.py
+++ /dev/null
@@ -1,79 +0,0 @@
-# Copyright 2016 OpenMarket Ltd
-# Copyright 2018 New Vector Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-import logging
-from typing import TYPE_CHECKING
-
-from synapse.storage.database import DatabasePool, LoggingDatabaseConnection
-from synapse.storage.databases.main.event_federation import EventFederationWorkerStore
-from synapse.storage.databases.main.event_push_actions import (
- EventPushActionsWorkerStore,
-)
-from synapse.storage.databases.main.events_worker import EventsWorkerStore
-from synapse.storage.databases.main.relations import RelationsWorkerStore
-from synapse.storage.databases.main.roommember import RoomMemberWorkerStore
-from synapse.storage.databases.main.signatures import SignatureWorkerStore
-from synapse.storage.databases.main.state import StateGroupWorkerStore
-from synapse.storage.databases.main.stream import StreamWorkerStore
-from synapse.storage.databases.main.user_erasure_store import UserErasureWorkerStore
-from synapse.util.caches.stream_change_cache import StreamChangeCache
-
-if TYPE_CHECKING:
- from synapse.server import HomeServer
-
-logger = logging.getLogger(__name__)
-
-
-# So, um, we want to borrow a load of functions intended for reading from
-# a DataStore, but we don't want to take functions that either write to the
-# DataStore or are cached and don't have cache invalidation logic.
-#
-# Rather than write duplicate versions of those functions, or lift them to
-# a common base class, we going to grab the underlying __func__ object from
-# the method descriptor on the DataStore and chuck them into our class.
-
-
-class SlavedEventStore(
- EventFederationWorkerStore,
- RoomMemberWorkerStore,
- EventPushActionsWorkerStore,
- StreamWorkerStore,
- StateGroupWorkerStore,
- SignatureWorkerStore,
- EventsWorkerStore,
- UserErasureWorkerStore,
- RelationsWorkerStore,
-):
- def __init__(
- self,
- database: DatabasePool,
- db_conn: LoggingDatabaseConnection,
- hs: "HomeServer",
- ):
- super().__init__(database, db_conn, hs)
-
- events_max = self._stream_id_gen.get_current_token()
- curr_state_delta_prefill, min_curr_state_delta_id = self.db_pool.get_cache_dict(
- db_conn,
- "current_state_delta_stream",
- entity_column="room_id",
- stream_column="stream_id",
- max_value=events_max, # As we share the stream id with events token
- limit=1000,
- )
- self._curr_state_delta_stream_cache = StreamChangeCache(
- "_curr_state_delta_stream_cache",
- min_curr_state_delta_id,
- prefilled_cache=curr_state_delta_prefill,
- )
diff --git a/synapse/replication/slave/storage/filtering.py b/synapse/replication/slave/storage/filtering.py
deleted file mode 100644
index c52679cd60..0000000000
--- a/synapse/replication/slave/storage/filtering.py
+++ /dev/null
@@ -1,35 +0,0 @@
-# Copyright 2015, 2016 OpenMarket Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from typing import TYPE_CHECKING
-
-from synapse.storage._base import SQLBaseStore
-from synapse.storage.database import DatabasePool, LoggingDatabaseConnection
-from synapse.storage.databases.main.filtering import FilteringStore
-
-if TYPE_CHECKING:
- from synapse.server import HomeServer
-
-
-class SlavedFilteringStore(SQLBaseStore):
- def __init__(
- self,
- database: DatabasePool,
- db_conn: LoggingDatabaseConnection,
- hs: "HomeServer",
- ):
- super().__init__(database, db_conn, hs)
-
- # Filters are immutable so this cache doesn't need to be expired
- get_user_filter = FilteringStore.__dict__["get_user_filter"]
diff --git a/synapse/replication/slave/storage/keys.py b/synapse/replication/slave/storage/keys.py
deleted file mode 100644
index a00b38c512..0000000000
--- a/synapse/replication/slave/storage/keys.py
+++ /dev/null
@@ -1,20 +0,0 @@
-# Copyright 2015, 2016 OpenMarket Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from synapse.storage.databases.main.keys import KeyStore
-
-# KeyStore isn't really safe to use from a worker, but for now we do so and hope that
-# the races it creates aren't too bad.
-
-SlavedKeyStore = KeyStore
diff --git a/synapse/replication/slave/storage/push_rule.py b/synapse/replication/slave/storage/push_rule.py
deleted file mode 100644
index 5e65eaf1e0..0000000000
--- a/synapse/replication/slave/storage/push_rule.py
+++ /dev/null
@@ -1,35 +0,0 @@
-# Copyright 2015, 2016 OpenMarket Ltd
-# Copyright 2018 New Vector Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-from typing import Any, Iterable
-
-from synapse.replication.tcp.streams import PushRulesStream
-from synapse.storage.databases.main.push_rule import PushRulesWorkerStore
-
-from .events import SlavedEventStore
-
-
-class SlavedPushRuleStore(SlavedEventStore, PushRulesWorkerStore):
- def get_max_push_rules_stream_id(self) -> int:
- return self._push_rules_stream_id_gen.get_current_token()
-
- def process_replication_rows(
- self, stream_name: str, instance_name: str, token: int, rows: Iterable[Any]
- ) -> None:
- if stream_name == PushRulesStream.NAME:
- self._push_rules_stream_id_gen.advance(instance_name, token)
- for row in rows:
- self.get_push_rules_for_user.invalidate((row.user_id,))
- self.push_rules_stream_cache.entity_has_changed(row.user_id, token)
- return super().process_replication_rows(stream_name, instance_name, token, rows)
diff --git a/synapse/replication/slave/storage/pushers.py b/synapse/replication/slave/storage/pushers.py
deleted file mode 100644
index 44ed20e424..0000000000
--- a/synapse/replication/slave/storage/pushers.py
+++ /dev/null
@@ -1,47 +0,0 @@
-# Copyright 2016 OpenMarket Ltd
-# Copyright 2018 New Vector Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-from typing import TYPE_CHECKING, Any, Iterable
-
-from synapse.replication.tcp.streams import PushersStream
-from synapse.storage.database import DatabasePool, LoggingDatabaseConnection
-from synapse.storage.databases.main.pusher import PusherWorkerStore
-
-from ._slaved_id_tracker import SlavedIdTracker
-
-if TYPE_CHECKING:
- from synapse.server import HomeServer
-
-
-class SlavedPusherStore(PusherWorkerStore):
- def __init__(
- self,
- database: DatabasePool,
- db_conn: LoggingDatabaseConnection,
- hs: "HomeServer",
- ):
- super().__init__(database, db_conn, hs)
- self._pushers_id_gen = SlavedIdTracker( # type: ignore
- db_conn, "pushers", "id", extra_tables=[("deleted_pushers", "stream_id")]
- )
-
- def get_pushers_stream_token(self) -> int:
- return self._pushers_id_gen.get_current_token()
-
- def process_replication_rows(
- self, stream_name: str, instance_name: str, token: int, rows: Iterable[Any]
- ) -> None:
- if stream_name == PushersStream.NAME:
- self._pushers_id_gen.advance(instance_name, token)
- return super().process_replication_rows(stream_name, instance_name, token, rows)
diff --git a/synapse/storage/databases/main/__init__.py b/synapse/storage/databases/main/__init__.py
index cfaedf5e0c..0e47592be3 100644
--- a/synapse/storage/databases/main/__init__.py
+++ b/synapse/storage/databases/main/__init__.py
@@ -26,9 +26,7 @@ from synapse.storage.database import (
from synapse.storage.databases.main.stats import UserSortOrder
from synapse.storage.engines import BaseDatabaseEngine
from synapse.storage.types import Cursor
-from synapse.storage.util.id_generators import StreamIdGenerator
from synapse.types import JsonDict, get_domain_from_id
-from synapse.util.caches.stream_change_cache import StreamChangeCache
from .account_data import AccountDataStore
from .appservice import ApplicationServiceStore, ApplicationServiceTransactionStore
@@ -138,41 +136,8 @@ class DataStore(
self._clock = hs.get_clock()
self.database_engine = database.engine
- self._device_list_id_gen = StreamIdGenerator(
- db_conn,
- "device_lists_stream",
- "stream_id",
- extra_tables=[
- ("user_signature_stream", "stream_id"),
- ("device_lists_outbound_pokes", "stream_id"),
- ("device_lists_changes_in_room", "stream_id"),
- ],
- )
-
super().__init__(database, db_conn, hs)
- events_max = self._stream_id_gen.get_current_token()
- curr_state_delta_prefill, min_curr_state_delta_id = self.db_pool.get_cache_dict(
- db_conn,
- "current_state_delta_stream",
- entity_column="room_id",
- stream_column="stream_id",
- max_value=events_max, # As we share the stream id with events token
- limit=1000,
- )
- self._curr_state_delta_stream_cache = StreamChangeCache(
- "_curr_state_delta_stream_cache",
- min_curr_state_delta_id,
- prefilled_cache=curr_state_delta_prefill,
- )
-
- self._stream_order_on_start = self.get_room_max_stream_ordering()
- self._min_stream_order_on_start = self.get_room_min_stream_ordering()
-
- def get_device_stream_token(self) -> int:
- # TODO: shouldn't this be moved to `DeviceWorkerStore`?
- return self._device_list_id_gen.get_current_token()
-
async def get_users(self) -> List[JsonDict]:
"""Function to retrieve a list of users in users table.
diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py
index 979dd4e17e..aa58c2adc3 100644
--- a/synapse/storage/databases/main/devices.py
+++ b/synapse/storage/databases/main/devices.py
@@ -13,7 +13,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-import abc
import logging
from typing import (
TYPE_CHECKING,
@@ -39,6 +38,8 @@ from synapse.logging.opentracing import (
whitelisted_homeserver,
)
from synapse.metrics.background_process_metrics import wrap_as_background_process
+from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
+from synapse.replication.tcp.streams._base import DeviceListsStream, UserSignatureStream
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
from synapse.storage.database import (
DatabasePool,
@@ -49,6 +50,11 @@ from synapse.storage.database import (
from synapse.storage.databases.main.end_to_end_keys import EndToEndKeyWorkerStore
from synapse.storage.databases.main.roommember import RoomMemberWorkerStore
from synapse.storage.types import Cursor
+from synapse.storage.util.id_generators import (
+ AbstractStreamIdGenerator,
+ AbstractStreamIdTracker,
+ StreamIdGenerator,
+)
from synapse.types import JsonDict, get_verify_key_from_cross_signing_key
from synapse.util import json_decoder, json_encoder
from synapse.util.caches.descriptors import cached, cachedList
@@ -80,9 +86,32 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
):
super().__init__(database, db_conn, hs)
+ if hs.config.worker.worker_app is None:
+ self._device_list_id_gen: AbstractStreamIdTracker = StreamIdGenerator(
+ db_conn,
+ "device_lists_stream",
+ "stream_id",
+ extra_tables=[
+ ("user_signature_stream", "stream_id"),
+ ("device_lists_outbound_pokes", "stream_id"),
+ ("device_lists_changes_in_room", "stream_id"),
+ ],
+ )
+ else:
+ self._device_list_id_gen = SlavedIdTracker(
+ db_conn,
+ "device_lists_stream",
+ "stream_id",
+ extra_tables=[
+ ("user_signature_stream", "stream_id"),
+ ("device_lists_outbound_pokes", "stream_id"),
+ ("device_lists_changes_in_room", "stream_id"),
+ ],
+ )
+
# Type-ignore: _device_list_id_gen is mixed in from either DataStore (as a
# StreamIdGenerator) or SlavedDataStore (as a SlavedIdTracker).
- device_list_max = self._device_list_id_gen.get_current_token() # type: ignore[attr-defined]
+ device_list_max = self._device_list_id_gen.get_current_token()
device_list_prefill, min_device_list_id = self.db_pool.get_cache_dict(
db_conn,
"device_lists_stream",
@@ -136,6 +165,39 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
self._prune_old_outbound_device_pokes, 60 * 60 * 1000
)
+ def process_replication_rows(
+ self, stream_name: str, instance_name: str, token: int, rows: Iterable[Any]
+ ) -> None:
+ if stream_name == DeviceListsStream.NAME:
+ self._device_list_id_gen.advance(instance_name, token)
+ self._invalidate_caches_for_devices(token, rows)
+ elif stream_name == UserSignatureStream.NAME:
+ self._device_list_id_gen.advance(instance_name, token)
+ for row in rows:
+ self._user_signature_stream_cache.entity_has_changed(row.user_id, token)
+ return super().process_replication_rows(stream_name, instance_name, token, rows)
+
+ def _invalidate_caches_for_devices(
+ self, token: int, rows: Iterable[DeviceListsStream.DeviceListsStreamRow]
+ ) -> None:
+ for row in rows:
+ # The entities are either user IDs (starting with '@') whose devices
+ # have changed, or remote servers that we need to tell about
+ # changes.
+ if row.entity.startswith("@"):
+ self._device_list_stream_cache.entity_has_changed(row.entity, token)
+ self.get_cached_devices_for_user.invalidate((row.entity,))
+ self._get_cached_user_device.invalidate((row.entity,))
+ self.get_device_list_last_stream_id_for_remote.invalidate((row.entity,))
+
+ else:
+ self._device_list_federation_stream_cache.entity_has_changed(
+ row.entity, token
+ )
+
+ def get_device_stream_token(self) -> int:
+ return self._device_list_id_gen.get_current_token()
+
async def count_devices_by_users(self, user_ids: Optional[List[str]] = None) -> int:
"""Retrieve number of all devices of given users.
Only returns number of devices that are not marked as hidden.
@@ -677,11 +739,6 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
},
)
- @abc.abstractmethod
- def get_device_stream_token(self) -> int:
- """Get the current stream id from the _device_list_id_gen"""
- ...
-
@trace
@cancellable
async def get_user_devices_from_cache(
@@ -1481,6 +1538,10 @@ class DeviceBackgroundUpdateStore(SQLBaseStore):
class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
+ # Because we have write access, this will be a StreamIdGenerator
+ # (see DeviceWorkerStore.__init__)
+ _device_list_id_gen: AbstractStreamIdGenerator
+
def __init__(
self,
database: DatabasePool,
@@ -1805,7 +1866,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
context,
)
- async with self._device_list_id_gen.get_next_mult( # type: ignore[attr-defined]
+ async with self._device_list_id_gen.get_next_mult(
len(device_ids)
) as stream_ids:
await self.db_pool.runInteraction(
@@ -2044,7 +2105,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
[],
)
- async with self._device_list_id_gen.get_next_mult(len(hosts)) as stream_ids: # type: ignore[attr-defined]
+ async with self._device_list_id_gen.get_next_mult(len(hosts)) as stream_ids:
return await self.db_pool.runInteraction(
"add_device_list_outbound_pokes",
add_device_list_outbound_pokes_txn,
@@ -2058,7 +2119,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
updates during partial joins.
"""
- async with self._device_list_id_gen.get_next() as stream_id: # type: ignore[attr-defined]
+ async with self._device_list_id_gen.get_next() as stream_id:
await self.db_pool.simple_upsert(
table="device_lists_remote_pending",
keyvalues={
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index 69fea452ad..a79091952a 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -81,6 +81,7 @@ from synapse.util import unwrapFirstError
from synapse.util.async_helpers import ObservableDeferred, delay_cancellation
from synapse.util.caches.descriptors import cached, cachedList
from synapse.util.caches.lrucache import AsyncLruCache
+from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.util.cancellation import cancellable
from synapse.util.iterutils import batch_iter
from synapse.util.metrics import Measure
@@ -233,6 +234,21 @@ class EventsWorkerStore(SQLBaseStore):
db_conn, "events", "stream_ordering", step=-1
)
+ events_max = self._stream_id_gen.get_current_token()
+ curr_state_delta_prefill, min_curr_state_delta_id = self.db_pool.get_cache_dict(
+ db_conn,
+ "current_state_delta_stream",
+ entity_column="room_id",
+ stream_column="stream_id",
+ max_value=events_max, # As we share the stream id with events token
+ limit=1000,
+ )
+ self._curr_state_delta_stream_cache: StreamChangeCache = StreamChangeCache(
+ "_curr_state_delta_stream_cache",
+ min_curr_state_delta_id,
+ prefilled_cache=curr_state_delta_prefill,
+ )
+
if hs.config.worker.run_background_tasks:
# We periodically clean out old transaction ID mappings
self._clock.looping_call(
diff --git a/synapse/storage/databases/main/filtering.py b/synapse/storage/databases/main/filtering.py
index cb9ee08fa8..12f3b601f1 100644
--- a/synapse/storage/databases/main/filtering.py
+++ b/synapse/storage/databases/main/filtering.py
@@ -24,7 +24,7 @@ from synapse.types import JsonDict
from synapse.util.caches.descriptors import cached
-class FilteringStore(SQLBaseStore):
+class FilteringWorkerStore(SQLBaseStore):
@cached(num_args=2)
async def get_user_filter(
self, user_localpart: str, filter_id: Union[int, str]
@@ -46,6 +46,8 @@ class FilteringStore(SQLBaseStore):
return db_to_json(def_json)
+
+class FilteringStore(FilteringWorkerStore):
async def add_user_filter(self, user_localpart: str, user_filter: JsonDict) -> int:
def_json = encode_canonical_json(user_filter)
diff --git a/synapse/storage/databases/main/push_rule.py b/synapse/storage/databases/main/push_rule.py
index b6c15f29f8..8ae10f6127 100644
--- a/synapse/storage/databases/main/push_rule.py
+++ b/synapse/storage/databases/main/push_rule.py
@@ -12,13 +12,13 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-import abc
import logging
from typing import (
TYPE_CHECKING,
Any,
Collection,
Dict,
+ Iterable,
List,
Mapping,
Optional,
@@ -31,6 +31,7 @@ from typing import (
from synapse.api.errors import StoreError
from synapse.config.homeserver import ExperimentalConfig
from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
+from synapse.replication.tcp.streams import PushRulesStream
from synapse.storage._base import SQLBaseStore
from synapse.storage.database import (
DatabasePool,
@@ -90,8 +91,6 @@ def _load_rules(
return filtered_rules
-# The ABCMeta metaclass ensures that it cannot be instantiated without
-# the abstract methods being implemented.
class PushRulesWorkerStore(
ApplicationServiceWorkerStore,
PusherWorkerStore,
@@ -99,7 +98,6 @@ class PushRulesWorkerStore(
ReceiptsWorkerStore,
EventsWorkerStore,
SQLBaseStore,
- metaclass=abc.ABCMeta,
):
"""This is an abstract base class where subclasses must implement
`get_max_push_rules_stream_id` which can be called in the initializer.
@@ -136,14 +134,23 @@ class PushRulesWorkerStore(
prefilled_cache=push_rules_prefill,
)
- @abc.abstractmethod
def get_max_push_rules_stream_id(self) -> int:
"""Get the position of the push rules stream.
Returns:
int
"""
- raise NotImplementedError()
+ return self._push_rules_stream_id_gen.get_current_token()
+
+ def process_replication_rows(
+ self, stream_name: str, instance_name: str, token: int, rows: Iterable[Any]
+ ) -> None:
+ if stream_name == PushRulesStream.NAME:
+ self._push_rules_stream_id_gen.advance(instance_name, token)
+ for row in rows:
+ self.get_push_rules_for_user.invalidate((row.user_id,))
+ self.push_rules_stream_cache.entity_has_changed(row.user_id, token)
+ return super().process_replication_rows(stream_name, instance_name, token, rows)
@cached(max_entries=5000)
async def get_push_rules_for_user(self, user_id: str) -> FilteredPushRules:
diff --git a/synapse/storage/databases/main/pusher.py b/synapse/storage/databases/main/pusher.py
index 01206950a9..4a01562d45 100644
--- a/synapse/storage/databases/main/pusher.py
+++ b/synapse/storage/databases/main/pusher.py
@@ -27,13 +27,19 @@ from typing import (
)
from synapse.push import PusherConfig, ThrottleParams
+from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
+from synapse.replication.tcp.streams import PushersStream
from synapse.storage._base import SQLBaseStore, db_to_json
from synapse.storage.database import (
DatabasePool,
LoggingDatabaseConnection,
LoggingTransaction,
)
-from synapse.storage.util.id_generators import StreamIdGenerator
+from synapse.storage.util.id_generators import (
+ AbstractStreamIdGenerator,
+ AbstractStreamIdTracker,
+ StreamIdGenerator,
+)
from synapse.types import JsonDict
from synapse.util import json_encoder
from synapse.util.caches.descriptors import cached
@@ -52,9 +58,21 @@ class PusherWorkerStore(SQLBaseStore):
hs: "HomeServer",
):
super().__init__(database, db_conn, hs)
- self._pushers_id_gen = StreamIdGenerator(
- db_conn, "pushers", "id", extra_tables=[("deleted_pushers", "stream_id")]
- )
+
+ if hs.config.worker.worker_app is None:
+ self._pushers_id_gen: AbstractStreamIdTracker = StreamIdGenerator(
+ db_conn,
+ "pushers",
+ "id",
+ extra_tables=[("deleted_pushers", "stream_id")],
+ )
+ else:
+ self._pushers_id_gen = SlavedIdTracker(
+ db_conn,
+ "pushers",
+ "id",
+ extra_tables=[("deleted_pushers", "stream_id")],
+ )
self.db_pool.updates.register_background_update_handler(
"remove_deactivated_pushers",
@@ -96,6 +114,16 @@ class PusherWorkerStore(SQLBaseStore):
yield PusherConfig(**r)
+ def get_pushers_stream_token(self) -> int:
+ return self._pushers_id_gen.get_current_token()
+
+ def process_replication_rows(
+ self, stream_name: str, instance_name: str, token: int, rows: Iterable[Any]
+ ) -> None:
+ if stream_name == PushersStream.NAME:
+ self._pushers_id_gen.advance(instance_name, token)
+ return super().process_replication_rows(stream_name, instance_name, token, rows)
+
async def get_pushers_by_app_id_and_pushkey(
self, app_id: str, pushkey: str
) -> Iterator[PusherConfig]:
@@ -545,8 +573,9 @@ class PusherBackgroundUpdatesStore(SQLBaseStore):
class PusherStore(PusherWorkerStore, PusherBackgroundUpdatesStore):
- def get_pushers_stream_token(self) -> int:
- return self._pushers_id_gen.get_current_token()
+ # Because we have write access, this will be a StreamIdGenerator
+ # (see PusherWorkerStore.__init__)
+ _pushers_id_gen: AbstractStreamIdGenerator
async def add_pusher(
self,
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index 09ce855aa8..cc27ec3804 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -415,6 +415,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
)
self._stream_order_on_start = self.get_room_max_stream_ordering()
+ self._min_stream_order_on_start = self.get_room_min_stream_ordering()
def get_room_max_stream_ordering(self) -> int:
"""Get the stream_ordering of regular events that we have committed up to
diff --git a/tests/replication/slave/storage/test_events.py b/tests/replication/slave/storage/test_events.py
index d42e36cdf1..96f3880923 100644
--- a/tests/replication/slave/storage/test_events.py
+++ b/tests/replication/slave/storage/test_events.py
@@ -21,11 +21,11 @@ from synapse.api.constants import ReceiptTypes
from synapse.api.room_versions import RoomVersions
from synapse.events import FrozenEvent, _EventInternalMetadata, make_event_from_dict
from synapse.handlers.room import RoomEventSource
-from synapse.replication.slave.storage.events import SlavedEventStore
from synapse.storage.databases.main.event_push_actions import (
NotifCounts,
RoomNotifCounts,
)
+from synapse.storage.databases.main.events_worker import EventsWorkerStore
from synapse.storage.roommember import GetRoomsForUserWithStreamOrdering, RoomsForUser
from synapse.types import PersistedEventPosition
@@ -58,9 +58,9 @@ def patch__eq__(cls):
return unpatch
-class SlavedEventStoreTestCase(BaseSlavedStoreTestCase):
+class EventsWorkerStoreTestCase(BaseSlavedStoreTestCase):
- STORE_TYPE = SlavedEventStore
+ STORE_TYPE = EventsWorkerStore
def setUp(self):
# Patch up the equality operator for events so that we can check
|