summary refs log tree commit diff
path: root/synapse/storage/event_federation.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2019-04-17 19:44:40 +0100
committerErik Johnston <erik@matrix.org>2019-04-17 19:44:40 +0100
commitca90336a6935b36b5761244005b0f68b496d5d79 (patch)
tree6bbce5eafc0db3b24ccc3b59b051da850382ae09 /synapse/storage/event_federation.py
parentAdd management endpoints for account validity (diff)
parentMerge pull request #5047 from matrix-org/babolivier/account_expiration (diff)
downloadsynapse-ca90336a6935b36b5761244005b0f68b496d5d79.tar.xz
Merge branch 'develop' of github.com:matrix-org/synapse into babolivier/account_expiration
Diffstat (limited to 'synapse/storage/event_federation.py')
-rw-r--r--synapse/storage/event_federation.py149
1 files changed, 60 insertions, 89 deletions
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index a8d90456e3..956f876572 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -32,8 +32,7 @@ from synapse.util.caches.descriptors import cached
 logger = logging.getLogger(__name__)
 
 
-class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
-                                 SQLBaseStore):
+class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBaseStore):
     def get_auth_chain(self, event_ids, include_given=False):
         """Get auth events for given event_ids. The events *must* be state events.
 
@@ -45,7 +44,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
             list of events
         """
         return self.get_auth_chain_ids(
-            event_ids, include_given=include_given,
+            event_ids, include_given=include_given
         ).addCallback(self._get_events)
 
     def get_auth_chain_ids(self, event_ids, include_given=False):
@@ -59,9 +58,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
             list of event_ids
         """
         return self.runInteraction(
-            "get_auth_chain_ids",
-            self._get_auth_chain_ids_txn,
-            event_ids, include_given
+            "get_auth_chain_ids", self._get_auth_chain_ids_txn, event_ids, include_given
         )
 
     def _get_auth_chain_ids_txn(self, txn, event_ids, include_given):
@@ -70,23 +67,15 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
         else:
             results = set()
 
-        base_sql = (
-            "SELECT auth_id FROM event_auth WHERE event_id IN (%s)"
-        )
+        base_sql = "SELECT auth_id FROM event_auth WHERE event_id IN (%s)"
 
         front = set(event_ids)
         while front:
             new_front = set()
             front_list = list(front)
-            chunks = [
-                front_list[x:x + 100]
-                for x in range(0, len(front), 100)
-            ]
+            chunks = [front_list[x : x + 100] for x in range(0, len(front), 100)]
             for chunk in chunks:
-                txn.execute(
-                    base_sql % (",".join(["?"] * len(chunk)),),
-                    chunk
-                )
+                txn.execute(base_sql % (",".join(["?"] * len(chunk)),), chunk)
                 new_front.update([r[0] for r in txn])
 
             new_front -= results
@@ -98,9 +87,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
 
     def get_oldest_events_in_room(self, room_id):
         return self.runInteraction(
-            "get_oldest_events_in_room",
-            self._get_oldest_events_in_room_txn,
-            room_id,
+            "get_oldest_events_in_room", self._get_oldest_events_in_room_txn, room_id
         )
 
     def get_oldest_events_with_depth_in_room(self, room_id):
@@ -121,7 +108,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
             " GROUP BY b.event_id"
         )
 
-        txn.execute(sql, (room_id, False,))
+        txn.execute(sql, (room_id, False))
 
         return dict(txn)
 
@@ -152,9 +139,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
         return self._simple_select_onecol_txn(
             txn,
             table="event_backward_extremities",
-            keyvalues={
-                "room_id": room_id,
-            },
+            keyvalues={"room_id": room_id},
             retcol="event_id",
         )
 
@@ -209,9 +194,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
     def get_latest_event_ids_in_room(self, room_id):
         return self._simple_select_onecol(
             table="event_forward_extremities",
-            keyvalues={
-                "room_id": room_id,
-            },
+            keyvalues={"room_id": room_id},
             retcol="event_id",
             desc="get_latest_event_ids_in_room",
         )
@@ -225,14 +208,13 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
             "WHERE f.room_id = ?"
         )
 
-        txn.execute(sql, (room_id, ))
+        txn.execute(sql, (room_id,))
 
         results = []
         for event_id, depth in txn.fetchall():
             hashes = self._get_event_reference_hashes_txn(txn, event_id)
             prev_hashes = {
-                k: encode_base64(v) for k, v in hashes.items()
-                if k == "sha256"
+                k: encode_base64(v) for k, v in hashes.items() if k == "sha256"
             }
             results.append((event_id, prev_hashes, depth))
 
@@ -242,9 +224,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
         """ For hte given room, get the minimum depth we have seen for it.
         """
         return self.runInteraction(
-            "get_min_depth",
-            self._get_min_depth_interaction,
-            room_id,
+            "get_min_depth", self._get_min_depth_interaction, room_id
         )
 
     def _get_min_depth_interaction(self, txn, room_id):
@@ -300,7 +280,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
         if stream_ordering <= self.stream_ordering_month_ago:
             raise StoreError(400, "stream_ordering too old")
 
-        sql = ("""
+        sql = """
                 SELECT event_id FROM stream_ordering_to_exterm
                 INNER JOIN (
                     SELECT room_id, MAX(stream_ordering) AS stream_ordering
@@ -308,15 +288,14 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
                     WHERE stream_ordering <= ? GROUP BY room_id
                 ) AS rms USING (room_id, stream_ordering)
                 WHERE room_id = ?
-        """)
+        """
 
         def get_forward_extremeties_for_room_txn(txn):
             txn.execute(sql, (stream_ordering, room_id))
             return [event_id for event_id, in txn]
 
         return self.runInteraction(
-            "get_forward_extremeties_for_room",
-            get_forward_extremeties_for_room_txn
+            "get_forward_extremeties_for_room", get_forward_extremeties_for_room_txn
         )
 
     def get_backfill_events(self, room_id, event_list, limit):
@@ -329,19 +308,21 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
             event_list (list)
             limit (int)
         """
