diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py
index bf07951027..8f0c6e959f 100644
--- a/synapse/events/__init__.py
+++ b/synapse/events/__init__.py
@@ -77,7 +77,7 @@ class EventBase(object):
return self.content["membership"]
def is_state(self):
- return hasattr(self, "state_key")
+ return hasattr(self, "state_key") and self.state_key is not None
def get_dict(self):
d = dict(self._event_dict)
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 86953bf8c8..0876589e31 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -515,6 +515,8 @@ class FederationHandler(BaseHandler):
"Failed to get destination from event %s", s.event_id
)
+ destinations.remove(origin)
+
logger.debug(
"on_send_join_request: Sending event: %s, signatures: %s",
event.event_id,
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 93ab26fcd1..30ce378900 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -163,19 +163,70 @@ class DataStore(RoomMemberStore, RoomStore,
def _persist_event_txn(self, txn, event, context, backfilled,
stream_ordering=None, is_new_state=True,
current_state=None):
- if event.type == EventTypes.Member:
- self._store_room_member_txn(txn, event)
- elif event.type == EventTypes.Feedback:
- self._store_feedback_txn(txn, event)
- elif event.type == EventTypes.Name:
- self._store_room_name_txn(txn, event)
- elif event.type == EventTypes.Topic:
- self._store_room_topic_txn(txn, event)
- elif event.type == EventTypes.Redaction:
- self._store_redaction(txn, event)
+
+ # We purposefully do this first since if we include a `current_state`
+ # key, we *want* to update the `current_state_events` table
+ if current_state:
+ txn.execute(
+ "DELETE FROM current_state_events WHERE room_id = ?",
+ (event.room_id,)
+ )
+
+ for s in current_state:
+ self._simple_insert_txn(
+ txn,
+ "current_state_events",
+ {
+ "event_id": s.event_id,
+ "room_id": s.room_id,
+ "type": s.type,
+ "state_key": s.state_key,
+ },
+ or_replace=True,
+ )
+
+ if event.is_state() and is_new_state:
+ if not backfilled and not context.rejected:
+ self._simple_insert_txn(
+ txn,
+ table="state_forward_extremities",
+ values={
+ "event_id": event.event_id,
+ "room_id": event.room_id,
+ "type": event.type,
+ "state_key": event.state_key,
+ },
+ or_replace=True,
+ )
+
+ for prev_state_id, _ in event.prev_state:
+ self._simple_delete_txn(
+ txn,
+ table="state_forward_extremities",
+ keyvalues={
+ "event_id": prev_state_id,
+ }
+ )
outlier = event.internal_metadata.is_outlier()
+ if not outlier:
+ self._store_state_groups_txn(txn, event, context)
+
+ self._update_min_depth_for_room_txn(
+ txn,
+ event.room_id,
+ event.depth
+ )
+
+ self._handle_prev_events(
+ txn,
+ outlier=outlier,
+ event_id=event.event_id,
+ prev_events=event.prev_events,
+ room_id=event.room_id,
+ )
+
have_persisted = self._simple_select_one_onecol_txn(
txn,
table="event_json",
@@ -209,6 +260,17 @@ class DataStore(RoomMemberStore, RoomStore,
)
return
+ if event.type == EventTypes.Member:
+ self._store_room_member_txn(txn, event)
+ elif event.type == EventTypes.Feedback:
+ self._store_feedback_txn(txn, event)
+ elif event.type == EventTypes.Name:
+ self._store_room_name_txn(txn, event)
+ elif event.type == EventTypes.Topic:
+ self._store_room_topic_txn(txn, event)
+ elif event.type == EventTypes.Redaction:
+ self._store_redaction(txn, event)
+
event_dict = {
k: v
for k, v in event.get_dict().items()
@@ -273,41 +335,10 @@ class DataStore(RoomMemberStore, RoomStore,
)
raise _RollbackButIsFineException("_persist_event")
- self._handle_prev_events(
- txn,
- outlier=outlier,
- event_id=event.event_id,
- prev_events=event.prev_events,
- room_id=event.room_id,
- )
-
- if not outlier:
- self._store_state_groups_txn(txn, event, context)
-
if context.rejected:
self._store_rejections_txn(txn, event.event_id, context.rejected)
- if current_state:
- txn.execute(
- "DELETE FROM current_state_events WHERE room_id = ?",
- (event.room_id,)
- )
-
- for s in current_state:
- self._simple_insert_txn(
- txn,
- "current_state_events",
- {
- "event_id": s.event_id,
- "room_id": s.room_id,
- "type": s.type,
- "state_key": s.state_key,
- },
- or_replace=True,
- )
-
- is_state = hasattr(event, "state_key") and event.state_key is not None
- if is_state:
+ if event.is_state():
vals = {
"event_id": event.event_id,
"room_id": event.room_id,
@@ -315,6 +346,7 @@ class DataStore(RoomMemberStore, RoomStore,
"state_key": event.state_key,
}
+ # TODO: How does this work with backfilling?
if hasattr(event, "replaces_state"):
vals["prev_state"] = event.replaces_state
@@ -351,28 +383,6 @@ class DataStore(RoomMemberStore, RoomStore,
or_ignore=True,
)
- if not backfilled and not context.rejected:
- self._simple_insert_txn(
- txn,
- table="state_forward_extremities",
- values={
- "event_id": event.event_id,
- "room_id": event.room_id,
- "type": event.type,
- "state_key": event.state_key,
- },
- or_replace=True,
- )
-
- for prev_state_id, _ in event.prev_state:
- self._simple_delete_txn(
- txn,
- table="state_forward_extremities",
- keyvalues={
- "event_id": prev_state_id,
- }
- )
-
for hash_alg, hash_base64 in event.hashes.items():
hash_bytes = decode_base64(hash_base64)
self._store_event_content_hash_txn(
@@ -403,13 +413,6 @@ class DataStore(RoomMemberStore, RoomStore,
txn, event.event_id, ref_alg, ref_hash_bytes
)
- if not outlier:
- self._update_min_depth_for_room_txn(
- txn,
- event.room_id,
- event.depth
- )
-
def _store_redaction(self, txn, event):
txn.execute(
"INSERT OR IGNORE INTO redactions "
|