diff --git a/synapse/storage/data_stores/main/event_federation.py b/synapse/storage/data_stores/main/event_federation.py
index 1f517e8fad..5cb8cd96d1 100644
--- a/synapse/storage/data_stores/main/event_federation.py
+++ b/synapse/storage/data_stores/main/event_federation.py
@@ -14,13 +14,10 @@
# limitations under the License.
import itertools
import logging
-import random
from six.moves import range
from six.moves.queue import Empty, PriorityQueue
-from unpaddedbase64 import encode_base64
-
from twisted.internet import defer
from synapse.api.errors import StoreError
@@ -148,8 +145,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
retcol="event_id",
)
- @defer.inlineCallbacks
- def get_prev_events_for_room(self, room_id):
+ def get_prev_events_for_room(self, room_id: str):
"""
Gets a subset of the current forward extremities in the given room.
@@ -160,40 +156,29 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
room_id (str): room_id
Returns:
- Deferred[list[(str, dict[str, str], int)]]
- for each event, a tuple of (event_id, hashes, depth)
- where *hashes* is a map from algorithm to hash.
+ Deferred[List[str]]: the event IDs of the forward extremities
+
"""
- res = yield self.get_latest_event_ids_and_hashes_in_room(room_id)
- if len(res) > 10:
- # Sort by reverse depth, so we point to the most recent.
- res.sort(key=lambda a: -a[2])
- # we use half of the limit for the actual most recent events, and
- # the other half to randomly point to some of the older events, to
- # make sure that we don't completely ignore the older events.
- res = res[0:5] + random.sample(res[5:], 5)
+ return self.db.runInteraction(
+ "get_prev_events_for_room", self._get_prev_events_for_room_txn, room_id
+ )
- return res
+ def _get_prev_events_for_room_txn(self, txn, room_id: str):
+ # we just use the 10 newest events. Older events will become
+ # prev_events of future events.
- def get_latest_event_ids_and_hashes_in_room(self, room_id):
+ sql = """
+ SELECT e.event_id FROM event_forward_extremities AS f
+ INNER JOIN events AS e USING (event_id)
+ WHERE f.room_id = ?
+ ORDER BY e.depth DESC
+ LIMIT 10
"""
- Gets the current forward extremities in the given room
- Args:
- room_id (str): room_id
+ txn.execute(sql, (room_id,))
- Returns:
- Deferred[list[(str, dict[str, str], int)]]
- for each event, a tuple of (event_id, hashes, depth)
- where *hashes* is a map from algorithm to hash.
- """
-
- return self.db.runInteraction(
- "get_latest_event_ids_and_hashes_in_room",
- self._get_latest_event_ids_and_hashes_in_room,
- room_id,
- )
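+ # each row is a single-column (event_id,) tuple, newest first per the ORDER BY above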
+ return [row[0] for row in txn]
def get_rooms_with_many_extremities(self, min_count, limit, room_id_filter):
"""Get the top rooms with at least N extremities.
@@ -243,27 +228,6 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
desc="get_latest_event_ids_in_room",
)
- def _get_latest_event_ids_and_hashes_in_room(self, txn, room_id):
- sql = (
- "SELECT e.event_id, e.depth FROM events as e "
- "INNER JOIN event_forward_extremities as f "
- "ON e.event_id = f.event_id "
- "AND e.room_id = f.room_id "
- "WHERE f.room_id = ?"
- )
-
- txn.execute(sql, (room_id,))
-
- results = []
- for event_id, depth in txn.fetchall():
- hashes = self._get_event_reference_hashes_txn(txn, event_id)
- prev_hashes = {
- k: encode_base64(v) for k, v in hashes.items() if k == "sha256"
- }
- results.append((event_id, prev_hashes, depth))
-
- return results
-
def get_min_depth(self, room_id):
""" For hte given room, get the minimum depth we have seen for it.
"""