summary refs log tree commit diff
path: root/synapse/replication
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/replication')
-rw-r--r--synapse/replication/http/_base.py21
-rw-r--r--synapse/replication/tcp/client.py18
2 files changed, 30 insertions, 9 deletions
diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py

index 2bd244ed79..a4ae4040c3 100644 --- a/synapse/replication/http/_base.py +++ b/synapse/replication/http/_base.py
@@ -26,7 +26,8 @@ from twisted.web.server import Request from synapse.api.errors import HttpResponseException, SynapseError from synapse.http import RequestTimedOutError -from synapse.http.server import HttpServer +from synapse.http.server import HttpServer, is_method_cancellable +from synapse.http.site import SynapseRequest from synapse.logging import opentracing from synapse.logging.opentracing import trace from synapse.types import JsonDict @@ -310,6 +311,12 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta): url_args = list(self.PATH_ARGS) method = self.METHOD + if self.CACHE and is_method_cancellable(self._handle_request): + raise Exception( + f"{self.__class__.__name__} has been marked as cancellable, but CACHE " + "is set. The cancellable flag would have no effect." + ) + if self.CACHE: url_args.append("txn_id") @@ -324,7 +331,7 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta): ) async def _check_auth_and_handle( - self, request: Request, **kwargs: Any + self, request: SynapseRequest, **kwargs: Any ) -> Tuple[int, JsonDict]: """Called on new incoming requests when caching is enabled. Checks if there is a cached response for the request and returns that, @@ -340,8 +347,18 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta): if self.CACHE: txn_id = kwargs.pop("txn_id") + # We ignore the `@cancellable` flag, since cancellation wouldn't interupt + # `_handle_request` and `ResponseCache` does not handle cancellation + # correctly yet. In particular, there may be issues to do with logging + # context lifetimes. + return await self.response_cache.wrap( txn_id, self._handle_request, request, **kwargs ) + # The `@cancellable` decorator may be applied to `_handle_request`. But we + # told `HttpServer.register_paths` that our handler is `_check_auth_and_handle`, + # so we have to set up the cancellable flag ourselves. + request.is_render_cancellable = is_method_cancellable(self._handle_request) + return await self._handle_request(request, **kwargs) diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index 350762f494..a52e25c1af 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py
@@ -43,7 +43,7 @@ from synapse.replication.tcp.streams.events import ( EventsStreamEventRow, EventsStreamRow, ) -from synapse.types import PersistedEventPosition, ReadReceipt, UserID +from synapse.types import PersistedEventPosition, ReadReceipt, StreamKeyType, UserID from synapse.util.async_helpers import Linearizer, timeout_deferred from synapse.util.metrics import Measure @@ -153,19 +153,19 @@ class ReplicationDataHandler: if stream_name == TypingStream.NAME: self._typing_handler.process_replication_rows(token, rows) self.notifier.on_new_event( - "typing_key", token, rooms=[row.room_id for row in rows] + StreamKeyType.TYPING, token, rooms=[row.room_id for row in rows] ) elif stream_name == PushRulesStream.NAME: self.notifier.on_new_event( - "push_rules_key", token, users=[row.user_id for row in rows] + StreamKeyType.PUSH_RULES, token, users=[row.user_id for row in rows] ) elif stream_name in (AccountDataStream.NAME, TagAccountDataStream.NAME): self.notifier.on_new_event( - "account_data_key", token, users=[row.user_id for row in rows] + StreamKeyType.ACCOUNT_DATA, token, users=[row.user_id for row in rows] ) elif stream_name == ReceiptsStream.NAME: self.notifier.on_new_event( - "receipt_key", token, rooms=[row.room_id for row in rows] + StreamKeyType.RECEIPT, token, rooms=[row.room_id for row in rows] ) await self._pusher_pool.on_new_receipts( token, token, {row.room_id for row in rows} @@ -173,14 +173,18 @@ class ReplicationDataHandler: elif stream_name == ToDeviceStream.NAME: entities = [row.entity for row in rows if row.entity.startswith("@")] if entities: - self.notifier.on_new_event("to_device_key", token, users=entities) + self.notifier.on_new_event( + StreamKeyType.TO_DEVICE, token, users=entities + ) elif stream_name == DeviceListsStream.NAME: all_room_ids: Set[str] = set() for row in rows: if row.entity.startswith("@"): room_ids = await self.store.get_rooms_for_user(row.entity) all_room_ids.update(room_ids) - self.notifier.on_new_event("device_list_key", token, rooms=all_room_ids) + self.notifier.on_new_event( + StreamKeyType.DEVICE_LIST, token, rooms=all_room_ids + ) elif stream_name == GroupServerStream.NAME: self.notifier.on_new_event( "groups_key", token, users=[row.user_id for row in rows]