| diff --git a/changelog.d/8104.bugfix b/changelog.d/8104.bugfix
new file mode 100644
index 0000000000..e32e2996c4
--- /dev/null
+++ b/changelog.d/8104.bugfix
@@ -0,0 +1 @@
+Fix a bug introduced in v1.7.2 impacting message retention policies that would allow federated homeservers to dictate a retention period that's lower than the configured minimum allowed duration in the configuration file.
diff --git a/docs/sample_config.yaml b/docs/sample_config.yaml
 index f168853f67..3528d9e11f 100644
--- a/docs/sample_config.yaml
+++ b/docs/sample_config.yaml
@@ -378,11 +378,10 @@ retention:
   #  min_lifetime: 1d
   #  max_lifetime: 1y
 
-  # Retention policy limits. If set, a user won't be able to send a
-  # 'm.room.retention' event which features a 'min_lifetime' or a 'max_lifetime'
-  # that's not within this range. This is especially useful in closed federations,
-  # in which server admins can make sure every federating server applies the same
-  # rules.
+  # Retention policy limits. If set, and the state of a room contains a
+  # 'm.room.retention' event in its state which contains a 'min_lifetime' or a
+  # 'max_lifetime' that's out of these bounds, Synapse will cap the room's policy
+  # to these limits when running purge jobs.
   #
   #allowed_lifetime_min: 1d
   #allowed_lifetime_max: 1y
@@ -408,12 +407,19 @@ retention:
   # (e.g. every 12h), but not want that purge to be performed by a job that's
   # iterating over every room it knows, which could be heavy on the server.
   #
+  # If any purge job is configured, it is strongly recommended to have at least
+  # a single job with neither 'shortest_max_lifetime' nor 'longest_max_lifetime'
+  # set, or one job without 'shortest_max_lifetime' and one job without
+  # 'longest_max_lifetime' set. Otherwise some rooms might be ignored, even if
+  # 'allowed_lifetime_min' and 'allowed_lifetime_max' are set, because capping a
+  # room's policy to these values is done after the policies are retrieved from
+  # Synapse's database (which is done using the range specified in a purge job's
+  # configuration).
+  #
   #purge_jobs:
-  #  - shortest_max_lifetime: 1d
-  #    longest_max_lifetime: 3d
+  #  - longest_max_lifetime: 3d
   #    interval: 12h
   #  - shortest_max_lifetime: 3d
-  #    longest_max_lifetime: 1y
   #    interval: 1d
 
 # Inhibits the /requestToken endpoints from returning an error that might leak
diff --git a/synapse/config/server.py b/synapse/config/server.py
 index ed66f3eba1..526a90b26a 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -961,11 +961,10 @@ class ServerConfig(Config):
           #  min_lifetime: 1d
           #  max_lifetime: 1y
 
-          # Retention policy limits. If set, a user won't be able to send a
-          # 'm.room.retention' event which features a 'min_lifetime' or a 'max_lifetime'
-          # that's not within this range. This is especially useful in closed federations,
-          # in which server admins can make sure every federating server applies the same
-          # rules.
+          # Retention policy limits. If set, and the state of a room contains a
+          # 'm.room.retention' event in its state which contains a 'min_lifetime' or a
+          # 'max_lifetime' that's out of these bounds, Synapse will cap the room's policy
+          # to these limits when running purge jobs.
           #
           #allowed_lifetime_min: 1d
           #allowed_lifetime_max: 1y
@@ -991,12 +990,19 @@ class ServerConfig(Config):
           # (e.g. every 12h), but not want that purge to be performed by a job that's
           # iterating over every room it knows, which could be heavy on the server.
           #
+          # If any purge job is configured, it is strongly recommended to have at least
+          # a single job with neither 'shortest_max_lifetime' nor 'longest_max_lifetime'
+          # set, or one job without 'shortest_max_lifetime' and one job without
+          # 'longest_max_lifetime' set. Otherwise some rooms might be ignored, even if
+          # 'allowed_lifetime_min' and 'allowed_lifetime_max' are set, because capping a
+          # room's policy to these values is done after the policies are retrieved from
+          # Synapse's database (which is done using the range specified in a purge job's
+          # configuration).
+          #
           #purge_jobs:
