diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 4beb951b9f..4f09909607 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -30,6 +30,7 @@ from .transactions import TransactionStore
from .keys import KeyStore
from .event_federation import EventFederationStore
from .media_repository import MediaRepositoryStore
+from .rejections import RejectionsStore
from .state import StateStore
from .signatures import SignatureStore
@@ -66,7 +67,7 @@ SCHEMAS = [
# Remember to update this number every time an incompatible change is made to
# database schema files, so the users will be informed on server restarts.
-SCHEMA_VERSION = 11
+SCHEMA_VERSION = 12
class _RollbackButIsFineException(Exception):
@@ -82,6 +83,7 @@ class DataStore(RoomMemberStore, RoomStore,
DirectoryStore, KeyStore, StateStore, SignatureStore,
EventFederationStore,
MediaRepositoryStore,
+ RejectionsStore,
):
def __init__(self, hs):
@@ -224,6 +226,9 @@ class DataStore(RoomMemberStore, RoomStore,
if not outlier:
self._store_state_groups_txn(txn, event, context)
+ if context.rejected:
+ self._store_rejections_txn(txn, event.event_id, context.rejected)
+
if current_state:
txn.execute(
"DELETE FROM current_state_events WHERE room_id = ?",
@@ -262,7 +267,7 @@ class DataStore(RoomMemberStore, RoomStore,
or_replace=True,
)
- if is_new_state:
+ if is_new_state and not context.rejected:
self._simple_insert_txn(
txn,
"current_state_events",
@@ -288,7 +293,7 @@ class DataStore(RoomMemberStore, RoomStore,
or_ignore=True,
)
- if not backfilled:
+ if not backfilled and not context.rejected:
self._simple_insert_txn(
txn,
table="state_forward_extremities",
@@ -417,6 +422,35 @@ class DataStore(RoomMemberStore, RoomStore,
],
)
+ def have_events(self, event_ids):
+ """Given a list of event ids, check if we have already processed them.
+
+ Returns:
+ dict: Has an entry for each event id we already have seen. Maps to
+ the rejected reason string if we rejected the event, else maps to
+ None.
+ """
+ def f(txn):
+ sql = (
+ "SELECT e.event_id, reason FROM events as e "
+ "LEFT JOIN rejections as r ON e.event_id = r.event_id "
+ "WHERE event_id = ?"
+ )
+
+ res = {}
+ for event_id in event_ids:
+ txn.execute(sql, (event_id,))
+ row = txn.fetchone()
+ if row:
+ _, rejected = row
+ res[event_id] = rejected
+
+ return res
+
+ return self.runInteraction(
+ "have_events", f,
+ )
+
def schema_path(schema):
""" Get a filesystem path for the named database schema
|