From 4b91c313a94a4a89998e097e79a96a4423cf1b9f Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 27 Mar 2019 16:15:59 +0000 Subject: Combine the CurrentStateDeltaStream into the EventStream --- synapse/replication/tcp/streams/__init__.py | 1 - synapse/replication/tcp/streams/_base.py | 21 ------------------ synapse/replication/tcp/streams/events.py | 34 ++++++++++++++++++++++++++++- 3 files changed, 33 insertions(+), 23 deletions(-) (limited to 'synapse/replication/tcp/streams') diff --git a/synapse/replication/tcp/streams/__init__.py b/synapse/replication/tcp/streams/__init__.py index 5c715e3bfa..634f636dc9 100644 --- a/synapse/replication/tcp/streams/__init__.py +++ b/synapse/replication/tcp/streams/__init__.py @@ -44,7 +44,6 @@ STREAMS_MAP = { federation.FederationStream, _base.TagAccountDataStream, _base.AccountDataStream, - _base.CurrentStateDeltaStream, _base.GroupServerStream, ) } diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py index 13ab1bee05..8971a6a22e 100644 --- a/synapse/replication/tcp/streams/_base.py +++ b/synapse/replication/tcp/streams/_base.py @@ -91,12 +91,6 @@ AccountDataStreamRow = namedtuple("AccountDataStream", ( "data_type", # str "data", # dict )) -CurrentStateDeltaStreamRow = namedtuple("CurrentStateDeltaStream", ( - "room_id", # str - "type", # str - "state_key", # str - "event_id", # str, optional -)) GroupsStreamRow = namedtuple("GroupsStreamRow", ( "group_id", # str "user_id", # str @@ -428,21 +422,6 @@ class AccountDataStream(Stream): defer.returnValue(results) -class CurrentStateDeltaStream(Stream): - """Current state for a room was changed - """ - NAME = "current_state_deltas" - ROW_TYPE = CurrentStateDeltaStreamRow - - def __init__(self, hs): - store = hs.get_datastore() - - self.current_token = store.get_max_current_state_delta_stream_id - self.update_function = store.get_all_updated_current_state_deltas - - super(CurrentStateDeltaStream, self).__init__(hs) - - class GroupServerStream(Stream): NAME = "groups" ROW_TYPE = GroupsStreamRow diff --git a/synapse/replication/tcp/streams/events.py b/synapse/replication/tcp/streams/events.py index 928028e893..e0f6e29248 100644 --- a/synapse/replication/tcp/streams/events.py +++ b/synapse/replication/tcp/streams/events.py @@ -13,6 +13,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import heapq import attr @@ -26,6 +27,7 @@ from ._base import Stream This stream contains rows of various types. Each row therefore contains a 'type' identifier before the real data. For example:: + RDATA events batch ["state", ["!room:id", "m.type", "", "$event:id"]] RDATA events 12345 ["ev", ["$event:id", "!room:id", "m.type", null, null]] An "ev" row is sent for each new event. The fields in the data part are: @@ -36,6 +38,14 @@ An "ev" row is sent for each new event. The fields in the data part are: * The state key of the event, for state events * The event id of an event which is redacted by this event. +A "state" row is sent whenever the "current state" in a room changes. The fields in the +data part are: + + * The room id for the state change + * The event type of the state which has changed + * The state_key of the state which has changed + * The event id of the new state + """ @@ -77,10 +87,21 @@ class EventsStreamEventRow(BaseEventsStreamRow): redacts = attr.ib() # str, optional +@attr.s(slots=True, frozen=True) +class EventsStreamCurrentStateRow(BaseEventsStreamRow): + TypeId = "state" + + room_id = attr.ib() # str + type = attr.ib() # str + state_key = attr.ib() # str + event_id = attr.ib() # str, optional + + TypeToRow = { Row.TypeId: Row for Row in ( EventsStreamEventRow, + EventsStreamCurrentStateRow, ) } @@ -105,7 +126,18 @@ class EventsStream(Stream): (row[0], EventsStreamEventRow.TypeId, row[1:]) for row in event_rows ) - defer.returnValue(event_updates) + + state_rows = yield self._store.get_all_updated_current_state_deltas( + from_token, current_token, limit + ) + state_updates = ( + (row[0], EventsStreamCurrentStateRow.TypeId, row[1:]) + for row in state_rows + ) + + all_updates = heapq.merge(event_updates, state_updates) + + defer.returnValue(all_updates) @classmethod def parse_row(cls, row): -- cgit 1.4.1