-        return self.runInteraction(
-            "get_backfill_events",
-            self._get_backfill_events, room_id, event_list, limit
-        ).addCallback(
-            self._get_events
-        ).addCallback(
-            lambda l: sorted(l, key=lambda e: -e.depth)
+        return (
+            self.runInteraction(
+                "get_backfill_events",
+                self._get_backfill_events,
+                room_id,
+                event_list,
+                limit,
+            )
+            .addCallback(self._get_events)
+            .addCallback(lambda l: sorted(l, key=lambda e: -e.depth))
         )
 
     def _get_backfill_events(self, txn, room_id, event_list, limit):
         logger.debug(
-            "_get_backfill_events: %s, %s, %s",
-            room_id, repr(event_list), limit
+            "_get_backfill_events: %s, %s, %s", room_id, repr(event_list), limit
         )
 
         event_results = set()
@@ -364,10 +345,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
             depth = self._simple_select_one_onecol_txn(
                 txn,
                 table="events",
-                keyvalues={
-                    "event_id": event_id,
-                    "room_id": room_id,
-                },
+                keyvalues={"event_id": event_id, "room_id": room_id},
                 retcol="depth",
                 allow_none=True,
             )
@@ -386,10 +364,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
 
             event_results.add(event_id)
 
-            txn.execute(
-                query,
-                (event_id, False, limit - len(event_results))
-            )
+            txn.execute(query, (event_id, False, limit - len(event_results)))
 
             for row in txn:
                 if row[1] not in event_results:
@@ -398,18 +373,19 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
         return event_results
 
     @defer.inlineCallbacks
-    def get_missing_events(self, room_id, earliest_events, latest_events,
-                           limit):
+    def get_missing_events(self, room_id, earliest_events, latest_events, limit):
         ids = yield self.runInteraction(
             "get_missing_events",
             self._get_missing_events,
-            room_id, earliest_events, latest_events, limit,
+            room_id,
+            earliest_events,
+            latest_events,
+            limit,
         )
         events = yield self._get_events(ids)
         defer.returnValue(events)
 
-    def _get_missing_events(self, txn, room_id, earliest_events, latest_events,
-                            limit):
+    def _get_missing_events(self, txn, room_id, earliest_events, latest_events, limit):
 
         seen_events = set(earliest_events)
         front = set(latest_events) - seen_events
@@ -425,8 +401,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
             new_front = set()
             for event_id in front:
                 txn.execute(
-                    query,
-                    (room_id, event_id, False, limit - len(event_results))
+                    query, (room_id, event_id, False, limit - len(event_results))
                 )
 
                 new_results = set(t[0] for t in txn) - seen_events
@@ -457,12 +432,10 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
             column="prev_event_id",
             iterable=event_ids,
             retcols=("event_id",),
-            desc="get_successor_events"
+            desc="get_successor_events",
         )
 
