summary refs log tree commit diff
path: root/synapse/storage/events.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/events.py')
-rw-r--r--synapse/storage/events.py37
1 files changed, 8 insertions, 29 deletions
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 6685b9da1c..c88f689d3a 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -28,6 +28,7 @@ from synapse.util.metrics import Measure
 from synapse.api.constants import EventTypes
 from synapse.api.errors import SynapseError
 from synapse.state import resolve_events
+from synapse.util.caches.descriptors import cached
 
 from canonicaljson import encode_canonical_json
 from collections import deque, namedtuple, OrderedDict
@@ -301,7 +302,7 @@ class EventsStore(SQLBaseStore):
                                 room_id
                             )
                             new_latest_event_ids = yield self._calculate_new_extremeties(
-                                room_id, [ev for ev, _ in ev_ctx_rm]
+                                room_id, ev_ctx_rm, latest_event_ids
                             )
 
                             if new_latest_event_ids == set(latest_event_ids):
@@ -328,27 +329,24 @@ class EventsStore(SQLBaseStore):
                 persist_event_counter.inc_by(len(chunk))
 
     @defer.inlineCallbacks
-    def _calculate_new_extremeties(self, room_id, events):
+    def _calculate_new_extremeties(self, room_id, event_contexts, latest_event_ids):
         """Calculates the new forward extremeties for a room given events to
         persist.
 
         Assumes that we are only persisting events for one room at a time.
         """
-        latest_event_ids = yield self.get_latest_event_ids_in_room(
-            room_id
-        )
         new_latest_event_ids = set(latest_event_ids)
         # First, add all the new events to the list
         new_latest_event_ids.update(
-            event.event_id for event in events
-            if not event.internal_metadata.is_outlier()
+            event.event_id for event, ctx in event_contexts
+            if not event.internal_metadata.is_outlier() and not ctx.rejected
         )
         # Now remove all events that are referenced by the to-be-added events
         new_latest_event_ids.difference_update(
             e_id
-            for event in events
+            for event, ctx in event_contexts
             for e_id, _ in event.prev_events
-            if not event.internal_metadata.is_outlier()
+            if not event.internal_metadata.is_outlier() and not ctx.rejected
         )
 
         # And finally remove any events that are referenced by previously added
@@ -572,14 +570,6 @@ class EventsStore(SQLBaseStore):
                     txn, self.get_users_in_room, (room_id,)
                 )
 
-                # Add an entry to the current_state_resets table to record the point
-                # where we clobbered the current state
-                self._simple_insert_txn(
-                    txn,
-                    table="current_state_resets",
-                    values={"event_stream_ordering": max_stream_order}
-                )
-
         for room_id, new_extrem in new_forward_extremeties.items():
             self._simple_delete_txn(
                 txn,
@@ -1579,6 +1569,7 @@ class EventsStore(SQLBaseStore):
         """The current minimum token that backfilled events have reached"""
         return -self._backfill_id_gen.get_current_token()
 
+    @cached(num_args=5, max_entries=10)
     def get_all_new_events(self, last_backfill_id, last_forward_id,
                            current_backfill_id, current_forward_id, limit):
         """Get all the new events that have arrived at the server either as
@@ -1611,15 +1602,6 @@ class EventsStore(SQLBaseStore):
                     upper_bound = current_forward_id
 
                 sql = (
-                    "SELECT event_stream_ordering FROM current_state_resets"
-                    " WHERE ? < event_stream_ordering"
-                    " AND event_stream_ordering <= ?"
-                    " ORDER BY event_stream_ordering ASC"
-                )
-                txn.execute(sql, (last_forward_id, upper_bound))
-                state_resets = txn.fetchall()
-
-                sql = (
                     "SELECT event_stream_ordering, event_id, state_group"
                     " FROM ex_outlier_stream"
                     " WHERE ? > event_stream_ordering"
@@ -1630,7 +1612,6 @@ class EventsStore(SQLBaseStore):
                 forward_ex_outliers = txn.fetchall()
             else:
                 new_forward_events = []
-                state_resets = []
                 forward_ex_outliers = []
 
             sql = (
@@ -1670,7 +1651,6 @@ class EventsStore(SQLBaseStore):
             return AllNewEventsResult(
                 new_forward_events, new_backfill_events,
                 forward_ex_outliers, backward_ex_outliers,
-                state_resets,
             )
         return self.runInteraction("get_all_new_events", get_all_new_events_txn)
 
@@ -1896,5 +1876,4 @@ class EventsStore(SQLBaseStore):
 AllNewEventsResult = namedtuple("AllNewEventsResult", [
     "new_forward_events", "new_backfill_events",
     "forward_ex_outliers", "backward_ex_outliers",
-    "state_resets"
 ])