diff options
author | Erik Johnston <erik@matrix.org> | 2017-10-23 13:13:53 +0100 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2017-10-23 13:13:53 +0100 |
commit | ffba97807746f6f0d7a8820c0d980f188dcad08c (patch) | |
tree | 36301e98cdc6400afe71b89fc4b0c8beccd0deea /synapse/replication/tcp | |
parent | Merge pull request #2540 from 4nd3r/patch-1 (diff) | |
parent | Bump version and changelog (diff) | |
download | synapse-ffba97807746f6f0d7a8820c0d980f188dcad08c.tar.xz |
Merge branch 'release-v0.24.0' of github.com:matrix-org/synapse v0.24.0
Diffstat (limited to 'synapse/replication/tcp')
-rw-r--r-- | synapse/replication/tcp/resource.py | 6 | ||||
-rw-r--r-- | synapse/replication/tcp/streams.py | 20 |
2 files changed, 25 insertions, 1 deletions
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py index 3ea3ca5a6f..6c1beca4e3 100644 --- a/synapse/replication/tcp/resource.py +++ b/synapse/replication/tcp/resource.py @@ -160,7 +160,11 @@ class ReplicationStreamer(object): "Getting stream: %s: %s -> %s", stream.NAME, stream.last_token, stream.upto_token ) - updates, current_token = yield stream.get_updates() + try: + updates, current_token = yield stream.get_updates() + except: + logger.info("Failed to handle stream %s", stream.NAME) + raise logger.debug( "Sending %d updates to %d connections", diff --git a/synapse/replication/tcp/streams.py b/synapse/replication/tcp/streams.py index fbafe12cc2..4c60bf79f9 100644 --- a/synapse/replication/tcp/streams.py +++ b/synapse/replication/tcp/streams.py @@ -118,6 +118,12 @@ CurrentStateDeltaStreamRow = namedtuple("CurrentStateDeltaStream", ( "state_key", # str "event_id", # str, optional )) +GroupsStreamRow = namedtuple("GroupsStreamRow", ( + "group_id", # str + "user_id", # str + "type", # str + "content", # dict +)) class Stream(object): @@ -464,6 +470,19 @@ class CurrentStateDeltaStream(Stream): super(CurrentStateDeltaStream, self).__init__(hs) +class GroupServerStream(Stream): + NAME = "groups" + ROW_TYPE = GroupsStreamRow + + def __init__(self, hs): + store = hs.get_datastore() + + self.current_token = store.get_group_stream_token + self.update_function = store.get_all_groups_changes + + super(GroupServerStream, self).__init__(hs) + + STREAMS_MAP = { stream.NAME: stream for stream in ( @@ -482,5 +501,6 @@ STREAMS_MAP = { TagAccountDataStream, AccountDataStream, CurrentStateDeltaStream, + GroupServerStream, ) } |