summary refs log tree commit diff
path: root/synapse/replication
diff options
context:
space:
mode:
authorRichard van der Hoff <richard@matrix.org>2019-03-27 16:15:59 +0000
committerRichard van der Hoff <richard@matrix.org>2019-03-27 22:07:05 +0000
commit4b91c313a94a4a89998e097e79a96a4423cf1b9f (patch)
treeb031e7608c531078911f2e6b89ee4298099f8613 /synapse/replication
parentMake EventStream rows have a type (diff)
downloadsynapse-4b91c313a94a4a89998e097e79a96a4423cf1b9f.tar.xz
Combine the CurrentStateDeltaStream into the EventStream
Diffstat (limited to 'synapse/replication')
-rw-r--r--synapse/replication/tcp/streams/__init__.py1
-rw-r--r--synapse/replication/tcp/streams/_base.py21
-rw-r--r--synapse/replication/tcp/streams/events.py34
3 files changed, 33 insertions, 23 deletions
diff --git a/synapse/replication/tcp/streams/__init__.py b/synapse/replication/tcp/streams/__init__.py

index 5c715e3bfa..634f636dc9 100644 --- a/synapse/replication/tcp/streams/__init__.py +++ b/synapse/replication/tcp/streams/__init__.py
@@ -44,7 +44,6 @@ STREAMS_MAP = { federation.FederationStream, _base.TagAccountDataStream, _base.AccountDataStream, - _base.CurrentStateDeltaStream, _base.GroupServerStream, ) } diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py
index 13ab1bee05..8971a6a22e 100644 --- a/synapse/replication/tcp/streams/_base.py +++ b/synapse/replication/tcp/streams/_base.py
@@ -91,12 +91,6 @@ AccountDataStreamRow = namedtuple("AccountDataStream", ( "data_type", # str "data", # dict )) -CurrentStateDeltaStreamRow = namedtuple("CurrentStateDeltaStream", ( - "room_id", # str - "type", # str - "state_key", # str - "event_id", # str, optional -)) GroupsStreamRow = namedtuple("GroupsStreamRow", ( "group_id", # str "user_id", # str @@ -428,21 +422,6 @@ class AccountDataStream(Stream): defer.returnValue(results) -class CurrentStateDeltaStream(Stream): - """Current state for a room was changed - """ - NAME = "current_state_deltas" - ROW_TYPE = CurrentStateDeltaStreamRow - - def __init__(self, hs): - store = hs.get_datastore() - - self.current_token = store.get_max_current_state_delta_stream_id - self.update_function = store.get_all_updated_current_state_deltas - - super(CurrentStateDeltaStream, self).__init__(hs) - - class GroupServerStream(Stream): NAME = "groups" ROW_TYPE = GroupsStreamRow diff --git a/synapse/replication/tcp/streams/events.py b/synapse/replication/tcp/streams/events.py
index 928028e893..e0f6e29248 100644 --- a/synapse/replication/tcp/streams/events.py +++ b/synapse/replication/tcp/streams/events.py
@@ -13,6 +13,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import heapq import attr @@ -26,6 +27,7 @@ from ._base import Stream This stream contains rows of various types. Each row therefore contains a 'type' identifier before the real data. For example:: + RDATA events batch ["state", ["!room:id", "m.type", "", "$event:id"]] RDATA events 12345 ["ev", ["$event:id", "!room:id", "m.type", null, null]] An "ev" row is sent for each new event. The fields in the data part are: @@ -36,6 +38,14 @@ An "ev" row is sent for each new event. The fields in the data part are: * The state key of the event, for state events * The event id of an event which is redacted by this event. +A "state" row is sent whenever the "current state" in a room changes. The fields in the +data part are: + + * The room id for the state change + * The event type of the state which has changed + * The state_key of the state which has changed + * The event id of the new state + """ @@ -77,10 +87,21 @@ class EventsStreamEventRow(BaseEventsStreamRow): redacts = attr.ib() # str, optional +@attr.s(slots=True, frozen=True) +class EventsStreamCurrentStateRow(BaseEventsStreamRow): + TypeId = "state" + + room_id = attr.ib() # str + type = attr.ib() # str + state_key = attr.ib() # str + event_id = attr.ib() # str, optional + + TypeToRow = { Row.TypeId: Row for Row in ( EventsStreamEventRow, + EventsStreamCurrentStateRow, ) } @@ -105,7 +126,18 @@ class EventsStream(Stream): (row[0], EventsStreamEventRow.TypeId, row[1:]) for row in event_rows ) - defer.returnValue(event_updates) + + state_rows = yield self._store.get_all_updated_current_state_deltas( + from_token, current_token, limit + ) + state_updates = ( + (row[0], EventsStreamCurrentStateRow.TypeId, row[1:]) + for row in state_rows + ) + + all_updates = heapq.merge(event_updates, state_updates) + + defer.returnValue(all_updates) @classmethod def parse_row(cls, row):