summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--synapse/config/server.py39
-rw-r--r--synapse/handlers/pagination.py17
-rw-r--r--synapse/storage/data_stores/main/room.py49
-rw-r--r--tests/rest/client/test_retention.py73
4 files changed, 77 insertions, 101 deletions
diff --git a/synapse/config/server.py b/synapse/config/server.py
index aa93a416f1..8a55ffac4f 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -19,7 +19,7 @@ import logging
 import os.path
 import re
 from textwrap import indent
-from typing import List
+from typing import List, Dict, Optional
 
 import attr
 import yaml
@@ -287,13 +287,17 @@ class ServerConfig(Config):
             self.retention_default_min_lifetime = None
             self.retention_default_max_lifetime = None
 
-        self.retention_allowed_lifetime_min = retention_config.get("allowed_lifetime_min")
+        self.retention_allowed_lifetime_min = retention_config.get(
+            "allowed_lifetime_min"
+        )
         if self.retention_allowed_lifetime_min is not None:
             self.retention_allowed_lifetime_min = self.parse_duration(
                 self.retention_allowed_lifetime_min
             )
 
-        self.retention_allowed_lifetime_max = retention_config.get("allowed_lifetime_max")
+        self.retention_allowed_lifetime_max = retention_config.get(
+            "allowed_lifetime_max"
+        )
         if self.retention_allowed_lifetime_max is not None:
             self.retention_allowed_lifetime_max = self.parse_duration(
                 self.retention_allowed_lifetime_max
@@ -302,14 +306,15 @@ class ServerConfig(Config):
         if (
             self.retention_allowed_lifetime_min is not None
             and self.retention_allowed_lifetime_max is not None
-            and self.retention_allowed_lifetime_min > self.retention_allowed_lifetime_max
+            and self.retention_allowed_lifetime_min
+            > self.retention_allowed_lifetime_max
         ):
             raise ConfigError(
                 "Invalid retention policy limits: 'allowed_lifetime_min' can not be"
                 " greater than 'allowed_lifetime_max'"
             )
 
-        self.retention_purge_jobs = []
+        self.retention_purge_jobs = []  # type: List[Dict[str, Optional[int]]]
         for purge_job_config in retention_config.get("purge_jobs", []):
             interval_config = purge_job_config.get("interval")
 
@@ -342,18 +347,22 @@ class ServerConfig(Config):
                     " 'longest_max_lifetime' value."
                 )
 
-            self.retention_purge_jobs.append({
-                "interval": interval,
-                "shortest_max_lifetime": shortest_max_lifetime,
-                "longest_max_lifetime": longest_max_lifetime,
-            })
+            self.retention_purge_jobs.append(
+                {
+                    "interval": interval,
+                    "shortest_max_lifetime": shortest_max_lifetime,
+                    "longest_max_lifetime": longest_max_lifetime,
+                }
+            )
 
         if not self.retention_purge_jobs:
-            self.retention_purge_jobs = [{
-                "interval": self.parse_duration("1d"),
-                "shortest_max_lifetime": None,
-                "longest_max_lifetime": None,
-            }]
+            self.retention_purge_jobs = [
+                {
+                    "interval": self.parse_duration("1d"),
+                    "shortest_max_lifetime": None,
+                    "longest_max_lifetime": None,
+                }
+            ]
 
         self.listeners = []  # type: List[dict]
         for listener in config.get("listeners", []):
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index e1800177fa..d122c11a4d 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -154,20 +154,17 @@ class PaginationHandler(object):
             # Figure out what token we should start purging at.
             ts = self.clock.time_msec() - max_lifetime
 
-            stream_ordering = (
-                yield self.store.find_first_stream_ordering_after_ts(ts)
-            )
+            stream_ordering = yield self.store.find_first_stream_ordering_after_ts(ts)
 
-            r = (
-                yield self.store.get_room_event_after_stream_ordering(
-                    room_id, stream_ordering,
-                )
+            r = yield self.store.get_room_event_after_stream_ordering(
+                room_id, stream_ordering,
             )
             if not r:
                 logger.warning(
                     "[purge] purging events not possible: No event found "
                     "(ts %i => stream_ordering %i)",
-                    ts, stream_ordering,
+                    ts,
+                    stream_ordering,
                 )
                 continue
 
@@ -186,9 +183,7 @@ class PaginationHandler(object):
             # the background so that it's not blocking any other operation apart from
             # other purges in the same room.
             run_as_background_process(
-                "_purge_history",
-                self._purge_history,
-                purge_id, room_id, token, True,
+                "_purge_history", self._purge_history, purge_id, room_id, token, True,
             )
 
     def start_purge_history(self, room_id, token, delete_local_events=False):
diff --git a/synapse/storage/data_stores/main/room.py b/synapse/storage/data_stores/main/room.py
index 54a7d24c73..7fceae59ca 100644
--- a/synapse/storage/data_stores/main/room.py
+++ b/synapse/storage/data_stores/main/room.py
@@ -334,8 +334,9 @@ class RoomStore(RoomWorkerStore, SearchStore):
                 WHERE state.room_id > ? AND state.type = '%s'
                 ORDER BY state.room_id ASC
                 LIMIT ?;
-                """ % EventTypes.Retention,
-                (last_room, batch_size)
+                """
+                % EventTypes.Retention,
+                (last_room, batch_size),
             )
 
             rows = self.cursor_to_dict(txn)
