diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index bd4d503b6d..9725a3fed7 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -323,6 +323,18 @@ class EventsStore(SQLBaseStore):
(metadata_json, event.event_id,)
)
+ stream_order = event.internal_metadata.stream_ordering
+ state_group_id = context.state_group or context.new_state_group_id
+ self._simple_insert_txn(
+ txn,
+ table="ex_outlier_stream",
+ values={
+ "event_stream_ordering": stream_order,
+ "event_id": event.event_id,
+ "state_group": state_group_id,
+ }
+ )
+
sql = (
"UPDATE events SET outlier = ?"
" WHERE event_id = ?"
@@ -1119,8 +1131,34 @@ class EventsStore(SQLBaseStore):
if last_forward_id != current_forward_id:
txn.execute(sql, (last_forward_id, current_forward_id, limit))
new_forward_events = txn.fetchall()
+
+ if len(new_forward_events) == limit:
+ upper_bound = new_forward_events[-1][0]
+ else:
+ upper_bound = current_forward_id
+
+ sql = (
+ "SELECT -event_stream_ordering FROM current_state_resets"
+ " WHERE ? < event_stream_ordering"
+ " AND event_stream_ordering <= ?"
+ " ORDER BY event_stream_ordering ASC"
+ )
+ txn.execute(sql, (last_forward_id, upper_bound))
+ state_resets = txn.fetchall()
+
+ sql = (
+ "SELECT -event_stream_ordering, event_id, state_group"
+ " FROM ex_outlier_stream"
+ " WHERE ? > event_stream_ordering"
+ " AND event_stream_ordering >= ?"
+ " ORDER BY event_stream_ordering DESC"
+ )
+ txn.execute(sql, (last_forward_id, upper_bound))
+ forward_ex_outliers = txn.fetchall()
else:
new_forward_events = []
+ state_resets = []
+ forward_ex_outliers = []
sql = (
"SELECT -e.stream_ordering, ej.internal_metadata, ej.json"
@@ -1136,8 +1174,28 @@ class EventsStore(SQLBaseStore):
if last_backfill_id != current_backfill_id:
txn.execute(sql, (-last_backfill_id, -current_backfill_id, limit))
new_backfill_events = txn.fetchall()
+
+ if len(new_backfill_events) == limit:
+ upper_bound = new_backfill_events[-1][0]
+ else:
+ upper_bound = current_backfill_id
+
+ sql = (
+ "SELECT -event_stream_ordering, event_id, state_group"
+ " FROM ex_outlier_stream"
+ " WHERE ? > event_stream_ordering"
+ " AND event_stream_ordering >= ?"
+ " ORDER BY event_stream_ordering DESC"
+ )
+ txn.execute(sql, (-last_backfill_id, -upper_bound))
+ backward_ex_outliers = txn.fetchall()
else:
new_backfill_events = []
+ backward_ex_outliers = []
- return (new_forward_events, new_backfill_events)
+ return (
+ new_forward_events, new_backfill_events,
+ forward_ex_outliers, backward_ex_outliers,
+ state_resets,
+ )
return self.runInteraction("get_all_new_events", get_all_new_events_txn)
diff --git a/synapse/storage/schema/delta/30/state_stream.sql b/synapse/storage/schema/delta/30/state_stream.sql
new file mode 100644
index 0000000000..706fe1dcf4
--- /dev/null
+++ b/synapse/storage/schema/delta/30/state_stream.sql
@@ -0,0 +1,38 @@
+/* Copyright 2016 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+/**
+ * The positions in the event stream_ordering when the current_state was
+ * replaced by the state at the event.
+ */
+
+CREATE TABLE IF NOT EXISTS current_state_resets(
+ event_stream_ordering BIGINT PRIMARY KEY NOT NULL
+);
+
+/* The outlier events that have aquired a state group typically through
+ * backfill. This is tracked separately to the events table, as assigning a
+ * state group change the position of the existing event in the stream
+ * ordering.
+ * However since a stream_ordering is assigned in persist_event for the
+ * (event, state) pair, we can use that stream_ordering to identify when
+ * the new state was assigned for the event.
+ */
+CREATE TABLE IF NOT EXISTS ex_outlier_stream(
+ event_stream_ordering BIGINT PRIMARY KEY NOT NULL,
+ event_id TEXT NOT NULL,
+ state_group BIGINT NOT NULL
+);
|