diff options
Diffstat (limited to 'synapse/app')
-rw-r--r-- | synapse/app/synchrotron.py | 5 | ||||
-rw-r--r-- | synapse/app/user_dir.py | 17 |
2 files changed, 14 insertions, 8 deletions
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index 9163b56d86..5388def28a 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -48,6 +48,7 @@ from synapse.replication.slave.storage.receipts import SlavedReceiptsStore from synapse.replication.slave.storage.registration import SlavedRegistrationStore from synapse.replication.slave.storage.room import RoomStore from synapse.replication.tcp.client import ReplicationClientHandler +from synapse.replication.tcp.streams.events import EventsStreamEventRow from synapse.rest.client.v1 import events from synapse.rest.client.v1.initial_sync import InitialSyncRestServlet from synapse.rest.client.v1.room import RoomInitialSyncRestServlet @@ -369,7 +370,9 @@ class SyncReplicationHandler(ReplicationClientHandler): # We shouldn't get multiple rows per token for events stream, so # we don't need to optimise this for multiple rows. for row in rows: - event = yield self.store.get_event(row.event_id) + if row.type != EventsStreamEventRow.TypeId: + continue + event = yield self.store.get_event(row.data.event_id) extra_users = () if event.type == EventTypes.Member: extra_users = (event.state_key,) diff --git a/synapse/app/user_dir.py b/synapse/app/user_dir.py index d1ab9512cd..355f5aa71d 100644 --- a/synapse/app/user_dir.py +++ b/synapse/app/user_dir.py @@ -36,6 +36,10 @@ from synapse.replication.slave.storage.client_ips import SlavedClientIpStore from synapse.replication.slave.storage.events import SlavedEventStore from synapse.replication.slave.storage.registration import SlavedRegistrationStore from synapse.replication.tcp.client import ReplicationClientHandler +from synapse.replication.tcp.streams.events import ( + EventsStream, + EventsStreamCurrentStateRow, +) from synapse.rest.client.v2_alpha import user_directory from synapse.server import HomeServer from synapse.storage.engines import create_engine @@ -73,19 +77,18 @@ class UserDirectorySlaveStore( prefilled_cache=curr_state_delta_prefill, ) - self._current_state_delta_pos = events_max - def stream_positions(self): result = super(UserDirectorySlaveStore, self).stream_positions() - result["current_state_deltas"] = self._current_state_delta_pos return result def process_replication_rows(self, stream_name, token, rows): - if stream_name == "current_state_deltas": - self._current_state_delta_pos = token + if stream_name == EventsStream.NAME: + self._stream_id_gen.advance(token) for row in rows: + if row.type != EventsStreamCurrentStateRow.TypeId: + continue self._curr_state_delta_stream_cache.entity_has_changed( - row.room_id, token + row.data.room_id, token ) return super(UserDirectorySlaveStore, self).process_replication_rows( stream_name, token, rows @@ -170,7 +173,7 @@ class UserDirectoryReplicationHandler(ReplicationClientHandler): yield super(UserDirectoryReplicationHandler, self).on_rdata( stream_name, token, rows ) - if stream_name == "current_state_deltas": + if stream_name == EventsStream.NAME: run_in_background(self._notify_directory) @defer.inlineCallbacks |