summary refs log tree commit diff
path: root/synapse/replication
diff options
context:
space:
mode:
authorEric Eastwood <erice@element.io>2022-09-14 18:14:17 -0500
committerEric Eastwood <erice@element.io>2022-09-14 18:14:17 -0500
commitdab846568de09742c038a9a5a5ed00539ff5aa38 (patch)
treeef4220faa96569d1d8d5f386c92bb4df2655eeed /synapse/replication
parentAdd changelog (diff)
parentKeep track when we try and fail to process a pulled event (#13589) (diff)
downloadsynapse-github/madlittlemods/event_id_always_failed_to_fetch.tar.xz
Merge branch 'develop' into madlittlemods/event_id_always_failed_to_fetch github/madlittlemods/event_id_always_failed_to_fetch madlittlemods/event_id_always_failed_to_fetch
Conflicts:
	synapse/handlers/federation_event.py
Diffstat (limited to 'synapse/replication')
-rw-r--r--synapse/replication/http/_base.py11
-rw-r--r--synapse/replication/slave/storage/_base.py58
-rw-r--r--synapse/replication/slave/storage/account_data.py22
-rw-r--r--synapse/replication/slave/storage/appservice.py25
-rw-r--r--synapse/replication/slave/storage/deviceinbox.py20
-rw-r--r--synapse/replication/slave/storage/devices.py3
-rw-r--r--synapse/replication/slave/storage/directory.py21
-rw-r--r--synapse/replication/slave/storage/events.py3
-rw-r--r--synapse/replication/slave/storage/filtering.py5
-rw-r--r--synapse/replication/slave/storage/profile.py20
-rw-r--r--synapse/replication/slave/storage/push_rule.py1
-rw-r--r--synapse/replication/slave/storage/pushers.py3
-rw-r--r--synapse/replication/slave/storage/receipts.py22
-rw-r--r--synapse/replication/slave/storage/registration.py21
-rw-r--r--synapse/replication/tcp/client.py17
-rw-r--r--synapse/replication/tcp/handler.py58
-rw-r--r--synapse/replication/tcp/streams/events.py1
17 files changed, 48 insertions, 263 deletions
diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py
index a4ae4040c3..acb0bd18f7 100644
--- a/synapse/replication/http/_base.py
+++ b/synapse/replication/http/_base.py
@@ -26,12 +26,13 @@ from twisted.web.server import Request
 
 from synapse.api.errors import HttpResponseException, SynapseError
 from synapse.http import RequestTimedOutError
-from synapse.http.server import HttpServer, is_method_cancellable
+from synapse.http.server import HttpServer
 from synapse.http.site import SynapseRequest
 from synapse.logging import opentracing
-from synapse.logging.opentracing import trace
+from synapse.logging.opentracing import trace_with_opname
 from synapse.types import JsonDict
 from synapse.util.caches.response_cache import ResponseCache
+from synapse.util.cancellation import is_function_cancellable
 from synapse.util.stringutils import random_string
 
 if TYPE_CHECKING:
@@ -196,7 +197,7 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
                 "ascii"
             )
 
-        @trace(opname="outgoing_replication_request")
+        @trace_with_opname("outgoing_replication_request")
         async def send_request(*, instance_name: str = "master", **kwargs: Any) -> Any:
             with outgoing_gauge.track_inprogress():
                 if instance_name == local_instance_name:
@@ -311,7 +312,7 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
         url_args = list(self.PATH_ARGS)
         method = self.METHOD
 
