From 8d3542a64e2689a00ed87f9bd58fe3e1d3b10ed8 Mon Sep 17 00:00:00 2001 From: Hubert Chathi Date: Wed, 22 May 2019 16:42:00 -0400 Subject: implement federation parts of cross-signing --- synapse/handlers/device.py | 13 ++++- synapse/handlers/e2e_keys.py | 116 ++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 126 insertions(+), 3 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 5f23ee4488..cd6eb52316 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -458,7 +458,18 @@ class DeviceHandler(DeviceWorkerHandler): @defer.inlineCallbacks def on_federation_query_user_devices(self, user_id): stream_id, devices = yield self.store.get_devices_with_keys_by_user(user_id) - return {"user_id": user_id, "stream_id": stream_id, "devices": devices} + master_key = yield self.store.get_e2e_cross_signing_key(user_id, "master") + self_signing_key = yield self.store.get_e2e_cross_signing_key( + user_id, "self_signing" + ) + + return { + "user_id": user_id, + "stream_id": stream_id, + "devices": devices, + "master_key": master_key, + "self_signing_key": self_signing_key + } @defer.inlineCallbacks def user_left_room(self, user, room_id): diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index 5ea54f60be..849ee04f93 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -36,6 +36,8 @@ from synapse.types import ( get_verify_key_from_cross_signing_key, ) from synapse.util import unwrapFirstError +from synapse.util.async_helpers import Linearizer +from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.retryutils import NotRetryingDestination logger = logging.getLogger(__name__) @@ -49,10 +51,17 @@ class E2eKeysHandler(object): self.is_mine = hs.is_mine self.clock = hs.get_clock() + self._edu_updater = SigningKeyEduUpdater(hs, self) + + federation_registry = hs.get_federation_registry() + + federation_registry.register_edu_handler( + "m.signing_key_update", self._edu_updater.incoming_signing_key_update, + ) # doesn't really work as part of the generic query API, because the # query request requires an object POST, but we abuse the # "query handler" interface. - hs.get_federation_registry().register_query_handler( + federation_registry.register_query_handler( "client_keys", self.on_federation_query_client_keys ) @@ -343,7 +352,15 @@ class E2eKeysHandler(object): """ device_keys_query = query_body.get("device_keys", {}) res = yield self.query_local_devices(device_keys_query) - return {"device_keys": res} + ret = {"device_keys": res} + + # add in the cross-signing keys + cross_signing_keys = yield self.query_cross_signing_keys(device_keys_query) + + for key, value in iteritems(cross_signing_keys): + ret[key + "_keys"] = value + + return ret @trace @defer.inlineCallbacks @@ -1047,3 +1064,98 @@ class SignatureListItem: target_user_id = attr.ib() target_device_id = attr.ib() signature = attr.ib() + + +class SigningKeyEduUpdater(object): + "Handles incoming signing key updates from federation and updates the DB" + + def __init__(self, hs, e2e_keys_handler): + self.store = hs.get_datastore() + self.federation = hs.get_federation_client() + self.clock = hs.get_clock() + self.e2e_keys_handler = e2e_keys_handler + + self._remote_edu_linearizer = Linearizer(name="remote_signing_key") + + # user_id -> list of updates waiting to be handled. + self._pending_updates = {} + + # Recently seen stream ids. We don't bother keeping these in the DB, + # but they're useful to have them about to reduce the number of spurious + # resyncs. + self._seen_updates = ExpiringCache( + cache_name="signing_key_update_edu", + clock=self.clock, + max_len=10000, + expiry_ms=30 * 60 * 1000, + iterable=True, + ) + + @defer.inlineCallbacks + def incoming_signing_key_update(self, origin, edu_content): + """Called on incoming signing key update from federation. Responsible for + parsing the EDU and adding to pending updates list. + + Args: + origin (string): the server that sent the EDU + edu_content (dict): the contents of the EDU + """ + + user_id = edu_content.pop("user_id") + master_key = edu_content.pop("master_key", None) + self_signing_key = edu_content.pop("self_signing_key", None) + + if get_domain_from_id(user_id) != origin: + # TODO: Raise? + logger.warning("Got signing key update edu for %r from %r", user_id, origin) + return + + room_ids = yield self.store.get_rooms_for_user(user_id) + if not room_ids: + # We don't share any rooms with this user. Ignore update, as we + # probably won't get any further updates. + return + + self._pending_updates.setdefault(user_id, []).append( + (master_key, self_signing_key, edu_content) + ) + + yield self._handle_signing_key_updates(user_id) + + @defer.inlineCallbacks + def _handle_signing_key_updates(self, user_id): + """Actually handle pending updates. + + Args: + user_id (string): the user whose updates we are processing + """ + + device_handler = self.e2e_keys_handler.device_handler + + with (yield self._remote_edu_linearizer.queue(user_id)): + pending_updates = self._pending_updates.pop(user_id, []) + if not pending_updates: + # This can happen since we batch updates + return + + device_ids = [] + + logger.info("pending updates: %r", pending_updates) + + for master_key, self_signing_key, edu_content in pending_updates: + if master_key: + yield self.store.set_e2e_cross_signing_key( + user_id, "master", master_key + ) + device_id = \ + get_verify_key_from_cross_signing_key(master_key)[1].version + device_ids.append(device_id) + if self_signing_key: + yield self.store.set_e2e_cross_signing_key( + user_id, "self_signing", self_signing_key + ) + device_id = \ + get_verify_key_from_cross_signing_key(self_signing_key)[1].version + device_ids.append(device_id) + + yield device_handler.notify_device_update(user_id, device_ids) -- cgit 1.5.1 From cfdb84422dba2ca28eacb65aca960aecc5598658 Mon Sep 17 00:00:00 2001 From: Hubert Chathi Date: Mon, 22 Jul 2019 13:04:55 -0400 Subject: make black happy --- synapse/handlers/e2e_keys.py | 12 ++++++---- synapse/storage/devices.py | 54 ++++++++++++++++++++++++++------------------ 2 files changed, 39 insertions(+), 27 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index 849ee04f93..f3cfba0bd3 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -56,7 +56,7 @@ class E2eKeysHandler(object): federation_registry = hs.get_federation_registry() federation_registry.register_edu_handler( - "m.signing_key_update", self._edu_updater.incoming_signing_key_update, + "m.signing_key_update", self._edu_updater.incoming_signing_key_update ) # doesn't really work as part of the generic query API, because the # query request requires an object POST, but we abuse the @@ -1147,15 +1147,17 @@ class SigningKeyEduUpdater(object): yield self.store.set_e2e_cross_signing_key( user_id, "master", master_key ) - device_id = \ - get_verify_key_from_cross_signing_key(master_key)[1].version + device_id = get_verify_key_from_cross_signing_key(master_key)[ + 1 + ].version device_ids.append(device_id) if self_signing_key: yield self.store.set_e2e_cross_signing_key( user_id, "self_signing", self_signing_key ) - device_id = \ - get_verify_key_from_cross_signing_key(self_signing_key)[1].version + device_id = get_verify_key_from_cross_signing_key(self_signing_key)[ + 1 + ].version device_ids.append(device_id) yield device_handler.notify_device_update(user_id, device_ids) diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index f46978c9c3..60bf6d68ec 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -37,6 +37,7 @@ from synapse.storage._base import ( make_in_list_sql_clause, ) from synapse.storage.background_updates import BackgroundUpdateStore +from synapse.types import get_verify_key_from_cross_signing_key from synapse.util import batch_iter from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList @@ -142,17 +143,19 @@ class DeviceWorkerStore(SQLBaseStore): ) master_key_by_user[user] = { "key_info": cross_signing_key, - "pubkey": verify_key.version + "pubkey": verify_key.version, } - cross_signing_key = yield self.get_e2e_cross_signing_key(user, "self_signing") + cross_signing_key = yield self.get_e2e_cross_signing_key( + user, "self_signing" + ) if cross_signing_key: key_id, verify_key = get_verify_key_from_cross_signing_key( cross_signing_key ) self_signing_key_by_user[user] = { "key_info": cross_signing_key, - "pubkey": verify_key.version + "pubkey": verify_key.version, } # if we have exceeded the limit, we need to exclude any results with the @@ -185,10 +188,13 @@ class DeviceWorkerStore(SQLBaseStore): break # skip over cross-signing keys - if (update[0] in master_key_by_user - and update[1] == master_key_by_user[update[0]]["pubkey"]) \ - or (update[0] in master_key_by_user - and update[1] == self_signing_key_by_user[update[0]]["pubkey"]): + if ( + update[0] in master_key_by_user + and update[1] == master_key_by_user[update[0]]["pubkey"] + ) or ( + update[0] in master_key_by_user + and update[1] == self_signing_key_by_user[update[0]]["pubkey"] + ): continue key = (update[0], update[1]) @@ -209,16 +215,16 @@ class DeviceWorkerStore(SQLBaseStore): # update list with the master/self-signing key by user maps cross_signing_keys_by_user = {} for user_id, device_id, stream in updates: - if device_id == master_key_by_user.get(user_id, {}) \ - .get("pubkey", None): + if device_id == master_key_by_user.get(user_id, {}).get("pubkey", None): result = cross_signing_keys_by_user.setdefault(user_id, {}) - result["master_key"] = \ - master_key_by_user[user_id]["key_info"] - elif device_id == self_signing_key_by_user.get(user_id, {}) \ - .get("pubkey", None): + result["master_key"] = master_key_by_user[user_id]["key_info"] + elif device_id == self_signing_key_by_user.get(user_id, {}).get( + "pubkey", None + ): result = cross_signing_keys_by_user.setdefault(user_id, {}) - result["self_signing_key"] = \ - self_signing_key_by_user[user_id]["key_info"] + result["self_signing_key"] = self_signing_key_by_user[user_id][ + "key_info" + ] cross_signing_results = [] @@ -282,13 +288,17 @@ class DeviceWorkerStore(SQLBaseStore): List[Dict]: List of objects representing an device update EDU """ - devices = yield self.runInteraction( - "_get_e2e_device_keys_txn", - self._get_e2e_device_keys_txn, - query_map.keys(), - include_all_devices=True, - include_deleted_devices=True, - ) if query_map else {} + devices = ( + yield self.runInteraction( + "_get_e2e_device_keys_txn", + self._get_e2e_device_keys_txn, + query_map.keys(), + include_all_devices=True, + include_deleted_devices=True, + ) + if query_map + else {} + ) results = [] for user_id, user_devices in iteritems(devices): -- cgit 1.5.1 From 41ad35b5235ad9ed8a1b8889287ae840ee3373bd Mon Sep 17 00:00:00 2001 From: Hubert Chathi Date: Fri, 2 Aug 2019 18:03:23 -0400 Subject: add missing param --- synapse/handlers/e2e_keys.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index f3cfba0bd3..6b65f47d18 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -355,7 +355,7 @@ class E2eKeysHandler(object): ret = {"device_keys": res} # add in the cross-signing keys - cross_signing_keys = yield self.query_cross_signing_keys(device_keys_query) + cross_signing_keys = yield self.query_cross_signing_keys(device_keys_query, None) for key, value in iteritems(cross_signing_keys): ret[key + "_keys"] = value -- cgit 1.5.1 From 1fabf82d50f3db25ce0e4a93f349d90eb2d30a16 Mon Sep 17 00:00:00 2001 From: Hubert Chathi Date: Tue, 22 Oct 2019 21:44:58 -0400 Subject: update to work with newer code, and fix formatting --- synapse/handlers/device.py | 2 +- synapse/handlers/e2e_keys.py | 9 +++++---- synapse/storage/devices.py | 2 +- 3 files changed, 7 insertions(+), 6 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index cd6eb52316..fd8d14b680 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -468,7 +468,7 @@ class DeviceHandler(DeviceWorkerHandler): "stream_id": stream_id, "devices": devices, "master_key": master_key, - "self_signing_key": self_signing_key + "self_signing_key": self_signing_key, } @defer.inlineCallbacks diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index 6b65f47d18..73572f4614 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -260,7 +260,7 @@ class E2eKeysHandler(object): Returns: defer.Deferred[dict[str, dict[str, dict]]]: map from - (master|self_signing|user_signing) -> user_id -> key + (master_keys|self_signing_keys|user_signing_keys) -> user_id -> key """ master_keys = {} self_signing_keys = {} @@ -355,10 +355,11 @@ class E2eKeysHandler(object): ret = {"device_keys": res} # add in the cross-signing keys - cross_signing_keys = yield self.query_cross_signing_keys(device_keys_query, None) + cross_signing_keys = yield self.get_cross_signing_keys_from_cache( + device_keys_query, None + ) - for key, value in iteritems(cross_signing_keys): - ret[key + "_keys"] = value + ret.update(cross_signing_keys) return ret diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index 60bf6d68ec..1aaef1fbb0 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -214,7 +214,7 @@ class DeviceWorkerStore(SQLBaseStore): # figure out which cross-signing keys were changed by intersecting the # update list with the master/self-signing key by user maps cross_signing_keys_by_user = {} - for user_id, device_id, stream in updates: + for user_id, device_id, stream, _opentracing_context in updates: if device_id == master_key_by_user.get(user_id, {}).get("pubkey", None): result = cross_signing_keys_by_user.setdefault(user_id, {}) result["master_key"] = master_key_by_user[user_id]["key_info"] -- cgit 1.5.1 From 404e8c85321b4fe9b9b74e07841367c4cf201551 Mon Sep 17 00:00:00 2001 From: Hubert Chathi Date: Tue, 22 Oct 2019 22:33:23 -0400 Subject: vendor-prefix the EDU name until MSC1756 is merged into the spec --- synapse/handlers/e2e_keys.py | 3 ++- synapse/storage/devices.py | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index 73572f4614..d25af42f5f 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -55,8 +55,9 @@ class E2eKeysHandler(object): federation_registry = hs.get_federation_registry() + # FIXME: switch to m.signing_key_update when MSC1756 is merged into the spec federation_registry.register_edu_handler( - "m.signing_key_update", self._edu_updater.incoming_signing_key_update + "org.matrix.signing_key_update", self._edu_updater.incoming_signing_key_update ) # doesn't really work as part of the generic query API, because the # query request requires an object POST, but we abuse the diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index 1aaef1fbb0..6ac165068e 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -231,7 +231,8 @@ class DeviceWorkerStore(SQLBaseStore): # add the updated cross-signing keys to the results list for user_id, result in iteritems(cross_signing_keys_by_user): result["user_id"] = user_id - cross_signing_results.append(("m.signing_key_update", result)) + # FIXME: switch to m.signing_key_update when MSC1756 is merged into the spec + cross_signing_results.append(("org.matrix.signing_key_update", result)) # That should only happen if a client is spamming the server with new # devices, in which case E2E isn't going to work well anyway. We'll just -- cgit 1.5.1 From 480eac30eb543ce6947009fa90b8409f153eb3a4 Mon Sep 17 00:00:00 2001 From: Hubert Chathi Date: Tue, 22 Oct 2019 22:37:16 -0400 Subject: black --- synapse/handlers/e2e_keys.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index d25af42f5f..f6aa9a940b 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -57,7 +57,8 @@ class E2eKeysHandler(object): # FIXME: switch to m.signing_key_update when MSC1756 is merged into the spec federation_registry.register_edu_handler( - "org.matrix.signing_key_update", self._edu_updater.incoming_signing_key_update + "org.matrix.signing_key_update", + self._edu_updater.incoming_signing_key_update, ) # doesn't really work as part of the generic query API, because the # query request requires an object POST, but we abuse the -- cgit 1.5.1 From ff05c9b760ba5736a189b320c2e0d4592d0072a4 Mon Sep 17 00:00:00 2001 From: Hubert Chathi Date: Thu, 24 Oct 2019 21:46:11 -0400 Subject: don't error if federation query doesn't have cross-signing keys --- synapse/handlers/e2e_keys.py | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index f6aa9a940b..4ab75a351e 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -218,13 +218,15 @@ class E2eKeysHandler(object): if user_id in destination_query: results[user_id] = keys - for user_id, key in remote_result["master_keys"].items(): - if user_id in destination_query: - cross_signing_keys["master_keys"][user_id] = key - - for user_id, key in remote_result["self_signing_keys"].items(): - if user_id in destination_query: - cross_signing_keys["self_signing_keys"][user_id] = key + if "master_keys" in remote_result: + for user_id, key in remote_result["master_keys"].items(): + if user_id in destination_query: + cross_signing_keys["master_keys"][user_id] = key + + if "self_signing_keys" in remote_result: + for user_id, key in remote_result["self_signing_keys"].items(): + if user_id in destination_query: + cross_signing_keys["self_signing_keys"][user_id] = key except Exception as e: failure = _exception_to_failure(e) -- cgit 1.5.1 From d78b1e339dd813214d8a8316c38a3be31ad8f132 Mon Sep 17 00:00:00 2001 From: Hubert Chathi Date: Wed, 30 Oct 2019 10:01:53 -0400 Subject: apply changes as a result of PR review --- synapse/handlers/e2e_keys.py | 22 ++++---- synapse/storage/data_stores/main/devices.py | 79 +++++++++++++---------------- 2 files changed, 46 insertions(+), 55 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index 4ab75a351e..0f320b3764 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -1072,7 +1072,7 @@ class SignatureListItem: class SigningKeyEduUpdater(object): - "Handles incoming signing key updates from federation and updates the DB" + """Handles incoming signing key updates from federation and updates the DB""" def __init__(self, hs, e2e_keys_handler): self.store = hs.get_datastore() @@ -1111,7 +1111,6 @@ class SigningKeyEduUpdater(object): self_signing_key = edu_content.pop("self_signing_key", None) if get_domain_from_id(user_id) != origin: - # TODO: Raise? logger.warning("Got signing key update edu for %r from %r", user_id, origin) return @@ -1122,7 +1121,7 @@ class SigningKeyEduUpdater(object): return self._pending_updates.setdefault(user_id, []).append( - (master_key, self_signing_key, edu_content) + (master_key, self_signing_key) ) yield self._handle_signing_key_updates(user_id) @@ -1147,22 +1146,21 @@ class SigningKeyEduUpdater(object): logger.info("pending updates: %r", pending_updates) - for master_key, self_signing_key, edu_content in pending_updates: + for master_key, self_signing_key in pending_updates: if master_key: yield self.store.set_e2e_cross_signing_key( user_id, "master", master_key ) - device_id = get_verify_key_from_cross_signing_key(master_key)[ - 1 - ].version - device_ids.append(device_id) + _, verify_key = get_verify_key_from_cross_signing_key(master_key) + # verify_key is a VerifyKey from signedjson, which uses + # .version to denote the portion of the key ID after the + # algorithm and colon, which is the device ID + device_ids.append(verify_key.version) if self_signing_key: yield self.store.set_e2e_cross_signing_key( user_id, "self_signing", self_signing_key ) - device_id = get_verify_key_from_cross_signing_key(self_signing_key)[ - 1 - ].version - device_ids.append(device_id) + _, verify_key = get_verify_key_from_cross_signing_key(self_signing_key) + device_ids.append(verify_key.version) yield device_handler.notify_device_update(user_id, device_ids) diff --git a/synapse/storage/data_stores/main/devices.py b/synapse/storage/data_stores/main/devices.py index 6ac165068e..0b12bc58c4 100644 --- a/synapse/storage/data_stores/main/devices.py +++ b/synapse/storage/data_stores/main/devices.py @@ -92,8 +92,12 @@ class DeviceWorkerStore(SQLBaseStore): @trace @defer.inlineCallbacks def get_devices_by_remote(self, destination, from_stream_id, limit): - """Get stream of updates to send to remote servers + """Get a stream of device updates to send to the given remote server. + Args: + destination (str): The host the device updates are intended for + from_stream_id (int): The minimum stream_id to filter updates by, exclusive + limit (int): Maximum number of device updates to return Returns: Deferred[tuple[int, list[tuple[string,dict]]]]: current stream id (ie, the stream id of the last update included in the @@ -131,7 +135,8 @@ class DeviceWorkerStore(SQLBaseStore): if not updates: return now_stream_id, [] - # get the cross-signing keys of the users the list + # get the cross-signing keys of the users in the list, so that we can + # determine which of the device changes were cross-signing keys users = set(r[0] for r in updates) master_key_by_user = {} self_signing_key_by_user = {} @@ -141,9 +146,12 @@ class DeviceWorkerStore(SQLBaseStore): key_id, verify_key = get_verify_key_from_cross_signing_key( cross_signing_key ) + # verify_key is a VerifyKey from signedjson, which uses + # .version to denote the portion of the key ID after the + # algorithm and colon, which is the device ID master_key_by_user[user] = { "key_info": cross_signing_key, - "pubkey": verify_key.version, + "device_id": verify_key.version, } cross_signing_key = yield self.get_e2e_cross_signing_key( @@ -155,7 +163,7 @@ class DeviceWorkerStore(SQLBaseStore): ) self_signing_key_by_user[user] = { "key_info": cross_signing_key, - "pubkey": verify_key.version, + "device_id": verify_key.version, } # if we have exceeded the limit, we need to exclude any results with the @@ -182,69 +190,54 @@ class DeviceWorkerStore(SQLBaseStore): # context which created the Edu. query_map = {} - for update in updates: - if stream_id_cutoff is not None and update[2] >= stream_id_cutoff: + cross_signing_keys_by_user = {} + for user_id, device_id, update_stream_id, update_context in updates: + if stream_id_cutoff is not None and update_stream_id >= stream_id_cutoff: # Stop processing updates break - # skip over cross-signing keys if ( - update[0] in master_key_by_user - and update[1] == master_key_by_user[update[0]]["pubkey"] - ) or ( - update[0] in master_key_by_user - and update[1] == self_signing_key_by_user[update[0]]["pubkey"] + user_id in master_key_by_user + and device_id == master_key_by_user[user_id]["device_id"] ): - continue - - key = (update[0], update[1]) - - update_context = update[3] - update_stream_id = update[2] - - previous_update_stream_id, _ = query_map.get(key, (0, None)) - - if update_stream_id > previous_update_stream_id: - query_map[key] = (update_stream_id, update_context) - - # If we didn't find any updates with a stream_id lower than the cutoff, it - # means that there are more than limit updates all of which have the same - # steam_id. - - # figure out which cross-signing keys were changed by intersecting the - # update list with the master/self-signing key by user maps - cross_signing_keys_by_user = {} - for user_id, device_id, stream, _opentracing_context in updates: - if device_id == master_key_by_user.get(user_id, {}).get("pubkey", None): result = cross_signing_keys_by_user.setdefault(user_id, {}) result["master_key"] = master_key_by_user[user_id]["key_info"] - elif device_id == self_signing_key_by_user.get(user_id, {}).get( - "pubkey", None + elif ( + user_id in master_key_by_user + and device_id == self_signing_key_by_user[user_id]["device_id"] ): result = cross_signing_keys_by_user.setdefault(user_id, {}) result["self_signing_key"] = self_signing_key_by_user[user_id][ "key_info" ] + else: + key = (user_id, device_id) - cross_signing_results = [] + previous_update_stream_id, _ = query_map.get(key, (0, None)) - # add the updated cross-signing keys to the results list - for user_id, result in iteritems(cross_signing_keys_by_user): - result["user_id"] = user_id - # FIXME: switch to m.signing_key_update when MSC1756 is merged into the spec - cross_signing_results.append(("org.matrix.signing_key_update", result)) + if update_stream_id > previous_update_stream_id: + query_map[key] = (update_stream_id, update_context) + + # If we didn't find any updates with a stream_id lower than the cutoff, it + # means that there are more than limit updates all of which have the same + # steam_id. # That should only happen if a client is spamming the server with new # devices, in which case E2E isn't going to work well anyway. We'll just # skip that stream_id and return an empty list, and continue with the next # stream_id next time. - if not query_map and not cross_signing_results: + if not query_map and not cross_signing_keys_by_user: return stream_id_cutoff, [] results = yield self._get_device_update_edus_by_remote( destination, from_stream_id, query_map ) - results.extend(cross_signing_results) + + # add the updated cross-signing keys to the results list + for user_id, result in iteritems(cross_signing_keys_by_user): + result["user_id"] = user_id + # FIXME: switch to m.signing_key_update when MSC1756 is merged into the spec + results.append(("org.matrix.signing_key_update", result)) return now_stream_id, results -- cgit 1.5.1 From bc32f102cd1144923581771b0cc84ead4d99cefb Mon Sep 17 00:00:00 2001 From: Hubert Chathi Date: Wed, 30 Oct 2019 10:07:36 -0400 Subject: black --- synapse/handlers/e2e_keys.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index 0f320b3764..1ab471b3be 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -1160,7 +1160,9 @@ class SigningKeyEduUpdater(object): yield self.store.set_e2e_cross_signing_key( user_id, "self_signing", self_signing_key ) - _, verify_key = get_verify_key_from_cross_signing_key(self_signing_key) + _, verify_key = get_verify_key_from_cross_signing_key( + self_signing_key + ) device_ids.append(verify_key.version) yield device_handler.notify_device_update(user_id, device_ids) -- cgit 1.5.1 From 020add50997f697c7847ac84b86b457ba2f3e32d Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Fri, 1 Nov 2019 02:43:24 +1100 Subject: Update black to 19.10b0 (#6304) * update version of black and also fix the mypy config being overridden --- changelog.d/6304.misc | 1 + contrib/experiments/test_messaging.py | 4 +-- mypy.ini | 11 ++++--- synapse/federation/sender/per_destination_queue.py | 11 ++++--- synapse/handlers/account_data.py | 7 ++-- synapse/handlers/appservice.py | 5 ++- synapse/handlers/e2e_keys.py | 37 ++++++++++++++-------- synapse/handlers/federation.py | 9 +++--- synapse/handlers/initial_sync.py | 4 +-- synapse/handlers/message.py | 14 ++++---- synapse/handlers/pagination.py | 13 ++++---- synapse/handlers/register.py | 4 +-- synapse/handlers/room.py | 29 +++++++++-------- synapse/handlers/room_member.py | 35 ++++++++++---------- synapse/handlers/search.py | 12 +++---- synapse/handlers/stats.py | 5 ++- synapse/handlers/sync.py | 16 ++++++---- synapse/logging/_structured.py | 2 +- synapse/push/bulk_push_rule_evaluator.py | 7 ++-- synapse/push/emailpusher.py | 14 ++++---- synapse/push/httppusher.py | 14 ++++---- synapse/push/pusherpool.py | 4 +-- synapse/rest/client/v1/login.py | 13 ++++---- synapse/rest/client/v2_alpha/account.py | 4 +-- synapse/rest/client/v2_alpha/register.py | 4 +-- synapse/rest/key/v2/remote_key_resource.py | 2 +- synapse/server.pyi | 16 +++++----- synapse/storage/data_stores/main/__init__.py | 4 +-- .../storage/data_stores/main/event_push_actions.py | 2 +- synapse/storage/data_stores/main/events.py | 8 ++--- .../storage/data_stores/main/events_bg_updates.py | 2 +- synapse/storage/data_stores/main/group_server.py | 4 +-- .../data_stores/main/monthly_active_users.py | 2 +- synapse/storage/data_stores/main/push_rule.py | 2 +- synapse/storage/data_stores/main/registration.py | 2 +- synapse/storage/data_stores/main/roommember.py | 2 +- synapse/storage/data_stores/main/search.py | 2 +- synapse/storage/data_stores/main/state.py | 20 ++++++------ synapse/storage/data_stores/main/stats.py | 4 +-- synapse/storage/util/id_generators.py | 2 +- tox.ini | 4 +-- 41 files changed, 191 insertions(+), 166 deletions(-) create mode 100644 changelog.d/6304.misc (limited to 'synapse/handlers') diff --git a/changelog.d/6304.misc b/changelog.d/6304.misc new file mode 100644 index 0000000000..20372b4f7c --- /dev/null +++ b/changelog.d/6304.misc @@ -0,0 +1 @@ +Update the version of black used to 19.10b0. diff --git a/contrib/experiments/test_messaging.py b/contrib/experiments/test_messaging.py index 6b22400a60..3bbbcfa1b4 100644 --- a/contrib/experiments/test_messaging.py +++ b/contrib/experiments/test_messaging.py @@ -78,7 +78,7 @@ class InputOutput(object): m = re.match("^join (\S+)$", line) if m: # The `sender` wants to join a room. - room_name, = m.groups() + (room_name,) = m.groups() self.print_line("%s joining %s" % (self.user, room_name)) self.server.join_room(room_name, self.user, self.user) # self.print_line("OK.") @@ -105,7 +105,7 @@ class InputOutput(object): m = re.match("^backfill (\S+)$", line) if m: # we want to backfill a room - room_name, = m.groups() + (room_name,) = m.groups() self.print_line("backfill %s" % room_name) self.server.backfill(room_name) return diff --git a/mypy.ini b/mypy.ini index ffadaddc0b..1d77c0ecc8 100644 --- a/mypy.ini +++ b/mypy.ini @@ -1,8 +1,11 @@ [mypy] -namespace_packages=True -plugins=mypy_zope:plugin -follow_imports=skip -mypy_path=stubs +namespace_packages = True +plugins = mypy_zope:plugin +follow_imports = normal +check_untyped_defs = True +show_error_codes = True +show_traceback = True +mypy_path = stubs [mypy-zope] ignore_missing_imports = True diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py index cc75c39476..b754a09d7a 100644 --- a/synapse/federation/sender/per_destination_queue.py +++ b/synapse/federation/sender/per_destination_queue.py @@ -192,15 +192,16 @@ class PerDestinationQueue(object): # We have to keep 2 free slots for presence and rr_edus limit = MAX_EDUS_PER_TRANSACTION - 2 - device_update_edus, dev_list_id = ( - yield self._get_device_update_edus(limit) + device_update_edus, dev_list_id = yield self._get_device_update_edus( + limit ) limit -= len(device_update_edus) - to_device_edus, device_stream_id = ( - yield self._get_to_device_message_edus(limit) - ) + ( + to_device_edus, + device_stream_id, + ) = yield self._get_to_device_message_edus(limit) pending_edus = device_update_edus + to_device_edus diff --git a/synapse/handlers/account_data.py b/synapse/handlers/account_data.py index 38bc67191c..2d7e6df6e4 100644 --- a/synapse/handlers/account_data.py +++ b/synapse/handlers/account_data.py @@ -38,9 +38,10 @@ class AccountDataEventSource(object): {"type": "m.tag", "content": {"tags": room_tags}, "room_id": room_id} ) - account_data, room_account_data = ( - yield self.store.get_updated_account_data_for_user(user_id, last_stream_id) - ) + ( + account_data, + room_account_data, + ) = yield self.store.get_updated_account_data_for_user(user_id, last_stream_id) for account_data_type, content in account_data.items(): results.append({"type": account_data_type, "content": content}) diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 3e9b298154..fe62f78e67 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -73,7 +73,10 @@ class ApplicationServicesHandler(object): try: limit = 100 while True: - upper_bound, events = yield self.store.get_new_events_for_appservice( + ( + upper_bound, + events, + ) = yield self.store.get_new_events_for_appservice( self.current_max, limit ) diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index 5ea54f60be..0449034a4e 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -119,9 +119,10 @@ class E2eKeysHandler(object): else: query_list.append((user_id, None)) - user_ids_not_in_cache, remote_results = ( - yield self.store.get_user_devices_from_cache(query_list) - ) + ( + user_ids_not_in_cache, + remote_results, + ) = yield self.store.get_user_devices_from_cache(query_list) for user_id, devices in iteritems(remote_results): user_devices = results.setdefault(user_id, {}) for device_id, device in iteritems(devices): @@ -688,17 +689,21 @@ class E2eKeysHandler(object): try: # get our self-signing key to verify the signatures - _, self_signing_key_id, self_signing_verify_key = yield self._get_e2e_cross_signing_verify_key( - user_id, "self_signing" - ) + ( + _, + self_signing_key_id, + self_signing_verify_key, + ) = yield self._get_e2e_cross_signing_verify_key(user_id, "self_signing") # get our master key, since we may have received a signature of it. # We need to fetch it here so that we know what its key ID is, so # that we can check if a signature that was sent is a signature of # the master key or of a device - master_key, _, master_verify_key = yield self._get_e2e_cross_signing_verify_key( - user_id, "master" - ) + ( + master_key, + _, + master_verify_key, + ) = yield self._get_e2e_cross_signing_verify_key(user_id, "master") # fetch our stored devices. This is used to 1. verify # signatures on the master key, and 2. to compare with what @@ -838,9 +843,11 @@ class E2eKeysHandler(object): try: # get our user-signing key to verify the signatures - user_signing_key, user_signing_key_id, user_signing_verify_key = yield self._get_e2e_cross_signing_verify_key( - user_id, "user_signing" - ) + ( + user_signing_key, + user_signing_key_id, + user_signing_verify_key, + ) = yield self._get_e2e_cross_signing_verify_key(user_id, "user_signing") except SynapseError as e: failure = _exception_to_failure(e) for user, devicemap in signatures.items(): @@ -859,7 +866,11 @@ class E2eKeysHandler(object): try: # get the target user's master key, to make sure it matches # what was sent - master_key, master_key_id, _ = yield self._get_e2e_cross_signing_verify_key( + ( + master_key, + master_key_id, + _, + ) = yield self._get_e2e_cross_signing_verify_key( target_user, "master", user_id ) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index d2d9f8c26a..a932d3085f 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -352,10 +352,11 @@ class FederationHandler(BaseHandler): # note that if any of the missing prevs share missing state or # auth events, the requests to fetch those events are deduped # by the get_pdu_cache in federation_client. - remote_state, got_auth_chain = ( - yield self.federation_client.get_state_for_room( - origin, room_id, p - ) + ( + remote_state, + got_auth_chain, + ) = yield self.federation_client.get_state_for_room( + origin, room_id, p ) # we want the state *after* p; get_state_for_room returns the diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py index 49c9e031f9..81dce96f4b 100644 --- a/synapse/handlers/initial_sync.py +++ b/synapse/handlers/initial_sync.py @@ -128,8 +128,8 @@ class InitialSyncHandler(BaseHandler): tags_by_room = yield self.store.get_tags_for_user(user_id) - account_data, account_data_by_room = ( - yield self.store.get_account_data_for_user(user_id) + account_data, account_data_by_room = yield self.store.get_account_data_for_user( + user_id ) public_room_ids = yield self.store.get_public_room_ids() diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 0d546d2487..d682dc2b7a 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -76,9 +76,10 @@ class MessageHandler(object): Raises: SynapseError if something went wrong. """ - membership, membership_event_id = yield self.auth.check_in_room_or_world_readable( - room_id, user_id - ) + ( + membership, + membership_event_id, + ) = yield self.auth.check_in_room_or_world_readable(room_id, user_id) if membership == Membership.JOIN: data = yield self.state.get_current_state(room_id, event_type, state_key) @@ -153,9 +154,10 @@ class MessageHandler(object): % (user_id, room_id, at_token), ) else: - membership, membership_event_id = ( - yield self.auth.check_in_room_or_world_readable(room_id, user_id) - ) + ( + membership, + membership_event_id, + ) = yield self.auth.check_in_room_or_world_readable(room_id, user_id) if membership == Membership.JOIN: state_ids = yield self.store.get_filtered_current_state_ids( diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py index b7185fe7a0..97f15a1c32 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py @@ -212,9 +212,10 @@ class PaginationHandler(object): source_config = pagin_config.get_source_config("room") with (yield self.pagination_lock.read(room_id)): - membership, member_event_id = yield self.auth.check_in_room_or_world_readable( - room_id, user_id - ) + ( + membership, + member_event_id, + ) = yield self.auth.check_in_room_or_world_readable(room_id, user_id) if source_config.direction == "b": # if we're going backwards, we might need to backfill. This @@ -297,10 +298,8 @@ class PaginationHandler(object): } if state: - chunk["state"] = ( - yield self._event_serializer.serialize_events( - state, time_now, as_client_event=as_client_event - ) + chunk["state"] = yield self._event_serializer.serialize_events( + state, time_now, as_client_event=as_client_event ) return chunk diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index 53410f120b..cff6b0d375 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -396,8 +396,8 @@ class RegistrationHandler(BaseHandler): room_id = room_identifier elif RoomAlias.is_valid(room_identifier): room_alias = RoomAlias.from_string(room_identifier) - room_id, remote_room_hosts = ( - yield room_member_handler.lookup_room_alias(room_alias) + room_id, remote_room_hosts = yield room_member_handler.lookup_room_alias( + room_alias ) room_id = room_id.to_string() else: diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 650bd28abb..0182e5b432 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -147,21 +147,22 @@ class RoomCreationHandler(BaseHandler): # we create and auth the tombstone event before properly creating the new # room, to check our user has perms in the old room. - tombstone_event, tombstone_context = ( - yield self.event_creation_handler.create_event( - requester, - { - "type": EventTypes.Tombstone, - "state_key": "", - "room_id": old_room_id, - "sender": user_id, - "content": { - "body": "This room has been replaced", - "replacement_room": new_room_id, - }, + ( + tombstone_event, + tombstone_context, + ) = yield self.event_creation_handler.create_event( + requester, + { + "type": EventTypes.Tombstone, + "state_key": "", + "room_id": old_room_id, + "sender": user_id, + "content": { + "body": "This room has been replaced", + "replacement_room": new_room_id, }, - token_id=requester.access_token_id, - ) + }, + token_id=requester.access_token_id, ) old_room_version = yield self.store.get_room_version(old_room_id) yield self.auth.check_from_context( diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 380e2fad5e..9a940d2c05 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -759,22 +759,25 @@ class RoomMemberHandler(object): if room_avatar_event: room_avatar_url = room_avatar_event.content.get("url", "") - token, public_keys, fallback_public_key, display_name = ( - yield self.identity_handler.ask_id_server_for_third_party_invite( - requester=requester, - id_server=id_server, - medium=medium, - address=address, - room_id=room_id, - inviter_user_id=user.to_string(), - room_alias=canonical_room_alias, - room_avatar_url=room_avatar_url, - room_join_rules=room_join_rules, - room_name=room_name, - inviter_display_name=inviter_display_name, - inviter_avatar_url=inviter_avatar_url, - id_access_token=id_access_token, - ) + ( + token, + public_keys, + fallback_public_key, + display_name, + ) = yield self.identity_handler.ask_id_server_for_third_party_invite( + requester=requester, + id_server=id_server, + medium=medium, + address=address, + room_id=room_id, + inviter_user_id=user.to_string(), + room_alias=canonical_room_alias, + room_avatar_url=room_avatar_url, + room_join_rules=room_join_rules, + room_name=room_name, + inviter_display_name=inviter_display_name, + inviter_avatar_url=inviter_avatar_url, + id_access_token=id_access_token, ) yield self.event_creation_handler.create_and_send_nonmember_event( diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py index f4d8a60774..56ed262a1f 100644 --- a/synapse/handlers/search.py +++ b/synapse/handlers/search.py @@ -396,15 +396,11 @@ class SearchHandler(BaseHandler): time_now = self.clock.time_msec() for context in contexts.values(): - context["events_before"] = ( - yield self._event_serializer.serialize_events( - context["events_before"], time_now - ) + context["events_before"] = yield self._event_serializer.serialize_events( + context["events_before"], time_now ) - context["events_after"] = ( - yield self._event_serializer.serialize_events( - context["events_after"], time_now - ) + context["events_after"] = yield self._event_serializer.serialize_events( + context["events_after"], time_now ) state_results = {} diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py index 26bc276692..7f7d56390e 100644 --- a/synapse/handlers/stats.py +++ b/synapse/handlers/stats.py @@ -108,7 +108,10 @@ class StatsHandler(StateDeltasHandler): user_deltas = {} # Then count deltas for total_events and total_event_bytes. - room_count, user_count = yield self.store.get_changes_room_total_events_and_bytes( + ( + room_count, + user_count, + ) = yield self.store.get_changes_room_total_events_and_bytes( self.pos, max_pos ) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 43a082dcda..b536d410e5 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -1206,10 +1206,11 @@ class SyncHandler(object): since_token = sync_result_builder.since_token if since_token and not sync_result_builder.full_state: - account_data, account_data_by_room = ( - yield self.store.get_updated_account_data_for_user( - user_id, since_token.account_data_key - ) + ( + account_data, + account_data_by_room, + ) = yield self.store.get_updated_account_data_for_user( + user_id, since_token.account_data_key ) push_rules_changed = yield self.store.have_push_rules_changed_for_user( @@ -1221,9 +1222,10 @@ class SyncHandler(object): sync_config.user ) else: - account_data, account_data_by_room = ( - yield self.store.get_account_data_for_user(sync_config.user.to_string()) - ) + ( + account_data, + account_data_by_room, + ) = yield self.store.get_account_data_for_user(sync_config.user.to_string()) account_data["m.push_rules"] = yield self.push_rules_for_user( sync_config.user diff --git a/synapse/logging/_structured.py b/synapse/logging/_structured.py index 3220e985a9..334ddaf39a 100644 --- a/synapse/logging/_structured.py +++ b/synapse/logging/_structured.py @@ -185,7 +185,7 @@ DEFAULT_LOGGERS = {"synapse": {"level": "INFO"}} def parse_drain_configs( - drains: dict + drains: dict, ) -> typing.Generator[DrainConfiguration, None, None]: """ Parse the drain configurations. diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index 2bbdd11941..1ba7bcd4d8 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -149,9 +149,10 @@ class BulkPushRuleEvaluator(object): room_members = yield self.store.get_joined_users_from_context(event, context) - (power_levels, sender_power_level) = ( - yield self._get_power_levels_and_sender_level(event, context) - ) + ( + power_levels, + sender_power_level, + ) = yield self._get_power_levels_and_sender_level(event, context) evaluator = PushRuleEvaluatorForEvent( event, len(room_members), sender_power_level, power_levels diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py index 42e5b0c0a5..8c818a86bf 100644 --- a/synapse/push/emailpusher.py +++ b/synapse/push/emailpusher.py @@ -234,14 +234,12 @@ class EmailPusher(object): return self.last_stream_ordering = last_stream_ordering - pusher_still_exists = ( - yield self.store.update_pusher_last_stream_ordering_and_success( - self.app_id, - self.email, - self.user_id, - last_stream_ordering, - self.clock.time_msec(), - ) + pusher_still_exists = yield self.store.update_pusher_last_stream_ordering_and_success( + self.app_id, + self.email, + self.user_id, + last_stream_ordering, + self.clock.time_msec(), ) if not pusher_still_exists: # The pusher has been deleted while we were processing, so diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py index 9a1bb64887..7dde2ad055 100644 --- a/synapse/push/httppusher.py +++ b/synapse/push/httppusher.py @@ -211,14 +211,12 @@ class HttpPusher(object): http_push_processed_counter.inc() self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC self.last_stream_ordering = push_action["stream_ordering"] - pusher_still_exists = ( - yield self.store.update_pusher_last_stream_ordering_and_success( - self.app_id, - self.pushkey, - self.user_id, - self.last_stream_ordering, - self.clock.time_msec(), - ) + pusher_still_exists = yield self.store.update_pusher_last_stream_ordering_and_success( + self.app_id, + self.pushkey, + self.user_id, + self.last_stream_ordering, + self.clock.time_msec(), ) if not pusher_still_exists: # The pusher has been deleted while we were processing, so diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py index 08e840fdc2..0f6992202d 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py @@ -103,9 +103,7 @@ class PusherPool: # create the pusher setting last_stream_ordering to the current maximum # stream ordering in event_push_actions, so it will process # pushes from this point onwards. - last_stream_ordering = ( - yield self.store.get_latest_push_action_stream_ordering() - ) + last_stream_ordering = yield self.store.get_latest_push_action_stream_ordering() yield self.store.add_pusher( user_id=user_id, diff --git a/synapse/rest/client/v1/login.py b/synapse/rest/client/v1/login.py index 39a5c5e9de..00a7dd6d09 100644 --- a/synapse/rest/client/v1/login.py +++ b/synapse/rest/client/v1/login.py @@ -203,10 +203,11 @@ class LoginRestServlet(RestServlet): address = address.lower() # Check for login providers that support 3pid login types - canonical_user_id, callback_3pid = ( - yield self.auth_handler.check_password_provider_3pid( - medium, address, login_submission["password"] - ) + ( + canonical_user_id, + callback_3pid, + ) = yield self.auth_handler.check_password_provider_3pid( + medium, address, login_submission["password"] ) if canonical_user_id: # Authentication through password provider and 3pid succeeded @@ -280,8 +281,8 @@ class LoginRestServlet(RestServlet): def do_token_login(self, login_submission): token = login_submission["token"] auth_handler = self.auth_handler - user_id = ( - yield auth_handler.validate_short_term_login_token_and_get_user_id(token) + user_id = yield auth_handler.validate_short_term_login_token_and_get_user_id( + token ) result = yield self._register_device_with_callback(user_id, login_submission) diff --git a/synapse/rest/client/v2_alpha/account.py b/synapse/rest/client/v2_alpha/account.py index 332d7138b1..f26eae794c 100644 --- a/synapse/rest/client/v2_alpha/account.py +++ b/synapse/rest/client/v2_alpha/account.py @@ -148,7 +148,7 @@ class PasswordResetSubmitTokenServlet(RestServlet): self.clock = hs.get_clock() self.store = hs.get_datastore() if self.config.threepid_behaviour_email == ThreepidBehaviour.LOCAL: - self.failure_email_template, = load_jinja2_templates( + (self.failure_email_template,) = load_jinja2_templates( self.config.email_template_dir, [self.config.email_password_reset_template_failure_html], ) @@ -479,7 +479,7 @@ class AddThreepidEmailSubmitTokenServlet(RestServlet): self.clock = hs.get_clock() self.store = hs.get_datastore() if self.config.threepid_behaviour_email == ThreepidBehaviour.LOCAL: - self.failure_email_template, = load_jinja2_templates( + (self.failure_email_template,) = load_jinja2_templates( self.config.email_template_dir, [self.config.email_add_threepid_template_failure_html], ) diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py index 6c7d25d411..91db923814 100644 --- a/synapse/rest/client/v2_alpha/register.py +++ b/synapse/rest/client/v2_alpha/register.py @@ -247,13 +247,13 @@ class RegistrationSubmitTokenServlet(RestServlet): self.store = hs.get_datastore() if self.config.threepid_behaviour_email == ThreepidBehaviour.LOCAL: - self.failure_email_template, = load_jinja2_templates( + (self.failure_email_template,) = load_jinja2_templates( self.config.email_template_dir, [self.config.email_registration_template_failure_html], ) if self.config.threepid_behaviour_email == ThreepidBehaviour.LOCAL: - self.failure_email_template, = load_jinja2_templates( + (self.failure_email_template,) = load_jinja2_templates( self.config.email_template_dir, [self.config.email_registration_template_failure_html], ) diff --git a/synapse/rest/key/v2/remote_key_resource.py b/synapse/rest/key/v2/remote_key_resource.py index 55580bc59e..e7fc3f0431 100644 --- a/synapse/rest/key/v2/remote_key_resource.py +++ b/synapse/rest/key/v2/remote_key_resource.py @@ -102,7 +102,7 @@ class RemoteKey(DirectServeResource): @wrap_json_request_handler async def _async_render_GET(self, request): if len(request.postpath) == 1: - server, = request.postpath + (server,) = request.postpath query = {server.decode("ascii"): {}} elif len(request.postpath) == 2: server, key_id = request.postpath diff --git a/synapse/server.pyi b/synapse/server.pyi index 16f8f6b573..83d1f11283 100644 --- a/synapse/server.pyi +++ b/synapse/server.pyi @@ -39,7 +39,7 @@ class HomeServer(object): def get_state_resolution_handler(self) -> synapse.state.StateResolutionHandler: pass def get_deactivate_account_handler( - self + self, ) -> synapse.handlers.deactivate_account.DeactivateAccountHandler: pass def get_room_creation_handler(self) -> synapse.handlers.room.RoomCreationHandler: @@ -47,32 +47,32 @@ class HomeServer(object): def get_room_member_handler(self) -> synapse.handlers.room_member.RoomMemberHandler: pass def get_event_creation_handler( - self + self, ) -> synapse.handlers.message.EventCreationHandler: pass def get_set_password_handler( - self + self, ) -> synapse.handlers.set_password.SetPasswordHandler: pass def get_federation_sender(self) -> synapse.federation.sender.FederationSender: pass def get_federation_transport_client( - self + self, ) -> synapse.federation.transport.client.TransportLayerClient: pass def get_media_repository_resource( - self + self, ) -> synapse.rest.media.v1.media_repository.MediaRepositoryResource: pass def get_media_repository( - self + self, ) -> synapse.rest.media.v1.media_repository.MediaRepository: pass def get_server_notices_manager( - self + self, ) -> synapse.server_notices.server_notices_manager.ServerNoticesManager: pass def get_server_notices_sender( - self + self, ) -> synapse.server_notices.server_notices_sender.ServerNoticesSender: pass diff --git a/synapse/storage/data_stores/main/__init__.py b/synapse/storage/data_stores/main/__init__.py index b185ba0b3e..60ae01d972 100644 --- a/synapse/storage/data_stores/main/__init__.py +++ b/synapse/storage/data_stores/main/__init__.py @@ -317,7 +317,7 @@ class DataStore( ) u """ txn.execute(sql, (time_from,)) - count, = txn.fetchone() + (count,) = txn.fetchone() return count def count_r30_users(self): @@ -396,7 +396,7 @@ class DataStore( txn.execute(sql, (thirty_days_ago_in_secs, thirty_days_ago_in_secs)) - count, = txn.fetchone() + (count,) = txn.fetchone() results["all"] = count return results diff --git a/synapse/storage/data_stores/main/event_push_actions.py b/synapse/storage/data_stores/main/event_push_actions.py index 22025effbc..04ce21ac66 100644 --- a/synapse/storage/data_stores/main/event_push_actions.py +++ b/synapse/storage/data_stores/main/event_push_actions.py @@ -863,7 +863,7 @@ class EventPushActionsStore(EventPushActionsWorkerStore): ) stream_row = txn.fetchone() if stream_row: - offset_stream_ordering, = stream_row + (offset_stream_ordering,) = stream_row rotate_to_stream_ordering = min( self.stream_ordering_day_ago, offset_stream_ordering ) diff --git a/synapse/storage/data_stores/main/events.py b/synapse/storage/data_stores/main/events.py index 64a8a05279..aafc2007d3 100644 --- a/synapse/storage/data_stores/main/events.py +++ b/synapse/storage/data_stores/main/events.py @@ -1125,7 +1125,7 @@ class EventsStore( AND stream_ordering > ? """ txn.execute(sql, (self.stream_ordering_day_ago,)) - count, = txn.fetchone() + (count,) = txn.fetchone() return count ret = yield self.runInteraction("count_messages", _count_messages) @@ -1146,7 +1146,7 @@ class EventsStore( """ txn.execute(sql, (like_clause, self.stream_ordering_day_ago)) - count, = txn.fetchone() + (count,) = txn.fetchone() return count ret = yield self.runInteraction("count_daily_sent_messages", _count_messages) @@ -1161,7 +1161,7 @@ class EventsStore( AND stream_ordering > ? """ txn.execute(sql, (self.stream_ordering_day_ago,)) - count, = txn.fetchone() + (count,) = txn.fetchone() return count ret = yield self.runInteraction("count_daily_active_rooms", _count) @@ -1646,7 +1646,7 @@ class EventsStore( """, (room_id,), ) - min_depth, = txn.fetchone() + (min_depth,) = txn.fetchone() logger.info("[purge] updating room_depth to %d", min_depth) diff --git a/synapse/storage/data_stores/main/events_bg_updates.py b/synapse/storage/data_stores/main/events_bg_updates.py index 31ea6f917f..51352b9966 100644 --- a/synapse/storage/data_stores/main/events_bg_updates.py +++ b/synapse/storage/data_stores/main/events_bg_updates.py @@ -438,7 +438,7 @@ class EventsBackgroundUpdatesStore(BackgroundUpdateStore): if not rows: return 0 - upper_event_id, = rows[-1] + (upper_event_id,) = rows[-1] # Update the redactions with the received_ts. # diff --git a/synapse/storage/data_stores/main/group_server.py b/synapse/storage/data_stores/main/group_server.py index aeae5a2b28..b3a2771f1b 100644 --- a/synapse/storage/data_stores/main/group_server.py +++ b/synapse/storage/data_stores/main/group_server.py @@ -249,7 +249,7 @@ class GroupServerStore(SQLBaseStore): WHERE group_id = ? AND category_id = ? """ txn.execute(sql, (group_id, category_id)) - order, = txn.fetchone() + (order,) = txn.fetchone() if existing: to_update = {} @@ -509,7 +509,7 @@ class GroupServerStore(SQLBaseStore): WHERE group_id = ? AND role_id = ? """ txn.execute(sql, (group_id, role_id)) - order, = txn.fetchone() + (order,) = txn.fetchone() if existing: to_update = {} diff --git a/synapse/storage/data_stores/main/monthly_active_users.py b/synapse/storage/data_stores/main/monthly_active_users.py index e6ee1e4aaa..b41c3d317a 100644 --- a/synapse/storage/data_stores/main/monthly_active_users.py +++ b/synapse/storage/data_stores/main/monthly_active_users.py @@ -171,7 +171,7 @@ class MonthlyActiveUsersStore(SQLBaseStore): sql = "SELECT COALESCE(count(*), 0) FROM monthly_active_users" txn.execute(sql) - count, = txn.fetchone() + (count,) = txn.fetchone() return count return self.runInteraction("count_users", _count_users) diff --git a/synapse/storage/data_stores/main/push_rule.py b/synapse/storage/data_stores/main/push_rule.py index cd95f1ce60..b520062d84 100644 --- a/synapse/storage/data_stores/main/push_rule.py +++ b/synapse/storage/data_stores/main/push_rule.py @@ -143,7 +143,7 @@ class PushRulesWorkerStore( " WHERE user_id = ? AND ? < stream_id" ) txn.execute(sql, (user_id, last_id)) - count, = txn.fetchone() + (count,) = txn.fetchone() return bool(count) return self.runInteraction( diff --git a/synapse/storage/data_stores/main/registration.py b/synapse/storage/data_stores/main/registration.py index 6c5b29288a..f70d41ecab 100644 --- a/synapse/storage/data_stores/main/registration.py +++ b/synapse/storage/data_stores/main/registration.py @@ -459,7 +459,7 @@ class RegistrationWorkerStore(SQLBaseStore): WHERE appservice_id IS NULL """ ) - count, = txn.fetchone() + (count,) = txn.fetchone() return count ret = yield self.runInteraction("count_users", _count_users) diff --git a/synapse/storage/data_stores/main/roommember.py b/synapse/storage/data_stores/main/roommember.py index bc04bfd7d4..2af24a20b7 100644 --- a/synapse/storage/data_stores/main/roommember.py +++ b/synapse/storage/data_stores/main/roommember.py @@ -927,7 +927,7 @@ class RoomMemberBackgroundUpdateStore(BackgroundUpdateStore): if not row or not row[0]: return processed, True - next_room, = row + (next_room,) = row sql = """ UPDATE current_state_events diff --git a/synapse/storage/data_stores/main/search.py b/synapse/storage/data_stores/main/search.py index a59b8331e1..d1d7c6863d 100644 --- a/synapse/storage/data_stores/main/search.py +++ b/synapse/storage/data_stores/main/search.py @@ -672,7 +672,7 @@ class SearchStore(SearchBackgroundUpdateStore): ) ) txn.execute(query, (value, search_query)) - headline, = txn.fetchall()[0] + (headline,) = txn.fetchall()[0] # Now we need to pick the possible highlights out of the haedline # result. diff --git a/synapse/storage/data_stores/main/state.py b/synapse/storage/data_stores/main/state.py index 9b2207075b..3132848034 100644 --- a/synapse/storage/data_stores/main/state.py +++ b/synapse/storage/data_stores/main/state.py @@ -725,16 +725,18 @@ class StateGroupWorkerStore( member_filter, non_member_filter = state_filter.get_member_split() # Now we look them up in the member and non-member caches - non_member_state, incomplete_groups_nm, = ( - yield self._get_state_for_groups_using_cache( - groups, self._state_group_cache, state_filter=non_member_filter - ) + ( + non_member_state, + incomplete_groups_nm, + ) = yield self._get_state_for_groups_using_cache( + groups, self._state_group_cache, state_filter=non_member_filter ) - member_state, incomplete_groups_m, = ( - yield self._get_state_for_groups_using_cache( - groups, self._state_group_members_cache, state_filter=member_filter - ) + ( + member_state, + incomplete_groups_m, + ) = yield self._get_state_for_groups_using_cache( + groups, self._state_group_members_cache, state_filter=member_filter ) state = dict(non_member_state) @@ -1076,7 +1078,7 @@ class StateBackgroundUpdateStore( " WHERE id < ? AND room_id = ?", (state_group, room_id), ) - prev_group, = txn.fetchone() + (prev_group,) = txn.fetchone() new_last_state_group = state_group if prev_group: diff --git a/synapse/storage/data_stores/main/stats.py b/synapse/storage/data_stores/main/stats.py index 4d59b7833f..45b3de7d56 100644 --- a/synapse/storage/data_stores/main/stats.py +++ b/synapse/storage/data_stores/main/stats.py @@ -773,7 +773,7 @@ class StatsStore(StateDeltasStore): (room_id,), ) - current_state_events_count, = txn.fetchone() + (current_state_events_count,) = txn.fetchone() users_in_room = self.get_users_in_room_txn(txn, room_id) @@ -863,7 +863,7 @@ class StatsStore(StateDeltasStore): """, (user_id,), ) - count, = txn.fetchone() + (count,) = txn.fetchone() return count, pos joined_rooms, pos = yield self.runInteraction( diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py index cbb0a4810a..9d851beaa5 100644 --- a/synapse/storage/util/id_generators.py +++ b/synapse/storage/util/id_generators.py @@ -46,7 +46,7 @@ def _load_current_id(db_conn, table, column, step=1): cur.execute("SELECT MAX(%s) FROM %s" % (column, table)) else: cur.execute("SELECT MIN(%s) FROM %s" % (column, table)) - val, = cur.fetchone() + (val,) = cur.fetchone() cur.close() current_id = int(val) if val else step return (max if step > 0 else min)(current_id, step) diff --git a/tox.ini b/tox.ini index 50b6afe611..afe9bc909b 100644 --- a/tox.ini +++ b/tox.ini @@ -114,7 +114,7 @@ skip_install = True basepython = python3.6 deps = flake8 - black==19.3b0 # We pin so that our tests don't start failing on new releases of black. + black==19.10b0 # We pin so that our tests don't start failing on new releases of black. commands = python -m black --check --diff . /bin/sh -c "flake8 synapse tests scripts scripts-dev synctl {env:PEP8SUFFIX:}" @@ -167,6 +167,6 @@ deps = env = MYPYPATH = stubs/ extras = all -commands = mypy --show-traceback --check-untyped-defs --show-error-codes --follow-imports=normal \ +commands = mypy \ synapse/logging/ \ synapse/config/ -- cgit 1.5.1 From ace947e8da30c37ead3357abe34adee8a1528296 Mon Sep 17 00:00:00 2001 From: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com> Date: Fri, 1 Nov 2019 10:28:09 +0000 Subject: Depublish a room from the public rooms list when it is upgraded (#6232) --- changelog.d/6232.bugfix | 1 + synapse/federation/federation_client.py | 2 +- synapse/handlers/federation.py | 30 +++++++++++- synapse/handlers/room.py | 8 +++- synapse/handlers/room_member.py | 81 ++++++++++++++++++++++----------- 5 files changed, 92 insertions(+), 30 deletions(-) create mode 100644 changelog.d/6232.bugfix (limited to 'synapse/handlers') diff --git a/changelog.d/6232.bugfix b/changelog.d/6232.bugfix new file mode 100644 index 0000000000..12718ba934 --- /dev/null +++ b/changelog.d/6232.bugfix @@ -0,0 +1 @@ +Remove a room from a server's public rooms list on room upgrade. \ No newline at end of file diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 595706d01a..545d719652 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -555,7 +555,7 @@ class FederationClient(FederationBase): Note that this does not append any events to any graphs. Args: - destinations (str): Candidate homeservers which are probably + destinations (Iterable[str]): Candidate homeservers which are probably participating in the room. room_id (str): The room in which the event will happen. user_id (str): The user whose membership is being evented. diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index a932d3085f..dab6be9573 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1106,7 +1106,7 @@ class FederationHandler(BaseHandler): @defer.inlineCallbacks def do_invite_join(self, target_hosts, room_id, joinee, content): """ Attempts to join the `joinee` to the room `room_id` via the - server `target_host`. + servers contained in `target_hosts`. This first triggers a /make_join/ request that returns a partial event that we can fill out and sign. This is then sent to the @@ -1115,6 +1115,15 @@ class FederationHandler(BaseHandler): We suspend processing of any received events from this room until we have finished processing the join. + + Args: + target_hosts (Iterable[str]): List of servers to attempt to join the room with. + + room_id (str): The ID of the room to join. + + joinee (str): The User ID of the joining user. + + content (dict): The event content to use for the join event. """ logger.debug("Joining %s to %s", joinee, room_id) @@ -1174,6 +1183,22 @@ class FederationHandler(BaseHandler): yield self._persist_auth_tree(origin, auth_chain, state, event) + # Check whether this room is the result of an upgrade of a room we already know + # about. If so, migrate over user information + predecessor = yield self.store.get_room_predecessor(room_id) + if not predecessor: + return + old_room_id = predecessor["room_id"] + logger.debug( + "Found predecessor for %s during remote join: %s", room_id, old_room_id + ) + + # We retrieve the room member handler here as to not cause a cyclic dependency + member_handler = self.hs.get_room_member_handler() + yield member_handler.transfer_room_state_on_room_upgrade( + old_room_id, room_id + ) + logger.debug("Finished joining %s to %s", joinee, room_id) finally: room_queue = self.room_queues[room_id] @@ -2442,6 +2467,8 @@ class FederationHandler(BaseHandler): raise e yield self._check_signature(event, context) + + # We retrieve the room member handler here as to not cause a cyclic dependency member_handler = self.hs.get_room_member_handler() yield member_handler.send_membership_event(None, event, context) else: @@ -2502,6 +2529,7 @@ class FederationHandler(BaseHandler): # though the sender isn't a local user. event.internal_metadata.send_on_behalf_of = get_domain_from_id(event.sender) + # We retrieve the room member handler here as to not cause a cyclic dependency member_handler = self.hs.get_room_member_handler() yield member_handler.send_membership_event(None, event, context) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 0182e5b432..e92b2eafd5 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -129,6 +129,7 @@ class RoomCreationHandler(BaseHandler): old_room_id, new_version, # args for _upgrade_room ) + return ret @defer.inlineCallbacks @@ -189,7 +190,12 @@ class RoomCreationHandler(BaseHandler): requester, old_room_id, new_room_id, old_room_state ) - # and finally, shut down the PLs in the old room, and update them in the new + # Copy over user push rules, tags and migrate room directory state + yield self.room_member_handler.transfer_room_state_on_room_upgrade( + old_room_id, new_room_id + ) + + # finally, shut down the PLs in the old room, and update them in the new # room. yield self._update_upgraded_room_pls( requester, old_room_id, new_room_id, old_room_state diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 9a940d2c05..06d09c2947 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -203,10 +203,6 @@ class RoomMemberHandler(object): prev_member_event = yield self.store.get_event(prev_member_event_id) newly_joined = prev_member_event.membership != Membership.JOIN if newly_joined: - # Copy over user state if we're joining an upgraded room - yield self.copy_user_state_if_room_upgrade( - room_id, requester.user.to_string() - ) yield self._user_joined_room(target, room_id) elif event.membership == Membership.LEAVE: if prev_member_event_id: @@ -455,11 +451,6 @@ class RoomMemberHandler(object): requester, remote_room_hosts, room_id, target, content ) - # Copy over user state if this is a join on an remote upgraded room - yield self.copy_user_state_if_room_upgrade( - room_id, requester.user.to_string() - ) - return remote_join_response elif effective_membership_state == Membership.LEAVE: @@ -498,36 +489,72 @@ class RoomMemberHandler(object): return res @defer.inlineCallbacks - def copy_user_state_if_room_upgrade(self, new_room_id, user_id): - """Copy user-specific information when they join a new room if that new room is the + def transfer_room_state_on_room_upgrade(self, old_room_id, room_id): + """Upon our server becoming aware of an upgraded room, either by upgrading a room + ourselves or joining one, we can transfer over information from the previous room. + + Copies user state (tags/push rules) for every local user that was in the old room, as + well as migrating the room directory state. + + Args: + old_room_id (str): The ID of the old room + + room_id (str): The ID of the new room + + Returns: + Deferred + """ + # Find all local users that were in the old room and copy over each user's state + users = yield self.store.get_users_in_room(old_room_id) + yield self.copy_user_state_on_room_upgrade(old_room_id, room_id, users) + + # Add new room to the room directory if the old room was there + # Remove old room from the room directory + old_room = yield self.store.get_room(old_room_id) + if old_room and old_room["is_public"]: + yield self.store.set_room_is_public(old_room_id, False) + yield self.store.set_room_is_public(room_id, True) + + @defer.inlineCallbacks + def copy_user_state_on_room_upgrade(self, old_room_id, new_room_id, user_ids): + """Copy user-specific information when they join a new room when that new room is the result of a room upgrade Args: - new_room_id (str): The ID of the room the user is joining - user_id (str): The ID of the user + old_room_id (str): The ID of upgraded room + new_room_id (str): The ID of the new room + user_ids (Iterable[str]): User IDs to copy state for Returns: Deferred """ - # Check if the new room is an upgraded room - predecessor = yield self.store.get_room_predecessor(new_room_id) - if not predecessor: - return logger.debug( - "Found predecessor for %s: %s. Copying over room tags and push " "rules", + "Copying over room tags and push rules from %s to %s for users %s", + old_room_id, new_room_id, - predecessor, + user_ids, ) - # It is an upgraded room. Copy over old tags - yield self.copy_room_tags_and_direct_to_room( - predecessor["room_id"], new_room_id, user_id - ) - # Copy over push rules - yield self.store.copy_push_rules_from_room_to_room_for_user( - predecessor["room_id"], new_room_id, user_id - ) + for user_id in user_ids: + try: + # It is an upgraded room. Copy over old tags + yield self.copy_room_tags_and_direct_to_room( + old_room_id, new_room_id, user_id + ) + # Copy over push rules + yield self.store.copy_push_rules_from_room_to_room_for_user( + old_room_id, new_room_id, user_id + ) + except Exception: + logger.exception( + "Error copying tags and/or push rules from rooms %s to %s for user %s. " + "Skipping...", + old_room_id, + new_room_id, + user_id, + ) + continue @defer.inlineCallbacks def send_membership_event(self, requester, event, context, ratelimit=True): -- cgit 1.5.1 From fe1f2b452073e5939cddd23acc6f2d226673a03f Mon Sep 17 00:00:00 2001 From: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com> Date: Fri, 1 Nov 2019 12:03:44 +0000 Subject: Remove last usages of deprecated logging.warn method (#6314) --- changelog.d/6314.misc | 1 + synapse/config/logger.py | 4 ++-- synapse/handlers/directory.py | 2 +- 3 files changed, 4 insertions(+), 3 deletions(-) create mode 100644 changelog.d/6314.misc (limited to 'synapse/handlers') diff --git a/changelog.d/6314.misc b/changelog.d/6314.misc new file mode 100644 index 0000000000..2369760272 --- /dev/null +++ b/changelog.d/6314.misc @@ -0,0 +1 @@ +Replace every instance of `logger.warn` method with `logger.warning` as the former is deprecated. \ No newline at end of file diff --git a/synapse/config/logger.py b/synapse/config/logger.py index 2d2c1e54df..75bb904718 100644 --- a/synapse/config/logger.py +++ b/synapse/config/logger.py @@ -234,8 +234,8 @@ def setup_logging( # make sure that the first thing we log is a thing we can grep backwards # for - logging.warn("***** STARTING SERVER *****") - logging.warn("Server %s version %s", sys.argv[0], get_version_string(synapse)) + logging.warning("***** STARTING SERVER *****") + logging.warning("Server %s version %s", sys.argv[0], get_version_string(synapse)) logging.info("Server hostname: %s", config.server_name) return logger diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py index 526379c6f7..c4632f8984 100644 --- a/synapse/handlers/directory.py +++ b/synapse/handlers/directory.py @@ -250,7 +250,7 @@ class DirectoryHandler(BaseHandler): ignore_backoff=True, ) except CodeMessageException as e: - logging.warn("Error retrieving alias") + logging.warning("Error retrieving alias") if e.code == 404: result = None else: -- cgit 1.5.1 From 1cb84c6486a5131dd284f341bb657434becda255 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Fri, 1 Nov 2019 14:07:44 +0000 Subject: Support for routing outbound HTTP requests via a proxy (#6239) The `http_proxy` and `HTTPS_PROXY` env vars can be set to a `host[:port]` value which should point to a proxy. The address of the proxy should be excluded from IP blacklists such as the `url_preview_ip_range_blacklist`. The proxy will then be used for * push * url previews * phone-home stats * recaptcha validation * CAS auth validation It will *not* be used for: * Application Services * Identity servers * Outbound federation * In worker configurations, connections from workers to masters Fixes #4198. --- changelog.d/6238.feature | 1 + synapse/app/homeserver.py | 2 +- synapse/handlers/ui_auth/checkers.py | 2 +- synapse/http/client.py | 17 +- synapse/http/connectproxyclient.py | 195 ++++++++++++ synapse/http/proxyagent.py | 195 ++++++++++++ synapse/push/httppusher.py | 2 +- synapse/rest/client/v1/login.py | 2 +- synapse/rest/media/v1/preview_url_resource.py | 2 + synapse/server.py | 9 + synapse/server.pyi | 9 + tests/http/__init__.py | 17 ++ .../federation/test_matrix_federation_agent.py | 11 +- tests/http/test_proxyagent.py | 334 +++++++++++++++++++++ tests/push/test_http.py | 2 +- tests/server.py | 24 +- 16 files changed, 812 insertions(+), 12 deletions(-) create mode 100644 changelog.d/6238.feature create mode 100644 synapse/http/connectproxyclient.py create mode 100644 synapse/http/proxyagent.py create mode 100644 tests/http/test_proxyagent.py (limited to 'synapse/handlers') diff --git a/changelog.d/6238.feature b/changelog.d/6238.feature new file mode 100644 index 0000000000..d225ac33b6 --- /dev/null +++ b/changelog.d/6238.feature @@ -0,0 +1 @@ +Add support for outbound http proxying via http_proxy/HTTPS_PROXY env vars. diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 8997c1f9e7..8d28076d92 100644 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -565,7 +565,7 @@ def run(hs): "Reporting stats to %s: %s" % (hs.config.report_stats_endpoint, stats) ) try: - yield hs.get_simple_http_client().put_json( + yield hs.get_proxied_http_client().put_json( hs.config.report_stats_endpoint, stats ) except Exception as e: diff --git a/synapse/handlers/ui_auth/checkers.py b/synapse/handlers/ui_auth/checkers.py index 29aa1e5aaf..8363d887a9 100644 --- a/synapse/handlers/ui_auth/checkers.py +++ b/synapse/handlers/ui_auth/checkers.py @@ -81,7 +81,7 @@ class RecaptchaAuthChecker(UserInteractiveAuthChecker): def __init__(self, hs): super().__init__(hs) self._enabled = bool(hs.config.recaptcha_private_key) - self._http_client = hs.get_simple_http_client() + self._http_client = hs.get_proxied_http_client() self._url = hs.config.recaptcha_siteverify_api self._secret = hs.config.recaptcha_private_key diff --git a/synapse/http/client.py b/synapse/http/client.py index 2df5b383b5..d4c285445e 100644 --- a/synapse/http/client.py +++ b/synapse/http/client.py @@ -45,6 +45,7 @@ from synapse.http import ( cancelled_to_request_timed_out_error, redact_uri, ) +from synapse.http.proxyagent import ProxyAgent from synapse.logging.context import make_deferred_yieldable from synapse.logging.opentracing import set_tag, start_active_span, tags from synapse.util.async_helpers import timeout_deferred @@ -183,7 +184,15 @@ class SimpleHttpClient(object): using HTTP in Matrix """ - def __init__(self, hs, treq_args={}, ip_whitelist=None, ip_blacklist=None): + def __init__( + self, + hs, + treq_args={}, + ip_whitelist=None, + ip_blacklist=None, + http_proxy=None, + https_proxy=None, + ): """ Args: hs (synapse.server.HomeServer) @@ -192,6 +201,8 @@ class SimpleHttpClient(object): we may not request. ip_whitelist (netaddr.IPSet): The whitelisted IP addresses, that we can request if it were otherwise caught in a blacklist. + http_proxy (bytes): proxy server to use for http connections. host[:port] + https_proxy (bytes): proxy server to use for https connections. host[:port] """ self.hs = hs @@ -236,11 +247,13 @@ class SimpleHttpClient(object): # The default context factory in Twisted 14.0.0 (which we require) is # BrowserLikePolicyForHTTPS which will do regular cert validation # 'like a browser' - self.agent = Agent( + self.agent = ProxyAgent( self.reactor, connectTimeout=15, contextFactory=self.hs.get_http_client_context_factory(), pool=pool, + http_proxy=http_proxy, + https_proxy=https_proxy, ) if self._ip_blacklist: diff --git a/synapse/http/connectproxyclient.py b/synapse/http/connectproxyclient.py new file mode 100644 index 0000000000..be7b2ceb8e --- /dev/null +++ b/synapse/http/connectproxyclient.py @@ -0,0 +1,195 @@ +# -*- coding: utf-8 -*- +# Copyright 2019 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +from zope.interface import implementer + +from twisted.internet import defer, protocol +from twisted.internet.error import ConnectError +from twisted.internet.interfaces import IStreamClientEndpoint +from twisted.internet.protocol import connectionDone +from twisted.web import http + +logger = logging.getLogger(__name__) + + +class ProxyConnectError(ConnectError): + pass + + +@implementer(IStreamClientEndpoint) +class HTTPConnectProxyEndpoint(object): + """An Endpoint implementation which will send a CONNECT request to an http proxy + + Wraps an existing HostnameEndpoint for the proxy. + + When we get the connect() request from the connection pool (via the TLS wrapper), + we'll first connect to the proxy endpoint with a ProtocolFactory which will make the + CONNECT request. Once that completes, we invoke the protocolFactory which was passed + in. + + Args: + reactor: the Twisted reactor to use for the connection + proxy_endpoint (IStreamClientEndpoint): the endpoint to use to connect to the + proxy + host (bytes): hostname that we want to CONNECT to + port (int): port that we want to connect to + """ + + def __init__(self, reactor, proxy_endpoint, host, port): + self._reactor = reactor + self._proxy_endpoint = proxy_endpoint + self._host = host + self._port = port + + def __repr__(self): + return "" % (self._proxy_endpoint,) + + def connect(self, protocolFactory): + f = HTTPProxiedClientFactory(self._host, self._port, protocolFactory) + d = self._proxy_endpoint.connect(f) + # once the tcp socket connects successfully, we need to wait for the + # CONNECT to complete. + d.addCallback(lambda conn: f.on_connection) + return d + + +class HTTPProxiedClientFactory(protocol.ClientFactory): + """ClientFactory wrapper that triggers an HTTP proxy CONNECT on connect. + + Once the CONNECT completes, invokes the original ClientFactory to build the + HTTP Protocol object and run the rest of the connection. + + Args: + dst_host (bytes): hostname that we want to CONNECT to + dst_port (int): port that we want to connect to + wrapped_factory (protocol.ClientFactory): The original Factory + """ + + def __init__(self, dst_host, dst_port, wrapped_factory): + self.dst_host = dst_host + self.dst_port = dst_port + self.wrapped_factory = wrapped_factory + self.on_connection = defer.Deferred() + + def startedConnecting(self, connector): + return self.wrapped_factory.startedConnecting(connector) + + def buildProtocol(self, addr): + wrapped_protocol = self.wrapped_factory.buildProtocol(addr) + + return HTTPConnectProtocol( + self.dst_host, self.dst_port, wrapped_protocol, self.on_connection + ) + + def clientConnectionFailed(self, connector, reason): + logger.debug("Connection to proxy failed: %s", reason) + if not self.on_connection.called: + self.on_connection.errback(reason) + return self.wrapped_factory.clientConnectionFailed(connector, reason) + + def clientConnectionLost(self, connector, reason): + logger.debug("Connection to proxy lost: %s", reason) + if not self.on_connection.called: + self.on_connection.errback(reason) + return self.wrapped_factory.clientConnectionLost(connector, reason) + + +class HTTPConnectProtocol(protocol.Protocol): + """Protocol that wraps an existing Protocol to do a CONNECT handshake at connect + + Args: + host (bytes): The original HTTP(s) hostname or IPv4 or IPv6 address literal + to put in the CONNECT request + + port (int): The original HTTP(s) port to put in the CONNECT request + + wrapped_protocol (interfaces.IProtocol): the original protocol (probably + HTTPChannel or TLSMemoryBIOProtocol, but could be anything really) + + connected_deferred (Deferred): a Deferred which will be callbacked with + wrapped_protocol when the CONNECT completes + """ + + def __init__(self, host, port, wrapped_protocol, connected_deferred): + self.host = host + self.port = port + self.wrapped_protocol = wrapped_protocol + self.connected_deferred = connected_deferred + self.http_setup_client = HTTPConnectSetupClient(self.host, self.port) + self.http_setup_client.on_connected.addCallback(self.proxyConnected) + + def connectionMade(self): + self.http_setup_client.makeConnection(self.transport) + + def connectionLost(self, reason=connectionDone): + if self.wrapped_protocol.connected: + self.wrapped_protocol.connectionLost(reason) + + self.http_setup_client.connectionLost(reason) + + if not self.connected_deferred.called: + self.connected_deferred.errback(reason) + + def proxyConnected(self, _): + self.wrapped_protocol.makeConnection(self.transport) + + self.connected_deferred.callback(self.wrapped_protocol) + + # Get any pending data from the http buf and forward it to the original protocol + buf = self.http_setup_client.clearLineBuffer() + if buf: + self.wrapped_protocol.dataReceived(buf) + + def dataReceived(self, data): + # if we've set up the HTTP protocol, we can send the data there + if self.wrapped_protocol.connected: + return self.wrapped_protocol.dataReceived(data) + + # otherwise, we must still be setting up the connection: send the data to the + # setup client + return self.http_setup_client.dataReceived(data) + + +class HTTPConnectSetupClient(http.HTTPClient): + """HTTPClient protocol to send a CONNECT message for proxies and read the response. + + Args: + host (bytes): The hostname to send in the CONNECT message + port (int): The port to send in the CONNECT message + """ + + def __init__(self, host, port): + self.host = host + self.port = port + self.on_connected = defer.Deferred() + + def connectionMade(self): + logger.debug("Connected to proxy, sending CONNECT") + self.sendCommand(b"CONNECT", b"%s:%d" % (self.host, self.port)) + self.endHeaders() + + def handleStatus(self, version, status, message): + logger.debug("Got Status: %s %s %s", status, message, version) + if status != b"200": + raise ProxyConnectError("Unexpected status on CONNECT: %s" % status) + + def handleEndHeaders(self): + logger.debug("End Headers") + self.on_connected.callback(None) + + def handleResponse(self, body): + pass diff --git a/synapse/http/proxyagent.py b/synapse/http/proxyagent.py new file mode 100644 index 0000000000..332da02a8d --- /dev/null +++ b/synapse/http/proxyagent.py @@ -0,0 +1,195 @@ +# -*- coding: utf-8 -*- +# Copyright 2019 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import logging +import re + +from zope.interface import implementer + +from twisted.internet import defer +from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS +from twisted.python.failure import Failure +from twisted.web.client import URI, BrowserLikePolicyForHTTPS, _AgentBase +from twisted.web.error import SchemeNotSupported +from twisted.web.iweb import IAgent + +from synapse.http.connectproxyclient import HTTPConnectProxyEndpoint + +logger = logging.getLogger(__name__) + +_VALID_URI = re.compile(br"\A[\x21-\x7e]+\Z") + + +@implementer(IAgent) +class ProxyAgent(_AgentBase): + """An Agent implementation which will use an HTTP proxy if one was requested + + Args: + reactor: twisted reactor to place outgoing + connections. + + contextFactory (IPolicyForHTTPS): A factory for TLS contexts, to control the + verification parameters of OpenSSL. The default is to use a + `BrowserLikePolicyForHTTPS`, so unless you have special + requirements you can leave this as-is. + + connectTimeout (float): The amount of time that this Agent will wait + for the peer to accept a connection. + + bindAddress (bytes): The local address for client sockets to bind to. + + pool (HTTPConnectionPool|None): connection pool to be used. If None, a + non-persistent pool instance will be created. + """ + + def __init__( + self, + reactor, + contextFactory=BrowserLikePolicyForHTTPS(), + connectTimeout=None, + bindAddress=None, + pool=None, + http_proxy=None, + https_proxy=None, + ): + _AgentBase.__init__(self, reactor, pool) + + self._endpoint_kwargs = {} + if connectTimeout is not None: + self._endpoint_kwargs["timeout"] = connectTimeout + if bindAddress is not None: + self._endpoint_kwargs["bindAddress"] = bindAddress + + self.http_proxy_endpoint = _http_proxy_endpoint( + http_proxy, reactor, **self._endpoint_kwargs + ) + + self.https_proxy_endpoint = _http_proxy_endpoint( + https_proxy, reactor, **self._endpoint_kwargs + ) + + self._policy_for_https = contextFactory + self._reactor = reactor + + def request(self, method, uri, headers=None, bodyProducer=None): + """ + Issue a request to the server indicated by the given uri. + + Supports `http` and `https` schemes. + + An existing connection from the connection pool may be used or a new one may be + created. + + See also: twisted.web.iweb.IAgent.request + + Args: + method (bytes): The request method to use, such as `GET`, `POST`, etc + + uri (bytes): The location of the resource to request. + + headers (Headers|None): Extra headers to send with the request + + bodyProducer (IBodyProducer|None): An object which can generate bytes to + make up the body of this request (for example, the properly encoded + contents of a file for a file upload). Or, None if the request is to + have no body. + + Returns: + Deferred[IResponse]: completes when the header of the response has + been received (regardless of the response status code). + """ + uri = uri.strip() + if not _VALID_URI.match(uri): + raise ValueError("Invalid URI {!r}".format(uri)) + + parsed_uri = URI.fromBytes(uri) + pool_key = (parsed_uri.scheme, parsed_uri.host, parsed_uri.port) + request_path = parsed_uri.originForm + + if parsed_uri.scheme == b"http" and self.http_proxy_endpoint: + # Cache *all* connections under the same key, since we are only + # connecting to a single destination, the proxy: + pool_key = ("http-proxy", self.http_proxy_endpoint) + endpoint = self.http_proxy_endpoint + request_path = uri + elif parsed_uri.scheme == b"https" and self.https_proxy_endpoint: + endpoint = HTTPConnectProxyEndpoint( + self._reactor, + self.https_proxy_endpoint, + parsed_uri.host, + parsed_uri.port, + ) + else: + # not using a proxy + endpoint = HostnameEndpoint( + self._reactor, parsed_uri.host, parsed_uri.port, **self._endpoint_kwargs + ) + + logger.debug("Requesting %s via %s", uri, endpoint) + + if parsed_uri.scheme == b"https": + tls_connection_creator = self._policy_for_https.creatorForNetloc( + parsed_uri.host, parsed_uri.port + ) + endpoint = wrapClientTLS(tls_connection_creator, endpoint) + elif parsed_uri.scheme == b"http": + pass + else: + return defer.fail( + Failure( + SchemeNotSupported("Unsupported scheme: %r" % (parsed_uri.scheme,)) + ) + ) + + return self._requestWithEndpoint( + pool_key, endpoint, method, parsed_uri, headers, bodyProducer, request_path + ) + + +def _http_proxy_endpoint(proxy, reactor, **kwargs): + """Parses an http proxy setting and returns an endpoint for the proxy + + Args: + proxy (bytes|None): the proxy setting + reactor: reactor to be used to connect to the proxy + kwargs: other args to be passed to HostnameEndpoint + + Returns: + interfaces.IStreamClientEndpoint|None: endpoint to use to connect to the proxy, + or None + """ + if proxy is None: + return None + + # currently we only support hostname:port. Some apps also support + # protocol://[:port], which allows a way of requiring a TLS connection to the + # proxy. + + host, port = parse_host_port(proxy, default_port=1080) + return HostnameEndpoint(reactor, host, port, **kwargs) + + +def parse_host_port(hostport, default_port=None): + # could have sworn we had one of these somewhere else... + if b":" in hostport: + host, port = hostport.rsplit(b":", 1) + try: + port = int(port) + return host, port + except ValueError: + # the thing after the : wasn't a valid port; presumably this is an + # IPv6 address. + pass + + return hostport, default_port diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py index 7dde2ad055..e994037be6 100644 --- a/synapse/push/httppusher.py +++ b/synapse/push/httppusher.py @@ -103,7 +103,7 @@ class HttpPusher(object): if "url" not in self.data: raise PusherConfigException("'url' required in data for HTTP pusher") self.url = self.data["url"] - self.http_client = hs.get_simple_http_client() + self.http_client = hs.get_proxied_http_client() self.data_minus_url = {} self.data_minus_url.update(self.data) del self.data_minus_url["url"] diff --git a/synapse/rest/client/v1/login.py b/synapse/rest/client/v1/login.py index 00a7dd6d09..24a0ce74f2 100644 --- a/synapse/rest/client/v1/login.py +++ b/synapse/rest/client/v1/login.py @@ -381,7 +381,7 @@ class CasTicketServlet(RestServlet): self.cas_displayname_attribute = hs.config.cas_displayname_attribute self.cas_required_attributes = hs.config.cas_required_attributes self._sso_auth_handler = SSOAuthHandler(hs) - self._http_client = hs.get_simple_http_client() + self._http_client = hs.get_proxied_http_client() @defer.inlineCallbacks def on_GET(self, request): diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py index 5a25b6b3fc..531d923f76 100644 --- a/synapse/rest/media/v1/preview_url_resource.py +++ b/synapse/rest/media/v1/preview_url_resource.py @@ -74,6 +74,8 @@ class PreviewUrlResource(DirectServeResource): treq_args={"browser_like_redirects": True}, ip_whitelist=hs.config.url_preview_ip_range_whitelist, ip_blacklist=hs.config.url_preview_ip_range_blacklist, + http_proxy=os.getenv("http_proxy"), + https_proxy=os.getenv("HTTPS_PROXY"), ) self.media_repo = media_repo self.primary_base_path = media_repo.primary_base_path diff --git a/synapse/server.py b/synapse/server.py index 0b81af646c..f8aeebcff8 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -23,6 +23,7 @@ # Imports required for the default HomeServer() implementation import abc import logging +import os from twisted.enterprise import adbapi from twisted.mail.smtp import sendmail @@ -168,6 +169,7 @@ class HomeServer(object): "filtering", "http_client_context_factory", "simple_http_client", + "proxied_http_client", "media_repository", "media_repository_resource", "federation_transport_client", @@ -311,6 +313,13 @@ class HomeServer(object): def build_simple_http_client(self): return SimpleHttpClient(self) + def build_proxied_http_client(self): + return SimpleHttpClient( + self, + http_proxy=os.getenv("http_proxy"), + https_proxy=os.getenv("HTTPS_PROXY"), + ) + def build_room_creation_handler(self): return RoomCreationHandler(self) diff --git a/synapse/server.pyi b/synapse/server.pyi index 83d1f11283..b5e0b57095 100644 --- a/synapse/server.pyi +++ b/synapse/server.pyi @@ -12,6 +12,7 @@ import synapse.handlers.message import synapse.handlers.room import synapse.handlers.room_member import synapse.handlers.set_password +import synapse.http.client import synapse.rest.media.v1.media_repository import synapse.server_notices.server_notices_manager import synapse.server_notices.server_notices_sender @@ -38,6 +39,14 @@ class HomeServer(object): pass def get_state_resolution_handler(self) -> synapse.state.StateResolutionHandler: pass + def get_simple_http_client(self) -> synapse.http.client.SimpleHttpClient: + """Fetch an HTTP client implementation which doesn't do any blacklisting + or support any HTTP_PROXY settings""" + pass + def get_proxied_http_client(self) -> synapse.http.client.SimpleHttpClient: + """Fetch an HTTP client implementation which doesn't do any blacklisting + but does support HTTP_PROXY settings""" + pass def get_deactivate_account_handler( self, ) -> synapse.handlers.deactivate_account.DeactivateAccountHandler: diff --git a/tests/http/__init__.py b/tests/http/__init__.py index 2d5dba6464..2096ba3c91 100644 --- a/tests/http/__init__.py +++ b/tests/http/__init__.py @@ -20,6 +20,23 @@ from zope.interface import implementer from OpenSSL import SSL from OpenSSL.SSL import Connection from twisted.internet.interfaces import IOpenSSLServerConnectionCreator +from twisted.internet.ssl import Certificate, trustRootFromCertificates +from twisted.web.client import BrowserLikePolicyForHTTPS # noqa: F401 +from twisted.web.iweb import IPolicyForHTTPS # noqa: F401 + + +def get_test_https_policy(): + """Get a test IPolicyForHTTPS which trusts the test CA cert + + Returns: + IPolicyForHTTPS + """ + ca_file = get_test_ca_cert_file() + with open(ca_file) as stream: + content = stream.read() + cert = Certificate.loadPEM(content) + trust_root = trustRootFromCertificates([cert]) + return BrowserLikePolicyForHTTPS(trustRoot=trust_root) def get_test_ca_cert_file(): diff --git a/tests/http/federation/test_matrix_federation_agent.py b/tests/http/federation/test_matrix_federation_agent.py index 71d7025264..cfcd98ff7d 100644 --- a/tests/http/federation/test_matrix_federation_agent.py +++ b/tests/http/federation/test_matrix_federation_agent.py @@ -124,19 +124,24 @@ class MatrixFederationAgentTests(unittest.TestCase): FakeTransport(client_protocol, self.reactor, server_tls_protocol) ) + # grab a hold of the TLS connection, in case it gets torn down + server_tls_connection = server_tls_protocol._tlsConnection + + # fish the test server back out of the server-side TLS protocol. + http_protocol = server_tls_protocol.wrappedProtocol + # give the reactor a pump to get the TLS juices flowing. self.reactor.pump((0.1,)) # check the SNI - server_name = server_tls_protocol._tlsConnection.get_servername() + server_name = server_tls_connection.get_servername() self.assertEqual( server_name, expected_sni, "Expected SNI %s but got %s" % (expected_sni, server_name), ) - # fish the test server back out of the server-side TLS protocol. - return server_tls_protocol.wrappedProtocol + return http_protocol @defer.inlineCallbacks def _make_get_request(self, uri): diff --git a/tests/http/test_proxyagent.py b/tests/http/test_proxyagent.py new file mode 100644 index 0000000000..22abf76515 --- /dev/null +++ b/tests/http/test_proxyagent.py @@ -0,0 +1,334 @@ +# -*- coding: utf-8 -*- +# Copyright 2019 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import logging + +import treq + +from twisted.internet import interfaces # noqa: F401 +from twisted.internet.protocol import Factory +from twisted.protocols.tls import TLSMemoryBIOFactory +from twisted.web.http import HTTPChannel + +from synapse.http.proxyagent import ProxyAgent + +from tests.http import TestServerTLSConnectionFactory, get_test_https_policy +from tests.server import FakeTransport, ThreadedMemoryReactorClock +from tests.unittest import TestCase + +logger = logging.getLogger(__name__) + +HTTPFactory = Factory.forProtocol(HTTPChannel) + + +class MatrixFederationAgentTests(TestCase): + def setUp(self): + self.reactor = ThreadedMemoryReactorClock() + + def _make_connection( + self, client_factory, server_factory, ssl=False, expected_sni=None + ): + """Builds a test server, and completes the outgoing client connection + + Args: + client_factory (interfaces.IProtocolFactory): the the factory that the + application is trying to use to make the outbound connection. We will + invoke it to build the client Protocol + + server_factory (interfaces.IProtocolFactory): a factory to build the + server-side protocol + + ssl (bool): If true, we will expect an ssl connection and wrap + server_factory with a TLSMemoryBIOFactory + + expected_sni (bytes|None): the expected SNI value + + Returns: + IProtocol: the server Protocol returned by server_factory + """ + if ssl: + server_factory = _wrap_server_factory_for_tls(server_factory) + + server_protocol = server_factory.buildProtocol(None) + + # now, tell the client protocol factory to build the client protocol, + # and wire the output of said protocol up to the server via + # a FakeTransport. + # + # Normally this would be done by the TCP socket code in Twisted, but we are + # stubbing that out here. + client_protocol = client_factory.buildProtocol(None) + client_protocol.makeConnection( + FakeTransport(server_protocol, self.reactor, client_protocol) + ) + + # tell the server protocol to send its stuff back to the client, too + server_protocol.makeConnection( + FakeTransport(client_protocol, self.reactor, server_protocol) + ) + + if ssl: + http_protocol = server_protocol.wrappedProtocol + tls_connection = server_protocol._tlsConnection + else: + http_protocol = server_protocol + tls_connection = None + + # give the reactor a pump to get the TLS juices flowing (if needed) + self.reactor.advance(0) + + if expected_sni is not None: + server_name = tls_connection.get_servername() + self.assertEqual( + server_name, + expected_sni, + "Expected SNI %s but got %s" % (expected_sni, server_name), + ) + + return http_protocol + + def test_http_request(self): + agent = ProxyAgent(self.reactor) + + self.reactor.lookups["test.com"] = "1.2.3.4" + d = agent.request(b"GET", b"http://test.com") + + # there should be a pending TCP connection + clients = self.reactor.tcpClients + self.assertEqual(len(clients), 1) + (host, port, client_factory, _timeout, _bindAddress) = clients[0] + self.assertEqual(host, "1.2.3.4") + self.assertEqual(port, 80) + + # make a test server, and wire up the client + http_server = self._make_connection( + client_factory, _get_test_protocol_factory() + ) + + # the FakeTransport is async, so we need to pump the reactor + self.reactor.advance(0) + + # now there should be a pending request + self.assertEqual(len(http_server.requests), 1) + + request = http_server.requests[0] + self.assertEqual(request.method, b"GET") + self.assertEqual(request.path, b"/") + self.assertEqual(request.requestHeaders.getRawHeaders(b"host"), [b"test.com"]) + request.write(b"result") + request.finish() + + self.reactor.advance(0) + + resp = self.successResultOf(d) + body = self.successResultOf(treq.content(resp)) + self.assertEqual(body, b"result") + + def test_https_request(self): + agent = ProxyAgent(self.reactor, contextFactory=get_test_https_policy()) + + self.reactor.lookups["test.com"] = "1.2.3.4" + d = agent.request(b"GET", b"https://test.com/abc") + + # there should be a pending TCP connection + clients = self.reactor.tcpClients + self.assertEqual(len(clients), 1) + (host, port, client_factory, _timeout, _bindAddress) = clients[0] + self.assertEqual(host, "1.2.3.4") + self.assertEqual(port, 443) + + # make a test server, and wire up the client + http_server = self._make_connection( + client_factory, + _get_test_protocol_factory(), + ssl=True, + expected_sni=b"test.com", + ) + + # the FakeTransport is async, so we need to pump the reactor + self.reactor.advance(0) + + # now there should be a pending request + self.assertEqual(len(http_server.requests), 1) + + request = http_server.requests[0] + self.assertEqual(request.method, b"GET") + self.assertEqual(request.path, b"/abc") + self.assertEqual(request.requestHeaders.getRawHeaders(b"host"), [b"test.com"]) + request.write(b"result") + request.finish() + + self.reactor.advance(0) + + resp = self.successResultOf(d) + body = self.successResultOf(treq.content(resp)) + self.assertEqual(body, b"result") + + def test_http_request_via_proxy(self): + agent = ProxyAgent(self.reactor, http_proxy=b"proxy.com:8888") + + self.reactor.lookups["proxy.com"] = "1.2.3.5" + d = agent.request(b"GET", b"http://test.com") + + # there should be a pending TCP connection + clients = self.reactor.tcpClients + self.assertEqual(len(clients), 1) + (host, port, client_factory, _timeout, _bindAddress) = clients[0] + self.assertEqual(host, "1.2.3.5") + self.assertEqual(port, 8888) + + # make a test server, and wire up the client + http_server = self._make_connection( + client_factory, _get_test_protocol_factory() + ) + + # the FakeTransport is async, so we need to pump the reactor + self.reactor.advance(0) + + # now there should be a pending request + self.assertEqual(len(http_server.requests), 1) + + request = http_server.requests[0] + self.assertEqual(request.method, b"GET") + self.assertEqual(request.path, b"http://test.com") + self.assertEqual(request.requestHeaders.getRawHeaders(b"host"), [b"test.com"]) + request.write(b"result") + request.finish() + + self.reactor.advance(0) + + resp = self.successResultOf(d) + body = self.successResultOf(treq.content(resp)) + self.assertEqual(body, b"result") + + def test_https_request_via_proxy(self): + agent = ProxyAgent( + self.reactor, + contextFactory=get_test_https_policy(), + https_proxy=b"proxy.com", + ) + + self.reactor.lookups["proxy.com"] = "1.2.3.5" + d = agent.request(b"GET", b"https://test.com/abc") + + # there should be a pending TCP connection + clients = self.reactor.tcpClients + self.assertEqual(len(clients), 1) + (host, port, client_factory, _timeout, _bindAddress) = clients[0] + self.assertEqual(host, "1.2.3.5") + self.assertEqual(port, 1080) + + # make a test HTTP server, and wire up the client + proxy_server = self._make_connection( + client_factory, _get_test_protocol_factory() + ) + + # fish the transports back out so that we can do the old switcheroo + s2c_transport = proxy_server.transport + client_protocol = s2c_transport.other + c2s_transport = client_protocol.transport + + # the FakeTransport is async, so we need to pump the reactor + self.reactor.advance(0) + + # now there should be a pending CONNECT request + self.assertEqual(len(proxy_server.requests), 1) + + request = proxy_server.requests[0] + self.assertEqual(request.method, b"CONNECT") + self.assertEqual(request.path, b"test.com:443") + + # tell the proxy server not to close the connection + proxy_server.persistent = True + + # this just stops the http Request trying to do a chunked response + # request.setHeader(b"Content-Length", b"0") + request.finish() + + # now we can replace the proxy channel with a new, SSL-wrapped HTTP channel + ssl_factory = _wrap_server_factory_for_tls(_get_test_protocol_factory()) + ssl_protocol = ssl_factory.buildProtocol(None) + http_server = ssl_protocol.wrappedProtocol + + ssl_protocol.makeConnection( + FakeTransport(client_protocol, self.reactor, ssl_protocol) + ) + c2s_transport.other = ssl_protocol + + self.reactor.advance(0) + + server_name = ssl_protocol._tlsConnection.get_servername() + expected_sni = b"test.com" + self.assertEqual( + server_name, + expected_sni, + "Expected SNI %s but got %s" % (expected_sni, server_name), + ) + + # now there should be a pending request + self.assertEqual(len(http_server.requests), 1) + + request = http_server.requests[0] + self.assertEqual(request.method, b"GET") + self.assertEqual(request.path, b"/abc") + self.assertEqual(request.requestHeaders.getRawHeaders(b"host"), [b"test.com"]) + request.write(b"result") + request.finish() + + self.reactor.advance(0) + + resp = self.successResultOf(d) + body = self.successResultOf(treq.content(resp)) + self.assertEqual(body, b"result") + + +def _wrap_server_factory_for_tls(factory, sanlist=None): + """Wrap an existing Protocol Factory with a test TLSMemoryBIOFactory + + The resultant factory will create a TLS server which presents a certificate + signed by our test CA, valid for the domains in `sanlist` + + Args: + factory (interfaces.IProtocolFactory): protocol factory to wrap + sanlist (iterable[bytes]): list of domains the cert should be valid for + + Returns: + interfaces.IProtocolFactory + """ + if sanlist is None: + sanlist = [b"DNS:test.com"] + + connection_creator = TestServerTLSConnectionFactory(sanlist=sanlist) + return TLSMemoryBIOFactory( + connection_creator, isClient=False, wrappedFactory=factory + ) + + +def _get_test_protocol_factory(): + """Get a protocol Factory which will build an HTTPChannel + + Returns: + interfaces.IProtocolFactory + """ + server_factory = Factory.forProtocol(HTTPChannel) + + # Request.finish expects the factory to have a 'log' method. + server_factory.log = _log_request + + return server_factory + + +def _log_request(request): + """Implements Factory.log, which is expected by Request.finish""" + logger.info("Completed request %s", request) diff --git a/tests/push/test_http.py b/tests/push/test_http.py index 8ce6bb62da..af2327fb66 100644 --- a/tests/push/test_http.py +++ b/tests/push/test_http.py @@ -50,7 +50,7 @@ class HTTPPusherTests(HomeserverTestCase): config = self.default_config() config["start_pushers"] = True - hs = self.setup_test_homeserver(config=config, simple_http_client=m) + hs = self.setup_test_homeserver(config=config, proxied_http_client=m) return hs diff --git a/tests/server.py b/tests/server.py index 469efb4edb..f878aeaada 100644 --- a/tests/server.py +++ b/tests/server.py @@ -395,11 +395,24 @@ class FakeTransport(object): self.disconnecting = True if self._protocol: self._protocol.connectionLost(reason) - self.disconnected = True + + # if we still have data to write, delay until that is done + if self.buffer: + logger.info( + "FakeTransport: Delaying disconnect until buffer is flushed" + ) + else: + self.disconnected = True def abortConnection(self): logger.info("FakeTransport: abortConnection()") - self.loseConnection() + + if not self.disconnecting: + self.disconnecting = True + if self._protocol: + self._protocol.connectionLost(None) + + self.disconnected = True def pauseProducing(self): if not self.producer: @@ -430,6 +443,9 @@ class FakeTransport(object): self._reactor.callLater(0.0, _produce) def write(self, byt): + if self.disconnecting: + raise Exception("Writing to disconnecting FakeTransport") + self.buffer = self.buffer + byt # always actually do the write asynchronously. Some protocols (notably the @@ -474,6 +490,10 @@ class FakeTransport(object): if self.buffer and self.autoflush: self._reactor.callLater(0.0, self.flush) + if not self.buffer and self.disconnecting: + logger.info("FakeTransport: Buffer now empty, completing disconnect") + self.disconnected = True + def connect_client(reactor: IReactorTCP, client_id: int) -> AccumulatingProtocol: """ -- cgit 1.5.1 From c6516adbe03a0acdd614ba6eb9d6f447dd4259e9 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Fri, 1 Nov 2019 16:19:09 +0000 Subject: Factor out an _AsyncEventContextImpl (#6298) The intention here is to make it clearer which fields we can expect to be populated when: notably, that the _event_type etc aren't used for the synchronous impl of EventContext. --- changelog.d/6298.misc | 1 + synapse/events/snapshot.py | 107 ++++++++++++++++------------------------- synapse/handlers/federation.py | 38 +++++++-------- tests/test_federation.py | 4 +- 4 files changed, 65 insertions(+), 85 deletions(-) create mode 100644 changelog.d/6298.misc (limited to 'synapse/handlers') diff --git a/changelog.d/6298.misc b/changelog.d/6298.misc new file mode 100644 index 0000000000..d4190730b2 --- /dev/null +++ b/changelog.d/6298.misc @@ -0,0 +1 @@ +Refactor EventContext for clarity. \ No newline at end of file diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py index 27cd8a63ff..a269de5482 100644 --- a/synapse/events/snapshot.py +++ b/synapse/events/snapshot.py @@ -37,9 +37,6 @@ class EventContext: delta_ids (dict[(str, str), str]): Delta from ``prev_group``. (type, state_key) -> event_id. ``None`` for an outlier. - prev_state_events (?): XXX: is this ever set to anything other than - the empty list? - app_service: FIXME _current_state_ids (dict[(str, str), str]|None): @@ -51,36 +48,16 @@ class EventContext: The current state map excluding the current event. None if outlier or we haven't fetched the state from DB yet. (type, state_key) -> event_id - - _fetching_state_deferred (Deferred|None): Resolves when *_state_ids have - been calculated. None if we haven't started calculating yet - - _event_type (str): The type of the event the context is associated with. - Only set when state has not been fetched yet. - - _event_state_key (str|None): The state_key of the event the context is - associated with. Only set when state has not been fetched yet. - - _prev_state_id (str|None): If the event associated with the context is - a state event, then `_prev_state_id` is the event_id of the state - that was replaced. - Only set when state has not been fetched yet. """ state_group = attr.ib(default=None) rejected = attr.ib(default=False) prev_group = attr.ib(default=None) delta_ids = attr.ib(default=None) - prev_state_events = attr.ib(default=attr.Factory(list)) app_service = attr.ib(default=None) - _current_state_ids = attr.ib(default=None) _prev_state_ids = attr.ib(default=None) - _prev_state_id = attr.ib(default=None) - - _event_type = attr.ib(default=None) - _event_state_key = attr.ib(default=None) - _fetching_state_deferred = attr.ib(default=None) + _current_state_ids = attr.ib(default=None) @staticmethod def with_state( @@ -90,7 +67,6 @@ class EventContext: current_state_ids=current_state_ids, prev_state_ids=prev_state_ids, state_group=state_group, - fetching_state_deferred=defer.succeed(None), prev_group=prev_group, delta_ids=delta_ids, ) @@ -125,7 +101,6 @@ class EventContext: "rejected": self.rejected, "prev_group": self.prev_group, "delta_ids": _encode_state_dict(self.delta_ids), - "prev_state_events": self.prev_state_events, "app_service_id": self.app_service.id if self.app_service else None, } @@ -141,7 +116,7 @@ class EventContext: Returns: EventContext """ - context = EventContext( + context = _AsyncEventContextImpl( # We use the state_group and prev_state_id stuff to pull the # current_state_ids out of the DB and construct prev_state_ids. prev_state_id=input["prev_state_id"], @@ -151,7 +126,6 @@ class EventContext: prev_group=input["prev_group"], delta_ids=_decode_state_dict(input["delta_ids"]), rejected=input["rejected"], - prev_state_events=input["prev_state_events"], ) app_service_id = input["app_service_id"] @@ -170,14 +144,7 @@ class EventContext: Maps a (type, state_key) to the event ID of the state event matching this tuple. """ - - if not self._fetching_state_deferred: - self._fetching_state_deferred = run_in_background( - self._fill_out_state, store - ) - - yield make_deferred_yieldable(self._fetching_state_deferred) - + yield self._ensure_fetched(store) return self._current_state_ids @defer.inlineCallbacks @@ -190,14 +157,7 @@ class EventContext: Maps a (type, state_key) to the event ID of the state event matching this tuple. """ - - if not self._fetching_state_deferred: - self._fetching_state_deferred = run_in_background( - self._fill_out_state, store - ) - - yield make_deferred_yieldable(self._fetching_state_deferred) - + yield self._ensure_fetched(store) return self._prev_state_ids def get_cached_current_state_ids(self): @@ -211,6 +171,44 @@ class EventContext: return self._current_state_ids + def _ensure_fetched(self, store): + return defer.succeed(None) + + +@attr.s(slots=True) +class _AsyncEventContextImpl(EventContext): + """ + An implementation of EventContext which fetches _current_state_ids and + _prev_state_ids from the database on demand. + + Attributes: + + _fetching_state_deferred (Deferred|None): Resolves when *_state_ids have + been calculated. None if we haven't started calculating yet + + _event_type (str): The type of the event the context is associated with. + + _event_state_key (str): The state_key of the event the context is + associated with. + + _prev_state_id (str|None): If the event associated with the context is + a state event, then `_prev_state_id` is the event_id of the state + that was replaced. + """ + + _prev_state_id = attr.ib(default=None) + _event_type = attr.ib(default=None) + _event_state_key = attr.ib(default=None) + _fetching_state_deferred = attr.ib(default=None) + + def _ensure_fetched(self, store): + if not self._fetching_state_deferred: + self._fetching_state_deferred = run_in_background( + self._fill_out_state, store + ) + + return make_deferred_yieldable(self._fetching_state_deferred) + @defer.inlineCallbacks def _fill_out_state(self, store): """Called to populate the _current_state_ids and _prev_state_ids @@ -228,27 +226,6 @@ class EventContext: else: self._prev_state_ids = self._current_state_ids - @defer.inlineCallbacks - def update_state( - self, state_group, prev_state_ids, current_state_ids, prev_group, delta_ids - ): - """Replace the state in the context - """ - - # We need to make sure we wait for any ongoing fetching of state - # to complete so that the updated state doesn't get clobbered - if self._fetching_state_deferred: - yield make_deferred_yieldable(self._fetching_state_deferred) - - self.state_group = state_group - self._prev_state_ids = prev_state_ids - self.prev_group = prev_group - self._current_state_ids = current_state_ids - self.delta_ids = delta_ids - - # We need to ensure that that we've marked as having fetched the state - self._fetching_state_deferred = defer.succeed(None) - def _encode_state_dict(state_dict): """Since dicts of (type, state_key) -> event_id cannot be serialized in diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index dab6be9573..8cafcfdab0 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -45,6 +45,7 @@ from synapse.api.errors import ( from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersions from synapse.crypto.event_signing import compute_event_signature from synapse.event_auth import auth_types_for_event +from synapse.events.snapshot import EventContext from synapse.events.validator import EventValidator from synapse.logging.context import ( make_deferred_yieldable, @@ -1871,14 +1872,7 @@ class FederationHandler(BaseHandler): if c and c.type == EventTypes.Create: auth_events[(c.type, c.state_key)] = c - try: - yield self.do_auth(origin, event, context, auth_events=auth_events) - except AuthError as e: - logger.warning( - "[%s %s] Rejecting: %s", event.room_id, event.event_id, e.msg - ) - - context.rejected = RejectedReason.AUTH_ERROR + context = yield self.do_auth(origin, event, context, auth_events=auth_events) if not context.rejected: yield self._check_for_soft_fail(event, state, backfilled) @@ -2047,12 +2041,12 @@ class FederationHandler(BaseHandler): Also NB that this function adds entries to it. Returns: - defer.Deferred[None] + defer.Deferred[EventContext]: updated context object """ room_version = yield self.store.get_room_version(event.room_id) try: - yield self._update_auth_events_and_context_for_auth( + context = yield self._update_auth_events_and_context_for_auth( origin, event, context, auth_events ) except Exception: @@ -2070,7 +2064,9 @@ class FederationHandler(BaseHandler): event_auth.check(room_version, event, auth_events=auth_events) except AuthError as e: logger.warning("Failed auth resolution for %r because %s", event, e) - raise e + context.rejected = RejectedReason.AUTH_ERROR + + return context @defer.inlineCallbacks def _update_auth_events_and_context_for_auth( @@ -2094,7 +2090,7 @@ class FederationHandler(BaseHandler): auth_events (dict[(str, str)->synapse.events.EventBase]): Returns: - defer.Deferred[None] + defer.Deferred[EventContext]: updated context """ event_auth_events = set(event.auth_event_ids()) @@ -2133,7 +2129,7 @@ class FederationHandler(BaseHandler): # The other side isn't around or doesn't implement the # endpoint, so lets just bail out. logger.info("Failed to get event auth from remote: %s", e) - return + return context seen_remotes = yield self.store.have_seen_events( [e.event_id for e in remote_auth_chain] @@ -2174,7 +2170,7 @@ class FederationHandler(BaseHandler): if event.internal_metadata.is_outlier(): logger.info("Skipping auth_event fetch for outlier") - return + return context # FIXME: Assumes we have and stored all the state for all the # prev_events @@ -2183,7 +2179,7 @@ class FederationHandler(BaseHandler): ) if not different_auth: - return + return context logger.info( "auth_events refers to events which are not in our calculated auth " @@ -2230,10 +2226,12 @@ class FederationHandler(BaseHandler): auth_events.update(new_state) - yield self._update_context_for_auth_events( + context = yield self._update_context_for_auth_events( event, context, auth_events, event_key ) + return context + @defer.inlineCallbacks def _update_context_for_auth_events(self, event, context, auth_events, event_key): """Update the state_ids in an event context after auth event resolution, @@ -2242,14 +2240,16 @@ class FederationHandler(BaseHandler): Args: event (Event): The event we're handling the context for - context (synapse.events.snapshot.EventContext): event context - to be updated + context (synapse.events.snapshot.EventContext): initial event context auth_events (dict[(str, str)->str]): Events to update in the event context. event_key ((str, str)): (type, state_key) for the current event. this will not be included in the current_state in the context. + + Returns: + Deferred[EventContext]: new event context """ state_updates = { k: a.event_id for k, a in iteritems(auth_events) if k != event_key @@ -2274,7 +2274,7 @@ class FederationHandler(BaseHandler): current_state_ids=current_state_ids, ) - yield context.update_state( + return EventContext.with_state( state_group=state_group, current_state_ids=current_state_ids, prev_state_ids=prev_state_ids, diff --git a/tests/test_federation.py b/tests/test_federation.py index d1acb16f30..7d82b58466 100644 --- a/tests/test_federation.py +++ b/tests/test_federation.py @@ -59,7 +59,9 @@ class MessageAcceptTests(unittest.TestCase): ) self.handler = self.homeserver.get_handlers().federation_handler - self.handler.do_auth = lambda *a, **b: succeed(True) + self.handler.do_auth = lambda origin, event, context, auth_events: succeed( + context + ) self.client = self.homeserver.get_federation_client() self.client._check_sigs_and_hash_and_fetch = lambda dest, pdus, **k: succeed( pdus -- cgit 1.5.1