summary refs log tree commit diff
path: root/synapse/replication/tcp/streams.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2017-07-06 10:36:25 +0100
committerErik Johnston <erik@matrix.org>2017-07-06 10:36:25 +0100
commit42b50483be2b022735f8ae2107314d51e92e8471 (patch)
tree6b41c2096b105f04244ce80e840d0f998dbe226f /synapse/replication/tcp/streams.py
parentMerge pull request #2282 from matrix-org/release-v0.21.1 (diff)
parentBump version and changelog (diff)
downloadsynapse-42b50483be2b022735f8ae2107314d51e92e8471.tar.xz
Merge branch 'release-v0.22.0' of github.com:matrix-org/synapse v0.22.0
Diffstat (limited to 'synapse/replication/tcp/streams.py')
-rw-r--r--synapse/replication/tcp/streams.py22
1 files changed, 22 insertions, 0 deletions
diff --git a/synapse/replication/tcp/streams.py b/synapse/replication/tcp/streams.py
index 369d5f2428..fbafe12cc2 100644
--- a/synapse/replication/tcp/streams.py
+++ b/synapse/replication/tcp/streams.py
@@ -112,6 +112,12 @@ AccountDataStreamRow = namedtuple("AccountDataStream", (
     "data_type",  # str
     "data",  # dict
 ))
+CurrentStateDeltaStreamRow = namedtuple("CurrentStateDeltaStream", (
+    "room_id",  # str
+    "type",  # str
+    "state_key",  # str
+    "event_id",  # str, optional
+))
 
 
 class Stream(object):
@@ -443,6 +449,21 @@ class AccountDataStream(Stream):
         defer.returnValue(results)
 
 
+class CurrentStateDeltaStream(Stream):
+    """Current state for a room was changed
+    """
+    NAME = "current_state_deltas"
+    ROW_TYPE = CurrentStateDeltaStreamRow
+
+    def __init__(self, hs):
+        store = hs.get_datastore()
+
+        self.current_token = store.get_max_current_state_delta_stream_id
+        self.update_function = store.get_all_updated_current_state_deltas
+
+        super(CurrentStateDeltaStream, self).__init__(hs)
+
+
 STREAMS_MAP = {
     stream.NAME: stream
     for stream in (
@@ -460,5 +481,6 @@ STREAMS_MAP = {
         FederationStream,
         TagAccountDataStream,
         AccountDataStream,
+        CurrentStateDeltaStream,
     )
 }