From 2ade05dca3d6da67e35c3a8ccdd278221f2566ed Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Mon, 23 Sep 2019 14:16:10 +0100
Subject: Add last seen info to devices table.

This allows us to purge old user_ips entries without having to preserve
the latest last seen info for active devices.
---
 synapse/storage/client_ips.py                     | 15 +++++++++++++++
 .../storage/schema/delta/56/devices_last_seen.sql | 21 +++++++++++++++++++++
 2 files changed, 36 insertions(+)
 create mode 100644 synapse/storage/schema/delta/56/devices_last_seen.sql

(limited to 'synapse')

diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py
index 6db8c54077..4db2e7f481 100644
--- a/synapse/storage/client_ips.py
+++ b/synapse/storage/client_ips.py
@@ -354,6 +354,21 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
                 },
                 lock=False,
             )
+
+            # Technically an access token might not be associated with
+            # a device so we need to check.
+            if device_id:
+                self._simple_upsert_txn(
+                    txn,
+                    table="devices",
+                    keyvalues={"user_id": user_id, "device_id": device_id},
+                    values={
+                        "user_agent": user_agent,
+                        "last_seen": last_seen,
+                        "ip": ip,
+                    },
+                    lock=False,
+                )
         except Exception as e:
             # Failed to upsert, log and continue
             logger.error("Failed to insert client IP %r: %r", entry, e)
diff --git a/synapse/storage/schema/delta/56/devices_last_seen.sql b/synapse/storage/schema/delta/56/devices_last_seen.sql
new file mode 100644
index 0000000000..8818eeeb7e
--- /dev/null
+++ b/synapse/storage/schema/delta/56/devices_last_seen.sql
@@ -0,0 +1,21 @@
+/* Copyright 2019 Matrix.org Foundation CIC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Track last seen information for a device in the devices table, rather
+-- than relying on it being in the user_ips table (which we want to be able
+-- to purge old entries from)
+ALTER TABLE devices ADD COLUMN last_seen BIGINT;
+ALTER TABLE devices ADD COLUMN ip TEXT;
+ALTER TABLE devices ADD COLUMN user_agent TEXT;
--
cgit 1.5.1


From ed80231ade20ce7881bb2026692fe3a6252f1c02 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Mon, 23 Sep 2019 15:59:43 +0100
Subject: Add BG update to populate devices last seen info

---
 synapse/storage/client_ips.py                     | 52 ++++++++++++++++++++++
 .../storage/schema/delta/56/devices_last_seen.sql |  3 ++
 2 files changed, 55 insertions(+)

(limited to 'synapse')

diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py
index 4db2e7f481..8839562269 100644
--- a/synapse/storage/client_ips.py
+++ b/synapse/storage/client_ips.py
@@ -85,6 +85,11 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
             "user_ips_drop_nonunique_index", self._remove_user_ip_nonunique
         )
 
+        # Update the last seen info in devices.
+        self.register_background_update_handler(
+            "devices_last_seen", self._devices_last_seen_update
+        )
+
         # (user_id, access_token, ip,) -> (user_agent, device_id, last_seen)
         self._batch_row_update = {}
 
@@ -485,3 +490,50 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
             }
             for (access_token, ip), (user_agent, last_seen) in iteritems(results)
         )
+
+    @defer.inlineCallbacks
+    def _devices_last_seen_update(self, progress, batch_size):
+        """Background update to insert last seen info into devices table
+        """
+
+        last_user_id = progress.get("last_user_id", "")
+        last_device_id = progress.get("last_device_id", "")
+
+        def _devices_last_seen_update_txn(txn):
+            sql = """
+                SELECT u.last_seen, u.ip, u.user_agent, user_id, device_id FROM devices
+                INNER JOIN user_ips AS u USING (user_id, device_id)
+                WHERE user_id > ? OR (user_id = ? AND device_id > ?)
+                ORDER BY user_id ASC, device_id ASC
+                LIMIT ?
+            """
+            txn.execute(sql, (last_user_id, last_user_id, last_device_id, batch_size))
+
+            rows = txn.fetchall()
+            if not rows:
+                return 0
+
+            sql = """
+                UPDATE devices
+                SET last_seen = ?, ip = ?, user_agent = ?
+                WHERE user_id = ? AND device_id = ?
+            """
+            txn.execute_batch(sql, rows)
+
+            _, _, _, user_id, device_id = rows[-1]
+            self._background_update_progress_txn(
+                txn,
+                "devices_last_seen",
+                {"last_user_id": user_id, "last_device_id": device_id},
+            )
+
+            return len(rows)
+
+        updated = yield self.runInteraction(
+            "_devices_last_seen_update", _devices_last_seen_update_txn
+        )
+
+        if not updated:
+            yield self._end_background_update("devices_last_seen")
+
+        return updated
diff --git a/synapse/storage/schema/delta/56/devices_last_seen.sql b/synapse/storage/schema/delta/56/devices_last_seen.sql
index 8818eeeb7e..dfa902d0ba 100644
--- a/synapse/storage/schema/delta/56/devices_last_seen.sql
+++ b/synapse/storage/schema/delta/56/devices_last_seen.sql
@@ -19,3 +19,6 @@
 ALTER TABLE devices ADD COLUMN last_seen BIGINT;
 ALTER TABLE devices ADD COLUMN ip TEXT;
 ALTER TABLE devices ADD COLUMN user_agent TEXT;
+
+INSERT INTO background_updates (update_name, progress_json) VALUES
+  ('devices_last_seen', '{}');
--
cgit 1.5.1


From 51d28272e20d799b2e35a8a14b3c1d9d5f555d10 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Mon, 23 Sep 2019 16:00:18 +0100
Subject: Query devices table for last seen info.

This is a) simpler than querying user_ips directly and b) means we can
purge older entries from user_ips without losing the required info.

The storage functions now no longer return the access_token, since it
was unused.
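A caller-side sketch of the changed interface (the call site here is
hypothetical, for illustration only; the returned dict shape matches the
updated test at the end of this patch):

    # Results are keyed by (user_id, device_id); entries no longer carry
    # an "access_token" key.
    res = yield store.get_last_client_ip_by_device(user_id, device_id=None)
    for (uid, dev), entry in res.items():
        print(uid, dev, entry["ip"], entry["user_agent"], entry["last_seen"])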
---
 synapse/storage/client_ips.py    | 57 ++++++----------------------------
 tests/storage/test_client_ips.py |  1 -
 2 files changed, 8 insertions(+), 50 deletions(-)

(limited to 'synapse')

diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py
index 8839562269..a4e6d9dbe7 100644
--- a/synapse/storage/client_ips.py
+++ b/synapse/storage/client_ips.py
@@ -392,19 +392,14 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
             keys giving the column names
         """
 
-        res = yield self.runInteraction(
-            "get_last_client_ip_by_device",
-            self._get_last_client_ip_by_device_txn,
-            user_id,
-            device_id,
-            retcols=(
-                "user_id",
-                "access_token",
-                "ip",
-                "user_agent",
-                "device_id",
-                "last_seen",
-            ),
+        keyvalues = {"user_id": user_id}
+        if device_id:
+            keyvalues["device_id"] = device_id
+
+        res = yield self._simple_select_list(
+            table="devices",
+            keyvalues=keyvalues,
+            retcols=("user_id", "ip", "user_agent", "device_id", "last_seen"),
         )
 
         ret = {(d["user_id"], d["device_id"]): d for d in res}
@@ -423,42 +418,6 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
             }
         return ret
 
-    @classmethod
-    def _get_last_client_ip_by_device_txn(cls, txn, user_id, device_id, retcols):
-        where_clauses = []
-        bindings = []
-        if device_id is None:
-            where_clauses.append("user_id = ?")
-            bindings.extend((user_id,))
-        else:
-            where_clauses.append("(user_id = ? AND device_id = ?)")
-            bindings.extend((user_id, device_id))
-
-        if not where_clauses:
-            return []
-
-        inner_select = (
-            "SELECT MAX(last_seen) mls, user_id, device_id FROM user_ips "
-            "WHERE %(where)s "
-            "GROUP BY user_id, device_id"
-        ) % {"where": " OR ".join(where_clauses)}
-
-        sql = (
-            "SELECT %(retcols)s FROM user_ips "
-            "JOIN (%(inner_select)s) ips ON"
-            " user_ips.last_seen = ips.mls AND"
-            " user_ips.user_id = ips.user_id AND"
-            " (user_ips.device_id = ips.device_id OR"
-            "     (user_ips.device_id IS NULL AND ips.device_id IS NULL)"
-            " )"
-        ) % {
-            "retcols": ",".join("user_ips." + c for c in retcols),
-            "inner_select": inner_select,
-        }
-
-        txn.execute(sql, bindings)
-        return cls.cursor_to_dict(txn)
-
     @defer.inlineCallbacks
     def get_user_ip_and_agents(self, user):
         user_id = user.to_string()
diff --git a/tests/storage/test_client_ips.py b/tests/storage/test_client_ips.py
index 09305c3bf1..6ac4654085 100644
--- a/tests/storage/test_client_ips.py
+++ b/tests/storage/test_client_ips.py
@@ -55,7 +55,6 @@ class ClientIpStoreTestCase(unittest.HomeserverTestCase):
             {
                 "user_id": user_id,
                 "device_id": "device_id",
-                "access_token": "access_token",
                 "ip": "ip",
                 "user_agent": "user_agent",
                 "last_seen": 12345678000,
--
cgit 1.5.1


From 367158a609d18b6dbd143f8bee0529e743d5b5a4 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Tue, 24 Sep 2019 14:16:16 +0100
Subject: Add wrap_as_background_process decorator.

This does the same thing as `run_as_background_process` but means we
don't need to create superfluous functions.
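A minimal usage sketch (the decorator is the one added below; the
before/after shape of the method is paraphrased from a later patch in
this series, which converts `_update_client_ips_batch`):

    # Before: a throwaway inner function exists only to be handed to
    # run_as_background_process.
    def _update_client_ips_batch(self):
        def update():
            ...
        return run_as_background_process("update_client_ips", update)

    # After: the decorator does the wrapping; no inner function needed.
    @wrap_as_background_process("update_client_ips")
    def _update_client_ips_batch(self):
        ...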
---
 synapse/metrics/background_process_metrics.py | 29 ++++++++++++++++++++++++++-
 1 file changed, 28 insertions(+), 1 deletion(-)

(limited to 'synapse')

diff --git a/synapse/metrics/background_process_metrics.py b/synapse/metrics/background_process_metrics.py
index edd6b42db3..b24e2fab4a 100644
--- a/synapse/metrics/background_process_metrics.py
+++ b/synapse/metrics/background_process_metrics.py
@@ -15,6 +15,8 @@
 
 import logging
 import threading
+from asyncio import iscoroutine
+from functools import wraps
 
 import six
 
@@ -197,7 +199,15 @@ def run_as_background_process(desc, func, *args, **kwargs):
             _background_processes.setdefault(desc, set()).add(proc)
 
             try:
-                yield func(*args, **kwargs)
+                # We ensureDeferred here to handle coroutines
+                result = func(*args, **kwargs)
+
+                # We need this check because ensureDeferred doesn't like when
+                # func doesn't return a Deferred or coroutine.
+                if iscoroutine(result):
+                    result = defer.ensureDeferred(result)
+
+                return (yield result)
             except Exception:
                 logger.exception("Background process '%s' threw an exception", desc)
             finally:
@@ -208,3 +218,20 @@ def run_as_background_process(desc, func, *args, **kwargs):
 
     with PreserveLoggingContext():
         return run()
+
+
+def wrap_as_background_process(desc):
+    """Decorator that wraps a function that gets called as a background
+    process.
+
+    Equivalent of calling the function with `run_as_background_process`
+    """
+
+    def wrap_as_background_process_inner(func):
+        @wraps(func)
+        def wrap_as_background_process_inner_2(*args, **kwargs):
+            return run_as_background_process(desc, func, *args, **kwargs)
+
+        return wrap_as_background_process_inner_2
+
+    return wrap_as_background_process_inner
--
cgit 1.5.1


From 2135c198d17b41297511a2fc3b39551d160069b2 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Tue, 24 Sep 2019 14:18:31 +0100
Subject: Add has_completed_background_update

This allows checking if a specific background update has completed.
---
 synapse/storage/background_updates.py | 25 ++++++++++++++++++++++++-
 1 file changed, 24 insertions(+), 1 deletion(-)

(limited to 'synapse')

diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py
index e5f0668f09..3fc25cd828 100644
--- a/synapse/storage/background_updates.py
+++ b/synapse/storage/background_updates.py
@@ -140,7 +140,7 @@ class BackgroundUpdateStore(SQLBaseStore):
             "background_updates",
             keyvalues=None,
             retcol="1",
-            desc="check_background_updates",
+            desc="has_completed_background_updates",
         )
         if not updates:
             self._all_done = True
 
         return False
 
+    async def has_completed_background_update(self, update_name):
+        """Check if the given background update has finished running.
+
+        Returns:
+            Deferred[bool]
+        """
+
+        if self._all_done:
+            return True
+
+        if update_name in self._background_update_queue:
+            return False
+
+        update_exists = await self._simple_select_one_onecol(
+            "background_updates",
+            keyvalues={"update_name": update_name},
+            retcol="1",
+            desc="has_completed_background_update",
+            allow_none=True,
+        )
+
+        return not update_exists
+
     @defer.inlineCallbacks
     def do_next_background_update(self, desired_duration_ms):
         """Does some amount of work on the next queued background update
--
cgit 1.5.1


From 242017db8b7b57be28a019ecbba1619d75d54889 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Tue, 24 Sep 2019 15:20:40 +0100
Subject: Prune rows in user_ips older than configured period

Defaults to pruning everything older than 28d.
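An abridged sketch of how this fits together with the helpers added in
the preceding patches (all names appear in the diffs; the wiring is
condensed here):

    # At store start-up: only schedule pruning if a max age is configured.
    if self.user_ips_max_age:
        self._clock.looping_call(self._prune_old_user_ips, 5 * 1000)

    # Each pass runs as a named background process and is a no-op until
    # the "devices_last_seen" backfill has finished, so pruning never
    # deletes the only copy of a device's last-seen info.
    @wrap_as_background_process("prune_old_user_ips")
    async def _prune_old_user_ips(self):
        if not await self.has_completed_background_update("devices_last_seen"):
            return
        ...  # bounded DELETE: at most ~5000 rows per pass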
---
 docs/sample_config.yaml       |  6 +++++
 synapse/config/server.py      | 13 +++++++++
 synapse/storage/client_ips.py | 62 +++++++++++++++++++++++++++++++++++++------
 3 files changed, 73 insertions(+), 8 deletions(-)

(limited to 'synapse')

diff --git a/docs/sample_config.yaml b/docs/sample_config.yaml
index 61d9f09a99..cc6035c838 100644
--- a/docs/sample_config.yaml
+++ b/docs/sample_config.yaml
@@ -313,6 +313,12 @@ listeners:
 #
 redaction_retention_period: 7d
 
+# How long to track users' last seen time and IPs in the database.
+#
+# Defaults to `28d`. Set to `null` to disable.
+#
+#user_ips_max_age: 14d
+
 
 ## TLS ##
 
diff --git a/synapse/config/server.py b/synapse/config/server.py
index 7f8d315954..655e7487a4 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -172,6 +172,13 @@ class ServerConfig(Config):
         else:
             self.redaction_retention_period = None
 
+        # How long to keep entries in the `user_ips` table.
+        user_ips_max_age = config.get("user_ips_max_age", "28d")
+        if user_ips_max_age is not None:
+            self.user_ips_max_age = self.parse_duration(user_ips_max_age)
+        else:
+            self.user_ips_max_age = None
+
         # Options to disable HS
         self.hs_disabled = config.get("hs_disabled", False)
         self.hs_disabled_message = config.get("hs_disabled_message", "")
@@ -735,6 +742,12 @@ class ServerConfig(Config):
         # Defaults to `7d`. Set to `null` to disable.
         #
         redaction_retention_period: 7d
+
+        # How long to track users' last seen time and IPs in the database.
+        #
+        # Defaults to `28d`. Set to `null` to disable.
+        #
+        #user_ips_max_age: 14d
         """
             % locals()
         )
diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py
index a4e6d9dbe7..176c812b1f 100644
--- a/synapse/storage/client_ips.py
+++ b/synapse/storage/client_ips.py
@@ -19,7 +19,7 @@ from six import iteritems
 
 from twisted.internet import defer
 
-from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.metrics.background_process_metrics import wrap_as_background_process
 from synapse.util.caches import CACHE_SIZE_FACTOR
 
 from . import background_updates
@@ -42,6 +42,8 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
 
         super(ClientIpStore, self).__init__(db_conn, hs)
 
+        self.user_ips_max_age = hs.config.user_ips_max_age
+
         self.register_background_index_update(
             "user_ips_device_index",
             index_name="user_ips_device_id",
@@ -100,6 +102,9 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
             "before", "shutdown", self._update_client_ips_batch
         )
 
+        if self.user_ips_max_age:
+            self._clock.looping_call(self._prune_old_user_ips, 5 * 1000)
+
     @defer.inlineCallbacks
     def _remove_user_ip_nonunique(self, progress, batch_size):
         def f(conn):
@@ -319,20 +324,19 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
 
         self._batch_row_update[key] = (user_agent, device_id, now)
 
+    @wrap_as_background_process("update_client_ips")
     def _update_client_ips_batch(self):
 
         # If the DB pool has already terminated, don't try updating
         if not self.hs.get_db_pool().running:
             return
 
-        def update():
-            to_update = self._batch_row_update
-            self._batch_row_update = {}
-            return self.runInteraction(
-                "_update_client_ips_batch", self._update_client_ips_batch_txn, to_update
-            )
+        to_update = self._batch_row_update
+        self._batch_row_update = {}
 
-        return run_as_background_process("update_client_ips", update)
+        return self.runInteraction(
+            "_update_client_ips_batch", self._update_client_ips_batch_txn, to_update
+        )
 
     def _update_client_ips_batch_txn(self, txn, to_update):
         if "user_ips" in self._unsafe_to_upsert_tables or (
@@ -496,3 +500,45 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
             yield self._end_background_update("devices_last_seen")
 
         return updated
+
+    @wrap_as_background_process("prune_old_user_ips")
+    async def _prune_old_user_ips(self):
+        """Removes entries in user IPs older than the configured period.
+        """
+
+        if not self.user_ips_max_age:
+            # Nothing to do
+            return
+
+        if not await self.has_completed_background_update("devices_last_seen"):
+            # Only start pruning if we have finished populating the devices
+            # last seen info.
+            return
+
+        # We do a slightly funky SQL delete to ensure we don't try and delete
+        # too much at once (as the table may be very large from before we
+        # started pruning).
+        #
+        # This works by finding the max last_seen that is less than the given
+        # time, but has no more than N rows before it, deleting all rows with
+        # a lesser last_seen time. (We COALESCE so that the sub-SELECT always
+        # returns exactly one row).
+        sql = """
+            DELETE FROM user_ips
+            WHERE last_seen <= (
+                SELECT COALESCE(MAX(last_seen), -1)
+                FROM (
+                    SELECT last_seen FROM user_ips
+                    WHERE last_seen <= ?
+                    ORDER BY last_seen ASC
+                    LIMIT 5000
+                ) AS u
+            )
+        """
+
+        timestamp = self.clock.time_msec() - self.user_ips_max_age
+
+        def _prune_old_user_ips_txn(txn):
+            txn.execute(sql, (timestamp,))
+
+        await self.runInteraction("_prune_old_user_ips", _prune_old_user_ips_txn)
--
cgit 1.5.1


From 50572db837f3e6a0869e9ec573e02d4af72548ea Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Wed, 25 Sep 2019 17:00:23 +0100
Subject: Use if `is not None`

Co-Authored-By: Richard van der Hoff <1389908+richvdh@users.noreply.github.com>
---
 synapse/storage/client_ips.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

(limited to 'synapse')

diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py
index a4e6d9dbe7..8996689744 100644
--- a/synapse/storage/client_ips.py
+++ b/synapse/storage/client_ips.py
@@ -393,7 +393,7 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
         """
 
         keyvalues = {"user_id": user_id}
-        if device_id:
+        if device_id is not None:
             keyvalues["device_id"] = device_id
 
         res = yield self._simple_select_list(
--
cgit 1.5.1


From 39b50ad42a8cf784e627959e9652589338121ccd Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Wed, 25 Sep 2019 17:22:33 +0100
Subject: Review comments

---
 docs/sample_config.yaml               | 2 +-
 synapse/config/server.py              | 2 +-
 synapse/storage/background_updates.py | 5 +----
 synapse/storage/client_ips.py         | 2 +-
 4 files changed, 4 insertions(+), 7 deletions(-)

(limited to 'synapse')

diff --git a/docs/sample_config.yaml b/docs/sample_config.yaml
index cc6035c838..7902d9ed6f 100644
--- a/docs/sample_config.yaml
+++ b/docs/sample_config.yaml
@@ -315,7 +315,7 @@ redaction_retention_period: 7d
 
 # How long to track users' last seen time and IPs in the database.
 #
-# Defaults to `28d`. Set to `null` to disable.
+# Defaults to `28d`. Set to `null` to disable clearing out of old rows.
 #
 #user_ips_max_age: 14d
 
diff --git a/synapse/config/server.py b/synapse/config/server.py
index 655e7487a4..f8b7b4bef9 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -745,7 +745,7 @@ class ServerConfig(Config):
 
         # How long to track users' last seen time and IPs in the database.
         #
-        # Defaults to `28d`. Set to `null` to disable.
+        # Defaults to `28d`. Set to `null` to disable clearing out of old rows.
         #
         #user_ips_max_age: 14d
diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py
index 3fc25cd828..30788137a8 100644
--- a/synapse/storage/background_updates.py
+++ b/synapse/storage/background_updates.py
@@ -148,11 +148,8 @@ class BackgroundUpdateStore(SQLBaseStore):
 
         return False
 
-    async def has_completed_background_update(self, update_name):
+    async def has_completed_background_update(self, update_name) -> bool:
         """Check if the given background update has finished running.
-
-        Returns:
-            Deferred[bool]
         """
diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py
index 176c812b1f..a4d40dfa1e 100644
--- a/synapse/storage/client_ips.py
+++ b/synapse/storage/client_ips.py
@@ -506,7 +506,7 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
         """Removes entries in user IPs older than the configured period.
         """
 
-        if not self.user_ips_max_age:
+        if self.user_ips_max_age is None:
             # Nothing to do
             return
--
cgit 1.5.1


From a4f3ca48b5250a1c2c4de8a363f69bbeb0adeefd Mon Sep 17 00:00:00 2001
From: Neil Johnson
Date: Wed, 25 Sep 2019 17:27:35 +0100
Subject: Enable cleaning up extremities with dummy events by default to
 prevent undue build up of forward extremities. (#5884)
---
 changelog.d/5884.feature | 1 +
 synapse/config/server.py | 4 +---
 2 files changed, 2 insertions(+), 3 deletions(-)
 create mode 100644 changelog.d/5884.feature

(limited to 'synapse')

diff --git a/changelog.d/5884.feature b/changelog.d/5884.feature
new file mode 100644
index 0000000000..bfd0489392
--- /dev/null
+++ b/changelog.d/5884.feature
@@ -0,0 +1 @@
+Enable cleaning up extremities with dummy events by default to prevent undue build up of forward extremities.
diff --git a/synapse/config/server.py b/synapse/config/server.py
index 419787a89c..3a7a49bc91 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -355,10 +355,8 @@ class ServerConfig(Config):
 
         _check_resource_config(self.listeners)
 
-        # An experimental option to try and periodically clean up extremities
-        # by sending dummy events.
         self.cleanup_extremities_with_dummy_events = config.get(
-            "cleanup_extremities_with_dummy_events", False
+            "cleanup_extremities_with_dummy_events", True
        )
 
     def has_tls_listener(self):
--
cgit 1.5.1


From a96318127dc17ee102bcf90821d90b7e6079a85d Mon Sep 17 00:00:00 2001
From: Richard van der Hoff
Date: Wed, 25 Sep 2019 18:17:39 +0100
Subject: Update comments and docstring

---
 synapse/metrics/background_process_metrics.py | 10 ++++++----
 1 file changed, 6 insertions(+), 4 deletions(-)

(limited to 'synapse')

diff --git a/synapse/metrics/background_process_metrics.py b/synapse/metrics/background_process_metrics.py
index b24e2fab4a..c53d2a0d40 100644
--- a/synapse/metrics/background_process_metrics.py
+++ b/synapse/metrics/background_process_metrics.py
@@ -175,7 +175,7 @@ def run_as_background_process(desc, func, *args, **kwargs):
 
     Args:
         desc (str): a description for this background process type
-        func: a function, which may return a Deferred
+        func: a function, which may return a Deferred or a coroutine
         args: positional args for func
         kwargs: keyword args for func
 
@@ -199,11 +199,13 @@ def run_as_background_process(desc, func, *args, **kwargs):
 
             try:
-                # We ensureDeferred here to handle coroutines
                 result = func(*args, **kwargs)
 
-                # We need this check because ensureDeferred doesn't like when
-                # func doesn't return a Deferred or coroutine.
+                # We probably don't have an ensureDeferred in our call stack to handle
+                # coroutine results, so we need to ensureDeferred here.
+                #
+                # But we need this check because ensureDeferred doesn't like being
+                # called on immediate values (as opposed to Deferreds or coroutines).
                 if iscoroutine(result):
                     result = defer.ensureDeferred(result)
--
cgit 1.5.1


From 034db2ba2115d935ce62b641b4051e477a454eac Mon Sep 17 00:00:00 2001
From: Neil Johnson
Date: Thu, 26 Sep 2019 11:47:53 +0100
Subject: Fix dummy event insertion consent bug (#6053)

Fixes #5905
---
 changelog.d/6053.bugfix                |   1 +
 synapse/handlers/message.py            |  99 ++++++++++++++------
 synapse/storage/event_federation.py    |  18 +++-
 tests/storage/test_cleanup_extrems.py  | 147 +++++++++++++++++++++++++++++++--
 tests/storage/test_event_federation.py |  40 +++++++++
 5 files changed, 266 insertions(+), 39 deletions(-)
 create mode 100644 changelog.d/6053.bugfix

(limited to 'synapse')

diff --git a/changelog.d/6053.bugfix b/changelog.d/6053.bugfix
new file mode 100644
index 0000000000..6311157bf6
--- /dev/null
+++ b/changelog.d/6053.bugfix
@@ -0,0 +1 @@
+Prevent exceptions being logged when extremity-cleanup events fail due to lack of user consent to the terms of service.
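In outline, the fix works like this (a sketch of the mechanism only; the
real implementation is in the message.py hunks below):

    # Each local member is tried in turn; per-user consent/power failures
    # are caught instead of bubbling up as logged exceptions. If nobody
    # can send the dummy event, the room is parked:
    self._rooms_to_exclude_from_dummy_event_insertion[room_id] = now

    # Parked rooms are passed to get_rooms_with_many_extremities as a
    # filter, and each entry lapses after _DUMMY_EVENT_ROOM_EXCLUSION_EXPIRY
    # (7 days, in ms), after which the room is retried.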
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 1f8272784e..0f8cce8ffe 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -222,6 +222,13 @@ class MessageHandler(object):
         }
 
 
+# The duration (in ms) after which rooms should be removed
+# `_rooms_to_exclude_from_dummy_event_insertion` (with the effect that we will try
+# to generate a dummy event for them once more)
+#
+_DUMMY_EVENT_ROOM_EXCLUSION_EXPIRY = 7 * 24 * 60 * 60 * 1000
+
+
 class EventCreationHandler(object):
     def __init__(self, hs):
         self.hs = hs
@@ -258,6 +265,13 @@ class EventCreationHandler(object):
             self.config.block_events_without_consent_error
         )
 
+        # Rooms which should be excluded from dummy insertion. (For instance,
+        # those without local users who can send events into the room).
+        #
+        # map from room id to time-of-last-attempt.
+        #
+        self._rooms_to_exclude_from_dummy_event_insertion = {}  # type: dict[str, int]
+
         # we need to construct a ConsentURIBuilder here, as it checks that the necessary
         # config options, but *only* if we have a configuration for which we are
         # going to need it.
@@ -888,9 +902,11 @@ class EventCreationHandler(object):
         """Background task to send dummy events into rooms that have a large
         number of extremities
         """
-
+        self._expire_rooms_to_exclude_from_dummy_event_insertion()
         room_ids = yield self.store.get_rooms_with_many_extremities(
-            min_count=10, limit=5
+            min_count=10,
+            limit=5,
+            room_id_filter=self._rooms_to_exclude_from_dummy_event_insertion.keys(),
         )
 
         for room_id in room_ids:
@@ -904,32 +920,61 @@ class EventCreationHandler(object):
             members = yield self.state.get_current_users_in_room(
                 room_id, latest_event_ids=latest_event_ids
             )
+            dummy_event_sent = False
+            for user_id in members:
+                if not self.hs.is_mine_id(user_id):
+                    continue
+                requester = create_requester(user_id)
+                try:
+                    event, context = yield self.create_event(
+                        requester,
+                        {
+                            "type": "org.matrix.dummy_event",
+                            "content": {},
+                            "room_id": room_id,
+                            "sender": user_id,
+                        },
+                        prev_events_and_hashes=prev_events_and_hashes,
+                    )
 
-            user_id = None
-            for member in members:
-                if self.hs.is_mine_id(member):
-                    user_id = member
-                    break
-
-            if not user_id:
-                # We don't have a joined user.
-                # TODO: We should do something here to stop the room from
-                # appearing next time.
-                continue
+                    event.internal_metadata.proactively_send = False
 
-            requester = create_requester(user_id)
+                    yield self.send_nonmember_event(
+                        requester, event, context, ratelimit=False
+                    )
+                    dummy_event_sent = True
+                    break
+                except ConsentNotGivenError:
+                    logger.info(
+                        "Failed to send dummy event into room %s for user %s due to "
+                        "lack of consent. Will try another user" % (room_id, user_id)
+                    )
+                except AuthError:
+                    logger.info(
+                        "Failed to send dummy event into room %s for user %s due to "
+                        "lack of power. Will try another user" % (room_id, user_id)
+                    )
 
-            event, context = yield self.create_event(
-                requester,
-                {
-                    "type": "org.matrix.dummy_event",
-                    "content": {},
-                    "room_id": room_id,
-                    "sender": user_id,
-                },
-                prev_events_and_hashes=prev_events_and_hashes,
+            if not dummy_event_sent:
+                # Did not find a valid user in the room, so remove from future attempts
+                # Exclusion is time limited, so the room will be rechecked in the future
+                # dependent on _DUMMY_EVENT_ROOM_EXCLUSION_EXPIRY
+                logger.info(
+                    "Failed to send dummy event into room %s. Will exclude it from "
+                    "future attempts until cache expires" % (room_id,)
+                )
+                now = self.clock.time_msec()
+                self._rooms_to_exclude_from_dummy_event_insertion[room_id] = now
+
+    def _expire_rooms_to_exclude_from_dummy_event_insertion(self):
+        expire_before = self.clock.time_msec() - _DUMMY_EVENT_ROOM_EXCLUSION_EXPIRY
+        to_expire = set()
+        for room_id, time in self._rooms_to_exclude_from_dummy_event_insertion.items():
+            if time < expire_before:
+                to_expire.add(room_id)
+        for room_id in to_expire:
+            logger.debug(
+                "Expiring room id %s from dummy event insertion exclusion cache",
+                room_id,
             )
-
-            event.internal_metadata.proactively_send = False
-
-            yield self.send_nonmember_event(requester, event, context, ratelimit=False)
+            del self._rooms_to_exclude_from_dummy_event_insertion[room_id]
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 4f500d893e..f5e8c39262 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -12,6 +12,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import itertools
 import logging
 import random
 
@@ -190,12 +191,13 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
             room_id,
         )
 
-    def get_rooms_with_many_extremities(self, min_count, limit):
+    def get_rooms_with_many_extremities(self, min_count, limit, room_id_filter):
         """Get the top rooms with at least N extremities.
 
         Args:
             min_count (int): The minimum number of extremities
             limit (int): The maximum number of rooms to return.
+            room_id_filter (iterable[str]): room_ids to exclude from the results
 
         Returns:
             Deferred[list]: At most `limit` room IDs that have at least
@@ -203,15 +205,25 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
         """
 
         def _get_rooms_with_many_extremities_txn(txn):
+            where_clause = "1=1"
+            if room_id_filter:
+                where_clause = "room_id NOT IN (%s)" % (
+                    ",".join("?" for _ in room_id_filter),
+                )
+
             sql = """
                 SELECT room_id FROM event_forward_extremities
+                WHERE %s
                 GROUP BY room_id
                 HAVING count(*) > ?
                 ORDER BY count(*) DESC
                 LIMIT ?
-            """
+            """ % (
+                where_clause,
+            )
 
-            txn.execute(sql, (min_count, limit))
+            query_args = list(itertools.chain(room_id_filter, [min_count, limit]))
+            txn.execute(sql, query_args)
             return [room_id for room_id, in txn]
 
         return self.runInteraction(
diff --git a/tests/storage/test_cleanup_extrems.py b/tests/storage/test_cleanup_extrems.py
index e9e2d5337c..34f9c72709 100644
--- a/tests/storage/test_cleanup_extrems.py
+++ b/tests/storage/test_cleanup_extrems.py
@@ -14,7 +14,13 @@
 # limitations under the License.
 import os.path
+from unittest.mock import patch
+
+from mock import Mock
 
+import synapse.rest.admin
+from synapse.api.constants import EventTypes
+from synapse.rest.client.v1 import login, room
 from synapse.storage import prepare_database
 from synapse.types import Requester, UserID
 
@@ -225,6 +231,14 @@ class CleanupExtremBackgroundUpdateStoreTestCase(HomeserverTestCase):
 
 
 class CleanupExtremDummyEventsTestCase(HomeserverTestCase):
+    CONSENT_VERSION = "1"
+    EXTREMITIES_COUNT = 50
+    servlets = [
+        synapse.rest.admin.register_servlets_for_client_rest_resource,
+        login.register_servlets,
+        room.register_servlets,
+    ]
+
     def make_homeserver(self, reactor, clock):
         config = self.default_config()
         config["cleanup_extremities_with_dummy_events"] = True
@@ -233,33 +247,148 @@ class CleanupExtremDummyEventsTestCase(HomeserverTestCase):
     def prepare(self, reactor, clock, homeserver):
         self.store = homeserver.get_datastore()
         self.room_creator = homeserver.get_room_creation_handler()
+        self.event_creator_handler = homeserver.get_event_creation_handler()
 
         # Create a test user and room
-        self.user = UserID("alice", "test")
+        self.user = UserID.from_string(self.register_user("user1", "password"))
+        self.token1 = self.login("user1", "password")
         self.requester = Requester(self.user, None, False, None, None)
         info = self.get_success(self.room_creator.create_room(self.requester, {}))
         self.room_id = info["room_id"]
+        self.event_creator = homeserver.get_event_creation_handler()
+        homeserver.config.user_consent_version = self.CONSENT_VERSION
 
     def test_send_dummy_event(self):
-        # Create a bushy graph with 50 extremities.
-
-        event_id_start = self.create_and_send_event(self.room_id, self.user)
+        self._create_extremity_rich_graph()
 
-        for _ in range(50):
-            self.create_and_send_event(
-                self.room_id, self.user, prev_event_ids=[event_id_start]
-            )
+        # Pump the reactor repeatedly so that the background updates have a
+        # chance to run.
+        self.pump(10 * 60)
 
         latest_event_ids = self.get_success(
             self.store.get_latest_event_ids_in_room(self.room_id)
         )
-        self.assertEqual(len(latest_event_ids), 50)
+        self.assertTrue(len(latest_event_ids) < 10, len(latest_event_ids))
 
+    @patch("synapse.handlers.message._DUMMY_EVENT_ROOM_EXCLUSION_EXPIRY", new=0)
+    def test_send_dummy_events_when_insufficient_power(self):
+        self._create_extremity_rich_graph()
+        # Cripple power levels
+        self.helper.send_state(
+            self.room_id,
+            EventTypes.PowerLevels,
+            body={"users": {str(self.user): -1}},
+            tok=self.token1,
+        )
         # Pump the reactor repeatedly so that the background updates have a
         # chance to run.
         self.pump(10 * 60)
+
+        latest_event_ids = self.get_success(
+            self.store.get_latest_event_ids_in_room(self.room_id)
+        )
+        # Check that the room has not been pruned
+        self.assertTrue(len(latest_event_ids) > 10)
+
+        # New user with regular levels
+        user2 = self.register_user("user2", "password")
+        token2 = self.login("user2", "password")
+        self.helper.join(self.room_id, user2, tok=token2)
+        self.pump(10 * 60)
 
         latest_event_ids = self.get_success(
             self.store.get_latest_event_ids_in_room(self.room_id)
         )
         self.assertTrue(len(latest_event_ids) < 10, len(latest_event_ids))
+
+    @patch("synapse.handlers.message._DUMMY_EVENT_ROOM_EXCLUSION_EXPIRY", new=0)
+    def test_send_dummy_event_without_consent(self):
+        self._create_extremity_rich_graph()
+        self._enable_consent_checking()
+
+        # Pump the reactor repeatedly so that the background updates have a
+        # chance to run. Attempt to add dummy event with user that has not consented
+        # Check that dummy event send fails.
+        self.pump(10 * 60)
+        latest_event_ids = self.get_success(
+            self.store.get_latest_event_ids_in_room(self.room_id)
+        )
+        self.assertTrue(len(latest_event_ids) == self.EXTREMITIES_COUNT)
+
+        # Create new user, and add consent
+        user2 = self.register_user("user2", "password")
+        token2 = self.login("user2", "password")
+        self.get_success(
+            self.store.user_set_consent_version(user2, self.CONSENT_VERSION)
+        )
+        self.helper.join(self.room_id, user2, tok=token2)
+
+        # Background updates should now cause a dummy event to be added to the graph
+        self.pump(10 * 60)
+
         latest_event_ids = self.get_success(
             self.store.get_latest_event_ids_in_room(self.room_id)
         )
         self.assertTrue(len(latest_event_ids) < 10, len(latest_event_ids))
+
+    @patch("synapse.handlers.message._DUMMY_EVENT_ROOM_EXCLUSION_EXPIRY", new=250)
+    def test_expiry_logic(self):
+        """Simple test to ensure that _expire_rooms_to_exclude_from_dummy_event_insertion()
+        expires old entries correctly.
+        """
+        self.event_creator_handler._rooms_to_exclude_from_dummy_event_insertion[
+            "1"
+        ] = 100000
+        self.event_creator_handler._rooms_to_exclude_from_dummy_event_insertion[
+            "2"
+        ] = 200000
+        self.event_creator_handler._rooms_to_exclude_from_dummy_event_insertion[
+            "3"
+        ] = 300000
+        self.event_creator_handler._expire_rooms_to_exclude_from_dummy_event_insertion()
+        # All entries within time frame
+        self.assertEqual(
+            len(
+                self.event_creator_handler._rooms_to_exclude_from_dummy_event_insertion
+            ),
+            3,
+        )
+        # Oldest room to expire
+        self.pump(1)
+        self.event_creator_handler._expire_rooms_to_exclude_from_dummy_event_insertion()
+        self.assertEqual(
+            len(
+                self.event_creator_handler._rooms_to_exclude_from_dummy_event_insertion
+            ),
+            2,
+        )
+        # All rooms to expire
+        self.pump(2)
+        self.assertEqual(
+            len(
+                self.event_creator_handler._rooms_to_exclude_from_dummy_event_insertion
+            ),
+            0,
+        )
+
+    def _create_extremity_rich_graph(self):
+        """Helper method to create bushy graph on demand"""
+
+        event_id_start = self.create_and_send_event(self.room_id, self.user)
+
+        for _ in range(self.EXTREMITIES_COUNT):
+            self.create_and_send_event(
+                self.room_id, self.user, prev_event_ids=[event_id_start]
+            )
+
+        latest_event_ids = self.get_success(
+            self.store.get_latest_event_ids_in_room(self.room_id)
+        )
+        self.assertEqual(len(latest_event_ids), 50)
+
+    def _enable_consent_checking(self):
+        """Helper method to enable consent checking"""
+        self.event_creator._block_events_without_consent_error = "No consent from user"
+        consent_uri_builder = Mock()
+        consent_uri_builder.build_user_consent_uri.return_value = "http://example.com"
+        self.event_creator._consent_uri_builder = consent_uri_builder
diff --git a/tests/storage/test_event_federation.py b/tests/storage/test_event_federation.py
index 86c7ac350d..b58386994e 100644
--- a/tests/storage/test_event_federation.py
+++ b/tests/storage/test_event_federation.py
@@ -75,3 +75,43 @@ class EventFederationWorkerStoreTestCase(tests.unittest.TestCase):
             el = r[i]
             depth = el[2]
             self.assertLessEqual(5, depth)
+
+    @defer.inlineCallbacks
+    def test_get_rooms_with_many_extremities(self):
+        room1 = "#room1"
+        room2 = "#room2"
+        room3 = "#room3"
+
+        def insert_event(txn, i, room_id):
+            event_id = "$event_%i:local" % i
+            txn.execute(
+                (
+                    "INSERT INTO event_forward_extremities (room_id, event_id) "
+                    "VALUES (?, ?)"
+                ),
+                (room_id, event_id),
+            )
+
+        for i in range(0, 20):
+            yield self.store.runInteraction("insert", insert_event, i, room1)
+            yield self.store.runInteraction("insert", insert_event, i, room2)
+            yield self.store.runInteraction("insert", insert_event, i, room3)
self.store.runInteraction("insert", insert_event, i, room3) + + # Test simple case + r = yield self.store.get_rooms_with_many_extremities(5, 5, []) + self.assertEqual(len(r), 3) + + # Does filter work? + + r = yield self.store.get_rooms_with_many_extremities(5, 5, [room1]) + self.assertTrue(room2 in r) + self.assertTrue(room3 in r) + self.assertEqual(len(r), 2) + + r = yield self.store.get_rooms_with_many_extremities(5, 5, [room1, room2]) + self.assertEqual(r, [room3]) + + # Does filter and limit work? + + r = yield self.store.get_rooms_with_many_extremities(5, 1, [room1]) + self.assertTrue(r == [room2] or r == [room3]) -- cgit 1.5.1 From 2927c6bc4c4e0c975a875d7eb5aa736b6abd66cd Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Thu, 26 Sep 2019 12:29:59 +0100 Subject: bump version --- synapse/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse') diff --git a/synapse/__init__.py b/synapse/__init__.py index 6766ef445c..ddfe9ec542 100644 --- a/synapse/__init__.py +++ b/synapse/__init__.py @@ -35,4 +35,4 @@ try: except ImportError: pass -__version__ = "1.3.1" +__version__ = "1.4.0rc1" -- cgit 1.5.1 From 8b8f8c7b3c6136ea777265fff8052afed2b7031e Mon Sep 17 00:00:00 2001 From: Neil Johnson Date: Thu, 26 Sep 2019 12:57:01 +0100 Subject: Explicitly log when a homeserver does not have a trusted key server configured (#6090) --- changelog.d/6090.feature | 1 + docs/sample_config.yaml | 14 ++++++++++---- synapse/config/key.py | 48 ++++++++++++++++++++++++++++++++++++++++++++---- synapse/config/server.py | 16 ++++++++-------- 4 files changed, 63 insertions(+), 16 deletions(-) create mode 100644 changelog.d/6090.feature (limited to 'synapse') diff --git a/changelog.d/6090.feature b/changelog.d/6090.feature new file mode 100644 index 0000000000..a6da448a1a --- /dev/null +++ b/changelog.d/6090.feature @@ -0,0 +1 @@ +Explicitly log when a homeserver does not have the 'trusted_key_servers' config field configured. diff --git a/docs/sample_config.yaml b/docs/sample_config.yaml index 8f801daf35..254e1b17b4 100644 --- a/docs/sample_config.yaml +++ b/docs/sample_config.yaml @@ -1072,6 +1072,10 @@ signing_key_path: "CONFDIR/SERVERNAME.signing.key" # This setting supercedes an older setting named `perspectives`. The old format # is still supported for backwards-compatibility, but it is deprecated. # +# 'trusted_key_servers' defaults to matrix.org, but using it will generate a +# warning on start-up. To suppress this warning, set +# 'suppress_key_server_warning' to true. +# # Options for each entry in the list include: # # server_name: the name of the server. required. @@ -1096,11 +1100,13 @@ signing_key_path: "CONFDIR/SERVERNAME.signing.key" # "ed25519:auto": "abcdefghijklmnopqrstuvwxyzabcdefghijklmopqr" # - server_name: "my_other_trusted_server.example.com" # -# The default configuration is: -# -#trusted_key_servers: -# - server_name: "matrix.org" +trusted_key_servers: + - server_name: "matrix.org" + +# Uncomment the following to disable the warning that is emitted when the +# trusted_key_servers include 'matrix.org'. See above. # +#suppress_key_server_warning: true # The signing keys to use when acting as a trusted key server. If not specified # defaults to the server signing key. diff --git a/synapse/config/key.py b/synapse/config/key.py index ba2199bceb..f039f96e9c 100644 --- a/synapse/config/key.py +++ b/synapse/config/key.py @@ -50,6 +50,33 @@ and you should enable 'federation_verify_certificates' in your configuration. 
 If you are *sure* you want to do this, set 'accept_keys_insecurely' on the
 trusted_key_server configuration."""
 
+TRUSTED_KEY_SERVER_NOT_CONFIGURED_WARN = """\
+Synapse requires that a list of trusted key servers are specified in order to
+provide signing keys for other servers in the federation.
+
+This homeserver does not have a trusted key server configured in
+homeserver.yaml and will fall back to the default of 'matrix.org'.
+
+Trusted key servers should be long-lived and stable which makes matrix.org a
+good choice for many admins, but some admins may wish to choose another. To
+suppress this warning, the admin should set 'trusted_key_servers' in
+homeserver.yaml to their desired key server and 'suppress_key_server_warning'
+to 'true'.
+
+In a future release the software-defined default will be removed entirely and
+the trusted key server will be defined exclusively by the value of
+'trusted_key_servers'.
+--------------------------------------------------------------------------------"""
+
+TRUSTED_KEY_SERVER_CONFIGURED_AS_M_ORG_WARN = """\
+This server is configured to use 'matrix.org' as its trusted key server via the
+'trusted_key_servers' config option. 'matrix.org' is a good choice for a key
+server since it is long-lived, stable and trusted. However, some admins may
+wish to use another server for this purpose.
+
+To suppress this warning and continue using 'matrix.org', admins should set
+'suppress_key_server_warning' to 'true' in homeserver.yaml.
+--------------------------------------------------------------------------------"""
 
 logger = logging.getLogger(__name__)
 
@@ -85,6 +112,7 @@ class KeyConfig(Config):
             config.get("key_refresh_interval", "1d")
         )
 
+        suppress_key_server_warning = config.get("suppress_key_server_warning", False)
         key_server_signing_keys_path = config.get("key_server_signing_keys_path")
         if key_server_signing_keys_path:
             self.key_server_signing_keys = self.read_signing_keys(
@@ -95,6 +123,7 @@ class KeyConfig(Config):
 
         # if neither trusted_key_servers nor perspectives are given, use the default.
         if "perspectives" not in config and "trusted_key_servers" not in config:
+            logger.warn(TRUSTED_KEY_SERVER_NOT_CONFIGURED_WARN)
             key_servers = [{"server_name": "matrix.org"}]
         else:
             key_servers = config.get("trusted_key_servers", [])
@@ -108,6 +137,11 @@ class KeyConfig(Config):
             # merge the 'perspectives' config into the 'trusted_key_servers' config.
             key_servers.extend(_perspectives_to_key_servers(config))
 
+            if not suppress_key_server_warning and "matrix.org" in (
+                s["server_name"] for s in key_servers
+            ):
+                logger.warning(TRUSTED_KEY_SERVER_CONFIGURED_AS_M_ORG_WARN)
+
         # list of TrustedKeyServer objects
         self.key_servers = list(
             _parse_key_servers(key_servers, self.federation_verify_certificates)
@@ -190,6 +224,10 @@ class KeyConfig(Config):
         # This setting supercedes an older setting named `perspectives`. The old format
         # is still supported for backwards-compatibility, but it is deprecated.
         #
+        # 'trusted_key_servers' defaults to matrix.org, but using it will generate a
+        # warning on start-up. To suppress this warning, set
+        # 'suppress_key_server_warning' to true.
+        #
         # Options for each entry in the list include:
         #
         #    server_name: the name of the server. required.
@@ -214,11 +252,13 @@ class KeyConfig(Config):
         #      "ed25519:auto": "abcdefghijklmnopqrstuvwxyzabcdefghijklmopqr"
         #  - server_name: "my_other_trusted_server.example.com"
         #
-        # The default configuration is:
-        #
-        #trusted_key_servers:
-        #  - server_name: "matrix.org"
+        trusted_key_servers:
+          - server_name: "matrix.org"
+
+        # Uncomment the following to disable the warning that is emitted when the
+        # trusted_key_servers include 'matrix.org'. See above.
         #
+        #suppress_key_server_warning: true
 
         # The signing keys to use when acting as a trusted key server. If not specified
         # defaults to the server signing key.
diff --git a/synapse/config/server.py b/synapse/config/server.py
index 9d3f1b5bfc..5ad7ee911d 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -48,6 +48,13 @@ ROOM_COMPLEXITY_TOO_GREAT = (
     "to join this room."
 )
 
+METRICS_PORT_WARNING = """\
+The metrics_port configuration option is deprecated in Synapse 0.31 in favour of
+a listener. Please see
+https://github.com/matrix-org/synapse/blob/master/docs/metrics-howto.md
+on how to configure the new listener.
+--------------------------------------------------------------------------------"""
+
 
 class ServerConfig(Config):
     def read_config(self, config, **kwargs):
@@ -341,14 +348,7 @@ class ServerConfig(Config):
 
         metrics_port = config.get("metrics_port")
         if metrics_port:
-            logger.warn(
-                (
-                    "The metrics_port configuration option is deprecated in Synapse 0.31 "
-                    "in favour of a listener. Please see "
-                    "http://github.com/matrix-org/synapse/blob/master/docs/metrics-howto.md"
-                    " on how to configure the new listener."
-                )
-            )
+            logger.warning(METRICS_PORT_WARNING)
 
             self.listeners.append(
                 {
--
cgit 1.5.1
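Condensed, the start-up behaviour introduced by the last patch reads as
follows (a sketch only; names are from the key.py diff above, control
flow abridged):

    suppress = config.get("suppress_key_server_warning", False)

    if "perspectives" not in config and "trusted_key_servers" not in config:
        # No explicit choice made: fall back to matrix.org, loudly.
        logger.warn(TRUSTED_KEY_SERVER_NOT_CONFIGURED_WARN)
        key_servers = [{"server_name": "matrix.org"}]
    else:
        key_servers = config.get("trusted_key_servers", [])
        if not suppress and "matrix.org" in (
            s["server_name"] for s in key_servers
        ):
            # matrix.org chosen explicitly: warn once unless suppressed.
            logger.warning(TRUSTED_KEY_SERVER_CONFIGURED_AS_M_ORG_WARN)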