diff options
author | Mark Haines <mjark@negativecurvature.net> | 2016-03-31 11:20:57 +0100 |
---|---|---|
committer | Mark Haines <mjark@negativecurvature.net> | 2016-03-31 11:20:57 +0100 |
commit | 62e395f0e36c680662e71e25b1556f3477a26b74 (patch) | |
tree | c314edc7427ed4eb544ecf6a09aeaf0056df6672 /synapse/storage/events.py | |
parent | Line length (diff) | |
parent | Use a namedtuple rather than tuple unpacking (diff) | |
download | synapse-62e395f0e36c680662e71e25b1556f3477a26b74.tar.xz |
Merge pull request #676 from matrix-org/markjh/replicate_stateIII
Add replication streams for ex outliers and current state resets
Diffstat (limited to '')
-rw-r--r-- | synapse/storage/events.py | 78 |
1 files changed, 76 insertions, 2 deletions
diff --git a/synapse/storage/events.py b/synapse/storage/events.py index e0ef7f46b2..dc3e994de9 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -25,7 +25,7 @@ from synapse.api.constants import EventTypes from canonicaljson import encode_canonical_json from contextlib import contextmanager - +from collections import namedtuple import logging import math @@ -205,6 +205,15 @@ class EventsStore(SQLBaseStore): txn.call_after(self.get_joined_hosts_for_room.invalidate, (event.room_id,)) txn.call_after(self.get_room_name_and_aliases, event.room_id) + # Add an entry to the current_state_resets table to record the point + # where we clobbered the current state + stream_order = event.internal_metadata.stream_ordering + self._simple_insert_txn( + txn, + table="current_state_resets", + values={"event_stream_ordering": stream_order} + ) + self._simple_delete_txn( txn, table="current_state_events", @@ -314,6 +323,18 @@ class EventsStore(SQLBaseStore): (metadata_json, event.event_id,) ) + stream_order = event.internal_metadata.stream_ordering + state_group_id = context.state_group or context.new_state_group_id + self._simple_insert_txn( + txn, + table="ex_outlier_stream", + values={ + "event_stream_ordering": stream_order, + "event_id": event.event_id, + "state_group": state_group_id, + } + ) + sql = ( "UPDATE events SET outlier = ?" " WHERE event_id = ?" @@ -1110,8 +1131,34 @@ class EventsStore(SQLBaseStore): if last_forward_id != current_forward_id: txn.execute(sql, (last_forward_id, current_forward_id, limit)) new_forward_events = txn.fetchall() + + if len(new_forward_events) == limit: + upper_bound = new_forward_events[-1][0] + else: + upper_bound = current_forward_id + + sql = ( + "SELECT -event_stream_ordering FROM current_state_resets" + " WHERE ? < event_stream_ordering" + " AND event_stream_ordering <= ?" + " ORDER BY event_stream_ordering ASC" + ) + txn.execute(sql, (last_forward_id, upper_bound)) + state_resets = txn.fetchall() + + sql = ( + "SELECT -event_stream_ordering, event_id, state_group" + " FROM ex_outlier_stream" + " WHERE ? > event_stream_ordering" + " AND event_stream_ordering >= ?" + " ORDER BY event_stream_ordering DESC" + ) + txn.execute(sql, (last_forward_id, upper_bound)) + forward_ex_outliers = txn.fetchall() else: new_forward_events = [] + state_resets = [] + forward_ex_outliers = [] sql = ( "SELECT -e.stream_ordering, ej.internal_metadata, ej.json," @@ -1128,8 +1175,35 @@ class EventsStore(SQLBaseStore): if last_backfill_id != current_backfill_id: txn.execute(sql, (-last_backfill_id, -current_backfill_id, limit)) new_backfill_events = txn.fetchall() + + if len(new_backfill_events) == limit: + upper_bound = new_backfill_events[-1][0] + else: + upper_bound = current_backfill_id + + sql = ( + "SELECT -event_stream_ordering, event_id, state_group" + " FROM ex_outlier_stream" + " WHERE ? > event_stream_ordering" + " AND event_stream_ordering >= ?" + " ORDER BY event_stream_ordering DESC" + ) + txn.execute(sql, (-last_backfill_id, -upper_bound)) + backward_ex_outliers = txn.fetchall() else: new_backfill_events = [] + backward_ex_outliers = [] - return (new_forward_events, new_backfill_events) + return AllNewEventsResult( + new_forward_events, new_backfill_events, + forward_ex_outliers, backward_ex_outliers, + state_resets, + ) return self.runInteraction("get_all_new_events", get_all_new_events_txn) + + +AllNewEventsResult = namedtuple("AllNewEventsResult", [ + "new_forward_events", "new_backfill_events", + "forward_ex_outliers", "backward_ex_outliers", + "state_resets" +]) |