From d4a35ada28302e096efd42e1a2a28542ed7ebd6f Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 6 Sep 2016 18:16:20 +0100 Subject: Send device messages over federation --- synapse/handlers/devicemessage.py | 121 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 121 insertions(+) create mode 100644 synapse/handlers/devicemessage.py (limited to 'synapse/handlers') diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py new file mode 100644 index 0000000000..7e59c0d487 --- /dev/null +++ b/synapse/handlers/devicemessage.py @@ -0,0 +1,121 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +from twisted.internet import defer + +from synapse.types import get_domain_from_id +from synapse.util.stringutils import random_string + + +logger = logging.getLogger(__name__) + + +class DeviceMessageHandler(object): + + def __init__(self, hs): + """ + Args: + hs (synapse.server.HomeServer): server + """ + self.store = hs.get_datastore() + self.notifier = hs.get_notifier() + self.is_mine_id = hs.is_mine_id + self.federation = hs.get_replication_layer() + + self.federation.register_edu_handler( + "m.direct_to_device", self.on_direct_to_device_edu + ) + + @defer.inlineCallbacks + def on_direct_to_device_edu(self, origin, content): + local_messages = {} + sender_user_id = content["sender"] + if origin != get_domain_from_id(sender_user_id): + logger.warn( + "Dropping device message from %r with spoofed sender %r", + origin, sender_user_id + ) + message_type = content["type"] + message_id = content["message_id"] + for user_id, by_device in content["messages"].items(): + messages_by_device = { + device_id: { + "content": message_content, + "type": message_type, + "sender": sender_user_id, + } + for device_id, message_content in by_device.items() + } + if messages_by_device: + local_messages[user_id] = messages_by_device + + stream_id = yield self.store.add_messages_from_remote_to_device_inbox( + origin, message_id, local_messages + ) + + self.notifier.on_new_event( + "to_device_key", stream_id, users=local_messages.keys() + ) + + @defer.inlineCallbacks + def send_device_message(self, sender_user_id, message_type, messages): + + local_messages = {} + remote_messages = {} + for user_id, by_device in messages.items(): + if self.is_mine_id(user_id): + messages_by_device = { + device_id: { + "content": message_content, + "type": message_type, + "sender": sender_user_id, + } + for device_id, message_content in by_device.items() + } + if messages_by_device: + local_messages[user_id] = messages_by_device + else: + destination = get_domain_from_id(user_id) + remote_messages.setdefault(destination, {})[user_id] = by_device + + message_id = random_string(16) + + remote_edu_contents = {} + for destination, messages in remote_messages.items(): + remote_edu_contents[destination] = { + "messages": messages, + "sender": sender_user_id, + "type": message_type, + "message_id": message_id, + } + + stream_id = yield self.store.add_messages_to_device_inbox( + local_messages, remote_edu_contents + ) + + self.notifier.on_new_event( + "to_device_key", stream_id, users=local_messages.keys() + ) + + for destination in remote_messages.keys(): + # Hack to send make synapse send a federation transaction + # to the remote servers. + self.federation.send_edu( + destination=destination, + edu_type="m.ping", + content={}, + ) -- cgit 1.5.1 From 43954d000e19a622576063de0b48cf9235dec395 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Wed, 7 Sep 2016 16:10:51 +0100 Subject: Add a new method to enqueue the device messages rather than sending a dummy EDU --- synapse/federation/federation_client.py | 6 ++++++ synapse/federation/transaction_queue.py | 11 +++++++++++ synapse/handlers/devicemessage.py | 10 +++------- 3 files changed, 20 insertions(+), 7 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 627acc6a4f..78719eed25 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -137,6 +137,12 @@ class FederationClient(FederationBase): self._transaction_queue.enqueue_edu(edu) return defer.succeed(None) + @log_function + def send_device_messages(self, destination): + """Sends the device messages in the local database to the remote + destination""" + self._transaction_queue.enqueue_device_messages(destination) + @log_function def send_failure(self, failure, destination): self._transaction_queue.enqueue_failure(failure, destination) diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index c0ee946ac0..633c79c352 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -157,6 +157,17 @@ class TransactionQueue(object): self._attempt_new_transaction, destination ) + def enqueue_device_messages(self, destination): + if destination == self.server_name or destination == "localhost": + return + + if not self.can_send_to(destination): + return + + preserve_context_over_fn( + self._attempt_new_transaction, destination + ) + @defer.inlineCallbacks def _attempt_new_transaction(self, destination): yield run_on_reactor() diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py index 7e59c0d487..c5368e5df2 100644 --- a/synapse/handlers/devicemessage.py +++ b/synapse/handlers/devicemessage.py @@ -112,10 +112,6 @@ class DeviceMessageHandler(object): ) for destination in remote_messages.keys(): - # Hack to send make synapse send a federation transaction - # to the remote servers. - self.federation.send_edu( - destination=destination, - edu_type="m.ping", - content={}, - ) + # Enqueue a new federation transaction to send the new + # device messages to each remote destination. + self.federation.send_device_messages(destination) -- cgit 1.5.1 From 791658b57677cc60b02b969ab3cb617da8cc62f9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 8 Sep 2016 11:53:05 +0100 Subject: Add server param to /publicRooms --- synapse/handlers/room.py | 10 ++++++++++ synapse/rest/client/v1/room.py | 21 ++++++++++++++++----- 2 files changed, 26 insertions(+), 5 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index bf6b1c1535..8758af4ca1 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -443,6 +443,16 @@ class RoomListHandler(BaseHandler): self.remote_list_request_cache.set((), deferred) self.remote_list_cache = yield deferred + @defer.inlineCallbacks + def get_remote_public_room_list(self, server_name): + res = yield self.hs.get_replication_layer().get_public_rooms( + [server_name] + ) + + if server_name not in res: + raise SynapseError(404, "Server not found") + defer.returnValue(res[server_name]) + @defer.inlineCallbacks def get_aggregated_public_room_list(self): """ diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index 0d81757010..7971e53010 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -295,15 +295,26 @@ class PublicRoomListRestServlet(ClientV1RestServlet): @defer.inlineCallbacks def on_GET(self, request): + server = request.args.get("server", [None])[0] + try: yield self.auth.get_user_by_req(request) - except AuthError: - # This endpoint isn't authed, but its useful to know who's hitting - # it if they *do* supply an access token - pass + except AuthError as e: + # We allow people to not be authed if they're just looking at our + # room list, but require auth when we proxy the request. + # In both cases we call the auth function, as that has the side + # effect of logging who issued this request if an access token was + # provided. + if server: + raise e + else: + pass handler = self.hs.get_room_list_handler() - data = yield handler.get_aggregated_public_room_list() + if server: + data = yield handler.get_remote_public_room_list(server) + else: + data = yield handler.get_aggregated_public_room_list() defer.returnValue((200, data)) -- cgit 1.5.1 From 8b93af662d432cf6b3d36cbbcbd4dd2427bde658 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 8 Sep 2016 15:04:46 +0100 Subject: Check the user_id for presence/typing matches origin --- synapse/handlers/presence.py | 7 +++++++ synapse/handlers/typing.py | 9 ++++++++- 2 files changed, 15 insertions(+), 1 deletion(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index da9f0da69e..7a3c16a8aa 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -651,6 +651,13 @@ class PresenceHandler(object): ) continue + if get_domain_from_id(user_id) != origin: + logger.info( + "Got presence update from %r with bad 'user_id': %r", + origin, user_id, + ) + continue + presence_state = push.get("presence", None) if not presence_state: logger.info( diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index 0b530b9034..3b687957dd 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -199,7 +199,14 @@ class TypingHandler(object): user_id = content["user_id"] # Check that the string is a valid user id - UserID.from_string(user_id) + user = UserID.from_string(user_id) + + if user.domain != origin: + logger.info( + "Got typing update from %r with bad 'user_id': %r", + origin, user_id, + ) + return users = yield self.state.get_current_user_in_room(room_id) domains = set(get_domain_from_id(u) for u in users) -- cgit 1.5.1 From 2f267ee160b1f7ce591f4f10ddb5f9239110a2f6 Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Thu, 8 Sep 2016 17:43:53 +0100 Subject: Collect up all the "instances" lists of individual AS protocol results into one combined answer to the client --- synapse/handlers/appservice.py | 30 +++++++++++++++++++++++++++++- 1 file changed, 29 insertions(+), 1 deletion(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index b440280b74..25447284eb 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -179,9 +179,37 @@ class ApplicationServicesHandler(object): def get_3pe_protocols(self): services = yield self.store.get_app_services() protocols = {} + + # Collect up all the individual protocol responses out of the ASes for s in services: for p in s.protocols: - protocols[p] = yield self.appservice_api.get_3pe_protocol(s, p) + info = yield self.appservice_api.get_3pe_protocol(s, p) + + # Ignore any result that doesn't contain an "instances" list + if "instances" not in info: + continue + if not isinstance(info["instances"], list): + continue + + if p not in protocols: + protocols[p] = [] + protocols[p].append(info) + + def _merge_instances(infos): + if len(infos) == 0: + return {} + + # Merge the 'instances' lists of multiple results, but just take + # the other fields from the first as they ought to be identical + combined = dict(infos[0]) + + for info in infos[1:]: + combined["instances"].extend(info["instances"]) + + return combined + + for p in protocols.keys(): + protocols[p] = _merge_instances(protocols[p]) defer.returnValue(protocols) -- cgit 1.5.1 From 033d43e4190bf765eb29bb6ba8ea7cbe6ad66cf4 Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Fri, 9 Sep 2016 13:10:36 +0100 Subject: Don't corrupt shared cache on subsequent protocol requests --- synapse/handlers/appservice.py | 2 ++ 1 file changed, 2 insertions(+) (limited to 'synapse/handlers') diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 25447284eb..e68628bdfd 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -201,7 +201,9 @@ class ApplicationServicesHandler(object): # Merge the 'instances' lists of multiple results, but just take # the other fields from the first as they ought to be identical + # deep-clone the result so as not to corrupt the cached one combined = dict(infos[0]) + combined["instances"] = list(combined["instances"]) for info in infos[1:]: combined["instances"].extend(info["instances"]) -- cgit 1.5.1 From 25eb769b26d6a13afcc9173e0eacf932e5cc1449 Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Fri, 9 Sep 2016 13:25:02 +0100 Subject: Efficiency fix for lookups of a single protocol --- synapse/handlers/appservice.py | 5 ++++- synapse/rest/client/v2_alpha/thirdparty.py | 4 +++- 2 files changed, 7 insertions(+), 2 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index e68628bdfd..a0375f7e3b 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -176,13 +176,16 @@ class ApplicationServicesHandler(object): defer.returnValue(ret) @defer.inlineCallbacks - def get_3pe_protocols(self): + def get_3pe_protocols(self, only_protocol=None): services = yield self.store.get_app_services() protocols = {} # Collect up all the individual protocol responses out of the ASes for s in services: for p in s.protocols: + if only_protocol is not None and p != only_protocol: + continue + info = yield self.appservice_api.get_3pe_protocol(s, p) # Ignore any result that doesn't contain an "instances" list diff --git a/synapse/rest/client/v2_alpha/thirdparty.py b/synapse/rest/client/v2_alpha/thirdparty.py index 48d8543e76..6bf9eb10ae 100644 --- a/synapse/rest/client/v2_alpha/thirdparty.py +++ b/synapse/rest/client/v2_alpha/thirdparty.py @@ -55,7 +55,9 @@ class ThirdPartyProtocolServlet(RestServlet): def on_GET(self, request, protocol): yield self.auth.get_user_by_req(request) - protocols = yield self.appservice_handler.get_3pe_protocols() + protocols = yield self.appservice_handler.get_3pe_protocols( + only_protocol=protocol, + ) if protocol in protocols: defer.returnValue((200, protocols[protocol])) else: -- cgit 1.5.1 From 6c4d5821446c861c0448a8d952a7aa40897b1ebd Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 9 Sep 2016 14:26:05 +0100 Subject: Deduplicate presence in _update_states --- synapse/handlers/presence.py | 6 ++++++ 1 file changed, 6 insertions(+) (limited to 'synapse/handlers') diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 7a3c16a8aa..16dbddee03 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -265,6 +265,12 @@ class PresenceHandler(object): to_notify = {} # Changes we want to notify everyone about to_federation_ping = {} # These need sending keep-alives + # Only bother handling the last presence change for each user + new_states_dict = {} + for new_state in new_states: + new_states_dict[new_state.user_id] = new_state + new_state = new_states_dict.values() + for new_state in new_states: user_id = new_state.user_id -- cgit 1.5.1 From f25d74f69c7da9bd36ba953d916dd78f7ea79ff1 Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Fri, 9 Sep 2016 14:54:16 +0100 Subject: Minor fixes from PR comments --- synapse/handlers/appservice.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index a0375f7e3b..4648e78d47 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -199,12 +199,12 @@ class ApplicationServicesHandler(object): protocols[p].append(info) def _merge_instances(infos): - if len(infos) == 0: + if not infos: return {} # Merge the 'instances' lists of multiple results, but just take # the other fields from the first as they ought to be identical - # deep-clone the result so as not to corrupt the cached one + # copy the result so as not to corrupt the cached one combined = dict(infos[0]) combined["instances"] = list(combined["instances"]) -- cgit 1.5.1 From ed44c475d832196957715f49215a95be1ce1eade Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Fri, 9 Sep 2016 15:07:04 +0100 Subject: Reject malformed 3PE query metadata results earlier in AS API handling code --- synapse/appservice/api.py | 12 ++++++++++-- synapse/handlers/appservice.py | 14 +++++--------- 2 files changed, 15 insertions(+), 11 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py index cc4af23962..afc64ed267 100644 --- a/synapse/appservice/api.py +++ b/synapse/appservice/api.py @@ -162,11 +162,19 @@ class ApplicationServiceApi(SimpleHttpClient): urllib.quote(protocol) ) try: - defer.returnValue((yield self.get_json(uri, {}))) + info = yield self.get_json(uri, {}) + + # Ignore any result that doesn't contain an "instances" list + if "instances" not in info: + defer.returnValue(None) + if not isinstance(info["instances"], list): + defer.returnValue(None) + + defer.returnValue(info) except Exception as ex: logger.warning("query_3pe_protocol to %s threw exception %s", uri, ex) - defer.returnValue({}) + defer.returnValue(None) key = (service.id, protocol) return self.protocol_meta_cache.get(key) or ( diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 4648e78d47..88fa0bb2e4 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -186,17 +186,13 @@ class ApplicationServicesHandler(object): if only_protocol is not None and p != only_protocol: continue - info = yield self.appservice_api.get_3pe_protocol(s, p) - - # Ignore any result that doesn't contain an "instances" list - if "instances" not in info: - continue - if not isinstance(info["instances"], list): - continue - if p not in protocols: protocols[p] = [] - protocols[p].append(info) + + info = yield self.appservice_api.get_3pe_protocol(s, p) + + if info is not None: + protocols[p].append(info) def _merge_instances(infos): if not infos: -- cgit 1.5.1