From a55fa2047f813d639e2a0beed0c2d2738b0b639b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 20 Jan 2017 15:40:04 +0000 Subject: Insert delta of current_state_events to be more efficient --- synapse/replication/slave/storage/events.py | 10 ---------- 1 file changed, 10 deletions(-) (limited to 'synapse/replication') diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index 64f18bbb3e..b3f3bf7488 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -76,9 +76,6 @@ class SlavedEventStore(BaseSlavedStore): get_latest_event_ids_in_room = EventFederationStore.__dict__[ "get_latest_event_ids_in_room" ] - _get_current_state_for_key = StateStore.__dict__[ - "_get_current_state_for_key" - ] get_invited_rooms_for_user = RoomMemberStore.__dict__[ "get_invited_rooms_for_user" ] @@ -115,8 +112,6 @@ class SlavedEventStore(BaseSlavedStore): ) get_event = DataStore.get_event.__func__ get_events = DataStore.get_events.__func__ - get_current_state = DataStore.get_current_state.__func__ - get_current_state_for_key = DataStore.get_current_state_for_key.__func__ get_rooms_for_user_where_membership_is = ( DataStore.get_rooms_for_user_where_membership_is.__func__ ) @@ -248,7 +243,6 @@ class SlavedEventStore(BaseSlavedStore): def invalidate_caches_for_event(self, event, backfilled, reset_state): if reset_state: - self._get_current_state_for_key.invalidate_all() self.get_rooms_for_user.invalidate_all() self.get_users_in_room.invalidate((event.room_id,)) @@ -289,7 +283,3 @@ class SlavedEventStore(BaseSlavedStore): if (not event.internal_metadata.is_invite_from_remote() and event.internal_metadata.is_outlier()): return - - self._get_current_state_for_key.invalidate(( - event.room_id, event.type, event.state_key - )) -- cgit 1.5.1 From 252b503fc8626078141dc6b82eeff63607874347 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 27 Jan 2017 13:36:39 +0000 Subject: Hook device list updates to replication --- synapse/app/federation_sender.py | 3 +- synapse/app/synchrotron.py | 27 ++++++++++- synapse/handlers/device.py | 16 ------- synapse/handlers/sync.py | 35 ++++++++++---- synapse/replication/resource.py | 20 +++++++- synapse/replication/slave/storage/devices.py | 72 ++++++++++++++++++++++++++++ synapse/storage/devices.py | 15 ++++++ 7 files changed, 159 insertions(+), 29 deletions(-) create mode 100644 synapse/replication/slave/storage/devices.py (limited to 'synapse/replication') diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py index ec06620efb..411e47d98d 100644 --- a/synapse/app/federation_sender.py +++ b/synapse/app/federation_sender.py @@ -30,6 +30,7 @@ from synapse.replication.slave.storage.events import SlavedEventStore from synapse.replication.slave.storage.receipts import SlavedReceiptsStore from synapse.replication.slave.storage.registration import SlavedRegistrationStore from synapse.replication.slave.storage.transactions import TransactionStore +from synapse.replication.slave.storage.devices import SlavedDeviceStore from synapse.storage.engines import create_engine from synapse.storage.presence import UserPresenceState from synapse.util.async import sleep @@ -56,7 +57,7 @@ logger = logging.getLogger("synapse.app.appservice") class FederationSenderSlaveStore( SlavedDeviceInboxStore, TransactionStore, SlavedReceiptsStore, SlavedEventStore, - SlavedRegistrationStore, + SlavedRegistrationStore, SlavedDeviceStore, ): pass diff --git a/synapse/app/synchrotron.py 
b/synapse/app/synchrotron.py index 4dfc2dc648..9d250502e0 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -39,6 +39,7 @@ from synapse.replication.slave.storage.filtering import SlavedFilteringStore from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore from synapse.replication.slave.storage.presence import SlavedPresenceStore from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore +from synapse.replication.slave.storage.devices import SlavedDeviceStore from synapse.replication.slave.storage.room import RoomStore from synapse.server import HomeServer from synapse.storage.client_ips import ClientIpStore @@ -77,6 +78,7 @@ class SynchrotronSlavedStore( SlavedFilteringStore, SlavedPresenceStore, SlavedDeviceInboxStore, + SlavedDeviceStore, RoomStore, BaseSlavedStore, ClientIpStore, # After BaseSlavedStore because the constructor is different @@ -380,6 +382,28 @@ class SynchrotronServer(HomeServer): stream_key, position, users=users, rooms=rooms ) + @defer.inlineCallbacks + def notify_device_list_update(result): + stream = result.get("device_lists") + if not stream: + return + + position_index = stream["field_names"].index("position") + user_index = stream["field_names"].index("user_id") + + for row in stream["rows"]: + logger.info("Handling device list row: %r", row) + position = row[position_index] + user_id = row[user_index] + + rooms = yield store.get_rooms_for_user(user_id) + room_ids = [r.room_id for r in rooms] + + notifier.on_new_event( + "device_list_key", position, rooms=room_ids, + ) + + @defer.inlineCallbacks def notify(result): stream = result.get("events") if stream: @@ -417,6 +441,7 @@ class SynchrotronServer(HomeServer): notify_from_stream( result, "to_device", "to_device_key", user="user_id" ) + yield notify_device_list_update(result) while True: try: @@ -427,7 +452,7 @@ class SynchrotronServer(HomeServer): yield store.process_replication(result) typing_handler.process_replication(result) yield presence_handler.process_replication(result) - notify(result) + yield notify(result) except: logger.exception("Error replicating from %r", replication_url) yield sleep(5) diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index ed077c9a76..6fefb85890 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -220,22 +220,6 @@ class DeviceHandler(BaseHandler): for host in hosts: self.federation_sender.send_device_messages(host) - @defer.inlineCallbacks - def get_device_list_changes(self, user_id, room_ids, from_key): - """For a user and their joined rooms, calculate which device updates - we need to return. 
- """ - room_ids = frozenset(room_ids) - - user_ids_changed = set() - changed = yield self.store.get_user_whose_devices_changed(from_key) - for other_user_id in changed: - other_rooms = yield self.store.get_rooms_for_user(other_user_id) - if room_ids.intersection(e.room_id for e in other_rooms): - user_ids_changed.add(other_user_id) - - defer.returnValue(user_ids_changed) - @defer.inlineCallbacks def _incoming_device_list_update(self, origin, edu_content): user_id = edu_content["user_id"] diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 06bf626367..9199f20817 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -144,7 +144,6 @@ class SyncHandler(object): self.clock = hs.get_clock() self.response_cache = ResponseCache(hs) self.state = hs.get_state_handler() - self.device_handler = hs.get_device_handler() def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0, full_state=False): @@ -546,15 +545,9 @@ class SyncHandler(object): yield self._generate_sync_entry_for_to_device(sync_result_builder) - if since_token and since_token.device_list_key: - user_id = sync_config.user.to_string() - rooms = yield self.store.get_rooms_for_user(user_id) - joined_room_ids = set(r.room_id for r in rooms) - device_lists = yield self.device_handler.get_device_list_changes( - user_id, joined_room_ids, since_token.device_list_key - ) - else: - device_lists = [] + device_lists = yield self._generate_sync_entry_for_device_list( + sync_result_builder + ) defer.returnValue(SyncResult( presence=sync_result_builder.presence, @@ -567,6 +560,28 @@ class SyncHandler(object): next_batch=sync_result_builder.now_token, )) + @defer.inlineCallbacks + def _generate_sync_entry_for_device_list(self, sync_result_builder): + user_id = sync_result_builder.sync_config.user.to_string() + since_token = sync_result_builder.since_token + + if since_token and since_token.device_list_key: + rooms = yield self.store.get_rooms_for_user(user_id) + room_ids = set(r.room_id for r in rooms) + + user_ids_changed = set() + changed = yield self.store.get_user_whose_devices_changed( + since_token.device_list_key + ) + for other_user_id in changed: + other_rooms = yield self.store.get_rooms_for_user(other_user_id) + if room_ids.intersection(e.room_id for e in other_rooms): + user_ids_changed.add(other_user_id) + + defer.returnValue(user_ids_changed) + else: + defer.returnValue([]) + @defer.inlineCallbacks def _generate_sync_entry_for_to_device(self, sync_result_builder): """Generates the portion of the sync response. 
Populates diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py index 4616e9b34a..36548c5eda 100644 --- a/synapse/replication/resource.py +++ b/synapse/replication/resource.py @@ -46,6 +46,7 @@ STREAM_NAMES = ( ("to_device",), ("public_rooms",), ("federation",), + ("device_lists",), ) @@ -140,6 +141,7 @@ class ReplicationResource(Resource): caches_token = self.store.get_cache_stream_token() public_rooms_token = self.store.get_current_public_room_stream_id() federation_token = self.federation_sender.get_current_token() + device_list_token = self.store.get_device_stream_token() defer.returnValue(_ReplicationToken( room_stream_token, @@ -155,6 +157,7 @@ class ReplicationResource(Resource): int(stream_token.to_device_key), int(public_rooms_token), int(federation_token), + int(device_list_token), )) @request_handler() @@ -214,6 +217,7 @@ class ReplicationResource(Resource): yield self.caches(writer, current_token, limit, request_streams) yield self.to_device(writer, current_token, limit, request_streams) yield self.public_rooms(writer, current_token, limit, request_streams) + yield self.device_lists(writer, current_token, limit, request_streams) self.federation(writer, current_token, limit, request_streams, federation_ack) self.streams(writer, current_token, request_streams) @@ -495,6 +499,20 @@ class ReplicationResource(Resource): "position", "type", "content", ), position=upto_token) + @defer.inlineCallbacks + def device_lists(self, writer, current_token, limit, request_streams): + current_position = current_token.device_lists + + device_lists = request_streams.get("device_lists") + + if device_lists is not None and device_lists != current_position: + changes = yield self.store.get_users_and_hosts_device_list_changes( + device_lists, + ) + writer.write_header_and_rows("device_lists", changes, ( + "position", "user_id", "destination", + ), position=current_position) + class _Writer(object): """Writes the streams as a JSON object as the response to the request""" @@ -527,7 +545,7 @@ class _Writer(object): class _ReplicationToken(collections.namedtuple("_ReplicationToken", ( "events", "presence", "typing", "receipts", "account_data", "backfill", "push_rules", "pushers", "state", "caches", "to_device", "public_rooms", - "federation", + "federation", "device_lists", ))): __slots__ = [] diff --git a/synapse/replication/slave/storage/devices.py b/synapse/replication/slave/storage/devices.py new file mode 100644 index 0000000000..ca46aa17b6 --- /dev/null +++ b/synapse/replication/slave/storage/devices.py @@ -0,0 +1,72 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from ._base import BaseSlavedStore +from ._slaved_id_tracker import SlavedIdTracker +from synapse.storage import DataStore +from synapse.util.caches.stream_change_cache import StreamChangeCache + + +class SlavedDeviceStore(BaseSlavedStore): + def __init__(self, db_conn, hs): + super(SlavedDeviceStore, self).__init__(db_conn, hs) + + self.hs = hs + + self._device_list_id_gen = SlavedIdTracker( + db_conn, "device_lists_stream", "stream_id", + ) + device_list_max = self._device_list_id_gen.get_current_token() + self._device_list_stream_cache = StreamChangeCache( + "DeviceListStreamChangeCache", device_list_max, + ) + self._device_list_federation_stream_cache = StreamChangeCache( + "DeviceListFederationStreamChangeCache", device_list_max, + ) + + get_device_stream_token = DataStore.get_device_stream_token.__func__ + get_user_whose_devices_changed = DataStore.get_user_whose_devices_changed.__func__ + get_devices_by_remote = DataStore.get_devices_by_remote.__func__ + _get_devices_by_remote_txn = DataStore._get_devices_by_remote_txn.__func__ + _get_e2e_device_keys_txn = DataStore._get_e2e_device_keys_txn.__func__ + mark_as_sent_devices_by_remote = DataStore.mark_as_sent_devices_by_remote.__func__ + _mark_as_sent_devices_by_remote_txn = ( + DataStore._mark_as_sent_devices_by_remote_txn.__func__ + ) + + def stream_positions(self): + result = super(SlavedDeviceStore, self).stream_positions() + result["device_lists"] = self._device_list_id_gen.get_current_token() + return result + + def process_replication(self, result): + stream = result.get("device_lists") + if stream: + self._device_list_id_gen.advance(int(stream["position"])) + for row in stream["rows"]: + stream_id = row[0] + user_id = row[1] + destination = row[2] + + self._device_list_stream_cache.entity_has_changed( + user_id, stream_id + ) + + if destination: + self._device_list_federation_stream_cache.entity_has_changed( + destination, stream_id + ) + + return super(SlavedDeviceStore, self).process_replication(result) diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index 00317b0c1f..2b2cebacfa 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -458,6 +458,21 @@ class DeviceStore(SQLBaseStore): rows = yield self._execute("get_user_whose_devices_changed", None, sql, from_key) defer.returnValue(set(row["user_id"] for row in rows)) + def get_users_and_hosts_device_list_changes(self, from_key): + """Return a list of `(stream_id, user_id, destination)` which is the + combined list of changes to devices, and which destinations need to be + poked. `destination` may be None if no destinations need to be poked. + """ + sql = """ + SELECT stream_id, user_id, destination FROM device_lists_stream + LEFT JOIN device_lists_outbound_pokes USING (stream_id, user_id, device_id) + WHERE stream_id > ? 
+ """ + return self._execute( + "get_users_and_hosts_device_list", None, + sql, from_key, + ) + @defer.inlineCallbacks def add_device_change_to_streams(self, user_id, device_ids, hosts): """Persist that a user's devices have been updated, and which hosts -- cgit 1.5.1 From 3670025e641e338de14a012a24d0ceb1cade194c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 30 Jan 2017 14:11:31 +0000 Subject: Rename func --- synapse/replication/resource.py | 2 +- synapse/storage/devices.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) (limited to 'synapse/replication') diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py index 36548c5eda..a30e647474 100644 --- a/synapse/replication/resource.py +++ b/synapse/replication/resource.py @@ -506,7 +506,7 @@ class ReplicationResource(Resource): device_lists = request_streams.get("device_lists") if device_lists is not None and device_lists != current_position: - changes = yield self.store.get_users_and_hosts_device_list_changes( + changes = yield self.store.get_all_device_list_changes_for_remotes( device_lists, ) writer.write_header_and_rows("device_lists", changes, ( diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index c05ca7c5e0..e68ee50152 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -465,7 +465,7 @@ class DeviceStore(SQLBaseStore): rows = yield self._execute("get_user_whose_devices_changed", None, sql, from_key) defer.returnValue(set(row["user_id"] for row in rows)) - def get_users_and_hosts_device_list_changes(self, from_key): + def get_all_device_list_changes_for_remotes(self, from_key): """Return a list of `(stream_id, user_id, destination)` which is the combined list of changes to devices, and which destinations need to be poked. `destination` may be None if no destinations need to be poked. -- cgit 1.5.1 From 458b6f473314a81d7e671fc2fc8c30d3259924c4 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 31 Jan 2017 16:09:03 +0000 Subject: Only invalidate membership caches based on the cache stream Before we completely invalidated get_users_in_room whenever we updated any current_state_events table. This was way too aggressive. 
--- synapse/replication/resource.py | 3 --- synapse/replication/slave/storage/events.py | 21 +++++---------------- synapse/storage/events.py | 20 -------------------- synapse/storage/roommember.py | 2 -- 4 files changed, 5 insertions(+), 41 deletions(-) (limited to 'synapse/replication') diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py index a30e647474..d8eb14592b 100644 --- a/synapse/replication/resource.py +++ b/synapse/replication/resource.py @@ -299,9 +299,6 @@ class ReplicationResource(Resource): "backward_ex_outliers", res.backward_ex_outliers, ("position", "event_id", "state_group"), ) - writer.write_header_and_rows( - "state_resets", res.state_resets, ("position",), - ) @defer.inlineCallbacks def presence(self, writer, current_token, request_streams): diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index b3f3bf7488..15a025a019 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -192,10 +192,6 @@ class SlavedEventStore(BaseSlavedStore): return result def process_replication(self, result): - state_resets = set( - r[0] for r in result.get("state_resets", {"rows": []})["rows"] - ) - stream = result.get("events") if stream: self._stream_id_gen.advance(int(stream["position"])) @@ -205,7 +201,7 @@ class SlavedEventStore(BaseSlavedStore): for row in stream["rows"]: self._process_replication_row( - row, backfilled=False, state_resets=state_resets + row, backfilled=False, ) stream = result.get("backfill") @@ -213,7 +209,7 @@ class SlavedEventStore(BaseSlavedStore): self._backfill_id_gen.advance(-int(stream["position"])) for row in stream["rows"]: self._process_replication_row( - row, backfilled=True, state_resets=state_resets + row, backfilled=True, ) stream = result.get("forward_ex_outliers") @@ -232,20 +228,15 @@ class SlavedEventStore(BaseSlavedStore): return super(SlavedEventStore, self).process_replication(result) - def _process_replication_row(self, row, backfilled, state_resets): - position = row[0] + def _process_replication_row(self, row, backfilled): internal = json.loads(row[1]) event_json = json.loads(row[2]) event = FrozenEvent(event_json, internal_metadata_dict=internal) self.invalidate_caches_for_event( - event, backfilled, reset_state=position in state_resets + event, backfilled, ) - def invalidate_caches_for_event(self, event, backfilled, reset_state): - if reset_state: - self.get_rooms_for_user.invalidate_all() - self.get_users_in_room.invalidate((event.room_id,)) - + def invalidate_caches_for_event(self, event, backfilled): self._invalidate_get_event_cache(event.event_id) self.get_latest_event_ids_in_room.invalidate((event.room_id,)) @@ -267,8 +258,6 @@ class SlavedEventStore(BaseSlavedStore): self._invalidate_get_event_cache(event.redacts) if event.type == EventTypes.Member: - self.get_rooms_for_user.invalidate((event.state_key,)) - self.get_users_in_room.invalidate((event.room_id,)) self._membership_stream_cache.entity_has_changed( event.state_key, event.internal_metadata.stream_ordering ) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 6685b9da1c..f4352b326b 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -572,14 +572,6 @@ class EventsStore(SQLBaseStore): txn, self.get_users_in_room, (room_id,) ) - # Add an entry to the current_state_resets table to record the point - # where we clobbered the current state - self._simple_insert_txn( - txn, - table="current_state_resets", - 
values={"event_stream_ordering": max_stream_order} - ) - for room_id, new_extrem in new_forward_extremeties.items(): self._simple_delete_txn( txn, @@ -1610,15 +1602,6 @@ class EventsStore(SQLBaseStore): else: upper_bound = current_forward_id - sql = ( - "SELECT event_stream_ordering FROM current_state_resets" - " WHERE ? < event_stream_ordering" - " AND event_stream_ordering <= ?" - " ORDER BY event_stream_ordering ASC" - ) - txn.execute(sql, (last_forward_id, upper_bound)) - state_resets = txn.fetchall() - sql = ( "SELECT event_stream_ordering, event_id, state_group" " FROM ex_outlier_stream" @@ -1630,7 +1613,6 @@ class EventsStore(SQLBaseStore): forward_ex_outliers = txn.fetchall() else: new_forward_events = [] - state_resets = [] forward_ex_outliers = [] sql = ( @@ -1670,7 +1652,6 @@ class EventsStore(SQLBaseStore): return AllNewEventsResult( new_forward_events, new_backfill_events, forward_ex_outliers, backward_ex_outliers, - state_resets, ) return self.runInteraction("get_all_new_events", get_all_new_events_txn) @@ -1896,5 +1877,4 @@ class EventsStore(SQLBaseStore): AllNewEventsResult = namedtuple("AllNewEventsResult", [ "new_forward_events", "new_backfill_events", "forward_ex_outliers", "backward_ex_outliers", - "state_resets" ]) diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 0fdcf29085..845def8467 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -66,8 +66,6 @@ class RoomMemberStore(SQLBaseStore): ) for event in events: - txn.call_after(self.get_rooms_for_user.invalidate, (event.state_key,)) - txn.call_after(self.get_users_in_room.invalidate, (event.room_id,)) txn.call_after( self._membership_stream_cache.entity_has_changed, event.state_key, event.internal_metadata.stream_ordering -- cgit 1.5.1 From 0f3e296cb7d63aa8b4cfcaa54a0b2a63fbe7c943 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 2 Feb 2017 15:02:03 +0000 Subject: Fix replication --- synapse/replication/slave/storage/events.py | 3 +++ 1 file changed, 3 insertions(+) (limited to 'synapse/replication') diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index 15a025a019..d72ff6055c 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -73,6 +73,9 @@ class SlavedEventStore(BaseSlavedStore): # to reach inside the __dict__ to extract them. get_rooms_for_user = RoomMemberStore.__dict__["get_rooms_for_user"] get_users_in_room = RoomMemberStore.__dict__["get_users_in_room"] + get_users_who_share_room_with_user = ( + RoomMemberStore.__dict__["get_users_who_share_room_with_user"] + ) get_latest_event_ids_in_room = EventFederationStore.__dict__[ "get_latest_event_ids_in_room" ] -- cgit 1.5.1