diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index d7442c62a7..34ed0e2921 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -14,26 +14,30 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
+from typing import TYPE_CHECKING, Any, Dict, Optional, Set
-from six import iteritems
-
-from twisted.internet import defer
from twisted.python.failure import Failure
from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import SynapseError
+from synapse.api.filtering import Filter
from synapse.logging.context import run_in_background
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.state import StateFilter
-from synapse.types import RoomStreamToken
+from synapse.streams.config import PaginationConfig
+from synapse.types import Requester, RoomStreamToken
from synapse.util.async_helpers import ReadWriteLock
from synapse.util.stringutils import random_string
from synapse.visibility import filter_events_for_client
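+# `HomeServer` is imported for type checking only, which avoids a circular
+# import at runtime.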
+if TYPE_CHECKING:
+ from synapse.server import HomeServer
+
+
logger = logging.getLogger(__name__)
-class PurgeStatus(object):
+class PurgeStatus:
"""Object tracking the status of a purge request
This class contains information on the progress of a purge request, for
@@ -61,14 +65,14 @@ class PurgeStatus(object):
return {"status": PurgeStatus.STATUS_TEXT[self.status]}
-class PaginationHandler(object):
+class PaginationHandler:
"""Handles pagination and purge history requests.
These are in the same handler due to the fact we need to block clients
paginating during a purge.
"""
- def __init__(self, hs):
+ def __init__(self, hs: "HomeServer"):
self.hs = hs
self.auth = hs.get_auth()
self.store = hs.get_datastore()
@@ -78,13 +82,16 @@ class PaginationHandler(object):
self._server_name = hs.hostname
self.pagination_lock = ReadWriteLock()
- self._purges_in_progress_by_room = set()
+ self._purges_in_progress_by_room = set() # type: Set[str]
# map from purge id to PurgeStatus
- self._purges_by_id = {}
+ self._purges_by_id = {} # type: Dict[str, PurgeStatus]
self._event_serializer = hs.get_event_client_serializer()
self._retention_default_max_lifetime = hs.config.retention_default_max_lifetime
+ self._retention_allowed_lifetime_min = hs.config.retention_allowed_lifetime_min
+ self._retention_allowed_lifetime_max = hs.config.retention_allowed_lifetime_max
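+        # These two limits bound the effective max_lifetime of each room's
+        # retention policy; see the clamping in purge_history_for_rooms_in_range.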
+
if hs.config.retention_enabled:
# Run the purge jobs described in the configuration file.
for job in hs.config.retention_purge_jobs:
@@ -99,8 +106,9 @@ class PaginationHandler(object):
job["longest_max_lifetime"],
)
- @defer.inlineCallbacks
- def purge_history_for_rooms_in_range(self, min_ms, max_ms):
+    async def purge_history_for_rooms_in_range(
+        self, min_ms: Optional[int], max_ms: Optional[int]
+    ) -> None:
"""Purge outdated events from rooms within the given retention range.
If a default retention policy is defined in the server's configuration and its
@@ -108,14 +116,14 @@ class PaginationHandler(object):
retention policy.
Args:
- min_ms (int|None): Duration in milliseconds that define the lower limit of
+            min_ms: Duration in milliseconds that defines the lower limit of
the range to handle (exclusive). If None, it means that the range has no
lower limit.
- max_ms (int|None): Duration in milliseconds that define the upper limit of
+            max_ms: Duration in milliseconds that defines the upper limit of
the range to handle (inclusive). If None, it means that the range has no
upper limit.
"""
- # We want the storage layer to to include rooms with no retention policy in its
+ # We want the storage layer to include rooms with no retention policy in its
# return value only if a default retention policy is defined in the server's
# configuration and that policy's 'max_lifetime' is either lower (or equal) than
# max_ms or higher than min_ms (or both).
@@ -139,13 +147,13 @@ class PaginationHandler(object):
include_null,
)
- rooms = yield self.store.get_rooms_for_retention_period_in_range(
+ rooms = await self.store.get_rooms_for_retention_period_in_range(
min_ms, max_ms, include_null
)
logger.debug("[purge] Rooms to purge: %s", rooms)
- for room_id, retention_policy in iteritems(rooms):
+ for room_id, retention_policy in rooms.items():
logger.info("[purge] Attempting to purge messages in room %s", room_id)
if room_id in self._purges_in_progress_by_room:
@@ -156,20 +164,39 @@ class PaginationHandler(object):
)
continue
- max_lifetime = retention_policy["max_lifetime"]
+ # If max_lifetime is None, it means that the room has no retention policy.
+ # Given we only retrieve such rooms when there's a default retention policy
+ # defined in the server's configuration, we can safely assume that's the
+ # case and use it for this room.
+ max_lifetime = (
+ retention_policy["max_lifetime"] or self._retention_default_max_lifetime
+ )
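+            # Note that a max_lifetime of 0 also falls through to the default
+            # here, since `or` tests for falsiness rather than just None.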
- if max_lifetime is None:
- # If max_lifetime is None, it means that include_null equals True,
- # therefore we can safely assume that there is a default policy defined
- # in the server's configuration.
- max_lifetime = self._retention_default_max_lifetime
+ # Cap the effective max_lifetime to be within the range allowed in the
+ # config.
+ # We do this in two steps:
+            # 1. Make sure it's higher than or equal to the minimum allowed
+            #    value, and if it's not, replace it with that value. This is
+            #    because the server operator can be required not to delete
+            #    information before a given time, e.g. to comply with freedom
+            #    of information laws.
+            # 2. Make sure the resulting value is lower than or equal to the
+            #    maximum allowed value, and if it's not, replace it with that
+            #    value. This is because the server operator can be required to
+            #    delete any data after a specific amount of time.
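+            # For example, with an allowed_lifetime_min of one day and an
+            # allowed_lifetime_max of one year, a room policy of one hour is
+            # raised to one day, and a policy of five years is lowered to one
+            # year.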
+ if self._retention_allowed_lifetime_min is not None:
+ max_lifetime = max(self._retention_allowed_lifetime_min, max_lifetime)
+
+ if self._retention_allowed_lifetime_max is not None:
+ max_lifetime = min(max_lifetime, self._retention_allowed_lifetime_max)
+
+ logger.debug("[purge] max_lifetime for room %s: %s", room_id, max_lifetime)
# Figure out what token we should start purging at.
ts = self.clock.time_msec() - max_lifetime
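+            # Events from before `ts` have exceeded the policy's lifetime and
+            # are eligible for purging.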
- stream_ordering = yield self.store.find_first_stream_ordering_after_ts(ts)
+ stream_ordering = await self.store.find_first_stream_ordering_after_ts(ts)
- r = yield self.store.get_room_event_before_stream_ordering(
+ r = await self.store.get_room_event_before_stream_ordering(
room_id, stream_ordering,
)
if not r:
@@ -199,18 +226,19 @@ class PaginationHandler(object):
"_purge_history", self._purge_history, purge_id, room_id, token, True,
)
- def start_purge_history(self, room_id, token, delete_local_events=False):
+ def start_purge_history(
+ self, room_id: str, token: str, delete_local_events: bool = False
+ ) -> str:
"""Start off a history purge on a room.
Args:
- room_id (str): The room to purge from
-
- token (str): topological token to delete events before
- delete_local_events (bool): True to delete local events as well as
+ room_id: The room to purge from
+ token: topological token to delete events before
+ delete_local_events: True to delete local events as well as
remote ones
Returns:
- str: unique ID for this purge transaction.
+ unique ID for this purge transaction.
"""
if room_id in self._purges_in_progress_by_room:
raise SynapseError(
@@ -229,24 +257,21 @@ class PaginationHandler(object):
)
return purge_id
- @defer.inlineCallbacks
- def _purge_history(self, purge_id, room_id, token, delete_local_events):
+ async def _purge_history(
+ self, purge_id: str, room_id: str, token: str, delete_local_events: bool
+ ) -> None:
"""Carry out a history purge on a room.
Args:
- purge_id (str): The id for this purge
- room_id (str): The room to purge from
- token (str): topological token to delete events before
- delete_local_events (bool): True to delete local events as well as
- remote ones
-
- Returns:
- Deferred
+ purge_id: The id for this purge
+ room_id: The room to purge from
+ token: topological token to delete events before
+ delete_local_events: True to delete local events as well as remote ones
"""
self._purges_in_progress_by_room.add(room_id)
try:
- with (yield self.pagination_lock.write(room_id)):
- yield self.storage.purge_events.purge_history(
+ with await self.pagination_lock.write(room_id):
+ await self.storage.purge_events.purge_history(
room_id, token, delete_local_events
)
logger.info("[purge] complete")
@@ -266,27 +291,22 @@ class PaginationHandler(object):
self.hs.get_reactor().callLater(24 * 3600, clear_purge)
- def get_purge_status(self, purge_id):
+ def get_purge_status(self, purge_id: str) -> Optional[PurgeStatus]:
"""Get the current status of an active purge
Args:
- purge_id (str): purge_id returned by start_purge_history
-
- Returns:
- PurgeStatus|None
+ purge_id: purge_id returned by start_purge_history
"""
return self._purges_by_id.get(purge_id)
- async def purge_room(self, room_id):
+ async def purge_room(self, room_id: str) -> None:
"""Purge the given room from the database"""
- with (await self.pagination_lock.write(room_id)):
+ with await self.pagination_lock.write(room_id):
# check we know about the room
await self.store.get_room_version_id(room_id)
# first check that we have no users in this room
- joined = await defer.maybeDeferred(
- self.store.is_host_joined, room_id, self._server_name
- )
+ joined = await self.store.is_host_joined(room_id, self._server_name)
if joined:
raise SynapseError(400, "Users are still joined to this room")
@@ -295,23 +315,22 @@ class PaginationHandler(object):
async def get_messages(
self,
- requester,
- room_id=None,
- pagin_config=None,
- as_client_event=True,
- event_filter=None,
- ):
+ requester: Requester,
+ room_id: str,
+ pagin_config: PaginationConfig,
+ as_client_event: bool = True,
+ event_filter: Optional[Filter] = None,
+ ) -> Dict[str, Any]:
"""Get messages in a room.
Args:
- requester (Requester): The user requesting messages.
- room_id (str): The room they want messages from.
- pagin_config (synapse.api.streams.PaginationConfig): The pagination
- config rules to apply, if any.
- as_client_event (bool): True to get events in client-server format.
- event_filter (Filter): Filter to apply to results or None
+ requester: The user requesting messages.
+ room_id: The room they want messages from.
+ pagin_config: The pagination config rules to apply, if any.
+ as_client_event: True to get events in client-server format.
+            event_filter: Filter to apply to results, or None
Returns:
- dict: Pagination API results
+ Pagination API results
"""
user_id = requester.user.to_string()
@@ -319,7 +338,7 @@ class PaginationHandler(object):
room_token = pagin_config.from_token.room_key
else:
pagin_config.from_token = (
- await self.hs.get_event_sources().get_current_token_for_pagination()
+ self.hs.get_event_sources().get_current_token_for_pagination()
)
room_token = pagin_config.from_token.room_key
@@ -331,7 +350,7 @@ class PaginationHandler(object):
source_config = pagin_config.get_source_config("room")
- with (await self.pagination_lock.read(room_id)):
+ with await self.pagination_lock.read(room_id):
(
membership,
member_event_id,
@@ -353,11 +372,15 @@ class PaginationHandler(object):
# If they have left the room then clamp the token to be before
# they left the room, to save the effort of loading from the
# database.
+
+ # This is only None if the room is world_readable, in which
+ # case "JOIN" would have been returned.
+ assert member_event_id
+
leave_token = await self.store.get_topological_token_for_event(
member_event_id
)
- leave_token = RoomStreamToken.parse(leave_token)
- if leave_token.topological < max_topo:
+ if RoomStreamToken.parse(leave_token).topological < max_topo:
source_config.from_key = str(leave_token)
await self.hs.get_handlers().federation_handler.maybe_backfill(
@@ -404,8 +427,8 @@ class PaginationHandler(object):
)
if state_ids:
- state = await self.store.get_events(list(state_ids.values()))
- state = state.values()
+ state_dict = await self.store.get_events(list(state_ids.values()))
+ state = state_dict.values()
time_now = self.clock.time_msec()
|