diff options
author | Richard van der Hoff <1389908+richvdh@users.noreply.github.com> | 2019-03-28 18:32:13 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-03-28 18:32:13 +0000 |
commit | 902cdc63b6668bebbf66dc084de057386da59bd8 (patch) | |
tree | 408a8187c455a4cfc47e8b4fbf6e11aaf0c3c893 /synapse/app/user_dir.py | |
parent | Merge pull request #4954 from matrix-org/rav/refactor_parse_row (diff) | |
parent | changelog (diff) | |
download | synapse-902cdc63b6668bebbf66dc084de057386da59bd8.tar.xz |
Merge pull request #4955 from matrix-org/rav/merge_state_into_events
Combine the CurrentStateDeltaStream into the EventStream
Diffstat (limited to 'synapse/app/user_dir.py')
-rw-r--r-- | synapse/app/user_dir.py | 17 |
1 files changed, 10 insertions, 7 deletions
diff --git a/synapse/app/user_dir.py b/synapse/app/user_dir.py index d1ab9512cd..355f5aa71d 100644 --- a/synapse/app/user_dir.py +++ b/synapse/app/user_dir.py @@ -36,6 +36,10 @@ from synapse.replication.slave.storage.client_ips import SlavedClientIpStore from synapse.replication.slave.storage.events import SlavedEventStore from synapse.replication.slave.storage.registration import SlavedRegistrationStore from synapse.replication.tcp.client import ReplicationClientHandler +from synapse.replication.tcp.streams.events import ( + EventsStream, + EventsStreamCurrentStateRow, +) from synapse.rest.client.v2_alpha import user_directory from synapse.server import HomeServer from synapse.storage.engines import create_engine @@ -73,19 +77,18 @@ class UserDirectorySlaveStore( prefilled_cache=curr_state_delta_prefill, ) - self._current_state_delta_pos = events_max - def stream_positions(self): result = super(UserDirectorySlaveStore, self).stream_positions() - result["current_state_deltas"] = self._current_state_delta_pos return result def process_replication_rows(self, stream_name, token, rows): - if stream_name == "current_state_deltas": - self._current_state_delta_pos = token + if stream_name == EventsStream.NAME: + self._stream_id_gen.advance(token) for row in rows: + if row.type != EventsStreamCurrentStateRow.TypeId: + continue self._curr_state_delta_stream_cache.entity_has_changed( - row.room_id, token + row.data.room_id, token ) return super(UserDirectorySlaveStore, self).process_replication_rows( stream_name, token, rows @@ -170,7 +173,7 @@ class UserDirectoryReplicationHandler(ReplicationClientHandler): yield super(UserDirectoryReplicationHandler, self).on_rdata( stream_name, token, rows ) - if stream_name == "current_state_deltas": + if stream_name == EventsStream.NAME: run_in_background(self._notify_directory) @defer.inlineCallbacks |