From d5275fc55f4edc42d1543825da2c13df63d96927 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 27 Jan 2020 13:47:50 +0000 Subject: Propagate cache invalidates from workers to other workers. (#6748) Currently if a worker invalidates a cache it will be streamed to master, which then didn't forward those to other workers. --- synapse/replication/tcp/protocol.py | 2 +- synapse/replication/tcp/resource.py | 9 ++++++--- 2 files changed, 7 insertions(+), 4 deletions(-) (limited to 'synapse/replication/tcp') diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py index 131e5acb09..bc1482a9bb 100644 --- a/synapse/replication/tcp/protocol.py +++ b/synapse/replication/tcp/protocol.py @@ -459,7 +459,7 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol): await self.streamer.on_remove_pusher(cmd.app_id, cmd.push_key, cmd.user_id) async def on_INVALIDATE_CACHE(self, cmd): - self.streamer.on_invalidate_cache(cmd.cache_func, cmd.keys) + await self.streamer.on_invalidate_cache(cmd.cache_func, cmd.keys) async def on_REMOTE_SERVER_UP(self, cmd: RemoteServerUpCommand): self.streamer.on_remote_server_up(cmd.data) diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py index 6ebf944f66..ce60ae2e07 100644 --- a/synapse/replication/tcp/resource.py +++ b/synapse/replication/tcp/resource.py @@ -17,7 +17,7 @@ import logging import random -from typing import List +from typing import Any, List from six import itervalues @@ -271,11 +271,14 @@ class ReplicationStreamer(object): self.notifier.on_new_replication_data() @measure_func("repl.on_invalidate_cache") - def on_invalidate_cache(self, cache_func, keys): + async def on_invalidate_cache(self, cache_func: str, keys: List[Any]): """The client has asked us to invalidate a cache """ invalidate_cache_counter.inc() - getattr(self.store, cache_func).invalidate(tuple(keys)) + + # We invalidate the cache locally, but then also stream that to other + # workers. + await self.store.invalidate_cache_and_stream(cache_func, tuple(keys)) @measure_func("repl.on_user_ip") async def on_user_ip( -- cgit 1.5.1 From c3d4ad8afdbe181707451410100dec4817c2c01a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 30 Jan 2020 16:42:11 +0000 Subject: Fix sending server up commands from workers (#6811) Co-authored-by: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com> --- changelog.d/6811.bugfix | 1 + synapse/federation/transport/client.py | 5 ++++- synapse/federation/transport/server.py | 26 +++++++++++++++----------- synapse/replication/tcp/client.py | 4 ++++ synapse/server.pyi | 12 +++++++++++- tox.ini | 1 + 6 files changed, 36 insertions(+), 13 deletions(-) create mode 100644 changelog.d/6811.bugfix (limited to 'synapse/replication/tcp') diff --git a/changelog.d/6811.bugfix b/changelog.d/6811.bugfix new file mode 100644 index 0000000000..361f2fc2e8 --- /dev/null +++ b/changelog.d/6811.bugfix @@ -0,0 +1 @@ +Fix waking up other workers when remote server is detected to have come back online. diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index 198257414b..dc563538de 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -15,6 +15,7 @@ # limitations under the License. import logging +from typing import Any, Dict from six.moves import urllib @@ -352,7 +353,9 @@ class TransportLayerClient(object): else: path = _create_v1_path("/publicRooms") - args = {"include_all_networks": "true" if include_all_networks else "false"} + args = { + "include_all_networks": "true" if include_all_networks else "false" + } # type: Dict[str, Any] if third_party_instance_id: args["third_party_instance_id"] = (third_party_instance_id,) if limit: diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index d8cf9ed299..125eadd796 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -18,6 +18,7 @@ import functools import logging import re +from typing import Optional, Tuple, Type from twisted.internet.defer import maybeDeferred @@ -267,6 +268,8 @@ class BaseFederationServlet(object): returned. """ + PATH = "" # Overridden in subclasses, the regex to match against the path. + REQUIRE_AUTH = True PREFIX = FEDERATION_V1_PREFIX # Allows specifying the API version @@ -347,9 +350,6 @@ class BaseFederationServlet(object): return response - # Extra logic that functools.wraps() doesn't finish - new_func.__self__ = func.__self__ - return new_func def register(self, server): @@ -824,7 +824,7 @@ class PublicRoomList(BaseFederationServlet): if not self.allow_access: raise FederationDeniedError(origin) - limit = int(content.get("limit", 100)) + limit = int(content.get("limit", 100)) # type: Optional[int] since_token = content.get("since", None) search_filter = content.get("filter", None) @@ -971,7 +971,7 @@ class FederationGroupsAddRoomsConfigServlet(BaseFederationServlet): if get_domain_from_id(requester_user_id) != origin: raise SynapseError(403, "requester_user_id doesn't match origin") - result = await self.groups_handler.update_room_in_group( + result = await self.handler.update_room_in_group( group_id, requester_user_id, room_id, config_key, content ) @@ -1422,11 +1422,13 @@ FEDERATION_SERVLET_CLASSES = ( On3pidBindServlet, FederationVersionServlet, RoomComplexityServlet, -) +) # type: Tuple[Type[BaseFederationServlet], ...] -OPENID_SERVLET_CLASSES = (OpenIdUserInfo,) +OPENID_SERVLET_CLASSES = ( + OpenIdUserInfo, +) # type: Tuple[Type[BaseFederationServlet], ...] -ROOM_LIST_CLASSES = (PublicRoomList,) +ROOM_LIST_CLASSES = (PublicRoomList,) # type: Tuple[Type[PublicRoomList], ...] GROUP_SERVER_SERVLET_CLASSES = ( FederationGroupsProfileServlet, @@ -1447,17 +1449,19 @@ GROUP_SERVER_SERVLET_CLASSES = ( FederationGroupsAddRoomsServlet, FederationGroupsAddRoomsConfigServlet, FederationGroupsSettingJoinPolicyServlet, -) +) # type: Tuple[Type[BaseFederationServlet], ...] GROUP_LOCAL_SERVLET_CLASSES = ( FederationGroupsLocalInviteServlet, FederationGroupsRemoveLocalUserServlet, FederationGroupsBulkPublicisedServlet, -) +) # type: Tuple[Type[BaseFederationServlet], ...] -GROUP_ATTESTATION_SERVLET_CLASSES = (FederationGroupsRenewAttestaionServlet,) +GROUP_ATTESTATION_SERVLET_CLASSES = ( + FederationGroupsRenewAttestaionServlet, +) # type: Tuple[Type[BaseFederationServlet], ...] DEFAULT_SERVLET_GROUPS = ( "federation", diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index fc06a7b053..02ab5b66ea 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -31,6 +31,7 @@ from .commands import ( Command, FederationAckCommand, InvalidateCacheCommand, + RemoteServerUpCommand, RemovePusherCommand, UserIpCommand, UserSyncCommand, @@ -210,6 +211,9 @@ class ReplicationClientHandler(AbstractReplicationClientHandler): cmd = UserIpCommand(user_id, access_token, ip, user_agent, device_id, last_seen) self.send_command(cmd) + def send_remote_server_up(self, server: str): + self.send_command(RemoteServerUpCommand(server)) + def await_sync(self, data): """Returns a deferred that is resolved when we receive a SYNC command with given data. diff --git a/synapse/server.pyi b/synapse/server.pyi index 0731403047..90347ac23e 100644 --- a/synapse/server.pyi +++ b/synapse/server.pyi @@ -2,8 +2,8 @@ import twisted.internet import synapse.api.auth import synapse.config.homeserver +import synapse.crypto.keyring import synapse.federation.sender -import synapse.federation.transaction_queue import synapse.federation.transport.client import synapse.handlers import synapse.handlers.auth @@ -17,6 +17,7 @@ import synapse.handlers.room_member import synapse.handlers.set_password import synapse.http.client import synapse.notifier +import synapse.replication.tcp.client import synapse.rest.media.v1.media_repository import synapse.server_notices.server_notices_manager import synapse.server_notices.server_notices_sender @@ -27,6 +28,9 @@ class HomeServer(object): @property def config(self) -> synapse.config.homeserver.HomeServerConfig: pass + @property + def hostname(self) -> str: + pass def get_auth(self) -> synapse.api.auth.Auth: pass def get_auth_handler(self) -> synapse.handlers.auth.AuthHandler: @@ -97,3 +101,9 @@ class HomeServer(object): pass def get_reactor(self) -> twisted.internet.base.ReactorBase: pass + def get_keyring(self) -> synapse.crypto.keyring.Keyring: + pass + def get_tcp_replication( + self, + ) -> synapse.replication.tcp.client.ReplicationClientHandler: + pass diff --git a/tox.ini b/tox.ini index 1d946a02ba..88ef12bebd 100644 --- a/tox.ini +++ b/tox.ini @@ -179,6 +179,7 @@ extras = all commands = mypy \ synapse/api \ synapse/config/ \ + synapse/federation/transport \ synapse/handlers/ui_auth \ synapse/logging/ \ synapse/module_api \ -- cgit 1.5.1