diff options
author | Mark Haines <mark.haines@matrix.org> | 2016-09-07 15:27:07 +0100 |
---|---|---|
committer | Mark Haines <mark.haines@matrix.org> | 2016-09-07 15:27:07 +0100 |
commit | 31a07d2335dd628afb32f71167849ad88685525a (patch) | |
tree | 5dc2079a0ca2b67f3cd5c243976b75900dcfc5e8 /synapse/storage | |
parent | Comment the add_messages storage functions (diff) | |
download | synapse-31a07d2335dd628afb32f71167849ad88685525a.tar.xz |
Add stream change caches for device messages
Diffstat (limited to '')
-rw-r--r-- | synapse/storage/__init__.py | 24 | ||||
-rw-r--r-- | synapse/storage/deviceinbox.py | 25 |
2 files changed, 49 insertions, 0 deletions
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 6c32773f25..6965daddc5 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -182,6 +182,30 @@ class DataStore(RoomMemberStore, RoomStore, prefilled_cache=push_rules_prefill, ) + max_device_inbox_id = self._device_inbox_id_gen.get_current_token() + device_inbox_prefill, min_device_inbox_id = self._get_cache_dict( + db_conn, "device_inbox", + entity_column="user_id", + stream_column="stream_id", + max_value=max_device_inbox_id + ) + self._device_inbox_stream_cache = StreamChangeCache( + "DeviceInboxStreamChangeCache", min_device_inbox_id, + prefilled_cache=device_inbox_prefill, + ) + # The federation outbox and the local device inbox uses the same + # stream_id generator. + device_outbox_prefill, min_device_outbox_id = self._get_cache_dict( + db_conn, "device_federation_outbox", + entity_column="destination", + stream_column="stream_id", + max_value=max_device_inbox_id, + ) + self._device_federation_outbox_stream_cache = StreamChangeCache( + "DeviceInboxStreamChangeCache", min_device_outbox_id, + prefilled_cache=device_outbox_prefill, + ) + cur = LoggingTransaction( db_conn.cursor(), name="_find_stream_orderings_for_times_txn", diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py index 61da0e89e6..0d37bb961b 100644 --- a/synapse/storage/deviceinbox.py +++ b/synapse/storage/deviceinbox.py @@ -70,6 +70,14 @@ class DeviceInboxStore(SQLBaseStore): now_ms, stream_id, ) + for user_id in local_messages_by_user_then_device.keys(): + self._device_inbox_stream_cache.entity_has_changed( + user_id, stream_id + ) + for destination in remote_messages_by_destination.keys(): + self._device_federation_outbox_stream_cache.entity_has_changed( + destination, stream_id + ) defer.returnValue(self._device_inbox_id_gen.get_current_token()) @@ -115,6 +123,10 @@ class DeviceInboxStore(SQLBaseStore): now_ms, stream_id, ) + for user_id in local_messages_by_user_then_device.keys(): + self._device_inbox_stream_cache.entity_has_changed( + user_id, stream_id + ) def _add_messages_to_local_device_inbox_txn(self, txn, stream_id, messages_by_user_then_device): @@ -161,6 +173,12 @@ class DeviceInboxStore(SQLBaseStore): Deferred ([dict], int): List of messages for the device and where in the stream the messages got to. """ + has_changed = self._device_inbox_stream_cache.has_entity_changed( + user_id, last_stream_id + ) + if not has_changed: + return defer.succeed(([], current_stream_id)) + def get_new_messages_for_device_txn(txn): sql = ( "SELECT stream_id, message_json FROM device_inbox" @@ -261,6 +279,13 @@ class DeviceInboxStore(SQLBaseStore): Deferred ([dict], int): List of messages for the device and where in the stream the messages got to. """ + + has_changed = self._device_federation_outbox_stream_cache.has_entity_changed( + destination, last_stream_id + ) + if not has_changed: + return defer.succeed(([], current_stream_id)) + def get_new_messages_for_remote_destination_txn(txn): sql = ( "SELECT stream_id, messages_json FROM device_federation_outbox" |