summary refs log tree commit diff
path: root/synapse/handlers/pagination.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers/pagination.py')
-rw-r--r--synapse/handlers/pagination.py139
1 files changed, 86 insertions, 53 deletions
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index 487420bb5d..34ed0e2921 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -14,23 +14,30 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
+from typing import TYPE_CHECKING, Any, Dict, Optional, Set
 
 from twisted.python.failure import Failure
 
 from synapse.api.constants import EventTypes, Membership
 from synapse.api.errors import SynapseError
+from synapse.api.filtering import Filter
 from synapse.logging.context import run_in_background
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.storage.state import StateFilter
-from synapse.types import RoomStreamToken
+from synapse.streams.config import PaginationConfig
+from synapse.types import Requester, RoomStreamToken
 from synapse.util.async_helpers import ReadWriteLock
 from synapse.util.stringutils import random_string
 from synapse.visibility import filter_events_for_client
 
+if TYPE_CHECKING:
+    from synapse.server import HomeServer
+
+
 logger = logging.getLogger(__name__)
 
 
-class PurgeStatus(object):
+class PurgeStatus:
     """Object tracking the status of a purge request
 
     This class contains information on the progress of a purge request, for
@@ -58,14 +65,14 @@ class PurgeStatus(object):
         return {"status": PurgeStatus.STATUS_TEXT[self.status]}
 
 
-class PaginationHandler(object):
+class PaginationHandler:
     """Handles pagination and purge history requests.
 
     These are in the same handler due to the fact we need to block clients
     paginating during a purge.
     """
 
-    def __init__(self, hs):
+    def __init__(self, hs: "HomeServer"):
         self.hs = hs
         self.auth = hs.get_auth()
         self.store = hs.get_datastore()
@@ -75,13 +82,16 @@ class PaginationHandler(object):
         self._server_name = hs.hostname
 
         self.pagination_lock = ReadWriteLock()
-        self._purges_in_progress_by_room = set()
+        self._purges_in_progress_by_room = set()  # type: Set[str]
         # map from purge id to PurgeStatus
-        self._purges_by_id = {}
+        self._purges_by_id = {}  # type: Dict[str, PurgeStatus]
         self._event_serializer = hs.get_event_client_serializer()
 
         self._retention_default_max_lifetime = hs.config.retention_default_max_lifetime
 
+        self._retention_allowed_lifetime_min = hs.config.retention_allowed_lifetime_min
+        self._retention_allowed_lifetime_max = hs.config.retention_allowed_lifetime_max
+
         if hs.config.retention_enabled:
             # Run the purge jobs described in the configuration file.
             for job in hs.config.retention_purge_jobs:
@@ -96,7 +106,9 @@ class PaginationHandler(object):
                     job["longest_max_lifetime"],
                 )
 
-    async def purge_history_for_rooms_in_range(self, min_ms, max_ms):
+    async def purge_history_for_rooms_in_range(
+        self, min_ms: Optional[int], max_ms: Optional[int]
+    ):
         """Purge outdated events from rooms within the given retention range.
 
         If a default retention policy is defined in the server's configuration and its
@@ -104,14 +116,14 @@ class PaginationHandler(object):
         retention policy.
 
         Args:
-            min_ms (int|None): Duration in milliseconds that define the lower limit of
+            min_ms: Duration in milliseconds that define the lower limit of
                 the range to handle (exclusive). If None, it means that the range has no
                 lower limit.
-            max_ms (int|None): Duration in milliseconds that define the upper limit of
+            max_ms: Duration in milliseconds that define the upper limit of
                 the range to handle (inclusive). If None, it means that the range has no
                 upper limit.
         """
-        # We want the storage layer to to include rooms with no retention policy in its
+        # We want the storage layer to include rooms with no retention policy in its
         # return value only if a default retention policy is defined in the server's
         # configuration and that policy's 'max_lifetime' is either lower (or equal) than
         # max_ms or higher than min_ms (or both).
@@ -152,13 +164,32 @@ class PaginationHandler(object):
                 )
                 continue
 
-            max_lifetime = retention_policy["max_lifetime"]
+            # If max_lifetime is None, it means that the room has no retention policy.
+            # Given we only retrieve such rooms when there's a default retention policy
+            # defined in the server's configuration, we can safely assume that's the
+            # case and use it for this room.
+            max_lifetime = (
+                retention_policy["max_lifetime"] or self._retention_default_max_lifetime
+            )
 
-            if max_lifetime is None:
-                # If max_lifetime is None, it means that include_null equals True,
-                # therefore we can safely assume that there is a default policy defined
-                # in the server's configuration.
-                max_lifetime = self._retention_default_max_lifetime
+            # Cap the effective max_lifetime to be within the range allowed in the
+            # config.
+            # We do this in two steps:
+            #   1. Make sure it's higher or equal to the minimum allowed value, and if
+            #      it's not replace it with that value. This is because the server
+            #      operator can be required to not delete information before a given
+            #      time, e.g. to comply with freedom of information laws.
+            #   2. Make sure the resulting value is lower or equal to the maximum allowed
+            #      value, and if it's not replace it with that value. This is because the
+            #      server operator can be required to delete any data after a specific
+            #      amount of time.
+            if self._retention_allowed_lifetime_min is not None:
+                max_lifetime = max(self._retention_allowed_lifetime_min, max_lifetime)
+
+            if self._retention_allowed_lifetime_max is not None:
+                max_lifetime = min(max_lifetime, self._retention_allowed_lifetime_max)
+
+            logger.debug("[purge] max_lifetime for room %s: %s", room_id, max_lifetime)
 
             # Figure out what token we should start purging at.
             ts = self.clock.time_msec() - max_lifetime
@@ -195,18 +226,19 @@ class PaginationHandler(object):
                 "_purge_history", self._purge_history, purge_id, room_id, token, True,
             )
 
-    def start_purge_history(self, room_id, token, delete_local_events=False):
+    def start_purge_history(
+        self, room_id: str, token: str, delete_local_events: bool = False
+    ) -> str:
         """Start off a history purge on a room.
 
         Args:
