summary refs log tree commit diff
path: root/synapse/storage/event_federation.py
diff options
context:
space:
mode:
authorMichael Telatynski <7t3chguy@gmail.com>2018-07-24 17:17:46 +0100
committerMichael Telatynski <7t3chguy@gmail.com>2018-07-24 17:17:46 +0100
commit87951d3891efb5bccedf72c12b3da0d6ab482253 (patch)
treede7d997567c66c5a4d8743c1f3b9d6b474f5cfd9 /synapse/storage/event_federation.py
parentif inviter_display_name == ""||None then default to inviter MXID (diff)
parentMerge pull request #3595 from matrix-org/erikj/use_deltas (diff)
downloadsynapse-87951d3891efb5bccedf72c12b3da0d6ab482253.tar.xz
Merge branch 'develop' of github.com:matrix-org/synapse into t3chguy/default_inviter_display_name_3pid
Diffstat (limited to 'synapse/storage/event_federation.py')
-rw-r--r--synapse/storage/event_federation.py333
1 files changed, 182 insertions, 151 deletions
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index e8133de2fa..8d366d1b91 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -12,45 +12,27 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import logging
+import random
 
-from twisted.internet import defer
+from six.moves import range
+from six.moves.queue import Empty, PriorityQueue
 
-from ._base import SQLBaseStore
-from synapse.api.errors import StoreError
-from synapse.util.caches.descriptors import cached
 from unpaddedbase64 import encode_base64
 
-import logging
-from Queue import PriorityQueue, Empty
+from twisted.internet import defer
 
+from synapse.api.errors import StoreError
+from synapse.storage._base import SQLBaseStore
+from synapse.storage.events import EventsWorkerStore
+from synapse.storage.signatures import SignatureWorkerStore
+from synapse.util.caches.descriptors import cached
 
 logger = logging.getLogger(__name__)
 
 
