From f12993ec16b57a14f6d304f9cfa5dd50747168a9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 5 Sep 2016 13:01:14 +0100 Subject: Bump changelog and version --- CHANGES.rst | 26 ++++++++++++++++++++++++++ synapse/__init__.py | 2 +- 2 files changed, 27 insertions(+), 1 deletion(-) diff --git a/CHANGES.rst b/CHANGES.rst index 49673ccce4..cc64d3adb1 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -1,3 +1,29 @@ +Changes in synapse v0.17.2-rc1 (2016-09-05) +=========================================== + +Features: + +* Start adding store-and-forward direct-to-device messaging (PR #1046, #1050, + #1062, #1066) + + +Changes: + +* Avoid pulling the full state of a room out so often (PR #1047, #1049, #1063, + #1068) +* Don't notify for online to online presence transitions. (PR #1054) +* Occaisonally persist unpersisted presence updates (PR #1055) +* Allow application services to have an optional 'url' (PR #1056) +* Clean up old sent transactions from DB (PR #1059) + + +Bug fixes: + +* Fix None check in backfill (PR #1043) +* Fix membership changes to be idempotent (PR #1067) + + + Changes in synapse v0.17.1 (2016-08-24) ======================================= diff --git a/synapse/__init__.py b/synapse/__init__.py index 43bf78f885..ad2ec58ea4 100644 --- a/synapse/__init__.py +++ b/synapse/__init__.py @@ -16,4 +16,4 @@ """ This is a reference implementation of a Matrix home server. """ -__version__ = "0.17.1" +__version__ = "0.17.2-rc1" -- cgit 1.4.1 From 19275b3030eb69d575f85526127037b3c221d000 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 5 Sep 2016 13:08:26 +0100 Subject: Typo --- CHANGES.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGES.rst b/CHANGES.rst index cc64d3adb1..7820a4610b 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -12,7 +12,7 @@ Changes: * Avoid pulling the full state of a room out so often (PR #1047, #1049, #1063, #1068) * Don't notify for online to online presence transitions. (PR #1054) -* Occaisonally persist unpersisted presence updates (PR #1055) +* Occasionally persist unpersisted presence updates (PR #1055) * Allow application services to have an optional 'url' (PR #1056) * Clean up old sent transactions from DB (PR #1059) -- cgit 1.4.1 From 4ec67a3d2167f3623fea6c22f4544a21f4781203 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 5 Sep 2016 15:52:48 +0100 Subject: Mention get_pdu bug --- CHANGES.rst | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGES.rst b/CHANGES.rst index 7820a4610b..1d073119f9 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -21,6 +21,8 @@ Bug fixes: * Fix None check in backfill (PR #1043) * Fix membership changes to be idempotent (PR #1067) +* Fix bug in get_pdu where it would sometimes return events with incorrect + signature -- cgit 1.4.1 From f4164edb7020e4031a285e3723e7ad57d0486df9 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 6 Sep 2016 11:26:37 +0100 Subject: Move _add_messages_to_device_inbox_txn into a separate method --- synapse/storage/deviceinbox.py | 69 ++++++++++++++++++++---------------------- 1 file changed, 32 insertions(+), 37 deletions(-) diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py index 68116b0394..57202a5bda 100644 --- a/synapse/storage/deviceinbox.py +++ b/synapse/storage/deviceinbox.py @@ -37,9 +37,21 @@ class DeviceInboxStore(SQLBaseStore): inserted. """ - def select_devices_txn(txn, user_id, devices): - if not devices: - return [] + with self._device_inbox_id_gen.get_next() as stream_id: + yield self.runInteraction( + "add_messages_to_device_inbox", + self._add_messages_to_device_inbox_txn, + stream_id, + messages_by_user_then_device, + ) + + defer.returnValue(self._device_inbox_id_gen.get_current_token()) + + def _add_messages_to_device_inbox_txn(self, txn, stream_id, + messages_by_user_then_device): + local_users_and_devices = set() + for user_id, messages_by_device in messages_by_user_then_device.items(): + devices = messages_by_device.keys() sql = ( "SELECT user_id, device_id FROM devices" " WHERE user_id = ? AND device_id IN (" @@ -48,41 +60,24 @@ class DeviceInboxStore(SQLBaseStore): ) # TODO: Maybe this needs to be done in batches if there are # too many local devices for a given user. - args = [user_id] + devices - txn.execute(sql, args) - return [tuple(row) for row in txn.fetchall()] - - def add_messages_to_device_inbox_txn(txn, stream_id): - local_users_and_devices = set() - for user_id, messages_by_device in messages_by_user_then_device.items(): - local_users_and_devices.update( - select_devices_txn(txn, user_id, messages_by_device.keys()) - ) - - sql = ( - "INSERT INTO device_inbox" - " (user_id, device_id, stream_id, message_json)" - " VALUES (?,?,?,?)" - ) - rows = [] - for user_id, messages_by_device in messages_by_user_then_device.items(): - for device_id, message in messages_by_device.items(): - message_json = ujson.dumps(message) - # Only insert into the local inbox if the device exists on - # this server - if (user_id, device_id) in local_users_and_devices: - rows.append((user_id, device_id, stream_id, message_json)) - - txn.executemany(sql, rows) - - with self._device_inbox_id_gen.get_next() as stream_id: - yield self.runInteraction( - "add_messages_to_device_inbox", - add_messages_to_device_inbox_txn, - stream_id - ) + txn.execute(sql, [user_id] + devices) + local_users_and_devices.update(map(tuple, txn.fetchall())) - defer.returnValue(self._device_inbox_id_gen.get_current_token()) + sql = ( + "INSERT INTO device_inbox" + " (user_id, device_id, stream_id, message_json)" + " VALUES (?,?,?,?)" + ) + rows = [] + for user_id, messages_by_device in messages_by_user_then_device.items(): + for device_id, message in messages_by_device.items(): + message_json = ujson.dumps(message) + # Only insert into the local inbox if the device exists on + # this server + if (user_id, device_id) in local_users_and_devices: + rows.append((user_id, device_id, stream_id, message_json)) + + txn.executemany(sql, rows) def get_new_messages_for_device( self, user_id, device_id, last_stream_id, current_stream_id, limit=100 -- cgit 1.4.1 From 2ad72da93181db7850f6ba8f0635bdc0924e6d8c Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 6 Sep 2016 15:10:29 +0100 Subject: Add tables for federated device messages Adds tables for storing the messages that need to be sent to a remote device and for deduplicating messages received. --- synapse/storage/schema/delta/34/device_outbox.sql | 38 +++++++++++++++++++++++ 1 file changed, 38 insertions(+) create mode 100644 synapse/storage/schema/delta/34/device_outbox.sql diff --git a/synapse/storage/schema/delta/34/device_outbox.sql b/synapse/storage/schema/delta/34/device_outbox.sql new file mode 100644 index 0000000000..a319f73e47 --- /dev/null +++ b/synapse/storage/schema/delta/34/device_outbox.sql @@ -0,0 +1,38 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE TABLE device_federation_outbox ( + destination TEXT NOT NULL, + stream_id BIGINT NOT NULL, + sender TEXT NOT NULL, + message_id TEXT NOT NULL, + sent_ts BIGINT NOT NULL, + messages_json TEXT NOT NULL +); + + +CREATE INDEX device_federation_outbox_destination_id + ON device_federation_outbox(destination, stream_id); + + +CREATE TABLE device_federation_inbox ( + origin TEXT NOT NULL, + message_id TEXT NOT NULL, + received_ts BIGINT NOT NULL +); + + +CREATE INDEX device_federation_inbox_sender_id + ON device_federation_inbox(origin, message_id); -- cgit 1.4.1 From e020834e4fcce9ba0c7d60d270bbdf40a1d2a336 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 6 Sep 2016 15:12:13 +0100 Subject: Add storage methods for federated device messages --- synapse/storage/deviceinbox.py | 139 ++++++++++++++++++++++++++++++++++++++--- 1 file changed, 132 insertions(+), 7 deletions(-) diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py index 57202a5bda..988577a334 100644 --- a/synapse/storage/deviceinbox.py +++ b/synapse/storage/deviceinbox.py @@ -27,28 +27,89 @@ logger = logging.getLogger(__name__) class DeviceInboxStore(SQLBaseStore): @defer.inlineCallbacks - def add_messages_to_device_inbox(self, messages_by_user_then_device): - """ + def add_messages_to_device_inbox(self, local_messages_by_user_then_device, + remote_messages_by_destination): + """Used to send messages from this server. + Args: - messages_by_user_and_device(dict): + sender_user_id(str): The ID of the user sending these messages. + local_messages_by_user_and_device(dict): Dictionary of user_id to device_id to message. + remote_messages_by_destination(dict): + Dictionary of destination server_name to the EDU JSON to send. Returns: A deferred stream_id that resolves when the messages have been inserted. """ + def add_messages_to_device_federation_outbox(txn, now_ms, stream_id): + sql = ( + "INSERT INTO device_federation_outbox" + " (destination, stream_id, queued_ts, messages_json)" + " VALUES (?,?,?,?)" + ) + rows = [] + for destination, edu in remote_messages_by_destination.items(): + edu_json = ujson.dumps(edu) + rows.append((destination, stream_id, now_ms, edu_json)) + + txn.executemany(sql, rows) + + def add_messages_txn(txn, now_ms, stream_id): + self._add_messages_to_local_device_inbox_txn( + txn, stream_id, local_messages_by_user_then_device + ) + add_messages_to_device_federation_outbox(now_ms, stream_id) + with self._device_inbox_id_gen.get_next() as stream_id: + now_ms = self.clock.time_now_ms() yield self.runInteraction( "add_messages_to_device_inbox", - self._add_messages_to_device_inbox_txn, + add_messages_txn, + now_ms, stream_id, - messages_by_user_then_device, ) defer.returnValue(self._device_inbox_id_gen.get_current_token()) - def _add_messages_to_device_inbox_txn(self, txn, stream_id, - messages_by_user_then_device): + @defer.inlineCallbacks + def add_messages_from_remote_to_device_inbox( + self, origin, message_id, local_messages_by_user_then_device + ): + def add_messages_txn(txn, now_ms, stream_id): + already_inserted = self._simple_select_one_txn( + txn, table="device_federation_inbox", + keyvalues={"origin": origin, "message_id": message_id}, + retcols=("message_id",), + allow_none=True, + ) + if already_inserted is not None: + return + + self._simple_insert_txn( + txn, table="device_federation_inbox", + values={ + "origin": origin, + "message_id": message_id, + "received_ts": now_ms, + }, + ) + + self._add_messages_to_local_device_inbox_txn( + txn, stream_id, local_messages_by_user_then_device + ) + + with self._device_inbox_id_gen.get_next() as stream_id: + now_ms = self.clock.time_now_ms() + yield self.runInteraction( + "add_messages_from_remote_to_device_inbox", + add_messages_txn, + now_ms, + stream_id, + ) + + def _add_messages_to_local_device_inbox_txn(self, txn, stream_id, + messages_by_user_then_device): local_users_and_devices = set() for user_id, messages_by_device in messages_by_user_then_device.items(): devices = messages_by_device.keys() @@ -177,3 +238,67 @@ class DeviceInboxStore(SQLBaseStore): def get_to_device_stream_token(self): return self._device_inbox_id_gen.get_current_token() + + @defer.inlineCallbacks + def get_new_device_messages_for_remote_destination( + self, destination, last_stream_id, current_stream_id, limit=100 + ): + """ + Args: + destination(str): The name of the remote server. + last_stream_id(int): The last position of the device message stream + that the server sent up to. + current_stream_id(int): The current position of the device + message stream. + Returns: + Deferred ([dict], int): List of messages for the device and where + in the stream the messages got to. + """ + def get_new_messages_for_remote_destination_txn(txn): + sql = ( + "SELECT stream_id, messages_json FROM device_federation_outbox" + " WHERE destination = ?" + " AND ? < stream_id AND stream_id <= ?" + " ORDER BY stream_id ASC" + " LIMIT ?" + ) + txn.execute(sql, ( + destination, last_stream_id, current_stream_id, limit + )) + messages = [] + for row in txn.fetchall(): + stream_pos = row[0] + messages.append(ujson.loads(row[1])) + if len(messages) < limit: + stream_pos = current_stream_id + return (messages, stream_pos) + + return self.runInteraction( + "get_new_device_messages_for_remote_destination", + get_new_messages_for_remote_destination_txn, + ) + + @defer.inlineCallbacks + def delete_device_messages_for_remote_destination(self, destination, + up_to_stream_id): + """Used to delete messages when the remote destination acknowledges + their receipt. + + Args: + destination(str): The destination server_name + up_to_stream_id(int): Where to delete messages up to. + Returns: + A deferred that resolves when the messages have been deleted. + """ + def delete_messages_for_remote_destination_txn(txn): + sql = ( + "DELETE FROM device_federation_outbox" + " WHERE destination = ? AND" + " AND stream_id <= ?" + ) + txn.execute(sql, (destination, up_to_stream_id)) + + return self.runInteraction( + "delete_device_messages_for_remote_destination", + delete_messages_for_remote_destination_txn + ) -- cgit 1.4.1 From d4a35ada28302e096efd42e1a2a28542ed7ebd6f Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 6 Sep 2016 18:16:20 +0100 Subject: Send device messages over federation --- synapse/federation/federation_server.py | 2 +- synapse/federation/transaction_queue.py | 43 ++++++-- synapse/handlers/devicemessage.py | 121 ++++++++++++++++++++++ synapse/rest/client/v2_alpha/sendtodevice.py | 33 ++---- synapse/server.py | 5 + synapse/storage/deviceinbox.py | 19 ++-- synapse/storage/schema/delta/34/device_outbox.sql | 4 +- 7 files changed, 179 insertions(+), 48 deletions(-) create mode 100644 synapse/handlers/devicemessage.py diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 5621655098..3fa7b2315c 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -188,7 +188,7 @@ class FederationServer(FederationBase): except SynapseError as e: logger.info("Failed to handle edu %r: %r", edu_type, e) except Exception as e: - logger.exception("Failed to handle edu %r", edu_type, e) + logger.exception("Failed to handle edu %r", edu_type) else: logger.warn("Received EDU of type %s with no handler", edu_type) diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index cb2ef0210c..5e86141f86 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -17,7 +17,7 @@ from twisted.internet import defer from .persistence import TransactionActions -from .units import Transaction +from .units import Transaction, Edu from synapse.api.errors import HttpResponseException from synapse.util.async import run_on_reactor @@ -187,6 +187,24 @@ class TransactionQueue(object): destination, pending_pdus, pending_edus, pending_failures ) + @defer.inlineCallbacks + def _get_new_device_messages(self, destination): + last_device_stream_id = 0 + to_device_stream_id = self.store.get_to_device_stream_token() + contents, stream_id = yield self.store.get_new_device_msgs_for_remote( + destination, last_device_stream_id, to_device_stream_id + ) + edus = [ + Edu( + origin=self.server_name, + destination=destination, + edu_type="m.direct_to_device", + content=content, + ) + for content in contents + ] + defer.returnValue((edus, stream_id)) + @measure_func("_send_new_transaction") @defer.inlineCallbacks def _send_new_transaction(self, destination, pending_pdus, pending_edus, @@ -211,13 +229,19 @@ class TransactionQueue(object): self.store, ) + device_message_edus, device_stream_id = ( + yield self._get_new_device_messages(destination) + ) + + edus.extend(device_message_edus) + logger.debug( "TX [%s] {%s} Attempting new transaction" " (pdus: %d, edus: %d, failures: %d)", destination, txn_id, - len(pending_pdus), - len(pending_edus), - len(pending_failures) + len(pdus), + len(edus), + len(failures) ) logger.debug("TX [%s] Persisting transaction...", destination) @@ -242,9 +266,9 @@ class TransactionQueue(object): " (PDUs: %d, EDUs: %d, failures: %d)", destination, txn_id, transaction.transaction_id, - len(pending_pdus), - len(pending_edus), - len(pending_failures), + len(pdus), + len(edus), + len(failures), ) with limiter: @@ -299,6 +323,11 @@ class TransactionQueue(object): logger.info( "Failed to send event %s to %s", p.event_id, destination ) + else: + # Remove the acknowledged device messages from the database + yield self.store.delete_device_msgs_for_remote( + destination, device_stream_id + ) except NotRetryingDestination: logger.info( "TX [%s] not ready for retry yet - " diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py new file mode 100644 index 0000000000..7e59c0d487 --- /dev/null +++ b/synapse/handlers/devicemessage.py @@ -0,0 +1,121 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +from twisted.internet import defer + +from synapse.types import get_domain_from_id +from synapse.util.stringutils import random_string + + +logger = logging.getLogger(__name__) + + +class DeviceMessageHandler(object): + + def __init__(self, hs): + """ + Args: + hs (synapse.server.HomeServer): server + """ + self.store = hs.get_datastore() + self.notifier = hs.get_notifier() + self.is_mine_id = hs.is_mine_id + self.federation = hs.get_replication_layer() + + self.federation.register_edu_handler( + "m.direct_to_device", self.on_direct_to_device_edu + ) + + @defer.inlineCallbacks + def on_direct_to_device_edu(self, origin, content): + local_messages = {} + sender_user_id = content["sender"] + if origin != get_domain_from_id(sender_user_id): + logger.warn( + "Dropping device message from %r with spoofed sender %r", + origin, sender_user_id + ) + message_type = content["type"] + message_id = content["message_id"] + for user_id, by_device in content["messages"].items(): + messages_by_device = { + device_id: { + "content": message_content, + "type": message_type, + "sender": sender_user_id, + } + for device_id, message_content in by_device.items() + } + if messages_by_device: + local_messages[user_id] = messages_by_device + + stream_id = yield self.store.add_messages_from_remote_to_device_inbox( + origin, message_id, local_messages + ) + + self.notifier.on_new_event( + "to_device_key", stream_id, users=local_messages.keys() + ) + + @defer.inlineCallbacks + def send_device_message(self, sender_user_id, message_type, messages): + + local_messages = {} + remote_messages = {} + for user_id, by_device in messages.items(): + if self.is_mine_id(user_id): + messages_by_device = { + device_id: { + "content": message_content, + "type": message_type, + "sender": sender_user_id, + } + for device_id, message_content in by_device.items() + } + if messages_by_device: + local_messages[user_id] = messages_by_device + else: + destination = get_domain_from_id(user_id) + remote_messages.setdefault(destination, {})[user_id] = by_device + + message_id = random_string(16) + + remote_edu_contents = {} + for destination, messages in remote_messages.items(): + remote_edu_contents[destination] = { + "messages": messages, + "sender": sender_user_id, + "type": message_type, + "message_id": message_id, + } + + stream_id = yield self.store.add_messages_to_device_inbox( + local_messages, remote_edu_contents + ) + + self.notifier.on_new_event( + "to_device_key", stream_id, users=local_messages.keys() + ) + + for destination in remote_messages.keys(): + # Hack to send make synapse send a federation transaction + # to the remote servers. + self.federation.send_edu( + destination=destination, + edu_type="m.ping", + content={}, + ) diff --git a/synapse/rest/client/v2_alpha/sendtodevice.py b/synapse/rest/client/v2_alpha/sendtodevice.py index 9c10a99acf..5975164b37 100644 --- a/synapse/rest/client/v2_alpha/sendtodevice.py +++ b/synapse/rest/client/v2_alpha/sendtodevice.py @@ -16,10 +16,11 @@ import logging from twisted.internet import defer -from synapse.http.servlet import parse_json_object_from_request from synapse.http import servlet +from synapse.http.servlet import parse_json_object_from_request from synapse.rest.client.v1.transactions import HttpTransactionStore + from ._base import client_v2_patterns logger = logging.getLogger(__name__) @@ -39,10 +40,8 @@ class SendToDeviceRestServlet(servlet.RestServlet): super(SendToDeviceRestServlet, self).__init__() self.hs = hs self.auth = hs.get_auth() - self.store = hs.get_datastore() - self.notifier = hs.get_notifier() - self.is_mine_id = hs.is_mine_id self.txns = HttpTransactionStore() + self.device_message_handler = hs.get_device_message_handler() @defer.inlineCallbacks def on_PUT(self, request, message_type, txn_id): @@ -57,28 +56,10 @@ class SendToDeviceRestServlet(servlet.RestServlet): content = parse_json_object_from_request(request) - # TODO: Prod the notifier to wake up sync streams. - # TODO: Implement replication for the messages. - # TODO: Send the messages to remote servers if needed. - - local_messages = {} - for user_id, by_device in content["messages"].items(): - if self.is_mine_id(user_id): - messages_by_device = { - device_id: { - "content": message_content, - "type": message_type, - "sender": requester.user.to_string(), - } - for device_id, message_content in by_device.items() - } - if messages_by_device: - local_messages[user_id] = messages_by_device - - stream_id = yield self.store.add_messages_to_device_inbox(local_messages) - - self.notifier.on_new_event( - "to_device_key", stream_id, users=local_messages.keys() + sender_user_id = requester.user.to_string() + + yield self.device_message_handler.send_device_message( + sender_user_id, message_type, content["messages"] ) response = (200, {}) diff --git a/synapse/server.py b/synapse/server.py index af3246504b..f516f08167 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -35,6 +35,7 @@ from synapse.federation import initialize_http_replication from synapse.handlers import Handlers from synapse.handlers.appservice import ApplicationServicesHandler from synapse.handlers.auth import AuthHandler +from synapse.handlers.devicemessage import DeviceMessageHandler from synapse.handlers.device import DeviceHandler from synapse.handlers.e2e_keys import E2eKeysHandler from synapse.handlers.presence import PresenceHandler @@ -100,6 +101,7 @@ class HomeServer(object): 'application_service_api', 'application_service_scheduler', 'application_service_handler', + 'device_message_handler', 'notifier', 'distributor', 'client_resource', @@ -205,6 +207,9 @@ class HomeServer(object): def build_device_handler(self): return DeviceHandler(self) + def build_device_message_handler(self): + return DeviceMessageHandler(self) + def build_e2e_keys_handler(self): return E2eKeysHandler(self) diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py index 988577a334..d9f91ccc4e 100644 --- a/synapse/storage/deviceinbox.py +++ b/synapse/storage/deviceinbox.py @@ -59,10 +59,10 @@ class DeviceInboxStore(SQLBaseStore): self._add_messages_to_local_device_inbox_txn( txn, stream_id, local_messages_by_user_then_device ) - add_messages_to_device_federation_outbox(now_ms, stream_id) + add_messages_to_device_federation_outbox(txn, now_ms, stream_id) with self._device_inbox_id_gen.get_next() as stream_id: - now_ms = self.clock.time_now_ms() + now_ms = self.clock.time_msec() yield self.runInteraction( "add_messages_to_device_inbox", add_messages_txn, @@ -100,7 +100,7 @@ class DeviceInboxStore(SQLBaseStore): ) with self._device_inbox_id_gen.get_next() as stream_id: - now_ms = self.clock.time_now_ms() + now_ms = self.clock.time_msec() yield self.runInteraction( "add_messages_from_remote_to_device_inbox", add_messages_txn, @@ -239,8 +239,7 @@ class DeviceInboxStore(SQLBaseStore): def get_to_device_stream_token(self): return self._device_inbox_id_gen.get_current_token() - @defer.inlineCallbacks - def get_new_device_messages_for_remote_destination( + def get_new_device_msgs_for_remote( self, destination, last_stream_id, current_stream_id, limit=100 ): """ @@ -274,13 +273,11 @@ class DeviceInboxStore(SQLBaseStore): return (messages, stream_pos) return self.runInteraction( - "get_new_device_messages_for_remote_destination", + "get_new_device_msgs_for_remote", get_new_messages_for_remote_destination_txn, ) - @defer.inlineCallbacks - def delete_device_messages_for_remote_destination(self, destination, - up_to_stream_id): + def delete_device_msgs_for_remote(self, destination, up_to_stream_id): """Used to delete messages when the remote destination acknowledges their receipt. @@ -293,12 +290,12 @@ class DeviceInboxStore(SQLBaseStore): def delete_messages_for_remote_destination_txn(txn): sql = ( "DELETE FROM device_federation_outbox" - " WHERE destination = ? AND" + " WHERE destination = ?" " AND stream_id <= ?" ) txn.execute(sql, (destination, up_to_stream_id)) return self.runInteraction( - "delete_device_messages_for_remote_destination", + "delete_device_msgs_for_remote", delete_messages_for_remote_destination_txn ) diff --git a/synapse/storage/schema/delta/34/device_outbox.sql b/synapse/storage/schema/delta/34/device_outbox.sql index a319f73e47..e87066d9a1 100644 --- a/synapse/storage/schema/delta/34/device_outbox.sql +++ b/synapse/storage/schema/delta/34/device_outbox.sql @@ -16,9 +16,7 @@ CREATE TABLE device_federation_outbox ( destination TEXT NOT NULL, stream_id BIGINT NOT NULL, - sender TEXT NOT NULL, - message_id TEXT NOT NULL, - sent_ts BIGINT NOT NULL, + queued_ts BIGINT NOT NULL, messages_json TEXT NOT NULL ); -- cgit 1.4.1 From 74cbfdc7dec17b5037cbde7af1e546674246b970 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 6 Sep 2016 18:30:03 +0100 Subject: Fix unit tests --- tests/handlers/test_typing.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/tests/handlers/test_typing.py b/tests/handlers/test_typing.py index b2957eef9f..ea1f0f7c33 100644 --- a/tests/handlers/test_typing.py +++ b/tests/handlers/test_typing.py @@ -121,6 +121,14 @@ class TypingNotificationsTestCase(unittest.TestCase): self.auth.check_joined_room = check_joined_room + self.datastore.get_to_device_stream_token = lambda: 0 + self.datastore.get_new_device_msgs_for_remote = ( + lambda *args, **kargs: ([], 0) + ) + self.datastore.delete_device_msgs_for_remote = ( + lambda *args, **kargs: None + ) + # Some local users to test with self.u_apple = UserID.from_string("@apple:test") self.u_banana = UserID.from_string("@banana:test") -- cgit 1.4.1 From 7d893beebe9d2fbab5a9b105992433efe8fab417 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Wed, 7 Sep 2016 12:03:37 +0100 Subject: Comment the add_messages storage functions --- synapse/storage/deviceinbox.py | 24 ++++++++++++++++-------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py index d9f91ccc4e..61da0e89e6 100644 --- a/synapse/storage/deviceinbox.py +++ b/synapse/storage/deviceinbox.py @@ -42,7 +42,15 @@ class DeviceInboxStore(SQLBaseStore): inserted. """ - def add_messages_to_device_federation_outbox(txn, now_ms, stream_id): + def add_messages_txn(txn, now_ms, stream_id): + # Add the local messages directly to the local inbox. + self._add_messages_to_local_device_inbox_txn( + txn, stream_id, local_messages_by_user_then_device + ) + + # Add the remote messages to the federation outbox. + # We'll send them to a remote server when we next send a + # federation transaction to that destination. sql = ( "INSERT INTO device_federation_outbox" " (destination, stream_id, queued_ts, messages_json)" @@ -52,15 +60,8 @@ class DeviceInboxStore(SQLBaseStore): for destination, edu in remote_messages_by_destination.items(): edu_json = ujson.dumps(edu) rows.append((destination, stream_id, now_ms, edu_json)) - txn.executemany(sql, rows) - def add_messages_txn(txn, now_ms, stream_id): - self._add_messages_to_local_device_inbox_txn( - txn, stream_id, local_messages_by_user_then_device - ) - add_messages_to_device_federation_outbox(txn, now_ms, stream_id) - with self._device_inbox_id_gen.get_next() as stream_id: now_ms = self.clock.time_msec() yield self.runInteraction( @@ -77,6 +78,9 @@ class DeviceInboxStore(SQLBaseStore): self, origin, message_id, local_messages_by_user_then_device ): def add_messages_txn(txn, now_ms, stream_id): + # Check if we've already inserted a matching message_id for that + # origin. This can happen if the origin doesn't receive our + # acknowledgement from the first time we received the message. already_inserted = self._simple_select_one_txn( txn, table="device_federation_inbox", keyvalues={"origin": origin, "message_id": message_id}, @@ -86,6 +90,8 @@ class DeviceInboxStore(SQLBaseStore): if already_inserted is not None: return + # Add an entry for this message_id so that we know we've processed + # it. self._simple_insert_txn( txn, table="device_federation_inbox", values={ @@ -95,6 +101,8 @@ class DeviceInboxStore(SQLBaseStore): }, ) + # Add the messages to the approriate local device inboxes so that + # they'll be sent to the devices when they next sync. self._add_messages_to_local_device_inbox_txn( txn, stream_id, local_messages_by_user_then_device ) -- cgit 1.4.1 From 31a07d2335dd628afb32f71167849ad88685525a Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Wed, 7 Sep 2016 15:27:07 +0100 Subject: Add stream change caches for device messages --- synapse/federation/transaction_queue.py | 5 ++++- synapse/storage/__init__.py | 24 ++++++++++++++++++++++++ synapse/storage/deviceinbox.py | 25 +++++++++++++++++++++++++ 3 files changed, 53 insertions(+), 1 deletion(-) diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 5e86141f86..233c6606a9 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -81,6 +81,8 @@ class TransactionQueue(object): # destination -> list of tuple(failure, deferred) self.pending_failures_by_dest = {} + self.last_device_stream_id_by_dest = {} + # HACK to get unique tx id self._next_txn_id = int(self.clock.time_msec()) @@ -189,7 +191,7 @@ class TransactionQueue(object): @defer.inlineCallbacks def _get_new_device_messages(self, destination): - last_device_stream_id = 0 + last_device_stream_id = self.last_device_stream_id_by_dest.get(destination, 0) to_device_stream_id = self.store.get_to_device_stream_token() contents, stream_id = yield self.store.get_new_device_msgs_for_remote( destination, last_device_stream_id, to_device_stream_id @@ -328,6 +330,7 @@ class TransactionQueue(object): yield self.store.delete_device_msgs_for_remote( destination, device_stream_id ) + self.last_device_stream_id_by_dest[destination] = device_stream_id except NotRetryingDestination: logger.info( "TX [%s] not ready for retry yet - " diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 6c32773f25..6965daddc5 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -182,6 +182,30 @@ class DataStore(RoomMemberStore, RoomStore, prefilled_cache=push_rules_prefill, ) + max_device_inbox_id = self._device_inbox_id_gen.get_current_token() + device_inbox_prefill, min_device_inbox_id = self._get_cache_dict( + db_conn, "device_inbox", + entity_column="user_id", + stream_column="stream_id", + max_value=max_device_inbox_id + ) + self._device_inbox_stream_cache = StreamChangeCache( + "DeviceInboxStreamChangeCache", min_device_inbox_id, + prefilled_cache=device_inbox_prefill, + ) + # The federation outbox and the local device inbox uses the same + # stream_id generator. + device_outbox_prefill, min_device_outbox_id = self._get_cache_dict( + db_conn, "device_federation_outbox", + entity_column="destination", + stream_column="stream_id", + max_value=max_device_inbox_id, + ) + self._device_federation_outbox_stream_cache = StreamChangeCache( + "DeviceInboxStreamChangeCache", min_device_outbox_id, + prefilled_cache=device_outbox_prefill, + ) + cur = LoggingTransaction( db_conn.cursor(), name="_find_stream_orderings_for_times_txn", diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py index 61da0e89e6..0d37bb961b 100644 --- a/synapse/storage/deviceinbox.py +++ b/synapse/storage/deviceinbox.py @@ -70,6 +70,14 @@ class DeviceInboxStore(SQLBaseStore): now_ms, stream_id, ) + for user_id in local_messages_by_user_then_device.keys(): + self._device_inbox_stream_cache.entity_has_changed( + user_id, stream_id + ) + for destination in remote_messages_by_destination.keys(): + self._device_federation_outbox_stream_cache.entity_has_changed( + destination, stream_id + ) defer.returnValue(self._device_inbox_id_gen.get_current_token()) @@ -115,6 +123,10 @@ class DeviceInboxStore(SQLBaseStore): now_ms, stream_id, ) + for user_id in local_messages_by_user_then_device.keys(): + self._device_inbox_stream_cache.entity_has_changed( + user_id, stream_id + ) def _add_messages_to_local_device_inbox_txn(self, txn, stream_id, messages_by_user_then_device): @@ -161,6 +173,12 @@ class DeviceInboxStore(SQLBaseStore): Deferred ([dict], int): List of messages for the device and where in the stream the messages got to. """ + has_changed = self._device_inbox_stream_cache.has_entity_changed( + user_id, last_stream_id + ) + if not has_changed: + return defer.succeed(([], current_stream_id)) + def get_new_messages_for_device_txn(txn): sql = ( "SELECT stream_id, message_json FROM device_inbox" @@ -261,6 +279,13 @@ class DeviceInboxStore(SQLBaseStore): Deferred ([dict], int): List of messages for the device and where in the stream the messages got to. """ + + has_changed = self._device_federation_outbox_stream_cache.has_entity_changed( + destination, last_stream_id + ) + if not has_changed: + return defer.succeed(([], current_stream_id)) + def get_new_messages_for_remote_destination_txn(txn): sql = ( "SELECT stream_id, messages_json FROM device_federation_outbox" -- cgit 1.4.1 From cb98ac261bbda859574ec33cab934a3269e11e17 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Wed, 7 Sep 2016 15:39:13 +0100 Subject: Move the check for federated device_messages. Move the check into _attempt_new_transaction. Only delete messages if there were messages to delete. --- synapse/federation/transaction_queue.py | 26 +++++++++++++++----------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 233c6606a9..c0ee946ac0 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -177,6 +177,12 @@ class TransactionQueue(object): pending_edus = self.pending_edus_by_dest.pop(destination, []) pending_failures = self.pending_failures_by_dest.pop(destination, []) + device_message_edus, device_stream_id = ( + yield self._get_new_device_messages(destination) + ) + + pending_edus.extend(device_message_edus) + if pending_pdus: logger.debug("TX [%s] len(pending_pdus_by_dest[dest]) = %d", destination, len(pending_pdus)) @@ -186,7 +192,9 @@ class TransactionQueue(object): return yield self._send_new_transaction( - destination, pending_pdus, pending_edus, pending_failures + destination, pending_pdus, pending_edus, pending_failures, + device_stream_id, + should_delete_from_device_stream=bool(device_message_edus) ) @defer.inlineCallbacks @@ -210,7 +218,8 @@ class TransactionQueue(object): @measure_func("_send_new_transaction") @defer.inlineCallbacks def _send_new_transaction(self, destination, pending_pdus, pending_edus, - pending_failures): + pending_failures, device_stream_id, + should_delete_from_device_stream): # Sort based on the order field pending_pdus.sort(key=lambda t: t[1]) @@ -231,12 +240,6 @@ class TransactionQueue(object): self.store, ) - device_message_edus, device_stream_id = ( - yield self._get_new_device_messages(destination) - ) - - edus.extend(device_message_edus) - logger.debug( "TX [%s] {%s} Attempting new transaction" " (pdus: %d, edus: %d, failures: %d)", @@ -327,9 +330,10 @@ class TransactionQueue(object): ) else: # Remove the acknowledged device messages from the database - yield self.store.delete_device_msgs_for_remote( - destination, device_stream_id - ) + if should_delete_from_device_stream: + yield self.store.delete_device_msgs_for_remote( + destination, device_stream_id + ) self.last_device_stream_id_by_dest[destination] = device_stream_id except NotRetryingDestination: logger.info( -- cgit 1.4.1 From 2a0159b8aeaf8dce808345e2266c6d3301fa055a Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Wed, 7 Sep 2016 15:58:00 +0100 Subject: Fix the stream change cache to work over replication --- synapse/replication/slave/storage/deviceinbox.py | 11 +++++++++++ synapse/storage/__init__.py | 2 +- 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/synapse/replication/slave/storage/deviceinbox.py b/synapse/replication/slave/storage/deviceinbox.py index 64d8eb2af1..251078ba57 100644 --- a/synapse/replication/slave/storage/deviceinbox.py +++ b/synapse/replication/slave/storage/deviceinbox.py @@ -16,6 +16,7 @@ from ._base import BaseSlavedStore from ._slaved_id_tracker import SlavedIdTracker from synapse.storage import DataStore +from synapse.util.caches.stream_change_cache import StreamChangeCache class SlavedDeviceInboxStore(BaseSlavedStore): @@ -24,6 +25,10 @@ class SlavedDeviceInboxStore(BaseSlavedStore): self._device_inbox_id_gen = SlavedIdTracker( db_conn, "device_inbox", "stream_id", ) + self._device_inbox_stream_cache = StreamChangeCache( + "DeviceInboxStreamChangeCache", + self._device_inbox_id_gen.get_current_token() + ) get_to_device_stream_token = DataStore.get_to_device_stream_token.__func__ get_new_messages_for_device = DataStore.get_new_messages_for_device.__func__ @@ -38,5 +43,11 @@ class SlavedDeviceInboxStore(BaseSlavedStore): stream = result.get("to_device") if stream: self._device_inbox_id_gen.advance(int(stream["position"])) + for row in stream["rows"]: + stream_id = row[0] + user_id = row[1] + self._device_inbox_stream_cache.entity_has_changed( + user_id, stream_id + ) return super(SlavedDeviceInboxStore, self).process_replication(result) diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 6965daddc5..828e5ca60b 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -202,7 +202,7 @@ class DataStore(RoomMemberStore, RoomStore, max_value=max_device_inbox_id, ) self._device_federation_outbox_stream_cache = StreamChangeCache( - "DeviceInboxStreamChangeCache", min_device_outbox_id, + "DeviceFederationOutboxStreamChangeCache", min_device_outbox_id, prefilled_cache=device_outbox_prefill, ) -- cgit 1.4.1 From 43954d000e19a622576063de0b48cf9235dec395 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Wed, 7 Sep 2016 16:10:51 +0100 Subject: Add a new method to enqueue the device messages rather than sending a dummy EDU --- synapse/federation/federation_client.py | 6 ++++++ synapse/federation/transaction_queue.py | 11 +++++++++++ synapse/handlers/devicemessage.py | 10 +++------- 3 files changed, 20 insertions(+), 7 deletions(-) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 627acc6a4f..78719eed25 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -137,6 +137,12 @@ class FederationClient(FederationBase): self._transaction_queue.enqueue_edu(edu) return defer.succeed(None) + @log_function + def send_device_messages(self, destination): + """Sends the device messages in the local database to the remote + destination""" + self._transaction_queue.enqueue_device_messages(destination) + @log_function def send_failure(self, failure, destination): self._transaction_queue.enqueue_failure(failure, destination) diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index c0ee946ac0..633c79c352 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -157,6 +157,17 @@ class TransactionQueue(object): self._attempt_new_transaction, destination ) + def enqueue_device_messages(self, destination): + if destination == self.server_name or destination == "localhost": + return + + if not self.can_send_to(destination): + return + + preserve_context_over_fn( + self._attempt_new_transaction, destination + ) + @defer.inlineCallbacks def _attempt_new_transaction(self, destination): yield run_on_reactor() diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py index 7e59c0d487..c5368e5df2 100644 --- a/synapse/handlers/devicemessage.py +++ b/synapse/handlers/devicemessage.py @@ -112,10 +112,6 @@ class DeviceMessageHandler(object): ) for destination in remote_messages.keys(): - # Hack to send make synapse send a federation transaction - # to the remote servers. - self.federation.send_edu( - destination=destination, - edu_type="m.ping", - content={}, - ) + # Enqueue a new federation transaction to send the new + # device messages to each remote destination. + self.federation.send_device_messages(destination) -- cgit 1.4.1 From c5b49eb7ca7327cba0e3658b1ec84cca823c8b54 Mon Sep 17 00:00:00 2001 From: David Baker Date: Thu, 8 Sep 2016 09:40:10 +0100 Subject: Fix /notifications API when used with `from` param --- synapse/storage/event_push_actions.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index eb15fb751b..56dce4b616 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -342,7 +342,7 @@ class EventPushActionsStore(SQLBaseStore): def f(txn): before_clause = "" if before: - before_clause = "AND stream_ordering < ?" + before_clause = "AND epa.stream_ordering < ?" args = [user_id, before, limit] else: args = [user_id, limit] -- cgit 1.4.1 From 791658b57677cc60b02b969ab3cb617da8cc62f9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 8 Sep 2016 11:53:05 +0100 Subject: Add server param to /publicRooms --- synapse/handlers/room.py | 10 ++++++++++ synapse/rest/client/v1/room.py | 21 ++++++++++++++++----- 2 files changed, 26 insertions(+), 5 deletions(-) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index bf6b1c1535..8758af4ca1 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -443,6 +443,16 @@ class RoomListHandler(BaseHandler): self.remote_list_request_cache.set((), deferred) self.remote_list_cache = yield deferred + @defer.inlineCallbacks + def get_remote_public_room_list(self, server_name): + res = yield self.hs.get_replication_layer().get_public_rooms( + [server_name] + ) + + if server_name not in res: + raise SynapseError(404, "Server not found") + defer.returnValue(res[server_name]) + @defer.inlineCallbacks def get_aggregated_public_room_list(self): """ diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index 0d81757010..7971e53010 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -295,15 +295,26 @@ class PublicRoomListRestServlet(ClientV1RestServlet): @defer.inlineCallbacks def on_GET(self, request): + server = request.args.get("server", [None])[0] + try: yield self.auth.get_user_by_req(request) - except AuthError: - # This endpoint isn't authed, but its useful to know who's hitting - # it if they *do* supply an access token - pass + except AuthError as e: + # We allow people to not be authed if they're just looking at our + # room list, but require auth when we proxy the request. + # In both cases we call the auth function, as that has the side + # effect of logging who issued this request if an access token was + # provided. + if server: + raise e + else: + pass handler = self.hs.get_room_list_handler() - data = yield handler.get_aggregated_public_room_list() + if server: + data = yield handler.get_remote_public_room_list(server) + else: + data = yield handler.get_aggregated_public_room_list() defer.returnValue((200, data)) -- cgit 1.4.1 From 8b93af662d432cf6b3d36cbbcbd4dd2427bde658 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 8 Sep 2016 15:04:46 +0100 Subject: Check the user_id for presence/typing matches origin --- synapse/handlers/presence.py | 7 +++++++ synapse/handlers/typing.py | 9 ++++++++- 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index da9f0da69e..7a3c16a8aa 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -651,6 +651,13 @@ class PresenceHandler(object): ) continue + if get_domain_from_id(user_id) != origin: + logger.info( + "Got presence update from %r with bad 'user_id': %r", + origin, user_id, + ) + continue + presence_state = push.get("presence", None) if not presence_state: logger.info( diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index 0b530b9034..3b687957dd 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -199,7 +199,14 @@ class TypingHandler(object): user_id = content["user_id"] # Check that the string is a valid user id - UserID.from_string(user_id) + user = UserID.from_string(user_id) + + if user.domain != origin: + logger.info( + "Got typing update from %r with bad 'user_id': %r", + origin, user_id, + ) + return users = yield self.state.get_current_user_in_room(room_id) domains = set(get_domain_from_id(u) for u in users) -- cgit 1.4.1 From a1c8f268e5948d6466d64ef983b98fce287ec907 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 8 Sep 2016 15:13:05 +0100 Subject: Support wildcard device_ids for direct to device messages --- synapse/storage/deviceinbox.py | 54 ++++++++++++++++++++++++++++-------------- 1 file changed, 36 insertions(+), 18 deletions(-) diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py index 0d37bb961b..658fbef27b 100644 --- a/synapse/storage/deviceinbox.py +++ b/synapse/storage/deviceinbox.py @@ -130,19 +130,41 @@ class DeviceInboxStore(SQLBaseStore): def _add_messages_to_local_device_inbox_txn(self, txn, stream_id, messages_by_user_then_device): - local_users_and_devices = set() + local_by_user_then_device = {} for user_id, messages_by_device in messages_by_user_then_device.items(): + messages_json_for_user = {} devices = messages_by_device.keys() - sql = ( - "SELECT user_id, device_id FROM devices" - " WHERE user_id = ? AND device_id IN (" - + ",".join("?" * len(devices)) - + ")" - ) - # TODO: Maybe this needs to be done in batches if there are - # too many local devices for a given user. - txn.execute(sql, [user_id] + devices) - local_users_and_devices.update(map(tuple, txn.fetchall())) + if len(devices) == 1 and devices[0] == "*": + # Handle wildcard device_ids. + sql = ( + "SELECT device_id FROM devices" + " WHERE user_id = ?" + ) + txn.execute(sql, (user_id,)) + message_json = ujson.dumps(messages_by_device["*"]) + for row in txn.fetchall(): + # Add the message for all devices for this user on this + # server. + device = row[0] + messages_json_for_user[device] = message_json + else: + sql = ( + "SELECT device_id FROM devices" + " WHERE user_id = ? AND device_id IN (" + + ",".join("?" * len(devices)) + + ")" + ) + # TODO: Maybe this needs to be done in batches if there are + # too many local devices for a given user. + txn.execute(sql, [user_id] + devices) + for row in txn.fetchall(): + # Only insert into the local inbox if the device exists on + # this server + device = row[0] + message_json = ujson.dumps(messages_by_device[device]) + messages_json_for_user[device] = message_json + + local_by_user_then_device[user_id] = messages_json_for_user sql = ( "INSERT INTO device_inbox" @@ -150,13 +172,9 @@ class DeviceInboxStore(SQLBaseStore): " VALUES (?,?,?,?)" ) rows = [] - for user_id, messages_by_device in messages_by_user_then_device.items(): - for device_id, message in messages_by_device.items(): - message_json = ujson.dumps(message) - # Only insert into the local inbox if the device exists on - # this server - if (user_id, device_id) in local_users_and_devices: - rows.append((user_id, device_id, stream_id, message_json)) + for user_id, messages_by_device in local_by_user_then_device.items(): + for device_id, message_json in messages_by_device.items(): + rows.append((user_id, device_id, stream_id, message_json)) txn.executemany(sql, rows) -- cgit 1.4.1 From b152ee71fec06cc52842e19249ff4db849f99d38 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 8 Sep 2016 15:23:44 +0100 Subject: Bump version and changelog --- CHANGES.rst | 9 +++++++++ synapse/__init__.py | 2 +- 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/CHANGES.rst b/CHANGES.rst index 1d073119f9..c40a32abd6 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -1,3 +1,12 @@ +Changes in synapse v0.17.2 (2016-09-08) +======================================= + +This release contains security bug fixes. Please upgrade. + + +No changes since v0.17.2 + + Changes in synapse v0.17.2-rc1 (2016-09-05) =========================================== diff --git a/synapse/__init__.py b/synapse/__init__.py index ad2ec58ea4..523deaa5ff 100644 --- a/synapse/__init__.py +++ b/synapse/__init__.py @@ -16,4 +16,4 @@ """ This is a reference implementation of a Matrix home server. """ -__version__ = "0.17.2-rc1" +__version__ = "0.17.2" -- cgit 1.4.1 From 3f9889bfd69ae511f2cfedb5d3749f89878b5498 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 8 Sep 2016 15:51:10 +0100 Subject: Use parse_string --- synapse/rest/client/v1/room.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index 7971e53010..3c933f1620 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -23,7 +23,7 @@ from synapse.api.constants import EventTypes, Membership from synapse.api.filtering import Filter from synapse.types import UserID, RoomID, RoomAlias from synapse.events.utils import serialize_event -from synapse.http.servlet import parse_json_object_from_request +from synapse.http.servlet import parse_json_object_from_request, parse_string import logging import urllib @@ -295,7 +295,7 @@ class PublicRoomListRestServlet(ClientV1RestServlet): @defer.inlineCallbacks def on_GET(self, request): - server = request.args.get("server", [None])[0] + server = parse_string(request, "server", default=None) try: yield self.auth.get_user_by_req(request) -- cgit 1.4.1 From 5beda10bbdeeed4d5535c726f32e18d5c09f2553 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 8 Sep 2016 16:18:01 +0100 Subject: Reindex state_groups_state after pruning --- synapse/storage/background_updates.py | 6 ++++-- synapse/storage/state.py | 33 +++++++++++++++++++++++++++++++++ 2 files changed, 37 insertions(+), 2 deletions(-) diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py index 30d0e4c5dc..003f5ba203 100644 --- a/synapse/storage/background_updates.py +++ b/synapse/storage/background_updates.py @@ -133,10 +133,12 @@ class BackgroundUpdateStore(SQLBaseStore): updates = yield self._simple_select_list( "background_updates", keyvalues=None, - retcols=("update_name",), + retcols=("update_name", "depends_on"), ) + in_flight = set(update["update_name"] for update in updates) for update in updates: - self._background_update_queue.append(update['update_name']) + if update["depends_on"] not in in_flight: + self._background_update_queue.append(update['update_name']) if not self._background_update_queue: # no work left to do diff --git a/synapse/storage/state.py b/synapse/storage/state.py index fef87834ca..0cff0a0cda 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -48,6 +48,7 @@ class StateStore(SQLBaseStore): """ STATE_GROUP_DEDUPLICATION_UPDATE_NAME = "state_group_state_deduplication" + STATE_GROUP_INDEX_UPDATE_NAME = "state_group_state_type_index" def __init__(self, hs): super(StateStore, self).__init__(hs) @@ -55,6 +56,10 @@ class StateStore(SQLBaseStore): self.STATE_GROUP_DEDUPLICATION_UPDATE_NAME, self._background_deduplicate_state, ) + self.register_background_update_handler( + self.STATE_GROUP_INDEX_UPDATE_NAME, + self._background_index_state, + ) @defer.inlineCallbacks def get_state_groups_ids(self, room_id, event_ids): @@ -793,3 +798,31 @@ class StateStore(SQLBaseStore): yield self._end_background_update(self.STATE_GROUP_DEDUPLICATION_UPDATE_NAME) defer.returnValue(result * BATCH_SIZE_SCALE_FACTOR) + + @defer.inlineCallbacks + def _background_index_state(self, progress, batch_size): + def reindex_txn(txn): + if isinstance(self.database_engine, PostgresEngine): + txn.execute( + "CREATE INDEX state_groups_state_type_idx" + " ON state_groups_state(state_group, type, state_key)" + ) + txn.execute( + "DROP INDEX IF EXISTS state_groups_state_id" + ) + else: + txn.execute( + "CREATE INDEX state_groups_state_type_idx" + " ON state_groups_state(state_group, type, state_key)" + ) + txn.execute( + "DROP INDEX IF EXISTS state_groups_state_id" + ) + + yield self.runInteraction( + self.STATE_GROUP_INDEX_UPDATE_NAME, reindex_txn + ) + + yield self._end_background_update(self.STATE_GROUP_INDEX_UPDATE_NAME) + + defer.returnValue(1) -- cgit 1.4.1 From ebb46497ba622983b31a3c5aad943b8922b97e89 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 8 Sep 2016 16:38:54 +0100 Subject: Add delta file --- synapse/storage/schema/delta/35/add_state_index.sql | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) create mode 100644 synapse/storage/schema/delta/35/add_state_index.sql diff --git a/synapse/storage/schema/delta/35/add_state_index.sql b/synapse/storage/schema/delta/35/add_state_index.sql new file mode 100644 index 0000000000..0fce26345b --- /dev/null +++ b/synapse/storage/schema/delta/35/add_state_index.sql @@ -0,0 +1,20 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +ALTER TABLE background_updates ADD COLUMN depends_on TEXT; + +INSERT into background_updates (update_name, progress_json, depends_on) + VALUES ('state_group_state_type_index', '{}', 'state_group_state_deduplication'); -- cgit 1.4.1 From fa722a699cd2637546f02451b8ee969c7bc1a84d Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 8 Sep 2016 17:35:16 +0100 Subject: Reapply 34/device_outbox in 35/device_outbox_again.py since the schema was bumped before it landed on develop --- .../storage/schema/delta/35/device_outbox_again.py | 30 ++++++++++++++++++++++ 1 file changed, 30 insertions(+) create mode 100644 synapse/storage/schema/delta/35/device_outbox_again.py diff --git a/synapse/storage/schema/delta/35/device_outbox_again.py b/synapse/storage/schema/delta/35/device_outbox_again.py new file mode 100644 index 0000000000..46da12a93c --- /dev/null +++ b/synapse/storage/schema/delta/35/device_outbox_again.py @@ -0,0 +1,30 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Re-apply 34/device_outbox.sql since the schema version was bumped before it +# was added to develop. + +import synapse.storage.prepare_database +import os + + +def run_create(cur, database_engine, *args, **kwargs): + try: + delta_dir = os.path.join(os.path.dirname(__file__), "..") + synapse.storage.prepare_database.executescript( + cur, os.path.join(delta_dir, "34", "device_outbox.sql") + ) + except: + pass -- cgit 1.4.1 From 7d5b1425478f7d7a7e06b11579b107f9e7c8c6a0 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 8 Sep 2016 17:39:11 +0100 Subject: Add a stub run_upgrade --- synapse/storage/schema/delta/35/device_outbox_again.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/synapse/storage/schema/delta/35/device_outbox_again.py b/synapse/storage/schema/delta/35/device_outbox_again.py index 46da12a93c..d6d2260393 100644 --- a/synapse/storage/schema/delta/35/device_outbox_again.py +++ b/synapse/storage/schema/delta/35/device_outbox_again.py @@ -28,3 +28,7 @@ def run_create(cur, database_engine, *args, **kwargs): ) except: pass + + +def run_upgrade(cur, database_engine, *args, **kwargs): + pass -- cgit 1.4.1 From 2f267ee160b1f7ce591f4f10ddb5f9239110a2f6 Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Thu, 8 Sep 2016 17:43:53 +0100 Subject: Collect up all the "instances" lists of individual AS protocol results into one combined answer to the client --- synapse/handlers/appservice.py | 30 +++++++++++++++++++++++++++++- 1 file changed, 29 insertions(+), 1 deletion(-) diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index b440280b74..25447284eb 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -179,9 +179,37 @@ class ApplicationServicesHandler(object): def get_3pe_protocols(self): services = yield self.store.get_app_services() protocols = {} + + # Collect up all the individual protocol responses out of the ASes for s in services: for p in s.protocols: - protocols[p] = yield self.appservice_api.get_3pe_protocol(s, p) + info = yield self.appservice_api.get_3pe_protocol(s, p) + + # Ignore any result that doesn't contain an "instances" list + if "instances" not in info: + continue + if not isinstance(info["instances"], list): + continue + + if p not in protocols: + protocols[p] = [] + protocols[p].append(info) + + def _merge_instances(infos): + if len(infos) == 0: + return {} + + # Merge the 'instances' lists of multiple results, but just take + # the other fields from the first as they ought to be identical + combined = dict(infos[0]) + + for info in infos[1:]: + combined["instances"].extend(info["instances"]) + + return combined + + for p in protocols.keys(): + protocols[p] = _merge_instances(protocols[p]) defer.returnValue(protocols) -- cgit 1.4.1 From 43b77c5d97a3119296c0f26030140b28e8d25f04 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 8 Sep 2016 17:44:21 +0100 Subject: Only catch databas errors --- synapse/storage/schema/delta/35/device_outbox_again.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/schema/delta/35/device_outbox_again.py b/synapse/storage/schema/delta/35/device_outbox_again.py index d6d2260393..5f950a4a8a 100644 --- a/synapse/storage/schema/delta/35/device_outbox_again.py +++ b/synapse/storage/schema/delta/35/device_outbox_again.py @@ -26,7 +26,7 @@ def run_create(cur, database_engine, *args, **kwargs): synapse.storage.prepare_database.executescript( cur, os.path.join(delta_dir, "34", "device_outbox.sql") ) - except: + except database_engine.module.DatabaseError: pass -- cgit 1.4.1 From b3907561506a98d7e8bbe66efe2037df7ceb70fa Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 9 Sep 2016 11:00:15 +0100 Subject: Update last_device_stream_id_by_dest if there is nothing to send --- synapse/federation/transaction_queue.py | 1 + 1 file changed, 1 insertion(+) diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 633c79c352..5c7245d383 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -200,6 +200,7 @@ class TransactionQueue(object): if not pending_pdus and not pending_edus and not pending_failures: logger.debug("TX [%s] Nothing to send", destination) + self.last_device_stream_id_by_dest[destination] = device_stream_id return yield self._send_new_transaction( -- cgit 1.4.1 From 2ffec928e2512a72d73d5b4a715c4540d68011a6 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 9 Sep 2016 11:03:31 +0100 Subject: Reduce batch size to be under SQL limit --- synapse/storage/roommember.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 6ab10db328..866d64e679 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -402,7 +402,7 @@ class RoomMemberStore(SQLBaseStore): keyvalues={ "membership": Membership.JOIN, }, - batch_size=1000, + batch_size=500, desc="_get_joined_users_from_context", ) -- cgit 1.4.1 From 0877157353c5610e8ede2f205a84ae80bef7983b Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 9 Sep 2016 11:04:47 +0100 Subject: Just move the schema and add some DROPs --- synapse/storage/schema/delta/34/device_outbox.sql | 36 -------------------- synapse/storage/schema/delta/35/device_outbox.sql | 39 ++++++++++++++++++++++ .../storage/schema/delta/35/device_outbox_again.py | 34 ------------------- 3 files changed, 39 insertions(+), 70 deletions(-) delete mode 100644 synapse/storage/schema/delta/34/device_outbox.sql create mode 100644 synapse/storage/schema/delta/35/device_outbox.sql delete mode 100644 synapse/storage/schema/delta/35/device_outbox_again.py diff --git a/synapse/storage/schema/delta/34/device_outbox.sql b/synapse/storage/schema/delta/34/device_outbox.sql deleted file mode 100644 index e87066d9a1..0000000000 --- a/synapse/storage/schema/delta/34/device_outbox.sql +++ /dev/null @@ -1,36 +0,0 @@ -/* Copyright 2016 OpenMarket Ltd - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -CREATE TABLE device_federation_outbox ( - destination TEXT NOT NULL, - stream_id BIGINT NOT NULL, - queued_ts BIGINT NOT NULL, - messages_json TEXT NOT NULL -); - - -CREATE INDEX device_federation_outbox_destination_id - ON device_federation_outbox(destination, stream_id); - - -CREATE TABLE device_federation_inbox ( - origin TEXT NOT NULL, - message_id TEXT NOT NULL, - received_ts BIGINT NOT NULL -); - - -CREATE INDEX device_federation_inbox_sender_id - ON device_federation_inbox(origin, message_id); diff --git a/synapse/storage/schema/delta/35/device_outbox.sql b/synapse/storage/schema/delta/35/device_outbox.sql new file mode 100644 index 0000000000..17e6c43105 --- /dev/null +++ b/synapse/storage/schema/delta/35/device_outbox.sql @@ -0,0 +1,39 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +DROP TABLE IF EXISTS device_federation_outbox; +CREATE TABLE device_federation_outbox ( + destination TEXT NOT NULL, + stream_id BIGINT NOT NULL, + queued_ts BIGINT NOT NULL, + messages_json TEXT NOT NULL +); + + +DROP INDEX IF EXISTS device_federation_outbox_destination_id; +CREATE INDEX device_federation_outbox_destination_id + ON device_federation_outbox(destination, stream_id); + + +DROP TABLE IF EXISTS device_federation_inbox; +CREATE TABLE device_federation_inbox ( + origin TEXT NOT NULL, + message_id TEXT NOT NULL, + received_ts BIGINT NOT NULL +); + +DROP INDEX IF EXISTS device_federation_inbox_sender_id; +CREATE INDEX device_federation_inbox_sender_id + ON device_federation_inbox(origin, message_id); diff --git a/synapse/storage/schema/delta/35/device_outbox_again.py b/synapse/storage/schema/delta/35/device_outbox_again.py deleted file mode 100644 index 5f950a4a8a..0000000000 --- a/synapse/storage/schema/delta/35/device_outbox_again.py +++ /dev/null @@ -1,34 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2016 OpenMarket Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# Re-apply 34/device_outbox.sql since the schema version was bumped before it -# was added to develop. - -import synapse.storage.prepare_database -import os - - -def run_create(cur, database_engine, *args, **kwargs): - try: - delta_dir = os.path.join(os.path.dirname(__file__), "..") - synapse.storage.prepare_database.executescript( - cur, os.path.join(delta_dir, "34", "device_outbox.sql") - ) - except database_engine.module.DatabaseError: - pass - - -def run_upgrade(cur, database_engine, *args, **kwargs): - pass -- cgit 1.4.1 From 66efcbbff1c9f85a9bc003513af45ea513264abb Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 9 Sep 2016 11:07:39 +0100 Subject: Bump changelog and version --- CHANGES.rst | 9 ++++++++- synapse/__init__.py | 2 +- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/CHANGES.rst b/CHANGES.rst index c40a32abd6..23be6c8efa 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -1,10 +1,17 @@ +Changes in synapse v0.17.3 (2016-09-09) +======================================= + +This release fixes a major bug that stopped servers from handling rooms with +over 1000 members. + + Changes in synapse v0.17.2 (2016-09-08) ======================================= This release contains security bug fixes. Please upgrade. -No changes since v0.17.2 +No changes since v0.17.2-rc1 Changes in synapse v0.17.2-rc1 (2016-09-05) diff --git a/synapse/__init__.py b/synapse/__init__.py index 523deaa5ff..b778cd65c9 100644 --- a/synapse/__init__.py +++ b/synapse/__init__.py @@ -16,4 +16,4 @@ """ This is a reference implementation of a Matrix home server. """ -__version__ = "0.17.2" +__version__ = "0.17.3" -- cgit 1.4.1 From d2688d7f03b006a1a4d340dce04550214ae86185 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 9 Sep 2016 11:44:36 +0100 Subject: Correctly guard against multiple concurrent transactions --- synapse/federation/transaction_queue.py | 79 +++++++++++++++++---------------- 1 file changed, 41 insertions(+), 38 deletions(-) diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 5c7245d383..6900b0121b 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -170,44 +170,53 @@ class TransactionQueue(object): @defer.inlineCallbacks def _attempt_new_transaction(self, destination): - yield run_on_reactor() - while True: - # list of (pending_pdu, deferred, order) - if destination in self.pending_transactions: - # XXX: pending_transactions can get stuck on by a never-ending - # request at which point pending_pdus_by_dest just keeps growing. - # we need application-layer timeouts of some flavour of these - # requests - logger.debug( - "TX [%s] Transaction already in progress", - destination - ) - return + # list of (pending_pdu, deferred, order) + if destination in self.pending_transactions: + # XXX: pending_transactions can get stuck on by a never-ending + # request at which point pending_pdus_by_dest just keeps growing. + # we need application-layer timeouts of some flavour of these + # requests + logger.debug( + "TX [%s] Transaction already in progress", + destination + ) + return - pending_pdus = self.pending_pdus_by_dest.pop(destination, []) - pending_edus = self.pending_edus_by_dest.pop(destination, []) - pending_failures = self.pending_failures_by_dest.pop(destination, []) + try: + self.pending_transactions[destination] = 1 - device_message_edus, device_stream_id = ( - yield self._get_new_device_messages(destination) - ) + yield run_on_reactor() - pending_edus.extend(device_message_edus) + while True: + pending_pdus = self.pending_pdus_by_dest.pop(destination, []) + pending_edus = self.pending_edus_by_dest.pop(destination, []) + pending_failures = self.pending_failures_by_dest.pop(destination, []) - if pending_pdus: - logger.debug("TX [%s] len(pending_pdus_by_dest[dest]) = %d", - destination, len(pending_pdus)) + device_message_edus, device_stream_id = ( + yield self._get_new_device_messages(destination) + ) - if not pending_pdus and not pending_edus and not pending_failures: - logger.debug("TX [%s] Nothing to send", destination) - self.last_device_stream_id_by_dest[destination] = device_stream_id - return + pending_edus.extend(device_message_edus) - yield self._send_new_transaction( - destination, pending_pdus, pending_edus, pending_failures, - device_stream_id, - should_delete_from_device_stream=bool(device_message_edus) - ) + if pending_pdus: + logger.debug("TX [%s] len(pending_pdus_by_dest[dest]) = %d", + destination, len(pending_pdus)) + + if not pending_pdus and not pending_edus and not pending_failures: + logger.debug("TX [%s] Nothing to send", destination) + self.last_device_stream_id_by_dest[destination] = ( + device_stream_id + ) + return + + yield self._send_new_transaction( + destination, pending_pdus, pending_edus, pending_failures, + device_stream_id, + should_delete_from_device_stream=bool(device_message_edus) + ) + finally: + # We want to be *very* sure we delete this after we stop processing + self.pending_transactions.pop(destination, None) @defer.inlineCallbacks def _get_new_device_messages(self, destination): @@ -240,8 +249,6 @@ class TransactionQueue(object): failures = [x.get_dict() for x in pending_failures] try: - self.pending_transactions[destination] = 1 - logger.debug("TX [%s] _attempt_new_transaction", destination) txn_id = str(self._next_txn_id) @@ -375,7 +382,3 @@ class TransactionQueue(object): for p in pdus: logger.info("Failed to send event %s to %s", p.event_id, destination) - - finally: - # We want to be *very* sure we delete this after we stop processing - self.pending_transactions.pop(destination, None) -- cgit 1.4.1 From 6a6cbfcf1e12c0f34a280764f892eaa23e720d57 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 9 Sep 2016 11:48:23 +0100 Subject: Track the max_stream_device_id in a separate table, since we delete from the inbox table --- synapse/replication/slave/storage/deviceinbox.py | 2 +- synapse/storage/__init__.py | 2 +- synapse/storage/deviceinbox.py | 17 +++++++++++++++-- synapse/storage/schema/delta/35/device_stream_id.sql | 20 ++++++++++++++++++++ 4 files changed, 37 insertions(+), 4 deletions(-) create mode 100644 synapse/storage/schema/delta/35/device_stream_id.sql diff --git a/synapse/replication/slave/storage/deviceinbox.py b/synapse/replication/slave/storage/deviceinbox.py index 251078ba57..3bfd5e8213 100644 --- a/synapse/replication/slave/storage/deviceinbox.py +++ b/synapse/replication/slave/storage/deviceinbox.py @@ -23,7 +23,7 @@ class SlavedDeviceInboxStore(BaseSlavedStore): def __init__(self, db_conn, hs): super(SlavedDeviceInboxStore, self).__init__(db_conn, hs) self._device_inbox_id_gen = SlavedIdTracker( - db_conn, "device_inbox", "stream_id", + db_conn, "device_max_stream_id", "stream_id", ) self._device_inbox_stream_cache = StreamChangeCache( "DeviceInboxStreamChangeCache", diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 828e5ca60b..a61e83d5de 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -111,7 +111,7 @@ class DataStore(RoomMemberStore, RoomStore, db_conn, "presence_stream", "stream_id" ) self._device_inbox_id_gen = StreamIdGenerator( - db_conn, "device_inbox", "stream_id" + db_conn, "device_max_stream_id", "stream_id" ) self._transaction_id_gen = IdGenerator(db_conn, "sent_transactions", "id") diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py index 658fbef27b..b729b7106e 100644 --- a/synapse/storage/deviceinbox.py +++ b/synapse/storage/deviceinbox.py @@ -130,6 +130,13 @@ class DeviceInboxStore(SQLBaseStore): def _add_messages_to_local_device_inbox_txn(self, txn, stream_id, messages_by_user_then_device): + sql = ( + "UPDATE device_max_stream_id" + " SET stream_id = ?" + " WHERE stream_id < ?" + ) + txn.execute(sql, (stream_id, stream_id)) + local_by_user_then_device = {} for user_id, messages_by_device in messages_by_user_then_device.items(): messages_json_for_user = {} @@ -148,6 +155,8 @@ class DeviceInboxStore(SQLBaseStore): device = row[0] messages_json_for_user[device] = message_json else: + if not devices: + continue sql = ( "SELECT device_id FROM devices" " WHERE user_id = ? AND device_id IN (" @@ -164,7 +173,11 @@ class DeviceInboxStore(SQLBaseStore): message_json = ujson.dumps(messages_by_device[device]) messages_json_for_user[device] = message_json - local_by_user_then_device[user_id] = messages_json_for_user + if messages_json_for_user: + local_by_user_then_device[user_id] = messages_json_for_user + + if not local_by_user_then_device: + return sql = ( "INSERT INTO device_inbox" @@ -301,7 +314,7 @@ class DeviceInboxStore(SQLBaseStore): has_changed = self._device_federation_outbox_stream_cache.has_entity_changed( destination, last_stream_id ) - if not has_changed: + if not has_changed or last_stream_id == current_stream_id: return defer.succeed(([], current_stream_id)) def get_new_messages_for_remote_destination_txn(txn): diff --git a/synapse/storage/schema/delta/35/device_stream_id.sql b/synapse/storage/schema/delta/35/device_stream_id.sql new file mode 100644 index 0000000000..1ce6336f33 --- /dev/null +++ b/synapse/storage/schema/delta/35/device_stream_id.sql @@ -0,0 +1,20 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE TABLE device_max_stream_id ( + stream_id BIGINT NOT NULL +); + +INSERT INTO device_max_stream_id (stream_id) VALUES (0); -- cgit 1.4.1 From 647c7245733a72b9b71decb2b321869a942dbb88 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 9 Sep 2016 11:52:44 +0100 Subject: Use the previous MAX value if any to set the stream_id --- synapse/storage/schema/delta/35/device_stream_id.sql | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/synapse/storage/schema/delta/35/device_stream_id.sql b/synapse/storage/schema/delta/35/device_stream_id.sql index 1ce6336f33..7ab7d942e2 100644 --- a/synapse/storage/schema/delta/35/device_stream_id.sql +++ b/synapse/storage/schema/delta/35/device_stream_id.sql @@ -17,4 +17,5 @@ CREATE TABLE device_max_stream_id ( stream_id BIGINT NOT NULL ); -INSERT INTO device_max_stream_id (stream_id) VALUES (0); +INSERT INTO device_max_stream_id (stream_id) + SELECT COALESCE(MAX(stream_id), 0) FROM device_inbox; -- cgit 1.4.1 From 033d43e4190bf765eb29bb6ba8ea7cbe6ad66cf4 Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Fri, 9 Sep 2016 13:10:36 +0100 Subject: Don't corrupt shared cache on subsequent protocol requests --- synapse/handlers/appservice.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 25447284eb..e68628bdfd 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -201,7 +201,9 @@ class ApplicationServicesHandler(object): # Merge the 'instances' lists of multiple results, but just take # the other fields from the first as they ought to be identical + # deep-clone the result so as not to corrupt the cached one combined = dict(infos[0]) + combined["instances"] = list(combined["instances"]) for info in infos[1:]: combined["instances"].extend(info["instances"]) -- cgit 1.4.1 From 4598682b43dbe55339cfc869042456b74813159f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 9 Sep 2016 13:12:53 +0100 Subject: Fix tightloop on sending transaction --- synapse/federation/transaction_queue.py | 256 +++++++++++++++++--------------- 1 file changed, 134 insertions(+), 122 deletions(-) diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 6900b0121b..f8d3fffe95 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -209,11 +209,13 @@ class TransactionQueue(object): ) return - yield self._send_new_transaction( + success = yield self._send_new_transaction( destination, pending_pdus, pending_edus, pending_failures, device_stream_id, should_delete_from_device_stream=bool(device_message_edus) ) + if not success: + break finally: # We want to be *very* sure we delete this after we stop processing self.pending_transactions.pop(destination, None) @@ -242,143 +244,153 @@ class TransactionQueue(object): pending_failures, device_stream_id, should_delete_from_device_stream): - # Sort based on the order field - pending_pdus.sort(key=lambda t: t[1]) - pdus = [x[0] for x in pending_pdus] - edus = pending_edus - failures = [x.get_dict() for x in pending_failures] + # Sort based on the order field + pending_pdus.sort(key=lambda t: t[1]) + pdus = [x[0] for x in pending_pdus] + edus = pending_edus + failures = [x.get_dict() for x in pending_failures] - try: - logger.debug("TX [%s] _attempt_new_transaction", destination) + success = True - txn_id = str(self._next_txn_id) + try: + logger.debug("TX [%s] _attempt_new_transaction", destination) - limiter = yield get_retry_limiter( - destination, - self.clock, - self.store, - ) + txn_id = str(self._next_txn_id) - logger.debug( - "TX [%s] {%s} Attempting new transaction" - " (pdus: %d, edus: %d, failures: %d)", - destination, txn_id, - len(pdus), - len(edus), - len(failures) - ) + limiter = yield get_retry_limiter( + destination, + self.clock, + self.store, + ) - logger.debug("TX [%s] Persisting transaction...", destination) + logger.debug( + "TX [%s] {%s} Attempting new transaction" + " (pdus: %d, edus: %d, failures: %d)", + destination, txn_id, + len(pdus), + len(edus), + len(failures) + ) - transaction = Transaction.create_new( - origin_server_ts=int(self.clock.time_msec()), - transaction_id=txn_id, - origin=self.server_name, - destination=destination, - pdus=pdus, - edus=edus, - pdu_failures=failures, - ) + logger.debug("TX [%s] Persisting transaction...", destination) - self._next_txn_id += 1 + transaction = Transaction.create_new( + origin_server_ts=int(self.clock.time_msec()), + transaction_id=txn_id, + origin=self.server_name, + destination=destination, + pdus=pdus, + edus=edus, + pdu_failures=failures, + ) - yield self.transaction_actions.prepare_to_send(transaction) + self._next_txn_id += 1 - logger.debug("TX [%s] Persisted transaction", destination) - logger.info( - "TX [%s] {%s} Sending transaction [%s]," - " (PDUs: %d, EDUs: %d, failures: %d)", - destination, txn_id, - transaction.transaction_id, - len(pdus), - len(edus), - len(failures), - ) + yield self.transaction_actions.prepare_to_send(transaction) - with limiter: - # Actually send the transaction - - # FIXME (erikj): This is a bit of a hack to make the Pdu age - # keys work - def json_data_cb(): - data = transaction.get_dict() - now = int(self.clock.time_msec()) - if "pdus" in data: - for p in data["pdus"]: - if "age_ts" in p: - unsigned = p.setdefault("unsigned", {}) - unsigned["age"] = now - int(p["age_ts"]) - del p["age_ts"] - return data - - try: - response = yield self.transport_layer.send_transaction( - transaction, json_data_cb - ) - code = 200 - - if response: - for e_id, r in response.get("pdus", {}).items(): - if "error" in r: - logger.warn( - "Transaction returned error for %s: %s", - e_id, r, - ) - except HttpResponseException as e: - code = e.code - response = e.response + logger.debug("TX [%s] Persisted transaction", destination) + logger.info( + "TX [%s] {%s} Sending transaction [%s]," + " (PDUs: %d, EDUs: %d, failures: %d)", + destination, txn_id, + transaction.transaction_id, + len(pdus), + len(edus), + len(failures), + ) - logger.info( - "TX [%s] {%s} got %d response", - destination, txn_id, code + with limiter: + # Actually send the transaction + + # FIXME (erikj): This is a bit of a hack to make the Pdu age + # keys work + def json_data_cb(): + data = transaction.get_dict() + now = int(self.clock.time_msec()) + if "pdus" in data: + for p in data["pdus"]: + if "age_ts" in p: + unsigned = p.setdefault("unsigned", {}) + unsigned["age"] = now - int(p["age_ts"]) + del p["age_ts"] + return data + + try: + response = yield self.transport_layer.send_transaction( + transaction, json_data_cb ) + code = 200 + + if response: + for e_id, r in response.get("pdus", {}).items(): + if "error" in r: + logger.warn( + "Transaction returned error for %s: %s", + e_id, r, + ) + except HttpResponseException as e: + code = e.code + response = e.response - logger.debug("TX [%s] Sent transaction", destination) - logger.debug("TX [%s] Marking as delivered...", destination) - - yield self.transaction_actions.delivered( - transaction, code, response + logger.info( + "TX [%s] {%s} got %d response", + destination, txn_id, code ) - logger.debug("TX [%s] Marked as delivered", destination) + logger.debug("TX [%s] Sent transaction", destination) + logger.debug("TX [%s] Marking as delivered...", destination) - if code != 200: - for p in pdus: - logger.info( - "Failed to send event %s to %s", p.event_id, destination - ) - else: - # Remove the acknowledged device messages from the database - if should_delete_from_device_stream: - yield self.store.delete_device_msgs_for_remote( - destination, device_stream_id - ) - self.last_device_stream_id_by_dest[destination] = device_stream_id - except NotRetryingDestination: - logger.info( - "TX [%s] not ready for retry yet - " - "dropping transaction for now", - destination, - ) - except RuntimeError as e: - # We capture this here as there as nothing actually listens - # for this finishing functions deferred. - logger.warn( - "TX [%s] Problem in _attempt_transaction: %s", - destination, - e, - ) + yield self.transaction_actions.delivered( + transaction, code, response + ) - for p in pdus: - logger.info("Failed to send event %s to %s", p.event_id, destination) - except Exception as e: - # We capture this here as there as nothing actually listens - # for this finishing functions deferred. - logger.warn( - "TX [%s] Problem in _attempt_transaction: %s", - destination, - e, - ) + logger.debug("TX [%s] Marked as delivered", destination) + if code != 200: for p in pdus: - logger.info("Failed to send event %s to %s", p.event_id, destination) + logger.info( + "Failed to send event %s to %s", p.event_id, destination + ) + success = False + else: + # Remove the acknowledged device messages from the database + if should_delete_from_device_stream: + yield self.store.delete_device_msgs_for_remote( + destination, device_stream_id + ) + self.last_device_stream_id_by_dest[destination] = device_stream_id + except NotRetryingDestination: + logger.info( + "TX [%s] not ready for retry yet - " + "dropping transaction for now", + destination, + ) + success = False + except RuntimeError as e: + # We capture this here as there as nothing actually listens + # for this finishing functions deferred. + logger.warn( + "TX [%s] Problem in _attempt_transaction: %s", + destination, + e, + ) + + success = False + + for p in pdus: + logger.info("Failed to send event %s to %s", p.event_id, destination) + except Exception as e: + # We capture this here as there as nothing actually listens + # for this finishing functions deferred. + logger.warn( + "TX [%s] Problem in _attempt_transaction: %s", + destination, + e, + ) + + success = False + + for p in pdus: + logger.info("Failed to send event %s to %s", p.event_id, destination) + + defer.returnValue(success) -- cgit 1.4.1 From 3328428d055ec62b15281ac719a4dfaa583c774b Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Fri, 9 Sep 2016 13:19:04 +0100 Subject: Allow lookup of a single 3PE protocol query metadata --- synapse/rest/client/v2_alpha/thirdparty.py | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/synapse/rest/client/v2_alpha/thirdparty.py b/synapse/rest/client/v2_alpha/thirdparty.py index 4f6f1a7e17..48d8543e76 100644 --- a/synapse/rest/client/v2_alpha/thirdparty.py +++ b/synapse/rest/client/v2_alpha/thirdparty.py @@ -42,6 +42,26 @@ class ThirdPartyProtocolsServlet(RestServlet): defer.returnValue((200, protocols)) +class ThirdPartyProtocolServlet(RestServlet): + PATTERNS = client_v2_patterns("/thirdparty/protocol/(?P[^/]+)$", releases=()) + + def __init__(self, hs): + super(ThirdPartyProtocolServlet, self).__init__() + + self.auth = hs.get_auth() + self.appservice_handler = hs.get_application_service_handler() + + @defer.inlineCallbacks + def on_GET(self, request, protocol): + yield self.auth.get_user_by_req(request) + + protocols = yield self.appservice_handler.get_3pe_protocols() + if protocol in protocols: + defer.returnValue((200, protocols[protocol])) + else: + defer.returnValue((404, {error: "Unknown protocol"})) + + class ThirdPartyUserServlet(RestServlet): PATTERNS = client_v2_patterns("/thirdparty/user(/(?P[^/]+))?$", releases=()) @@ -92,5 +112,6 @@ class ThirdPartyLocationServlet(RestServlet): def register_servlets(hs, http_server): ThirdPartyProtocolsServlet(hs).register(http_server) + ThirdPartyProtocolServlet(hs).register(http_server) ThirdPartyUserServlet(hs).register(http_server) ThirdPartyLocationServlet(hs).register(http_server) -- cgit 1.4.1 From 25eb769b26d6a13afcc9173e0eacf932e5cc1449 Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Fri, 9 Sep 2016 13:25:02 +0100 Subject: Efficiency fix for lookups of a single protocol --- synapse/handlers/appservice.py | 5 ++++- synapse/rest/client/v2_alpha/thirdparty.py | 4 +++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index e68628bdfd..a0375f7e3b 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -176,13 +176,16 @@ class ApplicationServicesHandler(object): defer.returnValue(ret) @defer.inlineCallbacks - def get_3pe_protocols(self): + def get_3pe_protocols(self, only_protocol=None): services = yield self.store.get_app_services() protocols = {} # Collect up all the individual protocol responses out of the ASes for s in services: for p in s.protocols: + if only_protocol is not None and p != only_protocol: + continue + info = yield self.appservice_api.get_3pe_protocol(s, p) # Ignore any result that doesn't contain an "instances" list diff --git a/synapse/rest/client/v2_alpha/thirdparty.py b/synapse/rest/client/v2_alpha/thirdparty.py index 48d8543e76..6bf9eb10ae 100644 --- a/synapse/rest/client/v2_alpha/thirdparty.py +++ b/synapse/rest/client/v2_alpha/thirdparty.py @@ -55,7 +55,9 @@ class ThirdPartyProtocolServlet(RestServlet): def on_GET(self, request, protocol): yield self.auth.get_user_by_req(request) - protocols = yield self.appservice_handler.get_3pe_protocols() + protocols = yield self.appservice_handler.get_3pe_protocols( + only_protocol=protocol, + ) if protocol in protocols: defer.returnValue((200, protocols[protocol])) else: -- cgit 1.4.1 From 6eb0c8a2e46362f61d34a99416fed6b12055c368 Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Fri, 9 Sep 2016 13:31:17 +0100 Subject: Python isn't JavaScript; have to quote dict keys --- synapse/rest/client/v2_alpha/thirdparty.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/rest/client/v2_alpha/thirdparty.py b/synapse/rest/client/v2_alpha/thirdparty.py index 6bf9eb10ae..d58385d5e5 100644 --- a/synapse/rest/client/v2_alpha/thirdparty.py +++ b/synapse/rest/client/v2_alpha/thirdparty.py @@ -61,7 +61,7 @@ class ThirdPartyProtocolServlet(RestServlet): if protocol in protocols: defer.returnValue((200, protocols[protocol])) else: - defer.returnValue((404, {error: "Unknown protocol"})) + defer.returnValue((404, {"error": "Unknown protocol"})) class ThirdPartyUserServlet(RestServlet): -- cgit 1.4.1 From bdbcfc2a804fc504aa03d777c6b53d247e45d1ff Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Fri, 9 Sep 2016 13:31:39 +0100 Subject: appease pep8 --- synapse/rest/client/v2_alpha/thirdparty.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/synapse/rest/client/v2_alpha/thirdparty.py b/synapse/rest/client/v2_alpha/thirdparty.py index d58385d5e5..dca615927a 100644 --- a/synapse/rest/client/v2_alpha/thirdparty.py +++ b/synapse/rest/client/v2_alpha/thirdparty.py @@ -43,7 +43,8 @@ class ThirdPartyProtocolsServlet(RestServlet): class ThirdPartyProtocolServlet(RestServlet): - PATTERNS = client_v2_patterns("/thirdparty/protocol/(?P[^/]+)$", releases=()) + PATTERNS = client_v2_patterns("/thirdparty/protocol/(?P[^/]+)$", + releases=()) def __init__(self, hs): super(ThirdPartyProtocolServlet, self).__init__() -- cgit 1.4.1 From a6c67501666c0fefeae8edec2c5f7755a9d24fb8 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 9 Sep 2016 13:46:05 +0100 Subject: Check if destination is ready for retry earlier --- synapse/federation/transaction_queue.py | 31 ++++++++++++++++--------------- 1 file changed, 16 insertions(+), 15 deletions(-) diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index f8d3fffe95..d9b8b3fc1d 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -192,6 +192,12 @@ class TransactionQueue(object): pending_edus = self.pending_edus_by_dest.pop(destination, []) pending_failures = self.pending_failures_by_dest.pop(destination, []) + limiter = yield get_retry_limiter( + destination, + self.clock, + self.store, + ) + device_message_edus, device_stream_id = ( yield self._get_new_device_messages(destination) ) @@ -212,10 +218,18 @@ class TransactionQueue(object): success = yield self._send_new_transaction( destination, pending_pdus, pending_edus, pending_failures, device_stream_id, - should_delete_from_device_stream=bool(device_message_edus) + should_delete_from_device_stream=bool(device_message_edus), + limiter=limiter, ) if not success: break + except NotRetryingDestination: + logger.info( + "TX [%s] not ready for retry yet - " + "dropping transaction for now", + destination, + ) + success = False finally: # We want to be *very* sure we delete this after we stop processing self.pending_transactions.pop(destination, None) @@ -242,7 +256,7 @@ class TransactionQueue(object): @defer.inlineCallbacks def _send_new_transaction(self, destination, pending_pdus, pending_edus, pending_failures, device_stream_id, - should_delete_from_device_stream): + should_delete_from_device_stream, limiter): # Sort based on the order field pending_pdus.sort(key=lambda t: t[1]) @@ -257,12 +271,6 @@ class TransactionQueue(object): txn_id = str(self._next_txn_id) - limiter = yield get_retry_limiter( - destination, - self.clock, - self.store, - ) - logger.debug( "TX [%s] {%s} Attempting new transaction" " (pdus: %d, edus: %d, failures: %d)", @@ -359,13 +367,6 @@ class TransactionQueue(object): destination, device_stream_id ) self.last_device_stream_id_by_dest[destination] = device_stream_id - except NotRetryingDestination: - logger.info( - "TX [%s] not ready for retry yet - " - "dropping transaction for now", - destination, - ) - success = False except RuntimeError as e: # We capture this here as there as nothing actually listens # for this finishing functions deferred. -- cgit 1.4.1 From 6c4d5821446c861c0448a8d952a7aa40897b1ebd Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 9 Sep 2016 14:26:05 +0100 Subject: Deduplicate presence in _update_states --- synapse/handlers/presence.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 7a3c16a8aa..16dbddee03 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -265,6 +265,12 @@ class PresenceHandler(object): to_notify = {} # Changes we want to notify everyone about to_federation_ping = {} # These need sending keep-alives + # Only bother handling the last presence change for each user + new_states_dict = {} + for new_state in new_states: + new_states_dict[new_state.user_id] = new_state + new_state = new_states_dict.values() + for new_state in new_states: user_id = new_state.user_id -- cgit 1.4.1 From 0fc0a3bdfff6f89ae0d952cc2bbc2843dbebdbf3 Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Fri, 9 Sep 2016 14:34:29 +0100 Subject: Allow clients to specify the format a room state event is returned in --- synapse/rest/client/v1/room.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index 3c933f1620..2d7d89da04 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -120,6 +120,8 @@ class RoomStateEventRestServlet(ClientV1RestServlet): @defer.inlineCallbacks def on_GET(self, request, room_id, event_type, state_key): requester = yield self.auth.get_user_by_req(request, allow_guest=True) + format = parse_string(request, "format", default="content", + allowed_values=["content", "event"]) msg_handler = self.handlers.message_handler data = yield msg_handler.get_room_data( @@ -134,7 +136,11 @@ class RoomStateEventRestServlet(ClientV1RestServlet): raise SynapseError( 404, "Event not found.", errcode=Codes.NOT_FOUND ) - defer.returnValue((200, data.get_dict()["content"])) + + if format == "event": + defer.returnValue((200, data.get_dict())) + elif format == "content": + defer.returnValue((200, data.get_dict()["content"])) @defer.inlineCallbacks def on_PUT(self, request, room_id, event_type, state_key, txn_id=None): -- cgit 1.4.1 From d271383e636e0c61d7e89d9baee84358ae32b5ad Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Fri, 9 Sep 2016 14:40:15 +0100 Subject: Filter returned events for client-facing format --- synapse/rest/client/v1/room.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index 2d7d89da04..22d6a7d31e 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -22,7 +22,7 @@ from synapse.streams.config import PaginationConfig from synapse.api.constants import EventTypes, Membership from synapse.api.filtering import Filter from synapse.types import UserID, RoomID, RoomAlias -from synapse.events.utils import serialize_event +from synapse.events.utils import serialize_event, format_event_for_client_v2 from synapse.http.servlet import parse_json_object_from_request, parse_string import logging @@ -138,7 +138,8 @@ class RoomStateEventRestServlet(ClientV1RestServlet): ) if format == "event": - defer.returnValue((200, data.get_dict())) + event = format_event_for_client_v2(data.get_dict()) + defer.returnValue((200, event)) elif format == "content": defer.returnValue((200, data.get_dict()["content"])) -- cgit 1.4.1 From f25d74f69c7da9bd36ba953d916dd78f7ea79ff1 Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Fri, 9 Sep 2016 14:54:16 +0100 Subject: Minor fixes from PR comments --- synapse/handlers/appservice.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index a0375f7e3b..4648e78d47 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -199,12 +199,12 @@ class ApplicationServicesHandler(object): protocols[p].append(info) def _merge_instances(infos): - if len(infos) == 0: + if not infos: return {} # Merge the 'instances' lists of multiple results, but just take # the other fields from the first as they ought to be identical - # deep-clone the result so as not to corrupt the cached one + # copy the result so as not to corrupt the cached one combined = dict(infos[0]) combined["instances"] = list(combined["instances"]) -- cgit 1.4.1 From ab80d5e0a968beb48140534b9ceab62b285b35c9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 9 Sep 2016 14:05:01 +0100 Subject: Drop replication log levels --- synapse/federation/transaction_queue.py | 1 - synapse/replication/resource.py | 4 ++-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index d9b8b3fc1d..1ac569b305 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -229,7 +229,6 @@ class TransactionQueue(object): "dropping transaction for now", destination, ) - success = False finally: # We want to be *very* sure we delete this after we stop processing self.pending_transactions.pop(destination, None) diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py index 1ed9034bcb..857bc9795c 100644 --- a/synapse/replication/resource.py +++ b/synapse/replication/resource.py @@ -181,7 +181,7 @@ class ReplicationResource(Resource): def replicate(self, request_streams, limit): writer = _Writer() current_token = yield self.current_replication_token() - logger.info("Replicating up to %r", current_token) + logger.debug("Replicating up to %r", current_token) yield self.account_data(writer, current_token, limit, request_streams) yield self.events(writer, current_token, limit, request_streams) @@ -195,7 +195,7 @@ class ReplicationResource(Resource): yield self.to_device(writer, current_token, limit, request_streams) self.streams(writer, current_token, request_streams) - logger.info("Replicated %d rows", writer.total) + logger.debug("Replicated %d rows", writer.total) defer.returnValue(writer.finish()) def streams(self, writer, current_token, request_streams): -- cgit 1.4.1 From ed44c475d832196957715f49215a95be1ce1eade Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Fri, 9 Sep 2016 15:07:04 +0100 Subject: Reject malformed 3PE query metadata results earlier in AS API handling code --- synapse/appservice/api.py | 12 ++++++++++-- synapse/handlers/appservice.py | 14 +++++--------- 2 files changed, 15 insertions(+), 11 deletions(-) diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py index cc4af23962..afc64ed267 100644 --- a/synapse/appservice/api.py +++ b/synapse/appservice/api.py @@ -162,11 +162,19 @@ class ApplicationServiceApi(SimpleHttpClient): urllib.quote(protocol) ) try: - defer.returnValue((yield self.get_json(uri, {}))) + info = yield self.get_json(uri, {}) + + # Ignore any result that doesn't contain an "instances" list + if "instances" not in info: + defer.returnValue(None) + if not isinstance(info["instances"], list): + defer.returnValue(None) + + defer.returnValue(info) except Exception as ex: logger.warning("query_3pe_protocol to %s threw exception %s", uri, ex) - defer.returnValue({}) + defer.returnValue(None) key = (service.id, protocol) return self.protocol_meta_cache.get(key) or ( diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 4648e78d47..88fa0bb2e4 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -186,17 +186,13 @@ class ApplicationServicesHandler(object): if only_protocol is not None and p != only_protocol: continue - info = yield self.appservice_api.get_3pe_protocol(s, p) - - # Ignore any result that doesn't contain an "instances" list - if "instances" not in info: - continue - if not isinstance(info["instances"], list): - continue - if p not in protocols: protocols[p] = [] - protocols[p].append(info) + + info = yield self.appservice_api.get_3pe_protocol(s, p) + + if info is not None: + protocols[p].append(info) def _merge_instances(infos): if not infos: -- cgit 1.4.1 From 776594f99dc082a9a933ec6db6955b56ecd1363b Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Fri, 9 Sep 2016 15:09:46 +0100 Subject: Log if rejecting 3PE query metadata result due to type check --- synapse/appservice/api.py | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py index afc64ed267..b0eb0c6d9d 100644 --- a/synapse/appservice/api.py +++ b/synapse/appservice/api.py @@ -32,6 +32,14 @@ HOUR_IN_MS = 60 * 60 * 1000 APP_SERVICE_PREFIX = "/_matrix/app/unstable" +def _is_valid_3pe_metadata(info): + if "instances" not in info: + return False + if not isinstance(info["instances"], list): + return False + return True + + def _is_valid_3pe_result(r, field): if not isinstance(r, dict): return False @@ -164,10 +172,9 @@ class ApplicationServiceApi(SimpleHttpClient): try: info = yield self.get_json(uri, {}) - # Ignore any result that doesn't contain an "instances" list - if "instances" not in info: - defer.returnValue(None) - if not isinstance(info["instances"], list): + if not _is_valid_3pe_metadata(info): + logger.warning("query_3pe_protocol to %s did not return a" + " valid result", uri) defer.returnValue(None) defer.returnValue(info) -- cgit 1.4.1 From 8aee5aa06807210c17ad0e58e4f237fcf2d052f9 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 9 Sep 2016 16:29:10 +0100 Subject: Add helper function for getting access_tokens from requests Rather than reimplementing the token parsing in the various places. This will make it easier to change the token parsing to allow access_tokens in HTTP headers. --- synapse/api/auth.py | 58 ++++++++++++++++++++++++++---- synapse/rest/client/v1/logout.py | 10 ++---- synapse/rest/client/v1/register.py | 12 +++---- synapse/rest/client/v1/transactions.py | 4 ++- synapse/rest/client/v2_alpha/register.py | 6 ++-- synapse/rest/client/v2_alpha/thirdparty.py | 4 +-- 6 files changed, 67 insertions(+), 27 deletions(-) diff --git a/synapse/api/auth.py b/synapse/api/auth.py index dcda40863f..98a50f0948 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -583,12 +583,15 @@ class Auth(object): """ # Can optionally look elsewhere in the request (e.g. headers) try: - user_id = yield self._get_appservice_user_id(request.args) + user_id = yield self._get_appservice_user_id(request) if user_id: request.authenticated_entity = user_id defer.returnValue(synapse.types.create_requester(user_id)) - access_token = request.args["access_token"][0] + access_token = get_access_token_from_request( + request, self.TOKEN_NOT_FOUND_HTTP_STATUS + ) + user_info = yield self.get_user_by_access_token(access_token, rights) user = user_info["user"] token_id = user_info["token_id"] @@ -629,17 +632,19 @@ class Auth(object): ) @defer.inlineCallbacks - def _get_appservice_user_id(self, request_args): + def _get_appservice_user_id(self, request): app_service = yield self.store.get_app_service_by_token( - request_args["access_token"][0] + get_access_token_from_request( + request, self.TOKEN_NOT_FOUND_HTTP_STATUS + ) ) if app_service is None: defer.returnValue(None) - if "user_id" not in request_args: + if "user_id" not in request.args: defer.returnValue(app_service.sender) - user_id = request_args["user_id"][0] + user_id = request.args["user_id"][0] if app_service.sender == user_id: defer.returnValue(app_service.sender) @@ -833,7 +838,9 @@ class Auth(object): @defer.inlineCallbacks def get_appservice_by_req(self, request): try: - token = request.args["access_token"][0] + token = get_access_token_from_request( + request, self.TOKEN_NOT_FOUND_HTTP_STATUS + ) service = yield self.store.get_app_service_by_token(token) if not service: logger.warn("Unrecognised appservice access token: %s" % (token,)) @@ -1142,3 +1149,40 @@ class Auth(object): "This server requires you to be a moderator in the room to" " edit its room list entry" ) + + +def has_access_token(request): + """Checks if the request has an access_token. + + Returns: + bool: False if no access_token was given, True otherwise. + """ + query_params = request.args.get("access_token") + return bool(query_params) + + +def get_access_token_from_request(request, token_not_found_http_status=401): + """Extracts the access_token from the request. + + Args: + request: The http request. + token_not_found_http_status(int): The HTTP status code to set in the + AuthError if the token isn't found. This is used in some of the + legacy APIs to change the status code to 403 from the default of + 401 since some of the old clients depended on auth errors returning + 403. + Returns: + str: The access_token + Raises: + AuthError: If there isn't an access_token in the request. + """ + query_params = request.args.get("access_token") + # Try to get the access_token from the query params. + if not query_params: + raise AuthError( + token_not_found_http_status, + "Missing access token.", + errcode=Codes.MISSING_TOKEN + ) + + return query_params[0] diff --git a/synapse/rest/client/v1/logout.py b/synapse/rest/client/v1/logout.py index 9bff02ee4e..1358d0acab 100644 --- a/synapse/rest/client/v1/logout.py +++ b/synapse/rest/client/v1/logout.py @@ -15,7 +15,7 @@ from twisted.internet import defer -from synapse.api.errors import AuthError, Codes +from synapse.api.auth import get_access_token_from_request from .base import ClientV1RestServlet, client_path_patterns @@ -37,13 +37,7 @@ class LogoutRestServlet(ClientV1RestServlet): @defer.inlineCallbacks def on_POST(self, request): - try: - access_token = request.args["access_token"][0] - except KeyError: - raise AuthError( - self.TOKEN_NOT_FOUND_HTTP_STATUS, "Missing access token.", - errcode=Codes.MISSING_TOKEN - ) + access_token = get_access_token_from_request(request) yield self.store.delete_access_token(access_token) defer.returnValue((200, {})) diff --git a/synapse/rest/client/v1/register.py b/synapse/rest/client/v1/register.py index 71d58c8e8d..3046da7aec 100644 --- a/synapse/rest/client/v1/register.py +++ b/synapse/rest/client/v1/register.py @@ -18,6 +18,7 @@ from twisted.internet import defer from synapse.api.errors import SynapseError, Codes from synapse.api.constants import LoginType +from synapse.api.auth import get_access_token_from_request from .base import ClientV1RestServlet, client_path_patterns import synapse.util.stringutils as stringutils from synapse.http.servlet import parse_json_object_from_request @@ -296,12 +297,11 @@ class RegisterRestServlet(ClientV1RestServlet): @defer.inlineCallbacks def _do_app_service(self, request, register_json, session): - if "access_token" not in request.args: - raise SynapseError(400, "Expected application service token.") + as_token = get_access_token_from_request(request) + if "user" not in register_json: raise SynapseError(400, "Expected 'user' key.") - as_token = request.args["access_token"][0] user_localpart = register_json["user"].encode("utf-8") handler = self.handlers.registration_handler @@ -390,11 +390,9 @@ class CreateUserRestServlet(ClientV1RestServlet): def on_POST(self, request): user_json = parse_json_object_from_request(request) - if "access_token" not in request.args: - raise SynapseError(400, "Expected application service token.") - + access_token = get_access_token_from_request(request) app_service = yield self.store.get_app_service_by_token( - request.args["access_token"][0] + access_token ) if not app_service: raise SynapseError(403, "Invalid application service token.") diff --git a/synapse/rest/client/v1/transactions.py b/synapse/rest/client/v1/transactions.py index bdccf464a5..2f2c9d0881 100644 --- a/synapse/rest/client/v1/transactions.py +++ b/synapse/rest/client/v1/transactions.py @@ -17,6 +17,8 @@ to ensure idempotency when performing PUTs using the REST API.""" import logging +from synapse.api.auth import get_access_token_from_request + logger = logging.getLogger(__name__) @@ -90,6 +92,6 @@ class HttpTransactionStore(object): return response def _get_key(self, request): - token = request.args["access_token"][0] + token = get_access_token_from_request(request) path_without_txn_id = request.path.rsplit("/", 1)[0] return path_without_txn_id + "/" + token diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py index 2121bd75ea..68d18a9b82 100644 --- a/synapse/rest/client/v2_alpha/register.py +++ b/synapse/rest/client/v2_alpha/register.py @@ -15,6 +15,7 @@ from twisted.internet import defer +from synapse.api.auth import get_access_token_from_request, has_access_token from synapse.api.constants import LoginType from synapse.api.errors import SynapseError, Codes, UnrecognizedRequestError from synapse.http.servlet import RestServlet, parse_json_object_from_request @@ -131,7 +132,7 @@ class RegisterRestServlet(RestServlet): desired_username = body['username'] appservice = None - if 'access_token' in request.args: + if has_access_token(request): appservice = yield self.auth.get_appservice_by_req(request) # fork off as soon as possible for ASes and shared secret auth which @@ -143,10 +144,11 @@ class RegisterRestServlet(RestServlet): # 'user' key not 'username'). Since this is a new addition, we'll # fallback to 'username' if they gave one. desired_username = body.get("user", desired_username) + access_token = get_access_token_from_request(request) if isinstance(desired_username, basestring): result = yield self._do_appservice_registration( - desired_username, request.args["access_token"][0], body + desired_username, access_token, body ) defer.returnValue((200, result)) # we throw for non 200 responses return diff --git a/synapse/rest/client/v2_alpha/thirdparty.py b/synapse/rest/client/v2_alpha/thirdparty.py index 4f6f1a7e17..b3e73c0271 100644 --- a/synapse/rest/client/v2_alpha/thirdparty.py +++ b/synapse/rest/client/v2_alpha/thirdparty.py @@ -57,7 +57,7 @@ class ThirdPartyUserServlet(RestServlet): yield self.auth.get_user_by_req(request) fields = request.args - del fields["access_token"] + fields.pop("access_token", None) results = yield self.appservice_handler.query_3pe( ThirdPartyEntityKind.USER, protocol, fields @@ -81,7 +81,7 @@ class ThirdPartyLocationServlet(RestServlet): yield self.auth.get_user_by_req(request) fields = request.args - del fields["access_token"] + fields.pop("access_token", None) results = yield self.appservice_handler.query_3pe( ThirdPartyEntityKind.LOCATION, protocol, fields -- cgit 1.4.1