summary refs log tree commit diff
path: root/synapse/app/user_dir.py
diff options
context:
space:
mode:
authorRichard van der Hoff <1389908+richvdh@users.noreply.github.com>2019-03-28 18:32:13 +0000
committerGitHub <noreply@github.com>2019-03-28 18:32:13 +0000
commit902cdc63b6668bebbf66dc084de057386da59bd8 (patch)
tree408a8187c455a4cfc47e8b4fbf6e11aaf0c3c893 /synapse/app/user_dir.py
parentMerge pull request #4954 from matrix-org/rav/refactor_parse_row (diff)
parentchangelog (diff)
downloadsynapse-902cdc63b6668bebbf66dc084de057386da59bd8.tar.xz
Merge pull request #4955 from matrix-org/rav/merge_state_into_events
Combine the CurrentStateDeltaStream into the EventStream
Diffstat (limited to 'synapse/app/user_dir.py')
-rw-r--r--synapse/app/user_dir.py17
1 files changed, 10 insertions, 7 deletions
diff --git a/synapse/app/user_dir.py b/synapse/app/user_dir.py
index d1ab9512cd..355f5aa71d 100644
--- a/synapse/app/user_dir.py
+++ b/synapse/app/user_dir.py
@@ -36,6 +36,10 @@ from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
 from synapse.replication.slave.storage.events import SlavedEventStore
 from synapse.replication.slave.storage.registration import SlavedRegistrationStore
 from synapse.replication.tcp.client import ReplicationClientHandler
+from synapse.replication.tcp.streams.events import (
+    EventsStream,
+    EventsStreamCurrentStateRow,
+)
 from synapse.rest.client.v2_alpha import user_directory
 from synapse.server import HomeServer
 from synapse.storage.engines import create_engine
@@ -73,19 +77,18 @@ class UserDirectorySlaveStore(
             prefilled_cache=curr_state_delta_prefill,
         )
 
-        self._current_state_delta_pos = events_max
-
     def stream_positions(self):
         result = super(UserDirectorySlaveStore, self).stream_positions()
-        result["current_state_deltas"] = self._current_state_delta_pos
         return result
 
     def process_replication_rows(self, stream_name, token, rows):
-        if stream_name == "current_state_deltas":
-            self._current_state_delta_pos = token
+        if stream_name == EventsStream.NAME:
+            self._stream_id_gen.advance(token)
             for row in rows:
+                if row.type != EventsStreamCurrentStateRow.TypeId:
+                    continue
                 self._curr_state_delta_stream_cache.entity_has_changed(
-                    row.room_id, token
+                    row.data.room_id, token
                 )
         return super(UserDirectorySlaveStore, self).process_replication_rows(
             stream_name, token, rows
@@ -170,7 +173,7 @@ class UserDirectoryReplicationHandler(ReplicationClientHandler):
         yield super(UserDirectoryReplicationHandler, self).on_rdata(
             stream_name, token, rows
         )
-        if stream_name == "current_state_deltas":
+        if stream_name == EventsStream.NAME:
             run_in_background(self._notify_directory)
 
     @defer.inlineCallbacks