-        defer.returnValue([
-            row["event_id"] for row in rows
-        ])
+        defer.returnValue([row["event_id"] for row in rows])
 
 
 class EventFederationStore(EventFederationWorkerStore):
@@ -481,12 +454,11 @@ class EventFederationStore(EventFederationWorkerStore):
         super(EventFederationStore, self).__init__(db_conn, hs)
 
         self.register_background_update_handler(
-            self.EVENT_AUTH_STATE_ONLY,
-            self._background_delete_non_state_event_auth,
+            self.EVENT_AUTH_STATE_ONLY, self._background_delete_non_state_event_auth
         )
 
         hs.get_clock().looping_call(
-            self._delete_old_forward_extrem_cache, 60 * 60 * 1000,
+            self._delete_old_forward_extrem_cache, 60 * 60 * 1000
         )
 
     def _update_min_depth_for_room_txn(self, txn, room_id, depth):
@@ -498,12 +470,8 @@ class EventFederationStore(EventFederationWorkerStore):
         self._simple_upsert_txn(
             txn,
             table="room_depth",
-            keyvalues={
-                "room_id": room_id,
-            },
-            values={
-                "min_depth": depth,
-            },
+            keyvalues={"room_id": room_id},
+            values={"min_depth": depth},
         )
 
     def _handle_mult_prev_events(self, txn, events):
@@ -553,11 +521,15 @@ class EventFederationStore(EventFederationWorkerStore):
             " )"
         )
 
-        txn.executemany(query, [
-            (e_id, ev.room_id, e_id, ev.room_id, e_id, ev.room_id, False)
-            for ev in events for e_id in ev.prev_event_ids()
-            if not ev.internal_metadata.is_outlier()
-        ])
+        txn.executemany(
+            query,
+            [
+                (e_id, ev.room_id, e_id, ev.room_id, e_id, ev.room_id, False)
+                for ev in events
+                for e_id in ev.prev_event_ids()
+                if not ev.internal_metadata.is_outlier()
+            ],
+        )
 
         query = (
             "DELETE FROM event_backward_extremities"
@@ -566,16 +538,17 @@ class EventFederationStore(EventFederationWorkerStore):
         txn.executemany(
             query,
             [
-                (ev.event_id, ev.room_id) for ev in events
+                (ev.event_id, ev.room_id)
+                for ev in events
                 if not ev.internal_metadata.is_outlier()
-            ]
+            ],
         )
 
     def _delete_old_forward_extrem_cache(self):
         def _delete_old_forward_extrem_cache_txn(txn):
             # Delete entries older than a month, while making sure we don't delete
             # the only entries for a room.
-            sql = ("""
+            sql = """
                 DELETE FROM stream_ordering_to_exterm
                 WHERE
                 room_id IN (
@@ -583,11 +556,11 @@ class EventFederationStore(EventFederationWorkerStore):
                     FROM stream_ordering_to_exterm
                     WHERE stream_ordering > ?
                 ) AND stream_ordering < ?
-            """)
+            """
             txn.execute(
-                sql,
-                (self.stream_ordering_month_ago, self.stream_ordering_month_ago,)
+                sql, (self.stream_ordering_month_ago, self.stream_ordering_month_ago)
             )
+
         return run_as_background_process(
             "delete_old_forward_extrem_cache",
             self.runInteraction,
@@ -597,9 +570,7 @@ class EventFederationStore(EventFederationWorkerStore):
 
     def clean_room_for_join(self, room_id):
         return self.runInteraction(
-            "clean_room_for_join",
-            self._clean_room_for_join_txn,
-            room_id,
+            "clean_room_for_join", self._clean_room_for_join_txn, room_id
         )
 
     def _clean_room_for_join_txn(self, txn, room_id):
@@ -635,7 +606,7 @@ class EventFederationStore(EventFederationWorkerStore):
                 )
             """
 
-            txn.execute(sql, (min_stream_id, max_stream_id,))
+            txn.execute(sql, (min_stream_id, max_stream_id))
 
             new_progress = {
                 "target_min_stream_id_inclusive": target_min_stream_id,