@@ -358,15 +359,13 @@ class RoomStore(RoomWorkerStore, SearchStore):
                         "event_id": row["event_id"],
                         "min_lifetime": retention_policy.get("min_lifetime"),
                         "max_lifetime": retention_policy.get("max_lifetime"),
-                    }
+                    },
                 )
 
             logger.info("Inserted %d rows into room_retention", len(rows))
 
             self._background_update_progress_txn(
-                txn, "insert_room_retention", {
-                    "room_id": rows[-1]["room_id"],
-                }
+                txn, "insert_room_retention", {"room_id": rows[-1]["room_id"]}
             )
 
             if batch_size > len(rows):
@@ -375,8 +374,7 @@ class RoomStore(RoomWorkerStore, SearchStore):
                 return False
 
         end = yield self.runInteraction(
-            "insert_room_retention",
-            _background_insert_retention_txn,
+            "insert_room_retention", _background_insert_retention_txn,
         )
 
         if end:
@@ -585,17 +583,15 @@ class RoomStore(RoomWorkerStore, SearchStore):
             )
 
     def _store_retention_policy_for_room_txn(self, txn, event):
-        if (
-            hasattr(event, "content")
-            and ("min_lifetime" in event.content or "max_lifetime" in event.content)
+        if hasattr(event, "content") and (
+            "min_lifetime" in event.content or "max_lifetime" in event.content
         ):
             if (
-                ("min_lifetime" in event.content and not isinstance(
-                    event.content.get("min_lifetime"), integer_types
-                ))
-                or ("max_lifetime" in event.content and not isinstance(
-                    event.content.get("max_lifetime"), integer_types
-                ))
+                "min_lifetime" in event.content
+                and not isinstance(event.content.get("min_lifetime"), integer_types)
+            ) or (
+                "max_lifetime" in event.content
+                and not isinstance(event.content.get("max_lifetime"), integer_types)
             ):
                 # Ignore the event if one of the value isn't an integer.
                 return
@@ -798,7 +794,9 @@ class RoomStore(RoomWorkerStore, SearchStore):
         return local_media_mxcs, remote_media_mxcs
 
     @defer.inlineCallbacks
-    def get_rooms_for_retention_period_in_range(self, min_ms, max_ms, include_null=False):
+    def get_rooms_for_retention_period_in_range(
+        self, min_ms, max_ms, include_null=False
+    ):
         """Retrieves all of the rooms within the given retention range.
 
         Optionally includes the rooms which don't have a retention policy.
@@ -904,23 +902,24 @@ class RoomStore(RoomWorkerStore, SearchStore):
                 INNER JOIN current_state_events USING (event_id, room_id)
                 WHERE room_id = ?;
                 """,
-                (room_id,)
+                (room_id,),
             )
 
             return self.cursor_to_dict(txn)
 
         ret = yield self.runInteraction(
-            "get_retention_policy_for_room",
-            get_retention_policy_for_room_txn,
+            "get_retention_policy_for_room", get_retention_policy_for_room_txn,
         )
 
         # If we don't know this room ID, ret will be None, in this case return the default
         # policy.
         if not ret:
-            defer.returnValue({
-                "min_lifetime": self.config.retention_default_min_lifetime,
-                "max_lifetime": self.config.retention_default_max_lifetime,
-            })
+            defer.returnValue(
+                {
+                    "min_lifetime": self.config.retention_default_min_lifetime,
+                    "max_lifetime": self.config.retention_default_max_lifetime,
+                }
+            )
 
         row = ret[0]
 
diff --git a/tests/rest/client/test_retention.py b/tests/rest/client/test_retention.py
index 7b6f25a838..6bf485c239 100644
--- a/tests/rest/client/test_retention.py
+++ b/tests/rest/client/test_retention.py
@@ -61,9 +61,7 @@ class RetentionTestCase(unittest.HomeserverTestCase):
         self.helper.send_state(
             room_id=room_id,
             event_type=EventTypes.Retention,
-            body={
-                "max_lifetime": one_day_ms * 4,
-            },
+            body={"max_lifetime": one_day_ms * 4},
             tok=self.token,
             expect_code=400,
         )
@@ -71,9 +69,7 @@ class RetentionTestCase(unittest.HomeserverTestCase):
         self.helper.send_state(
             room_id=room_id,
             event_type=EventTypes.Retention,
-            body={
-                "max_lifetime": one_hour_ms,
-            },
+            body={"max_lifetime": one_hour_ms},
             tok=self.token,
             expect_code=400,
         )
@@ -89,9 +85,7 @@ class RetentionTestCase(unittest.HomeserverTestCase):
         self.helper.send_state(
             room_id=room_id,
             event_type=EventTypes.Retention,
-            body={
-                "max_lifetime": lifetime,
-            },
+            body={"max_lifetime": lifetime},
             tok=self.token,
         )
 
