From 2367c5568c01bc65aacc955b76ba707918b37f1e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 25 Jan 2017 14:27:27 +0000 Subject: Add basic implementation of local device list changes --- synapse/handlers/device.py | 65 +++++++++++++++++++++++++++++++++++++--------- 1 file changed, 53 insertions(+), 12 deletions(-) (limited to 'synapse/handlers/device.py') diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index aa68755936..d92780b642 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -15,6 +15,7 @@ from synapse.api import errors from synapse.util import stringutils +from synapse.types import get_domain_from_id from twisted.internet import defer from ._base import BaseHandler @@ -27,6 +28,8 @@ class DeviceHandler(BaseHandler): def __init__(self, hs): super(DeviceHandler, self).__init__(hs) + self.state = hs.get_state_handler() + @defer.inlineCallbacks def check_device_registered(self, user_id, device_id, initial_device_display_name=None): @@ -45,29 +48,29 @@ class DeviceHandler(BaseHandler): str: device id (generated if none was supplied) """ if device_id is not None: - yield self.store.store_device( + new_device = yield self.store.store_device( user_id=user_id, device_id=device_id, initial_device_display_name=initial_device_display_name, - ignore_if_known=True, ) + if new_device: + yield self.notify_device_update(user_id, device_id) defer.returnValue(device_id) # if the device id is not specified, we'll autogen one, but loop a few # times in case of a clash. attempts = 0 while attempts < 5: - try: - device_id = stringutils.random_string(10).upper() - yield self.store.store_device( - user_id=user_id, - device_id=device_id, - initial_device_display_name=initial_device_display_name, - ignore_if_known=False, - ) + device_id = stringutils.random_string(10).upper() + new_device = yield self.store.store_device( + user_id=user_id, + device_id=device_id, + initial_device_display_name=initial_device_display_name, + ) + if new_device: + yield self.notify_device_update(user_id, device_id) defer.returnValue(device_id) - except errors.StoreError: - attempts += 1 + attempts += 1 raise errors.StoreError(500, "Couldn't generate a device ID.") @@ -147,6 +150,8 @@ class DeviceHandler(BaseHandler): user_id=user_id, device_id=device_id ) + yield self.notify_device_update(user_id, device_id) + @defer.inlineCallbacks def update_device(self, user_id, device_id, content): """ Update the given device @@ -166,12 +171,48 @@ class DeviceHandler(BaseHandler): device_id, new_display_name=content.get("display_name") ) + yield self.notify_device_update(user_id, device_id) except errors.StoreError, e: if e.code == 404: raise errors.NotFoundError() else: raise + @defer.inlineCallbacks + def notify_device_update(self, user_id, device_id): + rooms = yield self.store.get_rooms_for_user(user_id) + room_ids = [r.room_id for r in rooms] + + hosts = set() + for room_id in room_ids: + users = yield self.state.get_current_user_in_room(room_id) + hosts.update(get_domain_from_id(u) for u in users) + hosts.discard(self.server_name) + + position = yield self.store.add_device_change_to_streams( + user_id, device_id, list(hosts) + ) + + yield self.notifier.on_new_event( + "device_list_key", position, rooms=room_ids, + ) + + for host in hosts: + self.federation.send_device_messages(host) + + @defer.inlineCallbacks + def get_device_list_changes(self, user_id, room_ids, from_key): + room_ids = frozenset(room_ids) + + user_ids_changed = set() + changed = yield self.store.get_user_whose_devices_changed(from_key) + for other_user_id in changed: + other_rooms = yield self.store.get_rooms_for_user(other_user_id) + if room_ids.intersection(e.room_id for e in other_rooms): + user_ids_changed.add(other_user_id) + + defer.returnValue(user_ids_changed) + def _update_device_from_client_ips(device, client_ips): ip = client_ips.get((device["user_id"], device["device_id"]), {}) -- cgit 1.4.1 From 51e9fe36e46331ac611cec1d4cb425c1bc98721c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 25 Jan 2017 16:55:21 +0000 Subject: Fix up sending of m.device_list_update edus --- synapse/federation/transaction_queue.py | 121 ++++++++++++++++---------------- synapse/handlers/device.py | 1 + synapse/storage/devices.py | 40 +++++------ 3 files changed, 82 insertions(+), 80 deletions(-) (limited to 'synapse/handlers/device.py') diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 65c6673a87..d18f6b6cfd 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -306,62 +306,74 @@ class TransactionQueue(object): yield run_on_reactor() while True: - pending_pdus = self.pending_pdus_by_dest.pop(destination, []) - pending_edus = self.pending_edus_by_dest.pop(destination, []) - pending_presence = self.pending_presence_by_dest.pop(destination, {}) - pending_failures = self.pending_failures_by_dest.pop(destination, []) + pending_pdus = self.pending_pdus_by_dest.pop(destination, []) + pending_edus = self.pending_edus_by_dest.pop(destination, []) + pending_presence = self.pending_presence_by_dest.pop(destination, {}) + pending_failures = self.pending_failures_by_dest.pop(destination, []) - pending_edus.extend( - self.pending_edus_keyed_by_dest.pop(destination, {}).values() - ) + pending_edus.extend( + self.pending_edus_keyed_by_dest.pop(destination, {}).values() + ) - limiter = yield get_retry_limiter( - destination, - self.clock, - self.store, - ) + limiter = yield get_retry_limiter( + destination, + self.clock, + self.store, + ) - device_message_edus, device_stream_id = ( - yield self._get_new_device_messages(destination) - ) + device_message_edus, device_stream_id, dev_list_id = ( + yield self._get_new_device_messages(destination) + ) - pending_edus.extend(device_message_edus) - if pending_presence: - pending_edus.append( - Edu( - origin=self.server_name, - destination=destination, - edu_type="m.presence", - content={ - "push": [ - format_user_presence_state( - presence, self.clock.time_msec() - ) - for presence in pending_presence.values() - ] - }, - ) + pending_edus.extend(device_message_edus) + if pending_presence: + pending_edus.append( + Edu( + origin=self.server_name, + destination=destination, + edu_type="m.presence", + content={ + "push": [ + format_user_presence_state( + presence, self.clock.time_msec() + ) + for presence in pending_presence.values() + ] + }, ) + ) - if pending_pdus: - logger.debug("TX [%s] len(pending_pdus_by_dest[dest]) = %d", - destination, len(pending_pdus)) + if pending_pdus: + logger.debug("TX [%s] len(pending_pdus_by_dest[dest]) = %d", + destination, len(pending_pdus)) - if not pending_pdus and not pending_edus and not pending_failures: - logger.debug("TX [%s] Nothing to send", destination) - self.last_device_stream_id_by_dest[destination] = ( - device_stream_id + if not pending_pdus and not pending_edus and not pending_failures: + logger.debug("TX [%s] Nothing to send", destination) + self.last_device_stream_id_by_dest[destination] = ( + device_stream_id + ) + return + + success = yield self._send_new_transaction( + destination, pending_pdus, pending_edus, pending_failures, + limiter=limiter, + ) + if success: + # Remove the acknowledged device messages from the database + # Only bother if we actually sent some device messages + if device_message_edus: + yield self.store.delete_device_msgs_for_remote( + destination, device_stream_id + ) + logger.info("Marking as sent %r %r", destination, dev_list_id) + yield self.store.mark_as_sent_devices_by_remote( + destination, dev_list_id ) - return - success = yield self._send_new_transaction( - destination, pending_pdus, pending_edus, pending_failures, - device_stream_id, - includes_device_messages=bool(device_message_edus), - limiter=limiter, - ) - if not success: - break + self.last_device_stream_id_by_dest[destination] = device_stream_id + self.last_device_list_stream_id_by_dest[destination] = dev_list_id + else: + break except NotRetryingDestination: logger.debug( "TX [%s] not ready for retry yet - " @@ -374,8 +386,6 @@ class TransactionQueue(object): @defer.inlineCallbacks def _get_new_device_messages(self, destination): - # TODO: Send appropriate device list messages - last_device_stream_id = self.last_device_stream_id_by_dest.get(destination, 0) to_device_stream_id = self.store.get_to_device_stream_token() contents, stream_id = yield self.store.get_new_device_msgs_for_remote( @@ -404,13 +414,12 @@ class TransactionQueue(object): ) for content in results ) - defer.returnValue((edus, stream_id)) + defer.returnValue((edus, stream_id, now_stream_id)) @measure_func("_send_new_transaction") @defer.inlineCallbacks def _send_new_transaction(self, destination, pending_pdus, pending_edus, - pending_failures, device_stream_id, - includes_device_messages, limiter): + pending_failures, limiter): # Sort based on the order field pending_pdus.sort(key=lambda t: t[1]) @@ -521,14 +530,6 @@ class TransactionQueue(object): "Failed to send event %s to %s", p.event_id, destination ) success = False - else: - # Remove the acknowledged device messages from the database - # Only bother if we actually sent some device messages - if includes_device_messages: - yield self.store.delete_device_msgs_for_remote( - destination, device_stream_id - ) - self.last_device_stream_id_by_dest[destination] = device_stream_id except RuntimeError as e: # We capture this here as there as nothing actually listens # for this finishing functions deferred. diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index d92780b642..ba4c48d590 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -29,6 +29,7 @@ class DeviceHandler(BaseHandler): super(DeviceHandler, self).__init__(hs) self.state = hs.get_state_handler() + self.federation = hs.get_federation_sender() @defer.inlineCallbacks def check_device_registered(self, user_id, device_id, diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index b594f501f9..9628e2ff75 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -141,11 +141,11 @@ class DeviceStore(SQLBaseStore): def get_devices_by_remote(self, destination, from_stream_id): now_stream_id = self._device_list_id_gen.get_current_token() - has_changed = self._device_list_stream_cache.has_entity_changed( + has_changed = self._device_list_federation_stream_cache.has_entity_changed( destination, int(from_stream_id) ) if not has_changed: - defer.returnValue((now_stream_id, [])) + return (now_stream_id, []) return self.runInteraction( "get_devices_by_remote", self._get_devices_by_remote_txn, @@ -165,7 +165,7 @@ class DeviceStore(SQLBaseStore): rows = txn.fetchall() if not rows: - return now_stream_id, [] + return (now_stream_id, []) # maps (user_id, device_id) -> stream_id query_map = {(r[0], r[1]): r[2] for r in rows} @@ -189,7 +189,7 @@ class DeviceStore(SQLBaseStore): result = { "user_id": user_id, "device_id": device_id, - "prev_id": prev_id, + "prev_id": [prev_id] if prev_id else [], "stream_id": stream_id, } @@ -202,9 +202,9 @@ class DeviceStore(SQLBaseStore): if device_display_name: result["device_display_name"] = device_display_name - results.setdefault(user_id, {})[device_id] = result + results.append(result) - return now_stream_id, results + return (now_stream_id, results) def mark_as_sent_devices_by_remote(self, destination, stream_id): return self.runInteraction( @@ -212,19 +212,6 @@ class DeviceStore(SQLBaseStore): destination, stream_id, ) - @defer.inlineCallbacks - def get_user_whose_devices_changed(self, from_key): - from_key = int(from_key) - changed = self._device_list_stream_cache.get_all_entities_changed(from_key) - if changed is not None: - defer.returnValue(set(changed)) - - sql = """ - SELECT user_id FROM device_lists_stream WHERE stream_id > ? - """ - rows = yield self._execute("get_user_whose_devices_changed", None, sql, from_key) - defer.returnValue(set(row["user_id"] for row in rows)) - def _mark_as_sent_devices_by_remote_txn(self, txn, destination, stream_id): sql = """ DELETE FROM device_lists_outbound_pokes @@ -239,7 +226,20 @@ class DeviceStore(SQLBaseStore): UPDATE device_lists_outbound_pokes SET sent = ? WHERE destination = ? AND stream_id <= ? """ - txn.execute(sql, (destination, True,)) + txn.execute(sql, (True, destination, stream_id,)) + + @defer.inlineCallbacks + def get_user_whose_devices_changed(self, from_key): + from_key = int(from_key) + changed = self._device_list_stream_cache.get_all_entities_changed(from_key) + if changed is not None: + defer.returnValue(set(changed)) + + sql = """ + SELECT user_id FROM device_lists_stream WHERE stream_id > ? + """ + rows = yield self._execute("get_user_whose_devices_changed", None, sql, from_key) + defer.returnValue(set(row["user_id"] for row in rows)) @defer.inlineCallbacks def add_device_change_to_streams(self, user_id, device_id, hosts): -- cgit 1.4.1 From c974116f197d211ba9b42159fe61cfd5957411b5 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 26 Jan 2017 16:06:54 +0000 Subject: Implement device key caching over federation --- synapse/federation/federation_client.py | 10 + synapse/federation/federation_server.py | 3 + synapse/federation/transport/client.py | 26 +++ synapse/federation/transport/server.py | 8 + synapse/handlers/device.py | 85 +++++++-- synapse/handlers/e2e_keys.py | 40 +++- synapse/storage/devices.py | 201 +++++++++++++++++++-- synapse/storage/end_to_end_keys.py | 4 +- .../schema/delta/40/device_list_streams.sql | 20 +- tests/handlers/test_device.py | 18 +- tests/handlers/test_directory.py | 1 + tests/handlers/test_profile.py | 1 + tests/storage/test_appservice.py | 21 ++- 13 files changed, 381 insertions(+), 57 deletions(-) (limited to 'synapse/handlers/device.py') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index c9175bb33d..b5bcfd705a 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -126,6 +126,16 @@ class FederationClient(FederationBase): destination, content, timeout ) + @log_function + def query_user_devices(self, destination, user_id, timeout=30000): + """Query the device keys for a list of user ids hosted on a remote + server. + """ + sent_queries_counter.inc("user_devices") + return self.transport_layer.query_user_devices( + destination, user_id, timeout + ) + @log_function def claim_client_keys(self, destination, content, timeout): """Claims one-time keys for a device hosted on a remote server. diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 862ccbef5d..e922b7ff4a 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -416,6 +416,9 @@ class FederationServer(FederationBase): def on_query_client_keys(self, origin, content): return self.on_query_request("client_keys", content) + def on_query_user_devices(self, origin, user_id): + return self.on_query_request("user_devices", user_id) + @defer.inlineCallbacks @log_function def on_claim_client_keys(self, origin, content): diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index 915af34409..f49e8a2cc4 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -346,6 +346,32 @@ class TransportLayerClient(object): ) defer.returnValue(content) + @defer.inlineCallbacks + @log_function + def query_user_devices(self, destination, user_id, timeout): + """Query the devices for a user id hosted on a remote server. + + Response: + { + "stream_id": "...", + "devices": [ { ... } ] + } + + Args: + destination(str): The server to query. + query_content(dict): The user ids to query. + Returns: + A dict containg the device keys. + """ + path = PREFIX + "/user/devices/" + user_id + + content = yield self.client.get_json( + destination=destination, + path=path, + timeout=timeout, + ) + defer.returnValue(content) + @defer.inlineCallbacks @log_function def claim_client_keys(self, destination, query_content, timeout): diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 159dbd1747..c840da834c 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -409,6 +409,13 @@ class FederationClientKeysQueryServlet(BaseFederationServlet): return self.handler.on_query_client_keys(origin, content) +class FederationUserDevicesQueryServlet(BaseFederationServlet): + PATH = "/user/devices/(?P[^/]*)" + + def on_GET(self, origin, content, query, user_id): + return self.handler.on_query_user_devices(origin, user_id) + + class FederationClientKeysClaimServlet(BaseFederationServlet): PATH = "/user/keys/claim" @@ -613,6 +620,7 @@ SERVLET_CLASSES = ( FederationGetMissingEventsServlet, FederationEventAuthServlet, FederationClientKeysQueryServlet, + FederationUserDevicesQueryServlet, FederationClientKeysClaimServlet, FederationThirdPartyInviteExchangeServlet, On3pidBindServlet, diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index ba4c48d590..2d66b3721a 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -15,6 +15,7 @@ from synapse.api import errors from synapse.util import stringutils +from synapse.util.async import Linearizer from synapse.types import get_domain_from_id from twisted.internet import defer from ._base import BaseHandler @@ -28,8 +29,18 @@ class DeviceHandler(BaseHandler): def __init__(self, hs): super(DeviceHandler, self).__init__(hs) + self.hs = hs self.state = hs.get_state_handler() - self.federation = hs.get_federation_sender() + self.federation_sender = hs.get_federation_sender() + self.federation = hs.get_replication_layer() + self._remote_edue_linearizer = Linearizer(name="remote_device_list") + + self.federation.register_edu_handler( + "m.device_list_update", self._incoming_device_list_update, + ) + self.federation.register_query_handler( + "user_devices", self.on_federation_query_user_devices, + ) @defer.inlineCallbacks def check_device_registered(self, user_id, device_id, @@ -55,7 +66,7 @@ class DeviceHandler(BaseHandler): initial_device_display_name=initial_device_display_name, ) if new_device: - yield self.notify_device_update(user_id, device_id) + yield self.notify_device_update(user_id, [device_id]) defer.returnValue(device_id) # if the device id is not specified, we'll autogen one, but loop a few @@ -69,7 +80,7 @@ class DeviceHandler(BaseHandler): initial_device_display_name=initial_device_display_name, ) if new_device: - yield self.notify_device_update(user_id, device_id) + yield self.notify_device_update(user_id, [device_id]) defer.returnValue(device_id) attempts += 1 @@ -151,7 +162,7 @@ class DeviceHandler(BaseHandler): user_id=user_id, device_id=device_id ) - yield self.notify_device_update(user_id, device_id) + yield self.notify_device_update(user_id, [device_id]) @defer.inlineCallbacks def update_device(self, user_id, device_id, content): @@ -172,7 +183,7 @@ class DeviceHandler(BaseHandler): device_id, new_display_name=content.get("display_name") ) - yield self.notify_device_update(user_id, device_id) + yield self.notify_device_update(user_id, [device_id]) except errors.StoreError, e: if e.code == 404: raise errors.NotFoundError() @@ -180,26 +191,28 @@ class DeviceHandler(BaseHandler): raise @defer.inlineCallbacks - def notify_device_update(self, user_id, device_id): + def notify_device_update(self, user_id, device_ids): rooms = yield self.store.get_rooms_for_user(user_id) room_ids = [r.room_id for r in rooms] hosts = set() - for room_id in room_ids: - users = yield self.state.get_current_user_in_room(room_id) - hosts.update(get_domain_from_id(u) for u in users) - hosts.discard(self.server_name) + if self.hs.is_mine_id(user_id): + for room_id in room_ids: + users = yield self.state.get_current_user_in_room(room_id) + hosts.update(get_domain_from_id(u) for u in users) + hosts.discard(self.server_name) position = yield self.store.add_device_change_to_streams( - user_id, device_id, list(hosts) + user_id, device_ids, list(hosts) ) yield self.notifier.on_new_event( "device_list_key", position, rooms=room_ids, ) + logger.info("Sending device list update notif to: %r", hosts) for host in hosts: - self.federation.send_device_messages(host) + self.federation_sender.send_device_messages(host) @defer.inlineCallbacks def get_device_list_changes(self, user_id, room_ids, from_key): @@ -214,6 +227,54 @@ class DeviceHandler(BaseHandler): defer.returnValue(user_ids_changed) + @defer.inlineCallbacks + def _incoming_device_list_update(self, origin, edu_content): + user_id = edu_content["user_id"] + device_id = edu_content["device_id"] + stream_id = edu_content["stream_id"] + prev_ids = edu_content.get("prev_id", []) + + if get_domain_from_id(user_id) != origin: + # TODO: Raise? + return + + logger.info("Got edu: %r", edu_content) + + with (yield self._remote_edue_linearizer.queue(user_id)): + resync = True + if len(prev_ids) == 1: + extremity = yield self.store.get_device_list_remote_extremity(user_id) + logger.info("Extrem: %r, prev_ids: %r", extremity, prev_ids) + if str(extremity) == str(prev_ids[0]): + resync = False + + if resync: + result = yield self.federation.query_user_devices(origin, user_id) + stream_id = result["stream_id"] + devices = result["devices"] + yield self.store.update_remote_device_list_cache( + user_id, devices, stream_id, + ) + device_ids = [device["device_id"] for device in devices] + yield self.notify_device_update(user_id, device_ids) + else: + content = dict(edu_content) + for key in ("user_id", "device_id", "stream_id", "prev_ids"): + content.pop(key, None) + yield self.store.update_remote_device_list_cache_entry( + user_id, device_id, content, stream_id, + ) + yield self.notify_device_update(user_id, [device_id]) + + @defer.inlineCallbacks + def on_federation_query_user_devices(self, user_id): + stream_id, devices = yield self.store.get_devices_with_keys_by_user(user_id) + defer.returnValue({ + "user_id": user_id, + "stream_id": stream_id, + "devices": devices, + }) + def _update_device_from_client_ips(device, client_ips): ip = client_ips.get((device["user_id"], device["device_id"]), {}) diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index 38c2a2d39e..832998a6d3 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -73,8 +73,7 @@ class E2eKeysHandler(object): if self.is_mine_id(user_id): local_query[user_id] = device_ids else: - domain = get_domain_from_id(user_id) - remote_queries.setdefault(domain, {})[user_id] = device_ids + remote_queries[user_id] = device_ids # do the queries failures = {} @@ -85,9 +84,40 @@ class E2eKeysHandler(object): if user_id in local_query: results[user_id] = keys + remote_queries_not_in_cache = {} + if remote_queries: + query_list = [] + for user_id, device_ids in remote_queries.iteritems(): + if device_ids: + query_list.extend((user_id, device_id) for device_id in device_ids) + else: + query_list.append((user_id, None)) + + user_ids_not_in_cache, remote_results = ( + yield self.store.get_user_devices_from_cache( + query_list + ) + ) + for user_id, devices in remote_results.iteritems(): + user_devices = results.setdefault(user_id, {}) + for device_id, device in devices.iteritems(): + keys = device.get("keys", None) + device_display_name = device.get("device_display_name", None) + if keys: + result = dict(keys) + unsigned = result.setdefault("unsigned", {}) + if device_display_name: + unsigned["device_display_name"] = device_display_name + user_devices[device_id] = result + + for user_id in user_ids_not_in_cache: + domain = get_domain_from_id(user_id) + r = remote_queries_not_in_cache.setdefault(domain, {}) + r[user_id] = remote_queries[user_id] + @defer.inlineCallbacks def do_remote_query(destination): - destination_query = remote_queries[destination] + destination_query = remote_queries_not_in_cache[destination] try: limiter = yield get_retry_limiter( destination, self.clock, self.store @@ -119,7 +149,7 @@ class E2eKeysHandler(object): yield preserve_context_over_deferred(defer.gatherResults([ preserve_fn(do_remote_query)(destination) - for destination in remote_queries + for destination in remote_queries_not_in_cache ])) defer.returnValue({ @@ -259,7 +289,7 @@ class E2eKeysHandler(object): user_id, device_id, time_now, encode_canonical_json(device_keys) ) - yield self.device_handler.notify_device_update(user_id, device_id) + yield self.device_handler.notify_device_update(user_id, [device_id]) one_time_keys = keys.get("one_time_keys", None) if one_time_keys: diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index 9628e2ff75..8ee3119db2 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -138,6 +138,89 @@ class DeviceStore(SQLBaseStore): defer.returnValue({d["device_id"]: d for d in devices}) + def get_device_list_remote_extremity(self, user_id): + return self._simple_select_one_onecol( + table="device_lists_remote_extremeties", + keyvalues={"user_id": user_id}, + retcol="stream_id", + desc="get_device_list_remote_extremity", + allow_none=True, + ) + + def update_remote_device_list_cache_entry(self, user_id, device_id, content, + stream_id): + return self.runInteraction( + "update_remote_device_list_cache_entry", + self._update_remote_device_list_cache_entry_txn, + user_id, device_id, content, stream_id, + ) + + def _update_remote_device_list_cache_entry_txn(self, txn, user_id, device_id, + content, stream_id): + self._simple_upsert_txn( + txn, + table="device_lists_remote_cache", + keyvalues={ + "user_id": user_id, + "device_id": device_id, + }, + values={ + "content": json.dumps(content), + } + ) + + self._simple_upsert_txn( + txn, + table="device_lists_remote_extremeties", + keyvalues={ + "user_id": user_id, + }, + values={ + "stream_id": stream_id, + } + ) + + def update_remote_device_list_cache(self, user_id, devices, stream_id): + return self.runInteraction( + "update_remote_device_list_cache", + self._update_remote_device_list_cache_txn, + user_id, devices, stream_id, + ) + + def _update_remote_device_list_cache_txn(self, txn, user_id, devices, + stream_id): + self._simple_delete_txn( + txn, + table="device_lists_remote_cache", + keyvalues={ + "user_id": user_id, + }, + ) + + self._simple_insert_many_txn( + txn, + table="device_lists_remote_cache", + values=[ + { + "user_id": user_id, + "device_id": content["device_id"], + "content": json.dumps(content), + } + for content in devices + ] + ) + + self._simple_upsert_txn( + txn, + table="device_lists_remote_extremeties", + keyvalues={ + "user_id": user_id, + }, + values={ + "stream_id": stream_id, + } + ) + def get_devices_by_remote(self, destination, from_stream_id): now_stream_id = self._device_list_id_gen.get_current_token() @@ -184,7 +267,7 @@ class DeviceStore(SQLBaseStore): txn.execute(prev_sent_id_sql, (destination, user_id, True)) rows = txn.fetchall() prev_id = rows[0][0] - for device_id, result in user_devices.iteritems(): + for device_id, device in user_devices.iteritems(): stream_id = query_map[(user_id, device_id)] result = { "user_id": user_id, @@ -195,10 +278,10 @@ class DeviceStore(SQLBaseStore): prev_id = stream_id - key_json = result.get("key_json", None) + key_json = device.get("key_json", None) if key_json: result["keys"] = json.loads(key_json) - device_display_name = result.get("device_display_name", None) + device_display_name = device.get("device_display_name", None) if device_display_name: result["device_display_name"] = device_display_name @@ -206,6 +289,96 @@ class DeviceStore(SQLBaseStore): return (now_stream_id, results) + def get_user_devices_from_cache(self, query_list): + return self.runInteraction( + "get_user_devices_from_cache", self._get_user_devices_from_cache_txn, + query_list, + ) + + def _get_user_devices_from_cache_txn(self, txn, query_list): + user_ids = {user_id for user_id, _ in query_list} + + user_ids_in_cache = set() + for user_id in user_ids: + stream_ids = self._simple_select_onecol_txn( + txn, + table="device_lists_remote_extremeties", + keyvalues={ + "user_id": user_id, + }, + retcol="stream_id", + ) + if stream_ids: + user_ids_in_cache.add(user_id) + + user_ids_not_in_cache = user_ids - user_ids_in_cache + + results = {} + for user_id, device_id in query_list: + if user_id not in user_ids_in_cache: + continue + + if device_id: + content = self._simple_select_one_onecol_txn( + txn, + table="device_lists_remote_cache", + keyvalues={ + "user_id": user_id, + "device_id": device_id, + }, + retcol="content", + ) + results.setdefault(user_id, {})[device_id] = json.loads(content) + else: + devices = self._simple_select_list_txn( + txn, + table="device_lists_remote_cache", + keyvalues={ + "user_id": user_id, + }, + retcols=("device_id", "content"), + ) + results[user_id] = { + device["device_id"]: json.loads(device["content"]) + for device in devices + } + user_ids_in_cache.discard(user_id) + + return user_ids_not_in_cache, results + + def get_devices_with_keys_by_user(self, user_id): + return self.runInteraction( + "get_devices_with_keys_by_user", + self._get_devices_with_keys_by_user_txn, user_id, + ) + + def _get_devices_with_keys_by_user_txn(self, txn, user_id): + now_stream_id = self._device_list_id_gen.get_current_token() + + devices = self._get_e2e_device_keys_txn( + txn, [(user_id, None)], include_all_devices=True + ) + + for user_id, user_devices in devices.iteritems(): + results = [] + for device_id, device in user_devices.iteritems(): + result = { + "device_id": device_id, + } + + key_json = device.get("key_json", None) + if key_json: + result["keys"] = json.loads(key_json) + device_display_name = device.get("device_display_name", None) + if device_display_name: + result["device_display_name"] = device_display_name + + results.append(result) + + return now_stream_id, results + + return now_stream_id, [] + def mark_as_sent_devices_by_remote(self, destination, stream_id): return self.runInteraction( "mark_as_sent_devices_by_remote", self._mark_as_sent_devices_by_remote_txn, @@ -242,17 +415,17 @@ class DeviceStore(SQLBaseStore): defer.returnValue(set(row["user_id"] for row in rows)) @defer.inlineCallbacks - def add_device_change_to_streams(self, user_id, device_id, hosts): + def add_device_change_to_streams(self, user_id, device_ids, hosts): # device_lists_stream # device_lists_outbound_pokes with self._device_list_id_gen.get_next() as stream_id: yield self.runInteraction( "add_device_change_to_streams", self._add_device_change_txn, - user_id, device_id, hosts, stream_id, + user_id, device_ids, hosts, stream_id, ) defer.returnValue(stream_id) - def _add_device_change_txn(self, txn, user_id, device_id, hosts, stream_id): + def _add_device_change_txn(self, txn, user_id, device_ids, hosts, stream_id): txn.call_after( self._device_list_stream_cache.entity_has_changed, user_id, stream_id, @@ -263,14 +436,17 @@ class DeviceStore(SQLBaseStore): host, stream_id, ) - self._simple_insert_txn( + self._simple_insert_many_txn( txn, table="device_lists_stream", - values={ - "stream_id": stream_id, - "user_id": user_id, - "device_id": device_id, - } + values=[ + { + "stream_id": stream_id, + "user_id": user_id, + "device_id": device_id, + } + for device_id in device_ids + ] ) self._simple_insert_many_txn( @@ -285,6 +461,7 @@ class DeviceStore(SQLBaseStore): "sent": False, } for destination in hosts + for device_id in device_ids ] ) diff --git a/synapse/storage/end_to_end_keys.py b/synapse/storage/end_to_end_keys.py index f82943a7a8..a915c790ff 100644 --- a/synapse/storage/end_to_end_keys.py +++ b/synapse/storage/end_to_end_keys.py @@ -52,11 +52,11 @@ class EndToEndKeyStore(SQLBaseStore): query_params = [] for (user_id, device_id) in query_list: - query_clause = "k.user_id = ?" + query_clause = "user_id = ?" query_params.append(user_id) if device_id: - query_clause += " AND k.device_id = ?" + query_clause += " AND device_id = ?" query_params.append(device_id) query_clauses.append(query_clause) diff --git a/synapse/storage/schema/delta/40/device_list_streams.sql b/synapse/storage/schema/delta/40/device_list_streams.sql index 61cac63bbb..d1051c6ddf 100644 --- a/synapse/storage/schema/delta/40/device_list_streams.sql +++ b/synapse/storage/schema/delta/40/device_list_streams.sql @@ -13,18 +13,6 @@ * limitations under the License. */ -CREATE TABLE device_list_streams_remote ( - list_id TEXT NOT NULL, - origin TEXT NOT NULL, - user_id TEXT NOT NULL, - is_full BOOLEAN NOT NULL, - ts BIGINT NOT NULL -); - -CREATE INDEX device_list_streams_remote_id_origin ON device_list_streams_remote( - origin, list_id, user_id -); - CREATE TABLE device_lists_remote_cache ( user_id TEXT NOT NULL, @@ -35,6 +23,14 @@ CREATE TABLE device_lists_remote_cache ( CREATE INDEX device_lists_remote_cache_id ON device_lists_remote_cache(user_id, device_id); +CREATE TABLE device_lists_remote_extremeties ( + user_id TEXT NOT NULL, + stream_id TEXT NOT NULL +); + +CREATE INDEX device_lists_remote_extremeties_id ON device_lists_remote_extremeties(user_id, stream_id); + + CREATE TABLE device_lists_stream ( stream_id BIGINT NOT NULL, user_id TEXT NOT NULL, diff --git a/tests/handlers/test_device.py b/tests/handlers/test_device.py index 85a970a6c9..2eaaa8253c 100644 --- a/tests/handlers/test_device.py +++ b/tests/handlers/test_device.py @@ -35,51 +35,51 @@ class DeviceTestCase(unittest.TestCase): @defer.inlineCallbacks def setUp(self): - hs = yield utils.setup_test_homeserver(handlers=None) - self.handler = synapse.handlers.device.DeviceHandler(hs) + hs = yield utils.setup_test_homeserver() + self.handler = hs.get_device_handler() self.store = hs.get_datastore() self.clock = hs.get_clock() @defer.inlineCallbacks def test_device_is_created_if_doesnt_exist(self): res = yield self.handler.check_device_registered( - user_id="boris", + user_id="@boris:foo", device_id="fco", initial_device_display_name="display name" ) self.assertEqual(res, "fco") - dev = yield self.handler.store.get_device("boris", "fco") + dev = yield self.handler.store.get_device("@boris:foo", "fco") self.assertEqual(dev["display_name"], "display name") @defer.inlineCallbacks def test_device_is_preserved_if_exists(self): res1 = yield self.handler.check_device_registered( - user_id="boris", + user_id="@boris:foo", device_id="fco", initial_device_display_name="display name" ) self.assertEqual(res1, "fco") res2 = yield self.handler.check_device_registered( - user_id="boris", + user_id="@boris:foo", device_id="fco", initial_device_display_name="new display name" ) self.assertEqual(res2, "fco") - dev = yield self.handler.store.get_device("boris", "fco") + dev = yield self.handler.store.get_device("@boris:foo", "fco") self.assertEqual(dev["display_name"], "display name") @defer.inlineCallbacks def test_device_id_is_made_up_if_unspecified(self): device_id = yield self.handler.check_device_registered( - user_id="theresa", + user_id="@theresa:foo", device_id=None, initial_device_display_name="display" ) - dev = yield self.handler.store.get_device("theresa", device_id) + dev = yield self.handler.store.get_device("@theresa:foo", device_id) self.assertEqual(dev["display_name"], "display") @defer.inlineCallbacks diff --git a/tests/handlers/test_directory.py b/tests/handlers/test_directory.py index 5d602c1531..ceb9aa5765 100644 --- a/tests/handlers/test_directory.py +++ b/tests/handlers/test_directory.py @@ -37,6 +37,7 @@ class DirectoryTestCase(unittest.TestCase): def setUp(self): self.mock_federation = Mock(spec=[ "make_query", + "register_edu_handler", ]) self.query_handlers = {} diff --git a/tests/handlers/test_profile.py b/tests/handlers/test_profile.py index f1f664275f..979cebf600 100644 --- a/tests/handlers/test_profile.py +++ b/tests/handlers/test_profile.py @@ -39,6 +39,7 @@ class ProfileTestCase(unittest.TestCase): def setUp(self): self.mock_federation = Mock(spec=[ "make_query", + "register_edu_handler", ]) self.query_handlers = {} diff --git a/tests/storage/test_appservice.py b/tests/storage/test_appservice.py index 9ff1abcd80..9e98d0e330 100644 --- a/tests/storage/test_appservice.py +++ b/tests/storage/test_appservice.py @@ -39,7 +39,11 @@ class ApplicationServiceStoreTestCase(unittest.TestCase): event_cache_size=1, password_providers=[], ) - hs = yield setup_test_homeserver(config=config, federation_sender=Mock()) + hs = yield setup_test_homeserver( + config=config, + federation_sender=Mock(), + replication_layer=Mock(), + ) self.as_token = "token1" self.as_url = "some_url" @@ -112,7 +116,11 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase): event_cache_size=1, password_providers=[], ) - hs = yield setup_test_homeserver(config=config, federation_sender=Mock()) + hs = yield setup_test_homeserver( + config=config, + federation_sender=Mock(), + replication_layer=Mock(), + ) self.db_pool = hs.get_db_pool() self.as_list = [ @@ -446,7 +454,8 @@ class ApplicationServiceStoreConfigTestCase(unittest.TestCase): hs = yield setup_test_homeserver( config=config, datastore=Mock(), - federation_sender=Mock() + federation_sender=Mock(), + replication_layer=Mock(), ) ApplicationServiceStore(hs) @@ -463,7 +472,8 @@ class ApplicationServiceStoreConfigTestCase(unittest.TestCase): hs = yield setup_test_homeserver( config=config, datastore=Mock(), - federation_sender=Mock() + federation_sender=Mock(), + replication_layer=Mock(), ) with self.assertRaises(ConfigError) as cm: @@ -486,7 +496,8 @@ class ApplicationServiceStoreConfigTestCase(unittest.TestCase): hs = yield setup_test_homeserver( config=config, datastore=Mock(), - federation_sender=Mock() + federation_sender=Mock(), + replication_layer=Mock(), ) with self.assertRaises(ConfigError) as cm: -- cgit 1.4.1 From fbfad76c03afe7538c67205ceb30825d9ce4fb07 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 26 Jan 2017 16:30:37 +0000 Subject: Add comments --- synapse/handlers/device.py | 19 +++++++++-- synapse/handlers/e2e_keys.py | 4 ++- synapse/storage/devices.py | 37 ++++++++++++++++++++-- .../schema/delta/40/device_list_streams.sql | 8 ++++- 4 files changed, 61 insertions(+), 7 deletions(-) (limited to 'synapse/handlers/device.py') diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 2d66b3721a..a2ffd273bf 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -192,6 +192,9 @@ class DeviceHandler(BaseHandler): @defer.inlineCallbacks def notify_device_update(self, user_id, device_ids): + """Notify that a user's device(s) has changed. Pokes the notifier, and + remote servers if the user is local. + """ rooms = yield self.store.get_rooms_for_user(user_id) room_ids = [r.room_id for r in rooms] @@ -210,12 +213,16 @@ class DeviceHandler(BaseHandler): "device_list_key", position, rooms=room_ids, ) - logger.info("Sending device list update notif to: %r", hosts) - for host in hosts: - self.federation_sender.send_device_messages(host) + if hosts: + logger.info("Sending device list update notif to: %r", hosts) + for host in hosts: + self.federation_sender.send_device_messages(host) @defer.inlineCallbacks def get_device_list_changes(self, user_id, room_ids, from_key): + """For a user and their joined rooms, calculate which device updates + we need to return. + """ room_ids = frozenset(room_ids) user_ids_changed = set() @@ -236,11 +243,14 @@ class DeviceHandler(BaseHandler): if get_domain_from_id(user_id) != origin: # TODO: Raise? + logger.warning("Got device list update edu for %r from %r", user_id, origin) return logger.info("Got edu: %r", edu_content) with (yield self._remote_edue_linearizer.queue(user_id)): + # If the prev id matches whats in our cache table, then we don't need + # to resync the users device list, otherwise we do. resync = True if len(prev_ids) == 1: extremity = yield self.store.get_device_list_remote_extremity(user_id) @@ -249,6 +259,7 @@ class DeviceHandler(BaseHandler): resync = False if resync: + # Fetch all devices for the user. result = yield self.federation.query_user_devices(origin, user_id) stream_id = result["stream_id"] devices = result["devices"] @@ -258,6 +269,8 @@ class DeviceHandler(BaseHandler): device_ids = [device["device_id"] for device in devices] yield self.notify_device_update(user_id, device_ids) else: + # Simply update the single device, since we know that is the only + # change (becuase of the single prev_id matching the current cache) content = dict(edu_content) for key in ("user_id", "device_id", "stream_id", "prev_ids"): content.pop(key, None) diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index 832998a6d3..a16b9def8d 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -75,7 +75,7 @@ class E2eKeysHandler(object): else: remote_queries[user_id] = device_ids - # do the queries + # Firt get local devices. failures = {} results = {} if local_query: @@ -84,6 +84,7 @@ class E2eKeysHandler(object): if user_id in local_query: results[user_id] = keys + # Now attempt to get any remote devices from our local cache. remote_queries_not_in_cache = {} if remote_queries: query_list = [] @@ -115,6 +116,7 @@ class E2eKeysHandler(object): r = remote_queries_not_in_cache.setdefault(domain, {}) r[user_id] = remote_queries[user_id] + # Now fetch any devices that we don't have in our cache @defer.inlineCallbacks def do_remote_query(destination): destination_query = remote_queries_not_in_cache[destination] diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index 8ee3119db2..cf38dbaa3c 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -139,6 +139,9 @@ class DeviceStore(SQLBaseStore): defer.returnValue({d["device_id"]: d for d in devices}) def get_device_list_remote_extremity(self, user_id): + """Get the last stream_id we got for a user. May be None if we haven't + got any information for them. + """ return self._simple_select_one_onecol( table="device_lists_remote_extremeties", keyvalues={"user_id": user_id}, @@ -149,6 +152,8 @@ class DeviceStore(SQLBaseStore): def update_remote_device_list_cache_entry(self, user_id, device_id, content, stream_id): + """Updates a single user's device in the cache. + """ return self.runInteraction( "update_remote_device_list_cache_entry", self._update_remote_device_list_cache_entry_txn, @@ -181,6 +186,8 @@ class DeviceStore(SQLBaseStore): ) def update_remote_device_list_cache(self, user_id, devices, stream_id): + """Replace the cache of the remote user's devices. + """ return self.runInteraction( "update_remote_device_list_cache", self._update_remote_device_list_cache_txn, @@ -222,6 +229,11 @@ class DeviceStore(SQLBaseStore): ) def get_devices_by_remote(self, destination, from_stream_id): + """Get stream of updates to send to remote servers + + Returns: + (now_stream_id, [ { updates }, .. ]) + """ now_stream_id = self._device_list_id_gen.get_current_token() has_changed = self._device_list_federation_stream_cache.has_entity_changed( @@ -290,6 +302,17 @@ class DeviceStore(SQLBaseStore): return (now_stream_id, results) def get_user_devices_from_cache(self, query_list): + """Get the devices (and keys if any) for remote users from the cache. + + Args: + query_list(list): List of (user_id, device_ids), if device_ids is + falsey then return all device ids for that user. + + Returns: + (user_ids_not_in_cache, results_map), where user_ids_not_in_cache is + a set of user_ids and results_map is a mapping of + user_id -> device_id -> device_info + """ return self.runInteraction( "get_user_devices_from_cache", self._get_user_devices_from_cache_txn, query_list, @@ -347,6 +370,11 @@ class DeviceStore(SQLBaseStore): return user_ids_not_in_cache, results def get_devices_with_keys_by_user(self, user_id): + """Get all devices (with any device keys) for a user + + Returns: + (stream_id, devices) + """ return self.runInteraction( "get_devices_with_keys_by_user", self._get_devices_with_keys_by_user_txn, user_id, @@ -380,6 +408,8 @@ class DeviceStore(SQLBaseStore): return now_stream_id, [] def mark_as_sent_devices_by_remote(self, destination, stream_id): + """Mark that updates have successfully been sent to the destination. + """ return self.runInteraction( "mark_as_sent_devices_by_remote", self._mark_as_sent_devices_by_remote_txn, destination, stream_id, @@ -403,6 +433,8 @@ class DeviceStore(SQLBaseStore): @defer.inlineCallbacks def get_user_whose_devices_changed(self, from_key): + """Get set of users whose devices have changed since `from_key`. + """ from_key = int(from_key) changed = self._device_list_stream_cache.get_all_entities_changed(from_key) if changed is not None: @@ -416,8 +448,9 @@ class DeviceStore(SQLBaseStore): @defer.inlineCallbacks def add_device_change_to_streams(self, user_id, device_ids, hosts): - # device_lists_stream - # device_lists_outbound_pokes + """Persist that a user's devices have been updated, and which hosts + (if any) should be poked. + """ with self._device_list_id_gen.get_next() as stream_id: yield self.runInteraction( "add_device_change_to_streams", self._add_device_change_txn, diff --git a/synapse/storage/schema/delta/40/device_list_streams.sql b/synapse/storage/schema/delta/40/device_list_streams.sql index d1051c6ddf..8348c143c3 100644 --- a/synapse/storage/schema/delta/40/device_list_streams.sql +++ b/synapse/storage/schema/delta/40/device_list_streams.sql @@ -13,7 +13,7 @@ * limitations under the License. */ - +-- Cache of remote devices. CREATE TABLE device_lists_remote_cache ( user_id TEXT NOT NULL, device_id TEXT NOT NULL, @@ -23,6 +23,8 @@ CREATE TABLE device_lists_remote_cache ( CREATE INDEX device_lists_remote_cache_id ON device_lists_remote_cache(user_id, device_id); +-- The last update we got for a user. Empty if we're not receiving updates for +-- that user. CREATE TABLE device_lists_remote_extremeties ( user_id TEXT NOT NULL, stream_id TEXT NOT NULL @@ -31,6 +33,7 @@ CREATE TABLE device_lists_remote_extremeties ( CREATE INDEX device_lists_remote_extremeties_id ON device_lists_remote_extremeties(user_id, stream_id); +-- Stream of device lists updates. Includes both local and remotes CREATE TABLE device_lists_stream ( stream_id BIGINT NOT NULL, user_id TEXT NOT NULL, @@ -40,6 +43,9 @@ CREATE TABLE device_lists_stream ( CREATE INDEX device_lists_stream_id ON device_lists_stream(stream_id, user_id); +-- The stream of updates to send to other servers. We keep at least one row +-- per user that was sent so that the prev_id for any new updates can be +-- calculated CREATE TABLE device_lists_outbound_pokes ( destination TEXT NOT NULL, stream_id BIGINT NOT NULL, -- cgit 1.4.1 From 76d40f490411ce1a0a208acb4242678b0cb4afb3 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 26 Jan 2017 16:39:33 +0000 Subject: Handle users leaving rooms --- synapse/handlers/device.py | 17 ++++++++++++++++- synapse/storage/devices.py | 8 ++++++++ 2 files changed, 24 insertions(+), 1 deletion(-) (limited to 'synapse/handlers/device.py') diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index a2ffd273bf..1116dfd27c 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -42,6 +42,8 @@ class DeviceHandler(BaseHandler): "user_devices", self.on_federation_query_user_devices, ) + hs.get_distributor().observe("user_left_room", self.user_left_room) + @defer.inlineCallbacks def check_device_registered(self, user_id, device_id, initial_device_display_name=None): @@ -246,7 +248,11 @@ class DeviceHandler(BaseHandler): logger.warning("Got device list update edu for %r from %r", user_id, origin) return - logger.info("Got edu: %r", edu_content) + rooms = yield self.store.get_rooms_for_user(user_id) + if not rooms: + # We don't share any rooms with this user. Ignore update, as we + # probably won't get any further updates. + return with (yield self._remote_edue_linearizer.queue(user_id)): # If the prev id matches whats in our cache table, then we don't need @@ -288,6 +294,15 @@ class DeviceHandler(BaseHandler): "devices": devices, }) + @defer.inlineCallbacks + def user_left_room(self, user, room_id): + user_id = user.to_string() + rooms = yield self.store.get_rooms_for_user(user_id) + if not rooms: + # We no longer share rooms with this user, so we'll no longer + # receive device updates. Mark this in DB. + yield self.store.mark_remote_user_device_list_as_unsubscribed(user_id) + def _update_device_from_client_ips(device, client_ips): ip = client_ips.get((device["user_id"], device["device_id"]), {}) diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index cf38dbaa3c..1c48c3af99 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -150,6 +150,14 @@ class DeviceStore(SQLBaseStore): allow_none=True, ) + def mark_remote_user_device_list_as_unsubscribed(self, user_id): + return self._simple_delete( + table="device_lists_remote_extremeties", + keyvalues={ + "user_id": user_id, + }, + ) + def update_remote_device_list_cache_entry(self, user_id, device_id, content, stream_id): """Updates a single user's device in the cache. -- cgit 1.4.1 From 888c59c955b33c3c69a73766507e134d64a8f25b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 27 Jan 2017 10:29:47 +0000 Subject: Better name --- synapse/handlers/device.py | 4 +++- synapse/storage/devices.py | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) (limited to 'synapse/handlers/device.py') diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 1116dfd27c..ed077c9a76 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -259,7 +259,9 @@ class DeviceHandler(BaseHandler): # to resync the users device list, otherwise we do. resync = True if len(prev_ids) == 1: - extremity = yield self.store.get_device_list_remote_extremity(user_id) + extremity = yield self.store.get_device_list_last_stream_id_for_remote( + user_id + ) logger.info("Extrem: %r, prev_ids: %r", extremity, prev_ids) if str(extremity) == str(prev_ids[0]): resync = False diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index b99de2f1b0..d46203dd35 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -137,7 +137,7 @@ class DeviceStore(SQLBaseStore): defer.returnValue({d["device_id"]: d for d in devices}) - def get_device_list_remote_extremity(self, user_id): + def get_device_list_last_stream_id_for_remote(self, user_id): """Get the last stream_id we got for a user. May be None if we haven't got any information for them. """ -- cgit 1.4.1 From 252b503fc8626078141dc6b82eeff63607874347 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 27 Jan 2017 13:36:39 +0000 Subject: Hook device list updates to replication --- synapse/app/federation_sender.py | 3 +- synapse/app/synchrotron.py | 27 ++++++++++- synapse/handlers/device.py | 16 ------- synapse/handlers/sync.py | 35 ++++++++++---- synapse/replication/resource.py | 20 +++++++- synapse/replication/slave/storage/devices.py | 72 ++++++++++++++++++++++++++++ synapse/storage/devices.py | 15 ++++++ 7 files changed, 159 insertions(+), 29 deletions(-) create mode 100644 synapse/replication/slave/storage/devices.py (limited to 'synapse/handlers/device.py') diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py index ec06620efb..411e47d98d 100644 --- a/synapse/app/federation_sender.py +++ b/synapse/app/federation_sender.py @@ -30,6 +30,7 @@ from synapse.replication.slave.storage.events import SlavedEventStore from synapse.replication.slave.storage.receipts import SlavedReceiptsStore from synapse.replication.slave.storage.registration import SlavedRegistrationStore from synapse.replication.slave.storage.transactions import TransactionStore +from synapse.replication.slave.storage.devices import SlavedDeviceStore from synapse.storage.engines import create_engine from synapse.storage.presence import UserPresenceState from synapse.util.async import sleep @@ -56,7 +57,7 @@ logger = logging.getLogger("synapse.app.appservice") class FederationSenderSlaveStore( SlavedDeviceInboxStore, TransactionStore, SlavedReceiptsStore, SlavedEventStore, - SlavedRegistrationStore, + SlavedRegistrationStore, SlavedDeviceStore, ): pass diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index 4dfc2dc648..9d250502e0 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -39,6 +39,7 @@ from synapse.replication.slave.storage.filtering import SlavedFilteringStore from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore from synapse.replication.slave.storage.presence import SlavedPresenceStore from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore +from synapse.replication.slave.storage.devices import SlavedDeviceStore from synapse.replication.slave.storage.room import RoomStore from synapse.server import HomeServer from synapse.storage.client_ips import ClientIpStore @@ -77,6 +78,7 @@ class SynchrotronSlavedStore( SlavedFilteringStore, SlavedPresenceStore, SlavedDeviceInboxStore, + SlavedDeviceStore, RoomStore, BaseSlavedStore, ClientIpStore, # After BaseSlavedStore because the constructor is different @@ -380,6 +382,28 @@ class SynchrotronServer(HomeServer): stream_key, position, users=users, rooms=rooms ) + @defer.inlineCallbacks + def notify_device_list_update(result): + stream = result.get("device_lists") + if not stream: + return + + position_index = stream["field_names"].index("position") + user_index = stream["field_names"].index("user_id") + + for row in stream["rows"]: + logger.info("Handling device list row: %r", row) + position = row[position_index] + user_id = row[user_index] + + rooms = yield store.get_rooms_for_user(user_id) + room_ids = [r.room_id for r in rooms] + + notifier.on_new_event( + "device_list_key", position, rooms=room_ids, + ) + + @defer.inlineCallbacks def notify(result): stream = result.get("events") if stream: @@ -417,6 +441,7 @@ class SynchrotronServer(HomeServer): notify_from_stream( result, "to_device", "to_device_key", user="user_id" ) + yield notify_device_list_update(result) while True: try: @@ -427,7 +452,7 @@ class SynchrotronServer(HomeServer): yield store.process_replication(result) typing_handler.process_replication(result) yield presence_handler.process_replication(result) - notify(result) + yield notify(result) except: logger.exception("Error replicating from %r", replication_url) yield sleep(5) diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index ed077c9a76..6fefb85890 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -220,22 +220,6 @@ class DeviceHandler(BaseHandler): for host in hosts: self.federation_sender.send_device_messages(host) - @defer.inlineCallbacks - def get_device_list_changes(self, user_id, room_ids, from_key): - """For a user and their joined rooms, calculate which device updates - we need to return. - """ - room_ids = frozenset(room_ids) - - user_ids_changed = set() - changed = yield self.store.get_user_whose_devices_changed(from_key) - for other_user_id in changed: - other_rooms = yield self.store.get_rooms_for_user(other_user_id) - if room_ids.intersection(e.room_id for e in other_rooms): - user_ids_changed.add(other_user_id) - - defer.returnValue(user_ids_changed) - @defer.inlineCallbacks def _incoming_device_list_update(self, origin, edu_content): user_id = edu_content["user_id"] diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 06bf626367..9199f20817 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -144,7 +144,6 @@ class SyncHandler(object): self.clock = hs.get_clock() self.response_cache = ResponseCache(hs) self.state = hs.get_state_handler() - self.device_handler = hs.get_device_handler() def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0, full_state=False): @@ -546,15 +545,9 @@ class SyncHandler(object): yield self._generate_sync_entry_for_to_device(sync_result_builder) - if since_token and since_token.device_list_key: - user_id = sync_config.user.to_string() - rooms = yield self.store.get_rooms_for_user(user_id) - joined_room_ids = set(r.room_id for r in rooms) - device_lists = yield self.device_handler.get_device_list_changes( - user_id, joined_room_ids, since_token.device_list_key - ) - else: - device_lists = [] + device_lists = yield self._generate_sync_entry_for_device_list( + sync_result_builder + ) defer.returnValue(SyncResult( presence=sync_result_builder.presence, @@ -567,6 +560,28 @@ class SyncHandler(object): next_batch=sync_result_builder.now_token, )) + @defer.inlineCallbacks + def _generate_sync_entry_for_device_list(self, sync_result_builder): + user_id = sync_result_builder.sync_config.user.to_string() + since_token = sync_result_builder.since_token + + if since_token and since_token.device_list_key: + rooms = yield self.store.get_rooms_for_user(user_id) + room_ids = set(r.room_id for r in rooms) + + user_ids_changed = set() + changed = yield self.store.get_user_whose_devices_changed( + since_token.device_list_key + ) + for other_user_id in changed: + other_rooms = yield self.store.get_rooms_for_user(other_user_id) + if room_ids.intersection(e.room_id for e in other_rooms): + user_ids_changed.add(other_user_id) + + defer.returnValue(user_ids_changed) + else: + defer.returnValue([]) + @defer.inlineCallbacks def _generate_sync_entry_for_to_device(self, sync_result_builder): """Generates the portion of the sync response. Populates diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py index 4616e9b34a..36548c5eda 100644 --- a/synapse/replication/resource.py +++ b/synapse/replication/resource.py @@ -46,6 +46,7 @@ STREAM_NAMES = ( ("to_device",), ("public_rooms",), ("federation",), + ("device_lists",), ) @@ -140,6 +141,7 @@ class ReplicationResource(Resource): caches_token = self.store.get_cache_stream_token() public_rooms_token = self.store.get_current_public_room_stream_id() federation_token = self.federation_sender.get_current_token() + device_list_token = self.store.get_device_stream_token() defer.returnValue(_ReplicationToken( room_stream_token, @@ -155,6 +157,7 @@ class ReplicationResource(Resource): int(stream_token.to_device_key), int(public_rooms_token), int(federation_token), + int(device_list_token), )) @request_handler() @@ -214,6 +217,7 @@ class ReplicationResource(Resource): yield self.caches(writer, current_token, limit, request_streams) yield self.to_device(writer, current_token, limit, request_streams) yield self.public_rooms(writer, current_token, limit, request_streams) + yield self.device_lists(writer, current_token, limit, request_streams) self.federation(writer, current_token, limit, request_streams, federation_ack) self.streams(writer, current_token, request_streams) @@ -495,6 +499,20 @@ class ReplicationResource(Resource): "position", "type", "content", ), position=upto_token) + @defer.inlineCallbacks + def device_lists(self, writer, current_token, limit, request_streams): + current_position = current_token.device_lists + + device_lists = request_streams.get("device_lists") + + if device_lists is not None and device_lists != current_position: + changes = yield self.store.get_users_and_hosts_device_list_changes( + device_lists, + ) + writer.write_header_and_rows("device_lists", changes, ( + "position", "user_id", "destination", + ), position=current_position) + class _Writer(object): """Writes the streams as a JSON object as the response to the request""" @@ -527,7 +545,7 @@ class _Writer(object): class _ReplicationToken(collections.namedtuple("_ReplicationToken", ( "events", "presence", "typing", "receipts", "account_data", "backfill", "push_rules", "pushers", "state", "caches", "to_device", "public_rooms", - "federation", + "federation", "device_lists", ))): __slots__ = [] diff --git a/synapse/replication/slave/storage/devices.py b/synapse/replication/slave/storage/devices.py new file mode 100644 index 0000000000..ca46aa17b6 --- /dev/null +++ b/synapse/replication/slave/storage/devices.py @@ -0,0 +1,72 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._base import BaseSlavedStore +from ._slaved_id_tracker import SlavedIdTracker +from synapse.storage import DataStore +from synapse.util.caches.stream_change_cache import StreamChangeCache + + +class SlavedDeviceStore(BaseSlavedStore): + def __init__(self, db_conn, hs): + super(SlavedDeviceStore, self).__init__(db_conn, hs) + + self.hs = hs + + self._device_list_id_gen = SlavedIdTracker( + db_conn, "device_lists_stream", "stream_id", + ) + device_list_max = self._device_list_id_gen.get_current_token() + self._device_list_stream_cache = StreamChangeCache( + "DeviceListStreamChangeCache", device_list_max, + ) + self._device_list_federation_stream_cache = StreamChangeCache( + "DeviceListFederationStreamChangeCache", device_list_max, + ) + + get_device_stream_token = DataStore.get_device_stream_token.__func__ + get_user_whose_devices_changed = DataStore.get_user_whose_devices_changed.__func__ + get_devices_by_remote = DataStore.get_devices_by_remote.__func__ + _get_devices_by_remote_txn = DataStore._get_devices_by_remote_txn.__func__ + _get_e2e_device_keys_txn = DataStore._get_e2e_device_keys_txn.__func__ + mark_as_sent_devices_by_remote = DataStore.mark_as_sent_devices_by_remote.__func__ + _mark_as_sent_devices_by_remote_txn = ( + DataStore._mark_as_sent_devices_by_remote_txn.__func__ + ) + + def stream_positions(self): + result = super(SlavedDeviceStore, self).stream_positions() + result["device_lists"] = self._device_list_id_gen.get_current_token() + return result + + def process_replication(self, result): + stream = result.get("device_lists") + if stream: + self._device_list_id_gen.advance(int(stream["position"])) + for row in stream["rows"]: + stream_id = row[0] + user_id = row[1] + destination = row[2] + + self._device_list_stream_cache.entity_has_changed( + user_id, stream_id + ) + + if destination: + self._device_list_federation_stream_cache.entity_has_changed( + destination, stream_id + ) + + return super(SlavedDeviceStore, self).process_replication(result) diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index 00317b0c1f..2b2cebacfa 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -458,6 +458,21 @@ class DeviceStore(SQLBaseStore): rows = yield self._execute("get_user_whose_devices_changed", None, sql, from_key) defer.returnValue(set(row["user_id"] for row in rows)) + def get_users_and_hosts_device_list_changes(self, from_key): + """Return a list of `(stream_id, user_id, destination)` which is the + combined list of changes to devices, and which destinations need to be + poked. `destination` may be None if no destinations need to be poked. + """ + sql = """ + SELECT stream_id, user_id, destination FROM device_lists_stream + LEFT JOIN device_lists_outbound_pokes USING (stream_id, user_id, device_id) + WHERE stream_id > ? + """ + return self._execute( + "get_users_and_hosts_device_list", None, + sql, from_key, + ) + @defer.inlineCallbacks def add_device_change_to_streams(self, user_id, device_ids, hosts): """Persist that a user's devices have been updated, and which hosts -- cgit 1.4.1 From 828db669ecf4a631a4cab0c78d2ea8df7c531716 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 30 Jan 2017 16:37:22 +0000 Subject: Use get_users_in_room and declare it iterable --- synapse/handlers/device.py | 2 +- synapse/storage/roommember.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) (limited to 'synapse/handlers/device.py') diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 6fefb85890..7245d14fab 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -203,7 +203,7 @@ class DeviceHandler(BaseHandler): hosts = set() if self.hs.is_mine_id(user_id): for room_id in room_ids: - users = yield self.state.get_current_user_in_room(room_id) + users = yield self.store.get_users_in_room(room_id) hosts.update(get_domain_from_id(u) for u in users) hosts.discard(self.server_name) diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 768e0a4451..6cf1a538ae 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -131,7 +131,7 @@ class RoomMemberStore(SQLBaseStore): with self._stream_id_gen.get_next() as stream_ordering: yield self.runInteraction("locally_reject_invite", f, stream_ordering) - @cached(max_entries=5000) + @cached(max_entries=1000000, iterable=True) def get_users_in_room(self, room_id): def f(txn): -- cgit 1.4.1 From 97479d0c5442f3a644b356c5dbc920bf2ca2c925 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 1 Feb 2017 10:30:03 +0000 Subject: Implement /keys/changes --- synapse/handlers/device.py | 16 +++++++++++++++ synapse/rest/client/v2_alpha/keys.py | 38 ++++++++++++++++++++++++++++++++++++ 2 files changed, 54 insertions(+) (limited to 'synapse/handlers/device.py') diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 7245d14fab..4a28d95967 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -220,6 +220,22 @@ class DeviceHandler(BaseHandler): for host in hosts: self.federation_sender.send_device_messages(host) + @defer.inlineCallbacks + def get_user_ids_changed(self, user_id, from_device_key): + rooms = yield self.store.get_rooms_for_user(user_id) + room_ids = set(r.room_id for r in rooms) + + user_ids_changed = set() + changed = yield self.store.get_user_whose_devices_changed( + from_device_key + ) + for other_user_id in changed: + other_rooms = yield self.store.get_rooms_for_user(other_user_id) + if room_ids.intersection(e.room_id for e in other_rooms): + user_ids_changed.add(other_user_id) + + defer.returnValue(user_ids_changed) + @defer.inlineCallbacks def _incoming_device_list_update(self, origin, edu_content): user_id = edu_content["user_id"] diff --git a/synapse/rest/client/v2_alpha/keys.py b/synapse/rest/client/v2_alpha/keys.py index 46789775b9..5080101f18 100644 --- a/synapse/rest/client/v2_alpha/keys.py +++ b/synapse/rest/client/v2_alpha/keys.py @@ -21,6 +21,8 @@ from synapse.api.errors import SynapseError from synapse.http.servlet import ( RestServlet, parse_json_object_from_request, parse_integer ) +from synapse.http.servlet import parse_string +from synapse.types import StreamToken from ._base import client_v2_patterns logger = logging.getLogger(__name__) @@ -149,6 +151,41 @@ class KeyQueryServlet(RestServlet): defer.returnValue((200, result)) +class KeyChangesServlet(RestServlet): + PATTERNS = client_v2_patterns( + "/keys/changes$", + releases=() + ) + + def __init__(self, hs): + """ + Args: + hs (synapse.server.HomeServer): + """ + super(KeyChangesServlet, self).__init__() + self.auth = hs.get_auth() + self.device_handler = hs.get_device_handler() + + @defer.inlineCallbacks + def on_GET(self, request): + requester = yield self.auth.get_user_by_req(request, allow_guest=True) + + from_token_string = parse_string(request, "from") + parse_string(request, "to") # We want to enforce they do pass us one. + + from_token = StreamToken.from_string(from_token_string) + + user_id = requester.user.to_string() + + changed = yield self.device_handler.get_user_ids_changed( + user_id, from_token.device_list_key, + ) + + defer.returnValue((200, { + "changed": changed + })) + + class OneTimeKeyServlet(RestServlet): """ POST /keys/claim HTTP/1.1 @@ -192,4 +229,5 @@ class OneTimeKeyServlet(RestServlet): def register_servlets(hs, http_server): KeyUploadServlet(hs).register(http_server) KeyQueryServlet(hs).register(http_server) + KeyChangesServlet(hs).register(http_server) OneTimeKeyServlet(hs).register(http_server) -- cgit 1.4.1 From 7e919bdbd09bf200d2e27767450eacbfbf2f4c3f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 1 Feb 2017 17:33:16 +0000 Subject: Include newly joined users in /keys/changes API --- synapse/handlers/device.py | 39 ++++++++++++++++++++++++++++++++---- synapse/rest/client/v2_alpha/keys.py | 2 +- synapse/storage/stream.py | 7 +++++++ 3 files changed, 43 insertions(+), 5 deletions(-) (limited to 'synapse/handlers/device.py') diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 4a28d95967..4589dab409 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -14,6 +14,7 @@ # limitations under the License. from synapse.api import errors +from synapse.api.constants import EventTypes from synapse.util import stringutils from synapse.util.async import Linearizer from synapse.types import get_domain_from_id @@ -221,15 +222,45 @@ class DeviceHandler(BaseHandler): self.federation_sender.send_device_messages(host) @defer.inlineCallbacks - def get_user_ids_changed(self, user_id, from_device_key): + def get_user_ids_changed(self, user_id, from_token): rooms = yield self.store.get_rooms_for_user(user_id) room_ids = set(r.room_id for r in rooms) - user_ids_changed = set() + # First we check if any devices have changed changed = yield self.store.get_user_whose_devices_changed( - from_device_key + from_token.device_list_key ) - for other_user_id in changed: + + # Then work out if any users have since joined + rooms_changed = self.store.get_rooms_that_changed(room_ids, from_token.room_key) + + possibly_changed = set(changed) + for room_id in rooms_changed: + # Fetch (an approximation) of the current state at the time. + event_rows, token = yield self.store.get_recent_event_ids_for_room( + room_id, end_token=from_token.room_key, limit=1, + ) + + if event_rows: + last_event_id = event_rows[-1]["event_id"] + prev_state_ids = yield self.store.get_state_ids_for_event(last_event_id) + else: + prev_state_ids = {} + + current_state_ids = yield self.state.get_current_state_ids(room_id) + + # If there has been any change in membership, include them in the + # possibly changed list. We'll check if they are joined below, + # and we're not toooo worried about spuriously adding users. + for key, event_id in current_state_ids.iteritems(): + etype, state_key = key + if etype == EventTypes.Member: + prev_event_id = prev_state_ids.get(key, None) + if not prev_event_id or prev_event_id != event_id: + possibly_changed.add(state_key) + + user_ids_changed = set() + for other_user_id in possibly_changed: other_rooms = yield self.store.get_rooms_for_user(other_user_id) if room_ids.intersection(e.room_id for e in other_rooms): user_ids_changed.add(other_user_id) diff --git a/synapse/rest/client/v2_alpha/keys.py b/synapse/rest/client/v2_alpha/keys.py index 4590efa6bf..f99b53530a 100644 --- a/synapse/rest/client/v2_alpha/keys.py +++ b/synapse/rest/client/v2_alpha/keys.py @@ -189,7 +189,7 @@ class KeyChangesServlet(RestServlet): user_id = requester.user.to_string() changed = yield self.device_handler.get_user_ids_changed( - user_id, from_token.device_list_key, + user_id, from_token, ) defer.returnValue((200, { diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 2dc24951c4..cdc1838895 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -244,6 +244,13 @@ class StreamStore(SQLBaseStore): defer.returnValue(results) + def get_rooms_that_changed(self, room_ids, from_key): + from_key = RoomStreamToken.parse_stream_token(from_key).stream + return set( + room_id for room_id in room_ids + if self._events_stream_cache.has_entity_changed(room_id, from_key) + ) + @defer.inlineCallbacks def get_room_events_stream_for_room(self, room_id, from_key, to_key, limit=0, order='DESC'): -- cgit 1.4.1 From fbfe44bb4de0e490b5c34ebdb1e8c0c09dd766b1 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 1 Feb 2017 17:52:57 +0000 Subject: Doc args --- synapse/handlers/device.py | 7 +++++++ synapse/storage/stream.py | 4 ++++ 2 files changed, 11 insertions(+) (limited to 'synapse/handlers/device.py') diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 4589dab409..815410969c 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -223,6 +223,13 @@ class DeviceHandler(BaseHandler): @defer.inlineCallbacks def get_user_ids_changed(self, user_id, from_token): + """Get list of users that have had the devices updated, or have newly + joined a room, that `user_id` may be interested in. + + Args: + user_id (str) + from_token (StreamToken) + """ rooms = yield self.store.get_rooms_for_user(user_id) room_ids = set(r.room_id for r in rooms) diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 3765d0095c..200d124632 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -247,6 +247,10 @@ class StreamStore(SQLBaseStore): def get_rooms_that_changed(self, room_ids, from_key): """Given a list of rooms and a token, return rooms where there may have been changes. + + Args: + room_ids (list) + from_key (str): The room_key portion of a StreamToken """ from_key = RoomStreamToken.parse_stream_token(from_key).stream return set( -- cgit 1.4.1 From 46ecd9fd6d4ea2786cde0f2576aa28421be40047 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 2 Feb 2017 14:27:19 +0000 Subject: Use stream_ordering_to_exterm for /keys/changes --- synapse/handlers/device.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) (limited to 'synapse/handlers/device.py') diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 815410969c..6c1b945ff3 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -17,7 +17,7 @@ from synapse.api import errors from synapse.api.constants import EventTypes from synapse.util import stringutils from synapse.util.async import Linearizer -from synapse.types import get_domain_from_id +from synapse.types import get_domain_from_id, RoomStreamToken from twisted.internet import defer from ._base import BaseHandler @@ -243,15 +243,15 @@ class DeviceHandler(BaseHandler): possibly_changed = set(changed) for room_id in rooms_changed: - # Fetch (an approximation) of the current state at the time. - event_rows, token = yield self.store.get_recent_event_ids_for_room( - room_id, end_token=from_token.room_key, limit=1, - ) + # Fetch the current state at the time. + stream_ordering = RoomStreamToken.parse_stream_token(from_token.room_key) - if event_rows: - last_event_id = event_rows[-1]["event_id"] - prev_state_ids = yield self.store.get_state_ids_for_event(last_event_id) - else: + try: + event_ids = yield self.store.get_forward_extremeties_for_room( + room_id, stream_ordering=stream_ordering + ) + prev_state_ids = yield self.store.get_state_ids_for_events(event_ids) + except: prev_state_ids = {} current_state_ids = yield self.state.get_current_state_ids(room_id) -- cgit 1.4.1 From 1232ae41cf1ee4e66025b0db3460e339ef3b9971 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 2 Feb 2017 15:25:00 +0000 Subject: Use new get_users_who_share_room_with_user --- synapse/handlers/device.py | 24 +++++++++++++----------- 1 file changed, 13 insertions(+), 11 deletions(-) (limited to 'synapse/handlers/device.py') diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 6c1b945ff3..158206aef6 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -198,20 +198,22 @@ class DeviceHandler(BaseHandler): """Notify that a user's device(s) has changed. Pokes the notifier, and remote servers if the user is local. """ - rooms = yield self.store.get_rooms_for_user(user_id) - room_ids = [r.room_id for r in rooms] + users_who_share_room = yield self.store.get_users_who_share_room_with_user( + user_id + ) hosts = set() if self.hs.is_mine_id(user_id): - for room_id in room_ids: - users = yield self.store.get_users_in_room(room_id) - hosts.update(get_domain_from_id(u) for u in users) + hosts.update(get_domain_from_id(u) for u in users_who_share_room) hosts.discard(self.server_name) position = yield self.store.add_device_change_to_streams( user_id, device_ids, list(hosts) ) + rooms = yield self.store.get_rooms_for_user(user_id) + room_ids = [r.room_id for r in rooms] + yield self.notifier.on_new_event( "device_list_key", position, rooms=room_ids, ) @@ -266,13 +268,13 @@ class DeviceHandler(BaseHandler): if not prev_event_id or prev_event_id != event_id: possibly_changed.add(state_key) - user_ids_changed = set() - for other_user_id in possibly_changed: - other_rooms = yield self.store.get_rooms_for_user(other_user_id) - if room_ids.intersection(e.room_id for e in other_rooms): - user_ids_changed.add(other_user_id) + users_who_share_room = yield self.store.get_users_who_share_room_with_user( + user_id + ) - defer.returnValue(user_ids_changed) + # Take the intersection of the users whose devices may have changed + # and those that actually still share a room with the user + defer.returnValue(users_who_share_room & possibly_changed) @defer.inlineCallbacks def _incoming_device_list_update(self, origin, edu_content): -- cgit 1.4.1 From a597994fb6acff8d2f72855445c322d7fd685f3f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 2 Feb 2017 18:36:17 +0000 Subject: Measure new device list stuff --- synapse/handlers/device.py | 4 ++++ synapse/handlers/sync.py | 3 ++- 2 files changed, 6 insertions(+), 1 deletion(-) (limited to 'synapse/handlers/device.py') diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 158206aef6..8cb47ac417 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -17,6 +17,7 @@ from synapse.api import errors from synapse.api.constants import EventTypes from synapse.util import stringutils from synapse.util.async import Linearizer +from synapse.util.metrics import measure_func from synapse.types import get_domain_from_id, RoomStreamToken from twisted.internet import defer from ._base import BaseHandler @@ -193,6 +194,7 @@ class DeviceHandler(BaseHandler): else: raise + @measure_func("notify_device_update") @defer.inlineCallbacks def notify_device_update(self, user_id, device_ids): """Notify that a user's device(s) has changed. Pokes the notifier, and @@ -223,6 +225,7 @@ class DeviceHandler(BaseHandler): for host in hosts: self.federation_sender.send_device_messages(host) + @measure_func("device.get_user_ids_changed") @defer.inlineCallbacks def get_user_ids_changed(self, user_id, from_token): """Get list of users that have had the devices updated, or have newly @@ -276,6 +279,7 @@ class DeviceHandler(BaseHandler): # and those that actually still share a room with the user defer.returnValue(users_who_share_room & possibly_changed) + @measure_func("_incoming_device_list_update") @defer.inlineCallbacks def _incoming_device_list_update(self, origin, edu_content): user_id = edu_content["user_id"] diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 9a44de3f33..d7dcd1ce5b 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -16,7 +16,7 @@ from synapse.api.constants import Membership, EventTypes from synapse.util.async import concurrently_execute from synapse.util.logcontext import LoggingContext -from synapse.util.metrics import Measure +from synapse.util.metrics import Measure, measure_func from synapse.util.caches.response_cache import ResponseCache from synapse.push.clientformat import format_push_rules_for_user from synapse.visibility import filter_events_for_client @@ -561,6 +561,7 @@ class SyncHandler(object): next_batch=sync_result_builder.now_token, )) + @measure_func("_generate_sync_entry_for_device_list") @defer.inlineCallbacks def _generate_sync_entry_for_device_list(self, sync_result_builder): user_id = sync_result_builder.sync_config.user.to_string() -- cgit 1.4.1