diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py
index a4ae4040c3..acb0bd18f7 100644
--- a/synapse/replication/http/_base.py
+++ b/synapse/replication/http/_base.py
@@ -26,12 +26,13 @@ from twisted.web.server import Request
from synapse.api.errors import HttpResponseException, SynapseError
from synapse.http import RequestTimedOutError
-from synapse.http.server import HttpServer, is_method_cancellable
+from synapse.http.server import HttpServer
from synapse.http.site import SynapseRequest
from synapse.logging import opentracing
-from synapse.logging.opentracing import trace
+from synapse.logging.opentracing import trace_with_opname
from synapse.types import JsonDict
from synapse.util.caches.response_cache import ResponseCache
+from synapse.util.cancellation import is_function_cancellable
from synapse.util.stringutils import random_string
if TYPE_CHECKING:
@@ -196,7 +197,7 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
"ascii"
)
- @trace(opname="outgoing_replication_request")
+ @trace_with_opname("outgoing_replication_request")
async def send_request(*, instance_name: str = "master", **kwargs: Any) -> Any:
with outgoing_gauge.track_inprogress():
if instance_name == local_instance_name:
@@ -311,7 +312,7 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
url_args = list(self.PATH_ARGS)
method = self.METHOD
- if self.CACHE and is_method_cancellable(self._handle_request):
+ if self.CACHE and is_function_cancellable(self._handle_request):
raise Exception(
f"{self.__class__.__name__} has been marked as cancellable, but CACHE "
"is set. The cancellable flag would have no effect."
@@ -359,6 +360,6 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
# The `@cancellable` decorator may be applied to `_handle_request`. But we
# told `HttpServer.register_paths` that our handler is `_check_auth_and_handle`,
# so we have to set up the cancellable flag ourselves.
- request.is_render_cancellable = is_method_cancellable(self._handle_request)
+ request.is_render_cancellable = is_function_cancellable(self._handle_request)
return await self._handle_request(request, **kwargs)
diff --git a/synapse/replication/slave/storage/_base.py b/synapse/replication/slave/storage/_base.py
deleted file mode 100644
index 7644146dba..0000000000
--- a/synapse/replication/slave/storage/_base.py
+++ /dev/null
@@ -1,58 +0,0 @@
-# Copyright 2016 OpenMarket Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import logging
-from typing import TYPE_CHECKING, Optional
-
-from synapse.storage.database import DatabasePool, LoggingDatabaseConnection
-from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
-from synapse.storage.engines import PostgresEngine
-from synapse.storage.util.id_generators import MultiWriterIdGenerator
-
-if TYPE_CHECKING:
- from synapse.server import HomeServer
-
-logger = logging.getLogger(__name__)
-
-
-class BaseSlavedStore(CacheInvalidationWorkerStore):
- def __init__(
- self,
- database: DatabasePool,
- db_conn: LoggingDatabaseConnection,
- hs: "HomeServer",
- ):
- super().__init__(database, db_conn, hs)
- if isinstance(self.database_engine, PostgresEngine):
- self._cache_id_gen: Optional[
- MultiWriterIdGenerator
- ] = MultiWriterIdGenerator(
- db_conn,
- database,
- stream_name="caches",
- instance_name=hs.get_instance_name(),
- tables=[
- (
- "cache_invalidation_stream_by_instance",
- "instance_name",
- "stream_id",
- )
- ],
- sequence_name="cache_invalidation_stream_seq",
- writers=[],
- )
- else:
- self._cache_id_gen = None
-
- self.hs = hs
diff --git a/synapse/replication/slave/storage/account_data.py b/synapse/replication/slave/storage/account_data.py
deleted file mode 100644
index ee74ee7d85..0000000000
--- a/synapse/replication/slave/storage/account_data.py
+++ /dev/null
@@ -1,22 +0,0 @@
-# Copyright 2016 OpenMarket Ltd
-# Copyright 2018 New Vector Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from synapse.replication.slave.storage._base import BaseSlavedStore
-from synapse.storage.databases.main.account_data import AccountDataWorkerStore
-from synapse.storage.databases.main.tags import TagsWorkerStore
-
-
-class SlavedAccountDataStore(TagsWorkerStore, AccountDataWorkerStore, BaseSlavedStore):
- pass
diff --git a/synapse/replication/slave/storage/appservice.py b/synapse/replication/slave/storage/appservice.py
deleted file mode 100644
index 29f50c0add..0000000000
--- a/synapse/replication/slave/storage/appservice.py
+++ /dev/null
@@ -1,25 +0,0 @@
-# Copyright 2015, 2016 OpenMarket Ltd
-# Copyright 2018 New Vector Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from synapse.storage.databases.main.appservice import (
- ApplicationServiceTransactionWorkerStore,
- ApplicationServiceWorkerStore,
-)
-
-
-class SlavedApplicationServiceStore(
- ApplicationServiceTransactionWorkerStore, ApplicationServiceWorkerStore
-):
- pass
diff --git a/synapse/replication/slave/storage/deviceinbox.py b/synapse/replication/slave/storage/deviceinbox.py
deleted file mode 100644
index e940751084..0000000000
--- a/synapse/replication/slave/storage/deviceinbox.py
+++ /dev/null
@@ -1,20 +0,0 @@
-# Copyright 2016 OpenMarket Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from synapse.replication.slave.storage._base import BaseSlavedStore
-from synapse.storage.databases.main.deviceinbox import DeviceInboxWorkerStore
-
-
-class SlavedDeviceInboxStore(DeviceInboxWorkerStore, BaseSlavedStore):
- pass
diff --git a/synapse/replication/slave/storage/devices.py b/synapse/replication/slave/storage/devices.py
index a48cc02069..6fcade510a 100644
--- a/synapse/replication/slave/storage/devices.py
+++ b/synapse/replication/slave/storage/devices.py
@@ -14,7 +14,6 @@
from typing import TYPE_CHECKING, Any, Iterable
-from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
from synapse.replication.tcp.streams._base import DeviceListsStream, UserSignatureStream
from synapse.storage.database import DatabasePool, LoggingDatabaseConnection
@@ -24,7 +23,7 @@ if TYPE_CHECKING:
from synapse.server import HomeServer
-class SlavedDeviceStore(DeviceWorkerStore, BaseSlavedStore):
+class SlavedDeviceStore(DeviceWorkerStore):
def __init__(
self,
database: DatabasePool,
diff --git a/synapse/replication/slave/storage/directory.py b/synapse/replication/slave/storage/directory.py
deleted file mode 100644
index 71fde0c96c..0000000000
--- a/synapse/replication/slave/storage/directory.py
+++ /dev/null
@@ -1,21 +0,0 @@
-# Copyright 2015, 2016 OpenMarket Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from synapse.storage.databases.main.directory import DirectoryWorkerStore
-
-from ._base import BaseSlavedStore
-
-
-class DirectoryStore(DirectoryWorkerStore, BaseSlavedStore):
- pass
diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index a72dad7464..fe47778cb1 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -29,8 +29,6 @@ from synapse.storage.databases.main.stream import StreamWorkerStore
from synapse.storage.databases.main.user_erasure_store import UserErasureWorkerStore
from synapse.util.caches.stream_change_cache import StreamChangeCache
-from ._base import BaseSlavedStore
-
if TYPE_CHECKING:
from synapse.server import HomeServer
@@ -56,7 +54,6 @@ class SlavedEventStore(
EventsWorkerStore,
UserErasureWorkerStore,
RelationsWorkerStore,
- BaseSlavedStore,
):
def __init__(
self,
diff --git a/synapse/replication/slave/storage/filtering.py b/synapse/replication/slave/storage/filtering.py
index 4d185e2b56..c52679cd60 100644
--- a/synapse/replication/slave/storage/filtering.py
+++ b/synapse/replication/slave/storage/filtering.py
@@ -14,16 +14,15 @@
from typing import TYPE_CHECKING
+from synapse.storage._base import SQLBaseStore
from synapse.storage.database import DatabasePool, LoggingDatabaseConnection
from synapse.storage.databases.main.filtering import FilteringStore
-from ._base import BaseSlavedStore
-
if TYPE_CHECKING:
from synapse.server import HomeServer
-class SlavedFilteringStore(BaseSlavedStore):
+class SlavedFilteringStore(SQLBaseStore):
def __init__(
self,
database: DatabasePool,
diff --git a/synapse/replication/slave/storage/profile.py b/synapse/replication/slave/storage/profile.py
deleted file mode 100644
index 99f4a22642..0000000000
--- a/synapse/replication/slave/storage/profile.py
+++ /dev/null
@@ -1,20 +0,0 @@
-# Copyright 2018 New Vector Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from synapse.replication.slave.storage._base import BaseSlavedStore
-from synapse.storage.databases.main.profile import ProfileWorkerStore
-
-
-class SlavedProfileStore(ProfileWorkerStore, BaseSlavedStore):
- pass
diff --git a/synapse/replication/slave/storage/push_rule.py b/synapse/replication/slave/storage/push_rule.py
index 52ee3f7e58..5e65eaf1e0 100644
--- a/synapse/replication/slave/storage/push_rule.py
+++ b/synapse/replication/slave/storage/push_rule.py
@@ -31,6 +31,5 @@ class SlavedPushRuleStore(SlavedEventStore, PushRulesWorkerStore):
self._push_rules_stream_id_gen.advance(instance_name, token)
for row in rows:
self.get_push_rules_for_user.invalidate((row.user_id,))
- self.get_push_rules_enabled_for_user.invalidate((row.user_id,))
self.push_rules_stream_cache.entity_has_changed(row.user_id, token)
return super().process_replication_rows(stream_name, instance_name, token, rows)
diff --git a/synapse/replication/slave/storage/pushers.py b/synapse/replication/slave/storage/pushers.py
index de642bba71..44ed20e424 100644
--- a/synapse/replication/slave/storage/pushers.py
+++ b/synapse/replication/slave/storage/pushers.py
@@ -18,14 +18,13 @@ from synapse.replication.tcp.streams import PushersStream
from synapse.storage.database import DatabasePool, LoggingDatabaseConnection
from synapse.storage.databases.main.pusher import PusherWorkerStore
-from ._base import BaseSlavedStore
from ._slaved_id_tracker import SlavedIdTracker
if TYPE_CHECKING:
from synapse.server import HomeServer
-class SlavedPusherStore(PusherWorkerStore, BaseSlavedStore):
+class SlavedPusherStore(PusherWorkerStore):
def __init__(
self,
database: DatabasePool,
diff --git a/synapse/replication/slave/storage/receipts.py b/synapse/replication/slave/storage/receipts.py
deleted file mode 100644
index 3826b87dec..0000000000
--- a/synapse/replication/slave/storage/receipts.py
+++ /dev/null
@@ -1,22 +0,0 @@
-# Copyright 2016 OpenMarket Ltd
-# Copyright 2018 New Vector Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from synapse.storage.databases.main.receipts import ReceiptsWorkerStore
-
-from ._base import BaseSlavedStore
-
-
-class SlavedReceiptsStore(ReceiptsWorkerStore, BaseSlavedStore):
- pass
diff --git a/synapse/replication/slave/storage/registration.py b/synapse/replication/slave/storage/registration.py
deleted file mode 100644
index 5dae35a960..0000000000
--- a/synapse/replication/slave/storage/registration.py
+++ /dev/null
@@ -1,21 +0,0 @@
-# Copyright 2015, 2016 OpenMarket Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from synapse.storage.databases.main.registration import RegistrationWorkerStore
-
-from ._base import BaseSlavedStore
-
-
-class SlavedRegistrationStore(RegistrationWorkerStore, BaseSlavedStore):
- pass
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index 2f59245058..e4f2201c92 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -21,7 +21,7 @@ from twisted.internet.interfaces import IAddress, IConnector
from twisted.internet.protocol import ReconnectingClientFactory
from twisted.python.failure import Failure
-from synapse.api.constants import EventTypes, ReceiptTypes
+from synapse.api.constants import EventTypes, Membership, ReceiptTypes
from synapse.federation import send_queue
from synapse.federation.sender import FederationSender
from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
@@ -219,6 +219,21 @@ class ReplicationDataHandler:
membership=row.data.membership,
)
+ # If this event is a join, make a note of it so we have an accurate
+ # cross-worker room rate limit.
+ # TODO: Erik said we should exclude rows that came from ex_outliers
+ # here, but I don't see how we can determine that. I guess we could
+ # add a flag to row.data?
+ if (
+ row.data.type == EventTypes.Member
+ and row.data.membership == Membership.JOIN
+ and not row.data.outlier
+ ):
+ # TODO retrieve the previous state, and exclude join -> join transitions
+ self.notifier.notify_user_joined_room(
+ row.data.event_id, row.data.room_id
+ )
+
await self._presence_handler.process_replication_rows(
stream_name, instance_name, token, rows
)
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index e1cbfa50eb..0f166d16aa 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -35,7 +35,6 @@ from twisted.internet.protocol import ReconnectingClientFactory
from synapse.metrics import LaterGauge
from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.replication.tcp.client import DirectTcpReplicationClientFactory
from synapse.replication.tcp.commands import (
ClearUserSyncsCommand,
Command,
@@ -332,46 +331,31 @@ class ReplicationCommandHandler:
def start_replication(self, hs: "HomeServer") -> None:
"""Helper method to start replication."""
- if hs.config.redis.redis_enabled:
- from synapse.replication.tcp.redis import (
- RedisDirectTcpReplicationClientFactory,
- )
+ from synapse.replication.tcp.redis import RedisDirectTcpReplicationClientFactory
- # First let's ensure that we have a ReplicationStreamer started.
- hs.get_replication_streamer()
+ # First let's ensure that we have a ReplicationStreamer started.
+ hs.get_replication_streamer()
- # We need two connections to redis, one for the subscription stream and
- # one to send commands to (as you can't send further redis commands to a
- # connection after SUBSCRIBE is called).
+ # We need two connections to redis, one for the subscription stream and
+ # one to send commands to (as you can't send further redis commands to a
+ # connection after SUBSCRIBE is called).
- # First create the connection for sending commands.
- outbound_redis_connection = hs.get_outbound_redis_connection()
+ # First create the connection for sending commands.
+ outbound_redis_connection = hs.get_outbound_redis_connection()
- # Now create the factory/connection for the subscription stream.
- self._factory = RedisDirectTcpReplicationClientFactory(
- hs,
- outbound_redis_connection,
- channel_names=self._channels_to_subscribe_to,
- )
- hs.get_reactor().connectTCP(
- hs.config.redis.redis_host,
- hs.config.redis.redis_port,
- self._factory,
- timeout=30,
- bindAddress=None,
- )
- else:
- client_name = hs.get_instance_name()
- self._factory = DirectTcpReplicationClientFactory(hs, client_name, self)
- host = hs.config.worker.worker_replication_host
- port = hs.config.worker.worker_replication_port
- hs.get_reactor().connectTCP(
- host,
- port,
- self._factory,
- timeout=30,
- bindAddress=None,
- )
+ # Now create the factory/connection for the subscription stream.
+ self._factory = RedisDirectTcpReplicationClientFactory(
+ hs,
+ outbound_redis_connection,
+ channel_names=self._channels_to_subscribe_to,
+ )
+ hs.get_reactor().connectTCP(
+ hs.config.redis.redis_host,
+ hs.config.redis.redis_port,
+ self._factory,
+ timeout=30,
+ bindAddress=None,
+ )
def get_streams(self) -> Dict[str, Stream]:
"""Get a map from stream name to all streams."""
diff --git a/synapse/replication/tcp/streams/events.py b/synapse/replication/tcp/streams/events.py
index 26f4fa7cfd..14b6705862 100644
--- a/synapse/replication/tcp/streams/events.py
+++ b/synapse/replication/tcp/streams/events.py
@@ -98,6 +98,7 @@ class EventsStreamEventRow(BaseEventsStreamRow):
relates_to: Optional[str]
membership: Optional[str]
rejected: bool
+ outlier: bool
@attr.s(slots=True, frozen=True, auto_attribs=True)
|