diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 02cefdff26..e9f9406014 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -64,12 +64,12 @@ class StateStore(SQLBaseStore):
for group, state_map in group_to_state.items()
})
- def _store_state_groups_txn(self, txn, event, context):
- return self._store_mult_state_groups_txn(txn, [(event, context)])
-
def _store_mult_state_groups_txn(self, txn, events_and_contexts):
state_groups = {}
for event, context in events_and_contexts:
+ if event.internal_metadata.is_outlier():
+ continue
+
if context.current_state is None:
continue
@@ -82,7 +82,8 @@ class StateStore(SQLBaseStore):
if event.is_state():
state_events[(event.type, event.state_key)] = event
- state_group = self._state_groups_id_gen.get_next()
+ state_group = context.new_state_group_id
+
self._simple_insert_txn(
txn,
table="state_groups",
@@ -114,11 +115,10 @@ class StateStore(SQLBaseStore):
table="event_to_state_groups",
values=[
{
- "state_group": state_groups[event.event_id],
- "event_id": event.event_id,
+ "state_group": state_group_id,
+ "event_id": event_id,
}
- for event, context in events_and_contexts
- if context.current_state is not None
+ for event_id, state_group_id in state_groups.items()
],
)
@@ -249,11 +249,14 @@ class StateStore(SQLBaseStore):
"""
Get the state dict corresponding to a particular event
- :param str event_id: event whose state should be returned
- :param list[(str, str)]|None types: List of (type, state_key) tuples
- which are used to filter the state fetched. May be None, which
- matches any key
- :return: a deferred dict from (type, state_key) -> state_event
+ Args:
+ event_id(str): event whose state should be returned
+ types(list[(str, str)]|None): List of (type, state_key) tuples
+ which are used to filter the state fetched. May be None, which
+ matches any key
+
+ Returns:
+ A deferred dict from (type, state_key) -> state_event
"""
state_map = yield self.get_state_for_events([event_id], types)
defer.returnValue(state_map[event_id])
@@ -429,3 +432,33 @@ class StateStore(SQLBaseStore):
}
defer.returnValue(results)
+
+ def get_all_new_state_groups(self, last_id, current_id, limit):
+ def get_all_new_state_groups_txn(txn):
+ sql = (
+ "SELECT id, room_id, event_id FROM state_groups"
+ " WHERE ? < id AND id <= ? ORDER BY id LIMIT ?"
+ )
+ txn.execute(sql, (last_id, current_id, limit))
+ groups = txn.fetchall()
+
+ if not groups:
+ return ([], [])
+
+ lower_bound = groups[0][0]
+ upper_bound = groups[-1][0]
+ sql = (
+ "SELECT state_group, type, state_key, event_id"
+ " FROM state_groups_state"
+ " WHERE ? <= state_group AND state_group <= ?"
+ )
+
+ txn.execute(sql, (lower_bound, upper_bound))
+ state_group_state = txn.fetchall()
+ return (groups, state_group_state)
+ return self.runInteraction(
+ "get_all_new_state_groups", get_all_new_state_groups_txn
+ )
+
+ def get_state_stream_token(self):
+ return self._state_groups_id_gen.get_current_token()
|