summary refs log tree commit diff
path: root/synapse/handlers/pagination.py
diff options
context:
space:
mode:
authorAndrew Morgan <andrew@amorgan.xyz>2020-08-26 12:22:25 +0100
committerAndrew Morgan <andrew@amorgan.xyz>2020-08-26 12:22:25 +0100
commit7affcd01c76f495dfe70dbb9f68d964a2d58b9bd (patch)
tree7a42640f7b1c7bd068332a4fd9dce3c2a0dcecd6 /synapse/handlers/pagination.py
parentSimplify medium and address assignment (diff)
parentAdd functions to `MultiWriterIdGen` used by events stream (#8164) (diff)
downloadsynapse-github/anoa/user_param_ui_auth.tar.xz
Merge branch 'develop' of github.com:matrix-org/synapse into anoa/user_param_ui_auth github/anoa/user_param_ui_auth anoa/user_param_ui_auth
* 'develop' of github.com:matrix-org/synapse: (369 commits)
  Add functions to `MultiWriterIdGen` used by events stream (#8164)
  Do not allow send_nonmember_event to be called with shadow-banned users. (#8158)
  Changelog fixes
  1.19.1rc1
  Make StreamIdGen `get_next` and `get_next_mult` async  (#8161)
  Wording fixes to 'name' user admin api filter (#8163)
  Fix missing double-backtick in RST document
  Search in columns 'name' and 'displayname' in the admin users endpoint (#7377)
  Add type hints for state. (#8140)
  Stop shadow-banned users from sending non-member events. (#8142)
  Allow capping a room's retention policy (#8104)
  Add healthcheck for default localhost 8008 port on /health endpoint. (#8147)
  Fix flaky shadow-ban tests. (#8152)
  Fix join ratelimiter breaking profile updates and idempotency (#8153)
  Do not apply ratelimiting on joins to appservices (#8139)
  Don't fail /submit_token requests on incorrect session ID if request_token_inhibit_3pid_errors is turned on (#7991)
  Do not apply ratelimiting on joins to appservices (#8139)
  Micro-optimisations to get_auth_chain_ids (#8132)
  Allow denying or shadow banning registrations via the spam checker (#8034)
  Stop shadow-banned users from sending invites. (#8095)
  ...
Diffstat (limited to 'synapse/handlers/pagination.py')
-rw-r--r--synapse/handlers/pagination.py66
1 files changed, 39 insertions, 27 deletions
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py

index d7442c62a7..ac3418d69d 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py
@@ -15,9 +15,6 @@ # limitations under the License. import logging -from six import iteritems - -from twisted.internet import defer from twisted.python.failure import Failure from synapse.api.constants import EventTypes, Membership @@ -85,6 +82,9 @@ class PaginationHandler(object): self._retention_default_max_lifetime = hs.config.retention_default_max_lifetime + self._retention_allowed_lifetime_min = hs.config.retention_allowed_lifetime_min + self._retention_allowed_lifetime_max = hs.config.retention_allowed_lifetime_max + if hs.config.retention_enabled: # Run the purge jobs described in the configuration file. for job in hs.config.retention_purge_jobs: @@ -99,8 +99,7 @@ class PaginationHandler(object): job["longest_max_lifetime"], ) - @defer.inlineCallbacks - def purge_history_for_rooms_in_range(self, min_ms, max_ms): + async def purge_history_for_rooms_in_range(self, min_ms, max_ms): """Purge outdated events from rooms within the given retention range. If a default retention policy is defined in the server's configuration and its @@ -115,7 +114,7 @@ class PaginationHandler(object): the range to handle (inclusive). If None, it means that the range has no upper limit. """ - # We want the storage layer to to include rooms with no retention policy in its + # We want the storage layer to include rooms with no retention policy in its # return value only if a default retention policy is defined in the server's # configuration and that policy's 'max_lifetime' is either lower (or equal) than # max_ms or higher than min_ms (or both). @@ -139,13 +138,13 @@ class PaginationHandler(object): include_null, ) - rooms = yield self.store.get_rooms_for_retention_period_in_range( + rooms = await self.store.get_rooms_for_retention_period_in_range( min_ms, max_ms, include_null ) logger.debug("[purge] Rooms to purge: %s", rooms) - for room_id, retention_policy in iteritems(rooms): + for room_id, retention_policy in rooms.items(): logger.info("[purge] Attempting to purge messages in room %s", room_id) if room_id in self._purges_in_progress_by_room: @@ -156,20 +155,39 @@ class PaginationHandler(object): ) continue - max_lifetime = retention_policy["max_lifetime"] + # If max_lifetime is None, it means that the room has no retention policy. + # Given we only retrieve such rooms when there's a default retention policy + # defined in the server's configuration, we can safely assume that's the + # case and use it for this room. + max_lifetime = ( + retention_policy["max_lifetime"] or self._retention_default_max_lifetime + ) - if max_lifetime is None: - # If max_lifetime is None, it means that include_null equals True, - # therefore we can safely assume that there is a default policy defined - # in the server's configuration. - max_lifetime = self._retention_default_max_lifetime + # Cap the effective max_lifetime to be within the range allowed in the + # config. + # We do this in two steps: + # 1. Make sure it's higher or equal to the minimum allowed value, and if + # it's not replace it with that value. This is because the server + # operator can be required to not delete information before a given + # time, e.g. to comply with freedom of information laws. + # 2. Make sure the resulting value is lower or equal to the maximum allowed + # value, and if it's not replace it with that value. This is because the + # server operator can be required to delete any data after a specific + # amount of time. + if self._retention_allowed_lifetime_min is not None: + max_lifetime = max(self._retention_allowed_lifetime_min, max_lifetime) + + if self._retention_allowed_lifetime_max is not None: + max_lifetime = min(max_lifetime, self._retention_allowed_lifetime_max) + + logger.debug("[purge] max_lifetime for room %s: %s", room_id, max_lifetime) # Figure out what token we should start purging at. ts = self.clock.time_msec() - max_lifetime - stream_ordering = yield self.store.find_first_stream_ordering_after_ts(ts) + stream_ordering = await self.store.find_first_stream_ordering_after_ts(ts) - r = yield self.store.get_room_event_before_stream_ordering( + r = await self.store.get_room_event_before_stream_ordering( room_id, stream_ordering, ) if not r: @@ -229,8 +247,7 @@ class PaginationHandler(object): ) return purge_id - @defer.inlineCallbacks - def _purge_history(self, purge_id, room_id, token, delete_local_events): + async def _purge_history(self, purge_id, room_id, token, delete_local_events): """Carry out a history purge on a room. Args: @@ -239,14 +256,11 @@ class PaginationHandler(object): token (str): topological token to delete events before delete_local_events (bool): True to delete local events as well as remote ones - - Returns: - Deferred """ self._purges_in_progress_by_room.add(room_id) try: - with (yield self.pagination_lock.write(room_id)): - yield self.storage.purge_events.purge_history( + with await self.pagination_lock.write(room_id): + await self.storage.purge_events.purge_history( room_id, token, delete_local_events ) logger.info("[purge] complete") @@ -284,9 +298,7 @@ class PaginationHandler(object): await self.store.get_room_version_id(room_id) # first check that we have no users in this room - joined = await defer.maybeDeferred( - self.store.is_host_joined, room_id, self._server_name - ) + joined = await self.store.is_host_joined(room_id, self._server_name) if joined: raise SynapseError(400, "Users are still joined to this room") @@ -319,7 +331,7 @@ class PaginationHandler(object): room_token = pagin_config.from_token.room_key else: pagin_config.from_token = ( - await self.hs.get_event_sources().get_current_token_for_pagination() + self.hs.get_event_sources().get_current_token_for_pagination() ) room_token = pagin_config.from_token.room_key