diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py
index 925a83c645..13154b1723 100644
--- a/synapse/events/__init__.py
+++ b/synapse/events/__init__.py
@@ -33,6 +33,9 @@ class _EventInternalMetadata(object):
def is_outlier(self):
return getattr(self, "outlier", False)
+ def is_invite_from_remote(self):
+ return getattr(self, "invite_from_remote", False)
+
def _event_dict_property(key):
def getter(self):
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 267fedf114..4a35344d32 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -102,8 +102,7 @@ class FederationHandler(BaseHandler):
@log_function
@defer.inlineCallbacks
- def on_receive_pdu(self, origin, pdu, state=None,
- auth_chain=None):
+ def on_receive_pdu(self, origin, pdu, state=None, auth_chain=None):
""" Called by the ReplicationLayer when we have a new pdu. We need to
do auth checks and put it through the StateHandler.
"""
@@ -174,11 +173,7 @@ class FederationHandler(BaseHandler):
})
seen_ids.add(e.event_id)
- yield self._handle_new_events(
- origin,
- event_infos,
- outliers=True
- )
+ yield self._handle_new_events(origin, event_infos)
try:
context, event_stream_id, max_stream_id = yield self._handle_new_event(
@@ -761,6 +756,7 @@ class FederationHandler(BaseHandler):
event = pdu
event.internal_metadata.outlier = True
+ event.internal_metadata.invite_from_remote = True
event.signatures.update(
compute_event_signature(
@@ -1069,9 +1065,6 @@ class FederationHandler(BaseHandler):
@defer.inlineCallbacks
@log_function
def _handle_new_event(self, origin, event, state=None, auth_events=None):
-
- outlier = event.internal_metadata.is_outlier()
-
context = yield self._prep_event(
origin, event,
state=state,
@@ -1087,14 +1080,12 @@ class FederationHandler(BaseHandler):
event_stream_id, max_stream_id = yield self.store.persist_event(
event,
context=context,
- is_new_state=not outlier,
)
defer.returnValue((context, event_stream_id, max_stream_id))
@defer.inlineCallbacks
- def _handle_new_events(self, origin, event_infos, backfilled=False,
- outliers=False):
+ def _handle_new_events(self, origin, event_infos, backfilled=False):
contexts = yield defer.gatherResults(
[
self._prep_event(
@@ -1113,7 +1104,6 @@ class FederationHandler(BaseHandler):
for ev_info, context in itertools.izip(event_infos, contexts)
],
backfilled=backfilled,
- is_new_state=(not outliers and not backfilled),
)
@defer.inlineCallbacks
@@ -1176,7 +1166,6 @@ class FederationHandler(BaseHandler):
(e, events_to_context[e.event_id])
for e in itertools.chain(auth_events, state)
],
- is_new_state=False,
)
new_event_context = yield self.state_handler.compute_event_context(
@@ -1185,7 +1174,6 @@ class FederationHandler(BaseHandler):
event_stream_id, max_stream_id = yield self.store.persist_event(
event, new_event_context,
- is_new_state=True,
current_state=state,
)
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index dc3e994de9..b5f9b3b900 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -61,8 +61,7 @@ class EventsStore(SQLBaseStore):
)
@defer.inlineCallbacks
- def persist_events(self, events_and_contexts, backfilled=False,
- is_new_state=True):
+ def persist_events(self, events_and_contexts, backfilled=False):
if not events_and_contexts:
return
@@ -110,13 +109,11 @@ class EventsStore(SQLBaseStore):
self._persist_events_txn,
events_and_contexts=chunk,
backfilled=backfilled,
- is_new_state=is_new_state,
)
@defer.inlineCallbacks
@log_function
- def persist_event(self, event, context,
- is_new_state=True, current_state=None):
+ def persist_event(self, event, context, current_state=None):
try:
with self._stream_id_gen.get_next() as stream_ordering:
@@ -128,7 +125,6 @@ class EventsStore(SQLBaseStore):
self._persist_event_txn,
event=event,
context=context,
- is_new_state=is_new_state,
current_state=current_state,
)
except _RollbackButIsFineException:
@@ -194,8 +190,7 @@ class EventsStore(SQLBaseStore):
defer.returnValue({e.event_id: e for e in events})
@log_function
- def _persist_event_txn(self, txn, event, context,
- is_new_state, current_state):
+ def _persist_event_txn(self, txn, event, context, current_state):
# We purposefully do this first since if we include a `current_state`
# key, we *want* to update the `current_state_events` table
if current_state:
@@ -236,12 +231,10 @@ class EventsStore(SQLBaseStore):
txn,
[(event, context)],
backfilled=False,
- is_new_state=is_new_state,
)
@log_function
- def _persist_events_txn(self, txn, events_and_contexts, backfilled,
- is_new_state):
+ def _persist_events_txn(self, txn, events_and_contexts, backfilled):
depth_updates = {}
for event, context in events_and_contexts:
# Remove the any existing cache entries for the event_ids
@@ -452,10 +445,9 @@ class EventsStore(SQLBaseStore):
txn, [event for event, _ in events_and_contexts]
)
- state_events_and_contexts = filter(
- lambda i: i[0].is_state(),
- events_and_contexts,
- )
+ state_events_and_contexts = [
+ ec for ec in events_and_contexts if ec[0].is_state()
+ ]
state_values = []
for event, context in state_events_and_contexts:
@@ -493,32 +485,50 @@ class EventsStore(SQLBaseStore):
],
)
- if is_new_state:
- for event, _ in state_events_and_contexts:
- if not context.rejected:
- txn.call_after(
- self._get_current_state_for_key.invalidate,
- (event.room_id, event.type, event.state_key,)
- )
+ for event, _ in state_events_and_contexts:
+ if backfilled:
+ # Backfilled events come before the current state so shouldn't
+ # clobber it.
+ continue
- if event.type in [EventTypes.Name, EventTypes.Aliases]:
- txn.call_after(
- self.get_room_name_and_aliases.invalidate,
- (event.room_id,)
- )
-
- self._simple_upsert_txn(
- txn,
- "current_state_events",
- keyvalues={
- "room_id": event.room_id,
- "type": event.type,
- "state_key": event.state_key,
- },
- values={
- "event_id": event.event_id,
- }
- )
+ if (not event.internal_metadata.is_invite_from_remote()
+ and event.internal_metadata.is_outlier()):
+ # Outlier events generally shouldn't clobber the current state.
+ # However invites from remote severs for rooms we aren't in
+ # are a bit special: they don't come with any associated
+ # state so are technically an outlier, however all the
+ # client-facing code assumes that they are in the current
+ # state table so we insert the event anyway.
+ continue
+
+ if context.rejected:
+ # If the event failed it's auth checks then it shouldn't
+ # clobbler the current state.
+ continue
+
+ txn.call_after(
+ self._get_current_state_for_key.invalidate,
+ (event.room_id, event.type, event.state_key,)
+ )
+
+ if event.type in [EventTypes.Name, EventTypes.Aliases]:
+ txn.call_after(
+ self.get_room_name_and_aliases.invalidate,
+ (event.room_id,)
+ )
+
+ self._simple_upsert_txn(
+ txn,
+ "current_state_events",
+ keyvalues={
+ "room_id": event.room_id,
+ "type": event.type,
+ "state_key": event.state_key,
+ },
+ values={
+ "event_id": event.event_id,
+ }
+ )
return
|