-          #  - shortest_max_lifetime: 1d
-          #    longest_max_lifetime: 3d
+          #  - longest_max_lifetime: 3d
           #    interval: 12h
           #  - shortest_max_lifetime: 3d
-          #    longest_max_lifetime: 1y
           #    interval: 1d
 
         # Inhibits the /requestToken endpoints from returning an error that might leak
diff --git a/synapse/events/validator.py b/synapse/events/validator.py
 index 588d222f36..5ce3874fba 100644
--- a/synapse/events/validator.py
+++ b/synapse/events/validator.py
@@ -74,15 +74,14 @@ class EventValidator(object):
                         )
 
         if event.type == EventTypes.Retention:
-            self._validate_retention(event, config)
+            self._validate_retention(event)
 
-    def _validate_retention(self, event, config):
+    def _validate_retention(self, event):
         """Checks that an event that defines the retention policy for a room respects the
-        boundaries imposed by the server's administrator.
+        format enforced by the spec.
 
         Args:
             event (FrozenEvent): The event to validate.
-            config (Config): The homeserver's configuration.
         """
         min_lifetime = event.content.get("min_lifetime")
         max_lifetime = event.content.get("max_lifetime")
@@ -95,32 +94,6 @@ class EventValidator(object):
                     errcode=Codes.BAD_JSON,
                 )
 
-            if (
-                config.retention_allowed_lifetime_min is not None
-                and min_lifetime < config.retention_allowed_lifetime_min
-            ):
-                raise SynapseError(
-                    code=400,
-                    msg=(
-                        "'min_lifetime' can't be lower than the minimum allowed"
-                        " value enforced by the server's administrator"
-                    ),
-                    errcode=Codes.BAD_JSON,
-                )
-
-            if (
-                config.retention_allowed_lifetime_max is not None
-                and min_lifetime > config.retention_allowed_lifetime_max
-            ):
-                raise SynapseError(
-                    code=400,
-                    msg=(
-                        "'min_lifetime' can't be greater than the maximum allowed"
-                        " value enforced by the server's administrator"
-                    ),
-                    errcode=Codes.BAD_JSON,
-                )
-
         if max_lifetime is not None:
             if not isinstance(max_lifetime, int):
                 raise SynapseError(
@@ -129,32 +102,6 @@ class EventValidator(object):
                     errcode=Codes.BAD_JSON,
                 )
 