-class EventFederationStore(SQLBaseStore):
-    """ Responsible for storing and serving up the various graphs associated
-    with an event. Including the main event graph and the auth chains for an
-    event.
-
-    Also has methods for getting the front (latest) and back (oldest) edges
-    of the event graphs. These are used to generate the parents for new events
-    and backfilling from another server respectively.
-    """
-
-    EVENT_AUTH_STATE_ONLY = "event_auth_state_only"
-
-    def __init__(self, hs):
-        super(EventFederationStore, self).__init__(hs)
-
-        self.register_background_update_handler(
-            self.EVENT_AUTH_STATE_ONLY,
-            self._background_delete_non_state_event_auth,
-        )
-
-        hs.get_clock().looping_call(
-            self._delete_old_forward_extrem_cache, 60 * 60 * 1000
-        )
-
+class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
+                                 SQLBaseStore):
     def get_auth_chain(self, event_ids, include_given=False):
         """Get auth events for given event_ids. The events *must* be state events.
 
@@ -97,7 +79,7 @@ class EventFederationStore(SQLBaseStore):
             front_list = list(front)
             chunks = [
                 front_list[x:x + 100]
-                for x in xrange(0, len(front), 100)
+                for x in range(0, len(front), 100)
             ]
             for chunk in chunks:
                 txn.execute(
@@ -152,7 +134,47 @@ class EventFederationStore(SQLBaseStore):
             retcol="event_id",
         )
 
+    @defer.inlineCallbacks
+    def get_prev_events_for_room(self, room_id):
+        """
+        Gets a subset of the current forward extremities in the given room.
+
+        Limits the result to 10 extremities, so that we can avoid creating
+        events which refer to hundreds of prev_events.
+
+        Args:
+            room_id (str): room_id
+
+        Returns:
+            Deferred[list[(str, dict[str, str], int)]]
+                for each event, a tuple of (event_id, hashes, depth)
+                where *hashes* is a map from algorithm to hash.
+        """
+        res = yield self.get_latest_event_ids_and_hashes_in_room(room_id)
+        if len(res) > 10:
+            # Sort by reverse depth, so we point to the most recent.
+            res.sort(key=lambda a: -a[2])
+
+            # we use half of the limit for the actual most recent events, and
+            # the other half to randomly point to some of the older events, to
+            # make sure that we don't completely ignore the older events.
+            res = res[0:5] + random.sample(res[5:], 5)
+
+        defer.returnValue(res)
+
     def get_latest_event_ids_and_hashes_in_room(self, room_id):
+        """
+        Gets the current forward extremities in the given room
+
+        Args:
+            room_id (str): room_id
+
+        Returns:
+            Deferred[list[(str, dict[str, str], int)]]
+                for each event, a tuple of (event_id, hashes, depth)
+                where *hashes* is a map from algorithm to hash.
+        """
+
         return self.runInteraction(
             "get_latest_event_ids_and_hashes_in_room",
             self._get_latest_event_ids_and_hashes_in_room,
@@ -201,22 +223,6 @@ class EventFederationStore(SQLBaseStore):
             room_id,
         )
 
-    @defer.inlineCallbacks
-    def get_max_depth_of_events(self, event_ids):
-        sql = (
-            "SELECT MAX(depth) FROM events WHERE event_id IN (%s)"
-        ) % (",".join(["?"] * len(event_ids)),)
-
-        rows = yield self._execute(
-            "get_max_depth_of_events", None,
-            sql, *event_ids
-        )
-
-        if rows:
-            defer.returnValue(rows[0][0])
-        else:
-            defer.returnValue(1)
-
     def _get_min_depth_interaction(self, txn, room_id):
         min_depth = self._simple_select_one_onecol_txn(
             txn,
@@ -228,88 +234,6 @@ class EventFederationStore(SQLBaseStore):
 
         return int(min_depth) if min_depth is not None else None
 
-    def _update_min_depth_for_room_txn(self, txn, room_id, depth):
-        min_depth = self._get_min_depth_interaction(txn, room_id)
-
-        if min_depth and depth >= min_depth:
-            return
-
-        self._simple_upsert_txn(
-            txn,
-            table="room_depth",
-            keyvalues={
-                "room_id": room_id,
-            },
-            values={
-                "min_depth": depth,
-            },
-        )
-
-    def _handle_mult_prev_events(self, txn, events):
-        """
-        For the given event, update the event edges table and forward and
-        backward extremities tables.
-        """
-        self._simple_insert_many_txn(
-            txn,
-            table="event_edges",
-            values=[
-                {
-                    "event_id": ev.event_id,
-                    "prev_event_id": e_id,
-                    "room_id": ev.room_id,
-                    "is_state": False,
-                }
-                for ev in events
-                for e_id, _ in ev.prev_events
-            ],
-        )
-
-        self._update_backward_extremeties(txn, events)
-
-    def _update_backward_extremeties(self, txn, events):
-        """Updates the event_backward_extremities tables based on the new/updated
-        events being persisted.
-
-        This is called for new events *and* for events that were outliers, but
-        are now being persisted as non-outliers.
-
-        Forward extremities are handled when we first start persisting the events.
-        """
-        events_by_room = {}
-        for ev in events:
-            events_by_room.setdefault(ev.room_id, []).append(ev)
-
-        query = (
-            "INSERT INTO event_backward_extremities (event_id, room_id)"
-            " SELECT ?, ? WHERE NOT EXISTS ("
-            " SELECT 1 FROM event_backward_extremities"
-            " WHERE event_id = ? AND room_id = ?"
-            " )"
-            " AND NOT EXISTS ("
-            " SELECT 1 FROM events WHERE event_id = ? AND room_id = ? "
-            " AND outlier = ?"
-            " )"
-        )
-
-        txn.executemany(query, [
-            (e_id, ev.room_id, e_id, ev.room_id, e_id, ev.room_id, False)
-            for ev in events for e_id, _ in ev.prev_events
-            if not ev.internal_metadata.is_outlier()
-        ])
-
-        query = (
-            "DELETE FROM event_backward_extremities"
-            " WHERE event_id = ? AND room_id = ?"
-        )
-        txn.executemany(
-            query,
-            [
-                (ev.event_id, ev.room_id) for ev in events
-                if not ev.internal_metadata.is_outlier()
-            ]
-        )
-
     def get_forward_extremeties_for_room(self, room_id, stream_ordering):
         """For a given room_id and stream_ordering, return the forward
         extremeties of the room at that point in "time".
@@ -371,28 +295,6 @@ class EventFederationStore(SQLBaseStore):
             get_forward_extremeties_for_room_txn
         )
 