-        if self.CACHE and is_method_cancellable(self._handle_request):
+        if self.CACHE and is_function_cancellable(self._handle_request):
             raise Exception(
                 f"{self.__class__.__name__} has been marked as cancellable, but CACHE "
                 "is set. The cancellable flag would have no effect."
@@ -359,6 +360,6 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
         # The `@cancellable` decorator may be applied to `_handle_request`. But we
         # told `HttpServer.register_paths` that our handler is `_check_auth_and_handle`,
         # so we have to set up the cancellable flag ourselves.
-        request.is_render_cancellable = is_method_cancellable(self._handle_request)
+        request.is_render_cancellable = is_function_cancellable(self._handle_request)
 
         return await self._handle_request(request, **kwargs)
diff --git a/synapse/replication/slave/storage/_base.py b/synapse/replication/slave/storage/_base.py
deleted file mode 100644
index 7644146dba..0000000000
--- a/synapse/replication/slave/storage/_base.py
+++ /dev/null
@@ -1,58 +0,0 @@
-# Copyright 2016 OpenMarket Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import logging
-from typing import TYPE_CHECKING, Optional
-
-from synapse.storage.database import DatabasePool, LoggingDatabaseConnection
-from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
-from synapse.storage.engines import PostgresEngine
-from synapse.storage.util.id_generators import MultiWriterIdGenerator
-
-if TYPE_CHECKING:
-    from synapse.server import HomeServer
-
-logger = logging.getLogger(__name__)
-
-
-class BaseSlavedStore(CacheInvalidationWorkerStore):
-    def __init__(
-        self,
-        database: DatabasePool,
-        db_conn: LoggingDatabaseConnection,
-        hs: "HomeServer",
-    ):
-        super().__init__(database, db_conn, hs)
-        if isinstance(self.database_engine, PostgresEngine):
-            self._cache_id_gen: Optional[
-                MultiWriterIdGenerator
-            ] = MultiWriterIdGenerator(
-                db_conn,
-                database,
-                stream_name="caches",
-                instance_name=hs.get_instance_name(),
-                tables=[
-                    (
-                        "cache_invalidation_stream_by_instance",
-                        "instance_name",
-                        "stream_id",
-                    )
-                ],
-                sequence_name="cache_invalidation_stream_seq",
-                writers=[],
-            )
-        else:
-            self._cache_id_gen = None
-
-        self.hs = hs
diff --git a/synapse/replication/slave/storage/account_data.py b/synapse/replication/slave/storage/account_data.py
deleted file mode 100644
index ee74ee7d85..0000000000
--- a/synapse/replication/slave/storage/account_data.py
+++ /dev/null
@@ -1,22 +0,0 @@
-# Copyright 2016 OpenMarket Ltd
-# Copyright 2018 New Vector Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from synapse.replication.slave.storage._base import BaseSlavedStore
-from synapse.storage.databases.main.account_data import AccountDataWorkerStore
-from synapse.storage.databases.main.tags import TagsWorkerStore
-
-
-class SlavedAccountDataStore(TagsWorkerStore, AccountDataWorkerStore, BaseSlavedStore):
-    pass
diff --git a/synapse/replication/slave/storage/appservice.py b/synapse/replication/slave/storage/appservice.py
deleted file mode 100644
index 29f50c0add..0000000000
--- a/synapse/replication/slave/storage/appservice.py
+++ /dev/null
@@ -1,25 +0,0 @@
-# Copyright 2015, 2016 OpenMarket Ltd
-# Copyright 2018 New Vector Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from synapse.storage.databases.main.appservice import (
-    ApplicationServiceTransactionWorkerStore,
-    ApplicationServiceWorkerStore,
-)
-
-
-class SlavedApplicationServiceStore(
-    ApplicationServiceTransactionWorkerStore, ApplicationServiceWorkerStore
-):
-    pass
diff --git a/synapse/replication/slave/storage/deviceinbox.py b/synapse/replication/slave/storage/deviceinbox.py
deleted file mode 100644
index e940751084..0000000000
--- a/synapse/replication/slave/storage/deviceinbox.py
+++ /dev/null
@@ -1,20 +0,0 @@
-# Copyright 2016 OpenMarket Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from synapse.replication.slave.storage._base import BaseSlavedStore
-from synapse.storage.databases.main.deviceinbox import DeviceInboxWorkerStore
-
-
-class SlavedDeviceInboxStore(DeviceInboxWorkerStore, BaseSlavedStore):
-    pass
diff --git a/synapse/replication/slave/storage/devices.py b/synapse/replication/slave/storage/devices.py
index a48cc02069..6fcade510a 100644
--- a/synapse/replication/slave/storage/devices.py
+++ b/synapse/replication/slave/storage/devices.py
@@ -14,7 +14,6 @@
 
 from typing import TYPE_CHECKING, Any, Iterable
 
-from synapse.replication.slave.storage._base import BaseSlavedStore
 from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
 from synapse.replication.tcp.streams._base import DeviceListsStream, UserSignatureStream
 from synapse.storage.database import DatabasePool, LoggingDatabaseConnection
@@ -24,7 +23,7 @@ if TYPE_CHECKING:
     from synapse.server import HomeServer
 
 
