diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 308a2c9b02..21487724ed 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -118,7 +118,7 @@ class EventsStore(SQLBaseStore):
@defer.inlineCallbacks
@log_function
- def persist_event(self, event, context, current_state=None):
+ def persist_event(self, event, context, current_state=None, backfilled=False):
try:
with self._stream_id_gen.get_next() as stream_ordering:
@@ -131,6 +131,7 @@ class EventsStore(SQLBaseStore):
event=event,
context=context,
current_state=current_state,
+ backfilled=backfilled,
)
except _RollbackButIsFineException:
pass
@@ -195,7 +196,7 @@ class EventsStore(SQLBaseStore):
defer.returnValue({e.event_id: e for e in events})
@log_function
- def _persist_event_txn(self, txn, event, context, current_state):
+ def _persist_event_txn(self, txn, event, context, current_state, backfilled=False):
# We purposefully do this first since if we include a `current_state`
# key, we *want* to update the `current_state_events` table
if current_state:
@@ -238,7 +239,7 @@ class EventsStore(SQLBaseStore):
return self._persist_events_txn(
txn,
[(event, context)],
- backfilled=False,
+ backfilled=backfilled,
)
@log_function
@@ -543,6 +544,22 @@ class EventsStore(SQLBaseStore):
(event.event_id, event.redacts)
)
+ @defer.inlineCallbacks
+ def have_events_in_timeline(self, event_ids):
+ """Given a list of event ids, check if we have already processed and
+ stored them as non outliers.
+ """
+ rows = yield self._simple_select_many_batch(
+ table="events",
+ retcols=("event_id",),
+ column="event_id",
+ iterable=list(event_ids),
+ keyvalues={"outlier": False},
+ desc="have_events_in_timeline",
+ )
+
+ defer.returnValue(set(r["event_id"] for r in rows))
+
def have_events(self, event_ids):
"""Given a list of event ids, check if we have already processed them.
|