-    def _delete_old_forward_extrem_cache(self):
-        def _delete_old_forward_extrem_cache_txn(txn):
-            # Delete entries older than a month, while making sure we don't delete
-            # the only entries for a room.
-            sql = ("""
-                DELETE FROM stream_ordering_to_exterm
-                WHERE
-                room_id IN (
-                    SELECT room_id
-                    FROM stream_ordering_to_exterm
-                    WHERE stream_ordering > ?
-                ) AND stream_ordering < ?
-            """)
-            txn.execute(
-                sql,
-                (self.stream_ordering_month_ago, self.stream_ordering_month_ago,)
-            )
-        return self.runInteraction(
-            "_delete_old_forward_extrem_cache",
-            _delete_old_forward_extrem_cache_txn
-        )
-
     def get_backfill_events(self, room_id, event_list, limit):
         """Get a list of Events for a given topic that occurred before (and
         including) the events in event_list. Return a list of max size `limit`
@@ -522,6 +424,135 @@ class EventFederationStore(SQLBaseStore):
 
         return event_results
 
+
+class EventFederationStore(EventFederationWorkerStore):
+    """ Responsible for storing and serving up the various graphs associated
+    with an event. Including the main event graph and the auth chains for an
+    event.
+
+    Also has methods for getting the front (latest) and back (oldest) edges
+    of the event graphs. These are used to generate the parents for new events
+    and backfilling from another server respectively.
+    """
+
+    EVENT_AUTH_STATE_ONLY = "event_auth_state_only"
+
+    def __init__(self, db_conn, hs):
+        super(EventFederationStore, self).__init__(db_conn, hs)
+
+        self.register_background_update_handler(
+            self.EVENT_AUTH_STATE_ONLY,
+            self._background_delete_non_state_event_auth,
+        )
+
+        hs.get_clock().looping_call(
+            self._delete_old_forward_extrem_cache, 60 * 60 * 1000
+        )
+
+    def _update_min_depth_for_room_txn(self, txn, room_id, depth):
+        min_depth = self._get_min_depth_interaction(txn, room_id)
+
+        if min_depth and depth >= min_depth:
+            return
+
+        self._simple_upsert_txn(
+            txn,
+            table="room_depth",
+            keyvalues={
+                "room_id": room_id,
+            },
+            values={
+                "min_depth": depth,
+            },
+        )
+
+    def _handle_mult_prev_events(self, txn, events):
+        """
+        For the given event, update the event edges table and forward and
+        backward extremities tables.
+        """
+        self._simple_insert_many_txn(
+            txn,
+            table="event_edges",
+            values=[
+                {
+                    "event_id": ev.event_id,
+                    "prev_event_id": e_id,
+                    "room_id": ev.room_id,
+                    "is_state": False,
+                }
+                for ev in events
+                for e_id, _ in ev.prev_events
+            ],
+        )
+
+        self._update_backward_extremeties(txn, events)
+
+    def _update_backward_extremeties(self, txn, events):
+        """Updates the event_backward_extremities tables based on the new/updated
+        events being persisted.
+
+        This is called for new events *and* for events that were outliers, but
+        are now being persisted as non-outliers.
+
+        Forward extremities are handled when we first start persisting the events.
+        """
+        events_by_room = {}
+        for ev in events:
+            events_by_room.setdefault(ev.room_id, []).append(ev)
+
+        query = (
+            "INSERT INTO event_backward_extremities (event_id, room_id)"
+            " SELECT ?, ? WHERE NOT EXISTS ("
+            " SELECT 1 FROM event_backward_extremities"
+            " WHERE event_id = ? AND room_id = ?"
+            " )"
+            " AND NOT EXISTS ("
+            " SELECT 1 FROM events WHERE event_id = ? AND room_id = ? "
+            " AND outlier = ?"
+            " )"
+        )
+
+        txn.executemany(query, [
+            (e_id, ev.room_id, e_id, ev.room_id, e_id, ev.room_id, False)
+            for ev in events for e_id, _ in ev.prev_events
+            if not ev.internal_metadata.is_outlier()
+        ])
+
+        query = (
+            "DELETE FROM event_backward_extremities"
+            " WHERE event_id = ? AND room_id = ?"
+        )
+        txn.executemany(
+            query,
+            [
+                (ev.event_id, ev.room_id) for ev in events
+                if not ev.internal_metadata.is_outlier()
+            ]
+        )
+
+    def _delete_old_forward_extrem_cache(self):
+        def _delete_old_forward_extrem_cache_txn(txn):
+            # Delete entries older than a month, while making sure we don't delete
+            # the only entries for a room.
+            sql = ("""
+                DELETE FROM stream_ordering_to_exterm
+                WHERE
+                room_id IN (
+                    SELECT room_id
+                    FROM stream_ordering_to_exterm
+                    WHERE stream_ordering > ?
+                ) AND stream_ordering < ?
+            """)
+            txn.execute(
+                sql,
+                (self.stream_ordering_month_ago, self.stream_ordering_month_ago,)
+            )
+        return self.runInteraction(
+            "_delete_old_forward_extrem_cache",
+            _delete_old_forward_extrem_cache_txn
+        )
+
     def clean_room_for_join(self, room_id):
         return self.runInteraction(
             "clean_room_for_join",