diff --git a/synapse/replication/http/__init__.py b/synapse/replication/http/__init__.py
index 53aa7fa4c6..ac9a92240a 100644
--- a/synapse/replication/http/__init__.py
+++ b/synapse/replication/http/__init__.py
@@ -25,6 +25,7 @@ from synapse.replication.http import (
push,
register,
send_event,
+ send_events,
state,
streams,
)
@@ -43,6 +44,7 @@ class ReplicationRestResource(JsonResource):
def register_servlets(self, hs: "HomeServer") -> None:
send_event.register_servlets(hs, self)
+ send_events.register_servlets(hs, self)
federation.register_servlets(hs, self)
presence.register_servlets(hs, self)
membership.register_servlets(hs, self)
diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py
index 561ad5bf04..3f4d3fc51a 100644
--- a/synapse/replication/http/_base.py
+++ b/synapse/replication/http/_base.py
@@ -26,12 +26,13 @@ from twisted.web.server import Request
from synapse.api.errors import HttpResponseException, SynapseError
from synapse.http import RequestTimedOutError
-from synapse.http.server import HttpServer, is_method_cancellable
+from synapse.http.server import HttpServer
from synapse.http.site import SynapseRequest
from synapse.logging import opentracing
from synapse.logging.opentracing import trace_with_opname
from synapse.types import JsonDict
from synapse.util.caches.response_cache import ResponseCache
+from synapse.util.cancellation import is_function_cancellable
from synapse.util.stringutils import random_string
if TYPE_CHECKING:
@@ -152,7 +153,7 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
argument list.
Returns:
- dict: If POST/PUT request then dictionary must be JSON serialisable,
+ If POST/PUT request then dictionary must be JSON serialisable,
otherwise must be appropriate for adding as query args.
"""
return {}
@@ -183,8 +184,10 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
client = hs.get_simple_http_client()
local_instance_name = hs.get_instance_name()
+ # The values of these options should match the replication listener settings.
master_host = hs.config.worker.worker_replication_host
master_port = hs.config.worker.worker_replication_http_port
+ master_tls = hs.config.worker.worker_replication_http_tls
instance_map = hs.config.worker.instance_map
@@ -204,9 +207,11 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
if instance_name == "master":
host = master_host
port = master_port
+ tls = master_tls
elif instance_name in instance_map:
host = instance_map[instance_name].host
port = instance_map[instance_name].port
+ tls = instance_map[instance_name].tls
else:
raise Exception(
"Instance %r not in 'instance_map' config" % (instance_name,)
@@ -237,7 +242,11 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
"Unknown METHOD on %s replication endpoint" % (cls.NAME,)
)
- uri = "http://%s:%s/_synapse/replication/%s/%s" % (
+ # The scheme is hard-coded: http by default, or https when the replication
+ # listener is configured with TLS enabled.
+ scheme = "https" if tls else "http"
+ uri = "%s://%s:%s/_synapse/replication/%s/%s" % (
+ scheme,
host,
port,
cls.NAME,
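
A minimal sketch of the endpoint resolution this hunk implements; the `Entry` named tuple stands in for the real instance-map entry type, and all names here are illustrative:

```python
from typing import Dict, NamedTuple

class Entry(NamedTuple):
    host: str
    port: int
    tls: bool

def replication_uri(
    instance_name: str, endpoint_name: str, txn_id: str,
    master: Entry, instance_map: Dict[str, Entry],
) -> str:
    if instance_name == "master":
        entry = master
    elif instance_name in instance_map:
        entry = instance_map[instance_name]
    else:
        raise Exception("Instance %r not in 'instance_map' config" % (instance_name,))
    # http by default, https when the replication listener has TLS enabled
    scheme = "https" if entry.tls else "http"
    return "%s://%s:%s/_synapse/replication/%s/%s" % (
        scheme, entry.host, entry.port, endpoint_name, txn_id,
    )
```
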
@@ -311,7 +320,7 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
url_args = list(self.PATH_ARGS)
method = self.METHOD
- if self.CACHE and is_method_cancellable(self._handle_request):
+ if self.CACHE and is_function_cancellable(self._handle_request):
raise Exception(
f"{self.__class__.__name__} has been marked as cancellable, but CACHE "
"is set. The cancellable flag would have no effect."
@@ -359,6 +368,6 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
# The `@cancellable` decorator may be applied to `_handle_request`. But we
# told `HttpServer.register_paths` that our handler is `_check_auth_and_handle`,
# so we have to set up the cancellable flag ourselves.
- request.is_render_cancellable = is_method_cancellable(self._handle_request)
+ request.is_render_cancellable = is_function_cancellable(self._handle_request)
return await self._handle_request(request, **kwargs)
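
The renamed helper follows a simple marker-flag pattern; a sketch of how the decorator and the check plausibly fit together (reconstructed for illustration, not copied from this diff):

```python
from typing import Any, Callable, TypeVar

F = TypeVar("F", bound=Callable[..., Any])

def cancellable(function: F) -> F:
    """Mark a request handler as safe to cancel when the client disconnects."""
    function.cancellable = True  # type: ignore[attr-defined]
    return function

def is_function_cancellable(function: Callable[..., Any]) -> bool:
    """Check whether a handler carries the `cancellable` marker."""
    return getattr(function, "cancellable", False)
```
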
diff --git a/synapse/replication/http/devices.py b/synapse/replication/http/devices.py
index 3d63645726..7c4941c3d3 100644
--- a/synapse/replication/http/devices.py
+++ b/synapse/replication/http/devices.py
@@ -13,11 +13,12 @@
# limitations under the License.
import logging
-from typing import TYPE_CHECKING, Tuple
+from typing import TYPE_CHECKING, Optional, Tuple
from twisted.web.server import Request
from synapse.http.server import HttpServer
+from synapse.http.servlet import parse_json_object_from_request
from synapse.replication.http._base import ReplicationEndpoint
from synapse.types import JsonDict
@@ -62,7 +63,12 @@ class ReplicationUserDevicesResyncRestServlet(ReplicationEndpoint):
def __init__(self, hs: "HomeServer"):
super().__init__(hs)
- self.device_list_updater = hs.get_device_handler().device_list_updater
+ from synapse.handlers.device import DeviceHandler
+
+ handler = hs.get_device_handler()
+ assert isinstance(handler, DeviceHandler)
+ self.device_list_updater = handler.device_list_updater
+
self.store = hs.get_datastores().main
self.clock = hs.get_clock()
@@ -72,11 +78,77 @@ class ReplicationUserDevicesResyncRestServlet(ReplicationEndpoint):
async def _handle_request( # type: ignore[override]
self, request: Request, user_id: str
- ) -> Tuple[int, JsonDict]:
+ ) -> Tuple[int, Optional[JsonDict]]:
user_devices = await self.device_list_updater.user_device_resync(user_id)
return 200, user_devices
+class ReplicationUploadKeysForUserRestServlet(ReplicationEndpoint):
+ """Ask master to upload keys for the user and send them out over federation to
+ update other servers.
+
+ For now, only the master is permitted to handle key upload requests;
+ any worker can handle key query requests (since they're read-only).
+
+ Calls e2e_keys_handler.upload_keys_for_user(user_id, device_id, keys) on
+ the main process to accomplish this.
+
+ Defined in https://spec.matrix.org/v1.4/client-server-api/#post_matrixclientv3keysupload
+ Request format (borrowed and expanded from KeyUploadServlet):
+
+ POST /_synapse/replication/upload_keys_for_user
+
+ {
+ "user_id": "<user_id>",
+ "device_id": "<device_id>",
+ "keys": {
+ ... this part can be found in KeyUploadServlet in rest/client/keys.py ...
+ }
+ }
+
+ The response is equivalent to that of `/_matrix/client/v3/keys/upload`, as handled by KeyUploadServlet.
+
+ """
+
+ NAME = "upload_keys_for_user"
+ PATH_ARGS = ()
+ CACHE = False
+
+ def __init__(self, hs: "HomeServer"):
+ super().__init__(hs)
+
+ self.e2e_keys_handler = hs.get_e2e_keys_handler()
+ self.store = hs.get_datastores().main
+ self.clock = hs.get_clock()
+
+ @staticmethod
+ async def _serialize_payload( # type: ignore[override]
+ user_id: str, device_id: str, keys: JsonDict
+ ) -> JsonDict:
+
+ return {
+ "user_id": user_id,
+ "device_id": device_id,
+ "keys": keys,
+ }
+
+ async def _handle_request( # type: ignore[override]
+ self, request: Request
+ ) -> Tuple[int, JsonDict]:
+ content = parse_json_object_from_request(request)
+
+ user_id = content["user_id"]
+ device_id = content["device_id"]
+ keys = content["keys"]
+
+ results = await self.e2e_keys_handler.upload_keys_for_user(
+ user_id, device_id, keys
+ )
+
+ return 200, results
+
+
def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
ReplicationUserDevicesResyncRestServlet(hs).register(http_server)
+ ReplicationUploadKeysForUserRestServlet(hs).register(http_server)
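
A sketch of how a worker-side handler might invoke the new endpoint via `make_client`; the `WorkerE2eKeysHandler` wrapper is hypothetical, only `make_client` and the payload fields come from this diff:

```python
from synapse.replication.http.devices import ReplicationUploadKeysForUserRestServlet

class WorkerE2eKeysHandler:
    """Hypothetical worker-side wrapper; not part of this diff."""

    def __init__(self, hs):
        # make_client builds an async callable that POSTs to the main process.
        self._upload_keys = ReplicationUploadKeysForUserRestServlet.make_client(hs)

    async def upload_keys_for_user(
        self, user_id: str, device_id: str, keys: dict
    ) -> dict:
        # The keyword arguments must match _serialize_payload above.
        return await self._upload_keys(
            user_id=user_id, device_id=device_id, keys=keys
        )
```
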
diff --git a/synapse/replication/http/register.py b/synapse/replication/http/register.py
index 6c8f8388fd..976c283360 100644
--- a/synapse/replication/http/register.py
+++ b/synapse/replication/http/register.py
@@ -39,6 +39,16 @@ class ReplicationRegisterServlet(ReplicationEndpoint):
self.store = hs.get_datastores().main
self.registration_handler = hs.get_registration_handler()
+ # Default value if the worker that sent the replication request did not include
+ # an 'approved' property.
+ if (
+ hs.config.experimental.msc3866.enabled
+ and hs.config.experimental.msc3866.require_approval_for_new_accounts
+ ):
+ self._approval_default = False
+ else:
+ self._approval_default = True
+
@staticmethod
async def _serialize_payload( # type: ignore[override]
user_id: str,
@@ -51,6 +61,7 @@ class ReplicationRegisterServlet(ReplicationEndpoint):
user_type: Optional[str],
address: Optional[str],
shadow_banned: bool,
+ approved: bool,
) -> JsonDict:
"""
Args:
@@ -68,6 +79,8 @@ class ReplicationRegisterServlet(ReplicationEndpoint):
or None for a normal user.
address: the IP address used to perform the registration.
shadow_banned: Whether to shadow-ban the user
+ approved: Whether the user should be considered already approved by an
+ administrator.
"""
return {
"password_hash": password_hash,
@@ -79,6 +92,7 @@ class ReplicationRegisterServlet(ReplicationEndpoint):
"user_type": user_type,
"address": address,
"shadow_banned": shadow_banned,
+ "approved": approved,
}
async def _handle_request( # type: ignore[override]
@@ -88,6 +102,12 @@ class ReplicationRegisterServlet(ReplicationEndpoint):
await self.registration_handler.check_registration_ratelimit(content["address"])
+ # Always default admin users to approved (since it means they were created by
+ # an admin).
+ approved_default = self._approval_default
+ if content["admin"]:
+ approved_default = True
+
await self.registration_handler.register_with_store(
user_id=user_id,
password_hash=content["password_hash"],
@@ -99,6 +119,7 @@ class ReplicationRegisterServlet(ReplicationEndpoint):
user_type=content["user_type"],
address=content["address"],
shadow_banned=content["shadow_banned"],
+ approved=content.get("approved", approved_default),
)
return 200, {}
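
The approval default reduces to a small boolean rule; a standalone sketch (function name and parameters illustrative):

```python
def approval_default(
    msc3866_enabled: bool, require_approval: bool, is_admin: bool
) -> bool:
    # Admin-created accounts are always treated as approved; otherwise a new
    # account starts unapproved only when MSC3866 is enabled and configured
    # to require approval for new accounts.
    if is_admin:
        return True
    return not (msc3866_enabled and require_approval)

assert approval_default(True, True, False) is False
assert approval_default(True, True, True) is True
assert approval_default(False, False, False) is True
```
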
diff --git a/synapse/replication/http/send_event.py b/synapse/replication/http/send_event.py
index 486f04723c..4215a1c1bc 100644
--- a/synapse/replication/http/send_event.py
+++ b/synapse/replication/http/send_event.py
@@ -141,8 +141,8 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint):
"Got event to send with ID: %s into room: %s", event.event_id, event.room_id
)
- event = await self.event_creation_handler.persist_and_notify_client_event(
- requester, event, context, ratelimit=ratelimit, extra_users=extra_users
+ event = await self.event_creation_handler.persist_and_notify_client_events(
+ requester, [(event, context)], ratelimit=ratelimit, extra_users=extra_users
)
return (
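
Callers of the old singular API migrate by wrapping their one event in a batch; a sketch of the pattern (handler and arguments assumed in scope at the call site):

```python
from synapse.events import EventBase
from synapse.events.snapshot import EventContext
from synapse.types import Requester

async def send_one(handler, requester: Requester, event: EventBase, context: EventContext):
    # The old singular call site now wraps its single event in a batch of one.
    return await handler.persist_and_notify_client_events(
        requester, [(event, context)], ratelimit=True, extra_users=[]
    )
```
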
diff --git a/synapse/replication/http/send_events.py b/synapse/replication/http/send_events.py
new file mode 100644
index 0000000000..8889bbb644
--- /dev/null
+++ b/synapse/replication/http/send_events.py
@@ -0,0 +1,171 @@
+# Copyright 2022 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+from typing import TYPE_CHECKING, List, Tuple
+
+from twisted.web.server import Request
+
+from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
+from synapse.events import EventBase, make_event_from_dict
+from synapse.events.snapshot import EventContext
+from synapse.http.server import HttpServer
+from synapse.http.servlet import parse_json_object_from_request
+from synapse.replication.http._base import ReplicationEndpoint
+from synapse.types import JsonDict, Requester, UserID
+from synapse.util.metrics import Measure
+
+if TYPE_CHECKING:
+ from synapse.server import HomeServer
+ from synapse.storage.databases.main import DataStore
+
+logger = logging.getLogger(__name__)
+
+
+class ReplicationSendEventsRestServlet(ReplicationEndpoint):
+ """Handles batches of newly created events on workers, including persisting and
+ notifying.
+
+ The API looks like:
+
+ POST /_synapse/replication/send_events/:txn_id
+
+ {
+ "events": [{
+ "event": { .. serialized event .. },
+ "room_version": .., // "1", "2", "3", etc: the version of the room
+ // containing the event
+ "event_format_version": .., // 1,2,3 etc: the event format version
+ "internal_metadata": { .. serialized internal_metadata .. },
+ "outlier": true|false,
+ "rejected_reason": .., // The event.rejected_reason field
+ "context": { .. serialized event context .. },
+ "requester": { .. serialized requester .. },
+ "ratelimit": true,
+ }]
+ }
+
+ 200 OK
+
+ { "stream_id": 12345, "event_id": "$abcdef..." }
+
+ Responds with a 409 when a `PartialStateConflictError` is raised, i.e. when an
+ event context needs to be recomputed because its room has been un-partial stated.
+
+ """
+
+ NAME = "send_events"
+ PATH_ARGS = ()
+
+ def __init__(self, hs: "HomeServer"):
+ super().__init__(hs)
+
+ self.event_creation_handler = hs.get_event_creation_handler()
+ self.store = hs.get_datastores().main
+ self._storage_controllers = hs.get_storage_controllers()
+ self.clock = hs.get_clock()
+
+ @staticmethod
+ async def _serialize_payload( # type: ignore[override]
+ events_and_context: List[Tuple[EventBase, EventContext]],
+ store: "DataStore",
+ requester: Requester,
+ ratelimit: bool,
+ extra_users: List[UserID],
+ ) -> JsonDict:
+ """
+ Args:
+ store
+ requester
+ events_and_ctx
+ ratelimit
+ """
+ serialized_events = []
+
+ for event, context in events_and_context:
+ serialized_context = await context.serialize(event, store)
+ serialized_event = {
+ "event": event.get_pdu_json(),
+ "room_version": event.room_version.identifier,
+ "event_format_version": event.format_version,
+ "internal_metadata": event.internal_metadata.get_dict(),
+ "outlier": event.internal_metadata.is_outlier(),
+ "rejected_reason": event.rejected_reason,
+ "context": serialized_context,
+ "requester": requester.serialize(),
+ "ratelimit": ratelimit,
+ "extra_users": [u.to_string() for u in extra_users],
+ }
+ serialized_events.append(serialized_event)
+
+ payload = {"events": serialized_events}
+
+ return payload
+
+ async def _handle_request( # type: ignore[override]
+ self, request: Request
+ ) -> Tuple[int, JsonDict]:
+ with Measure(self.clock, "repl_send_events_parse"):
+ payload = parse_json_object_from_request(request)
+ events_and_context = []
+ events = payload["events"]
+
+ for event_payload in events:
+ event_dict = event_payload["event"]
+ room_ver = KNOWN_ROOM_VERSIONS[event_payload["room_version"]]
+ internal_metadata = event_payload["internal_metadata"]
+ rejected_reason = event_payload["rejected_reason"]
+
+ event = make_event_from_dict(
+ event_dict, room_ver, internal_metadata, rejected_reason
+ )
+ event.internal_metadata.outlier = event_payload["outlier"]
+
+ requester = Requester.deserialize(
+ self.store, event_payload["requester"]
+ )
+ context = EventContext.deserialize(
+ self._storage_controllers, event_payload["context"]
+ )
+
+ ratelimit = event_payload["ratelimit"]
+ events_and_context.append((event, context))
+
+ extra_users = [
+ UserID.from_string(u) for u in event_payload["extra_users"]
+ ]
+
+ logger.info(
+ "Got batch of events to send, last ID of batch is: %s, sending into room: %s",
+ event.event_id,
+ event.room_id,
+ )
+
+ last_event = (
+ await self.event_creation_handler.persist_and_notify_client_events(
+ requester, events_and_context, ratelimit, extra_users
+ )
+ )
+
+ return (
+ 200,
+ {
+ "stream_id": last_event.internal_metadata.stream_ordering,
+ "event_id": last_event.event_id,
+ },
+ )
+
+
+def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
+ ReplicationSendEventsRestServlet(hs).register(http_server)
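
On the sending side, a worker would obtain a client for this endpoint and post a whole batch in one round trip; a sketch (wiring illustrative, keyword arguments mirror `_serialize_payload` above, `hs` assumed in scope):

```python
from synapse.replication.http.send_events import ReplicationSendEventsRestServlet

# Hypothetical wiring: obtain the client once at startup (hs: the HomeServer).
send_events_client = ReplicationSendEventsRestServlet.make_client(hs)

async def replicate_batch(events_and_context, store, requester):
    # instance_name picks the recipient; the rest is serialized into the
    # "events" payload shown in the servlet docstring above.
    response = await send_events_client(
        instance_name="master",
        events_and_context=events_and_context,
        store=store,
        requester=requester,
        ratelimit=True,
        extra_users=[],
    )
    return response["stream_id"], response["event_id"]
```
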
diff --git a/synapse/replication/slave/__init__.py b/synapse/replication/slave/__init__.py
deleted file mode 100644
index f43a360a80..0000000000
--- a/synapse/replication/slave/__init__.py
+++ /dev/null
@@ -1,13 +0,0 @@
-# Copyright 2016 OpenMarket Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
diff --git a/synapse/replication/slave/storage/__init__.py b/synapse/replication/slave/storage/__init__.py
deleted file mode 100644
index f43a360a80..0000000000
--- a/synapse/replication/slave/storage/__init__.py
+++ /dev/null
@@ -1,13 +0,0 @@
-# Copyright 2016 OpenMarket Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
diff --git a/synapse/replication/slave/storage/_slaved_id_tracker.py b/synapse/replication/slave/storage/_slaved_id_tracker.py
deleted file mode 100644
index 8f3f953ed4..0000000000
--- a/synapse/replication/slave/storage/_slaved_id_tracker.py
+++ /dev/null
@@ -1,50 +0,0 @@
-# Copyright 2016 OpenMarket Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-from typing import List, Optional, Tuple
-
-from synapse.storage.database import LoggingDatabaseConnection
-from synapse.storage.util.id_generators import AbstractStreamIdTracker, _load_current_id
-
-
-class SlavedIdTracker(AbstractStreamIdTracker):
- """Tracks the "current" stream ID of a stream with a single writer.
-
- See `AbstractStreamIdTracker` for more details.
-
- Note that this class does not work correctly when there are multiple
- writers.
- """
-
- def __init__(
- self,
- db_conn: LoggingDatabaseConnection,
- table: str,
- column: str,
- extra_tables: Optional[List[Tuple[str, str]]] = None,
- step: int = 1,
- ):
- self.step = step
- self._current = _load_current_id(db_conn, table, column, step)
- if extra_tables:
- for table, column in extra_tables:
- self.advance(None, _load_current_id(db_conn, table, column))
-
- def advance(self, instance_name: Optional[str], new_id: int) -> None:
- self._current = (max if self.step > 0 else min)(self._current, new_id)
-
- def get_current_token(self) -> int:
- return self._current
-
- def get_current_token_for_writer(self, instance_name: str) -> int:
- return self.get_current_token()
diff --git a/synapse/replication/slave/storage/devices.py b/synapse/replication/slave/storage/devices.py
deleted file mode 100644
index 6fcade510a..0000000000
--- a/synapse/replication/slave/storage/devices.py
+++ /dev/null
@@ -1,79 +0,0 @@
-# Copyright 2016 OpenMarket Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from typing import TYPE_CHECKING, Any, Iterable
-
-from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
-from synapse.replication.tcp.streams._base import DeviceListsStream, UserSignatureStream
-from synapse.storage.database import DatabasePool, LoggingDatabaseConnection
-from synapse.storage.databases.main.devices import DeviceWorkerStore
-
-if TYPE_CHECKING:
- from synapse.server import HomeServer
-
-
-class SlavedDeviceStore(DeviceWorkerStore):
- def __init__(
- self,
- database: DatabasePool,
- db_conn: LoggingDatabaseConnection,
- hs: "HomeServer",
- ):
- self.hs = hs
-
- self._device_list_id_gen = SlavedIdTracker(
- db_conn,
- "device_lists_stream",
- "stream_id",
- extra_tables=[
- ("user_signature_stream", "stream_id"),
- ("device_lists_outbound_pokes", "stream_id"),
- ("device_lists_changes_in_room", "stream_id"),
- ],
- )
-
- super().__init__(database, db_conn, hs)
-
- def get_device_stream_token(self) -> int:
- return self._device_list_id_gen.get_current_token()
-
- def process_replication_rows(
- self, stream_name: str, instance_name: str, token: int, rows: Iterable[Any]
- ) -> None:
- if stream_name == DeviceListsStream.NAME:
- self._device_list_id_gen.advance(instance_name, token)
- self._invalidate_caches_for_devices(token, rows)
- elif stream_name == UserSignatureStream.NAME:
- self._device_list_id_gen.advance(instance_name, token)
- for row in rows:
- self._user_signature_stream_cache.entity_has_changed(row.user_id, token)
- return super().process_replication_rows(stream_name, instance_name, token, rows)
-
- def _invalidate_caches_for_devices(
- self, token: int, rows: Iterable[DeviceListsStream.DeviceListsStreamRow]
- ) -> None:
- for row in rows:
- # The entities are either user IDs (starting with '@') whose devices
- # have changed, or remote servers that we need to tell about
- # changes.
- if row.entity.startswith("@"):
- self._device_list_stream_cache.entity_has_changed(row.entity, token)
- self.get_cached_devices_for_user.invalidate((row.entity,))
- self._get_cached_user_device.invalidate((row.entity,))
- self.get_device_list_last_stream_id_for_remote.invalidate((row.entity,))
-
- else:
- self._device_list_federation_stream_cache.entity_has_changed(
- row.entity, token
- )
diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
deleted file mode 100644
index fe47778cb1..0000000000
--- a/synapse/replication/slave/storage/events.py
+++ /dev/null
@@ -1,79 +0,0 @@
-# Copyright 2016 OpenMarket Ltd
-# Copyright 2018 New Vector Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-import logging
-from typing import TYPE_CHECKING
-
-from synapse.storage.database import DatabasePool, LoggingDatabaseConnection
-from synapse.storage.databases.main.event_federation import EventFederationWorkerStore
-from synapse.storage.databases.main.event_push_actions import (
- EventPushActionsWorkerStore,
-)
-from synapse.storage.databases.main.events_worker import EventsWorkerStore
-from synapse.storage.databases.main.relations import RelationsWorkerStore
-from synapse.storage.databases.main.roommember import RoomMemberWorkerStore
-from synapse.storage.databases.main.signatures import SignatureWorkerStore
-from synapse.storage.databases.main.state import StateGroupWorkerStore
-from synapse.storage.databases.main.stream import StreamWorkerStore
-from synapse.storage.databases.main.user_erasure_store import UserErasureWorkerStore
-from synapse.util.caches.stream_change_cache import StreamChangeCache
-
-if TYPE_CHECKING:
- from synapse.server import HomeServer
-
-logger = logging.getLogger(__name__)
-
-
-# So, um, we want to borrow a load of functions intended for reading from
-# a DataStore, but we don't want to take functions that either write to the
-# DataStore or are cached and don't have cache invalidation logic.
-#
-# Rather than write duplicate versions of those functions, or lift them to
-# a common base class, we going to grab the underlying __func__ object from
-# the method descriptor on the DataStore and chuck them into our class.
-
-
-class SlavedEventStore(
- EventFederationWorkerStore,
- RoomMemberWorkerStore,
- EventPushActionsWorkerStore,
- StreamWorkerStore,
- StateGroupWorkerStore,
- SignatureWorkerStore,
- EventsWorkerStore,
- UserErasureWorkerStore,
- RelationsWorkerStore,
-):
- def __init__(
- self,
- database: DatabasePool,
- db_conn: LoggingDatabaseConnection,
- hs: "HomeServer",
- ):
- super().__init__(database, db_conn, hs)
-
- events_max = self._stream_id_gen.get_current_token()
- curr_state_delta_prefill, min_curr_state_delta_id = self.db_pool.get_cache_dict(
- db_conn,
- "current_state_delta_stream",
- entity_column="room_id",
- stream_column="stream_id",
- max_value=events_max, # As we share the stream id with events token
- limit=1000,
- )
- self._curr_state_delta_stream_cache = StreamChangeCache(
- "_curr_state_delta_stream_cache",
- min_curr_state_delta_id,
- prefilled_cache=curr_state_delta_prefill,
- )
diff --git a/synapse/replication/slave/storage/filtering.py b/synapse/replication/slave/storage/filtering.py
deleted file mode 100644
index c52679cd60..0000000000
--- a/synapse/replication/slave/storage/filtering.py
+++ /dev/null
@@ -1,35 +0,0 @@
-# Copyright 2015, 2016 OpenMarket Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from typing import TYPE_CHECKING
-
-from synapse.storage._base import SQLBaseStore
-from synapse.storage.database import DatabasePool, LoggingDatabaseConnection
-from synapse.storage.databases.main.filtering import FilteringStore
-
-if TYPE_CHECKING:
- from synapse.server import HomeServer
-
-
-class SlavedFilteringStore(SQLBaseStore):
- def __init__(
- self,
- database: DatabasePool,
- db_conn: LoggingDatabaseConnection,
- hs: "HomeServer",
- ):
- super().__init__(database, db_conn, hs)
-
- # Filters are immutable so this cache doesn't need to be expired
- get_user_filter = FilteringStore.__dict__["get_user_filter"]
diff --git a/synapse/replication/slave/storage/keys.py b/synapse/replication/slave/storage/keys.py
deleted file mode 100644
index a00b38c512..0000000000
--- a/synapse/replication/slave/storage/keys.py
+++ /dev/null
@@ -1,20 +0,0 @@
-# Copyright 2015, 2016 OpenMarket Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from synapse.storage.databases.main.keys import KeyStore
-
-# KeyStore isn't really safe to use from a worker, but for now we do so and hope that
-# the races it creates aren't too bad.
-
-SlavedKeyStore = KeyStore
diff --git a/synapse/replication/slave/storage/push_rule.py b/synapse/replication/slave/storage/push_rule.py
deleted file mode 100644
index 52ee3f7e58..0000000000
--- a/synapse/replication/slave/storage/push_rule.py
+++ /dev/null
@@ -1,36 +0,0 @@
-# Copyright 2015, 2016 OpenMarket Ltd
-# Copyright 2018 New Vector Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-from typing import Any, Iterable
-
-from synapse.replication.tcp.streams import PushRulesStream
-from synapse.storage.databases.main.push_rule import PushRulesWorkerStore
-
-from .events import SlavedEventStore
-
-
-class SlavedPushRuleStore(SlavedEventStore, PushRulesWorkerStore):
- def get_max_push_rules_stream_id(self) -> int:
- return self._push_rules_stream_id_gen.get_current_token()
-
- def process_replication_rows(
- self, stream_name: str, instance_name: str, token: int, rows: Iterable[Any]
- ) -> None:
- if stream_name == PushRulesStream.NAME:
- self._push_rules_stream_id_gen.advance(instance_name, token)
- for row in rows:
- self.get_push_rules_for_user.invalidate((row.user_id,))
- self.get_push_rules_enabled_for_user.invalidate((row.user_id,))
- self.push_rules_stream_cache.entity_has_changed(row.user_id, token)
- return super().process_replication_rows(stream_name, instance_name, token, rows)
diff --git a/synapse/replication/slave/storage/pushers.py b/synapse/replication/slave/storage/pushers.py
deleted file mode 100644
index 44ed20e424..0000000000
--- a/synapse/replication/slave/storage/pushers.py
+++ /dev/null
@@ -1,47 +0,0 @@
-# Copyright 2016 OpenMarket Ltd
-# Copyright 2018 New Vector Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-from typing import TYPE_CHECKING, Any, Iterable
-
-from synapse.replication.tcp.streams import PushersStream
-from synapse.storage.database import DatabasePool, LoggingDatabaseConnection
-from synapse.storage.databases.main.pusher import PusherWorkerStore
-
-from ._slaved_id_tracker import SlavedIdTracker
-
-if TYPE_CHECKING:
- from synapse.server import HomeServer
-
-
-class SlavedPusherStore(PusherWorkerStore):
- def __init__(
- self,
- database: DatabasePool,
- db_conn: LoggingDatabaseConnection,
- hs: "HomeServer",
- ):
- super().__init__(database, db_conn, hs)
- self._pushers_id_gen = SlavedIdTracker( # type: ignore
- db_conn, "pushers", "id", extra_tables=[("deleted_pushers", "stream_id")]
- )
-
- def get_pushers_stream_token(self) -> int:
- return self._pushers_id_gen.get_current_token()
-
- def process_replication_rows(
- self, stream_name: str, instance_name: str, token: int, rows: Iterable[Any]
- ) -> None:
- if stream_name == PushersStream.NAME:
- self._pushers_id_gen.advance(instance_name, token)
- return super().process_replication_rows(stream_name, instance_name, token, rows)
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index e4f2201c92..18252a2958 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -189,7 +189,9 @@ class ReplicationDataHandler:
if row.deleted:
self.stop_pusher(row.user_id, row.app_id, row.pushkey)
else:
- await self.start_pusher(row.user_id, row.app_id, row.pushkey)
+ await self.process_pusher_change(
+ row.user_id, row.app_id, row.pushkey
+ )
elif stream_name == EventsStream.NAME:
# We shouldn't get multiple rows per token for events stream, so
# we don't need to optimise this for multiple rows.
@@ -208,15 +210,16 @@ class ReplicationDataHandler:
max_token = self.store.get_room_max_token()
event_pos = PersistedEventPosition(instance_name, token)
- await self.notifier.on_new_room_event_args(
- event_pos=event_pos,
- max_room_stream_token=max_token,
- extra_users=extra_users,
- room_id=row.data.room_id,
- event_id=row.data.event_id,
- event_type=row.data.type,
- state_key=row.data.state_key,
- membership=row.data.membership,
+ event_entry = self.notifier.create_pending_room_event_entry(
+ event_pos,
+ extra_users,
+ row.data.room_id,
+ row.data.type,
+ row.data.state_key,
+ row.data.membership,
+ )
+ await self.notifier.notify_new_room_events(
+ [(event_entry, row.data.event_id)], max_token
)
# If this event is a join, make a note of it so we have an accurate
@@ -334,13 +337,15 @@ class ReplicationDataHandler:
logger.info("Stopping pusher %r / %r", user_id, key)
pusher.on_stop()
- async def start_pusher(self, user_id: str, app_id: str, pushkey: str) -> None:
+ async def process_pusher_change(
+ self, user_id: str, app_id: str, pushkey: str
+ ) -> None:
if not self._notify_pushers:
return
key = "%s:%s" % (app_id, pushkey)
logger.info("Starting pusher %r / %r", user_id, key)
- await self._pusher_pool.start_pusher_by_id(app_id, pushkey, user_id)
+ await self._pusher_pool.process_pusher_change_by_id(app_id, pushkey, user_id)
class FederationSenderHandler:
@@ -423,7 +428,8 @@ class FederationSenderHandler:
receipt.receipt_type,
receipt.user_id,
[receipt.event_id],
- receipt.data,
+ thread_id=receipt.thread_id,
+ data=receipt.data,
)
await self.federation_sender.send_read_receipt(receipt_info)
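
With this change a federation `ReadReceipt` carries the thread ID; a sketch of constructing one, assuming the attrs field names match those used above (values illustrative):

```python
from synapse.types import ReadReceipt

receipt_info = ReadReceipt(
    room_id="!room:example.org",
    receipt_type="m.read",
    user_id="@user:example.org",
    event_ids=["$someevent"],
    thread_id="$threadroot",  # None for an unthreaded receipt
    data={"ts": 1234567890},
)
```
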
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index e1cbfa50eb..0f166d16aa 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -35,7 +35,6 @@ from twisted.internet.protocol import ReconnectingClientFactory
from synapse.metrics import LaterGauge
from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.replication.tcp.client import DirectTcpReplicationClientFactory
from synapse.replication.tcp.commands import (
ClearUserSyncsCommand,
Command,
@@ -332,46 +331,31 @@ class ReplicationCommandHandler:
def start_replication(self, hs: "HomeServer") -> None:
"""Helper method to start replication."""
- if hs.config.redis.redis_enabled:
- from synapse.replication.tcp.redis import (
- RedisDirectTcpReplicationClientFactory,
- )
+ from synapse.replication.tcp.redis import RedisDirectTcpReplicationClientFactory
- # First let's ensure that we have a ReplicationStreamer started.
- hs.get_replication_streamer()
+ # First let's ensure that we have a ReplicationStreamer started.
+ hs.get_replication_streamer()
- # We need two connections to redis, one for the subscription stream and
- # one to send commands to (as you can't send further redis commands to a
- # connection after SUBSCRIBE is called).
+ # We need two connections to redis, one for the subscription stream and
+ # one to send commands to (as you can't send further redis commands to a
+ # connection after SUBSCRIBE is called).
- # First create the connection for sending commands.
- outbound_redis_connection = hs.get_outbound_redis_connection()
+ # First create the connection for sending commands.
+ outbound_redis_connection = hs.get_outbound_redis_connection()
- # Now create the factory/connection for the subscription stream.
- self._factory = RedisDirectTcpReplicationClientFactory(
- hs,
- outbound_redis_connection,
- channel_names=self._channels_to_subscribe_to,
- )
- hs.get_reactor().connectTCP(
- hs.config.redis.redis_host,
- hs.config.redis.redis_port,
- self._factory,
- timeout=30,
- bindAddress=None,
- )
- else:
- client_name = hs.get_instance_name()
- self._factory = DirectTcpReplicationClientFactory(hs, client_name, self)
- host = hs.config.worker.worker_replication_host
- port = hs.config.worker.worker_replication_port
- hs.get_reactor().connectTCP(
- host,
- port,
- self._factory,
- timeout=30,
- bindAddress=None,
- )
+ # Now create the factory/connection for the subscription stream.
+ self._factory = RedisDirectTcpReplicationClientFactory(
+ hs,
+ outbound_redis_connection,
+ channel_names=self._channels_to_subscribe_to,
+ )
+ hs.get_reactor().connectTCP(
+ hs.config.redis.redis_host,
+ hs.config.redis.redis_port,
+ self._factory,
+ timeout=30,
+ bindAddress=None,
+ )
def get_streams(self) -> Dict[str, Stream]:
"""Get a map from stream name to all streams."""
diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py
index 7763ffb2d0..56a5c21910 100644
--- a/synapse/replication/tcp/protocol.py
+++ b/synapse/replication/tcp/protocol.py
@@ -245,7 +245,7 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
self._parse_and_dispatch_line(line)
def _parse_and_dispatch_line(self, line: bytes) -> None:
- if line.strip() == "":
+ if line.strip() == b"":
# Ignore blank lines
return
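
The `b""` fix matters because Python 3 never considers bytes and str equal, so the old comparison could not match a blank line received off the wire:

```python
line = b"  \n"
assert line.strip() == b""       # bytes vs bytes: matches a blank line
assert not (line.strip() == "")  # bytes vs str: always False in Python 3
```
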
diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py
index 398bebeaa6..e01155ad59 100644
--- a/synapse/replication/tcp/streams/_base.py
+++ b/synapse/replication/tcp/streams/_base.py
@@ -361,6 +361,7 @@ class ReceiptsStream(Stream):
receipt_type: str
user_id: str
event_id: str
+ thread_id: Optional[str]
data: dict
NAME = "receipts"
|