summary refs log tree commit diff
path: root/synapse/app
diff options
context:
space:
mode:
authorRichard van der Hoff <richard@matrix.org>2019-03-27 16:15:59 +0000
committerRichard van der Hoff <richard@matrix.org>2019-03-27 22:07:05 +0000
commit4b91c313a94a4a89998e097e79a96a4423cf1b9f (patch)
treeb031e7608c531078911f2e6b89ee4298099f8613 /synapse/app
parentMake EventStream rows have a type (diff)
downloadsynapse-4b91c313a94a4a89998e097e79a96a4423cf1b9f.tar.xz
Combine the CurrentStateDeltaStream into the EventStream
Diffstat (limited to 'synapse/app')
-rw-r--r--synapse/app/user_dir.py17
1 files changed, 10 insertions, 7 deletions
diff --git a/synapse/app/user_dir.py b/synapse/app/user_dir.py
index d1ab9512cd..355f5aa71d 100644
--- a/synapse/app/user_dir.py
+++ b/synapse/app/user_dir.py
@@ -36,6 +36,10 @@ from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
 from synapse.replication.slave.storage.events import SlavedEventStore
 from synapse.replication.slave.storage.registration import SlavedRegistrationStore
 from synapse.replication.tcp.client import ReplicationClientHandler
+from synapse.replication.tcp.streams.events import (
+    EventsStream,
+    EventsStreamCurrentStateRow,
+)
 from synapse.rest.client.v2_alpha import user_directory
 from synapse.server import HomeServer
 from synapse.storage.engines import create_engine
@@ -73,19 +77,18 @@ class UserDirectorySlaveStore(
             prefilled_cache=curr_state_delta_prefill,
         )
 
-        self._current_state_delta_pos = events_max
-
     def stream_positions(self):
         result = super(UserDirectorySlaveStore, self).stream_positions()
-        result["current_state_deltas"] = self._current_state_delta_pos
         return result
 
     def process_replication_rows(self, stream_name, token, rows):
-        if stream_name == "current_state_deltas":
-            self._current_state_delta_pos = token
+        if stream_name == EventsStream.NAME:
+            self._stream_id_gen.advance(token)
             for row in rows:
+                if row.type != EventsStreamCurrentStateRow.TypeId:
+                    continue
                 self._curr_state_delta_stream_cache.entity_has_changed(
-                    row.room_id, token
+                    row.data.room_id, token
                 )
         return super(UserDirectorySlaveStore, self).process_replication_rows(
             stream_name, token, rows
@@ -170,7 +173,7 @@ class UserDirectoryReplicationHandler(ReplicationClientHandler):
         yield super(UserDirectoryReplicationHandler, self).on_rdata(
             stream_name, token, rows
         )
-        if stream_name == "current_state_deltas":
+        if stream_name == EventsStream.NAME:
             run_in_background(self._notify_directory)
 
     @defer.inlineCallbacks