summary refs log tree commit diff
path: root/synapse/replication/tcp/streams.py
diff options
context:
space:
mode:
authorMichael Telatynski <7t3chguy@gmail.com>2018-07-24 17:17:46 +0100
committerMichael Telatynski <7t3chguy@gmail.com>2018-07-24 17:17:46 +0100
commit87951d3891efb5bccedf72c12b3da0d6ab482253 (patch)
treede7d997567c66c5a4d8743c1f3b9d6b474f5cfd9 /synapse/replication/tcp/streams.py
parentif inviter_display_name == ""||None then default to inviter MXID (diff)
parentMerge pull request #3595 from matrix-org/erikj/use_deltas (diff)
downloadsynapse-87951d3891efb5bccedf72c12b3da0d6ab482253.tar.xz
Merge branch 'develop' of github.com:matrix-org/synapse into t3chguy/default_inviter_display_name_3pid
Diffstat (limited to 'synapse/replication/tcp/streams.py')
-rw-r--r--synapse/replication/tcp/streams.py25
1 files changed, 22 insertions, 3 deletions
diff --git a/synapse/replication/tcp/streams.py b/synapse/replication/tcp/streams.py

index fbafe12cc2..55fe701c5c 100644 --- a/synapse/replication/tcp/streams.py +++ b/synapse/replication/tcp/streams.py
@@ -24,11 +24,10 @@ Each stream is defined by the following information: update_function: The function that returns a list of updates between two tokens """ -from twisted.internet import defer -from collections import namedtuple - import logging +from collections import namedtuple +from twisted.internet import defer logger = logging.getLogger(__name__) @@ -118,6 +117,12 @@ CurrentStateDeltaStreamRow = namedtuple("CurrentStateDeltaStream", ( "state_key", # str "event_id", # str, optional )) +GroupsStreamRow = namedtuple("GroupsStreamRow", ( + "group_id", # str + "user_id", # str + "type", # str + "content", # dict +)) class Stream(object): @@ -464,6 +469,19 @@ class CurrentStateDeltaStream(Stream): super(CurrentStateDeltaStream, self).__init__(hs) +class GroupServerStream(Stream): + NAME = "groups" + ROW_TYPE = GroupsStreamRow + + def __init__(self, hs): + store = hs.get_datastore() + + self.current_token = store.get_group_stream_token + self.update_function = store.get_all_groups_changes + + super(GroupServerStream, self).__init__(hs) + + STREAMS_MAP = { stream.NAME: stream for stream in ( @@ -482,5 +500,6 @@ STREAMS_MAP = { TagAccountDataStream, AccountDataStream, CurrentStateDeltaStream, + GroupServerStream, ) }