summary refs log tree commit diff
path: root/synapse/replication
diff options
context:
space:
mode:
authorDavid Baker <dave@matrix.org>2016-09-09 19:11:34 +0100
committerDavid Baker <dave@matrix.org>2016-09-09 19:11:34 +0100
commitb91e2833b3b59d6a8d104d8f6304383e68de2086 (patch)
tree4df820b06f89d7cb98ec6f75a99be4eebbd92ef6 /synapse/replication
parentAdd index to event_push_actions (diff)
parentMerge pull request #1096 from matrix-org/markjh/get_access_token (diff)
downloadsynapse-b91e2833b3b59d6a8d104d8f6304383e68de2086.tar.xz
Merge remote-tracking branch 'origin/develop' into dbkr/make_notif_highlight_query_fast
Diffstat (limited to 'synapse/replication')
-rw-r--r--synapse/replication/resource.py4
-rw-r--r--synapse/replication/slave/storage/deviceinbox.py13
2 files changed, 14 insertions, 3 deletions
diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py

index 1ed9034bcb..857bc9795c 100644 --- a/synapse/replication/resource.py +++ b/synapse/replication/resource.py
@@ -181,7 +181,7 @@ class ReplicationResource(Resource): def replicate(self, request_streams, limit): writer = _Writer() current_token = yield self.current_replication_token() - logger.info("Replicating up to %r", current_token) + logger.debug("Replicating up to %r", current_token) yield self.account_data(writer, current_token, limit, request_streams) yield self.events(writer, current_token, limit, request_streams) @@ -195,7 +195,7 @@ class ReplicationResource(Resource): yield self.to_device(writer, current_token, limit, request_streams) self.streams(writer, current_token, request_streams) - logger.info("Replicated %d rows", writer.total) + logger.debug("Replicated %d rows", writer.total) defer.returnValue(writer.finish()) def streams(self, writer, current_token, request_streams): diff --git a/synapse/replication/slave/storage/deviceinbox.py b/synapse/replication/slave/storage/deviceinbox.py
index 64d8eb2af1..3bfd5e8213 100644 --- a/synapse/replication/slave/storage/deviceinbox.py +++ b/synapse/replication/slave/storage/deviceinbox.py
@@ -16,13 +16,18 @@ from ._base import BaseSlavedStore from ._slaved_id_tracker import SlavedIdTracker from synapse.storage import DataStore +from synapse.util.caches.stream_change_cache import StreamChangeCache class SlavedDeviceInboxStore(BaseSlavedStore): def __init__(self, db_conn, hs): super(SlavedDeviceInboxStore, self).__init__(db_conn, hs) self._device_inbox_id_gen = SlavedIdTracker( - db_conn, "device_inbox", "stream_id", + db_conn, "device_max_stream_id", "stream_id", + ) + self._device_inbox_stream_cache = StreamChangeCache( + "DeviceInboxStreamChangeCache", + self._device_inbox_id_gen.get_current_token() ) get_to_device_stream_token = DataStore.get_to_device_stream_token.__func__ @@ -38,5 +43,11 @@ class SlavedDeviceInboxStore(BaseSlavedStore): stream = result.get("to_device") if stream: self._device_inbox_id_gen.advance(int(stream["position"])) + for row in stream["rows"]: + stream_id = row[0] + user_id = row[1] + self._device_inbox_stream_cache.entity_has_changed( + user_id, stream_id + ) return super(SlavedDeviceInboxStore, self).process_replication(result)