diff options
Diffstat (limited to '')
-rw-r--r-- | CHANGES.md | 3 | ||||
-rw-r--r-- | changelog.d/11696.misc | 1 | ||||
-rw-r--r-- | changelog.d/11714.misc | 1 | ||||
-rw-r--r-- | changelog.d/11725.doc | 1 | ||||
-rw-r--r-- | changelog.d/11729.bugfix | 1 | ||||
-rw-r--r-- | changelog.d/11730.bugfix | 1 | ||||
-rw-r--r-- | docs/postgres.md | 2 | ||||
-rw-r--r-- | synapse/storage/databases/main/devices.py | 102 | ||||
-rw-r--r-- | tests/crypto/test_event_signing.py | 8 | ||||
-rw-r--r-- | tests/storage/test_devices.py | 160 |
10 files changed, 258 insertions, 22 deletions
diff --git a/CHANGES.md b/CHANGES.md index 77a56dd481..f91109f885 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -42,6 +42,7 @@ Deprecations and Removals ------------------------- - Replace `mock` package by its standard library version. ([\#11588](https://github.com/matrix-org/synapse/issues/11588)) +- Drop support for Python 3.6 and Ubuntu 18.04. ([\#11633](https://github.com/matrix-org/synapse/issues/11633)) Internal Changes @@ -77,13 +78,13 @@ Internal Changes - Improve OpenTracing support for requests which use a `ResponseCache`. ([\#11607](https://github.com/matrix-org/synapse/issues/11607)) - Improve OpenTracing support for incoming HTTP requests. ([\#11618](https://github.com/matrix-org/synapse/issues/11618)) - A number of improvements to opentracing support. ([\#11619](https://github.com/matrix-org/synapse/issues/11619)) -- Drop support for Python 3.6 and Ubuntu 18.04. ([\#11633](https://github.com/matrix-org/synapse/issues/11633)) - Refactor the way that the `outlier` flag is set on events received over federation. ([\#11634](https://github.com/matrix-org/synapse/issues/11634)) - Improve the error messages from `get_create_event_for_room`. ([\#11638](https://github.com/matrix-org/synapse/issues/11638)) - Remove redundant `get_current_events_token` method. ([\#11643](https://github.com/matrix-org/synapse/issues/11643)) - Convert `namedtuples` to `attrs`. ([\#11665](https://github.com/matrix-org/synapse/issues/11665), [\#11574](https://github.com/matrix-org/synapse/issues/11574)) - Update the `/capabilities` response to include whether support for [MSC3440](https://github.com/matrix-org/matrix-doc/pull/3440) is available. ([\#11690](https://github.com/matrix-org/synapse/issues/11690)) - Send the `Accept` header in HTTP requests made using `SimpleHttpClient.get_json`. ([\#11677](https://github.com/matrix-org/synapse/issues/11677)) +- Work around Mjolnir compatibility issue by adding an import for `glob_to_regex` in `synapse.util`, where it moved from. ([\#11696](https://github.com/matrix-org/synapse/issues/11696)) Synapse 1.49.2 (2021-12-21) diff --git a/changelog.d/11696.misc b/changelog.d/11696.misc deleted file mode 100644 index e8f39dde18..0000000000 --- a/changelog.d/11696.misc +++ /dev/null @@ -1 +0,0 @@ -Work around Mjolnir compatibility issue by adding an import for `glob_to_regex` in `synapse.util`, where it moved from. \ No newline at end of file diff --git a/changelog.d/11714.misc b/changelog.d/11714.misc new file mode 100644 index 0000000000..7f39bf0e3d --- /dev/null +++ b/changelog.d/11714.misc @@ -0,0 +1 @@ +Fix a typechecker problem related to our (ab)use of `nacl.signing.SigningKey`s. \ No newline at end of file diff --git a/changelog.d/11725.doc b/changelog.d/11725.doc new file mode 100644 index 0000000000..46eb9b814f --- /dev/null +++ b/changelog.d/11725.doc @@ -0,0 +1 @@ +Document that now the minimum supported PostgreSQL version is 10. diff --git a/changelog.d/11729.bugfix b/changelog.d/11729.bugfix new file mode 100644 index 0000000000..8438ce5686 --- /dev/null +++ b/changelog.d/11729.bugfix @@ -0,0 +1 @@ +Fix a bug introduced in Synapse v1.0.0 whereby some device list updates would not be sent to remote homeservers if there were too many to send at once. \ No newline at end of file diff --git a/changelog.d/11730.bugfix b/changelog.d/11730.bugfix new file mode 100644 index 0000000000..a0bd7dd1a3 --- /dev/null +++ b/changelog.d/11730.bugfix @@ -0,0 +1 @@ +Fix a bug introduced in Synapse v1.50.0rc1 whereby outbound federation could fail because too many EDUs were produced for device updates. \ No newline at end of file diff --git a/docs/postgres.md b/docs/postgres.md index e4861c1f12..0562021da5 100644 --- a/docs/postgres.md +++ b/docs/postgres.md @@ -1,6 +1,6 @@ # Using Postgres -Synapse supports PostgreSQL versions 9.6 or later. +Synapse supports PostgreSQL versions 10 or later. ## Install postgres client libraries diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py index 273adb61fd..bc7e876047 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py @@ -191,7 +191,7 @@ class DeviceWorkerStore(SQLBaseStore): @trace async def get_device_updates_by_remote( self, destination: str, from_stream_id: int, limit: int - ) -> Tuple[int, List[Tuple[str, dict]]]: + ) -> Tuple[int, List[Tuple[str, JsonDict]]]: """Get a stream of device updates to send to the given remote server. Args: @@ -200,9 +200,10 @@ class DeviceWorkerStore(SQLBaseStore): limit: Maximum number of device updates to return Returns: - A mapping from the current stream id (ie, the stream id of the last - update included in the response), and the list of updates, where - each update is a pair of EDU type and EDU contents. + - The current stream id (i.e. the stream id of the last update included + in the response); and + - The list of updates, where each update is a pair of EDU type and + EDU contents. """ now_stream_id = self.get_device_stream_token() @@ -221,6 +222,9 @@ class DeviceWorkerStore(SQLBaseStore): limit, ) + # We need to ensure `updates` doesn't grow too big. + # Currently: `len(updates) <= limit`. + # Return an empty list if there are no updates if not updates: return now_stream_id, [] @@ -270,19 +274,50 @@ class DeviceWorkerStore(SQLBaseStore): # The most recent request's opentracing_context is used as the # context which created the Edu. + # This is the stream ID that we will return for the consumer to resume + # following this stream later. + last_processed_stream_id = from_stream_id + query_map = {} cross_signing_keys_by_user = {} for user_id, device_id, update_stream_id, update_context in updates: - if ( + # Calculate the remaining length budget. + # Note that, for now, each entry in `cross_signing_keys_by_user` + # gives rise to two device updates in the result, so those cost twice + # as much (and are the whole reason we need to separately calculate + # the budget; we know len(updates) <= limit otherwise!) + # N.B. len() on dicts is cheap since they store their size. + remaining_length_budget = limit - ( + len(query_map) + 2 * len(cross_signing_keys_by_user) + ) + assert remaining_length_budget >= 0 + + is_master_key_update = ( user_id in master_key_by_user and device_id == master_key_by_user[user_id]["device_id"] - ): - result = cross_signing_keys_by_user.setdefault(user_id, {}) - result["master_key"] = master_key_by_user[user_id]["key_info"] - elif ( + ) + is_self_signing_key_update = ( user_id in self_signing_key_by_user and device_id == self_signing_key_by_user[user_id]["device_id"] + ) + + is_cross_signing_key_update = ( + is_master_key_update or is_self_signing_key_update + ) + + if ( + is_cross_signing_key_update + and user_id not in cross_signing_keys_by_user ): + # This will give rise to 2 device updates. + # If we don't have the budget, stop here! + if remaining_length_budget < 2: + break + + if is_master_key_update: + result = cross_signing_keys_by_user.setdefault(user_id, {}) + result["master_key"] = master_key_by_user[user_id]["key_info"] + elif is_self_signing_key_update: result = cross_signing_keys_by_user.setdefault(user_id, {}) result["self_signing_key"] = self_signing_key_by_user[user_id][ "key_info" @@ -290,24 +325,47 @@ class DeviceWorkerStore(SQLBaseStore): else: key = (user_id, device_id) + if key not in query_map and remaining_length_budget < 1: + # We don't have space for a new entry + break + previous_update_stream_id, _ = query_map.get(key, (0, None)) if update_stream_id > previous_update_stream_id: + # FIXME If this overwrites an older update, this discards the + # previous OpenTracing context. + # It might make it harder to track down issues using OpenTracing. + # If there's a good reason why it doesn't matter, a comment here + # about that would not hurt. query_map[key] = (update_stream_id, update_context) + # As this update has been added to the response, advance the stream + # position. + last_processed_stream_id = update_stream_id + + # In the worst case scenario, each update is for a distinct user and is + # added either to the query_map or to cross_signing_keys_by_user, + # but not both: + # len(query_map) + len(cross_signing_keys_by_user) <= len(updates) here, + # so len(query_map) + len(cross_signing_keys_by_user) <= limit. + results = await self._get_device_update_edus_by_remote( destination, from_stream_id, query_map ) - # add the updated cross-signing keys to the results list + # len(results) <= len(query_map) here, + # so len(results) + len(cross_signing_keys_by_user) <= limit. + + # Add the updated cross-signing keys to the results list for user_id, result in cross_signing_keys_by_user.items(): result["user_id"] = user_id results.append(("m.signing_key_update", result)) # also send the unstable version # FIXME: remove this when enough servers have upgraded + # and remove the length budgeting above. results.append(("org.matrix.signing_key_update", result)) - return now_stream_id, results + return last_processed_stream_id, results def _get_device_updates_by_remote_txn( self, @@ -316,7 +374,7 @@ class DeviceWorkerStore(SQLBaseStore): from_stream_id: int, now_stream_id: int, limit: int, - ): + ) -> List[Tuple[str, str, int, Optional[str]]]: """Return device update information for a given remote destination Args: @@ -327,7 +385,11 @@ class DeviceWorkerStore(SQLBaseStore): limit: Maximum number of device updates to return Returns: - List: List of device updates + List: List of device update tuples: + - user_id + - device_id + - stream_id + - opentracing_context """ # get the list of device updates that need to be sent sql = """ @@ -351,15 +413,21 @@ class DeviceWorkerStore(SQLBaseStore): Args: destination: The host the device updates are intended for from_stream_id: The minimum stream_id to filter updates by, exclusive - query_map (Dict[(str, str): (int, str|None)]): Dictionary mapping - user_id/device_id to update stream_id and the relevant json-encoded - opentracing context + query_map: Dictionary mapping (user_id, device_id) to + (update stream_id, the relevant json-encoded opentracing context) Returns: - List of objects representing an device update EDU + List of objects representing a device update EDU. + + Postconditions: + The returned list has a length not exceeding that of the query_map: + len(result) <= len(query_map) """ devices = ( await self.get_e2e_device_keys_and_signatures( + # Because these are (user_id, device_id) tuples with all + # device_ids not being None, the returned list's length will not + # exceed that of query_map. query_map.keys(), include_all_devices=True, include_deleted_devices=True, diff --git a/tests/crypto/test_event_signing.py b/tests/crypto/test_event_signing.py index 1c920157f5..a72a0103d3 100644 --- a/tests/crypto/test_event_signing.py +++ b/tests/crypto/test_event_signing.py @@ -14,6 +14,7 @@ import nacl.signing +import signedjson.types from unpaddedbase64 import decode_base64 from synapse.api.room_versions import RoomVersions @@ -35,7 +36,12 @@ HOSTNAME = "domain" class EventSigningTestCase(unittest.TestCase): def setUp(self): - self.signing_key = nacl.signing.SigningKey(SIGNING_KEY_SEED) + # NB: `signedjson` expects `nacl.signing.SigningKey` instances which have been + # monkeypatched to include new `alg` and `version` attributes. This is captured + # by the `signedjson.types.SigningKey` protocol. + self.signing_key: signedjson.types.SigningKey = nacl.signing.SigningKey( + SIGNING_KEY_SEED + ) self.signing_key.alg = KEY_ALG self.signing_key.version = KEY_VER diff --git a/tests/storage/test_devices.py b/tests/storage/test_devices.py index 6790aa5242..b547bf8d99 100644 --- a/tests/storage/test_devices.py +++ b/tests/storage/test_devices.py @@ -94,7 +94,7 @@ class DeviceStoreTestCase(HomeserverTestCase): def test_get_device_updates_by_remote(self): device_ids = ["device_id1", "device_id2"] - # Add two device updates with a single stream_id + # Add two device updates with sequential `stream_id`s self.get_success( self.store.add_device_change_to_streams("user_id", device_ids, ["somehost"]) ) @@ -107,6 +107,164 @@ class DeviceStoreTestCase(HomeserverTestCase): # Check original device_ids are contained within these updates self._check_devices_in_updates(device_ids, device_updates) + def test_get_device_updates_by_remote_can_limit_properly(self): + """ + Tests that `get_device_updates_by_remote` returns an appropriate + stream_id to resume fetching from (without skipping any results). + """ + + # Add some device updates with sequential `stream_id`s + device_ids = [ + "device_id1", + "device_id2", + "device_id3", + "device_id4", + "device_id5", + ] + self.get_success( + self.store.add_device_change_to_streams("user_id", device_ids, ["somehost"]) + ) + + # Get device updates meant for this remote + next_stream_id, device_updates = self.get_success( + self.store.get_device_updates_by_remote("somehost", -1, limit=3) + ) + + # Check the first three original device_ids are contained within these updates + self._check_devices_in_updates(device_ids[:3], device_updates) + + # Get the next batch of device updates + next_stream_id, device_updates = self.get_success( + self.store.get_device_updates_by_remote("somehost", next_stream_id, limit=3) + ) + + # Check the last two original device_ids are contained within these updates + self._check_devices_in_updates(device_ids[3:], device_updates) + + # Add some more device updates to ensure it still resumes properly + device_ids = ["device_id6", "device_id7"] + self.get_success( + self.store.add_device_change_to_streams("user_id", device_ids, ["somehost"]) + ) + + # Get the next batch of device updates + next_stream_id, device_updates = self.get_success( + self.store.get_device_updates_by_remote("somehost", next_stream_id, limit=3) + ) + + # Check the newly-added device_ids are contained within these updates + self._check_devices_in_updates(device_ids, device_updates) + + # Check there are no more device updates left. + _, device_updates = self.get_success( + self.store.get_device_updates_by_remote("somehost", next_stream_id, limit=3) + ) + self.assertEqual(device_updates, []) + + def test_get_device_updates_by_remote_cross_signing_key_updates( + self, + ) -> None: + """ + Tests that `get_device_updates_by_remote` limits the length of the return value + properly when cross-signing key updates are present. + Current behaviour is that the cross-signing key updates will always come in pairs, + even if that means leaving an earlier batch one EDU short of the limit. + """ + + assert self.hs.is_mine_id( + "@user_id:test" + ), "Test not valid: this MXID should be considered local" + + self.get_success( + self.store.set_e2e_cross_signing_key( + "@user_id:test", + "master", + { + "keys": { + "ed25519:fakeMaster": "aaafakefakefake1AAAAAAAAAAAAAAAAAAAAAAAAAAA=" + }, + "signatures": { + "@user_id:test": { + "ed25519:fake2": "aaafakefakefake2AAAAAAAAAAAAAAAAAAAAAAAAAAA=" + } + }, + }, + ) + ) + self.get_success( + self.store.set_e2e_cross_signing_key( + "@user_id:test", + "self_signing", + { + "keys": { + "ed25519:fakeSelfSigning": "aaafakefakefake3AAAAAAAAAAAAAAAAAAAAAAAAAAA=" + }, + "signatures": { + "@user_id:test": { + "ed25519:fake4": "aaafakefakefake4AAAAAAAAAAAAAAAAAAAAAAAAAAA=" + } + }, + }, + ) + ) + + # Add some device updates with sequential `stream_id`s + # Note that the public cross-signing keys occupy the same space as device IDs, + # so also notify that those have updated. + device_ids = [ + "device_id1", + "device_id2", + "fakeMaster", + "fakeSelfSigning", + ] + + self.get_success( + self.store.add_device_change_to_streams( + "@user_id:test", device_ids, ["somehost"] + ) + ) + + # Get device updates meant for this remote + next_stream_id, device_updates = self.get_success( + self.store.get_device_updates_by_remote("somehost", -1, limit=3) + ) + + # Here we expect the device updates for `device_id1` and `device_id2`. + # That means we only receive 2 updates this time around. + # If we had a higher limit, we would expect to see the pair of + # (unstable-prefixed & unprefixed) signing key updates for the device + # represented by `fakeMaster` and `fakeSelfSigning`. + # Our implementation only sends these two variants together, so we get + # a short batch. + self.assertEqual(len(device_updates), 2, device_updates) + + # Check the first two devices (device_id1, device_id2) came out. + self._check_devices_in_updates(device_ids[:2], device_updates) + + # Get more device updates meant for this remote + next_stream_id, device_updates = self.get_success( + self.store.get_device_updates_by_remote("somehost", next_stream_id, limit=3) + ) + + # The next 2 updates should be a cross-signing key update + # (the master key update and the self-signing key update are combined into + # one 'signing key update', but the cross-signing key update is emitted + # twice, once with an unprefixed type and once again with an unstable-prefixed type) + # (This is a temporary arrangement for backwards compatibility!) + self.assertEqual(len(device_updates), 2, device_updates) + self.assertEqual( + device_updates[0][0], "m.signing_key_update", device_updates[0] + ) + self.assertEqual( + device_updates[1][0], "org.matrix.signing_key_update", device_updates[1] + ) + + # Check there are no more device updates left. + _, device_updates = self.get_success( + self.store.get_device_updates_by_remote("somehost", next_stream_id, limit=3) + ) + self.assertEqual(device_updates, []) + def _check_devices_in_updates(self, expected_device_ids, device_updates): """Check that an specific device ids exist in a list of device update EDUs""" self.assertEqual(len(device_updates), len(expected_device_ids)) |