@@ -115,20 +109,12 @@ class RetentionTestCase(unittest.HomeserverTestCase):
         events = []
 
         # Send a first event, which should be filtered out at the end of the test.
-        resp = self.helper.send(
-            room_id=room_id,
-            body="1",
-            tok=self.token,
-        )
+        resp = self.helper.send(room_id=room_id, body="1", tok=self.token)
 
         # Get the event from the store so that we end up with a FrozenEvent that we can
         # give to filter_events_for_client. We need to do this now because the event won't
         # be in the database anymore after it has expired.
-        events.append(self.get_success(
-            store.get_event(
-                resp.get("event_id")
-            )
-        ))
+        events.append(self.get_success(store.get_event(resp.get("event_id"))))
 
         # Advance the time by 2 days. We're using the default retention policy, therefore
         # after this the first event will still be valid.
@@ -143,20 +129,16 @@ class RetentionTestCase(unittest.HomeserverTestCase):
 
         valid_event_id = resp.get("event_id")
 
-        events.append(self.get_success(
-            store.get_event(
-                valid_event_id
-            )
-        ))
+        events.append(self.get_success(store.get_event(valid_event_id)))
 
         # Advance the time by anothe 2 days. After this, the first event should be
         # outdated but not the second one.
         self.reactor.advance(one_day_ms * 2 / 1000)
 
         # Run filter_events_for_client with our list of FrozenEvents.
-        filtered_events = self.get_success(filter_events_for_client(
-            storage, self.user_id, events
-        ))
+        filtered_events = self.get_success(
+            filter_events_for_client(storage, self.user_id, events)
+        )
 
         # We should only get one event back.
         self.assertEqual(len(filtered_events), 1, filtered_events)
@@ -172,28 +154,22 @@ class RetentionTestCase(unittest.HomeserverTestCase):
 
         # Send a first event to the room. This is the event we'll want to be purged at the
         # end of the test.
-        resp = self.helper.send(
-            room_id=room_id,
-            body="1",
-            tok=self.token,
-        )
+        resp = self.helper.send(room_id=room_id, body="1", tok=self.token)
 
         expired_event_id = resp.get("event_id")
 
         # Check that we can retrieve the event.
         expired_event = self.get_event(room_id, expired_event_id)
-        self.assertEqual(expired_event.get("content", {}).get("body"), "1", expired_event)
+        self.assertEqual(
+            expired_event.get("content", {}).get("body"), "1", expired_event
+        )
 
         # Advance the time.
         self.reactor.advance(increment / 1000)
 
         # Send another event. We need this because the purge job won't purge the most
         # recent event in the room.
-        resp = self.helper.send(
-            room_id=room_id,
-            body="2",
-            tok=self.token,
-        )
+        resp = self.helper.send(room_id=room_id, body="2", tok=self.token)
 
         valid_event_id = resp.get("event_id")
 
@@ -240,8 +216,7 @@ class RetentionNoDefaultPolicyTestCase(unittest.HomeserverTestCase):
         mock_federation_client = Mock(spec=["backfill"])
 
         self.hs = self.setup_test_homeserver(
-            config=config,
-            federation_client=mock_federation_client,
+            config=config, federation_client=mock_federation_client,
         )
         return self.hs
 
@@ -268,9 +243,7 @@ class RetentionNoDefaultPolicyTestCase(unittest.HomeserverTestCase):
         self.helper.send_state(
             room_id=room_id,
             event_type=EventTypes.Retention,
-            body={
-                "max_lifetime": one_day_ms * 35,
-            },
+            body={"max_lifetime": one_day_ms * 35},
             tok=self.token,
         )
 
@@ -289,18 +262,16 @@ class RetentionNoDefaultPolicyTestCase(unittest.HomeserverTestCase):
 
         # Check that we can retrieve the event.
         expired_event = self.get_event(room_id, first_event_id)
-        self.assertEqual(expired_event.get("content", {}).get("body"), "1", expired_event)
+        self.assertEqual(
+            expired_event.get("content", {}).get("body"), "1", expired_event
+        )
 
         # Advance the time by a month.
         self.reactor.advance(one_day_ms * 30 / 1000)
 
         # Send another event. We need this because the purge job won't purge the most
         # recent event in the room.
-        resp = self.helper.send(
-            room_id=room_id,
-            body="2",
-            tok=self.token,
-        )
+        resp = self.helper.send(room_id=room_id, body="2", tok=self.token)
 
         second_event_id = resp.get("event_id")
 
@@ -313,7 +284,9 @@ class RetentionNoDefaultPolicyTestCase(unittest.HomeserverTestCase):
         )
 
         if expected_code_for_first_event == 200:
-            self.assertEqual(first_event.get("content", {}).get("body"), "1", first_event)
+            self.assertEqual(
+                first_event.get("content", {}).get("body"), "1", first_event
+            )
 
         # Check that the event that hasn't been purged can still be retrieved.
         second_event = self.get_event(room_id, second_event_id)