From 09957ce0e4dcfd84c2de4039653059faae03065b Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Mon, 4 Nov 2019 17:09:22 +0000 Subject: Implement per-room message retention policies --- synapse/config/server.py | 172 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 172 insertions(+) (limited to 'synapse/config/server.py') diff --git a/synapse/config/server.py b/synapse/config/server.py index d556df308d..aa93a416f1 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -246,6 +246,115 @@ class ServerConfig(Config): # events with profile information that differ from the target's global profile. self.allow_per_room_profiles = config.get("allow_per_room_profiles", True) + retention_config = config.get("retention") + if retention_config is None: + retention_config = {} + + self.retention_enabled = retention_config.get("enabled", False) + + retention_default_policy = retention_config.get("default_policy") + + if retention_default_policy is not None: + self.retention_default_min_lifetime = retention_default_policy.get( + "min_lifetime" + ) + if self.retention_default_min_lifetime is not None: + self.retention_default_min_lifetime = self.parse_duration( + self.retention_default_min_lifetime + ) + + self.retention_default_max_lifetime = retention_default_policy.get( + "max_lifetime" + ) + if self.retention_default_max_lifetime is not None: + self.retention_default_max_lifetime = self.parse_duration( + self.retention_default_max_lifetime + ) + + if ( + self.retention_default_min_lifetime is not None + and self.retention_default_max_lifetime is not None + and ( + self.retention_default_min_lifetime + > self.retention_default_max_lifetime + ) + ): + raise ConfigError( + "The default retention policy's 'min_lifetime' can not be greater" + " than its 'max_lifetime'" + ) + else: + self.retention_default_min_lifetime = None + self.retention_default_max_lifetime = None + + self.retention_allowed_lifetime_min = retention_config.get("allowed_lifetime_min") + if self.retention_allowed_lifetime_min is not None: + self.retention_allowed_lifetime_min = self.parse_duration( + self.retention_allowed_lifetime_min + ) + + self.retention_allowed_lifetime_max = retention_config.get("allowed_lifetime_max") + if self.retention_allowed_lifetime_max is not None: + self.retention_allowed_lifetime_max = self.parse_duration( + self.retention_allowed_lifetime_max + ) + + if ( + self.retention_allowed_lifetime_min is not None + and self.retention_allowed_lifetime_max is not None + and self.retention_allowed_lifetime_min > self.retention_allowed_lifetime_max + ): + raise ConfigError( + "Invalid retention policy limits: 'allowed_lifetime_min' can not be" + " greater than 'allowed_lifetime_max'" + ) + + self.retention_purge_jobs = [] + for purge_job_config in retention_config.get("purge_jobs", []): + interval_config = purge_job_config.get("interval") + + if interval_config is None: + raise ConfigError( + "A retention policy's purge jobs configuration must have the" + " 'interval' key set." + ) + + interval = self.parse_duration(interval_config) + + shortest_max_lifetime = purge_job_config.get("shortest_max_lifetime") + + if shortest_max_lifetime is not None: + shortest_max_lifetime = self.parse_duration(shortest_max_lifetime) + + longest_max_lifetime = purge_job_config.get("longest_max_lifetime") + + if longest_max_lifetime is not None: + longest_max_lifetime = self.parse_duration(longest_max_lifetime) + + if ( + shortest_max_lifetime is not None + and longest_max_lifetime is not None + and shortest_max_lifetime > longest_max_lifetime + ): + raise ConfigError( + "A retention policy's purge jobs configuration's" + " 'shortest_max_lifetime' value can not be greater than its" + " 'longest_max_lifetime' value." + ) + + self.retention_purge_jobs.append({ + "interval": interval, + "shortest_max_lifetime": shortest_max_lifetime, + "longest_max_lifetime": longest_max_lifetime, + }) + + if not self.retention_purge_jobs: + self.retention_purge_jobs = [{ + "interval": self.parse_duration("1d"), + "shortest_max_lifetime": None, + "longest_max_lifetime": None, + }] + self.listeners = [] # type: List[dict] for listener in config.get("listeners", []): if not isinstance(listener.get("port", None), int): @@ -761,6 +870,69 @@ class ServerConfig(Config): # Defaults to `28d`. Set to `null` to disable clearing out of old rows. # #user_ips_max_age: 14d + + # Message retention policy at the server level. + # + # Room admins and mods can define a retention period for their rooms using the + # 'm.room.retention' state event, and server admins can cap this period by setting + # the 'allowed_lifetime_min' and 'allowed_lifetime_max' config options. + # + # If this feature is enabled, Synapse will regularly look for and purge events + # which are older than the room's maximum retention period. Synapse will also + # filter events received over federation so that events that should have been + # purged are ignored and not stored again. + # + retention: + # The message retention policies feature is disabled by default. Uncomment the + # following line to enable it. + # + #enabled: true + + # Default retention policy. If set, Synapse will apply it to rooms that lack the + # 'm.room.retention' state event. Currently, the value of 'min_lifetime' doesn't + # matter much because Synapse doesn't take it into account yet. + # + #default_policy: + # min_lifetime: 1d + # max_lifetime: 1y + + # Retention policy limits. If set, a user won't be able to send a + # 'm.room.retention' event which features a 'min_lifetime' or a 'max_lifetime' + # that's not within this range. This is especially useful in closed federations, + # in which server admins can make sure every federating server applies the same + # rules. + # + #allowed_lifetime_min: 1d + #allowed_lifetime_max: 1y + + # Server admins can define the settings of the background jobs purging the + # events which lifetime has expired under the 'purge_jobs' section. + # + # If no configuration is provided, a single job will be set up to delete expired + # events in every room daily. + # + # Each job's configuration defines which range of message lifetimes the job + # takes care of. For example, if 'shortest_max_lifetime' is '2d' and + # 'longest_max_lifetime' is '3d', the job will handle purging expired events in + # rooms whose state defines a 'max_lifetime' that's both higher than 2 days, and + # lower than or equal to 3 days. Both the minimum and the maximum value of a + # range are optional, e.g. a job with no 'shortest_max_lifetime' and a + # 'longest_max_lifetime' of '3d' will handle every room with a retention policy + # which 'max_lifetime' is lower than or equal to three days. + # + # The rationale for this per-job configuration is that some rooms might have a + # retention policy with a low 'max_lifetime', where history needs to be purged + # of outdated messages on a very frequent basis (e.g. every 5min), but not want + # that purge to be performed by a job that's iterating over every room it knows, + # which would be quite heavy on the server. + # + #purge_jobs: + # - shortest_max_lifetime: 1d + # longest_max_lifetime: 3d + # interval: 5m: + # - shortest_max_lifetime: 3d + # longest_max_lifetime: 1y + # interval: 24h """ % locals() ) -- cgit 1.5.1 From 7c24d0f443724082376c89f9f75954d81f524a8e Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Tue, 19 Nov 2019 13:22:37 +0000 Subject: Lint --- synapse/config/server.py | 39 ++++++++++------- synapse/handlers/pagination.py | 17 +++----- synapse/storage/data_stores/main/room.py | 49 +++++++++++---------- tests/rest/client/test_retention.py | 73 ++++++++++---------------------- 4 files changed, 77 insertions(+), 101 deletions(-) (limited to 'synapse/config/server.py') diff --git a/synapse/config/server.py b/synapse/config/server.py index aa93a416f1..8a55ffac4f 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -19,7 +19,7 @@ import logging import os.path import re from textwrap import indent -from typing import List +from typing import List, Dict, Optional import attr import yaml @@ -287,13 +287,17 @@ class ServerConfig(Config): self.retention_default_min_lifetime = None self.retention_default_max_lifetime = None - self.retention_allowed_lifetime_min = retention_config.get("allowed_lifetime_min") + self.retention_allowed_lifetime_min = retention_config.get( + "allowed_lifetime_min" + ) if self.retention_allowed_lifetime_min is not None: self.retention_allowed_lifetime_min = self.parse_duration( self.retention_allowed_lifetime_min ) - self.retention_allowed_lifetime_max = retention_config.get("allowed_lifetime_max") + self.retention_allowed_lifetime_max = retention_config.get( + "allowed_lifetime_max" + ) if self.retention_allowed_lifetime_max is not None: self.retention_allowed_lifetime_max = self.parse_duration( self.retention_allowed_lifetime_max @@ -302,14 +306,15 @@ class ServerConfig(Config): if ( self.retention_allowed_lifetime_min is not None and self.retention_allowed_lifetime_max is not None - and self.retention_allowed_lifetime_min > self.retention_allowed_lifetime_max + and self.retention_allowed_lifetime_min + > self.retention_allowed_lifetime_max ): raise ConfigError( "Invalid retention policy limits: 'allowed_lifetime_min' can not be" " greater than 'allowed_lifetime_max'" ) - self.retention_purge_jobs = [] + self.retention_purge_jobs = [] # type: List[Dict[str, Optional[int]]] for purge_job_config in retention_config.get("purge_jobs", []): interval_config = purge_job_config.get("interval") @@ -342,18 +347,22 @@ class ServerConfig(Config): " 'longest_max_lifetime' value." ) - self.retention_purge_jobs.append({ - "interval": interval, - "shortest_max_lifetime": shortest_max_lifetime, - "longest_max_lifetime": longest_max_lifetime, - }) + self.retention_purge_jobs.append( + { + "interval": interval, + "shortest_max_lifetime": shortest_max_lifetime, + "longest_max_lifetime": longest_max_lifetime, + } + ) if not self.retention_purge_jobs: - self.retention_purge_jobs = [{ - "interval": self.parse_duration("1d"), - "shortest_max_lifetime": None, - "longest_max_lifetime": None, - }] + self.retention_purge_jobs = [ + { + "interval": self.parse_duration("1d"), + "shortest_max_lifetime": None, + "longest_max_lifetime": None, + } + ] self.listeners = [] # type: List[dict] for listener in config.get("listeners", []): diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py index e1800177fa..d122c11a4d 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py @@ -154,20 +154,17 @@ class PaginationHandler(object): # Figure out what token we should start purging at. ts = self.clock.time_msec() - max_lifetime - stream_ordering = ( - yield self.store.find_first_stream_ordering_after_ts(ts) - ) + stream_ordering = yield self.store.find_first_stream_ordering_after_ts(ts) - r = ( - yield self.store.get_room_event_after_stream_ordering( - room_id, stream_ordering, - ) + r = yield self.store.get_room_event_after_stream_ordering( + room_id, stream_ordering, ) if not r: logger.warning( "[purge] purging events not possible: No event found " "(ts %i => stream_ordering %i)", - ts, stream_ordering, + ts, + stream_ordering, ) continue @@ -186,9 +183,7 @@ class PaginationHandler(object): # the background so that it's not blocking any other operation apart from # other purges in the same room. run_as_background_process( - "_purge_history", - self._purge_history, - purge_id, room_id, token, True, + "_purge_history", self._purge_history, purge_id, room_id, token, True, ) def start_purge_history(self, room_id, token, delete_local_events=False): diff --git a/synapse/storage/data_stores/main/room.py b/synapse/storage/data_stores/main/room.py index 54a7d24c73..7fceae59ca 100644 --- a/synapse/storage/data_stores/main/room.py +++ b/synapse/storage/data_stores/main/room.py @@ -334,8 +334,9 @@ class RoomStore(RoomWorkerStore, SearchStore): WHERE state.room_id > ? AND state.type = '%s' ORDER BY state.room_id ASC LIMIT ?; - """ % EventTypes.Retention, - (last_room, batch_size) + """ + % EventTypes.Retention, + (last_room, batch_size), ) rows = self.cursor_to_dict(txn) @@ -358,15 +359,13 @@ class RoomStore(RoomWorkerStore, SearchStore): "event_id": row["event_id"], "min_lifetime": retention_policy.get("min_lifetime"), "max_lifetime": retention_policy.get("max_lifetime"), - } + }, ) logger.info("Inserted %d rows into room_retention", len(rows)) self._background_update_progress_txn( - txn, "insert_room_retention", { - "room_id": rows[-1]["room_id"], - } + txn, "insert_room_retention", {"room_id": rows[-1]["room_id"]} ) if batch_size > len(rows): @@ -375,8 +374,7 @@ class RoomStore(RoomWorkerStore, SearchStore): return False end = yield self.runInteraction( - "insert_room_retention", - _background_insert_retention_txn, + "insert_room_retention", _background_insert_retention_txn, ) if end: @@ -585,17 +583,15 @@ class RoomStore(RoomWorkerStore, SearchStore): ) def _store_retention_policy_for_room_txn(self, txn, event): - if ( - hasattr(event, "content") - and ("min_lifetime" in event.content or "max_lifetime" in event.content) + if hasattr(event, "content") and ( + "min_lifetime" in event.content or "max_lifetime" in event.content ): if ( - ("min_lifetime" in event.content and not isinstance( - event.content.get("min_lifetime"), integer_types - )) - or ("max_lifetime" in event.content and not isinstance( - event.content.get("max_lifetime"), integer_types - )) + "min_lifetime" in event.content + and not isinstance(event.content.get("min_lifetime"), integer_types) + ) or ( + "max_lifetime" in event.content + and not isinstance(event.content.get("max_lifetime"), integer_types) ): # Ignore the event if one of the value isn't an integer. return @@ -798,7 +794,9 @@ class RoomStore(RoomWorkerStore, SearchStore): return local_media_mxcs, remote_media_mxcs @defer.inlineCallbacks - def get_rooms_for_retention_period_in_range(self, min_ms, max_ms, include_null=False): + def get_rooms_for_retention_period_in_range( + self, min_ms, max_ms, include_null=False + ): """Retrieves all of the rooms within the given retention range. Optionally includes the rooms which don't have a retention policy. @@ -904,23 +902,24 @@ class RoomStore(RoomWorkerStore, SearchStore): INNER JOIN current_state_events USING (event_id, room_id) WHERE room_id = ?; """, - (room_id,) + (room_id,), ) return self.cursor_to_dict(txn) ret = yield self.runInteraction( - "get_retention_policy_for_room", - get_retention_policy_for_room_txn, + "get_retention_policy_for_room", get_retention_policy_for_room_txn, ) # If we don't know this room ID, ret will be None, in this case return the default # policy. if not ret: - defer.returnValue({ - "min_lifetime": self.config.retention_default_min_lifetime, - "max_lifetime": self.config.retention_default_max_lifetime, - }) + defer.returnValue( + { + "min_lifetime": self.config.retention_default_min_lifetime, + "max_lifetime": self.config.retention_default_max_lifetime, + } + ) row = ret[0] diff --git a/tests/rest/client/test_retention.py b/tests/rest/client/test_retention.py index 7b6f25a838..6bf485c239 100644 --- a/tests/rest/client/test_retention.py +++ b/tests/rest/client/test_retention.py @@ -61,9 +61,7 @@ class RetentionTestCase(unittest.HomeserverTestCase): self.helper.send_state( room_id=room_id, event_type=EventTypes.Retention, - body={ - "max_lifetime": one_day_ms * 4, - }, + body={"max_lifetime": one_day_ms * 4}, tok=self.token, expect_code=400, ) @@ -71,9 +69,7 @@ class RetentionTestCase(unittest.HomeserverTestCase): self.helper.send_state( room_id=room_id, event_type=EventTypes.Retention, - body={ - "max_lifetime": one_hour_ms, - }, + body={"max_lifetime": one_hour_ms}, tok=self.token, expect_code=400, ) @@ -89,9 +85,7 @@ class RetentionTestCase(unittest.HomeserverTestCase): self.helper.send_state( room_id=room_id, event_type=EventTypes.Retention, - body={ - "max_lifetime": lifetime, - }, + body={"max_lifetime": lifetime}, tok=self.token, ) @@ -115,20 +109,12 @@ class RetentionTestCase(unittest.HomeserverTestCase): events = [] # Send a first event, which should be filtered out at the end of the test. - resp = self.helper.send( - room_id=room_id, - body="1", - tok=self.token, - ) + resp = self.helper.send(room_id=room_id, body="1", tok=self.token) # Get the event from the store so that we end up with a FrozenEvent that we can # give to filter_events_for_client. We need to do this now because the event won't # be in the database anymore after it has expired. - events.append(self.get_success( - store.get_event( - resp.get("event_id") - ) - )) + events.append(self.get_success(store.get_event(resp.get("event_id")))) # Advance the time by 2 days. We're using the default retention policy, therefore # after this the first event will still be valid. @@ -143,20 +129,16 @@ class RetentionTestCase(unittest.HomeserverTestCase): valid_event_id = resp.get("event_id") - events.append(self.get_success( - store.get_event( - valid_event_id - ) - )) + events.append(self.get_success(store.get_event(valid_event_id))) # Advance the time by anothe 2 days. After this, the first event should be # outdated but not the second one. self.reactor.advance(one_day_ms * 2 / 1000) # Run filter_events_for_client with our list of FrozenEvents. - filtered_events = self.get_success(filter_events_for_client( - storage, self.user_id, events - )) + filtered_events = self.get_success( + filter_events_for_client(storage, self.user_id, events) + ) # We should only get one event back. self.assertEqual(len(filtered_events), 1, filtered_events) @@ -172,28 +154,22 @@ class RetentionTestCase(unittest.HomeserverTestCase): # Send a first event to the room. This is the event we'll want to be purged at the # end of the test. - resp = self.helper.send( - room_id=room_id, - body="1", - tok=self.token, - ) + resp = self.helper.send(room_id=room_id, body="1", tok=self.token) expired_event_id = resp.get("event_id") # Check that we can retrieve the event. expired_event = self.get_event(room_id, expired_event_id) - self.assertEqual(expired_event.get("content", {}).get("body"), "1", expired_event) + self.assertEqual( + expired_event.get("content", {}).get("body"), "1", expired_event + ) # Advance the time. self.reactor.advance(increment / 1000) # Send another event. We need this because the purge job won't purge the most # recent event in the room. - resp = self.helper.send( - room_id=room_id, - body="2", - tok=self.token, - ) + resp = self.helper.send(room_id=room_id, body="2", tok=self.token) valid_event_id = resp.get("event_id") @@ -240,8 +216,7 @@ class RetentionNoDefaultPolicyTestCase(unittest.HomeserverTestCase): mock_federation_client = Mock(spec=["backfill"]) self.hs = self.setup_test_homeserver( - config=config, - federation_client=mock_federation_client, + config=config, federation_client=mock_federation_client, ) return self.hs @@ -268,9 +243,7 @@ class RetentionNoDefaultPolicyTestCase(unittest.HomeserverTestCase): self.helper.send_state( room_id=room_id, event_type=EventTypes.Retention, - body={ - "max_lifetime": one_day_ms * 35, - }, + body={"max_lifetime": one_day_ms * 35}, tok=self.token, ) @@ -289,18 +262,16 @@ class RetentionNoDefaultPolicyTestCase(unittest.HomeserverTestCase): # Check that we can retrieve the event. expired_event = self.get_event(room_id, first_event_id) - self.assertEqual(expired_event.get("content", {}).get("body"), "1", expired_event) + self.assertEqual( + expired_event.get("content", {}).get("body"), "1", expired_event + ) # Advance the time by a month. self.reactor.advance(one_day_ms * 30 / 1000) # Send another event. We need this because the purge job won't purge the most # recent event in the room. - resp = self.helper.send( - room_id=room_id, - body="2", - tok=self.token, - ) + resp = self.helper.send(room_id=room_id, body="2", tok=self.token) second_event_id = resp.get("event_id") @@ -313,7 +284,9 @@ class RetentionNoDefaultPolicyTestCase(unittest.HomeserverTestCase): ) if expected_code_for_first_event == 200: - self.assertEqual(first_event.get("content", {}).get("body"), "1", first_event) + self.assertEqual( + first_event.get("content", {}).get("body"), "1", first_event + ) # Check that the event that hasn't been purged can still be retrieved. second_event = self.get_event(room_id, second_event_id) -- cgit 1.5.1 From bf9a11c54d3c9ca1e4fa9420b567efceb1e46d5b Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Tue, 19 Nov 2019 13:30:04 +0000 Subject: Lint again --- synapse/config/server.py | 2 +- tests/rest/client/test_retention.py | 12 ++---------- 2 files changed, 3 insertions(+), 11 deletions(-) (limited to 'synapse/config/server.py') diff --git a/synapse/config/server.py b/synapse/config/server.py index 8a55ffac4f..ed916e1400 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -19,7 +19,7 @@ import logging import os.path import re from textwrap import indent -from typing import List, Dict, Optional +from typing import Dict, List, Optional import attr import yaml diff --git a/tests/rest/client/test_retention.py b/tests/rest/client/test_retention.py index 6bf485c239..9e549d8a91 100644 --- a/tests/rest/client/test_retention.py +++ b/tests/rest/client/test_retention.py @@ -121,11 +121,7 @@ class RetentionTestCase(unittest.HomeserverTestCase): self.reactor.advance(one_day_ms * 2 / 1000) # Send another event, which shouldn't get filtered out. - resp = self.helper.send( - room_id=room_id, - body="2", - tok=self.token, - ) + resp = self.helper.send(room_id=room_id, body="2", tok=self.token) valid_event_id = resp.get("event_id") @@ -252,11 +248,7 @@ class RetentionNoDefaultPolicyTestCase(unittest.HomeserverTestCase): def _test_retention(self, room_id, expected_code_for_first_event=200): # Send a first event to the room. This is the event we'll want to be purged at the # end of the test. - resp = self.helper.send( - room_id=room_id, - body="1", - tok=self.token, - ) + resp = self.helper.send(room_id=room_id, body="1", tok=self.token) first_event_id = resp.get("event_id") -- cgit 1.5.1 From 97b863fe32c03ce30c4cd5bda1479c1f9a03da83 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Tue, 19 Nov 2019 13:33:58 +0000 Subject: Lint again --- synapse/config/server.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/config/server.py') diff --git a/synapse/config/server.py b/synapse/config/server.py index ed916e1400..4404953e27 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -879,7 +879,7 @@ class ServerConfig(Config): # Defaults to `28d`. Set to `null` to disable clearing out of old rows. # #user_ips_max_age: 14d - + # Message retention policy at the server level. # # Room admins and mods can define a retention period for their rooms using the -- cgit 1.5.1 From 3916e1b97a1ffc481dfdf66f7da58201a52140a9 Mon Sep 17 00:00:00 2001 From: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com> Date: Thu, 21 Nov 2019 12:00:14 +0000 Subject: Clean up newline quote marks around the codebase (#6362) --- changelog.d/6362.misc | 1 + synapse/app/federation_sender.py | 2 +- synapse/appservice/api.py | 2 +- synapse/config/appservice.py | 2 +- synapse/config/room_directory.py | 2 +- synapse/config/server.py | 6 +++--- synapse/federation/persistence.py | 4 ++-- synapse/federation/sender/__init__.py | 2 +- synapse/federation/sender/transaction_manager.py | 4 ++-- synapse/handlers/directory.py | 2 +- synapse/http/servlet.py | 2 +- synapse/push/httppusher.py | 5 ++--- synapse/push/mailer.py | 4 ++-- synapse/rest/media/v1/preview_url_resource.py | 2 +- synapse/server_notices/consent_server_notices.py | 2 +- synapse/storage/_base.py | 2 +- synapse/storage/data_stores/main/deviceinbox.py | 2 +- synapse/storage/data_stores/main/end_to_end_keys.py | 6 +++--- synapse/storage/data_stores/main/events.py | 8 +++----- synapse/storage/data_stores/main/filtering.py | 2 +- synapse/storage/data_stores/main/media_repository.py | 6 +++--- synapse/storage/data_stores/main/registration.py | 4 +--- synapse/storage/data_stores/main/stream.py | 2 +- synapse/storage/data_stores/main/tags.py | 4 +--- synapse/storage/prepare_database.py | 2 +- synapse/streams/config.py | 9 ++++++--- 26 files changed, 43 insertions(+), 46 deletions(-) create mode 100644 changelog.d/6362.misc (limited to 'synapse/config/server.py') diff --git a/changelog.d/6362.misc b/changelog.d/6362.misc new file mode 100644 index 0000000000..b79a5bea99 --- /dev/null +++ b/changelog.d/6362.misc @@ -0,0 +1 @@ +Clean up some unnecessary quotation marks around the codebase. \ No newline at end of file diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py index 139221ad34..448e45e00f 100644 --- a/synapse/app/federation_sender.py +++ b/synapse/app/federation_sender.py @@ -69,7 +69,7 @@ class FederationSenderSlaveStore( self.federation_out_pos_startup = self._get_federation_out_pos(db_conn) def _get_federation_out_pos(self, db_conn): - sql = "SELECT stream_id FROM federation_stream_position" " WHERE type = ?" + sql = "SELECT stream_id FROM federation_stream_position WHERE type = ?" sql = self.database_engine.convert_param_style(sql) txn = db_conn.cursor() diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py index 3e25bf5747..57174da021 100644 --- a/synapse/appservice/api.py +++ b/synapse/appservice/api.py @@ -185,7 +185,7 @@ class ApplicationServiceApi(SimpleHttpClient): if not _is_valid_3pe_metadata(info): logger.warning( - "query_3pe_protocol to %s did not return a" " valid result", uri + "query_3pe_protocol to %s did not return a valid result", uri ) return None diff --git a/synapse/config/appservice.py b/synapse/config/appservice.py index e77d3387ff..ca43e96bd1 100644 --- a/synapse/config/appservice.py +++ b/synapse/config/appservice.py @@ -134,7 +134,7 @@ def _load_appservice(hostname, as_info, config_filename): for regex_obj in as_info["namespaces"][ns]: if not isinstance(regex_obj, dict): raise ValueError( - "Expected namespace entry in %s to be an object," " but got %s", + "Expected namespace entry in %s to be an object, but got %s", ns, regex_obj, ) diff --git a/synapse/config/room_directory.py b/synapse/config/room_directory.py index 7c9f05bde4..7ac7699676 100644 --- a/synapse/config/room_directory.py +++ b/synapse/config/room_directory.py @@ -170,7 +170,7 @@ class _RoomDirectoryRule(object): self.action = action else: raise ConfigError( - "%s rules can only have action of 'allow'" " or 'deny'" % (option_name,) + "%s rules can only have action of 'allow' or 'deny'" % (option_name,) ) self._alias_matches_all = alias == "*" diff --git a/synapse/config/server.py b/synapse/config/server.py index 00d01c43af..11336d7549 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -223,7 +223,7 @@ class ServerConfig(Config): self.federation_ip_range_blacklist.update(["0.0.0.0", "::"]) except Exception as e: raise ConfigError( - "Invalid range(s) provided in " "federation_ip_range_blacklist: %s" % e + "Invalid range(s) provided in federation_ip_range_blacklist: %s" % e ) if self.public_baseurl is not None: @@ -787,14 +787,14 @@ class ServerConfig(Config): "--print-pidfile", action="store_true", default=None, - help="Print the path to the pidfile just" " before daemonizing", + help="Print the path to the pidfile just before daemonizing", ) server_group.add_argument( "--manhole", metavar="PORT", dest="manhole", type=int, - help="Turn on the twisted telnet manhole" " service on the given port.", + help="Turn on the twisted telnet manhole service on the given port.", ) diff --git a/synapse/federation/persistence.py b/synapse/federation/persistence.py index 44edcabed4..d68b4bd670 100644 --- a/synapse/federation/persistence.py +++ b/synapse/federation/persistence.py @@ -44,7 +44,7 @@ class TransactionActions(object): response code and response body. """ if not transaction.transaction_id: - raise RuntimeError("Cannot persist a transaction with no " "transaction_id") + raise RuntimeError("Cannot persist a transaction with no transaction_id") return self.store.get_received_txn_response(transaction.transaction_id, origin) @@ -56,7 +56,7 @@ class TransactionActions(object): Deferred """ if not transaction.transaction_id: - raise RuntimeError("Cannot persist a transaction with no " "transaction_id") + raise RuntimeError("Cannot persist a transaction with no transaction_id") return self.store.set_received_txn_response( transaction.transaction_id, origin, code, response diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index 2b2ee8612a..4ebb0e8bc0 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -49,7 +49,7 @@ sent_pdus_destination_dist_count = Counter( sent_pdus_destination_dist_total = Counter( "synapse_federation_client_sent_pdu_destinations:total", - "" "Total number of PDUs queued for sending across all destinations", + "Total number of PDUs queued for sending across all destinations", ) diff --git a/synapse/federation/sender/transaction_manager.py b/synapse/federation/sender/transaction_manager.py index 67b3e1ab6e..5fed626d5b 100644 --- a/synapse/federation/sender/transaction_manager.py +++ b/synapse/federation/sender/transaction_manager.py @@ -84,7 +84,7 @@ class TransactionManager(object): txn_id = str(self._next_txn_id) logger.debug( - "TX [%s] {%s} Attempting new transaction" " (pdus: %d, edus: %d)", + "TX [%s] {%s} Attempting new transaction (pdus: %d, edus: %d)", destination, txn_id, len(pdus), @@ -103,7 +103,7 @@ class TransactionManager(object): self._next_txn_id += 1 logger.info( - "TX [%s] {%s} Sending transaction [%s]," " (PDUs: %d, EDUs: %d)", + "TX [%s] {%s} Sending transaction [%s], (PDUs: %d, EDUs: %d)", destination, txn_id, transaction.transaction_id, diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py index 69051101a6..a07d2f1a17 100644 --- a/synapse/handlers/directory.py +++ b/synapse/handlers/directory.py @@ -119,7 +119,7 @@ class DirectoryHandler(BaseHandler): if not service.is_interested_in_alias(room_alias.to_string()): raise SynapseError( 400, - "This application service has not reserved" " this kind of alias.", + "This application service has not reserved this kind of alias.", errcode=Codes.EXCLUSIVE, ) else: diff --git a/synapse/http/servlet.py b/synapse/http/servlet.py index e9a5e46ced..13fcb408a6 100644 --- a/synapse/http/servlet.py +++ b/synapse/http/servlet.py @@ -96,7 +96,7 @@ def parse_boolean_from_args(args, name, default=None, required=False): return {b"true": True, b"false": False}[args[name][0]] except Exception: message = ( - "Boolean query parameter %r must be one of" " ['true', 'false']" + "Boolean query parameter %r must be one of ['true', 'false']" ) % (name,) raise SynapseError(400, message) else: diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py index e994037be6..d0879b0490 100644 --- a/synapse/push/httppusher.py +++ b/synapse/push/httppusher.py @@ -246,7 +246,7 @@ class HttpPusher(object): # fixed, we don't suddenly deliver a load # of old notifications. logger.warning( - "Giving up on a notification to user %s, " "pushkey %s", + "Giving up on a notification to user %s, pushkey %s", self.user_id, self.pushkey, ) @@ -299,8 +299,7 @@ class HttpPusher(object): # for sanity, we only remove the pushkey if it # was the one we actually sent... logger.warning( - ("Ignoring rejected pushkey %s because we" " didn't send it"), - pk, + ("Ignoring rejected pushkey %s because we didn't send it"), pk, ) else: logger.info("Pushkey %s was rejected: removing", pk) diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py index 1d15a06a58..b13b646bfd 100644 --- a/synapse/push/mailer.py +++ b/synapse/push/mailer.py @@ -43,7 +43,7 @@ logger = logging.getLogger(__name__) MESSAGE_FROM_PERSON_IN_ROOM = ( - "You have a message on %(app)s from %(person)s " "in the %(room)s room..." + "You have a message on %(app)s from %(person)s in the %(room)s room..." ) MESSAGE_FROM_PERSON = "You have a message on %(app)s from %(person)s..." MESSAGES_FROM_PERSON = "You have messages on %(app)s from %(person)s..." @@ -55,7 +55,7 @@ MESSAGES_FROM_PERSON_AND_OTHERS = ( "You have messages on %(app)s from %(person)s and others..." ) INVITE_FROM_PERSON_TO_ROOM = ( - "%(person)s has invited you to join the " "%(room)s room on %(app)s..." + "%(person)s has invited you to join the %(room)s room on %(app)s..." ) INVITE_FROM_PERSON = "%(person)s has invited you to chat on %(app)s..." diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py index 15c15a12f5..a23d6f5c75 100644 --- a/synapse/rest/media/v1/preview_url_resource.py +++ b/synapse/rest/media/v1/preview_url_resource.py @@ -122,7 +122,7 @@ class PreviewUrlResource(DirectServeResource): pattern = entry[attrib] value = getattr(url_tuple, attrib) logger.debug( - "Matching attrib '%s' with value '%s' against" " pattern '%s'", + "Matching attrib '%s' with value '%s' against pattern '%s'", attrib, value, pattern, diff --git a/synapse/server_notices/consent_server_notices.py b/synapse/server_notices/consent_server_notices.py index 415e9c17d8..5736c56032 100644 --- a/synapse/server_notices/consent_server_notices.py +++ b/synapse/server_notices/consent_server_notices.py @@ -54,7 +54,7 @@ class ConsentServerNotices(object): ) if "body" not in self._server_notice_content: raise ConfigError( - "user_consent server_notice_consent must contain a 'body' " "key." + "user_consent server_notice_consent must contain a 'body' key." ) self._consent_uri_builder = ConsentURIBuilder(hs.config) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index ab596fa68d..6b8a9cd89a 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -851,7 +851,7 @@ class SQLBaseStore(object): allvalues.update(values) latter = "UPDATE SET " + ", ".join(k + "=EXCLUDED." + k for k in values) - sql = ("INSERT INTO %s (%s) VALUES (%s) " "ON CONFLICT (%s) DO %s") % ( + sql = ("INSERT INTO %s (%s) VALUES (%s) ON CONFLICT (%s) DO %s") % ( table, ", ".join(k for k in allvalues), ", ".join("?" for _ in allvalues), diff --git a/synapse/storage/data_stores/main/deviceinbox.py b/synapse/storage/data_stores/main/deviceinbox.py index 96cd0fb77a..a23744f11c 100644 --- a/synapse/storage/data_stores/main/deviceinbox.py +++ b/synapse/storage/data_stores/main/deviceinbox.py @@ -380,7 +380,7 @@ class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore) devices = list(messages_by_device.keys()) if len(devices) == 1 and devices[0] == "*": # Handle wildcard device_ids. - sql = "SELECT device_id FROM devices" " WHERE user_id = ?" + sql = "SELECT device_id FROM devices WHERE user_id = ?" txn.execute(sql, (user_id,)) message_json = json.dumps(messages_by_device["*"]) for row in txn: diff --git a/synapse/storage/data_stores/main/end_to_end_keys.py b/synapse/storage/data_stores/main/end_to_end_keys.py index 073412a78d..d8ad59ad93 100644 --- a/synapse/storage/data_stores/main/end_to_end_keys.py +++ b/synapse/storage/data_stores/main/end_to_end_keys.py @@ -138,9 +138,9 @@ class EndToEndKeyWorkerStore(SQLBaseStore): result.setdefault(user_id, {})[device_id] = None # get signatures on the device - signature_sql = ( - "SELECT * " " FROM e2e_cross_signing_signatures " " WHERE %s" - ) % (" OR ".join("(" + q + ")" for q in signature_query_clauses)) + signature_sql = ("SELECT * FROM e2e_cross_signing_signatures WHERE %s") % ( + " OR ".join("(" + q + ")" for q in signature_query_clauses) + ) txn.execute(signature_sql, signature_query_params) rows = self.cursor_to_dict(txn) diff --git a/synapse/storage/data_stores/main/events.py b/synapse/storage/data_stores/main/events.py index 878f7568a6..627c0b67f1 100644 --- a/synapse/storage/data_stores/main/events.py +++ b/synapse/storage/data_stores/main/events.py @@ -713,9 +713,7 @@ class EventsStore( metadata_json = encode_json(event.internal_metadata.get_dict()) - sql = ( - "UPDATE event_json SET internal_metadata = ?" " WHERE event_id = ?" - ) + sql = "UPDATE event_json SET internal_metadata = ? WHERE event_id = ?" txn.execute(sql, (metadata_json, event.event_id)) # Add an entry to the ex_outlier_stream table to replicate the @@ -732,7 +730,7 @@ class EventsStore( }, ) - sql = "UPDATE events SET outlier = ?" " WHERE event_id = ?" + sql = "UPDATE events SET outlier = ? WHERE event_id = ?" txn.execute(sql, (False, event.event_id)) # Update the event_backward_extremities table now that this @@ -1479,7 +1477,7 @@ class EventsStore( # We do joins against events_to_purge for e.g. calculating state # groups to purge, etc., so lets make an index. - txn.execute("CREATE INDEX events_to_purge_id" " ON events_to_purge(event_id)") + txn.execute("CREATE INDEX events_to_purge_id ON events_to_purge(event_id)") txn.execute("SELECT event_id, should_delete FROM events_to_purge") event_rows = txn.fetchall() diff --git a/synapse/storage/data_stores/main/filtering.py b/synapse/storage/data_stores/main/filtering.py index a2a2a67927..f05ace299a 100644 --- a/synapse/storage/data_stores/main/filtering.py +++ b/synapse/storage/data_stores/main/filtering.py @@ -55,7 +55,7 @@ class FilteringStore(SQLBaseStore): if filter_id_response is not None: return filter_id_response[0] - sql = "SELECT MAX(filter_id) FROM user_filters " "WHERE user_id = ?" + sql = "SELECT MAX(filter_id) FROM user_filters WHERE user_id = ?" txn.execute(sql, (user_localpart,)) max_id = txn.fetchone()[0] if max_id is None: diff --git a/synapse/storage/data_stores/main/media_repository.py b/synapse/storage/data_stores/main/media_repository.py index 84b5f3ad5e..0f2887bdce 100644 --- a/synapse/storage/data_stores/main/media_repository.py +++ b/synapse/storage/data_stores/main/media_repository.py @@ -337,7 +337,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore): if len(media_ids) == 0: return - sql = "DELETE FROM local_media_repository_url_cache" " WHERE media_id = ?" + sql = "DELETE FROM local_media_repository_url_cache WHERE media_id = ?" def _delete_url_cache_txn(txn): txn.executemany(sql, [(media_id,) for media_id in media_ids]) @@ -365,11 +365,11 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore): return def _delete_url_cache_media_txn(txn): - sql = "DELETE FROM local_media_repository" " WHERE media_id = ?" + sql = "DELETE FROM local_media_repository WHERE media_id = ?" txn.executemany(sql, [(media_id,) for media_id in media_ids]) - sql = "DELETE FROM local_media_repository_thumbnails" " WHERE media_id = ?" + sql = "DELETE FROM local_media_repository_thumbnails WHERE media_id = ?" txn.executemany(sql, [(media_id,) for media_id in media_ids]) diff --git a/synapse/storage/data_stores/main/registration.py b/synapse/storage/data_stores/main/registration.py index ee1b2b2bbf..6a594c160c 100644 --- a/synapse/storage/data_stores/main/registration.py +++ b/synapse/storage/data_stores/main/registration.py @@ -377,9 +377,7 @@ class RegistrationWorkerStore(SQLBaseStore): """ def f(txn): - sql = ( - "SELECT name, password_hash FROM users" " WHERE lower(name) = lower(?)" - ) + sql = "SELECT name, password_hash FROM users WHERE lower(name) = lower(?)" txn.execute(sql, (user_id,)) return dict(txn) diff --git a/synapse/storage/data_stores/main/stream.py b/synapse/storage/data_stores/main/stream.py index 8780fdd989..9ae4a913a1 100644 --- a/synapse/storage/data_stores/main/stream.py +++ b/synapse/storage/data_stores/main/stream.py @@ -616,7 +616,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): def _get_max_topological_txn(self, txn, room_id): txn.execute( - "SELECT MAX(topological_ordering) FROM events" " WHERE room_id = ?", + "SELECT MAX(topological_ordering) FROM events WHERE room_id = ?", (room_id,), ) diff --git a/synapse/storage/data_stores/main/tags.py b/synapse/storage/data_stores/main/tags.py index 10d1887f75..aa24339717 100644 --- a/synapse/storage/data_stores/main/tags.py +++ b/synapse/storage/data_stores/main/tags.py @@ -83,9 +83,7 @@ class TagsWorkerStore(AccountDataWorkerStore): ) def get_tag_content(txn, tag_ids): - sql = ( - "SELECT tag, content" " FROM room_tags" " WHERE user_id=? AND room_id=?" - ) + sql = "SELECT tag, content FROM room_tags WHERE user_id=? AND room_id=?" results = [] for stream_id, user_id, room_id in tag_ids: txn.execute(sql, (user_id, room_id)) diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py index 2e7753820e..731e1c9d9c 100644 --- a/synapse/storage/prepare_database.py +++ b/synapse/storage/prepare_database.py @@ -447,7 +447,7 @@ def _apply_module_schema_files(cur, database_engine, modname, names_and_streams) # Mark as done. cur.execute( database_engine.convert_param_style( - "INSERT INTO applied_module_schemas (module_name, file)" " VALUES (?,?)" + "INSERT INTO applied_module_schemas (module_name, file) VALUES (?,?)" ), (modname, name), ) diff --git a/synapse/streams/config.py b/synapse/streams/config.py index 02994ab2a5..cd56cd91ed 100644 --- a/synapse/streams/config.py +++ b/synapse/streams/config.py @@ -88,9 +88,12 @@ class PaginationConfig(object): raise SynapseError(400, "Invalid request.") def __repr__(self): - return ( - "PaginationConfig(from_tok=%r, to_tok=%r," " direction=%r, limit=%r)" - ) % (self.from_token, self.to_token, self.direction, self.limit) + return ("PaginationConfig(from_tok=%r, to_tok=%r, direction=%r, limit=%r)") % ( + self.from_token, + self.to_token, + self.direction, + self.limit, + ) def get_source_config(self, source_name): keyname = "%s_key" % source_name -- cgit 1.5.1 From 54dd5dc12b0ac5c48303144c4a73ce3822209488 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Tue, 3 Dec 2019 19:19:45 +0000 Subject: Add ephemeral messages support (MSC2228) (#6409) Implement part [MSC2228](https://github.com/matrix-org/matrix-doc/pull/2228). The parts that differ are: * the feature is hidden behind a configuration flag (`enable_ephemeral_messages`) * self-destruction doesn't happen for state events * only implement support for the `m.self_destruct_after` field (not the `m.self_destruct` one) * doesn't send synthetic redactions to clients because for this specific case we consider the clients to be able to destroy an event themselves, instead we just censor it (by pruning its JSON) in the database --- changelog.d/6409.feature | 1 + synapse/api/constants.py | 4 + synapse/config/server.py | 2 + synapse/handlers/federation.py | 8 ++ synapse/handlers/message.py | 123 +++++++++++++++++++- synapse/storage/data_stores/main/events.py | 126 ++++++++++++++++++++- .../main/schema/delta/56/event_expiry.sql | 21 ++++ tests/rest/client/test_ephemeral_message.py | 101 +++++++++++++++++ 8 files changed, 379 insertions(+), 7 deletions(-) create mode 100644 changelog.d/6409.feature create mode 100644 synapse/storage/data_stores/main/schema/delta/56/event_expiry.sql create mode 100644 tests/rest/client/test_ephemeral_message.py (limited to 'synapse/config/server.py') diff --git a/changelog.d/6409.feature b/changelog.d/6409.feature new file mode 100644 index 0000000000..653ff5a5ad --- /dev/null +++ b/changelog.d/6409.feature @@ -0,0 +1 @@ +Add ephemeral messages support by partially implementing [MSC2228](https://github.com/matrix-org/matrix-doc/pull/2228). diff --git a/synapse/api/constants.py b/synapse/api/constants.py index e3f086f1c3..69cef369a5 100644 --- a/synapse/api/constants.py +++ b/synapse/api/constants.py @@ -147,3 +147,7 @@ class EventContentFields(object): # Labels for the event, cf https://github.com/matrix-org/matrix-doc/pull/2326 LABELS = "org.matrix.labels" + + # Timestamp to delete the event after + # cf https://github.com/matrix-org/matrix-doc/pull/2228 + SELF_DESTRUCT_AFTER = "org.matrix.self_destruct_after" diff --git a/synapse/config/server.py b/synapse/config/server.py index 7a9d711669..837fbe1582 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -490,6 +490,8 @@ class ServerConfig(Config): "cleanup_extremities_with_dummy_events", True ) + self.enable_ephemeral_messages = config.get("enable_ephemeral_messages", False) + def has_tls_listener(self) -> bool: return any(l["tls"] for l in self.listeners) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index d3267734f7..d9d0cd9eef 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -121,6 +121,7 @@ class FederationHandler(BaseHandler): self.pusher_pool = hs.get_pusherpool() self.spam_checker = hs.get_spam_checker() self.event_creation_handler = hs.get_event_creation_handler() + self._message_handler = hs.get_message_handler() self._server_notices_mxid = hs.config.server_notices_mxid self.config = hs.config self.http_client = hs.get_simple_http_client() @@ -141,6 +142,8 @@ class FederationHandler(BaseHandler): self.third_party_event_rules = hs.get_third_party_event_rules() + self._ephemeral_messages_enabled = hs.config.enable_ephemeral_messages + @defer.inlineCallbacks def on_receive_pdu(self, origin, pdu, sent_to_us_directly=False): """ Process a PDU received via a federation /send/ transaction, or @@ -2715,6 +2718,11 @@ class FederationHandler(BaseHandler): event_and_contexts, backfilled=backfilled ) + if self._ephemeral_messages_enabled: + for (event, context) in event_and_contexts: + # If there's an expiry timestamp on the event, schedule its expiry. + self._message_handler.maybe_schedule_expiry(event) + if not backfilled: # Never notify for backfilled events for event, _ in event_and_contexts: yield self._notify_persisted_event(event, max_stream_id) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 3b0156f516..4f53a5f5dc 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -15,6 +15,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging +from typing import Optional from six import iteritems, itervalues, string_types @@ -22,9 +23,16 @@ from canonicaljson import encode_canonical_json, json from twisted.internet import defer from twisted.internet.defer import succeed +from twisted.internet.interfaces import IDelayedCall from synapse import event_auth -from synapse.api.constants import EventTypes, Membership, RelationTypes, UserTypes +from synapse.api.constants import ( + EventContentFields, + EventTypes, + Membership, + RelationTypes, + UserTypes, +) from synapse.api.errors import ( AuthError, Codes, @@ -62,6 +70,17 @@ class MessageHandler(object): self.storage = hs.get_storage() self.state_store = self.storage.state self._event_serializer = hs.get_event_client_serializer() + self._ephemeral_events_enabled = hs.config.enable_ephemeral_messages + self._is_worker_app = bool(hs.config.worker_app) + + # The scheduled call to self._expire_event. None if no call is currently + # scheduled. + self._scheduled_expiry = None # type: Optional[IDelayedCall] + + if not hs.config.worker_app: + run_as_background_process( + "_schedule_next_expiry", self._schedule_next_expiry + ) @defer.inlineCallbacks def get_room_data( @@ -225,6 +244,100 @@ class MessageHandler(object): for user_id, profile in iteritems(users_with_profile) } + def maybe_schedule_expiry(self, event): + """Schedule the expiry of an event if there's not already one scheduled, + or if the one running is for an event that will expire after the provided + timestamp. + + This function needs to invalidate the event cache, which is only possible on + the master process, and therefore needs to be run on there. + + Args: + event (EventBase): The event to schedule the expiry of. + """ + assert not self._is_worker_app + + expiry_ts = event.content.get(EventContentFields.SELF_DESTRUCT_AFTER) + if not isinstance(expiry_ts, int) or event.is_state(): + return + + # _schedule_expiry_for_event won't actually schedule anything if there's already + # a task scheduled for a timestamp that's sooner than the provided one. + self._schedule_expiry_for_event(event.event_id, expiry_ts) + + @defer.inlineCallbacks + def _schedule_next_expiry(self): + """Retrieve the ID and the expiry timestamp of the next event to be expired, + and schedule an expiry task for it. + + If there's no event left to expire, set _expiry_scheduled to None so that a + future call to save_expiry_ts can schedule a new expiry task. + """ + # Try to get the expiry timestamp of the next event to expire. + res = yield self.store.get_next_event_to_expire() + if res: + event_id, expiry_ts = res + self._schedule_expiry_for_event(event_id, expiry_ts) + + def _schedule_expiry_for_event(self, event_id, expiry_ts): + """Schedule an expiry task for the provided event if there's not already one + scheduled at a timestamp that's sooner than the provided one. + + Args: + event_id (str): The ID of the event to expire. + expiry_ts (int): The timestamp at which to expire the event. + """ + if self._scheduled_expiry: + # If the provided timestamp refers to a time before the scheduled time of the + # next expiry task, cancel that task and reschedule it for this timestamp. + next_scheduled_expiry_ts = self._scheduled_expiry.getTime() * 1000 + if expiry_ts < next_scheduled_expiry_ts: + self._scheduled_expiry.cancel() + else: + return + + # Figure out how many seconds we need to wait before expiring the event. + now_ms = self.clock.time_msec() + delay = (expiry_ts - now_ms) / 1000 + + # callLater doesn't support negative delays, so trim the delay to 0 if we're + # in that case. + if delay < 0: + delay = 0 + + logger.info("Scheduling expiry for event %s in %.3fs", event_id, delay) + + self._scheduled_expiry = self.clock.call_later( + delay, + run_as_background_process, + "_expire_event", + self._expire_event, + event_id, + ) + + @defer.inlineCallbacks + def _expire_event(self, event_id): + """Retrieve and expire an event that needs to be expired from the database. + + If the event doesn't exist in the database, log it and delete the expiry date + from the database (so that we don't try to expire it again). + """ + assert self._ephemeral_events_enabled + + self._scheduled_expiry = None + + logger.info("Expiring event %s", event_id) + + try: + # Expire the event if we know about it. This function also deletes the expiry + # date from the database in the same database transaction. + yield self.store.expire_event(event_id) + except Exception as e: + logger.error("Could not expire event %s: %r", event_id, e) + + # Schedule the expiry of the next event to expire. + yield self._schedule_next_expiry() + # The duration (in ms) after which rooms should be removed # `_rooms_to_exclude_from_dummy_event_insertion` (with the effect that we will try @@ -295,6 +408,10 @@ class EventCreationHandler(object): 5 * 60 * 1000, ) + self._message_handler = hs.get_message_handler() + + self._ephemeral_events_enabled = hs.config.enable_ephemeral_messages + @defer.inlineCallbacks def create_event( self, @@ -877,6 +994,10 @@ class EventCreationHandler(object): event, context=context ) + if self._ephemeral_events_enabled: + # If there's an expiry timestamp on the event, schedule its expiry. + self._message_handler.maybe_schedule_expiry(event) + yield self.pusher_pool.on_new_notifications(event_stream_id, max_stream_id) def _notify(): diff --git a/synapse/storage/data_stores/main/events.py b/synapse/storage/data_stores/main/events.py index 2737a1d3ae..79c91fe284 100644 --- a/synapse/storage/data_stores/main/events.py +++ b/synapse/storage/data_stores/main/events.py @@ -130,6 +130,8 @@ class EventsStore( if self.hs.config.redaction_retention_period is not None: hs.get_clock().looping_call(_censor_redactions, 5 * 60 * 1000) + self._ephemeral_messages_enabled = hs.config.enable_ephemeral_messages + @defer.inlineCallbacks def _read_forward_extremities(self): def fetch(txn): @@ -940,6 +942,12 @@ class EventsStore( txn, event.event_id, labels, event.room_id, event.depth ) + if self._ephemeral_messages_enabled: + # If there's an expiry timestamp on the event, store it. + expiry_ts = event.content.get(EventContentFields.SELF_DESTRUCT_AFTER) + if isinstance(expiry_ts, int) and not event.is_state(): + self._insert_event_expiry_txn(txn, event.event_id, expiry_ts) + # Insert into the room_memberships table. self._store_room_members_txn( txn, @@ -1101,12 +1109,7 @@ class EventsStore( def _update_censor_txn(txn): for redaction_id, event_id, pruned_json in updates: if pruned_json: - self._simple_update_one_txn( - txn, - table="event_json", - keyvalues={"event_id": event_id}, - updatevalues={"json": pruned_json}, - ) + self._censor_event_txn(txn, event_id, pruned_json) self._simple_update_one_txn( txn, @@ -1117,6 +1120,22 @@ class EventsStore( yield self.runInteraction("_update_censor_txn", _update_censor_txn) + def _censor_event_txn(self, txn, event_id, pruned_json): + """Censor an event by replacing its JSON in the event_json table with the + provided pruned JSON. + + Args: + txn (LoggingTransaction): The database transaction. + event_id (str): The ID of the event to censor. + pruned_json (str): The pruned JSON + """ + self._simple_update_one_txn( + txn, + table="event_json", + keyvalues={"event_id": event_id}, + updatevalues={"json": pruned_json}, + ) + @defer.inlineCallbacks def count_daily_messages(self): """ @@ -1957,6 +1976,101 @@ class EventsStore( ], ) + def _insert_event_expiry_txn(self, txn, event_id, expiry_ts): + """Save the expiry timestamp associated with a given event ID. + + Args: + txn (LoggingTransaction): The database transaction to use. + event_id (str): The event ID the expiry timestamp is associated with. + expiry_ts (int): The timestamp at which to expire (delete) the event. + """ + return self._simple_insert_txn( + txn=txn, + table="event_expiry", + values={"event_id": event_id, "expiry_ts": expiry_ts}, + ) + + @defer.inlineCallbacks + def expire_event(self, event_id): + """Retrieve and expire an event that has expired, and delete its associated + expiry timestamp. If the event can't be retrieved, delete its associated + timestamp so we don't try to expire it again in the future. + + Args: + event_id (str): The ID of the event to delete. + """ + # Try to retrieve the event's content from the database or the event cache. + event = yield self.get_event(event_id) + + def delete_expired_event_txn(txn): + # Delete the expiry timestamp associated with this event from the database. + self._delete_event_expiry_txn(txn, event_id) + + if not event: + # If we can't find the event, log a warning and delete the expiry date + # from the database so that we don't try to expire it again in the + # future. + logger.warning( + "Can't expire event %s because we don't have it.", event_id + ) + return + + # Prune the event's dict then convert it to JSON. + pruned_json = encode_json(prune_event_dict(event.get_dict())) + + # Update the event_json table to replace the event's JSON with the pruned + # JSON. + self._censor_event_txn(txn, event.event_id, pruned_json) + + # We need to invalidate the event cache entry for this event because we + # changed its content in the database. We can't call + # self._invalidate_cache_and_stream because self.get_event_cache isn't of the + # right type. + txn.call_after(self._get_event_cache.invalidate, (event.event_id,)) + # Send that invalidation to replication so that other workers also invalidate + # the event cache. + self._send_invalidation_to_replication( + txn, "_get_event_cache", (event.event_id,) + ) + + yield self.runInteraction("delete_expired_event", delete_expired_event_txn) + + def _delete_event_expiry_txn(self, txn, event_id): + """Delete the expiry timestamp associated with an event ID without deleting the + actual event. + + Args: + txn (LoggingTransaction): The transaction to use to perform the deletion. + event_id (str): The event ID to delete the associated expiry timestamp of. + """ + return self._simple_delete_txn( + txn=txn, table="event_expiry", keyvalues={"event_id": event_id} + ) + + def get_next_event_to_expire(self): + """Retrieve the entry with the lowest expiry timestamp in the event_expiry + table, or None if there's no more event to expire. + + Returns: Deferred[Optional[Tuple[str, int]]] + A tuple containing the event ID as its first element and an expiry timestamp + as its second one, if there's at least one row in the event_expiry table. + None otherwise. + """ + + def get_next_event_to_expire_txn(txn): + txn.execute( + """ + SELECT event_id, expiry_ts FROM event_expiry + ORDER BY expiry_ts ASC LIMIT 1 + """ + ) + + return txn.fetchone() + + return self.runInteraction( + desc="get_next_event_to_expire", func=get_next_event_to_expire_txn + ) + AllNewEventsResult = namedtuple( "AllNewEventsResult", diff --git a/synapse/storage/data_stores/main/schema/delta/56/event_expiry.sql b/synapse/storage/data_stores/main/schema/delta/56/event_expiry.sql new file mode 100644 index 0000000000..81a36a8b1d --- /dev/null +++ b/synapse/storage/data_stores/main/schema/delta/56/event_expiry.sql @@ -0,0 +1,21 @@ +/* Copyright 2019 The Matrix.org Foundation C.I.C. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE TABLE IF NOT EXISTS event_expiry ( + event_id TEXT PRIMARY KEY, + expiry_ts BIGINT NOT NULL +); + +CREATE INDEX event_expiry_expiry_ts_idx ON event_expiry(expiry_ts); diff --git a/tests/rest/client/test_ephemeral_message.py b/tests/rest/client/test_ephemeral_message.py new file mode 100644 index 0000000000..5e9c07ebf3 --- /dev/null +++ b/tests/rest/client/test_ephemeral_message.py @@ -0,0 +1,101 @@ +# -*- coding: utf-8 -*- +# Copyright 2019 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from synapse.api.constants import EventContentFields, EventTypes +from synapse.rest import admin +from synapse.rest.client.v1 import room + +from tests import unittest + + +class EphemeralMessageTestCase(unittest.HomeserverTestCase): + + user_id = "@user:test" + + servlets = [ + admin.register_servlets, + room.register_servlets, + ] + + def make_homeserver(self, reactor, clock): + config = self.default_config() + + config["enable_ephemeral_messages"] = True + + self.hs = self.setup_test_homeserver(config=config) + return self.hs + + def prepare(self, reactor, clock, homeserver): + self.room_id = self.helper.create_room_as(self.user_id) + + def test_message_expiry_no_delay(self): + """Tests that sending a message sent with a m.self_destruct_after field set to the + past results in that event being deleted right away. + """ + # Send a message in the room that has expired. From here, the reactor clock is + # at 200ms, so 0 is in the past, and even if that wasn't the case and the clock + # is at 0ms the code path is the same if the event's expiry timestamp is the + # current timestamp. + res = self.helper.send_event( + room_id=self.room_id, + type=EventTypes.Message, + content={ + "msgtype": "m.text", + "body": "hello", + EventContentFields.SELF_DESTRUCT_AFTER: 0, + }, + ) + event_id = res["event_id"] + + # Check that we can't retrieve the content of the event. + event_content = self.get_event(self.room_id, event_id)["content"] + self.assertFalse(bool(event_content), event_content) + + def test_message_expiry_delay(self): + """Tests that sending a message with a m.self_destruct_after field set to the + future results in that event not being deleted right away, but advancing the + clock to after that expiry timestamp causes the event to be deleted. + """ + # Send a message in the room that'll expire in 1s. + res = self.helper.send_event( + room_id=self.room_id, + type=EventTypes.Message, + content={ + "msgtype": "m.text", + "body": "hello", + EventContentFields.SELF_DESTRUCT_AFTER: self.clock.time_msec() + 1000, + }, + ) + event_id = res["event_id"] + + # Check that we can retrieve the content of the event before it has expired. + event_content = self.get_event(self.room_id, event_id)["content"] + self.assertTrue(bool(event_content), event_content) + + # Advance the clock to after the deletion. + self.reactor.advance(1) + + # Check that we can't retrieve the content of the event anymore. + event_content = self.get_event(self.room_id, event_id)["content"] + self.assertFalse(bool(event_content), event_content) + + def get_event(self, room_id, event_id, expected_code=200): + url = "/_matrix/client/r0/rooms/%s/event/%s" % (room_id, event_id) + + request, channel = self.make_request("GET", url) + self.render(request) + + self.assertEqual(channel.code, expected_code, channel.result) + + return channel.json_body -- cgit 1.5.1 From cb0aeb147e3b3defc27866ad0e4982e63600a7ee Mon Sep 17 00:00:00 2001 From: Neil Johnson Date: Wed, 4 Dec 2019 09:46:16 +0000 Subject: privacy by default for room dir (#6355) Ensure that the the default settings for the room directory are that the it is hidden from public view by default. --- UPGRADE.rst | 17 ++++++++++ changelog.d/6354.feature | 1 + docs/sample_config.yaml | 13 ++++---- synapse/config/server.py | 26 +++++++++------- tests/federation/transport/test_server.py | 52 +++++++++++++++++++++++++++++++ 5 files changed, 91 insertions(+), 18 deletions(-) create mode 100644 changelog.d/6354.feature create mode 100644 tests/federation/transport/test_server.py (limited to 'synapse/config/server.py') diff --git a/UPGRADE.rst b/UPGRADE.rst index 5ebf16a73e..d9020f2663 100644 --- a/UPGRADE.rst +++ b/UPGRADE.rst @@ -75,6 +75,23 @@ for example: wget https://packages.matrix.org/debian/pool/main/m/matrix-synapse-py3/matrix-synapse-py3_1.3.0+stretch1_amd64.deb dpkg -i matrix-synapse-py3_1.3.0+stretch1_amd64.deb +Upgrading to v1.7.0 +=================== + +In an attempt to configure Synapse in a privacy preserving way, the default +behaviours of ``allow_public_rooms_without_auth`` and +``allow_public_rooms_over_federation`` have been inverted. This means that by +default, only authenticated users querying the Client/Server API will be able +to query the room directory, and relatedly that the server will not share +room directory information with other servers over federation. + +If your installation does not explicitly set these settings one way or the other +and you want either setting to be ``true`` then it will necessary to update +your homeserver configuration file accordingly. + +For more details on the surrounding context see our `explainer +`_. + Upgrading to v1.5.0 =================== diff --git a/changelog.d/6354.feature b/changelog.d/6354.feature new file mode 100644 index 0000000000..fed9db884b --- /dev/null +++ b/changelog.d/6354.feature @@ -0,0 +1 @@ +Configure privacy preserving settings by default for the room directory. diff --git a/docs/sample_config.yaml b/docs/sample_config.yaml index c7391f0c48..10664ae8f7 100644 --- a/docs/sample_config.yaml +++ b/docs/sample_config.yaml @@ -54,15 +54,16 @@ pid_file: DATADIR/homeserver.pid # #require_auth_for_profile_requests: true -# If set to 'false', requires authentication to access the server's public rooms -# directory through the client API. Defaults to 'true'. +# If set to 'true', removes the need for authentication to access the server's +# public rooms directory through the client API, meaning that anyone can +# query the room directory. Defaults to 'false'. # -#allow_public_rooms_without_auth: false +#allow_public_rooms_without_auth: true -# If set to 'false', forbids any other homeserver to fetch the server's public -# rooms directory via federation. Defaults to 'true'. +# If set to 'true', allows any other homeserver to fetch the server's public +# rooms directory via federation. Defaults to 'false'. # -#allow_public_rooms_over_federation: false +#allow_public_rooms_over_federation: true # The default room version for newly created rooms. # diff --git a/synapse/config/server.py b/synapse/config/server.py index 837fbe1582..a4bef00936 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -118,15 +118,16 @@ class ServerConfig(Config): self.allow_public_rooms_without_auth = False self.allow_public_rooms_over_federation = False else: - # If set to 'False', requires authentication to access the server's public - # rooms directory through the client API. Defaults to 'True'. + # If set to 'true', removes the need for authentication to access the server's + # public rooms directory through the client API, meaning that anyone can + # query the room directory. Defaults to 'false'. self.allow_public_rooms_without_auth = config.get( - "allow_public_rooms_without_auth", True + "allow_public_rooms_without_auth", False ) - # If set to 'False', forbids any other homeserver to fetch the server's public - # rooms directory via federation. Defaults to 'True'. + # If set to 'true', allows any other homeserver to fetch the server's public + # rooms directory via federation. Defaults to 'false'. self.allow_public_rooms_over_federation = config.get( - "allow_public_rooms_over_federation", True + "allow_public_rooms_over_federation", False ) default_room_version = config.get("default_room_version", DEFAULT_ROOM_VERSION) @@ -620,15 +621,16 @@ class ServerConfig(Config): # #require_auth_for_profile_requests: true - # If set to 'false', requires authentication to access the server's public rooms - # directory through the client API. Defaults to 'true'. + # If set to 'true', removes the need for authentication to access the server's + # public rooms directory through the client API, meaning that anyone can + # query the room directory. Defaults to 'false'. # - #allow_public_rooms_without_auth: false + #allow_public_rooms_without_auth: true - # If set to 'false', forbids any other homeserver to fetch the server's public - # rooms directory via federation. Defaults to 'true'. + # If set to 'true', allows any other homeserver to fetch the server's public + # rooms directory via federation. Defaults to 'false'. # - #allow_public_rooms_over_federation: false + #allow_public_rooms_over_federation: true # The default room version for newly created rooms. # diff --git a/tests/federation/transport/test_server.py b/tests/federation/transport/test_server.py new file mode 100644 index 0000000000..27d83bb7d9 --- /dev/null +++ b/tests/federation/transport/test_server.py @@ -0,0 +1,52 @@ +# -*- coding: utf-8 -*- +# Copyright 2019 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from twisted.internet import defer + +from synapse.config.ratelimiting import FederationRateLimitConfig +from synapse.federation.transport import server +from synapse.util.ratelimitutils import FederationRateLimiter + +from tests import unittest +from tests.unittest import override_config + + +class RoomDirectoryFederationTests(unittest.HomeserverTestCase): + def prepare(self, reactor, clock, homeserver): + class Authenticator(object): + def authenticate_request(self, request, content): + return defer.succeed("otherserver.nottld") + + ratelimiter = FederationRateLimiter(clock, FederationRateLimitConfig()) + server.register_servlets( + homeserver, self.resource, Authenticator(), ratelimiter + ) + + @override_config({"allow_public_rooms_over_federation": False}) + def test_blocked_public_room_list_over_federation(self): + request, channel = self.make_request( + "GET", "/_matrix/federation/v1/publicRooms" + ) + self.render(request) + self.assertEquals(403, channel.code) + + @override_config({"allow_public_rooms_over_federation": True}) + def test_open_public_room_list_over_federation(self): + request, channel = self.make_request( + "GET", "/_matrix/federation/v1/publicRooms" + ) + self.render(request) + self.assertEquals(200, channel.code) -- cgit 1.5.1