-            if (
-                config.retention_allowed_lifetime_min is not None
-                and max_lifetime < config.retention_allowed_lifetime_min
-            ):
-                raise SynapseError(
-                    code=400,
-                    msg=(
-                        "'max_lifetime' can't be lower than the minimum allowed value"
-                        " enforced by the server's administrator"
-                    ),
-                    errcode=Codes.BAD_JSON,
-                )
-
-            if (
-                config.retention_allowed_lifetime_max is not None
-                and max_lifetime > config.retention_allowed_lifetime_max
-            ):
-                raise SynapseError(
-                    code=400,
-                    msg=(
-                        "'max_lifetime' can't be greater than the maximum allowed"
-                        " value enforced by the server's administrator"
-                    ),
-                    errcode=Codes.BAD_JSON,
-                )
-
         if (
             min_lifetime is not None
             and max_lifetime is not None
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
 index 487420bb5d..ac3418d69d 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -82,6 +82,9 @@ class PaginationHandler(object):
 
         self._retention_default_max_lifetime = hs.config.retention_default_max_lifetime
 
+        self._retention_allowed_lifetime_min = hs.config.retention_allowed_lifetime_min
+        self._retention_allowed_lifetime_max = hs.config.retention_allowed_lifetime_max
+
         if hs.config.retention_enabled:
             # Run the purge jobs described in the configuration file.
             for job in hs.config.retention_purge_jobs:
@@ -111,7 +114,7 @@ class PaginationHandler(object):
                 the range to handle (inclusive). If None, it means that the range has no
                 upper limit.
         """
-        # We want the storage layer to to include rooms with no retention policy in its
+        # We want the storage layer to include rooms with no retention policy in its
         # return value only if a default retention policy is defined in the server's
         # configuration and that policy's 'max_lifetime' is either lower (or equal) than
         # max_ms or higher than min_ms (or both).
@@ -152,13 +155,32 @@ class PaginationHandler(object):
                 )
                 continue
 
-            max_lifetime = retention_policy["max_lifetime"]
+            # If max_lifetime is None, it means that the room has no retention policy.
+            # Given we only retrieve such rooms when there's a default retention policy
+            # defined in the server's configuration, we can safely assume that's the
+            # case and use it for this room.
+            max_lifetime = (
+                retention_policy["max_lifetime"] or self._retention_default_max_lifetime
+            )
 
-            if max_lifetime is None:
-                # If max_lifetime is None, it means that include_null equals True,
-                # therefore we can safely assume that there is a default policy defined
-                # in the server's configuration.
-                max_lifetime = self._retention_default_max_lifetime
+            # Cap the effective max_lifetime to be within the range allowed in the
+            # config.
+            # We do this in two steps:
+            #   1. Make sure it's higher or equal to the minimum allowed value, and if
+            #      it's not replace it with that value. This is because the server
+            #      operator can be required to not delete information before a given
+            #      time, e.g. to comply with freedom of information laws.
+            #   2. Make sure the resulting value is lower or equal to the maximum allowed
+            #      value, and if it's not replace it with that value. This is because the
+            #      server operator can be required to delete any data after a specific
+            #      amount of time.
+            if self._retention_allowed_lifetime_min is not None:
+                max_lifetime = max(self._retention_allowed_lifetime_min, max_lifetime)
+
+            if self._retention_allowed_lifetime_max is not None:
+                max_lifetime = min(max_lifetime, self._retention_allowed_lifetime_max)
+
+            logger.debug("[purge] max_lifetime for room %s: %s", room_id, max_lifetime)
 
             # Figure out what token we should start purging at.
             ts = self.clock.time_msec() - max_lifetime
diff --git a/tests/rest/client/test_retention.py b/tests/rest/client/test_retention.py
 index 0b191d13c6..d4e7fa1293 100644
--- a/tests/rest/client/test_retention.py
+++ b/tests/rest/client/test_retention.py
@@ -45,50 +45,63 @@ class RetentionTestCase(unittest.HomeserverTestCase):
         }
 
         self.hs = self.setup_test_homeserver(config=config)
+
         return self.hs
 
     def prepare(self, reactor, clock, homeserver):
         self.user_id = self.register_user("user", "password")
         self.token = self.login("user", "password")
 
-    def test_retention_state_event(self):
-        """Tests that the server configuration can limit the values a user can set to the
-        room's retention policy.
+        self.store = self.hs.get_datastore()
+        self.serializer = self.hs.get_event_client_serializer()
+        self.clock = self.hs.get_clock()
+
+    def test_retention_event_purged_with_state_event(self):
+        """Tests that expired events are correctly purged when the room's retention policy
+        is defined by a state event.
         """
         room_id = self.helper.create_room_as(self.user_id, tok=self.token)
 
+        # Set the room's retention period to 2 days.
+        lifetime = one_day_ms * 2
         self.helper.send_state(
             room_id=room_id,
             event_type=EventTypes.Retention,
-            body={"max_lifetime": one_day_ms * 4},
+            body={"max_lifetime": lifetime},
             tok=self.token,
-            expect_code=400,
         )
 
+        self._test_retention_event_purged(room_id, one_day_ms * 1.5)
+
+    def test_retention_event_purged_with_state_event_outside_allowed(self):
+        """Tests that the server configuration can override the policy for a room when
+        running the purge jobs.
+        """
+        room_id = self.helper.create_room_as(self.user_id, tok=self.token)
+
+        # Set a max_lifetime higher than the maximum allowed value.
         self.helper.send_state(
             room_id=room_id,
             event_type=EventTypes.Retention,
-            body={"max_lifetime": one_hour_ms},
+            body={"max_lifetime": one_day_ms * 4},
             tok=self.token,
-            expect_code=400,
         )
 
-    def test_retention_event_purged_with_state_event(self):
-        """Tests that expired events are correctly purged when the room's retention policy
-        is defined by a state event.
-        """
-        room_id = self.helper.create_room_as(self.user_id, tok=self.token)
+        # Check that the event is purged after waiting for the maximum allowed duration
+        # instead of the one specified in the room's policy.
+        self._test_retention_event_purged(room_id, one_day_ms * 1.5)
 
-        # Set the room's retention period to 2 days.
-        lifetime = one_day_ms * 2
+        # Set a max_lifetime lower than the minimum allowed value.
         self.helper.send_state(
             room_id=room_id,
             event_type=EventTypes.Retention,
-            body={"max_lifetime": lifetime},
+            body={"max_lifetime": one_hour_ms},
             tok=self.token,
         )
 
-        self._test_retention_event_purged(room_id, one_day_ms * 1.5)
+        # Check that the event is purged after waiting for the minimum allowed duration
+        # instead of the one specified in the room's policy.
+        self._test_retention_event_purged(room_id, one_day_ms * 0.5)
 
     def test_retention_event_purged_without_state_event(self):
         """Tests that expired events are correctly purged when the room's retention policy
@@ -140,7 +153,27 @@ class RetentionTestCase(unittest.HomeserverTestCase):
         # That event should be the second, not outdated event.
         self.assertEqual(filtered_events[0].event_id, valid_event_id, filtered_events)
 
-    def _test_retention_event_purged(self, room_id, increment):
+    def _test_retention_event_purged(self, room_id: str, increment: float):
+        """Run the following test scenario to test the message retention policy support:
+
+        1. Send event 1
+        2. Increment time by `increment`
+        3. Send event 2
+        4. Increment time by `increment`
+        5. Check that event 1 has been purged
+        6. Check that event 2 has not been purged
+        7. Check that state events that were sent before event 1 aren't purged.
+        The main reason for sending a second event is because currently Synapse won't
+        purge the latest message in a room because it would otherwise result in a lack of
+        forward extremities for this room. It's also a good thing to ensure the purge jobs
+        aren't too greedy and purge messages they shouldn't.
+
+        Args:
+            room_id: The ID of the room to test retention in.
+            increment: The number of milliseconds to advance the clock each time. Must be
+                defined so that events in the room aren't purged if they are `increment`
+                old but are purged if they are `increment * 2` old.
+        """
         # Get the create event to, later, check that we can still access it.
         message_handler = self.hs.get_message_handler()
         create_event = self.get_success(
@@ -156,7 +189,7 @@ class RetentionTestCase(unittest.HomeserverTestCase):
         expired_event_id = resp.get("event_id")
 
         # Check that we can retrieve the event.
-        expired_event = self.get_event(room_id, expired_event_id)
+        expired_event = self.get_event(expired_event_id)
         self.assertEqual(
             expired_event.get("content", {}).get("body"), "1", expired_event
         )
@@ -174,26 +207,31 @@ class RetentionTestCase(unittest.HomeserverTestCase):
         # one should still be kept.
         self.reactor.advance(increment / 1000)
 
-        # Check that the event has been purged from the database.
-        self.get_event(room_id, expired_event_id, expected_code=404)
+        # Check that the first event has been purged from the database, i.e. that we
+        # can't retrieve it anymore, because it has expired.
+        self.get_event(expired_event_id, expect_none=True)
 
-        # Check that the event that hasn't been purged can still be retrieved.
-        valid_event = self.get_event(room_id, valid_event_id)
+        # Check that the event that hasn't expired can still be retrieved.
+        valid_event = self.get_event(valid_event_id)
         self.assertEqual(valid_event.get("content", {}).get("body"), "2", valid_event)
 
         # Check that we can still access state events that were sent before the event that
         # has been purged.
         self.get_event(room_id, create_event.event_id)
 
-    def get_event(self, room_id, event_id, expected_code=200):
-        url = "/_matrix/client/r0/rooms/%s/event/%s" % (room_id, event_id)
+    def get_event(self, event_id, expect_none=False):
+        event = self.get_success(self.store.get_event(event_id, allow_none=True))
 
-        request, channel = self.make_request("GET", url, access_token=self.token)
-        self.render(request)
+        if expect_none:
+            self.assertIsNone(event)
+            return {}
 
-        self.assertEqual(channel.code, expected_code, channel.result)
+        self.assertIsNotNone(event)
 
-        return channel.json_body
+        time_now = self.clock.time_msec()
+        serialized = self.get_success(self.serializer.serialize_event(event, time_now))
+
+        return serialized
 
 
 class RetentionNoDefaultPolicyTestCase(unittest.HomeserverTestCase):
 |