-class SlavedDeviceStore(DeviceWorkerStore, BaseSlavedStore):
+class SlavedDeviceStore(DeviceWorkerStore):
     def __init__(
         self,
         database: DatabasePool,
diff --git a/synapse/replication/slave/storage/directory.py b/synapse/replication/slave/storage/directory.py
deleted file mode 100644
index 71fde0c96c..0000000000
--- a/synapse/replication/slave/storage/directory.py
+++ /dev/null
@@ -1,21 +0,0 @@
-# Copyright 2015, 2016 OpenMarket Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from synapse.storage.databases.main.directory import DirectoryWorkerStore
-
-from ._base import BaseSlavedStore
-
-
-class DirectoryStore(DirectoryWorkerStore, BaseSlavedStore):
-    pass
diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index a72dad7464..fe47778cb1 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -29,8 +29,6 @@ from synapse.storage.databases.main.stream import StreamWorkerStore
 from synapse.storage.databases.main.user_erasure_store import UserErasureWorkerStore
 from synapse.util.caches.stream_change_cache import StreamChangeCache
 
-from ._base import BaseSlavedStore
-
 if TYPE_CHECKING:
     from synapse.server import HomeServer
 
@@ -56,7 +54,6 @@ class SlavedEventStore(
     EventsWorkerStore,
     UserErasureWorkerStore,
     RelationsWorkerStore,
-    BaseSlavedStore,
 ):
     def __init__(
         self,
diff --git a/synapse/replication/slave/storage/filtering.py b/synapse/replication/slave/storage/filtering.py
index 4d185e2b56..c52679cd60 100644
--- a/synapse/replication/slave/storage/filtering.py
+++ b/synapse/replication/slave/storage/filtering.py
@@ -14,16 +14,15 @@
 
 from typing import TYPE_CHECKING
 
+from synapse.storage._base import SQLBaseStore
 from synapse.storage.database import DatabasePool, LoggingDatabaseConnection
 from synapse.storage.databases.main.filtering import FilteringStore
 
-from ._base import BaseSlavedStore
-
 if TYPE_CHECKING:
     from synapse.server import HomeServer
 
 
-class SlavedFilteringStore(BaseSlavedStore):
+class SlavedFilteringStore(SQLBaseStore):
     def __init__(
         self,
         database: DatabasePool,
diff --git a/synapse/replication/slave/storage/profile.py b/synapse/replication/slave/storage/profile.py
deleted file mode 100644
index 99f4a22642..0000000000
--- a/synapse/replication/slave/storage/profile.py
+++ /dev/null
@@ -1,20 +0,0 @@
-# Copyright 2018 New Vector Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from synapse.replication.slave.storage._base import BaseSlavedStore
-from synapse.storage.databases.main.profile import ProfileWorkerStore
-
-
-class SlavedProfileStore(ProfileWorkerStore, BaseSlavedStore):
-    pass
diff --git a/synapse/replication/slave/storage/push_rule.py b/synapse/replication/slave/storage/push_rule.py
index 52ee3f7e58..5e65eaf1e0 100644
--- a/synapse/replication/slave/storage/push_rule.py
+++ b/synapse/replication/slave/storage/push_rule.py
@@ -31,6 +31,5 @@ class SlavedPushRuleStore(SlavedEventStore, PushRulesWorkerStore):
             self._push_rules_stream_id_gen.advance(instance_name, token)
             for row in rows:
                 self.get_push_rules_for_user.invalidate((row.user_id,))
-                self.get_push_rules_enabled_for_user.invalidate((row.user_id,))
                 self.push_rules_stream_cache.entity_has_changed(row.user_id, token)
         return super().process_replication_rows(stream_name, instance_name, token, rows)
diff --git a/synapse/replication/slave/storage/pushers.py b/synapse/replication/slave/storage/pushers.py
index de642bba71..44ed20e424 100644
--- a/synapse/replication/slave/storage/pushers.py
+++ b/synapse/replication/slave/storage/pushers.py
@@ -18,14 +18,13 @@ from synapse.replication.tcp.streams import PushersStream
 from synapse.storage.database import DatabasePool, LoggingDatabaseConnection
 from synapse.storage.databases.main.pusher import PusherWorkerStore
 
-from ._base import BaseSlavedStore
 from ._slaved_id_tracker import SlavedIdTracker
 
 if TYPE_CHECKING:
     from synapse.server import HomeServer
 
 
-class SlavedPusherStore(PusherWorkerStore, BaseSlavedStore):
+class SlavedPusherStore(PusherWorkerStore):
     def __init__(
         self,
         database: DatabasePool,
diff --git a/synapse/replication/slave/storage/receipts.py b/synapse/replication/slave/storage/receipts.py
deleted file mode 100644
index 3826b87dec..0000000000
--- a/synapse/replication/slave/storage/receipts.py
+++ /dev/null
@@ -1,22 +0,0 @@
-# Copyright 2016 OpenMarket Ltd
-# Copyright 2018 New Vector Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from synapse.storage.databases.main.receipts import ReceiptsWorkerStore
-
-from ._base import BaseSlavedStore
-
-
-class SlavedReceiptsStore(ReceiptsWorkerStore, BaseSlavedStore):
-    pass
diff --git a/synapse/replication/slave/storage/registration.py b/synapse/replication/slave/storage/registration.py
deleted file mode 100644
index 5dae35a960..0000000000
--- a/synapse/replication/slave/storage/registration.py
+++ /dev/null
@@ -1,21 +0,0 @@
-# Copyright 2015, 2016 OpenMarket Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from synapse.storage.databases.main.registration import RegistrationWorkerStore
-
-from ._base import BaseSlavedStore
-
-
-class SlavedRegistrationStore(RegistrationWorkerStore, BaseSlavedStore):
-    pass
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index 2f59245058..e4f2201c92 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -21,7 +21,7 @@ from twisted.internet.interfaces import IAddress, IConnector
 from twisted.internet.protocol import ReconnectingClientFactory
 from twisted.python.failure import Failure
 
-from synapse.api.constants import EventTypes, ReceiptTypes
+from synapse.api.constants import EventTypes, Membership, ReceiptTypes
 from synapse.federation import send_queue
 from synapse.federation.sender import FederationSender
 from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
@@ -219,6 +219,21 @@ class ReplicationDataHandler:
                     membership=row.data.membership,
                 )
 
+                # If this event is a join, make a note of it so we have an accurate
+                # cross-worker room rate limit.
+                # TODO: Erik said we should exclude rows that came from ex_outliers
+                #  here, but I don't see how we can determine that. I guess we could
+                #  add a flag to row.data?
+                if (
+                    row.data.type == EventTypes.Member
+                    and row.data.membership == Membership.JOIN
+                    and not row.data.outlier
+                ):
+                    # TODO retrieve the previous state, and exclude join -> join transitions
+                    self.notifier.notify_user_joined_room(
+                        row.data.event_id, row.data.room_id
+                    )
+
         await self._presence_handler.process_replication_rows(
             stream_name, instance_name, token, rows
         )
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index e1cbfa50eb..0f166d16aa 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -35,7 +35,6 @@ from twisted.internet.protocol import ReconnectingClientFactory
 
 from synapse.metrics import LaterGauge
 from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.replication.tcp.client import DirectTcpReplicationClientFactory
 from synapse.replication.tcp.commands import (
     ClearUserSyncsCommand,
     Command,
@@ -332,46 +331,31 @@ class ReplicationCommandHandler:
 
     def start_replication(self, hs: "HomeServer") -> None:
         """Helper method to start replication."""
-        if hs.config.redis.redis_enabled:
-            from synapse.replication.tcp.redis import (
-                RedisDirectTcpReplicationClientFactory,
-            )
+        from synapse.replication.tcp.redis import RedisDirectTcpReplicationClientFactory
 
-            # First let's ensure that we have a ReplicationStreamer started.
-            hs.get_replication_streamer()
+        # First let's ensure that we have a ReplicationStreamer started.
+        hs.get_replication_streamer()
 
-            # We need two connections to redis, one for the subscription stream and
-            # one to send commands to (as you can't send further redis commands to a
-            # connection after SUBSCRIBE is called).
+        # We need two connections to redis, one for the subscription stream and
+        # one to send commands to (as you can't send further redis commands to a
+        # connection after SUBSCRIBE is called).
 
-            # First create the connection for sending commands.
-            outbound_redis_connection = hs.get_outbound_redis_connection()
+        # First create the connection for sending commands.
+        outbound_redis_connection = hs.get_outbound_redis_connection()
 
-            # Now create the factory/connection for the subscription stream.
-            self._factory = RedisDirectTcpReplicationClientFactory(
-                hs,
-                outbound_redis_connection,
-                channel_names=self._channels_to_subscribe_to,
-            )
-            hs.get_reactor().connectTCP(
-                hs.config.redis.redis_host,
-                hs.config.redis.redis_port,
-                self._factory,
-                timeout=30,
-                bindAddress=None,
-            )
-        else:
-            client_name = hs.get_instance_name()
-            self._factory = DirectTcpReplicationClientFactory(hs, client_name, self)
-            host = hs.config.worker.worker_replication_host
-            port = hs.config.worker.worker_replication_port
-            hs.get_reactor().connectTCP(
-                host,
-                port,
-                self._factory,
-                timeout=30,
-                bindAddress=None,
-            )
+        # Now create the factory/connection for the subscription stream.
+        self._factory = RedisDirectTcpReplicationClientFactory(
+            hs,
+            outbound_redis_connection,
+            channel_names=self._channels_to_subscribe_to,
+        )
+        hs.get_reactor().connectTCP(
+            hs.config.redis.redis_host,
+            hs.config.redis.redis_port,
+            self._factory,
+            timeout=30,
+            bindAddress=None,
+        )
 
     def get_streams(self) -> Dict[str, Stream]:
         """Get a map from stream name to all streams."""
diff --git a/synapse/replication/tcp/streams/events.py b/synapse/replication/tcp/streams/events.py
index 26f4fa7cfd..14b6705862 100644
--- a/synapse/replication/tcp/streams/events.py
+++ b/synapse/replication/tcp/streams/events.py
@@ -98,6 +98,7 @@ class EventsStreamEventRow(BaseEventsStreamRow):
     relates_to: Optional[str]
     membership: Optional[str]
     rejected: bool
+    outlier: bool
 
 
 @attr.s(slots=True, frozen=True, auto_attribs=True)