summary refs log tree commit diff
diff options
context:
space:
mode:
authorMark Haines <mark.haines@matrix.org>2016-03-30 17:19:56 +0100
committerMark Haines <mark.haines@matrix.org>2016-03-30 17:19:56 +0100
commit1fbb094c6fbaab33ef8e17802e37057e83718e7e (patch)
tree602b841ceb262e658ddfb8c11a2462cbffee16f2
parentAdd a entry to current_state_resets table when the current state is reset (diff)
downloadsynapse-1fbb094c6fbaab33ef8e17802e37057e83718e7e.tar.xz
Add replication streams for ex outliers and current state resets
Diffstat (limited to '')
-rw-r--r--synapse/replication/resource.py17
-rw-r--r--synapse/storage/events.py60
-rw-r--r--synapse/storage/schema/delta/30/state_stream.sql38
3 files changed, 113 insertions, 2 deletions
diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py
index 096a79a7a4..7afa1242d5 100644
--- a/synapse/replication/resource.py
+++ b/synapse/replication/resource.py
@@ -204,7 +204,11 @@ class ReplicationResource(Resource):
                 request_events = current_token.events
             if request_backfill is None:
                 request_backfill = current_token.backfill
-            events_rows, backfill_rows = yield self.store.get_all_new_events(
+            (
+                events_rows, backfill_rows,
+                forward_ex_outliers, backward_ex_outliers,
+                state_resets
+            ) = yield self.store.get_all_new_events(
                 request_backfill, request_events,
                 current_token.backfill, current_token.events,
                 limit
@@ -215,6 +219,17 @@ class ReplicationResource(Resource):
             writer.write_header_and_rows("backfill", backfill_rows, (
                 "position", "internal", "json", "state_group"
             ))
+            writer.write_header_and_rows(
+                "forward_ex_outliers", forward_ex_outliers,
+                ("position", "event_id", "state_group")
+            )
+            writer.write_header_and_rows(
+                "backward_ex_outliers", backward_ex_outliers,
+                ("position", "event_id", "state_group")
+            )
+            writer.write_header_and_rows(
+                "state_resets", state_resets, ("position",)
+            )
 
     @defer.inlineCallbacks
     def presence(self, writer, current_token):
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index bd4d503b6d..9725a3fed7 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -323,6 +323,18 @@ class EventsStore(SQLBaseStore):
                     (metadata_json, event.event_id,)
                 )
 
+                stream_order = event.internal_metadata.stream_ordering
+                state_group_id = context.state_group or context.new_state_group_id
+                self._simple_insert_txn(
+                    txn,
+                    table="ex_outlier_stream",
+                    values={
+                        "event_stream_ordering": stream_order,
+                        "event_id": event.event_id,
+                        "state_group": state_group_id,
+                    }
+                )
+
                 sql = (
                     "UPDATE events SET outlier = ?"
                     " WHERE event_id = ?"
@@ -1119,8 +1131,34 @@ class EventsStore(SQLBaseStore):
             if last_forward_id != current_forward_id:
                 txn.execute(sql, (last_forward_id, current_forward_id, limit))
                 new_forward_events = txn.fetchall()
+
+                if len(new_forward_events) == limit:
+                    upper_bound = new_forward_events[-1][0]
+                else:
+                    upper_bound = current_forward_id
+
+                sql = (
+                    "SELECT -event_stream_ordering FROM current_state_resets"
+                    " WHERE ? < event_stream_ordering"
+                    " AND event_stream_ordering <= ?"
+                    " ORDER BY event_stream_ordering ASC"
+                )
+                txn.execute(sql, (last_forward_id, upper_bound))
+                state_resets = txn.fetchall()
+
+                sql = (
+                    "SELECT -event_stream_ordering, event_id, state_group"
+                    " FROM ex_outlier_stream"
+                    " WHERE ? > event_stream_ordering"
+                    " AND event_stream_ordering >= ?"
+                    " ORDER BY event_stream_ordering DESC"
+                )
+                txn.execute(sql, (last_forward_id, upper_bound))
+                forward_ex_outliers = txn.fetchall()
             else:
                 new_forward_events = []
+                state_resets = []
+                forward_ex_outliers = []
 
             sql = (
                 "SELECT -e.stream_ordering, ej.internal_metadata, ej.json"
@@ -1136,8 +1174,28 @@ class EventsStore(SQLBaseStore):
             if last_backfill_id != current_backfill_id:
                 txn.execute(sql, (-last_backfill_id, -current_backfill_id, limit))
                 new_backfill_events = txn.fetchall()
+
+                if len(new_backfill_events) == limit:
+                    upper_bound = new_backfill_events[-1][0]
+                else:
+                    upper_bound = current_backfill_id
+
+                sql = (
+                    "SELECT -event_stream_ordering, event_id, state_group"
+                    " FROM ex_outlier_stream"
+                    " WHERE ? > event_stream_ordering"
+                    " AND event_stream_ordering >= ?"
+                    " ORDER BY event_stream_ordering DESC"
+                )
+                txn.execute(sql, (-last_backfill_id, -upper_bound))
+                backward_ex_outliers = txn.fetchall()
             else:
                 new_backfill_events = []
+                backward_ex_outliers = []
 
-            return (new_forward_events, new_backfill_events)
+            return (
+                new_forward_events, new_backfill_events,
+                forward_ex_outliers, backward_ex_outliers,
+                state_resets,
+            )
         return self.runInteraction("get_all_new_events", get_all_new_events_txn)
diff --git a/synapse/storage/schema/delta/30/state_stream.sql b/synapse/storage/schema/delta/30/state_stream.sql
new file mode 100644
index 0000000000..706fe1dcf4
--- /dev/null
+++ b/synapse/storage/schema/delta/30/state_stream.sql
@@ -0,0 +1,38 @@
+/* Copyright 2016 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+/**
+ * The positions in the event stream_ordering when the current_state was
+ * replaced by the state at the event.
+ */
+
+CREATE TABLE IF NOT EXISTS current_state_resets(
+    event_stream_ordering BIGINT PRIMARY KEY NOT NULL
+);
+
+/* The outlier events that have aquired a state group typically through
+ * backfill. This is tracked separately to the events table, as assigning a
+ * state group change the position of the existing event in the stream
+ * ordering.
+ * However since a stream_ordering is assigned in persist_event for the
+ * (event, state) pair, we can use that stream_ordering to identify when
+ * the new state was assigned for the event.
+ */
+CREATE TABLE IF NOT EXISTS ex_outlier_stream(
+    event_stream_ordering BIGINT PRIMARY KEY NOT NULL,
+    event_id TEXT NOT NULL,
+    state_group BIGINT NOT NULL
+);