-            room_id (str): The room to purge from
-
-            token (str): topological token to delete events before
-            delete_local_events (bool): True to delete local events as well as
+            room_id: The room to purge from
+            token: topological token to delete events before
+            delete_local_events: True to delete local events as well as
                 remote ones
 
         Returns:
-            str: unique ID for this purge transaction.
+            unique ID for this purge transaction.
         """
         if room_id in self._purges_in_progress_by_room:
             raise SynapseError(
@@ -225,15 +257,16 @@ class PaginationHandler(object):
         )
         return purge_id
 
-    async def _purge_history(self, purge_id, room_id, token, delete_local_events):
+    async def _purge_history(
+        self, purge_id: str, room_id: str, token: str, delete_local_events: bool
+    ) -> None:
         """Carry out a history purge on a room.
 
         Args:
-            purge_id (str): The id for this purge
-            room_id (str): The room to purge from
-            token (str): topological token to delete events before
-            delete_local_events (bool): True to delete local events as well as
-                remote ones
+            purge_id: The id for this purge
+            room_id: The room to purge from
+            token: topological token to delete events before
+            delete_local_events: True to delete local events as well as remote ones
         """
         self._purges_in_progress_by_room.add(room_id)
         try:
@@ -258,20 +291,17 @@ class PaginationHandler(object):
 
             self.hs.get_reactor().callLater(24 * 3600, clear_purge)
 
-    def get_purge_status(self, purge_id):
+    def get_purge_status(self, purge_id: str) -> Optional[PurgeStatus]:
         """Get the current status of an active purge
 
         Args:
-            purge_id (str): purge_id returned by start_purge_history
-
-        Returns:
-            PurgeStatus|None
+            purge_id: purge_id returned by start_purge_history
         """
         return self._purges_by_id.get(purge_id)
 
-    async def purge_room(self, room_id):
+    async def purge_room(self, room_id: str) -> None:
         """Purge the given room from the database"""
-        with (await self.pagination_lock.write(room_id)):
+        with await self.pagination_lock.write(room_id):
             # check we know about the room
             await self.store.get_room_version_id(room_id)
 
@@ -285,23 +315,22 @@ class PaginationHandler(object):
 
     async def get_messages(
         self,
-        requester,
-        room_id=None,
-        pagin_config=None,
-        as_client_event=True,
-        event_filter=None,
-    ):
+        requester: Requester,
+        room_id: str,
+        pagin_config: PaginationConfig,
+        as_client_event: bool = True,
+        event_filter: Optional[Filter] = None,
+    ) -> Dict[str, Any]:
         """Get messages in a room.
 
         Args:
-            requester (Requester): The user requesting messages.
-            room_id (str): The room they want messages from.
-            pagin_config (synapse.api.streams.PaginationConfig): The pagination
-                config rules to apply, if any.
-            as_client_event (bool): True to get events in client-server format.
-            event_filter (Filter): Filter to apply to results or None
+            requester: The user requesting messages.
+            room_id: The room they want messages from.
+            pagin_config: The pagination config rules to apply, if any.
+            as_client_event: True to get events in client-server format.
+            event_filter: Filter to apply to results or None
         Returns:
-            dict: Pagination API results
+            Pagination API results
         """
         user_id = requester.user.to_string()
 
@@ -321,7 +350,7 @@ class PaginationHandler(object):
 
         source_config = pagin_config.get_source_config("room")
 
-        with (await self.pagination_lock.read(room_id)):
+        with await self.pagination_lock.read(room_id):
             (
                 membership,
                 member_event_id,
@@ -343,11 +372,15 @@ class PaginationHandler(object):
                     # If they have left the room then clamp the token to be before
                     # they left the room, to save the effort of loading from the
                     # database.
+
+                    # This is only None if the room is world_readable, in which
+                    # case "JOIN" would have been returned.
+                    assert member_event_id
+
                     leave_token = await self.store.get_topological_token_for_event(
                         member_event_id
                     )
-                    leave_token = RoomStreamToken.parse(leave_token)
-                    if leave_token.topological < max_topo:
+                    if RoomStreamToken.parse(leave_token).topological < max_topo:
                         source_config.from_key = str(leave_token)
 
                 await self.hs.get_handlers().federation_handler.maybe_backfill(
@@ -394,8 +427,8 @@ class PaginationHandler(object):
             )
 
             if state_ids:
-                state = await self.store.get_events(list(state_ids.values()))
-                state = state.values()
+                state_dict = await self.store.get_events(list(state_ids.values()))
+                state = state_dict.values()
 
         time_now = self.clock.time_msec()