author:    Hubert Chathi <hubert@uhoreg.ca>  2018-08-24 11:44:26 -0400
committer: GitHub <noreply@github.com>  2018-08-24 11:44:26 -0400
commit:    83caead95a921b0977164468a52c5c0b6e9eee5a
tree:      e87ae6c5d439de1e1dfd7598cab35a14e320ef59
parent:    allow session_data to be any JSON instead of just a string
parent:    Merge pull request #3755 from matrix-org/erikj/fix_server_notice_tags
Merge branch 'develop' into e2e_backups
Diffstat (limited to 'synapse/storage/events.py'):

 synapse/storage/events.py | 101 +-
 1 file changed, 13 insertions(+), 88 deletions(-)
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index e8e5a0fe44..f39c8c8461 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -38,7 +38,7 @@ from synapse.storage.background_updates import BackgroundUpdateStore
 from synapse.storage.event_federation import EventFederationStore
 from synapse.storage.events_worker import EventsWorkerStore
 from synapse.types import RoomStreamToken, get_domain_from_id
-from synapse.util.async import ObservableDeferred
+from synapse.util.async_helpers import ObservableDeferred
 from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
 from synapse.util.frozenutils import frozendict_json_encoder
 from synapse.util.logcontext import PreserveLoggingContext, make_deferred_yieldable
@@ -485,9 +485,14 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
                     new_forward_extremeties=new_forward_extremeties,
                 )
                 persist_event_counter.inc(len(chunk))
-                synapse.metrics.event_persisted_position.set(
-                    chunk[-1][0].internal_metadata.stream_ordering,
-                )
+
+                if not backfilled:
+                    # backfilled events have negative stream orderings, so we don't
+                    # want to set the event_persisted_position to that.
+                    synapse.metrics.event_persisted_position.set(
+                        chunk[-1][0].internal_metadata.stream_ordering,
+                    )
+
                 for event, context in chunk:
                     if context.app_service:
                         origin_type = "local"
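Worth noting about this hunk: backfilled events are assigned negative stream orderings (they are handed out from a counter that counts downwards), so setting the gauge from the last event of a backfilled chunk would drag event_persisted_position backwards. A minimal standalone sketch of the guarded update; the Gauge class here is a stand-in, not Synapse's actual metrics plumbing:

# Sketch only: Gauge is a stand-in for a real metrics gauge.
class Gauge:
    def __init__(self):
        self.value = 0

    def set(self, value):
        self.value = value

event_persisted_position = Gauge()

def persist_chunk(stream_orderings, backfilled):
    if not backfilled:
        # Only forward persistence (positive, increasing stream
        # orderings) should advance the persisted-position gauge.
        event_persisted_position.set(stream_orderings[-1])

persist_chunk([101, 102, 103], backfilled=False)
persist_chunk([-7, -8, -9], backfilled=True)
assert event_persisted_position.value == 103  # unaffected by backfill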
@@ -700,9 +705,11 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
         }
 
         events_map = {ev.event_id: ev for ev, _ in events_context}
+        room_version = yield self.get_room_version(room_id)
+
         logger.debug("calling resolve_state_groups from preserve_events")
         res = yield self._state_resolution_handler.resolve_state_groups(
-            room_id, state_groups, events_map, get_events
+            room_id, room_version, state_groups, events_map, get_events
         )
 
         defer.returnValue((res.state, None))
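This hunk threads the room's version through to state resolution. The motivation, hedged since the resolver itself is outside this diff, is that different room versions can call for different state-resolution algorithms, so resolve_state_groups needs to know which version it is operating on before it can pick one. An illustrative dispatch with hypothetical resolver names:

# Hypothetical sketch of version-aware state resolution; the resolver
# names and version strings are illustrative only.
def resolve_state_v1(state_groups):
    return {"algorithm": "v1", "groups": state_groups}

def resolve_state_v2(state_groups):
    return {"algorithm": "v2", "groups": state_groups}

RESOLVERS = {"1": resolve_state_v1, "2": resolve_state_v2}

def resolve_state_groups(room_version, state_groups):
    # The room version selects the algorithm, which is why the calling
    # code now fetches it before resolving.
    return RESOLVERS.get(room_version, resolve_state_v1)(state_groups)

print(resolve_state_groups("2", [1, 2, 3]))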
@@ -1431,88 +1438,6 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
         )
 
     @defer.inlineCallbacks
-    def have_events_in_timeline(self, event_ids):
-        """Given a list of event ids, check if we have already processed and
-        stored them as non outliers.
-        """
-        rows = yield self._simple_select_many_batch(
-            table="events",
-            retcols=("event_id",),
-            column="event_id",
-            iterable=list(event_ids),
-            keyvalues={"outlier": False},
-            desc="have_events_in_timeline",
-        )
-
-        defer.returnValue(set(r["event_id"] for r in rows))
-
-    @defer.inlineCallbacks
-    def have_seen_events(self, event_ids):
-        """Given a list of event ids, check if we have already processed them.
-
-        Args:
-            event_ids (iterable[str]):
-
-        Returns:
-            Deferred[set[str]]: The events we have already seen.
-        """
-        results = set()
-
-        def have_seen_events_txn(txn, chunk):
-            sql = (
-                "SELECT event_id FROM events as e WHERE e.event_id IN (%s)"
-                % (",".join("?" * len(chunk)), )
-            )
-            txn.execute(sql, chunk)
-            for (event_id, ) in txn:
-                results.add(event_id)
-
-        # break the input up into chunks of 100
-        input_iterator = iter(event_ids)
-        for chunk in iter(lambda: list(itertools.islice(input_iterator, 100)),
-                          []):
-            yield self.runInteraction(
-                "have_seen_events",
-                have_seen_events_txn,
-                chunk,
-            )
-        defer.returnValue(results)
-
-    def get_seen_events_with_rejections(self, event_ids):
-        """Given a list of event ids, check if we rejected them.
-
-        Args:
-            event_ids (list[str])
-
-        Returns:
-            Deferred[dict[str, str|None):
-                Has an entry for each event id we already have seen. Maps to
-                the rejected reason string if we rejected the event, else maps
-                to None.
-        """
-        if not event_ids:
-            return defer.succeed({})
-
-        def f(txn):
-            sql = (
-                "SELECT e.event_id, reason FROM events as e "
-                "LEFT JOIN rejections as r ON e.event_id = r.event_id "
-                "WHERE e.event_id = ?"
-            )
-
-            res = {}
-            for event_id in event_ids:
-                txn.execute(sql, (event_id,))
-                row = txn.fetchone()
-                if row:
-                    _, rejected = row
-                    res[event_id] = rejected
-
-            return res
-
-        return self.runInteraction("get_rejection_reasons", f)
-
-    @defer.inlineCallbacks
     def count_daily_messages(self):
         """
         Returns an estimate of the number of messages sent in the last day.
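The removed have_seen_events used a pattern worth keeping in mind: a large list of IDs is split into chunks of 100 before being interpolated as ? placeholders into a SQL IN clause, which keeps each query under database parameter limits. A self-contained sketch of that chunking, using sqlite3 and an illustrative events table rather than Synapse's storage layer:

# Standalone sketch of the chunked "SELECT ... IN (...)" pattern from
# the removed have_seen_events, using sqlite3 for illustration.
import itertools
import sqlite3

def have_seen_events(conn, event_ids, chunk_size=100):
    """Return the subset of event_ids present in the events table."""
    results = set()
    input_iterator = iter(event_ids)
    # iter(callable, sentinel) keeps slicing off chunks until islice
    # returns an empty list, i.e. the input is exhausted.
    for chunk in iter(
        lambda: list(itertools.islice(input_iterator, chunk_size)), []
    ):
        placeholders = ",".join("?" * len(chunk))
        sql = "SELECT event_id FROM events WHERE event_id IN (%s)" % placeholders
        for (event_id,) in conn.execute(sql, chunk):
            results.add(event_id)
    return results

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (event_id TEXT PRIMARY KEY)")
conn.executemany("INSERT INTO events VALUES (?)", [("$a",), ("$b",)])
print(have_seen_events(conn, ["$a", "$b", "$c"]))  # {'$a', '$b'}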
@@ -1988,7 +1913,7 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
         max_depth = max(row[0] for row in rows)
 
         if max_depth <= token.topological:
-            # We need to ensure we don't delete all the events from the datanase
+            # We need to ensure we don't delete all the events from the database
             # otherwise we wouldn't be able to send any events (due to not
             # having any backwards extremeties)
             raise SynapseError(
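For context, the check this final hunk touches guards history purging: if the purge point is at or beyond the maximum depth of the room's remaining events, every event would be deleted and the server would have no extremities left to build new events on, so the request is rejected. A minimal sketch of that guard with illustrative names and error message (the real message is truncated out of this diff):

# Illustrative sketch of the purge-history guard: refuse to purge past
# the depth of the deepest remaining event, so some events (and hence
# extremities) always survive.
class SynapseError(Exception):
    def __init__(self, code, msg):
        super().__init__(msg)
        self.code = code

def check_purge_token(event_depths, purge_up_to_depth):
    max_depth = max(event_depths)
    if max_depth <= purge_up_to_depth:
        # Purging everything up to and including the deepest event
        # would empty the room, so reject the request.
        raise SynapseError(400, "purge would remove all events")

check_purge_token([10, 12, 15], purge_up_to_depth=9)   # OK
try:
    check_purge_token([10, 12, 15], purge_up_to_depth=15)
except SynapseError as e:
    print("rejected:", e)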