diff options
author | Matthew Hodgson <matthew@arasphere.net> | 2018-03-14 15:38:05 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-03-14 15:38:05 +0000 |
commit | 056a6df5462d45eeb4cc9870492f6147677e7213 (patch) | |
tree | a3a4c576bfd2749d297315a9e6b5cd3f53161d87 | |
parent | pep8 (diff) | |
parent | Merge pull request #2995 from matrix-org/erikj/enable_membership_worker (diff) | |
download | synapse-056a6df5462d45eeb4cc9870492f6147677e7213.tar.xz |
Merge branch 'develop' into matthew/filter_members
-rw-r--r-- | synapse/app/event_creator.py | 14 | ||||
-rw-r--r-- | synapse/handlers/profile.py | 5 | ||||
-rw-r--r-- | synapse/handlers/room_member.py | 287 | ||||
-rw-r--r-- | synapse/handlers/room_member_worker.py | 102 | ||||
-rw-r--r-- | synapse/replication/http/__init__.py | 5 | ||||
-rw-r--r-- | synapse/replication/http/membership.py | 334 | ||||
-rw-r--r-- | synapse/replication/slave/storage/profile.py | 21 | ||||
-rw-r--r-- | synapse/server.py | 7 | ||||
-rw-r--r-- | synapse/storage/profile.py | 50 | ||||
-rw-r--r-- | synapse/storage/room.py | 30 |
10 files changed, 711 insertions, 144 deletions
diff --git a/synapse/app/event_creator.py b/synapse/app/event_creator.py index eb593c5278..172e989b54 100644 --- a/synapse/app/event_creator.py +++ b/synapse/app/event_creator.py @@ -31,14 +31,20 @@ from synapse.replication.slave.storage.account_data import SlavedAccountDataStor from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore from synapse.replication.slave.storage.client_ips import SlavedClientIpStore from synapse.replication.slave.storage.devices import SlavedDeviceStore +from synapse.replication.slave.storage.directory import DirectoryStore from synapse.replication.slave.storage.events import SlavedEventStore +from synapse.replication.slave.storage.profile import SlavedProfileStore from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore from synapse.replication.slave.storage.pushers import SlavedPusherStore from synapse.replication.slave.storage.receipts import SlavedReceiptsStore from synapse.replication.slave.storage.registration import SlavedRegistrationStore from synapse.replication.slave.storage.room import RoomStore +from synapse.replication.slave.storage.transactions import TransactionStore from synapse.replication.tcp.client import ReplicationClientHandler -from synapse.rest.client.v1.room import RoomSendEventRestServlet +from synapse.rest.client.v1.room import ( + RoomSendEventRestServlet, RoomMembershipRestServlet, RoomStateEventRestServlet, + JoinRoomAliasServlet, +) from synapse.server import HomeServer from synapse.storage.engines import create_engine from synapse.util.httpresourcetree import create_resource_tree @@ -52,6 +58,9 @@ logger = logging.getLogger("synapse.app.event_creator") class EventCreatorSlavedStore( + DirectoryStore, + TransactionStore, + SlavedProfileStore, SlavedAccountDataStore, SlavedPusherStore, SlavedReceiptsStore, @@ -85,6 +94,9 @@ class EventCreatorServer(HomeServer): elif name == "client": resource = JsonResource(self, canonical_json=False) RoomSendEventRestServlet(self).register(resource) + RoomMembershipRestServlet(self).register(resource) + RoomStateEventRestServlet(self).register(resource) + JoinRoomAliasServlet(self).register(resource) resources.update({ "/_matrix/client/r0": resource, "/_matrix/client/unstable": resource, diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index cb710fe796..3465a787ab 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -38,7 +38,10 @@ class ProfileHandler(BaseHandler): self.user_directory_handler = hs.get_user_directory_handler() - self.clock.looping_call(self._update_remote_profile_cache, self.PROFILE_UPDATE_MS) + if hs.config.worker_app is None: + self.clock.looping_call( + self._update_remote_profile_cache, self.PROFILE_UPDATE_MS, + ) @defer.inlineCallbacks def get_profile(self, user_id): diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 0127cf4166..9977be8831 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -14,7 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. - +import abc import logging from signedjson.key import decode_verify_key_bytes @@ -31,6 +31,7 @@ from synapse.types import UserID, RoomID from synapse.util.async import Linearizer from synapse.util.distributor import user_left_room, user_joined_room + logger = logging.getLogger(__name__) id_server_scheme = "https://" @@ -42,6 +43,8 @@ class RoomMemberHandler(object): # API that takes ID strings and returns pagination chunks. These concerns # ought to be separated out a lot better. + __metaclass__ = abc.ABCMeta + def __init__(self, hs): self.hs = hs self.store = hs.get_datastore() @@ -61,9 +64,87 @@ class RoomMemberHandler(object): self.clock = hs.get_clock() self.spam_checker = hs.get_spam_checker() - self.distributor = hs.get_distributor() - self.distributor.declare("user_joined_room") - self.distributor.declare("user_left_room") + @abc.abstractmethod + def _remote_join(self, requester, remote_room_hosts, room_id, user, content): + """Try and join a room that this server is not in + + Args: + requester (Requester) + remote_room_hosts (list[str]): List of servers that can be used + to join via. + room_id (str): Room that we are trying to join + user (UserID): User who is trying to join + content (dict): A dict that should be used as the content of the + join event. + + Returns: + Deferred + """ + raise NotImplementedError() + + @abc.abstractmethod + def _remote_reject_invite(self, remote_room_hosts, room_id, target): + """Attempt to reject an invite for a room this server is not in. If we + fail to do so we locally mark the invite as rejected. + + Args: + requester (Requester) + remote_room_hosts (list[str]): List of servers to use to try and + reject invite + room_id (str) + target (UserID): The user rejecting the invite + + Returns: + Deferred[dict]: A dictionary to be returned to the client, may + include event_id etc, or nothing if we locally rejected + """ + raise NotImplementedError() + + @abc.abstractmethod + def get_or_register_3pid_guest(self, requester, medium, address, inviter_user_id): + """Get a guest access token for a 3PID, creating a guest account if + one doesn't already exist. + + Args: + requester (Requester) + medium (str) + address (str) + inviter_user_id (str): The user ID who is trying to invite the + 3PID + + Returns: + Deferred[(str, str)]: A 2-tuple of `(user_id, access_token)` of the + 3PID guest account. + """ + raise NotImplementedError() + + @abc.abstractmethod + def _user_joined_room(self, target, room_id): + """Notifies distributor on master process that the user has joined the + room. + + Args: + target (UserID) + room_id (str) + + Returns: + Deferred|None + """ + raise NotImplementedError() + + @abc.abstractmethod + def _user_left_room(self, target, room_id): + """Notifies distributor on master process that the user has left the + room. + + Args: + target (UserID) + room_id (str) + + Returns: + Deferred|None + """ + raise NotImplementedError() @defer.inlineCallbacks def _local_membership_update( @@ -127,83 +208,16 @@ class RoomMemberHandler(object): prev_member_event = yield self.store.get_event(prev_member_event_id) newly_joined = prev_member_event.membership != Membership.JOIN if newly_joined: - yield user_joined_room(self.distributor, target, room_id) + yield self._user_joined_room(target, room_id) elif event.membership == Membership.LEAVE: if prev_member_event_id: prev_member_event = yield self.store.get_event(prev_member_event_id) if prev_member_event.membership == Membership.JOIN: - user_left_room(self.distributor, target, room_id) + yield self._user_left_room(target, room_id) defer.returnValue(event) @defer.inlineCallbacks - def _remote_join(self, remote_room_hosts, room_id, user, content): - """Try and join a room that this server is not in - - Args: - remote_room_hosts (list[str]): List of servers that can be used - to join via. - room_id (str): Room that we are trying to join - user (UserID): User who is trying to join - content (dict): A dict that should be used as the content of the - join event. - - Returns: - Deferred - """ - if len(remote_room_hosts) == 0: - raise SynapseError(404, "No known servers") - - # We don't do an auth check if we are doing an invite - # join dance for now, since we're kinda implicitly checking - # that we are allowed to join when we decide whether or not we - # need to do the invite/join dance. - yield self.federation_handler.do_invite_join( - remote_room_hosts, - room_id, - user.to_string(), - content, - ) - yield user_joined_room(self.distributor, user, room_id) - - @defer.inlineCallbacks - def _remote_reject_invite(self, remote_room_hosts, room_id, target): - """Attempt to reject an invite for a room this server is not in. If we - fail to do so we locally mark the invite as rejected. - - Args: - remote_room_hosts (list[str]): List of servers to use to try and - reject invite - room_id (str) - target (UserID): The user rejecting the invite - - Returns: - Deferred[dict]: A dictionary to be returned to the client, may - include event_id etc, or nothing if we locally rejected - """ - fed_handler = self.federation_handler - try: - ret = yield fed_handler.do_remotely_reject_invite( - remote_room_hosts, - room_id, - target.to_string(), - ) - defer.returnValue(ret) - except Exception as e: - # if we were unable to reject the exception, just mark - # it as rejected on our end and plough ahead. - # - # The 'except' clause is very broad, but we need to - # capture everything from DNS failures upwards - # - logger.warn("Failed to reject invite: %s", e) - - yield self.store.locally_reject_invite( - target.to_string(), room_id - ) - defer.returnValue({}) - - @defer.inlineCallbacks def update_membership( self, requester, @@ -356,7 +370,7 @@ class RoomMemberHandler(object): content["kind"] = "guest" ret = yield self._remote_join( - remote_room_hosts, room_id, target, content + requester, remote_room_hosts, room_id, target, content ) defer.returnValue(ret) @@ -378,7 +392,7 @@ class RoomMemberHandler(object): # send the rejection to the inviter's HS. remote_room_hosts = remote_room_hosts + [inviter.domain] res = yield self._remote_reject_invite( - remote_room_hosts, room_id, target, + requester, remote_room_hosts, room_id, target, ) defer.returnValue(res) @@ -476,12 +490,12 @@ class RoomMemberHandler(object): prev_member_event = yield self.store.get_event(prev_member_event_id) newly_joined = prev_member_event.membership != Membership.JOIN if newly_joined: - yield user_joined_room(self.distributor, target_user, room_id) + yield self._user_joined_room(target_user, room_id) elif event.membership == Membership.LEAVE: if prev_member_event_id: prev_member_event = yield self.store.get_event(prev_member_event_id) if prev_member_event.membership == Membership.JOIN: - user_left_room(self.distributor, target_user, room_id) + yield self._user_left_room(target_user, room_id) @defer.inlineCallbacks def _can_guest_join(self, current_state_ids): @@ -672,6 +686,7 @@ class RoomMemberHandler(object): token, public_keys, fallback_public_key, display_name = ( yield self._ask_id_server_for_third_party_invite( + requester=requester, id_server=id_server, medium=medium, address=address, @@ -708,6 +723,7 @@ class RoomMemberHandler(object): @defer.inlineCallbacks def _ask_id_server_for_third_party_invite( self, + requester, id_server, medium, address, @@ -724,6 +740,7 @@ class RoomMemberHandler(object): Asks an identity server for a third party invite. Args: + requester (Requester) id_server (str): hostname + optional port for the identity server. medium (str): The literal string "email". address (str): The third party address being invited. @@ -766,8 +783,8 @@ class RoomMemberHandler(object): } if self.config.invite_3pid_guest: - rh = self.registration_handler - guest_user_id, guest_access_token = yield rh.get_or_register_3pid_guest( + guest_access_token, guest_user_id = yield self.get_or_register_3pid_guest( + requester=requester, medium=medium, address=address, inviter_user_id=inviter_user_id, @@ -801,27 +818,6 @@ class RoomMemberHandler(object): defer.returnValue((token, public_keys, fallback_public_key, display_name)) @defer.inlineCallbacks - def forget(self, user, room_id): - user_id = user.to_string() - - member = yield self.state_handler.get_current_state( - room_id=room_id, - event_type=EventTypes.Member, - state_key=user_id - ) - membership = member.membership if member else None - - if membership is not None and membership not in [ - Membership.LEAVE, Membership.BAN - ]: - raise SynapseError(400, "User %s in room %s" % ( - user_id, room_id - )) - - if membership: - yield self.store.forget(user_id, room_id) - - @defer.inlineCallbacks def _is_host_in_room(self, current_state_ids): # Have we just created the room, and is this about to be the very # first member event? @@ -842,3 +838,94 @@ class RoomMemberHandler(object): defer.returnValue(True) defer.returnValue(False) + + +class RoomMemberMasterHandler(RoomMemberHandler): + def __init__(self, hs): + super(RoomMemberMasterHandler, self).__init__(hs) + + self.distributor = hs.get_distributor() + self.distributor.declare("user_joined_room") + self.distributor.declare("user_left_room") + + @defer.inlineCallbacks + def _remote_join(self, requester, remote_room_hosts, room_id, user, content): + """Implements RoomMemberHandler._remote_join + """ + if len(remote_room_hosts) == 0: + raise SynapseError(404, "No known servers") + + # We don't do an auth check if we are doing an invite + # join dance for now, since we're kinda implicitly checking + # that we are allowed to join when we decide whether or not we + # need to do the invite/join dance. + yield self.federation_handler.do_invite_join( + remote_room_hosts, + room_id, + user.to_string(), + content, + ) + yield self._user_joined_room(user, room_id) + + @defer.inlineCallbacks + def _remote_reject_invite(self, requester, remote_room_hosts, room_id, target): + """Implements RoomMemberHandler._remote_reject_invite + """ + fed_handler = self.federation_handler + try: + ret = yield fed_handler.do_remotely_reject_invite( + remote_room_hosts, + room_id, + target.to_string(), + ) + defer.returnValue(ret) + except Exception as e: + # if we were unable to reject the exception, just mark + # it as rejected on our end and plough ahead. + # + # The 'except' clause is very broad, but we need to + # capture everything from DNS failures upwards + # + logger.warn("Failed to reject invite: %s", e) + + yield self.store.locally_reject_invite( + target.to_string(), room_id + ) + defer.returnValue({}) + + def get_or_register_3pid_guest(self, requester, medium, address, inviter_user_id): + """Implements RoomMemberHandler.get_or_register_3pid_guest + """ + rg = self.registration_handler + return rg.get_or_register_3pid_guest(medium, address, inviter_user_id) + + def _user_joined_room(self, target, room_id): + """Implements RoomMemberHandler._user_joined_room + """ + return user_joined_room(self.distributor, target, room_id) + + def _user_left_room(self, target, room_id): + """Implements RoomMemberHandler._user_left_room + """ + return user_left_room(self.distributor, target, room_id) + + @defer.inlineCallbacks + def forget(self, user, room_id): + user_id = user.to_string() + + member = yield self.state_handler.get_current_state( + room_id=room_id, + event_type=EventTypes.Member, + state_key=user_id + ) + membership = member.membership if member else None + + if membership is not None and membership not in [ + Membership.LEAVE, Membership.BAN + ]: + raise SynapseError(400, "User %s in room %s" % ( + user_id, room_id + )) + + if membership: + yield self.store.forget(user_id, room_id) diff --git a/synapse/handlers/room_member_worker.py b/synapse/handlers/room_member_worker.py new file mode 100644 index 0000000000..493aec1e48 --- /dev/null +++ b/synapse/handlers/room_member_worker.py @@ -0,0 +1,102 @@ +# -*- coding: utf-8 -*- +# Copyright 2018 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +from twisted.internet import defer + +from synapse.api.errors import SynapseError +from synapse.handlers.room_member import RoomMemberHandler +from synapse.replication.http.membership import ( + remote_join, remote_reject_invite, get_or_register_3pid_guest, + notify_user_membership_change, +) + + +logger = logging.getLogger(__name__) + + +class RoomMemberWorkerHandler(RoomMemberHandler): + @defer.inlineCallbacks + def _remote_join(self, requester, remote_room_hosts, room_id, user, content): + """Implements RoomMemberHandler._remote_join + """ + if len(remote_room_hosts) == 0: + raise SynapseError(404, "No known servers") + + ret = yield remote_join( + self.simple_http_client, + host=self.config.worker_replication_host, + port=self.config.worker_replication_http_port, + requester=requester, + remote_room_hosts=remote_room_hosts, + room_id=room_id, + user_id=user.to_string(), + content=content, + ) + + yield self._user_joined_room(user, room_id) + + defer.returnValue(ret) + + def _remote_reject_invite(self, requester, remote_room_hosts, room_id, target): + """Implements RoomMemberHandler._remote_reject_invite + """ + return remote_reject_invite( + self.simple_http_client, + host=self.config.worker_replication_host, + port=self.config.worker_replication_http_port, + requester=requester, + remote_room_hosts=remote_room_hosts, + room_id=room_id, + user_id=target.to_string(), + ) + + def _user_joined_room(self, target, room_id): + """Implements RoomMemberHandler._user_joined_room + """ + return notify_user_membership_change( + self.simple_http_client, + host=self.config.worker_replication_host, + port=self.config.worker_replication_http_port, + user_id=target.to_string(), + room_id=room_id, + change="joined", + ) + + def _user_left_room(self, target, room_id): + """Implements RoomMemberHandler._user_left_room + """ + return notify_user_membership_change( + self.simple_http_client, + host=self.config.worker_replication_host, + port=self.config.worker_replication_http_port, + user_id=target.to_string(), + room_id=room_id, + change="left", + ) + + def get_or_register_3pid_guest(self, requester, medium, address, inviter_user_id): + """Implements RoomMemberHandler.get_or_register_3pid_guest + """ + return get_or_register_3pid_guest( + self.simple_http_client, + host=self.config.worker_replication_host, + port=self.config.worker_replication_http_port, + requester=requester, + medium=medium, + address=address, + inviter_user_id=inviter_user_id, + ) diff --git a/synapse/replication/http/__init__.py b/synapse/replication/http/__init__.py index b378b41646..1d7a607529 100644 --- a/synapse/replication/http/__init__.py +++ b/synapse/replication/http/__init__.py @@ -13,10 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. - -import send_event - from synapse.http.server import JsonResource +from synapse.replication.http import membership, send_event REPLICATION_PREFIX = "/_synapse/replication" @@ -29,3 +27,4 @@ class ReplicationRestResource(JsonResource): def register_servlets(self, hs): send_event.register_servlets(hs, self) + membership.register_servlets(hs, self) diff --git a/synapse/replication/http/membership.py b/synapse/replication/http/membership.py new file mode 100644 index 0000000000..e66c4e881f --- /dev/null +++ b/synapse/replication/http/membership.py @@ -0,0 +1,334 @@ +# -*- coding: utf-8 -*- +# Copyright 2018 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import re + +from twisted.internet import defer + +from synapse.api.errors import SynapseError, MatrixCodeMessageException +from synapse.http.servlet import RestServlet, parse_json_object_from_request +from synapse.types import Requester, UserID +from synapse.util.distributor import user_left_room, user_joined_room + +logger = logging.getLogger(__name__) + + +@defer.inlineCallbacks +def remote_join(client, host, port, requester, remote_room_hosts, + room_id, user_id, content): + """Ask the master to do a remote join for the given user to the given room + + Args: + client (SimpleHttpClient) + host (str): host of master + port (int): port on master listening for HTTP replication + requester (Requester) + remote_room_hosts (list[str]): Servers to try and join via + room_id (str) + user_id (str) + content (dict): The event content to use for the join event + + Returns: + Deferred + """ + uri = "http://%s:%s/_synapse/replication/remote_join" % (host, port) + + payload = { + "requester": requester.serialize(), + "remote_room_hosts": remote_room_hosts, + "room_id": room_id, + "user_id": user_id, + "content": content, + } + + try: + result = yield client.post_json_get_json(uri, payload) + except MatrixCodeMessageException as e: + # We convert to SynapseError as we know that it was a SynapseError + # on the master process that we should send to the client. (And + # importantly, not stack traces everywhere) + raise SynapseError(e.code, e.msg, e.errcode) + defer.returnValue(result) + + +@defer.inlineCallbacks +def remote_reject_invite(client, host, port, requester, remote_room_hosts, + room_id, user_id): + """Ask master to reject the invite for the user and room. + + Args: + client (SimpleHttpClient) + host (str): host of master + port (int): port on master listening for HTTP replication + requester (Requester) + remote_room_hosts (list[str]): Servers to try and reject via + room_id (str) + user_id (str) + + Returns: + Deferred + """ + uri = "http://%s:%s/_synapse/replication/remote_reject_invite" % (host, port) + + payload = { + "requester": requester.serialize(), + "remote_room_hosts": remote_room_hosts, + "room_id": room_id, + "user_id": user_id, + } + + try: + result = yield client.post_json_get_json(uri, payload) + except MatrixCodeMessageException as e: + # We convert to SynapseError as we know that it was a SynapseError + # on the master process that we should send to the client. (And + # importantly, not stack traces everywhere) + raise SynapseError(e.code, e.msg, e.errcode) + defer.returnValue(result) + + +@defer.inlineCallbacks +def get_or_register_3pid_guest(client, host, port, requester, + medium, address, inviter_user_id): + """Ask the master to get/create a guest account for given 3PID. + + Args: + client (SimpleHttpClient) + host (str): host of master + port (int): port on master listening for HTTP replication + requester (Requester) + medium (str) + address (str) + inviter_user_id (str): The user ID who is trying to invite the + 3PID + + Returns: + Deferred[(str, str)]: A 2-tuple of `(user_id, access_token)` of the + 3PID guest account. + """ + + uri = "http://%s:%s/_synapse/replication/get_or_register_3pid_guest" % (host, port) + + payload = { + "requester": requester.serialize(), + "medium": medium, + "address": address, + "inviter_user_id": inviter_user_id, + } + + try: + result = yield client.post_json_get_json(uri, payload) + except MatrixCodeMessageException as e: + # We convert to SynapseError as we know that it was a SynapseError + # on the master process that we should send to the client. (And + # importantly, not stack traces everywhere) + raise SynapseError(e.code, e.msg, e.errcode) + defer.returnValue(result) + + +@defer.inlineCallbacks +def notify_user_membership_change(client, host, port, user_id, room_id, change): + """Notify master that a user has joined or left the room + + Args: + client (SimpleHttpClient) + host (str): host of master + port (int): port on master listening for HTTP replication. + user_id (str) + room_id (str) + change (str): Either "join" or "left" + + Returns: + Deferred + """ + assert change in ("joined", "left") + + uri = "http://%s:%s/_synapse/replication/user_%s_room" % (host, port, change) + + payload = { + "user_id": user_id, + "room_id": room_id, + } + + try: + result = yield client.post_json_get_json(uri, payload) + except MatrixCodeMessageException as e: + # We convert to SynapseError as we know that it was a SynapseError + # on the master process that we should send to the client. (And + # importantly, not stack traces everywhere) + raise SynapseError(e.code, e.msg, e.errcode) + defer.returnValue(result) + + +class ReplicationRemoteJoinRestServlet(RestServlet): + PATTERNS = [re.compile("^/_synapse/replication/remote_join$")] + + def __init__(self, hs): + super(ReplicationRemoteJoinRestServlet, self).__init__() + + self.federation_handler = hs.get_handlers().federation_handler + self.store = hs.get_datastore() + self.clock = hs.get_clock() + + @defer.inlineCallbacks + def on_POST(self, request): + content = parse_json_object_from_request(request) + + remote_room_hosts = content["remote_room_hosts"] + room_id = content["room_id"] + user_id = content["user_id"] + event_content = content["content"] + + requester = Requester.deserialize(self.store, content["requester"]) + + if requester.user: + request.authenticated_entity = requester.user.to_string() + + logger.info( + "remote_join: %s into room: %s", + user_id, room_id, + ) + + yield self.federation_handler.do_invite_join( + remote_room_hosts, + room_id, + user_id, + event_content, + ) + + defer.returnValue((200, {})) + + +class ReplicationRemoteRejectInviteRestServlet(RestServlet): + PATTERNS = [re.compile("^/_synapse/replication/remote_reject_invite$")] + + def __init__(self, hs): + super(ReplicationRemoteRejectInviteRestServlet, self).__init__() + + self.federation_handler = hs.get_handlers().federation_handler + self.store = hs.get_datastore() + self.clock = hs.get_clock() + + @defer.inlineCallbacks + def on_POST(self, request): + content = parse_json_object_from_request(request) + + remote_room_hosts = content["remote_room_hosts"] + room_id = content["room_id"] + user_id = content["user_id"] + + requester = Requester.deserialize(self.store, content["requester"]) + + if requester.user: + request.authenticated_entity = requester.user.to_string() + + logger.info( + "remote_reject_invite: %s out of room: %s", + user_id, room_id, + ) + + try: + event = yield self.federation_handler.do_remotely_reject_invite( + remote_room_hosts, + room_id, + user_id, + ) + ret = event.get_pdu_json() + except Exception as e: + # if we were unable to reject the exception, just mark + # it as rejected on our end and plough ahead. + # + # The 'except' clause is very broad, but we need to + # capture everything from DNS failures upwards + # + logger.warn("Failed to reject invite: %s", e) + + yield self.store.locally_reject_invite( + user_id, room_id + ) + ret = {} + + defer.returnValue((200, ret)) + + +class ReplicationRegister3PIDGuestRestServlet(RestServlet): + PATTERNS = [re.compile("^/_synapse/replication/get_or_register_3pid_guest$")] + + def __init__(self, hs): + super(ReplicationRegister3PIDGuestRestServlet, self).__init__() + + self.registeration_handler = hs.get_handlers().registration_handler + self.store = hs.get_datastore() + self.clock = hs.get_clock() + + @defer.inlineCallbacks + def on_POST(self, request): + content = parse_json_object_from_request(request) + + medium = content["medium"] + address = content["address"] + inviter_user_id = content["inviter_user_id"] + + requester = Requester.deserialize(self.store, content["requester"]) + + if requester.user: + request.authenticated_entity = requester.user.to_string() + + logger.info("get_or_register_3pid_guest: %r", content) + + ret = yield self.registeration_handler.get_or_register_3pid_guest( + medium, address, inviter_user_id, + ) + + defer.returnValue((200, ret)) + + +class ReplicationUserJoinedLeftRoomRestServlet(RestServlet): + PATTERNS = [re.compile("^/_synapse/replication/user_(?P<change>joined|left)_room$")] + + def __init__(self, hs): + super(ReplicationUserJoinedLeftRoomRestServlet, self).__init__() + + self.registeration_handler = hs.get_handlers().registration_handler + self.store = hs.get_datastore() + self.clock = hs.get_clock() + self.distributor = hs.get_distributor() + + def on_POST(self, request, change): + content = parse_json_object_from_request(request) + + user_id = content["user_id"] + room_id = content["room_id"] + + logger.info("user membership change: %s in %s", user_id, room_id) + + user = UserID.from_string(user_id) + + if change == "joined": + user_joined_room(self.distributor, user, room_id) + elif change == "left": + user_left_room(self.distributor, user, room_id) + else: + raise Exception("Unrecognized change: %r", change) + + return (200, {}) + + +def register_servlets(hs, http_server): + ReplicationRemoteJoinRestServlet(hs).register(http_server) + ReplicationRemoteRejectInviteRestServlet(hs).register(http_server) + ReplicationRegister3PIDGuestRestServlet(hs).register(http_server) + ReplicationUserJoinedLeftRoomRestServlet(hs).register(http_server) diff --git a/synapse/replication/slave/storage/profile.py b/synapse/replication/slave/storage/profile.py new file mode 100644 index 0000000000..46c28d4171 --- /dev/null +++ b/synapse/replication/slave/storage/profile.py @@ -0,0 +1,21 @@ +# -*- coding: utf-8 -*- +# Copyright 2018 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from synapse.replication.slave.storage._base import BaseSlavedStore +from synapse.storage.profile import ProfileWorkerStore + + +class SlavedProfileStore(ProfileWorkerStore, BaseSlavedStore): + pass diff --git a/synapse/server.py b/synapse/server.py index 43c6e0a6d6..cd0c1a51be 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -47,7 +47,8 @@ from synapse.handlers.device import DeviceHandler from synapse.handlers.e2e_keys import E2eKeysHandler from synapse.handlers.presence import PresenceHandler from synapse.handlers.room_list import RoomListHandler -from synapse.handlers.room_member import RoomMemberHandler +from synapse.handlers.room_member import RoomMemberMasterHandler +from synapse.handlers.room_member_worker import RoomMemberWorkerHandler from synapse.handlers.set_password import SetPasswordHandler from synapse.handlers.sync import SyncHandler from synapse.handlers.typing import TypingHandler @@ -392,7 +393,9 @@ class HomeServer(object): return SpamChecker(self) def build_room_member_handler(self): - return RoomMemberHandler(self) + if self.config.worker_app: + return RoomMemberWorkerHandler(self) + return RoomMemberMasterHandler(self) def build_federation_registry(self): return FederationHandlerRegistry() diff --git a/synapse/storage/profile.py b/synapse/storage/profile.py index ec02e73bc2..8612bd5ecc 100644 --- a/synapse/storage/profile.py +++ b/synapse/storage/profile.py @@ -21,14 +21,7 @@ from synapse.api.errors import StoreError from ._base import SQLBaseStore -class ProfileStore(SQLBaseStore): - def create_profile(self, user_localpart): - return self._simple_insert( - table="profiles", - values={"user_id": user_localpart}, - desc="create_profile", - ) - +class ProfileWorkerStore(SQLBaseStore): @defer.inlineCallbacks def get_profileinfo(self, user_localpart): try: @@ -61,14 +54,6 @@ class ProfileStore(SQLBaseStore): desc="get_profile_displayname", ) - def set_profile_displayname(self, user_localpart, new_displayname): - return self._simple_update_one( - table="profiles", - keyvalues={"user_id": user_localpart}, - updatevalues={"displayname": new_displayname}, - desc="set_profile_displayname", - ) - def get_profile_avatar_url(self, user_localpart): return self._simple_select_one_onecol( table="profiles", @@ -77,14 +62,6 @@ class ProfileStore(SQLBaseStore): desc="get_profile_avatar_url", ) - def set_profile_avatar_url(self, user_localpart, new_avatar_url): - return self._simple_update_one( - table="profiles", - keyvalues={"user_id": user_localpart}, - updatevalues={"avatar_url": new_avatar_url}, - desc="set_profile_avatar_url", - ) - def get_from_remote_profile_cache(self, user_id): return self._simple_select_one( table="remote_profile_cache", @@ -94,6 +71,31 @@ class ProfileStore(SQLBaseStore): desc="get_from_remote_profile_cache", ) + +class ProfileStore(ProfileWorkerStore): + def create_profile(self, user_localpart): + return self._simple_insert( + table="profiles", + values={"user_id": user_localpart}, + desc="create_profile", + ) + + def set_profile_displayname(self, user_localpart, new_displayname): + return self._simple_update_one( + table="profiles", + keyvalues={"user_id": user_localpart}, + updatevalues={"displayname": new_displayname}, + desc="set_profile_displayname", + ) + + def set_profile_avatar_url(self, user_localpart, new_avatar_url): + return self._simple_update_one( + table="profiles", + keyvalues={"user_id": user_localpart}, + updatevalues={"avatar_url": new_avatar_url}, + desc="set_profile_avatar_url", + ) + def add_remote_profile_cache(self, user_id, displayname, avatar_url): """Ensure we are caching the remote user's profiles. diff --git a/synapse/storage/room.py b/synapse/storage/room.py index 7f2c08d7a6..34ed84ea22 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -157,6 +157,18 @@ class RoomWorkerStore(SQLBaseStore): "get_public_room_changes", get_public_room_changes_txn ) + @cached(max_entries=10000) + def is_room_blocked(self, room_id): + return self._simple_select_one_onecol( + table="blocked_rooms", + keyvalues={ + "room_id": room_id, + }, + retcol="1", + allow_none=True, + desc="is_room_blocked", + ) + class RoomStore(RoomWorkerStore, SearchStore): @@ -485,18 +497,6 @@ class RoomStore(RoomWorkerStore, SearchStore): else: defer.returnValue(None) - @cached(max_entries=10000) - def is_room_blocked(self, room_id): - return self._simple_select_one_onecol( - table="blocked_rooms", - keyvalues={ - "room_id": room_id, - }, - retcol="1", - allow_none=True, - desc="is_room_blocked", - ) - @defer.inlineCallbacks def block_room(self, room_id, user_id): yield self._simple_insert( @@ -507,7 +507,11 @@ class RoomStore(RoomWorkerStore, SearchStore): }, desc="block_room", ) - self.is_room_blocked.invalidate((room_id,)) + yield self.runInteraction( + "block_room_invalidation", + self._invalidate_cache_and_stream, + self.is_room_blocked, (room_id,), + ) def get_media_mxcs_in_room(self, room_id): """Retrieves all the local and remote media MXC URIs in a given room |