From 328378f9cb809e1428221dfaadd68bb9278b2123 Mon Sep 17 00:00:00 2001 From: Slipeer Date: Thu, 11 May 2017 11:42:08 +0300 Subject: Fix users claimed non-exclusively by an app service don't get notifications #2211 --- synapse/storage/appservice.py | 7 +++++-- synapse/storage/push_rule.py | 2 +- 2 files changed, 6 insertions(+), 3 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index 514570561f..0e9e8d3452 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -39,12 +39,15 @@ class ApplicationServiceStore(SQLBaseStore): def get_app_services(self): return self.services_cache - def get_if_app_services_interested_in_user(self, user_id): + def get_if_app_services_interested_in_user(self, user_id, exclusive=False): """Check if the user is one associated with an app service """ for service in self.services_cache: if service.is_interested_in_user(user_id): - return True + if exclusive: + return service.is_exclusive_user(user_id) + else: + return True return False def get_app_service_by_user_id(self, user_id): diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py index 0a819d32c5..65bad3fad6 100644 --- a/synapse/storage/push_rule.py +++ b/synapse/storage/push_rule.py @@ -163,7 +163,7 @@ class PushRuleStore(SQLBaseStore): local_users_in_room = set( u for u in users_in_room if self.hs.is_mine_id(u) - and not self.get_if_app_services_interested_in_user(u) + and not self.get_if_app_services_interested_in_user(u, exclusive=True) ) # users in the room who have pushers need to get push rules run because -- cgit 1.4.1 From 6b95e35e969f730f7f92f6c3f814691c4bedaeff Mon Sep 17 00:00:00 2001 From: Krombel Date: Thu, 11 May 2017 16:05:30 +0200 Subject: add check to only add a new filter if the same filter does not exist previously Signed-off-by: Matthias Kesler --- synapse/storage/filtering.py | 9 +++++++++ 1 file changed, 9 insertions(+) (limited to 'synapse/storage') diff --git a/synapse/storage/filtering.py b/synapse/storage/filtering.py index a2ccc66ea7..e7ec43cf97 100644 --- a/synapse/storage/filtering.py +++ b/synapse/storage/filtering.py @@ -51,6 +51,15 @@ class FilteringStore(SQLBaseStore): # Need an atomic transaction to SELECT the maximal ID so far then # INSERT a new one def _do_txn(txn): + sql = ( + "SELECT filter_id FROM user_filters " + "WHERE user_id = ? AND filter_json = ?" + ) + txn.execute(sql, (user_localpart,def_json)) + filter_id = txn.fetchone()[0] + if filter_id is not None: + return filter_id + sql = ( "SELECT MAX(filter_id) FROM user_filters " "WHERE user_id = ?" -- cgit 1.4.1 From eb7cbf27bc8723f5761f0b8bb4c57a7ceb0fc3e5 Mon Sep 17 00:00:00 2001 From: Krombel Date: Fri, 12 May 2017 12:09:42 +0200 Subject: insert whitespace to fix travis build --- synapse/storage/filtering.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/filtering.py b/synapse/storage/filtering.py index e7ec43cf97..9101f57288 100644 --- a/synapse/storage/filtering.py +++ b/synapse/storage/filtering.py @@ -55,7 +55,7 @@ class FilteringStore(SQLBaseStore): "SELECT filter_id FROM user_filters " "WHERE user_id = ? AND filter_json = ?" ) - txn.execute(sql, (user_localpart,def_json)) + txn.execute(sql, (user_localpart, def_json)) filter_id = txn.fetchone()[0] if filter_id is not None: return filter_id -- cgit 1.4.1 From 64953c8ed235b38a31f7b2909493e63267bd2cfd Mon Sep 17 00:00:00 2001 From: Krombel Date: Mon, 15 May 2017 18:36:37 +0200 Subject: avoid access-error if no filter_id matches --- synapse/storage/filtering.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/filtering.py b/synapse/storage/filtering.py index 9101f57288..73f7a078fa 100644 --- a/synapse/storage/filtering.py +++ b/synapse/storage/filtering.py @@ -56,9 +56,9 @@ class FilteringStore(SQLBaseStore): "WHERE user_id = ? AND filter_json = ?" ) txn.execute(sql, (user_localpart, def_json)) - filter_id = txn.fetchone()[0] - if filter_id is not None: - return filter_id + filter_id_response = txn.fetchone() + if filter_id_response is not None: + return filter_id_response[0] sql = ( "SELECT MAX(filter_id) FROM user_filters " -- cgit 1.4.1 From bbfe4e996c9b9729f19d5b104dc6abfe120531b4 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 17 May 2017 14:31:23 +0100 Subject: Make get_state_groups_from_groups faster. Most of the time was spent copying a dict to filter out sentinel values that indicated that keys did not exist in the dict. The sentinel values were added to ensure that we cached the non-existence of keys. By updating DictionaryCache to keep track of which keys were known to not exist itself we can remove a dictionary copy. --- synapse/storage/state.py | 40 +++++++---------------- synapse/util/caches/dictionary_cache.py | 57 ++++++++++++++++++++++++++------- tests/util/test_dict_cache.py | 2 +- 3 files changed, 58 insertions(+), 41 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 85acf2ad1e..a7c3d401d4 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -563,20 +563,22 @@ class StateStore(SQLBaseStore): where a `state_key` of `None` matches all state_keys for the `type`. """ - is_all, state_dict_ids = self._state_group_cache.get(group) + is_all, known_absent, state_dict_ids = self._state_group_cache.get(group) type_to_key = {} missing_types = set() + for typ, state_key in types: + key = (typ, state_key) if state_key is None: type_to_key[typ] = None - missing_types.add((typ, state_key)) + missing_types.add(key) else: if type_to_key.get(typ, object()) is not None: type_to_key.setdefault(typ, set()).add(state_key) - if (typ, state_key) not in state_dict_ids: - missing_types.add((typ, state_key)) + if key not in state_dict_ids and key not in known_absent: + missing_types.add(key) sentinel = object() @@ -590,7 +592,7 @@ class StateStore(SQLBaseStore): return True return False - got_all = not (missing_types or types is None) + got_all = is_all or not missing_types return { k: v for k, v in state_dict_ids.iteritems() @@ -607,7 +609,7 @@ class StateStore(SQLBaseStore): Args: group: The state group to lookup """ - is_all, state_dict_ids = self._state_group_cache.get(group) + is_all, _, state_dict_ids = self._state_group_cache.get(group) return state_dict_ids, is_all @@ -624,7 +626,7 @@ class StateStore(SQLBaseStore): missing_groups = [] if types is not None: for group in set(groups): - state_dict_ids, missing_types, got_all = self._get_some_state_from_cache( + state_dict_ids, _, got_all = self._get_some_state_from_cache( group, types ) results[group] = state_dict_ids @@ -653,19 +655,7 @@ class StateStore(SQLBaseStore): # Now we want to update the cache with all the things we fetched # from the database. for group, group_state_dict in group_to_state_dict.iteritems(): - if types: - # We delibrately put key -> None mappings into the cache to - # cache absence of the key, on the assumption that if we've - # explicitly asked for some types then we will probably ask - # for them again. - state_dict = { - (intern_string(etype), intern_string(state_key)): None - for (etype, state_key) in types - } - state_dict.update(results[group]) - results[group] = state_dict - else: - state_dict = results[group] + state_dict = results[group] state_dict.update( ((intern_string(k[0]), intern_string(k[1])), to_ascii(v)) @@ -677,17 +667,9 @@ class StateStore(SQLBaseStore): key=group, value=state_dict, full=(types is None), + known_absent=types, ) - # Remove all the entries with None values. The None values were just - # used for bookkeeping in the cache. - for group, state_dict in results.iteritems(): - results[group] = { - key: event_id - for key, event_id in state_dict.iteritems() - if event_id - } - defer.returnValue(results) def get_next_state_group(self): diff --git a/synapse/util/caches/dictionary_cache.py b/synapse/util/caches/dictionary_cache.py index cb6933c61c..d4105822b3 100644 --- a/synapse/util/caches/dictionary_cache.py +++ b/synapse/util/caches/dictionary_cache.py @@ -23,7 +23,17 @@ import logging logger = logging.getLogger(__name__) -class DictionaryEntry(namedtuple("DictionaryEntry", ("full", "value"))): +class DictionaryEntry(namedtuple("DictionaryEntry", ("full", "known_absent", "value"))): + """Returned when getting an entry from the cache + + Attributes: + full (bool): Whether the cache has the full or dict or just some keys. + If not full then not all requested keys will necessarily be present + in `value` + known_absent (set): Keys that were looked up in the dict and were not + there. + value (dict): The full or partial dict value + """ def __len__(self): return len(self.value) @@ -58,21 +68,31 @@ class DictionaryCache(object): ) def get(self, key, dict_keys=None): + """Fetch an entry out of the cache + + Args: + key + dict_key(list): If given a set of keys then return only those keys + that exist in the cache. + + Returns: + DictionaryEntry + """ entry = self.cache.get(key, self.sentinel) if entry is not self.sentinel: self.metrics.inc_hits() if dict_keys is None: - return DictionaryEntry(entry.full, dict(entry.value)) + return DictionaryEntry(entry.full, entry.known_absent, dict(entry.value)) else: - return DictionaryEntry(entry.full, { + return DictionaryEntry(entry.full, entry.known_absent, { k: entry.value[k] for k in dict_keys if k in entry.value }) self.metrics.inc_misses() - return DictionaryEntry(False, {}) + return DictionaryEntry(False, set(), {}) def invalidate(self, key): self.check_thread() @@ -87,19 +107,34 @@ class DictionaryCache(object): self.sequence += 1 self.cache.clear() - def update(self, sequence, key, value, full=False): + def update(self, sequence, key, value, full=False, known_absent=None): + """Updates the entry in the cache + + Args: + sequence + key + value (dict): The value to update the cache with. + full (bool): Whether the given value is the full dict, or just a + partial subset there of. If not full then any existing entries + for the key will be updated. + known_absent (set): Set of keys that we know don't exist in the full + dict. + """ self.check_thread() if self.sequence == sequence: # Only update the cache if the caches sequence number matches the # number that the cache had before the SELECT was started (SYN-369) + if known_absent is None: + known_absent = set() if full: - self._insert(key, value) + self._insert(key, value, known_absent) else: - self._update_or_insert(key, value) + self._update_or_insert(key, value, known_absent) - def _update_or_insert(self, key, value): - entry = self.cache.setdefault(key, DictionaryEntry(False, {})) + def _update_or_insert(self, key, value, known_absent): + entry = self.cache.setdefault(key, DictionaryEntry(False, set(), {})) entry.value.update(value) + entry.known_absent.update(known_absent) - def _insert(self, key, value): - self.cache[key] = DictionaryEntry(True, value) + def _insert(self, key, value, known_absent): + self.cache[key] = DictionaryEntry(True, known_absent, value) diff --git a/tests/util/test_dict_cache.py b/tests/util/test_dict_cache.py index 272b71034a..bc92f85fa6 100644 --- a/tests/util/test_dict_cache.py +++ b/tests/util/test_dict_cache.py @@ -28,7 +28,7 @@ class DictCacheTestCase(unittest.TestCase): key = "test_simple_cache_hit_full" v = self.cache.get(key) - self.assertEqual((False, {}), v) + self.assertEqual((False, set(), {}), v) seq = self.cache.sequence test_value = {"test": "test_simple_cache_hit_full"} -- cgit 1.4.1 From 760625acbaef9ae7032ba5e59e91979f454febef Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 18 May 2017 16:34:41 +0100 Subject: Make get_if_app_services_interested_in_user faster --- synapse/appservice/__init__.py | 10 ++++++++++ synapse/push/bulk_push_rule_evaluator.py | 4 +--- synapse/storage/appservice.py | 31 ++++++++++++++++++++++--------- synapse/storage/push_rule.py | 2 +- 4 files changed, 34 insertions(+), 13 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py index 7346206bb1..b989007314 100644 --- a/synapse/appservice/__init__.py +++ b/synapse/appservice/__init__.py @@ -241,6 +241,16 @@ class ApplicationService(object): def is_exclusive_room(self, room_id): return self._is_exclusive(ApplicationService.NS_ROOMS, room_id) + def get_exlusive_user_regexes(self): + """Get the list of regexes used to determine if a user is exclusively + registered by the AS + """ + return [ + regex_obj["regex"] + for regex_obj in self.namespaces[ApplicationService.NS_USERS] + if regex_obj["exclusive"] + ] + def is_rate_limited(self): return self.rate_limited diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index 7f04d5668f..386d7bed81 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -251,9 +251,7 @@ class RulesForRoom(object): if not self.is_mine_id(user_id): continue - if self.store.get_if_app_services_interested_in_user( - user_id, exclusive=True - ): + if self.store.get_if_app_services_interested_in_user(user_id): continue # If a user has left a room we remove their push rule. If they diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index 0e9e8d3452..532df736a5 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging +import re import simplejson as json from twisted.internet import defer @@ -36,19 +37,31 @@ class ApplicationServiceStore(SQLBaseStore): hs.config.app_service_config_files ) + # We precompie a regex constructed from all the regexes that the AS's + # have registered for exclusive users. + exclusive_user_regexes = [ + regex.pattern + for service in self.services_cache + for regex in service.get_exlusive_user_regexes() + ] + if exclusive_user_regexes: + exclusive_user_regex = "|".join("(" + r + ")" for r in exclusive_user_regexes) + self.exclusive_user_regex = re.compile(exclusive_user_regex) + else: + # We handle this case specially otherwise the constructed regex + # will always match + self.exclusive_user_regex = None + def get_app_services(self): return self.services_cache - def get_if_app_services_interested_in_user(self, user_id, exclusive=False): - """Check if the user is one associated with an app service + def get_if_app_services_interested_in_user(self, user_id): + """Check if the user is one associated with an app service (exclusively) """ - for service in self.services_cache: - if service.is_interested_in_user(user_id): - if exclusive: - return service.is_exclusive_user(user_id) - else: - return True - return False + if self.exclusive_user_regex: + return bool(self.exclusive_user_regex.match(user_id)) + else: + return False def get_app_service_by_user_id(self, user_id): """Retrieve an application service from their user ID. diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py index 65bad3fad6..0a819d32c5 100644 --- a/synapse/storage/push_rule.py +++ b/synapse/storage/push_rule.py @@ -163,7 +163,7 @@ class PushRuleStore(SQLBaseStore): local_users_in_room = set( u for u in users_in_room if self.hs.is_mine_id(u) - and not self.get_if_app_services_interested_in_user(u, exclusive=True) + and not self.get_if_app_services_interested_in_user(u) ) # users in the room who have pushers need to get push rules run because -- cgit 1.4.1 From 107ac7ac9690e638f62924eba6c29d192632c75a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 18 May 2017 17:17:53 +0100 Subject: Increase size of push rule caches --- synapse/storage/push_rule.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py index 0a819d32c5..8758b1c0c7 100644 --- a/synapse/storage/push_rule.py +++ b/synapse/storage/push_rule.py @@ -49,7 +49,7 @@ def _load_rules(rawrules, enabled_map): class PushRuleStore(SQLBaseStore): - @cachedInlineCallbacks() + @cachedInlineCallbacks(max_entries=5000) def get_push_rules_for_user(self, user_id): rows = yield self._simple_select_list( table="push_rules", @@ -73,7 +73,7 @@ class PushRuleStore(SQLBaseStore): defer.returnValue(rules) - @cachedInlineCallbacks() + @cachedInlineCallbacks(max_entries=5000) def get_push_rules_enabled_for_user(self, user_id): results = yield self._simple_select_list( table="push_rules_enable", -- cgit 1.4.1 From 58ebb96cce1992a79cc363d4157643c5db569396 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 19 May 2017 14:38:50 +0100 Subject: Fix invalidation of get_users_with_read_receipts_in_room --- synapse/storage/receipts.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py index efb90c3c91..f42b8014c7 100644 --- a/synapse/storage/receipts.py +++ b/synapse/storage/receipts.py @@ -45,7 +45,9 @@ class ReceiptsStore(SQLBaseStore): return # Returns an ObservableDeferred - res = self.get_users_with_read_receipts_in_room.cache.get((room_id,), None) + res = self.get_users_with_read_receipts_in_room.cache.get( + room_id, None, update_metrics=False, + ) if res: if isinstance(res, defer.Deferred) and res.called: -- cgit 1.4.1 From 74bf4ee7bf4467453854fb554d711f3bec5bfd2a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 22 May 2017 16:19:22 +0100 Subject: Stream count_e2e_one_time_keys cache invalidation --- synapse/storage/end_to_end_keys.py | 31 ++++++++++++++++++------------- 1 file changed, 18 insertions(+), 13 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/end_to_end_keys.py b/synapse/storage/end_to_end_keys.py index e00f31da2b..ad170c951e 100644 --- a/synapse/storage/end_to_end_keys.py +++ b/synapse/storage/end_to_end_keys.py @@ -185,8 +185,8 @@ class EndToEndKeyStore(SQLBaseStore): for algorithm, key_id, json_bytes in new_keys ], ) - txn.call_after( - self.count_e2e_one_time_keys.invalidate, (user_id, device_id,) + self._invalidate_cache_and_stream( + txn, self.count_e2e_one_time_keys, (user_id, device_id,) ) yield self.runInteraction( "add_e2e_one_time_keys_insert", _add_e2e_one_time_keys @@ -245,16 +245,21 @@ class EndToEndKeyStore(SQLBaseStore): "claim_e2e_one_time_keys", _claim_e2e_one_time_keys ) - @defer.inlineCallbacks def delete_e2e_keys_by_device(self, user_id, device_id): - yield self._simple_delete( - table="e2e_device_keys_json", - keyvalues={"user_id": user_id, "device_id": device_id}, - desc="delete_e2e_device_keys_by_device" - ) - yield self._simple_delete( - table="e2e_one_time_keys_json", - keyvalues={"user_id": user_id, "device_id": device_id}, - desc="delete_e2e_one_time_keys_by_device" + def delete_e2e_keys_by_device_txn(txn): + self._simple_delete_txn( + txn, + table="e2e_device_keys_json", + keyvalues={"user_id": user_id, "device_id": device_id}, + ) + self._simple_delete_txn( + txn, + table="e2e_one_time_keys_json", + keyvalues={"user_id": user_id, "device_id": device_id}, + ) + self._invalidate_cache_and_stream( + txn, self.count_e2e_one_time_keys, (user_id, device_id,) + ) + return self.runInteraction( + "delete_e2e_keys_by_device", delete_e2e_keys_by_device_txn ) - self.count_e2e_one_time_keys.invalidate((user_id, device_id,)) -- cgit 1.4.1 From e6618ece2d7fa857e9649584483e20d2e0a82a81 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 23 May 2017 09:36:52 +0100 Subject: Missed an invalidation --- synapse/storage/end_to_end_keys.py | 3 +++ 1 file changed, 3 insertions(+) (limited to 'synapse/storage') diff --git a/synapse/storage/end_to_end_keys.py b/synapse/storage/end_to_end_keys.py index ad170c951e..f3e5331fde 100644 --- a/synapse/storage/end_to_end_keys.py +++ b/synapse/storage/end_to_end_keys.py @@ -240,6 +240,9 @@ class EndToEndKeyStore(SQLBaseStore): txn.call_after( self.count_e2e_one_time_keys.invalidate, (user_id, device_id,) ) + self._invalidate_cache_and_stream( + txn, self.count_e2e_one_time_keys, (user_id, device_id,) + ) return result return self.runInteraction( "claim_e2e_one_time_keys", _claim_e2e_one_time_keys -- cgit 1.4.1 From 8cf9f0a3e7073f4e8bdddf81d4599f9a9ca7e978 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 23 May 2017 09:46:59 +0100 Subject: Remove redundant invalidation --- synapse/storage/end_to_end_keys.py | 3 --- 1 file changed, 3 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/end_to_end_keys.py b/synapse/storage/end_to_end_keys.py index f3e5331fde..2cebb203c6 100644 --- a/synapse/storage/end_to_end_keys.py +++ b/synapse/storage/end_to_end_keys.py @@ -237,9 +237,6 @@ class EndToEndKeyStore(SQLBaseStore): ) for user_id, device_id, algorithm, key_id in delete: txn.execute(sql, (user_id, device_id, algorithm, key_id)) - txn.call_after( - self.count_e2e_one_time_keys.invalidate, (user_id, device_id,) - ) self._invalidate_cache_and_stream( txn, self.count_e2e_one_time_keys, (user_id, device_id,) ) -- cgit 1.4.1 From c049472b8ad75d1d9a627803cd698cfe8c5570b8 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 24 May 2017 14:22:41 +0100 Subject: Only store event_auth for state events --- synapse/handlers/federation.py | 20 ++++++++++++++------ synapse/storage/event_federation.py | 35 +++++++++++++++++++++++++++++------ synapse/storage/events.py | 1 + 3 files changed, 44 insertions(+), 12 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 63e633548d..a333acc4aa 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -832,7 +832,11 @@ class FederationHandler(BaseHandler): @defer.inlineCallbacks def on_event_auth(self, event_id): - auth = yield self.store.get_auth_chain([event_id]) + event = yield self.store.get_event(event_id) + auth = yield self.store.get_auth_chain( + [auth_id for auth_id, _ in event.auth_events], + include_given=True + ) for event in auth: event.signatures.update( @@ -1047,9 +1051,7 @@ class FederationHandler(BaseHandler): yield user_joined_room(self.distributor, user, event.room_id) state_ids = context.prev_state_ids.values() - auth_chain = yield self.store.get_auth_chain(set( - [event.event_id] + state_ids - )) + auth_chain = yield self.store.get_auth_chain(state_ids) state = yield self.store.get_events(context.prev_state_ids.values()) @@ -1598,7 +1600,11 @@ class FederationHandler(BaseHandler): pass # Now get the current auth_chain for the event. - local_auth_chain = yield self.store.get_auth_chain([event_id]) + event = yield self.store.get_event(event_id) + local_auth_chain = yield self.store.get_auth_chain( + [auth_id for auth_id, _ in event.auth_events], + include_given=True + ) # TODO: Check if we would now reject event_id. If so we need to tell # everyone. @@ -1791,7 +1797,9 @@ class FederationHandler(BaseHandler): auth_ids = yield self.auth.compute_auth_events( event, context.prev_state_ids ) - local_auth_chain = yield self.store.get_auth_chain(auth_ids) + local_auth_chain = yield self.store.get_auth_chain( + auth_ids, include_given=True + ) try: # 2. Get remote difference. diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 519059c306..72126c682e 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -44,18 +44,41 @@ class EventFederationStore(SQLBaseStore): self._delete_old_forward_extrem_cache, 60 * 60 * 1000 ) - def get_auth_chain(self, event_ids): - return self.get_auth_chain_ids(event_ids).addCallback(self._get_events) + def get_auth_chain(self, event_ids, include_given=False): + """Get auth events for given event_ids. The events *must* be state events. - def get_auth_chain_ids(self, event_ids): + Args: + event_ids (list): state events + include_given (bool): include the given events in result + + Returns: + list of events + """ + return self.get_auth_chain_ids( + event_ids, include_given=include_given, + ).addCallback(self._get_events) + + def get_auth_chain_ids(self, event_ids, include_given=False): + """Get auth events for given event_ids. The events *must* be state events. + + Args: + event_ids (list): state events + include_given (bool): include the given events in result + + Returns: + list of event_ids + """ return self.runInteraction( "get_auth_chain_ids", self._get_auth_chain_ids_txn, - event_ids + event_ids, include_given ) - def _get_auth_chain_ids_txn(self, txn, event_ids): - results = set() + def _get_auth_chain_ids_txn(self, txn, event_ids, include_given): + if include_given: + results = set(event_ids) + else: + results = set() base_sql = ( "SELECT auth_id FROM event_auth WHERE event_id IN (%s)" diff --git a/synapse/storage/events.py b/synapse/storage/events.py index c4aeb48800..3d4f53ea55 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -1120,6 +1120,7 @@ class EventsStore(SQLBaseStore): } for event, _ in events_and_contexts for auth_id, _ in event.auth_events + if event.is_state() ], ) -- cgit 1.4.1 From 6e614e9e10d35006707cc6eceafe80c13eb13948 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 24 May 2017 14:58:13 +0100 Subject: Add background task to clear out old event_auth --- synapse/storage/event_federation.py | 56 ++++++++++++++++++++++ synapse/storage/prepare_database.py | 2 +- .../schema/delta/42/event_auth_state_only.sql | 17 +++++++ 3 files changed, 74 insertions(+), 1 deletion(-) create mode 100644 synapse/storage/schema/delta/42/event_auth_state_only.sql (limited to 'synapse/storage') diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 72126c682e..e8133de2fa 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -37,9 +37,16 @@ class EventFederationStore(SQLBaseStore): and backfilling from another server respectively. """ + EVENT_AUTH_STATE_ONLY = "event_auth_state_only" + def __init__(self, hs): super(EventFederationStore, self).__init__(hs) + self.register_background_update_handler( + self.EVENT_AUTH_STATE_ONLY, + self._background_delete_non_state_event_auth, + ) + hs.get_clock().looping_call( self._delete_old_forward_extrem_cache, 60 * 60 * 1000 ) @@ -527,3 +534,52 @@ class EventFederationStore(SQLBaseStore): txn.execute(query, (room_id,)) txn.call_after(self.get_latest_event_ids_in_room.invalidate, (room_id,)) + + @defer.inlineCallbacks + def _background_delete_non_state_event_auth(self, progress, batch_size): + def delete_event_auth(txn): + target_min_stream_id = progress.get("target_min_stream_id_inclusive") + max_stream_id = progress.get("max_stream_id_exclusive") + + if not target_min_stream_id or not max_stream_id: + txn.execute("SELECT COALESCE(MIN(stream_ordering), 0) FROM events") + rows = txn.fetchall() + target_min_stream_id = rows[0][0] + + txn.execute("SELECT COALESCE(MAX(stream_ordering), 0) FROM events") + rows = txn.fetchall() + max_stream_id = rows[0][0] + + min_stream_id = max_stream_id - batch_size + + sql = """ + DELETE FROM event_auth + WHERE event_id IN ( + SELECT event_id FROM events + LEFT JOIN state_events USING (room_id, event_id) + WHERE ? <= stream_ordering AND stream_ordering < ? + AND state_key IS null + ) + """ + + txn.execute(sql, (min_stream_id, max_stream_id,)) + + new_progress = { + "target_min_stream_id_inclusive": target_min_stream_id, + "max_stream_id_exclusive": min_stream_id, + } + + self._background_update_progress_txn( + txn, self.EVENT_AUTH_STATE_ONLY, new_progress + ) + + return min_stream_id >= target_min_stream_id + + result = yield self.runInteraction( + self.EVENT_AUTH_STATE_ONLY, delete_event_auth + ) + + if not result: + yield self._end_background_update(self.EVENT_AUTH_STATE_ONLY) + + defer.returnValue(batch_size) diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py index 6e623843d5..eaba699e29 100644 --- a/synapse/storage/prepare_database.py +++ b/synapse/storage/prepare_database.py @@ -25,7 +25,7 @@ logger = logging.getLogger(__name__) # Remember to update this number every time a change is made to database # schema files, so the users will be informed on server restarts. -SCHEMA_VERSION = 41 +SCHEMA_VERSION = 42 dir_path = os.path.abspath(os.path.dirname(__file__)) diff --git a/synapse/storage/schema/delta/42/event_auth_state_only.sql b/synapse/storage/schema/delta/42/event_auth_state_only.sql new file mode 100644 index 0000000000..b8821ac759 --- /dev/null +++ b/synapse/storage/schema/delta/42/event_auth_state_only.sql @@ -0,0 +1,17 @@ +/* Copyright 2017 Vector Creations Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +INSERT INTO background_updates (update_name, progress_json) VALUES + ('event_auth_state_only', '{}'); -- cgit 1.4.1 From dfbda5e0250adfa762f5491c7efb2666866db034 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 25 May 2017 17:08:41 +0100 Subject: Faster cache for get_joined_hosts --- synapse/federation/transaction_queue.py | 2 + synapse/replication/slave/storage/events.py | 2 + synapse/state.py | 16 ++--- synapse/storage/roommember.py | 93 ++++++++++++++++++++++------- synapse/storage/state.py | 33 ++++++++++ 5 files changed, 117 insertions(+), 29 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index a15198e05d..4c25ef1106 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -187,6 +187,8 @@ class TransactionQueue(object): prev_id for prev_id, _ in event.prev_events ], ) + destinations = set(destinations) + logger.info("destinations: %r", destinations) if send_on_behalf_of is not None: # If we are sending the event on behalf of another server diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index fcaf58b93b..6cd3a843df 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -108,6 +108,8 @@ class SlavedEventStore(BaseSlavedStore): get_current_state_ids = ( StateStore.__dict__["get_current_state_ids"] ) + get_state_group_delta = DataStore.get_state_group_delta.__func__ + _get_joined_hosts_cache = RoomMemberStore.__dict__["_get_joined_hosts_cache"] has_room_changed_since = DataStore.has_room_changed_since.__func__ get_unread_push_actions_for_user_in_range_for_http = ( diff --git a/synapse/state.py b/synapse/state.py index 3f93f9e27f..95dbe02e50 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -170,9 +170,7 @@ class StateHandler(object): latest_event_ids = yield self.store.get_latest_event_ids_in_room(room_id) logger.debug("calling resolve_state_groups from get_current_user_in_room") entry = yield self.resolve_state_groups(room_id, latest_event_ids) - joined_users = yield self.store.get_joined_users_from_state( - room_id, entry.state_id, entry.state - ) + joined_users = yield self.store.get_joined_users_from_state(room_id, entry) defer.returnValue(joined_users) @defer.inlineCallbacks @@ -181,9 +179,9 @@ class StateHandler(object): latest_event_ids = yield self.store.get_latest_event_ids_in_room(room_id) logger.debug("calling resolve_state_groups from get_current_hosts_in_room") entry = yield self.resolve_state_groups(room_id, latest_event_ids) - joined_hosts = yield self.store.get_joined_hosts( - room_id, entry.state_id, entry.state - ) + logger.info("State: %r", entry.state_group) + joined_hosts = yield self.store.get_joined_hosts(room_id, entry) + logger.info("returning: %r", joined_hosts) defer.returnValue(joined_hosts) @defer.inlineCallbacks @@ -307,11 +305,13 @@ class StateHandler(object): if len(group_names) == 1: name, state_list = state_groups_ids.items().pop() + prev_group, delta_ids = yield self.store.get_state_group_delta(name) + defer.returnValue(_StateCacheEntry( state=state_list, state_group=name, - prev_group=None, - delta_ids=None, + prev_group=prev_group, + delta_ids=delta_ids, )) with (yield self.resolve_linearizer.queue(group_names)): diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 0829ae5bee..8c4a5f9f2e 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -18,6 +18,7 @@ from twisted.internet import defer from collections import namedtuple from ._base import SQLBaseStore +from synapse.util.async import Linearizer from synapse.util.caches import intern_string from synapse.util.caches.descriptors import cached, cachedInlineCallbacks from synapse.util.stringutils import to_ascii @@ -392,7 +393,8 @@ class RoomMemberStore(SQLBaseStore): context=context, ) - def get_joined_users_from_state(self, room_id, state_group, state_ids): + def get_joined_users_from_state(self, room_id, state_entry): + state_group = state_entry.state_group if not state_group: # If state_group is None it means it has yet to be assigned a # state group, i.e. we need to make sure that calls with a state_group @@ -401,7 +403,7 @@ class RoomMemberStore(SQLBaseStore): state_group = object() return self._get_joined_users_from_context( - room_id, state_group, state_ids, + room_id, state_group, state_entry.state, context=state_entry, ) @cachedInlineCallbacks(num_args=2, cache_context=True, iterable=True, @@ -534,7 +536,8 @@ class RoomMemberStore(SQLBaseStore): defer.returnValue(False) - def get_joined_hosts(self, room_id, state_group, state_ids): + def get_joined_hosts(self, room_id, state_entry): + state_group = state_entry.state_group if not state_group: # If state_group is None it means it has yet to be assigned a # state group, i.e. we need to make sure that calls with a state_group @@ -543,33 +546,21 @@ class RoomMemberStore(SQLBaseStore): state_group = object() return self._get_joined_hosts( - room_id, state_group, state_ids + room_id, state_group, state_entry.state, state_entry=state_entry, ) @cachedInlineCallbacks(num_args=2, max_entries=10000, iterable=True) - def _get_joined_hosts(self, room_id, state_group, current_state_ids): + # @defer.inlineCallbacks + def _get_joined_hosts(self, room_id, state_group, current_state_ids, state_entry): # We don't use `state_group`, its there so that we can cache based # on it. However, its important that its never None, since two current_state's # with a state_group of None are likely to be different. # See bulk_get_push_rules_for_room for how we work around this. assert state_group is not None - joined_hosts = set() - for etype, state_key in current_state_ids: - if etype == EventTypes.Member: - try: - host = get_domain_from_id(state_key) - except: - logger.warn("state_key not user_id: %s", state_key) - continue - - if host in joined_hosts: - continue - - event_id = current_state_ids[(etype, state_key)] - event = yield self.get_event(event_id, allow_none=True) - if event and event.content["membership"] == Membership.JOIN: - joined_hosts.add(intern_string(host)) + cache = self._get_joined_hosts_cache(room_id) + joined_hosts = yield cache.get_destinations(state_entry) + logger.info("returning: %r", joined_hosts) defer.returnValue(joined_hosts) @@ -647,3 +638,63 @@ class RoomMemberStore(SQLBaseStore): yield self._end_background_update(_MEMBERSHIP_PROFILE_UPDATE_NAME) defer.returnValue(result) + + @cached(max_entries=10000, iterable=True) + def _get_joined_hosts_cache(self, room_id): + return _JoinedHostsCache(self, room_id) + + +class _JoinedHostsCache(object): + def __init__(self, store, room_id): + self.store = store + self.room_id = room_id + + self.hosts_to_joined_users = {} + + self.state_group = object() + + self.linearizer = Linearizer("_JoinedHostsCache") + + self._len = 0 + + @defer.inlineCallbacks + def get_destinations(self, state_entry): + if state_entry.state_group == self.state_group: + defer.returnValue(frozenset(self.hosts_to_joined_users)) + + with (yield self.linearizer.queue(())): + if state_entry.state_group == self.state_group: + pass + elif state_entry.prev_group == self.state_group: + for (typ, state_key), event_id in state_entry.delta_ids.iteritems(): + if typ != EventTypes.Member: + continue + + host = intern_string(get_domain_from_id(state_key)) + user_id = state_key + known_joins = self.hosts_to_joined_users.setdefault(host, set()) + + event = yield self.store.get_event(event_id) + if event.membership == Membership.JOIN: + known_joins.add(user_id) + else: + known_joins.discard(user_id) + + if not known_joins: + self.hosts_to_joined_users.pop(host, None) + else: + joined_users = yield self.store.get_joined_users_from_state( + self.room_id, state_entry, + ) + + self.hosts_to_joined_users = {} + for user_id in joined_users: + host = intern_string(get_domain_from_id(user_id)) + self.hosts_to_joined_users.setdefault(host, set()).add(user_id) + + self.state_group = state_entry.state_group + self._len = sum(len(v) for v in self.hosts_to_joined_users.itervalues()) + defer.returnValue(frozenset(self.hosts_to_joined_users)) + + def __len__(self): + return self._len diff --git a/synapse/storage/state.py b/synapse/storage/state.py index a7c3d401d4..01474ff5ff 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -98,6 +98,39 @@ class StateStore(SQLBaseStore): _get_current_state_ids_txn, ) + def get_state_group_delta(self, state_group): + def _get_state_group_delta_txn(txn): + prev_group = self._simple_select_one_onecol_txn( + txn, + table="state_group_edges", + keyvalues={ + "state_group": state_group, + }, + retcol="prev_state_group", + allow_none=True, + ) + + if not prev_group: + return None, None + + delta_ids = self._simple_select_list_txn( + txn, + table="state_groups_state", + keyvalues={ + "state_group": state_group, + }, + retcols=("type", "state_key", "event_id",) + ) + + return prev_group, { + (row["type"], row["state_key"]): row["event_id"] + for row in delta_ids + } + return self.runInteraction( + "get_state_group_delta", + _get_state_group_delta_txn, + ) + @defer.inlineCallbacks def get_state_groups_ids(self, room_id, event_ids): if not event_ids: -- cgit 1.4.1 From 619e8ecd0c123602c20eb3a4ce45bdefb79d5900 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 26 May 2017 10:46:03 +0100 Subject: Handle None state group correctly --- synapse/state.py | 6 +++--- synapse/storage/roommember.py | 5 ++++- 2 files changed, 7 insertions(+), 4 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/state.py b/synapse/state.py index 95dbe02e50..5fbe0a0977 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -364,11 +364,11 @@ class StateHandler(object): prev_group = None delta_ids = None - for old_group, old_ids in state_groups_ids.items(): - if not set(new_state.iterkeys()) - set(old_ids.iterkeys()): + for old_group, old_ids in state_groups_ids.iteritems(): + if not set(new_state) - set(old_ids): n_delta_ids = { k: v - for k, v in new_state.items() + for k, v in new_state.iteritems() if old_ids.get(k) != v } if not delta_ids or len(n_delta_ids) < len(delta_ids): diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 8c4a5f9f2e..0e9e71f600 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -692,7 +692,10 @@ class _JoinedHostsCache(object): host = intern_string(get_domain_from_id(user_id)) self.hosts_to_joined_users.setdefault(host, set()).add(user_id) - self.state_group = state_entry.state_group + if state_entry.state_group: + self.state_group = state_entry.state_group + else: + self.state_group = object() self._len = sum(len(v) for v in self.hosts_to_joined_users.itervalues()) defer.returnValue(frozenset(self.hosts_to_joined_users)) -- cgit 1.4.1 From a584a81b3e59e9a4763d81d1a8b893fbd1a45ce0 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 30 May 2017 14:41:42 +0100 Subject: Add current_state_delta_stream table --- synapse/storage/events.py | 31 ++++++++++++++++------ .../schema/delta/42/current_state_delta.sql | 25 +++++++++++++++++ 2 files changed, 48 insertions(+), 8 deletions(-) create mode 100644 synapse/storage/schema/delta/42/current_state_delta.sql (limited to 'synapse/storage') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 3d4f53ea55..c37a2a6f16 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -648,9 +648,10 @@ class EventsStore(SQLBaseStore): list of the event ids which are the forward extremities. """ - self._update_current_state_txn(txn, current_state_for_room) - max_stream_order = events_and_contexts[-1][0].internal_metadata.stream_ordering + + self._update_current_state_txn(txn, current_state_for_room, max_stream_order) + self._update_forward_extremities_txn( txn, new_forward_extremities=new_forward_extremeties, @@ -713,7 +714,7 @@ class EventsStore(SQLBaseStore): backfilled=backfilled, ) - def _update_current_state_txn(self, txn, state_delta_by_room): + def _update_current_state_txn(self, txn, state_delta_by_room, max_stream_order): for room_id, current_state_tuple in state_delta_by_room.iteritems(): to_delete, to_insert, _ = current_state_tuple txn.executemany( @@ -735,6 +736,24 @@ class EventsStore(SQLBaseStore): ], ) + state_deltas = {key: None for key in to_delete} + state_deltas.update(to_insert) + + self._simple_insert_many_txn( + txn, + table="current_state_delta_stream", + values=[ + { + "stream_id": max_stream_order, + "room_id": room_id, + "type": key[0], + "state_key": key[1], + "event_id": ev_id, + } + for key, ev_id in state_deltas.iteritems() + ] + ) + # Invalidate the various caches # Figure out the changes of membership to invalidate the @@ -743,11 +762,7 @@ class EventsStore(SQLBaseStore): # and which we have added, then we invlidate the caches for all # those users. members_changed = set( - state_key for ev_type, state_key in to_delete.iterkeys() - if ev_type == EventTypes.Member - ) - members_changed.update( - state_key for ev_type, state_key in to_insert.iterkeys() + state_key for ev_type, state_key in state_deltas if ev_type == EventTypes.Member ) diff --git a/synapse/storage/schema/delta/42/current_state_delta.sql b/synapse/storage/schema/delta/42/current_state_delta.sql new file mode 100644 index 0000000000..1a55aa912b --- /dev/null +++ b/synapse/storage/schema/delta/42/current_state_delta.sql @@ -0,0 +1,25 @@ +/* Copyright 2017 Vector Creations Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +CREATE TABLE current_state_delta_stream ( + stream_id BIGINT NOT NULL, + room_id TEXT NOT NULL, + type TEXT NOT NULL, + state_key TEXT NOT NULL, + event_id TEXT -- Is null if the key was removed +); + +CREATE INDEX current_state_delta_stream_idx ON current_state_delta_stream(stream_id); -- cgit 1.4.1 From 04095f75810176d7ba2b5ef70b40dd1a3281850d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 30 May 2017 14:53:01 +0100 Subject: Add clobbered event_id --- synapse/storage/events.py | 1 + synapse/storage/schema/delta/42/current_state_delta.sql | 3 ++- 2 files changed, 3 insertions(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index c37a2a6f16..dfb57f9d12 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -749,6 +749,7 @@ class EventsStore(SQLBaseStore): "type": key[0], "state_key": key[1], "event_id": ev_id, + "prev_event_id": to_delete.get(key, None), } for key, ev_id in state_deltas.iteritems() ] diff --git a/synapse/storage/schema/delta/42/current_state_delta.sql b/synapse/storage/schema/delta/42/current_state_delta.sql index 1a55aa912b..bf124f3def 100644 --- a/synapse/storage/schema/delta/42/current_state_delta.sql +++ b/synapse/storage/schema/delta/42/current_state_delta.sql @@ -19,7 +19,8 @@ CREATE TABLE current_state_delta_stream ( room_id TEXT NOT NULL, type TEXT NOT NULL, state_key TEXT NOT NULL, - event_id TEXT -- Is null if the key was removed + event_id TEXT, -- Is null if the key was removed + prev_event_id TEXT ); CREATE INDEX current_state_delta_stream_idx ON current_state_delta_stream(stream_id); -- cgit 1.4.1 From dd48f7204c511d5ba4438dfe01679bcb7367216d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 30 May 2017 15:01:22 +0100 Subject: Add comment --- synapse/storage/schema/delta/42/current_state_delta.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/schema/delta/42/current_state_delta.sql b/synapse/storage/schema/delta/42/current_state_delta.sql index bf124f3def..d28851aff8 100644 --- a/synapse/storage/schema/delta/42/current_state_delta.sql +++ b/synapse/storage/schema/delta/42/current_state_delta.sql @@ -20,7 +20,7 @@ CREATE TABLE current_state_delta_stream ( type TEXT NOT NULL, state_key TEXT NOT NULL, event_id TEXT, -- Is null if the key was removed - prev_event_id TEXT + prev_event_id TEXT -- Is null if the key was added ); CREATE INDEX current_state_delta_stream_idx ON current_state_delta_stream(stream_id); -- cgit 1.4.1 From eeb2f9e546060ca9f2ef7260220b51d85d9b0d92 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 31 May 2017 11:51:01 +0100 Subject: Add user_directory to database --- synapse/handlers/user_directory.py | 218 ++++++++++++++++++++++++++++ synapse/notifier.py | 6 +- synapse/server.py | 5 + synapse/storage/__init__.py | 2 + synapse/storage/schema/delta/42/user_dir.py | 69 +++++++++ synapse/storage/user_directory.py | 145 ++++++++++++++++++ 6 files changed, 444 insertions(+), 1 deletion(-) create mode 100644 synapse/handlers/user_directory.py create mode 100644 synapse/storage/schema/delta/42/user_dir.py create mode 100644 synapse/storage/user_directory.py (limited to 'synapse/storage') diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py new file mode 100644 index 0000000000..43e917c1a0 --- /dev/null +++ b/synapse/handlers/user_directory.py @@ -0,0 +1,218 @@ +# -*- coding: utf-8 -*- +# Copyright 2017 Vector Creations Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +from twisted.internet import defer + +from synapse.api.constants import EventTypes, JoinRules, Membership +from synapse.storage.roommember import ProfileInfo +from synapse.util.metrics import Measure + + +logger = logging.getLogger(__name__) + + +class UserDirectoyHandler(object): + def __init__(self, hs): + self.store = hs.get_datastore() + self.state = hs.get_state_handler() + self.server_name = hs.hostname + self.clock = hs.get_clock() + + self.initially_handled_users = set() + + self.pos = None + + self._is_processing = False + + @defer.inlineCallbacks + def notify_new_event(self): + if self._is_processing: + return + + self._is_processing = True + try: + yield self._unsafe_process() + finally: + self._is_processing = False + + @defer.inlineCallbacks + def _unsafe_process(self): + if self.pos is None: + self.pos = yield self.store.get_user_directory_stream_pos() + + if self.pos is None: + yield self._do_initial_spam() + self.pos = yield self.store.get_user_directory_stream_pos() + + while True: + with Measure(self.clock, "user_dir_delta"): + deltas = yield self.store.get_current_state_deltas(self.pos) + if not deltas: + return + + yield self._handle_deltas(deltas) + + max_stream_id = deltas[-1]["stream_id"] + yield self.store.update_user_directory_stream_pos(max_stream_id) + + @defer.inlineCallbacks + def _handle_room(self, room_id): + # TODO: Check we're still joined to room + + is_public = yield self.store.is_room_world_readable_or_publicly_joinable(room_id) + if not is_public: + return + + users_with_profile = yield self.state.get_current_user_in_room(room_id) + unhandled_users = set(users_with_profile) - self.initially_handled_users + + yield self.store.add_profiles_to_user_dir( + room_id, { + user_id: users_with_profile[user_id] for user_id in unhandled_users + } + ) + + self.initially_handled_users |= unhandled_users + + @defer.inlineCallbacks + def _do_initial_spam(self): + yield self.store.delete_all_from_user_dir() + + room_ids = yield self.store.get_all_rooms() + + for room_id in room_ids: + yield self._handle_room(room_id) + + self.initially_handled_users = None + + yield self.store.update_user_directory_stream_pos(-1) + + @defer.inlineCallbacks + def _handle_new_user(self, room_id, user_id, profile): + row = yield self.store.get_user_in_directory(user_id) + if row: + return + + yield self.store.add_profiles_to_user_dir(room_id, {user_id: profile}) + + def _handle_remove_user(self, room_id, user_id): + row = yield self.store.get_user_in_directory(user_id) + if not row or row["room_id"] != room_id: + return + + # TODO: Make this faster? + rooms = yield self.store.get_rooms_for_user(user_id) + for room_id in rooms: + is_public = yield self.store.is_room_world_readable_or_publicly_joinable( + room_id + ) + + if is_public: + return + + yield self.store.remove_from_user_dir(user_id) + + @defer.inlineCallbacks + def _handle_deltas(self, deltas): + for delta in deltas: + typ = delta["type"] + state_key = delta["state_key"] + room_id = delta["room_id"] + event_id = delta["event_id"] + prev_event_id = delta["prev_event_id"] + + if typ == EventTypes.RoomHistoryVisibility: + change = yield self._get_key_change( + prev_event_id, event_id, + key_name="history_visibility", + public_value="world_readable", + ) + if change is None: + continue + + users_with_profile = yield self.state.get_current_user_in_room(room_id) + for user_id, profile in users_with_profile.iteritems(): + if change: + yield self._handle_new_user(room_id, user_id, profile) + else: + yield self._handle_remove_user(room_id, user_id) + elif typ == EventTypes.JoinRules: + change = yield self._get_key_change( + prev_event_id, event_id, + key_name="join_rules", + public_value=JoinRules.PUBLIC, + ) + if change is None: + continue + + users_with_profile = yield self.state.get_current_user_in_room(room_id) + for user_id, profile in users_with_profile.iteritems(): + if change: + yield self._handle_new_user(room_id, user_id, profile) + else: + yield self._handle_remove_user(room_id, user_id) + elif typ == EventTypes.Member: + change = yield self._get_key_change( + prev_event_id, event_id, + key_name="membership", + public_value=Membership.JOIN, + ) + + if change is None: + continue + + if change: + event = yield self.store.get_event(event_id) + profile = ProfileInfo( + avatar_url=event.content.get("avatar_url"), + display_name=event.content.get("displayname"), + ) + + yield self._handle_new_user(room_id, state_key, profile) + else: + yield self._handle_remove_user(room_id, state_key) + + @defer.inlineCallbacks + def _get_key_change(self, prev_event_id, event_id, key_name, public_value): + prev_event = None + event = None + if prev_event_id: + prev_event = yield self.store.get_event(prev_event_id, allow_none=True) + + if event_id: + event = yield self.store.get_event(event_id, allow_none=True) + + if not event and not prev_event: + defer.returnValue(None) + + prev_hist_vis = None + hist_vis = None + + if prev_event: + prev_hist_vis = prev_event.content.get(key_name, None) + + if event: + hist_vis = event.content.get(key_name, None) + + logger.info("prev: %r, new: %r", prev_hist_vis, hist_vis) + + if hist_vis == public_value and prev_hist_vis != public_value: + defer.returnValue(True) + elif hist_vis != public_value and prev_hist_vis == public_value: + defer.returnValue(False) + else: + defer.returnValue(None) diff --git a/synapse/notifier.py b/synapse/notifier.py index 48566187ab..6b1709d700 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -167,6 +167,7 @@ class Notifier(object): self.clock = hs.get_clock() self.appservice_handler = hs.get_application_service_handler() + self.user_directory_handler = hs.get_user_directory_handler() if hs.should_send_federation(): self.federation_sender = hs.get_federation_sender() @@ -251,7 +252,10 @@ class Notifier(object): """Notify any user streams that are interested in this room event""" # poke any interested application service. preserve_fn(self.appservice_handler.notify_interested_services)( - room_stream_id) + room_stream_id + ) + + preserve_fn(self.user_directory_handler.notify_new_event)() if self.federation_sender: preserve_fn(self.federation_sender.notify_new_events)( diff --git a/synapse/server.py b/synapse/server.py index e400e278c6..a38e5179e0 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -49,6 +49,7 @@ from synapse.handlers.events import EventHandler, EventStreamHandler from synapse.handlers.initial_sync import InitialSyncHandler from synapse.handlers.receipts import ReceiptsHandler from synapse.handlers.read_marker import ReadMarkerHandler +from synapse.handlers.user_directory import UserDirectoyHandler from synapse.http.client import SimpleHttpClient, InsecureInterceptableContextFactory from synapse.http.matrixfederationclient import MatrixFederationHttpClient from synapse.notifier import Notifier @@ -137,6 +138,7 @@ class HomeServer(object): 'tcp_replication', 'read_marker_handler', 'action_generator', + 'user_directory_handler', ] def __init__(self, hostname, **kwargs): @@ -304,6 +306,9 @@ class HomeServer(object): def build_action_generator(self): return ActionGenerator(self) + def build_user_directory_handler(self): + return UserDirectoyHandler(self) + def remove_pusher(self, app_id, push_key, user_id): return self.get_pusherpool().remove_pusher(app_id, push_key, user_id) diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index d604e7668f..11655bf60f 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -49,6 +49,7 @@ from .tags import TagsStore from .account_data import AccountDataStore from .openid import OpenIdStore from .client_ips import ClientIpStore +from .user_directory import UserDirectoryStore from .util.id_generators import IdGenerator, StreamIdGenerator, ChainedIdGenerator from .engines import PostgresEngine @@ -86,6 +87,7 @@ class DataStore(RoomMemberStore, RoomStore, ClientIpStore, DeviceStore, DeviceInboxStore, + UserDirectoryStore, ): def __init__(self, db_conn, hs): diff --git a/synapse/storage/schema/delta/42/user_dir.py b/synapse/storage/schema/delta/42/user_dir.py new file mode 100644 index 0000000000..38538960a4 --- /dev/null +++ b/synapse/storage/schema/delta/42/user_dir.py @@ -0,0 +1,69 @@ +# Copyright 2017 Vector Creations Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +from synapse.storage.prepare_database import get_statements +from synapse.storage.engines import PostgresEngine, Sqlite3Engine + +logger = logging.getLogger(__name__) + + +BOTH_TABLES = """ +CREATE TABLE user_directory_stream_pos ( + Lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE, -- Makes sure this table only has one row. + stream_id BIGINT, + CHECK (Lock='X') +); + +INSERT INTO user_directory_stream_pos (stream_id) VALUES (null); +""" + + +POSTGRES_TABLE = """ +CREATE TABLE user_directory ( + user_id TEXT NOT NULL, + room_id TEXT NOT NULL, + display_name TEXT, + avatar_url TEXT, + vector tsvector +); + +CREATE INDEX user_directory_fts_idx ON user_directory USING gin(vector); +CREATE INDEX user_directory_user_idx ON user_directory(user_id); +""" + + +SQLITE_TABLE = """ +CREATE VIRTUAL TABLE user_directory + USING fts4 ( user_id, room_id, display_name, avatar_url, value ); +""" + + +def run_create(cur, database_engine, *args, **kwargs): + for statement in get_statements(BOTH_TABLES.splitlines()): + cur.execute(statement) + + if isinstance(database_engine, PostgresEngine): + for statement in get_statements(POSTGRES_TABLE.splitlines()): + cur.execute(statement) + elif isinstance(database_engine, Sqlite3Engine): + for statement in get_statements(SQLITE_TABLE.splitlines()): + cur.execute(statement) + else: + raise Exception("Unrecognized database engine") + + +def run_upgrade(*args, **kwargs): + pass diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py new file mode 100644 index 0000000000..6c7c8c4bee --- /dev/null +++ b/synapse/storage/user_directory.py @@ -0,0 +1,145 @@ +# -*- coding: utf-8 -*- +# Copyright 2017 Vector Creations Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.internet import defer + +from ._base import SQLBaseStore +from synapse.util.caches.descriptors import cached, cachedInlineCallbacks +from synapse.api.constants import EventTypes, JoinRules +from synapse.storage.engines import PostgresEngine, Sqlite3Engine + + +class UserDirectoryStore(SQLBaseStore): + + @cachedInlineCallbacks(cache_context=True) + def is_room_world_readable_or_publicly_joinable(self, room_id, cache_context): + current_state_ids = yield self.get_current_state_ids( + room_id, on_invalidate=cache_context.invalidate + ) + + join_rules_id = current_state_ids.get((EventTypes.JoinRules, "")) + if join_rules_id: + join_rule_ev = yield self.get_event(join_rules_id, allow_none=True) + if join_rule_ev: + if join_rule_ev.content.get("join_rules") == JoinRules.PUBLIC: + defer.returnValue(True) + + hist_vis_id = current_state_ids.get((EventTypes.RoomHistoryVisibility, "")) + if hist_vis_id: + hist_vis_ev = yield self.get_event(hist_vis_id, allow_none=True) + if hist_vis_ev: + if hist_vis_ev.content.get("history_visibility") == "world_readable": + defer.returnValue(True) + + defer.returnValue(False) + + def add_profiles_to_user_dir(self, room_id, users_with_profile): + if isinstance(self.database_engine, PostgresEngine): + sql = """ + INSERT INTO user_directory + (user_id, room_id, display_name, avatar_url, vector) + VALUES (?,?,?,?,to_tsvector('english', ?)) + """ + elif isinstance(self.database_engine, Sqlite3Engine): + sql = """ + INSERT INTO user_directory + (user_id, room_id, display_name, avatar_url, value) + VALUES (?,?,?,?,?) + """ + else: + # This should be unreachable. + raise Exception("Unrecognized database engine") + + def _add_profiles_to_user_dir_txn(txn): + txn.executemany(sql, ( + ( + user_id, room_id, p.display_name, p.avatar_url, + "%s %s" % (user_id, p.display_name,) if p.display_name else user_id + ) + for user_id, p in users_with_profile.iteritems() + )) + for user_id in users_with_profile: + txn.call_after( + self.get_user_in_directory.invalidate, (user_id,) + ) + + return self.runInteraction( + "add_profiles_to_user_dir", _add_profiles_to_user_dir_txn + ) + + @defer.inlineCallbacks + def remove_from_user_dir(self, user_id): + yield self._simple_delete( + table="user_directory", + keyvalues={"user_id": user_id}, + desc="remove_from_user_dir", + ) + self.get_user_in_directory.invalidate((user_id,)) + + def get_all_rooms(self): + return self._simple_select_onecol( + table="current_state_events", + keyvalues={}, + retcol="DISTINCT room_id", + desc="get_all_rooms", + ) + + def delete_all_from_user_dir(self): + def _delete_all_from_user_dir_txn(txn): + txn.execute("DELETE FROM user_directory") + txn.call_after(self.get_user_in_directory.invalidate_all) + return self.runInteraction( + "delete_all_from_user_dir", _delete_all_from_user_dir_txn + ) + + @cached() + def get_user_in_directory(self, user_id): + return self._simple_select_one( + table="user_directory", + keyvalues={"user_id": user_id}, + retcols=("room_id", "display_name", "avatar_url",), + allow_none=True, + desc="get_user_in_directory", + ) + + def get_user_directory_stream_pos(self): + return self._simple_select_one_onecol( + table="user_directory_stream_pos", + keyvalues={}, + retcol="stream_id", + desc="get_user_directory_stream_pos", + ) + + def update_user_directory_stream_pos(self, stream_id): + return self._simple_update_one( + table="user_directory_stream_pos", + keyvalues={}, + updatevalues={"stream_id": stream_id}, + desc="update_user_directory_stream_pos", + ) + + def get_current_state_deltas(self, prev_stream_id): + # TODO: Add stream change cache + # TODO: Add limit + sql = """ + SELECT stream_id, room_id, type, state_key, event_id, prev_event_id + FROM current_state_delta_stream + WHERE stream_id > ? + ORDER BY stream_id ASC + """ + + return self._execute( + "get_current_state_deltas", self.cursor_to_dict, sql, prev_stream_id + ) -- cgit 1.4.1 From b5db4ed5f68ee81557393e94436d768b955b1aa0 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 31 May 2017 13:30:12 +0100 Subject: Update room column when room becomes unpublic --- synapse/handlers/user_directory.py | 23 +++++++++++++++++++++-- synapse/storage/user_directory.py | 10 ++++++++++ 2 files changed, 31 insertions(+), 2 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index e9488ce554..0cf403f599 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -121,12 +121,13 @@ class UserDirectoyHandler(object): # TODO: Make this faster? rooms = yield self.store.get_rooms_for_user(user_id) - for room_id in rooms: + for j_room_id in rooms: is_public = yield self.store.is_room_world_readable_or_publicly_joinable( - room_id + j_room_id ) if is_public: + yield self.store.update_user_in_user_dir(user_id, j_room_id) return yield self.store.remove_from_user_dir(user_id) @@ -149,6 +150,15 @@ class UserDirectoyHandler(object): if change is None: continue + is_public = yield self.store.is_room_world_readable_or_publicly_joinable( + room_id + ) + + if change and is_public: + continue + elif not change and not is_public: + continue + users_with_profile = yield self.state.get_current_user_in_room(room_id) for user_id, profile in users_with_profile.iteritems(): if change: @@ -164,6 +174,15 @@ class UserDirectoyHandler(object): if change is None: continue + is_public = yield self.store.is_room_world_readable_or_publicly_joinable( + room_id + ) + + if change and is_public: + continue + elif not change and not is_public: + continue + users_with_profile = yield self.state.get_current_user_in_room(room_id) for user_id, profile in users_with_profile.iteritems(): if change: diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py index 6c7c8c4bee..d72b93b585 100644 --- a/synapse/storage/user_directory.py +++ b/synapse/storage/user_directory.py @@ -79,6 +79,16 @@ class UserDirectoryStore(SQLBaseStore): "add_profiles_to_user_dir", _add_profiles_to_user_dir_txn ) + @defer.inlineCallbacks + def update_user_in_user_dir(self, user_id, room_id): + yield self._simple_update_one( + table="user_directory", + keyvalues={"user_id": user_id}, + updatevalues={"room_id": room_id}, + desc="update_user_in_user_dir", + ) + self.get_user_in_directory.invalidate((user_id,)) + @defer.inlineCallbacks def remove_from_user_dir(self, user_id): yield self._simple_delete( -- cgit 1.4.1 From 3b5f22ca40303392e45c8407952ecf3ee15785f6 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 31 May 2017 14:00:01 +0100 Subject: Add search --- synapse/handlers/user_directory.py | 3 +++ synapse/storage/user_directory.py | 35 +++++++++++++++++++++++++++++++++++ 2 files changed, 38 insertions(+) (limited to 'synapse/storage') diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index 0cf403f599..4a9565df93 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -40,6 +40,9 @@ class UserDirectoyHandler(object): self.clock.call_later(0, self.notify_new_event) + def search_users(self, search_term, limit): + return self.store.search_user_dir(search_term, limit) + @defer.inlineCallbacks def notify_new_event(self): if self._is_processing: diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py index d72b93b585..650c49982d 100644 --- a/synapse/storage/user_directory.py +++ b/synapse/storage/user_directory.py @@ -153,3 +153,38 @@ class UserDirectoryStore(SQLBaseStore): return self._execute( "get_current_state_deltas", self.cursor_to_dict, sql, prev_stream_id ) + + @defer.inlineCallbacks + def search_user_dir(self, search_term, limit): + if isinstance(self.database_engine, PostgresEngine): + sql = """ + SELECT user_id, display_name, avatar_url + FROM user_directory + WHERE vector @@ to_tsquery('english', ?) + ORDER BY ts_rank_cd(vector, to_tsquery('english', ?)) DESC + LIMIT ? + """ + args = (search_term, search_term, limit + 1,) + elif isinstance(self.database_engine, Sqlite3Engine): + sql = """ + SELECT user_id, display_name, avatar_url + FROM user_directory + WHERE value MATCH ? + ORDER BY rank(matchinfo(user_directory)) DESC + LIMIT ? + """ + args = (search_term, limit + 1) + else: + # This should be unreachable. + raise Exception("Unrecognized database engine") + + results = yield self._execute( + "search_user_dir", self.cursor_to_dict, sql, *args + ) + + limited = len(results) > limit + + defer.returnValue({ + "limited": limited, + "results": results, + }) -- cgit 1.4.1 From 293ef296559fa5bb721592bfa9605f7282df0f6e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 31 May 2017 14:29:32 +0100 Subject: Weight differently --- synapse/storage/user_directory.py | 34 ++++++++++++++++++++++++---------- synapse/types.py | 7 +++++++ 2 files changed, 31 insertions(+), 10 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py index 650c49982d..ebcc8b9633 100644 --- a/synapse/storage/user_directory.py +++ b/synapse/storage/user_directory.py @@ -19,6 +19,7 @@ from ._base import SQLBaseStore from synapse.util.caches.descriptors import cached, cachedInlineCallbacks from synapse.api.constants import EventTypes, JoinRules from synapse.storage.engines import PostgresEngine, Sqlite3Engine +from synapse.types import get_domain_from_id, get_localpart_from_id class UserDirectoryStore(SQLBaseStore): @@ -50,26 +51,39 @@ class UserDirectoryStore(SQLBaseStore): sql = """ INSERT INTO user_directory (user_id, room_id, display_name, avatar_url, vector) - VALUES (?,?,?,?,to_tsvector('english', ?)) + VALUES (?,?,?,?, + setweight(to_tsvector('english', ?), 'A') + || to_tsvector('english', ?) + || to_tsvector('english', COALESCE(?, '')) + ) """ + args = ( + ( + user_id, room_id, p.display_name, p.avatar_url, + get_localpart_from_id(user_id), get_domain_from_id(user_id), + p.display_name, + ) + for user_id, p in users_with_profile.iteritems() + ) elif isinstance(self.database_engine, Sqlite3Engine): sql = """ INSERT INTO user_directory (user_id, room_id, display_name, avatar_url, value) VALUES (?,?,?,?,?) """ - else: - # This should be unreachable. - raise Exception("Unrecognized database engine") - - def _add_profiles_to_user_dir_txn(txn): - txn.executemany(sql, ( + args = ( ( user_id, room_id, p.display_name, p.avatar_url, "%s %s" % (user_id, p.display_name,) if p.display_name else user_id ) for user_id, p in users_with_profile.iteritems() - )) + ) + else: + # This should be unreachable. + raise Exception("Unrecognized database engine") + + def _add_profiles_to_user_dir_txn(txn): + txn.executemany(sql, args) for user_id in users_with_profile: txn.call_after( self.get_user_in_directory.invalidate, (user_id,) @@ -160,8 +174,8 @@ class UserDirectoryStore(SQLBaseStore): sql = """ SELECT user_id, display_name, avatar_url FROM user_directory - WHERE vector @@ to_tsquery('english', ?) - ORDER BY ts_rank_cd(vector, to_tsquery('english', ?)) DESC + WHERE vector @@ plainto_tsquery('english', ?) + ORDER BY ts_rank_cd(vector, plainto_tsquery('english', ?)) DESC LIMIT ? """ args = (search_term, search_term, limit + 1,) diff --git a/synapse/types.py b/synapse/types.py index 445bdcb4d7..111948540d 100644 --- a/synapse/types.py +++ b/synapse/types.py @@ -62,6 +62,13 @@ def get_domain_from_id(string): return string[idx + 1:] +def get_localpart_from_id(string): + idx = string.find(":") + if idx == -1: + raise SynapseError(400, "Invalid ID: %r" % (string,)) + return string[1:idx] + + class DomainSpecificString( namedtuple("DomainSpecificString", ("localpart", "domain")) ): -- cgit 1.4.1 From 63fda37e20015f0fe56aed86f907035d42fdc2ca Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 31 May 2017 15:00:29 +0100 Subject: Add comments --- synapse/handlers/user_directory.py | 161 ++++++++++++++++++------- synapse/rest/client/v2_alpha/user_directory.py | 16 +++ synapse/storage/schema/delta/42/user_dir.py | 2 +- synapse/storage/user_directory.py | 39 +++++- 4 files changed, 173 insertions(+), 45 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index 4a9565df93..88b79e3325 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -26,25 +26,54 @@ logger = logging.getLogger(__name__) class UserDirectoyHandler(object): + """Handles querying of and keeping updated the user_directory. + + N.B.: ASSUMES IT IS THE ONLY THING THAT MODIFIES THE USER DIRECTORY + """ + def __init__(self, hs): self.store = hs.get_datastore() self.state = hs.get_state_handler() self.server_name = hs.hostname self.clock = hs.get_clock() + # When start up for the first time we need to populate the user_directory. + # This is a set of user_id's we've inserted already self.initially_handled_users = set() + # The current position in the current_state_delta stream self.pos = None + # Guard to ensure we only process deltas one at a time self._is_processing = False + # We kick this off so that we don't have to wait for a change before + # we start populating the user directory self.clock.call_later(0, self.notify_new_event) def search_users(self, search_term, limit): + """Searches for users in directory + + Returns: + dict of the form:: + + { + "limited": , # whether there were more results or not + "results": [ # Ordered by best match first + { + "user_id": , + "display_name": , + "avatar_url": + } + ] + } + """ return self.store.search_user_dir(search_term, limit) @defer.inlineCallbacks def notify_new_event(self): + """Called when there may be more deltas to process + """ if self._is_processing: return @@ -56,13 +85,16 @@ class UserDirectoyHandler(object): @defer.inlineCallbacks def _unsafe_process(self): + # If self.pos is None then means we haven't fetched it from DB if self.pos is None: self.pos = yield self.store.get_user_directory_stream_pos() + # If still None then we need to do the initial fill of directory if self.pos is None: yield self._do_initial_spam() self.pos = yield self.store.get_user_directory_stream_pos() + # Loop round handling deltas until we're up to date while True: with Measure(self.clock, "user_dir_delta"): deltas = yield self.store.get_current_state_deltas(self.pos) @@ -74,69 +106,53 @@ class UserDirectoyHandler(object): self.pos = deltas[-1]["stream_id"] yield self.store.update_user_directory_stream_pos(self.pos) - @defer.inlineCallbacks - def _handle_room(self, room_id): - # TODO: Check we're still joined to room - - is_public = yield self.store.is_room_world_readable_or_publicly_joinable(room_id) - if not is_public: - return - - users_with_profile = yield self.state.get_current_user_in_room(room_id) - unhandled_users = set(users_with_profile) - self.initially_handled_users - - yield self.store.add_profiles_to_user_dir( - room_id, { - user_id: users_with_profile[user_id] for user_id in unhandled_users - } - ) - - self.initially_handled_users |= unhandled_users - @defer.inlineCallbacks def _do_initial_spam(self): + """Populates the user_directory from the current state of the DB, used + when synapse first starts with user_directory support + """ + # TODO: pull from current delta stream_id new_pos = self.store.get_room_max_stream_ordering() + # Delete any existing entries just in case there are any yield self.store.delete_all_from_user_dir() + # We process by going through each existing room at a time. room_ids = yield self.store.get_all_rooms() for room_id in room_ids: - yield self._handle_room(room_id) + yield self._handle_intial_room(room_id) self.initially_handled_users = None yield self.store.update_user_directory_stream_pos(new_pos) @defer.inlineCallbacks - def _handle_new_user(self, room_id, user_id, profile): - row = yield self.store.get_user_in_directory(user_id) - if row: - return - - yield self.store.add_profiles_to_user_dir(room_id, {user_id: profile}) + def _handle_intial_room(self, room_id): + """Called when we initially fill out user_directory one room at a time + """ + # TODO: Check we're still joined to room - def _handle_remove_user(self, room_id, user_id): - row = yield self.store.get_user_in_directory(user_id) - if not row or row["room_id"] != room_id: + is_public = yield self.store.is_room_world_readable_or_publicly_joinable(room_id) + if not is_public: return - # TODO: Make this faster? - rooms = yield self.store.get_rooms_for_user(user_id) - for j_room_id in rooms: - is_public = yield self.store.is_room_world_readable_or_publicly_joinable( - j_room_id - ) + users_with_profile = yield self.state.get_current_user_in_room(room_id) + unhandled_users = set(users_with_profile) - self.initially_handled_users - if is_public: - yield self.store.update_user_in_user_dir(user_id, j_room_id) - return + yield self.store.add_profiles_to_user_dir( + room_id, { + user_id: users_with_profile[user_id] for user_id in unhandled_users + } + ) - yield self.store.remove_from_user_dir(user_id) + self.initially_handled_users |= unhandled_users @defer.inlineCallbacks def _handle_deltas(self, deltas): + """Called with the state deltas to process + """ for delta in deltas: typ = delta["type"] state_key = delta["state_key"] @@ -144,22 +160,33 @@ class UserDirectoyHandler(object): event_id = delta["event_id"] prev_event_id = delta["prev_event_id"] + # For join rule and visibility changes we need to check if the room + # may have become public or not and add/remove the users in said room if typ == EventTypes.RoomHistoryVisibility: change = yield self._get_key_change( prev_event_id, event_id, key_name="history_visibility", public_value="world_readable", ) + + # If change is None, no change. True => become world readable, + # False => was world readable if change is None: continue + # There's been a change to or from being world readable. + is_public = yield self.store.is_room_world_readable_or_publicly_joinable( room_id ) - if change and is_public: + if change and not is_public: + # If we became world readable but room isn't currently public then + # we ignore the change continue - elif not change and not is_public: + elif not change and is_public: + # If we stopped being world readable but are still public, + # ignore the change continue users_with_profile = yield self.state.get_current_user_in_room(room_id) @@ -213,8 +240,60 @@ class UserDirectoyHandler(object): else: yield self._handle_remove_user(room_id, state_key) + @defer.inlineCallbacks + def _handle_new_user(self, room_id, user_id, profile): + """Called when we might need to add user to directory + + Args: + room_id (str): room_id that user joined or started being public that + user_id (str) + """ + row = yield self.store.get_user_in_directory(user_id) + if row: + return + + yield self.store.add_profiles_to_user_dir(room_id, {user_id: profile}) + + def _handle_remove_user(self, room_id, user_id): + """Called when we might need to remove user to directory + + Args: + room_id (str): room_id that user left or stopped being public that + user_id (str) + """ + row = yield self.store.get_user_in_directory(user_id) + if not row or row["room_id"] != room_id: + # Either the user wasn't in directory or we're still in a room that + # is public (i.e. the room_id in the database) + return + + # TODO: Make this faster? + rooms = yield self.store.get_rooms_for_user(user_id) + for j_room_id in rooms: + is_public = yield self.store.is_room_world_readable_or_publicly_joinable( + j_room_id + ) + + if is_public: + yield self.store.update_user_in_user_dir(user_id, j_room_id) + return + + yield self.store.remove_from_user_dir(user_id) + @defer.inlineCallbacks def _get_key_change(self, prev_event_id, event_id, key_name, public_value): + """Given two events check if the `key_name` field in content changed + from not matching `public_value` to doing so. + + For example, check if `history_visibility` (`key_name`) changed from + `shared` to `world_readable` (`public_value`). + + Returns: + None if the field in the events either both match `public_value` o + neither do, i.e. there has been no change. + True if it didnt match `public_value` but now does + Falsse if it did match `public_value` but now doesn't + """ prev_event = None event = None if prev_event_id: diff --git a/synapse/rest/client/v2_alpha/user_directory.py b/synapse/rest/client/v2_alpha/user_directory.py index fe91207195..17d3dffc8f 100644 --- a/synapse/rest/client/v2_alpha/user_directory.py +++ b/synapse/rest/client/v2_alpha/user_directory.py @@ -39,6 +39,22 @@ class UserDirectorySearchRestServlet(RestServlet): @defer.inlineCallbacks def on_POST(self, request): + """Searches for users in directory + + Returns: + dict of the form:: + + { + "limited": , # whether there were more results or not + "results": [ # Ordered by best match first + { + "user_id": , + "display_name": , + "avatar_url": + } + ] + } + """ yield self.auth.get_user_by_req(request, allow_guest=False) body = parse_json_object_from_request(request) diff --git a/synapse/storage/schema/delta/42/user_dir.py b/synapse/storage/schema/delta/42/user_dir.py index 38538960a4..57b89ba552 100644 --- a/synapse/storage/schema/delta/42/user_dir.py +++ b/synapse/storage/schema/delta/42/user_dir.py @@ -34,7 +34,7 @@ INSERT INTO user_directory_stream_pos (stream_id) VALUES (null); POSTGRES_TABLE = """ CREATE TABLE user_directory ( user_id TEXT NOT NULL, - room_id TEXT NOT NULL, + room_id TEXT NOT NULL, -- A room_id that we know is public display_name TEXT, avatar_url TEXT, vector tsvector diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py index ebcc8b9633..83812bf092 100644 --- a/synapse/storage/user_directory.py +++ b/synapse/storage/user_directory.py @@ -26,6 +26,8 @@ class UserDirectoryStore(SQLBaseStore): @cachedInlineCallbacks(cache_context=True) def is_room_world_readable_or_publicly_joinable(self, room_id, cache_context): + """Check if the room is either world_readable or publically joinable + """ current_state_ids = yield self.get_current_state_ids( room_id, on_invalidate=cache_context.invalidate ) @@ -47,14 +49,24 @@ class UserDirectoryStore(SQLBaseStore): defer.returnValue(False) def add_profiles_to_user_dir(self, room_id, users_with_profile): + """Add profiles to the user directory + + Args: + room_id (str): A room_id that all users are in that is world_readable + or publically joinable + users_with_profile (dict): Users to add to directory in the form of + mapping of user_id -> ProfileInfo + """ if isinstance(self.database_engine, PostgresEngine): + # We weight the loclpart most highly, then display name and finally + # server name sql = """ INSERT INTO user_directory (user_id, room_id, display_name, avatar_url, vector) VALUES (?,?,?,?, setweight(to_tsvector('english', ?), 'A') - || to_tsvector('english', ?) - || to_tsvector('english', COALESCE(?, '')) + || setweight(to_tsvector('english', ?), 'D') + || setweight(to_tsvector('english', COALESCE(?, '')), 'B') ) """ args = ( @@ -113,6 +125,8 @@ class UserDirectoryStore(SQLBaseStore): self.get_user_in_directory.invalidate((user_id,)) def get_all_rooms(self): + """Get all room_ids we've ever known about + """ return self._simple_select_onecol( table="current_state_events", keyvalues={}, @@ -121,6 +135,8 @@ class UserDirectoryStore(SQLBaseStore): ) def delete_all_from_user_dir(self): + """Delete the entire user directory + """ def _delete_all_from_user_dir_txn(txn): txn.execute("DELETE FROM user_directory") txn.call_after(self.get_user_in_directory.invalidate_all) @@ -170,12 +186,29 @@ class UserDirectoryStore(SQLBaseStore): @defer.inlineCallbacks def search_user_dir(self, search_term, limit): + """Searches for users in directory + + Returns: + dict of the form:: + + { + "limited": , # whether there were more results or not + "results": [ # Ordered by best match first + { + "user_id": , + "display_name": , + "avatar_url": + } + ] + } + """ + if isinstance(self.database_engine, PostgresEngine): sql = """ SELECT user_id, display_name, avatar_url FROM user_directory WHERE vector @@ plainto_tsquery('english', ?) - ORDER BY ts_rank_cd(vector, plainto_tsquery('english', ?)) DESC + ORDER BY ts_rank_cd(vector, plainto_tsquery('english', ?)) DESC LIMIT ? """ args = (search_term, search_term, limit + 1,) -- cgit 1.4.1 From 350622a107c356da630eba09b63ed4b6de94b198 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 31 May 2017 15:11:36 +0100 Subject: Handle the server leaving a public room --- synapse/handlers/user_directory.py | 23 ++++++++++++++++++++--- synapse/state.py | 11 +++++++++++ synapse/storage/schema/delta/42/user_dir.py | 4 ++++ synapse/storage/user_directory.py | 11 +++++++++++ 4 files changed, 46 insertions(+), 3 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index 88b79e3325..4e491a43e6 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -132,7 +132,9 @@ class UserDirectoyHandler(object): def _handle_intial_room(self, room_id): """Called when we initially fill out user_directory one room at a time """ - # TODO: Check we're still joined to room + is_in_room = yield self.store.get_is_host_in_room(room_id, self.server_name) + if not is_in_room: + return is_public = yield self.store.is_room_world_readable_or_publicly_joinable(room_id) if not is_public: @@ -229,7 +231,22 @@ class UserDirectoyHandler(object): if change is None: continue - if change: + if not change: + # Need to check if the server left the room entirely, if so + # we might need to remove all the users in that room + is_in_room = yield self.store.get_is_host_in_room( + room_id, self.server_name, + ) + if not is_in_room: + # Fetch all the users that we marked as being in user + # directory due to being in the room and then check if + # need to remove those users or not + user_ids = yield self.store.get_users_in_dir_due_to_room(room_id) + for user_id in user_ids: + yield self._handle_remove_user(room_id, user_id) + return + + if change: # The user joined event = yield self.store.get_event(event_id) profile = ProfileInfo( avatar_url=event.content.get("avatar_url"), @@ -237,7 +254,7 @@ class UserDirectoyHandler(object): ) yield self._handle_new_user(room_id, state_key, profile) - else: + else: # The user left yield self._handle_remove_user(room_id, state_key) @defer.inlineCallbacks diff --git a/synapse/state.py b/synapse/state.py index 02fee47f39..dffa79e4c9 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -186,6 +186,17 @@ class StateHandler(object): ) defer.returnValue(joined_hosts) + @defer.inlineCallbacks + def get_is_host_in_room(self, room_id, host, latest_event_ids=None): + if not latest_event_ids: + latest_event_ids = yield self.store.get_latest_event_ids_in_room(room_id) + logger.debug("calling resolve_state_groups from get_is_host_in_room") + entry = yield self.resolve_state_groups(room_id, latest_event_ids) + is_host_joined = yield self.store.is_host_joined( + room_id, host, entry.state_id, entry.state + ) + defer.returnValue(is_host_joined) + @defer.inlineCallbacks def compute_event_context(self, event, old_state=None): """Build an EventContext structure for the event. diff --git a/synapse/storage/schema/delta/42/user_dir.py b/synapse/storage/schema/delta/42/user_dir.py index 57b89ba552..95a7a79fd3 100644 --- a/synapse/storage/schema/delta/42/user_dir.py +++ b/synapse/storage/schema/delta/42/user_dir.py @@ -41,6 +41,7 @@ CREATE TABLE user_directory ( ); CREATE INDEX user_directory_fts_idx ON user_directory USING gin(vector); +CREATE INDEX user_directory_room_idx ON user_directory(room_id); CREATE INDEX user_directory_user_idx ON user_directory(user_id); """ @@ -48,6 +49,9 @@ CREATE INDEX user_directory_user_idx ON user_directory(user_id); SQLITE_TABLE = """ CREATE VIRTUAL TABLE user_directory USING fts4 ( user_id, room_id, display_name, avatar_url, value ); + +CREATE INDEX user_directory_room_idx ON user_directory(room_id); +CREATE INDEX user_directory_user_idx ON user_directory(user_id); """ diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py index 83812bf092..0df979cb01 100644 --- a/synapse/storage/user_directory.py +++ b/synapse/storage/user_directory.py @@ -124,6 +124,17 @@ class UserDirectoryStore(SQLBaseStore): ) self.get_user_in_directory.invalidate((user_id,)) + def get_users_in_dir_due_to_room(self, room_id): + """Get all user_ids that are in the room directory becuase they're + in the given room_id + """ + return self._simple_select_onecol( + table="user_directory", + keyvalues={"room_id": room_id}, + retcol="user_id", + desc="get_users_in_dir_due_to_room", + ) + def get_all_rooms(self): """Get all room_ids we've ever known about """ -- cgit 1.4.1 From dc51af3d031030fdf553f8478f7930596f2694f7 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 31 May 2017 15:13:49 +0100 Subject: Pull max id from correct table --- synapse/handlers/user_directory.py | 6 ++---- synapse/storage/user_directory.py | 8 ++++++++ 2 files changed, 10 insertions(+), 4 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index 4e491a43e6..8331f6422e 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -111,9 +111,7 @@ class UserDirectoyHandler(object): """Populates the user_directory from the current state of the DB, used when synapse first starts with user_directory support """ - - # TODO: pull from current delta stream_id - new_pos = self.store.get_room_max_stream_ordering() + new_pos = yield self.store.get_max_stream_id_in_current_state_deltas() # Delete any existing entries just in case there are any yield self.store.delete_all_from_user_dir() @@ -284,7 +282,7 @@ class UserDirectoyHandler(object): # is public (i.e. the room_id in the database) return - # TODO: Make this faster? + # XXX: Make this faster? rooms = yield self.store.get_rooms_for_user(user_id) for j_room_id in rooms: is_public = yield self.store.is_room_world_readable_or_publicly_joinable( diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py index 0df979cb01..011c711ec1 100644 --- a/synapse/storage/user_directory.py +++ b/synapse/storage/user_directory.py @@ -195,6 +195,14 @@ class UserDirectoryStore(SQLBaseStore): "get_current_state_deltas", self.cursor_to_dict, sql, prev_stream_id ) + def get_max_stream_id_in_current_state_deltas(self): + return self._simple_select_one_onecol( + table="current_state_delta_stream", + keyvalues={}, + retcol="COALESCE(MAX(stream_id), -1)", + desc="get_max_stream_id_in_current_state_deltas", + ) + @defer.inlineCallbacks def search_user_dir(self, search_term, limit): """Searches for users in directory -- cgit 1.4.1 From 5d79d728f5f38463171f0d063713905f8cb9faec Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 31 May 2017 15:23:49 +0100 Subject: Split out directory and search tables --- synapse/storage/schema/delta/42/user_dir.py | 25 ++++++------ synapse/storage/user_directory.py | 60 ++++++++++++++++++++--------- 2 files changed, 56 insertions(+), 29 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/schema/delta/42/user_dir.py b/synapse/storage/schema/delta/42/user_dir.py index 95a7a79fd3..7e32662928 100644 --- a/synapse/storage/schema/delta/42/user_dir.py +++ b/synapse/storage/schema/delta/42/user_dir.py @@ -28,30 +28,33 @@ CREATE TABLE user_directory_stream_pos ( ); INSERT INTO user_directory_stream_pos (stream_id) VALUES (null); -""" - -POSTGRES_TABLE = """ CREATE TABLE user_directory ( user_id TEXT NOT NULL, room_id TEXT NOT NULL, -- A room_id that we know is public display_name TEXT, - avatar_url TEXT, - vector tsvector + avatar_url TEXT ); -CREATE INDEX user_directory_fts_idx ON user_directory USING gin(vector); CREATE INDEX user_directory_room_idx ON user_directory(room_id); CREATE INDEX user_directory_user_idx ON user_directory(user_id); """ -SQLITE_TABLE = """ -CREATE VIRTUAL TABLE user_directory - USING fts4 ( user_id, room_id, display_name, avatar_url, value ); +POSTGRES_TABLE = """ +CREATE TABLE user_directory_search ( + user_id TEXT NOT NULL, + vector tsvector +); -CREATE INDEX user_directory_room_idx ON user_directory(room_id); -CREATE INDEX user_directory_user_idx ON user_directory(user_id); +CREATE INDEX user_directory_search_fts_idx ON user_directory_search USING gin(vector); +CREATE INDEX user_directory_search_user_idx ON user_directory_search(user_id); +""" + + +SQLITE_TABLE = """ +CREATE VIRTUAL TABLE user_directory_search + USING fts4 ( user_id, value ); """ diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py index 011c711ec1..b1957cb873 100644 --- a/synapse/storage/user_directory.py +++ b/synapse/storage/user_directory.py @@ -61,9 +61,8 @@ class UserDirectoryStore(SQLBaseStore): # We weight the loclpart most highly, then display name and finally # server name sql = """ - INSERT INTO user_directory - (user_id, room_id, display_name, avatar_url, vector) - VALUES (?,?,?,?, + INSERT INTO user_directory_search(user_id, vector) + VALUES (?, setweight(to_tsvector('english', ?), 'A') || setweight(to_tsvector('english', ?), 'D') || setweight(to_tsvector('english', COALESCE(?, '')), 'B') @@ -71,21 +70,19 @@ class UserDirectoryStore(SQLBaseStore): """ args = ( ( - user_id, room_id, p.display_name, p.avatar_url, - get_localpart_from_id(user_id), get_domain_from_id(user_id), - p.display_name, + user_id, get_localpart_from_id(user_id), get_domain_from_id(user_id), + profile.display_name, ) - for user_id, p in users_with_profile.iteritems() + for user_id, profile in users_with_profile.iteritems() ) elif isinstance(self.database_engine, Sqlite3Engine): sql = """ - INSERT INTO user_directory - (user_id, room_id, display_name, avatar_url, value) - VALUES (?,?,?,?,?) + INSERT INTO user_directory_search(user_id, value) + VALUES (?,?) """ args = ( ( - user_id, room_id, p.display_name, p.avatar_url, + user_id, "%s %s" % (user_id, p.display_name,) if p.display_name else user_id ) for user_id, p in users_with_profile.iteritems() @@ -96,6 +93,19 @@ class UserDirectoryStore(SQLBaseStore): def _add_profiles_to_user_dir_txn(txn): txn.executemany(sql, args) + self._simple_insert_many_txn( + txn, + table="user_directory", + values=[ + { + "user_id": user_id, + "room_id": room_id, + "display_name": profile.display_name, + "avatar_url": profile.avatar_url, + } + for user_id, profile in users_with_profile.iteritems() + ] + ) for user_id in users_with_profile: txn.call_after( self.get_user_in_directory.invalidate, (user_id,) @@ -117,12 +127,23 @@ class UserDirectoryStore(SQLBaseStore): @defer.inlineCallbacks def remove_from_user_dir(self, user_id): - yield self._simple_delete( - table="user_directory", - keyvalues={"user_id": user_id}, - desc="remove_from_user_dir", + def _remove_from_user_dir_txn(txn): + self._simple_delete_txn( + txn, + table="user_directory", + keyvalues={"user_id": user_id}, + ) + self._simple_delete_txn( + txn, + table="user_directory_search", + keyvalues={"user_id": user_id}, + ) + txn.call_after( + self.get_user_in_directory.invalidate, (user_id,) + ) + return self.runInteraction( + "remove_from_user_dir", _remove_from_user_dir_txn, ) - self.get_user_in_directory.invalidate((user_id,)) def get_users_in_dir_due_to_room(self, room_id): """Get all user_ids that are in the room directory becuase they're @@ -150,6 +171,7 @@ class UserDirectoryStore(SQLBaseStore): """ def _delete_all_from_user_dir_txn(txn): txn.execute("DELETE FROM user_directory") + txn.execute("DELETE FROM user_directory_search") txn.call_after(self.get_user_in_directory.invalidate_all) return self.runInteraction( "delete_all_from_user_dir", _delete_all_from_user_dir_txn @@ -225,7 +247,8 @@ class UserDirectoryStore(SQLBaseStore): if isinstance(self.database_engine, PostgresEngine): sql = """ SELECT user_id, display_name, avatar_url - FROM user_directory + FROM user_directory_search + INNER JOIN user_directory USING (user_id) WHERE vector @@ plainto_tsquery('english', ?) ORDER BY ts_rank_cd(vector, plainto_tsquery('english', ?)) DESC LIMIT ? @@ -234,7 +257,8 @@ class UserDirectoryStore(SQLBaseStore): elif isinstance(self.database_engine, Sqlite3Engine): sql = """ SELECT user_id, display_name, avatar_url - FROM user_directory + FROM user_directory_search + INNER JOIN user_directory USING (user_id) WHERE value MATCH ? ORDER BY rank(matchinfo(user_directory)) DESC LIMIT ? -- cgit 1.4.1 From 304880d18545b59a51c5d4b928e563c6d1514fdc Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 31 May 2017 15:46:36 +0100 Subject: Add stream change cache --- synapse/storage/__init__.py | 12 ++++++++++++ synapse/storage/events.py | 4 ++++ synapse/storage/user_directory.py | 4 +++- synapse/util/caches/stream_change_cache.py | 15 +++++++++++++++ 4 files changed, 34 insertions(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 11655bf60f..3c88ba9860 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -223,6 +223,18 @@ class DataStore(RoomMemberStore, RoomStore, "DeviceListFederationStreamChangeCache", device_list_max, ) + curr_state_delta_prefill, min_curr_state_delta_id = self._get_cache_dict( + db_conn, "current_state_delta_stream", + entity_column="room_id", + stream_column="stream_id", + max_value=events_max, # As we share the stream id with events token + limit=1000, + ) + self._curr_state_delta_stream_cache = StreamChangeCache( + "_curr_state_delta_stream_cache", min_curr_state_delta_id, + prefilled_cache=curr_state_delta_prefill, + ) + cur = LoggingTransaction( db_conn.cursor(), name="_find_stream_orderings_for_times_txn", diff --git a/synapse/storage/events.py b/synapse/storage/events.py index dfb57f9d12..77861488d2 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -755,6 +755,10 @@ class EventsStore(SQLBaseStore): ] ) + self._curr_state_delta_stream_cache.enttity_has_changed( + room_id, max_stream_order, + ) + # Invalidate the various caches # Figure out the changes of membership to invalidate the diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py index b1957cb873..15b8ea0460 100644 --- a/synapse/storage/user_directory.py +++ b/synapse/storage/user_directory.py @@ -204,7 +204,9 @@ class UserDirectoryStore(SQLBaseStore): ) def get_current_state_deltas(self, prev_stream_id): - # TODO: Add stream change cache + if not self._curr_state_delta_stream_cache.has_any_entity_changed(prev_stream_id): + return [] + # TODO: Add limit sql = """ SELECT stream_id, room_id, type, state_key, event_id, prev_event_id diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py index 70fe00ce0b..c498aee46c 100644 --- a/synapse/util/caches/stream_change_cache.py +++ b/synapse/util/caches/stream_change_cache.py @@ -89,6 +89,21 @@ class StreamChangeCache(object): return result + def has_any_entity_changed(self, stream_pos): + """Returns if any entity has changed + """ + assert type(stream_pos) is int + + if stream_pos >= self._earliest_known_stream_pos: + self.metrics.inc_hits() + if stream_pos >= max(self._cache): + return False + else: + return True + else: + self.metrics.inc_misses() + return True + def get_all_entities_changed(self, stream_pos): """Returns all entites that have had new things since the given position. If the position is too old it will return None. -- cgit 1.4.1 From 63c58c2a3fced42c254da1c1ae5e55a977b7141c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 31 May 2017 16:17:58 +0100 Subject: Limit number of things we fetch out of the db --- synapse/storage/user_directory.py | 39 ++++++++++++++++++++++++++++++--------- 1 file changed, 30 insertions(+), 9 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py index 15b8ea0460..9137fc24ea 100644 --- a/synapse/storage/user_directory.py +++ b/synapse/storage/user_directory.py @@ -207,16 +207,37 @@ class UserDirectoryStore(SQLBaseStore): if not self._curr_state_delta_stream_cache.has_any_entity_changed(prev_stream_id): return [] - # TODO: Add limit - sql = """ - SELECT stream_id, room_id, type, state_key, event_id, prev_event_id - FROM current_state_delta_stream - WHERE stream_id > ? - ORDER BY stream_id ASC - """ + def get_current_state_deltas_txn(txn): + # First we calculate the max stream id that will give us less than + # N results + sql = """ + SELECT stream_id, count(*) + FROM current_state_delta_stream + WHERE stream_id > ? + GROUP BY stream_id + ORDER BY stream_id ASC + LIMIT 100 + """ + txn.execute(sql, (prev_stream_id,)) + + total = 0 + for max_stream_id, count in txn: + total += count + if total > 50: + break - return self._execute( - "get_current_state_deltas", self.cursor_to_dict, sql, prev_stream_id + # Now actually get the deltas + sql = """ + SELECT stream_id, room_id, type, state_key, event_id, prev_event_id + FROM current_state_delta_stream + WHERE ? < stream_id AND stream_id <= ? + ORDER BY stream_id ASC + """ + txn.execute(sql, (prev_stream_id, max_stream_id,)) + return self.cursor_to_dict(txn) + + return self.runInteraction( + "get_current_state_deltas", get_current_state_deltas_txn ) def get_max_stream_id_in_current_state_deltas(self): -- cgit 1.4.1 From 4abcff0177768c43eb64ed7784ca8ebf30f3435c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 31 May 2017 16:22:36 +0100 Subject: Fix typo --- synapse/storage/events.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 77861488d2..528f19eb87 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -755,7 +755,7 @@ class EventsStore(SQLBaseStore): ] ) - self._curr_state_delta_stream_cache.enttity_has_changed( + self._curr_state_delta_stream_cache.entity_has_changed( room_id, max_stream_order, ) -- cgit 1.4.1 From f0910617111fe81b79777a2c597b1e4240d61a9a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 31 May 2017 16:34:40 +0100 Subject: Fix tests --- synapse/handlers/user_directory.py | 4 ++-- synapse/storage/user_directory.py | 1 + 2 files changed, 3 insertions(+), 2 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index 8331f6422e..75f259ee4e 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -130,7 +130,7 @@ class UserDirectoyHandler(object): def _handle_intial_room(self, room_id): """Called when we initially fill out user_directory one room at a time """ - is_in_room = yield self.store.get_is_host_in_room(room_id, self.server_name) + is_in_room = yield self.state.get_is_host_in_room(room_id, self.server_name) if not is_in_room: return @@ -232,7 +232,7 @@ class UserDirectoyHandler(object): if not change: # Need to check if the server left the room entirely, if so # we might need to remove all the users in that room - is_in_room = yield self.store.get_is_host_in_room( + is_in_room = yield self.state.get_is_host_in_room( room_id, self.server_name, ) if not is_in_room: diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py index 9137fc24ea..348064436f 100644 --- a/synapse/storage/user_directory.py +++ b/synapse/storage/user_directory.py @@ -221,6 +221,7 @@ class UserDirectoryStore(SQLBaseStore): txn.execute(sql, (prev_stream_id,)) total = 0 + max_stream_id = prev_stream_id for max_stream_id, count in txn: total += count if total > 50: -- cgit 1.4.1 From f1378aef9199390ae0130cc6bda5c7f4fa7a2e33 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 31 May 2017 17:03:08 +0100 Subject: Convert to int --- synapse/storage/user_directory.py | 1 + 1 file changed, 1 insertion(+) (limited to 'synapse/storage') diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py index 348064436f..71b0502646 100644 --- a/synapse/storage/user_directory.py +++ b/synapse/storage/user_directory.py @@ -204,6 +204,7 @@ class UserDirectoryStore(SQLBaseStore): ) def get_current_state_deltas(self, prev_stream_id): + prev_stream_id = int(prev_stream_id) if not self._curr_state_delta_stream_cache.has_any_entity_changed(prev_stream_id): return [] -- cgit 1.4.1 From 5dd1b2c525cb786614d1503757bf52a7086f3bf9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 31 May 2017 17:29:12 +0100 Subject: Use unique indices --- synapse/storage/schema/delta/42/user_dir.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/schema/delta/42/user_dir.py b/synapse/storage/schema/delta/42/user_dir.py index 7e32662928..c34aa5e7d2 100644 --- a/synapse/storage/schema/delta/42/user_dir.py +++ b/synapse/storage/schema/delta/42/user_dir.py @@ -37,7 +37,7 @@ CREATE TABLE user_directory ( ); CREATE INDEX user_directory_room_idx ON user_directory(room_id); -CREATE INDEX user_directory_user_idx ON user_directory(user_id); +CREATE UNIQUE INDEX user_directory_user_idx ON user_directory(user_id); """ @@ -48,7 +48,7 @@ CREATE TABLE user_directory_search ( ); CREATE INDEX user_directory_search_fts_idx ON user_directory_search USING gin(vector); -CREATE INDEX user_directory_search_user_idx ON user_directory_search(user_id); +CREATE UNIQUE INDEX user_directory_search_user_idx ON user_directory_search(user_id); """ -- cgit 1.4.1 From f5cc22bdc63e58857f435227b70d145d07aabb77 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 31 May 2017 17:30:26 +0100 Subject: Comment on why arbitrary comments --- synapse/storage/user_directory.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py index 71b0502646..2e9175f50a 100644 --- a/synapse/storage/user_directory.py +++ b/synapse/storage/user_directory.py @@ -210,7 +210,9 @@ class UserDirectoryStore(SQLBaseStore): def get_current_state_deltas_txn(txn): # First we calculate the max stream id that will give us less than - # N results + # N results. + # We arbitarily limit to 100 stream_id entries to ensure we don't + # select toooo many. sql = """ SELECT stream_id, count(*) FROM current_state_delta_stream @@ -225,7 +227,9 @@ class UserDirectoryStore(SQLBaseStore): max_stream_id = prev_stream_id for max_stream_id, count in txn: total += count - if total > 50: + if total > 100: + # We arbitarily limit to 100 entries to ensure we don't + # select toooo many. break # Now actually get the deltas -- cgit 1.4.1 From a757dd4863d0a467becf5b73ca15eafeb3c2823c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 31 May 2017 18:07:12 +0100 Subject: Use prefix matching --- synapse/storage/user_directory.py | 34 +++++++++++++++++++++++++++++----- 1 file changed, 29 insertions(+), 5 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py index 2e9175f50a..ca2be9daf2 100644 --- a/synapse/storage/user_directory.py +++ b/synapse/storage/user_directory.py @@ -21,6 +21,8 @@ from synapse.api.constants import EventTypes, JoinRules from synapse.storage.engines import PostgresEngine, Sqlite3Engine from synapse.types import get_domain_from_id, get_localpart_from_id +import re + class UserDirectoryStore(SQLBaseStore): @@ -272,17 +274,17 @@ class UserDirectoryStore(SQLBaseStore): ] } """ - + search_query = _parse_query(self.database_engine, search_term) if isinstance(self.database_engine, PostgresEngine): sql = """ SELECT user_id, display_name, avatar_url FROM user_directory_search INNER JOIN user_directory USING (user_id) - WHERE vector @@ plainto_tsquery('english', ?) - ORDER BY ts_rank_cd(vector, plainto_tsquery('english', ?)) DESC + WHERE vector @@ to_tsquery('english', ?) + ORDER BY ts_rank_cd(vector, to_tsquery('english', ?)) DESC LIMIT ? """ - args = (search_term, search_term, limit + 1,) + args = (search_query, search_query, limit + 1,) elif isinstance(self.database_engine, Sqlite3Engine): sql = """ SELECT user_id, display_name, avatar_url @@ -292,7 +294,7 @@ class UserDirectoryStore(SQLBaseStore): ORDER BY rank(matchinfo(user_directory)) DESC LIMIT ? """ - args = (search_term, limit + 1) + args = (search_query, limit + 1) else: # This should be unreachable. raise Exception("Unrecognized database engine") @@ -307,3 +309,25 @@ class UserDirectoryStore(SQLBaseStore): "limited": limited, "results": results, }) + + +def _parse_query(database_engine, search_term): + """Takes a plain unicode string from the user and converts it into a form + that can be passed to database. + We use this so that we can add prefix matching, which isn't something + that is supported by default. + + We specifically add both a prefix and non prefix matching term so that + exact matches get ranked higher. + """ + + # Pull out the individual words, discarding any non-word characters. + results = re.findall(r"([\w\-]+)", search_term, re.UNICODE) + + if isinstance(database_engine, PostgresEngine): + return " & ".join("%s:* & %s" % (result, result,) for result in results) + elif isinstance(database_engine, Sqlite3Engine): + return " & ".join("%s* & %s" % (result, result,) for result in results) + else: + # This should be unreachable. + raise Exception("Unrecognized database engine") -- cgit 1.4.1 From 036362ede6cadc4d6f289dbcabfc5e06d370a587 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 31 May 2017 18:17:47 +0100 Subject: Order by if they have profile info --- synapse/storage/user_directory.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py index ca2be9daf2..79161f2745 100644 --- a/synapse/storage/user_directory.py +++ b/synapse/storage/user_directory.py @@ -274,14 +274,20 @@ class UserDirectoryStore(SQLBaseStore): ] } """ + search_query = _parse_query(self.database_engine, search_term) + if isinstance(self.database_engine, PostgresEngine): + # We order by rank and then if they have profile info sql = """ SELECT user_id, display_name, avatar_url FROM user_directory_search INNER JOIN user_directory USING (user_id) WHERE vector @@ to_tsquery('english', ?) - ORDER BY ts_rank_cd(vector, to_tsquery('english', ?)) DESC + ORDER BY + ts_rank_cd(vector, to_tsquery('english', ?)) DESC, + display_name IS NULL, + avatar_url IS NULL LIMIT ? """ args = (search_query, search_query, limit + 1,) @@ -291,7 +297,10 @@ class UserDirectoryStore(SQLBaseStore): FROM user_directory_search INNER JOIN user_directory USING (user_id) WHERE value MATCH ? - ORDER BY rank(matchinfo(user_directory)) DESC + ORDER BY + rank(matchinfo(user_directory)) DESC, + display_name IS NULL, + avatar_url IS NULL LIMIT ? """ args = (search_query, limit + 1) -- cgit 1.4.1 From 0fe6f3c521498fc92c58c49d8edcc6984471da08 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 1 Jun 2017 11:09:49 +0100 Subject: Bug fixes and logging - Check if room is public when a user joins before adding to user dir - Fix typo of field name "content.join_rules" -> "content.join_rule" --- synapse/handlers/user_directory.py | 22 +++++++++++++++++++++- synapse/storage/user_directory.py | 2 +- 2 files changed, 22 insertions(+), 2 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index 7130cc6ee3..130ff45ec5 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -170,6 +170,8 @@ class UserDirectoyHandler(object): event_id = delta["event_id"] prev_event_id = delta["prev_event_id"] + logger.debug("Handling: %r %r, %s", typ, state_key, event_id) + # For join rule and visibility changes we need to check if the room # may have become public or not and add/remove the users in said room if typ in (EventTypes.RoomHistoryVisibility, EventTypes.JoinRules): @@ -201,7 +203,14 @@ class UserDirectoyHandler(object): yield self._handle_remove_user(room_id, user_id) return + is_public = yield self.store.is_room_world_readable_or_publicly_joinable( + room_id + ) + if change: # The user joined + if not is_public: + return + event = yield self.store.get_event(event_id) profile = ProfileInfo( avatar_url=event.content.get("avatar_url"), @@ -211,7 +220,10 @@ class UserDirectoyHandler(object): yield self._handle_new_user(room_id, state_key, profile) else: # The user left yield self._handle_remove_user(room_id, state_key) + else: + logger.debug("Ignoring irrelevant type: %r", typ) + @defer.inlineCallbacks def _handle_room_publicity_change(self, room_id, prev_event_id, event_id, typ): """Handle a room having potentially changed from/to world_readable/publically joinable. @@ -222,6 +234,8 @@ class UserDirectoyHandler(object): event_id (str|None): The new event after the state change typ (str): Type of the event """ + logger.debug("Handling change for %s", typ) + if typ == EventTypes.RoomHistoryVisibility: change = yield self._get_key_change( prev_event_id, event_id, @@ -231,7 +245,7 @@ class UserDirectoyHandler(object): elif typ == EventTypes.JoinRules: change = yield self._get_key_change( prev_event_id, event_id, - key_name="join_rules", + key_name="join_rule", public_value=JoinRules.PUBLIC, ) else: @@ -239,6 +253,7 @@ class UserDirectoyHandler(object): # If change is None, no change. True => become world_readable/public, # False => was world_readable/public if change is None: + logger.debug("No change") return # There's been a change to or from being world readable. @@ -247,6 +262,8 @@ class UserDirectoyHandler(object): room_id ) + logger.debug("Change: %r, is_public: %r", change, is_public) + if change and not is_public: # If we became world readable but room isn't currently public then # we ignore the change @@ -326,6 +343,7 @@ class UserDirectoyHandler(object): event = yield self.store.get_event(event_id, allow_none=True) if not event and not prev_event: + logger.debug("Neither event exists: %r %r", prev_event_id, event_id) defer.returnValue(None) prev_value = None @@ -337,6 +355,8 @@ class UserDirectoyHandler(object): if event: value = event.content.get(key_name, None) + logger.debug("prev_value: %r -> value: %r", prev_value, value) + if value == public_value and prev_value != public_value: defer.returnValue(True) elif value != public_value and prev_value == public_value: diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py index 79161f2745..7323d783ac 100644 --- a/synapse/storage/user_directory.py +++ b/synapse/storage/user_directory.py @@ -38,7 +38,7 @@ class UserDirectoryStore(SQLBaseStore): if join_rules_id: join_rule_ev = yield self.get_event(join_rules_id, allow_none=True) if join_rule_ev: - if join_rule_ev.content.get("join_rules") == JoinRules.PUBLIC: + if join_rule_ev.content.get("join_rule") == JoinRules.PUBLIC: defer.returnValue(True) hist_vis_id = current_state_ids.get((EventTypes.RoomHistoryVisibility, "")) -- cgit 1.4.1 From 59dbb470654ff812975f888a3ec41537916091ab Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 1 Jun 2017 11:41:29 +0100 Subject: Remove spurious inlineCallbacks --- synapse/storage/user_directory.py | 1 - 1 file changed, 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py index 7323d783ac..a251aee465 100644 --- a/synapse/storage/user_directory.py +++ b/synapse/storage/user_directory.py @@ -127,7 +127,6 @@ class UserDirectoryStore(SQLBaseStore): ) self.get_user_in_directory.invalidate((user_id,)) - @defer.inlineCallbacks def remove_from_user_dir(self, user_id): def _remove_from_user_dir_txn(txn): self._simple_delete_txn( -- cgit 1.4.1 From 02a6108235610304b981939bd2c74ae7f36dd929 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 1 Jun 2017 13:16:40 +0100 Subject: Tweak search query --- synapse/storage/user_directory.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py index a251aee465..c2ea261289 100644 --- a/synapse/storage/user_directory.py +++ b/synapse/storage/user_directory.py @@ -333,9 +333,9 @@ def _parse_query(database_engine, search_term): results = re.findall(r"([\w\-]+)", search_term, re.UNICODE) if isinstance(database_engine, PostgresEngine): - return " & ".join("%s:* & %s" % (result, result,) for result in results) + return " & ".join("(%s:* | %s)" % (result, result,) for result in results) elif isinstance(database_engine, Sqlite3Engine): - return " & ".join("%s* & %s" % (result, result,) for result in results) + return " & ".join("(%s* | %s)" % (result, result,) for result in results) else: # This should be unreachable. raise Exception("Unrecognized database engine") -- cgit 1.4.1 From d5477c7afd884f200c55a1c6a187983756f49577 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 1 Jun 2017 13:27:28 +0100 Subject: Tweak search query --- synapse/storage/user_directory.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py index c2ea261289..4fe30ce72e 100644 --- a/synapse/storage/user_directory.py +++ b/synapse/storage/user_directory.py @@ -284,7 +284,7 @@ class UserDirectoryStore(SQLBaseStore): INNER JOIN user_directory USING (user_id) WHERE vector @@ to_tsquery('english', ?) ORDER BY - ts_rank_cd(vector, to_tsquery('english', ?)) DESC, + ts_rank_cd(vector, to_tsquery('english', ?), 1) DESC, display_name IS NULL, avatar_url IS NULL LIMIT ? -- cgit 1.4.1 From 21e255a8f1948c2fd298ce2e037d20bdd25f2f69 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 1 Jun 2017 14:50:46 +0100 Subject: Split the table in two --- synapse/handlers/user_directory.py | 77 +++++++++++++++++++---------- synapse/storage/_base.py | 5 ++ synapse/storage/schema/delta/42/user_dir.py | 10 +++- synapse/storage/user_directory.py | 77 +++++++++++++++++++++++++++-- 4 files changed, 138 insertions(+), 31 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index a8525fc86a..d795a9f8d5 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -50,6 +50,7 @@ class UserDirectoyHandler(object): # When start up for the first time we need to populate the user_directory. # This is a set of user_id's we've inserted already self.initially_handled_users = set() + self.initially_handled_users_in_public = set() # The current position in the current_state_delta stream self.pos = None @@ -145,8 +146,6 @@ class UserDirectoyHandler(object): return is_public = yield self.store.is_room_world_readable_or_publicly_joinable(room_id) - if not is_public: - return users_with_profile = yield self.state.get_current_user_in_room(room_id) unhandled_users = set(users_with_profile) - self.initially_handled_users @@ -159,6 +158,13 @@ class UserDirectoyHandler(object): self.initially_handled_users |= unhandled_users + if is_public: + yield self.store.add_users_to_public_room( + room_id, + user_ids=unhandled_users - self.initially_handled_users_in_public + ) + self.initially_handled_users_in_public != unhandled_users + @defer.inlineCallbacks def _handle_deltas(self, deltas): """Called with the state deltas to process @@ -206,14 +212,7 @@ class UserDirectoyHandler(object): else: logger.debug("Server is still in room: %r", room_id) - is_public = yield self.store.is_room_world_readable_or_publicly_joinable( - room_id - ) - if change: # The user joined - if not is_public: - return - event = yield self.store.get_event(event_id) profile = ProfileInfo( avatar_url=event.content.get("avatar_url"), @@ -276,11 +275,13 @@ class UserDirectoyHandler(object): # ignore the change return - users_with_profile = yield self.state.get_current_user_in_room(room_id) - for user_id, profile in users_with_profile.iteritems(): - if change: + if change: + users_with_profile = yield self.state.get_current_user_in_room(room_id) + for user_id, profile in users_with_profile.iteritems(): yield self._handle_new_user(room_id, user_id, profile) - else: + else: + users = yield self.store.get_users_in_public_due_to_room(room_id) + for user_id in users: yield self._handle_remove_user(room_id, user_id) @defer.inlineCallbacks @@ -292,11 +293,21 @@ class UserDirectoyHandler(object): user_id (str) """ logger.debug("Adding user to dir, %r", user_id) + row = yield self.store.get_user_in_directory(user_id) - if row: + if not row: + yield self.store.add_profiles_to_user_dir(room_id, {user_id: profile}) + + is_public = yield self.store.is_room_world_readable_or_publicly_joinable( + room_id + ) + + if not is_public: return - yield self.store.add_profiles_to_user_dir(room_id, {user_id: profile}) + row = yield self.store.get_user_in_public_room(user_id) + if not row: + yield self.store.add_users_to_public_room(room_id, [user_id]) @defer.inlineCallbacks def _handle_remove_user(self, room_id, user_id): @@ -309,15 +320,20 @@ class UserDirectoyHandler(object): logger.debug("Maybe removing user %r", user_id) row = yield self.store.get_user_in_directory(user_id) - if not row or row["room_id"] != room_id: - # Either the user wasn't in directory or we're still in a room that - # is public (i.e. the room_id in the database) - logger.debug("Not removing as row: %r", row) + update_user_dir = row and row["room_id"] == room_id + + row = yield self.store.get_user_in_public_room(user_id) + update_user_in_public = row and row["room_id"] == room_id + + if not update_user_in_public and not update_user_dir: return # XXX: Make this faster? rooms = yield self.store.get_rooms_for_user(user_id) for j_room_id in rooms: + if not update_user_in_public and not update_user_dir: + break + is_in_room = yield self.state.get_is_host_in_room( j_room_id, self.server_name, ) @@ -325,16 +341,23 @@ class UserDirectoyHandler(object): if not is_in_room: continue - is_public = yield self.store.is_room_world_readable_or_publicly_joinable( - j_room_id - ) - - if is_public: + if update_user_dir: + update_user_dir = False yield self.store.update_user_in_user_dir(user_id, j_room_id) - logger.debug("Not removing as found other public room: %r", j_room_id) - return - yield self.store.remove_from_user_dir(user_id) + if update_user_in_public: + is_public = yield self.store.is_room_world_readable_or_publicly_joinable( + j_room_id + ) + + if is_public: + yield self.store.update_user_in_public_user_list(user_id, j_room_id) + update_user_in_public = False + + if update_user_dir: + yield self.store.remove_from_user_dir(user_id) + elif update_user_in_public: + yield self.store.remove_from_user_in_public_room(user_id) @defer.inlineCallbacks def _get_key_change(self, prev_event_id, event_id, key_name, public_value): diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 58b73af7d2..db816346f5 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -425,6 +425,11 @@ class SQLBaseStore(object): txn.execute(sql, vals) + def _simple_insert_many(self, table, values, desc): + return self.runInteraction( + desc, self._simple_insert_many_txn, table, values + ) + @staticmethod def _simple_insert_many_txn(txn, table, values): if not values: diff --git a/synapse/storage/schema/delta/42/user_dir.py b/synapse/storage/schema/delta/42/user_dir.py index c34aa5e7d2..ea6a18196d 100644 --- a/synapse/storage/schema/delta/42/user_dir.py +++ b/synapse/storage/schema/delta/42/user_dir.py @@ -31,13 +31,21 @@ INSERT INTO user_directory_stream_pos (stream_id) VALUES (null); CREATE TABLE user_directory ( user_id TEXT NOT NULL, - room_id TEXT NOT NULL, -- A room_id that we know is public + room_id TEXT NOT NULL, -- A room_id that we know the user is joined to display_name TEXT, avatar_url TEXT ); CREATE INDEX user_directory_room_idx ON user_directory(room_id); CREATE UNIQUE INDEX user_directory_user_idx ON user_directory(user_id); + +CREATE TABLE users_in_pubic_room ( + user_id TEXT NOT NULL, + room_id TEXT NOT NULL -- A room_id that we know is public +); + +CREATE INDEX users_in_pubic_room_room_idx ON users_in_pubic_room(room_id); +CREATE UNIQUE INDEX users_in_pubic_room_user_idx ON users_in_pubic_room(user_id); """ diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py index 4fe30ce72e..cab0afc5c3 100644 --- a/synapse/storage/user_directory.py +++ b/synapse/storage/user_directory.py @@ -50,12 +50,34 @@ class UserDirectoryStore(SQLBaseStore): defer.returnValue(False) - def add_profiles_to_user_dir(self, room_id, users_with_profile): - """Add profiles to the user directory + @defer.inlineCallbacks + def add_users_to_public_room(self, room_id, user_ids): + """Add user to the list of users in public rooms Args: room_id (str): A room_id that all users are in that is world_readable or publically joinable + user_ids (list(str)): Users to add + """ + yield self._simple_insert_many( + table="users_in_pubic_room", + values=[ + { + "user_id": user_id, + "room_id": room_id, + } + for user_id in user_ids + ], + desc="add_users_to_public_room" + ) + for user_id in user_ids: + self.get_user_in_public_room.invalidate((user_id,)) + + def add_profiles_to_user_dir(self, room_id, users_with_profile): + """Add profiles to the user directory + + Args: + room_id (str): A room_id that all users are joined to users_with_profile (dict): Users to add to directory in the form of mapping of user_id -> ProfileInfo """ @@ -125,7 +147,15 @@ class UserDirectoryStore(SQLBaseStore): updatevalues={"room_id": room_id}, desc="update_user_in_user_dir", ) - self.get_user_in_directory.invalidate((user_id,)) + + @defer.inlineCallbacks + def update_user_in_public_user_list(self, user_id, room_id): + yield self._simple_update_one( + table="users_in_pubic_room", + keyvalues={"user_id": user_id}, + updatevalues={"room_id": room_id}, + desc="update_user_in_public_user_list", + ) def remove_from_user_dir(self, user_id): def _remove_from_user_dir_txn(txn): @@ -139,13 +169,41 @@ class UserDirectoryStore(SQLBaseStore): table="user_directory_search", keyvalues={"user_id": user_id}, ) + self._simple_delete_txn( + txn, + table="users_in_pubic_room", + keyvalues={"user_id": user_id}, + ) txn.call_after( self.get_user_in_directory.invalidate, (user_id,) ) + txn.call_after( + self.get_user_in_public_room.invalidate, (user_id,) + ) return self.runInteraction( "remove_from_user_dir", _remove_from_user_dir_txn, ) + @defer.inlineCallbacks + def remove_from_user_in_public_room(self, user_id): + yield self._simple_delete( + table="users_in_pubic_room", + keyvalues={"user_id": user_id}, + desc="remove_from_user_in_public_room", + ) + self.get_user_in_public_room.invalidate((user_id,)) + + def get_users_in_public_due_to_room(self, room_id): + """Get all user_ids that are in the room directory becuase they're + in the given room_id + """ + return self._simple_select_onecol( + table="users_in_pubic_room", + keyvalues={"room_id": room_id}, + retcol="user_id", + desc="get_users_in_public_due_to_room", + ) + def get_users_in_dir_due_to_room(self, room_id): """Get all user_ids that are in the room directory becuase they're in the given room_id @@ -173,6 +231,7 @@ class UserDirectoryStore(SQLBaseStore): def _delete_all_from_user_dir_txn(txn): txn.execute("DELETE FROM user_directory") txn.execute("DELETE FROM user_directory_search") + txn.execute("DELETE FROM users_in_pubic_room") txn.call_after(self.get_user_in_directory.invalidate_all) return self.runInteraction( "delete_all_from_user_dir", _delete_all_from_user_dir_txn @@ -188,6 +247,16 @@ class UserDirectoryStore(SQLBaseStore): desc="get_user_in_directory", ) + @cached() + def get_user_in_public_room(self, user_id): + return self._simple_select_one( + table="users_in_pubic_room", + keyvalues={"user_id": user_id}, + retcols=("room_id",), + allow_none=True, + desc="get_user_in_public_room", + ) + def get_user_directory_stream_pos(self): return self._simple_select_one_onecol( table="user_directory_stream_pos", @@ -282,6 +351,7 @@ class UserDirectoryStore(SQLBaseStore): SELECT user_id, display_name, avatar_url FROM user_directory_search INNER JOIN user_directory USING (user_id) + INNER JOIN users_in_pubic_room USING (user_id) WHERE vector @@ to_tsquery('english', ?) ORDER BY ts_rank_cd(vector, to_tsquery('english', ?), 1) DESC, @@ -295,6 +365,7 @@ class UserDirectoryStore(SQLBaseStore): SELECT user_id, display_name, avatar_url FROM user_directory_search INNER JOIN user_directory USING (user_id) + INNER JOIN users_in_pubic_room USING (user_id) WHERE value MATCH ? ORDER BY rank(matchinfo(user_directory)) DESC, -- cgit 1.4.1 From 4d039aa2ca78730ca5f8bb9043ab75328004d7a1 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 1 Jun 2017 14:58:48 +0100 Subject: Fix sqlite --- synapse/storage/user_directory.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py index cab0afc5c3..bcf24fa4d0 100644 --- a/synapse/storage/user_directory.py +++ b/synapse/storage/user_directory.py @@ -368,7 +368,7 @@ class UserDirectoryStore(SQLBaseStore): INNER JOIN users_in_pubic_room USING (user_id) WHERE value MATCH ? ORDER BY - rank(matchinfo(user_directory)) DESC, + rank(matchinfo(user_directory_search)) DESC, display_name IS NULL, avatar_url IS NULL LIMIT ? -- cgit 1.4.1 From 1a01af079e97d25bbac46127b8bd069189c02c97 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 1 Jun 2017 15:39:51 +0100 Subject: Handle profile updates in user directory --- synapse/handlers/user_directory.py | 25 +++++++++++++++++++ synapse/storage/user_directory.py | 49 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 74 insertions(+) (limited to 'synapse/storage') diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index d795a9f8d5..0182cf86d6 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -192,6 +192,8 @@ class UserDirectoyHandler(object): ) if change is None: + # Handle any profile changes + yield self._handle_profile_change(state_key, prev_event_id, event_id) continue if not change: @@ -359,6 +361,29 @@ class UserDirectoyHandler(object): elif update_user_in_public: yield self.store.remove_from_user_in_public_room(user_id) + @defer.inlineCallbacks + def _handle_profile_change(self, user_id, prev_event_id, event_id): + """Check member event changes for any profile changes and update the + database if there are. + """ + if not prev_event_id or not event_id: + return + + prev_event = yield self.store.get_event(prev_event_id) + event = yield self.store.get_event(event_id) + + if event.membership != Membership.JOIN: + return + + prev_name = prev_event.content.get("displayname") + new_name = event.content.get("displayname") + + prev_avatar = prev_event.content.get("avatar_url") + new_avatar = event.content.get("avatar_url") + + if prev_name != new_name or prev_avatar != new_avatar: + yield self.store.update_profile_in_user_dir(user_id, new_name, new_avatar) + @defer.inlineCallbacks def _get_key_change(self, prev_event_id, event_id, key_name, public_value): """Given two events check if the `key_name` field in content changed diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py index bcf24fa4d0..6a4bf63f0d 100644 --- a/synapse/storage/user_directory.py +++ b/synapse/storage/user_directory.py @@ -147,6 +147,53 @@ class UserDirectoryStore(SQLBaseStore): updatevalues={"room_id": room_id}, desc="update_user_in_user_dir", ) + self.get_user_in_directory.invalidate((user_id,)) + + def update_profile_in_user_dir(self, user_id, display_name, avatar_url): + def _update_profile_in_user_dir_txn(txn): + self._simple_update_one_txn( + txn, + table="user_directory", + keyvalues={"user_id": user_id}, + updatevalues={"display_name": display_name, "avatar_url": avatar_url}, + ) + + if isinstance(self.database_engine, PostgresEngine): + # We weight the loclpart most highly, then display name and finally + # server name + sql = """ + UPDATE user_directory_search + SET vector = setweight(to_tsvector('english', ?), 'A') + || setweight(to_tsvector('english', ?), 'D') + || setweight(to_tsvector('english', COALESCE(?, '')), 'B') + WHERE user_id = ? + """ + args = ( + get_localpart_from_id(user_id), get_domain_from_id(user_id), + display_name, + user_id, + ) + elif isinstance(self.database_engine, Sqlite3Engine): + sql = """ + UPDATE user_directory_search + set value = ? + WHERE user_id = ? + """ + args = ( + "%s %s" % (user_id, display_name,) if display_name else user_id, + user_id, + ) + else: + # This should be unreachable. + raise Exception("Unrecognized database engine") + + txn.execute(sql, args) + + txn.call_after(self.get_user_in_directory.invalidate, (user_id,)) + + return self.runInteraction( + "update_profile_in_user_dir", _update_profile_in_user_dir_txn + ) @defer.inlineCallbacks def update_user_in_public_user_list(self, user_id, room_id): @@ -156,6 +203,7 @@ class UserDirectoryStore(SQLBaseStore): updatevalues={"room_id": room_id}, desc="update_user_in_public_user_list", ) + self.get_user_in_public_room.invalidate((user_id,)) def remove_from_user_dir(self, user_id): def _remove_from_user_dir_txn(txn): @@ -233,6 +281,7 @@ class UserDirectoryStore(SQLBaseStore): txn.execute("DELETE FROM user_directory_search") txn.execute("DELETE FROM users_in_pubic_room") txn.call_after(self.get_user_in_directory.invalidate_all) + txn.call_after(self.get_user_in_public_room.invalidate_all) return self.runInteraction( "delete_all_from_user_dir", _delete_all_from_user_dir_txn ) -- cgit 1.4.1 From 6f83c4537cd6b0eeef2cfa735df085bf70228131 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 7 Jun 2017 10:18:44 +0100 Subject: Increase size of IP cache --- synapse/storage/client_ips.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py index 747d2df622..014ab635b7 100644 --- a/synapse/storage/client_ips.py +++ b/synapse/storage/client_ips.py @@ -20,6 +20,8 @@ from twisted.internet import defer from ._base import Cache from . import background_updates +import os + logger = logging.getLogger(__name__) # Number of msec of granularity to store the user IP 'last seen' time. Smaller @@ -28,12 +30,15 @@ logger = logging.getLogger(__name__) LAST_SEEN_GRANULARITY = 120 * 1000 +CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.1)) + + class ClientIpStore(background_updates.BackgroundUpdateStore): def __init__(self, hs): self.client_ip_last_seen = Cache( name="client_ip_last_seen", keylen=4, - max_entries=5000, + max_entries=50000 * CACHE_SIZE_FACTOR, ) super(ClientIpStore, self).__init__(hs) -- cgit 1.4.1 From 65f0513a3306d21aa5e6959b21642ba15dbdcad5 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 7 Jun 2017 11:02:38 +0100 Subject: Split up device_lists_outbound_pokes table for faster updates. --- synapse/storage/devices.py | 82 +++++++--------------- .../schema/delta/42/device_list_last_id.sql | 33 +++++++++ 2 files changed, 57 insertions(+), 58 deletions(-) create mode 100644 synapse/storage/schema/delta/42/device_list_last_id.sql (limited to 'synapse/storage') diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index d9936c88bb..77b02c8a28 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -37,10 +37,6 @@ class DeviceStore(SQLBaseStore): max_entries=10000, ) - self._clock.looping_call( - self._prune_old_outbound_device_pokes, 60 * 60 * 1000 - ) - self.register_background_index_update( "device_lists_stream_idx", index_name="device_lists_stream_user_id", @@ -368,7 +364,7 @@ class DeviceStore(SQLBaseStore): prev_sent_id_sql = """ SELECT coalesce(max(stream_id), 0) as stream_id - FROM device_lists_outbound_pokes + FROM device_lists_outbound_last_success WHERE destination = ? AND user_id = ? AND stream_id <= ? """ @@ -510,32 +506,43 @@ class DeviceStore(SQLBaseStore): ) def _mark_as_sent_devices_by_remote_txn(self, txn, destination, stream_id): - # First we DELETE all rows such that only the latest row for each - # (destination, user_id is left. We do this by selecting first and - # deleting. + # We update the device_lists_outbound_last_success with the successfully + # poked users. We do the join to see which users need to be inserted and + # which updated. sql = """ - SELECT user_id, coalesce(max(stream_id), 0) FROM device_lists_outbound_pokes - WHERE destination = ? AND stream_id <= ? + SELECT user_id, coalesce(max(o.stream_id), 0), (max(s.stream_id) IS NOT NULL) + FROM device_lists_outbound_pokes as o + LEFT JOIN device_lists_outbound_last_success as s + USING (destination, user_id) + WHERE destination = ? AND o.stream_id <= ? GROUP BY user_id - HAVING count(*) > 1 """ txn.execute(sql, (destination, stream_id,)) rows = txn.fetchall() sql = """ - DELETE FROM device_lists_outbound_pokes - WHERE destination = ? AND user_id = ? AND stream_id < ? + UPDATE device_lists_outbound_last_success + SET stream_id = ? + WHERE destination = ? AND user_id = ? """ txn.executemany( - sql, ((destination, row[0], row[1],) for row in rows) + sql, ((row[1], destination, row[0],) for row in rows if row[2]) ) - # Mark everything that is left as sent sql = """ - UPDATE device_lists_outbound_pokes SET sent = ? + INSERT INTO device_lists_outbound_last_success + (destination, user_id, stream_id) VALUES (?, ?, ?) + """ + txn.executemany( + sql, ((destination, row[0], row[1],) for row in rows if not row[2]) + ) + + # Delete all sent outbound pokes + sql = """ + DELETE FROM device_lists_outbound_pokes WHERE destination = ? AND stream_id <= ? """ - txn.execute(sql, (True, destination, stream_id,)) + txn.execute(sql, (destination, stream_id,)) @defer.inlineCallbacks def get_user_whose_devices_changed(self, from_key): @@ -634,44 +641,3 @@ class DeviceStore(SQLBaseStore): def get_device_stream_token(self): return self._device_list_id_gen.get_current_token() - - def _prune_old_outbound_device_pokes(self): - """Delete old entries out of the device_lists_outbound_pokes to ensure - that we don't fill up due to dead servers. We keep one entry per - (destination, user_id) tuple to ensure that the prev_ids remain correct - if the server does come back. - """ - yesterday = self._clock.time_msec() - 24 * 60 * 60 * 1000 - - def _prune_txn(txn): - select_sql = """ - SELECT destination, user_id, max(stream_id) as stream_id - FROM device_lists_outbound_pokes - GROUP BY destination, user_id - HAVING min(ts) < ? AND count(*) > 1 - """ - - txn.execute(select_sql, (yesterday,)) - rows = txn.fetchall() - - if not rows: - return - - delete_sql = """ - DELETE FROM device_lists_outbound_pokes - WHERE ts < ? AND destination = ? AND user_id = ? AND stream_id < ? - """ - - txn.executemany( - delete_sql, - ( - (yesterday, row[0], row[1], row[2]) - for row in rows - ) - ) - - logger.info("Pruned %d device list outbound pokes", txn.rowcount) - - return self.runInteraction( - "_prune_old_outbound_device_pokes", _prune_txn - ) diff --git a/synapse/storage/schema/delta/42/device_list_last_id.sql b/synapse/storage/schema/delta/42/device_list_last_id.sql new file mode 100644 index 0000000000..9ab8c14fa3 --- /dev/null +++ b/synapse/storage/schema/delta/42/device_list_last_id.sql @@ -0,0 +1,33 @@ +/* Copyright 2017 Vector Creations Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +-- Table of last stream_id that we sent to destination for user_id. This is +-- used to fill out the `prev_id` fields of outbound device list updates. +CREATE TABLE device_lists_outbound_last_success ( + destination TEXT NOT NULL, + user_id TEXT NOT NULL, + stream_id BIGINT NOT NULL +); + +INSERT INTO device_lists_outbound_last_success + SELECT destination, user_id, coalesce(max(stream_id), 0) as stream_id + FROM device_lists_outbound_pokes + WHERE sent = (1 = 1) -- sqlite doesn't have inbuilt boolean values + GROUP BY destination, user_id; + +CREATE INDEX device_lists_outbound_last_success_idx ON device_lists_outbound_last_success( + destination, user_id, stream_id +); -- cgit 1.4.1 From 6e2a7ee1bc7f576592573de2300a3a2e0f82e001 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 7 Jun 2017 11:05:17 +0100 Subject: Remove spurious log lines --- synapse/federation/transaction_queue.py | 1 - synapse/state.py | 2 -- synapse/storage/roommember.py | 1 - 3 files changed, 4 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 4c25ef1106..003eaba893 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -188,7 +188,6 @@ class TransactionQueue(object): ], ) destinations = set(destinations) - logger.info("destinations: %r", destinations) if send_on_behalf_of is not None: # If we are sending the event on behalf of another server diff --git a/synapse/state.py b/synapse/state.py index 5fbe0a0977..d1b1a70a91 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -179,9 +179,7 @@ class StateHandler(object): latest_event_ids = yield self.store.get_latest_event_ids_in_room(room_id) logger.debug("calling resolve_state_groups from get_current_hosts_in_room") entry = yield self.resolve_state_groups(room_id, latest_event_ids) - logger.info("State: %r", entry.state_group) joined_hosts = yield self.store.get_joined_hosts(room_id, entry) - logger.info("returning: %r", joined_hosts) defer.returnValue(joined_hosts) @defer.inlineCallbacks diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 0e9e71f600..7155bfdc69 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -560,7 +560,6 @@ class RoomMemberStore(SQLBaseStore): cache = self._get_joined_hosts_cache(room_id) joined_hosts = yield cache.get_destinations(state_entry) - logger.info("returning: %r", joined_hosts) defer.returnValue(joined_hosts) -- cgit 1.4.1 From 6ba21bf2b8207e714d0d523379727bcf2c14684d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 7 Jun 2017 11:08:36 +0100 Subject: Comments --- synapse/storage/roommember.py | 9 +++++++++ synapse/storage/state.py | 6 ++++++ 2 files changed, 15 insertions(+) (limited to 'synapse/storage') diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 7155bfdc69..8656455f6e 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -644,6 +644,10 @@ class RoomMemberStore(SQLBaseStore): class _JoinedHostsCache(object): + """Cache for joined hosts in a room that is optimised to handle updates + via state deltas. + """ + def __init__(self, store, room_id): self.store = store self.room_id = room_id @@ -658,6 +662,11 @@ class _JoinedHostsCache(object): @defer.inlineCallbacks def get_destinations(self, state_entry): + """Get set of destinations for a state entry + + Args: + state_entry(synapse.state._StateCacheEntry) + """ if state_entry.state_group == self.state_group: defer.returnValue(frozenset(self.hosts_to_joined_users)) diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 01474ff5ff..c3eecbe824 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -99,6 +99,12 @@ class StateStore(SQLBaseStore): ) def get_state_group_delta(self, state_group): + """Given a state group try to return a previous group and a delta between + the old and the new. + + Returns: + (prev_group, delta_ids), where both may be None. + """ def _get_state_group_delta_txn(txn): prev_group = self._simple_select_one_onecol_txn( txn, -- cgit 1.4.1 From 1a81a1898e3ce1cae7e4a96183b6f404231bceab Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 7 Jun 2017 11:16:56 +0100 Subject: Keep pruning background task --- synapse/storage/devices.py | 45 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 45 insertions(+) (limited to 'synapse/storage') diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index 77b02c8a28..83b1d2eeba 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -37,6 +37,10 @@ class DeviceStore(SQLBaseStore): max_entries=10000, ) + self._clock.looping_call( + self._prune_old_outbound_device_pokes, 60 * 60 * 1000 + ) + self.register_background_index_update( "device_lists_stream_idx", index_name="device_lists_stream_user_id", @@ -641,3 +645,44 @@ class DeviceStore(SQLBaseStore): def get_device_stream_token(self): return self._device_list_id_gen.get_current_token() + + def _prune_old_outbound_device_pokes(self): + """Delete old entries out of the device_lists_outbound_pokes to ensure + that we don't fill up due to dead servers. We keep one entry per + (destination, user_id) tuple to ensure that the prev_ids remain correct + if the server does come back. + """ + yesterday = self._clock.time_msec() - 24 * 60 * 60 * 1000 + + def _prune_txn(txn): + select_sql = """ + SELECT destination, user_id, max(stream_id) as stream_id + FROM device_lists_outbound_pokes + GROUP BY destination, user_id + HAVING min(ts) < ? AND count(*) > 1 + """ + + txn.execute(select_sql, (yesterday,)) + rows = txn.fetchall() + + if not rows: + return + + delete_sql = """ + DELETE FROM device_lists_outbound_pokes + WHERE ts < ? AND destination = ? AND user_id = ? AND stream_id < ? + """ + + txn.executemany( + delete_sql, + ( + (yesterday, row[0], row[1], row[2]) + for row in rows + ) + ) + + logger.info("Pruned %d device list outbound pokes", txn.rowcount) + + return self.runInteraction( + "_prune_old_outbound_device_pokes", _prune_txn + ) -- cgit 1.4.1 From 64ed74c01efa035a2c9d95c97b1bf1b1f1c83ff6 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 7 Jun 2017 11:20:47 +0100 Subject: When pruning, delete from device_lists_outbound_last_success --- synapse/storage/devices.py | 8 ++++++++ 1 file changed, 8 insertions(+) (limited to 'synapse/storage') diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index 83b1d2eeba..bb27fd1f70 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -681,6 +681,14 @@ class DeviceStore(SQLBaseStore): ) ) + # Since we've deleted unsent deltas, we need to remove the entry + # of last successful sent so that the prev_ids are correctly set. + sql = """ + DELETE FROM device_lists_outbound_last_success + WHERE destination = ? AND user_id = ? + """ + txn.executemany(sql, ((row[0], row[1]) for row in rows)) + logger.info("Pruned %d device list outbound pokes", txn.rowcount) return self.runInteraction( -- cgit 1.4.1 From 197bd126f09b0df42b2cbb0bd7e121b04ab9d670 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 7 Jun 2017 17:34:20 +0100 Subject: Fix bug where state_group tables got corrupted This is due to the fact that we prefilled caches using txn.call_after, which always gets called including on error. We fix this by making txn.call_after only fire when a transaction completes successfully, which is what we want most of the time anyway. --- synapse/storage/__init__.py | 3 ++- synapse/storage/_base.py | 29 +++++++++++++++++++++-------- synapse/storage/events.py | 2 +- 3 files changed, 24 insertions(+), 10 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index d604e7668f..349f96e24b 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -225,7 +225,8 @@ class DataStore(RoomMemberStore, RoomStore, db_conn.cursor(), name="_find_stream_orderings_for_times_txn", database_engine=self.database_engine, - after_callbacks=[] + after_callbacks=[], + final_callbacks=[], ) self._find_stream_orderings_for_times_txn(cur) cur.close() diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 58b73af7d2..f214b9d4c4 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -52,13 +52,17 @@ class LoggingTransaction(object): """An object that almost-transparently proxies for the 'txn' object passed to the constructor. Adds logging and metrics to the .execute() method.""" - __slots__ = ["txn", "name", "database_engine", "after_callbacks"] + __slots__ = [ + "txn", "name", "database_engine", "after_callbacks", "final_callbacks", + ] - def __init__(self, txn, name, database_engine, after_callbacks): + def __init__(self, txn, name, database_engine, after_callbacks, + final_callbacks): object.__setattr__(self, "txn", txn) object.__setattr__(self, "name", name) object.__setattr__(self, "database_engine", database_engine) object.__setattr__(self, "after_callbacks", after_callbacks) + object.__setattr__(self, "final_callbacks", final_callbacks) def call_after(self, callback, *args, **kwargs): """Call the given callback on the main twisted thread after the @@ -67,6 +71,9 @@ class LoggingTransaction(object): """ self.after_callbacks.append((callback, args, kwargs)) + def call_finally(self, callback, *args, **kwargs): + self.final_callbacks.append((callback, args, kwargs)) + def __getattr__(self, name): return getattr(self.txn, name) @@ -217,8 +224,8 @@ class SQLBaseStore(object): self._clock.looping_call(loop, 10000) - def _new_transaction(self, conn, desc, after_callbacks, logging_context, - func, *args, **kwargs): + def _new_transaction(self, conn, desc, after_callbacks, final_callbacks, + logging_context, func, *args, **kwargs): start = time.time() * 1000 txn_id = self._TXN_ID @@ -237,7 +244,8 @@ class SQLBaseStore(object): try: txn = conn.cursor() txn = LoggingTransaction( - txn, name, self.database_engine, after_callbacks + txn, name, self.database_engine, after_callbacks, + final_callbacks, ) r = func(txn, *args, **kwargs) conn.commit() @@ -298,6 +306,7 @@ class SQLBaseStore(object): start_time = time.time() * 1000 after_callbacks = [] + final_callbacks = [] def inner_func(conn, *args, **kwargs): with LoggingContext("runInteraction") as context: @@ -309,7 +318,7 @@ class SQLBaseStore(object): current_context.copy_to(context) return self._new_transaction( - conn, desc, after_callbacks, current_context, + conn, desc, after_callbacks, final_callbacks, current_context, func, *args, **kwargs ) @@ -318,9 +327,13 @@ class SQLBaseStore(object): result = yield self._db_pool.runWithConnection( inner_func, *args, **kwargs ) - finally: + for after_callback, after_args, after_kwargs in after_callbacks: after_callback(*after_args, **after_kwargs) + finally: + for after_callback, after_args, after_kwargs in final_callbacks: + after_callback(*after_args, **after_kwargs) + defer.returnValue(result) @defer.inlineCallbacks @@ -936,7 +949,7 @@ class SQLBaseStore(object): # __exit__ called after the transaction finishes. ctx = self._cache_id_gen.get_next() stream_id = ctx.__enter__() - txn.call_after(ctx.__exit__, None, None, None) + txn.call_finally(ctx.__exit__, None, None, None) txn.call_after(self.hs.get_notifier().on_new_replication_data) self._simple_insert_txn( diff --git a/synapse/storage/events.py b/synapse/storage/events.py index c4aeb48800..73283eb4c7 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -1419,7 +1419,7 @@ class EventsStore(SQLBaseStore): ] rows = self._new_transaction( - conn, "do_fetch", [], None, self._fetch_event_rows, event_ids + conn, "do_fetch", [], [], None, self._fetch_event_rows, event_ids ) row_dict = { -- cgit 1.4.1 From ea11ee09f31ce239120aceaf11b38df3cd94cf69 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 8 Jun 2017 11:59:57 +0100 Subject: Ensure we don't use unpersisted state group as prev group --- synapse/state.py | 13 ++++++------- synapse/storage/state.py | 13 +++++++++++++ 2 files changed, 19 insertions(+), 7 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/state.py b/synapse/state.py index a98145598f..576eb6b788 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -273,7 +273,8 @@ class StateHandler(object): } elif entry.prev_group: context.prev_group = entry.prev_group - context.delta_ids = entry.delta_ids + context.delta_ids = dict(entry.delta_ids) + context.delta_ids[key] = event.event_id else: if entry.state_group is None: entry.state_group = self.store.get_next_state_group() @@ -364,12 +365,10 @@ class StateHandler(object): if new_state_event_ids == frozenset(e_id for e_id in events): state_group = sg break - if state_group is None: - # Worker instances don't have access to this method, but we want - # to set the state_group on the main instance to increase cache - # hits. - if hasattr(self.store, "get_next_state_group"): - state_group = self.store.get_next_state_group() + + # TODO: We want to create a state group for this set of events, to + # increase cache hits, but we need to make sure that it doesn't + # end up as a prev_group without being added to the database prev_group = None delta_ids = None diff --git a/synapse/storage/state.py b/synapse/storage/state.py index c3eecbe824..151223219d 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -223,6 +223,19 @@ class StateStore(SQLBaseStore): # We persist as a delta if we can, while also ensuring the chain # of deltas isn't tooo long, as otherwise read performance degrades. if context.prev_group: + is_in_db = self._simple_select_one_onecol_txn( + txn, + table="state_groups", + keyvalues={"id": context.prev_group}, + retcol="id", + allow_none=True, + ) + if not is_in_db: + raise Exception( + "Trying to persist state with unpersisted prev_group: %r" + % (context.prev_group,) + ) + potential_hops = self._count_state_group_hops_txn( txn, context.prev_group ) -- cgit 1.4.1 From 0185b75381c160edaf2bec0b9f4def0bb0d67a02 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 9 Jun 2017 10:52:26 +0100 Subject: Change is_host_joined to use current_state table This bypasses a bug where using the state groups to figure out if a host is in a room sometimes errors if the servers isn't in the room. (For example when the server rejected an invite to a remote room) --- synapse/api/auth.py | 13 ++-------- synapse/handlers/user_directory.py | 6 ++--- synapse/state.py | 11 -------- synapse/storage/roommember.py | 52 +++++++++++++++++--------------------- 4 files changed, 28 insertions(+), 54 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/api/auth.py b/synapse/api/auth.py index 9dbc7993df..0c297cb022 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -144,17 +144,8 @@ class Auth(object): @defer.inlineCallbacks def check_host_in_room(self, room_id, host): with Measure(self.clock, "check_host_in_room"): - latest_event_ids = yield self.store.get_latest_event_ids_in_room(room_id) - - logger.debug("calling resolve_state_groups from check_host_in_room") - entry = yield self.state.resolve_state_groups( - room_id, latest_event_ids - ) - - ret = yield self.store.is_host_joined( - room_id, host, entry.state_group, entry.state - ) - defer.returnValue(ret) + latest_event_ids = yield self.store.is_host_joined(room_id, host) + defer.returnValue(latest_event_ids) def _check_joined_room(self, member, user_id, room_id): if not member or member.membership != Membership.JOIN: diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index 43eb1c78e9..02b720b654 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -151,7 +151,7 @@ class UserDirectoyHandler(object): def _handle_intial_room(self, room_id): """Called when we initially fill out user_directory one room at a time """ - is_in_room = yield self.state.get_is_host_in_room(room_id, self.server_name) + is_in_room = yield self.store.is_host_joined(room_id, self.server_name) if not is_in_room: return @@ -209,7 +209,7 @@ class UserDirectoyHandler(object): if not change: # Need to check if the server left the room entirely, if so # we might need to remove all the users in that room - is_in_room = yield self.state.get_is_host_in_room( + is_in_room = yield self.store.is_host_joined( room_id, self.server_name, ) if not is_in_room: @@ -346,7 +346,7 @@ class UserDirectoyHandler(object): if not update_user_in_public and not update_user_dir: break - is_in_room = yield self.state.get_is_host_in_room( + is_in_room = yield self.store.is_host_joined( j_room_id, self.server_name, ) diff --git a/synapse/state.py b/synapse/state.py index 576eb6b788..5b386e3183 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -182,17 +182,6 @@ class StateHandler(object): joined_hosts = yield self.store.get_joined_hosts(room_id, entry) defer.returnValue(joined_hosts) - @defer.inlineCallbacks - def get_is_host_in_room(self, room_id, host, latest_event_ids=None): - if not latest_event_ids: - latest_event_ids = yield self.store.get_latest_event_ids_in_room(room_id) - logger.debug("calling resolve_state_groups from get_is_host_in_room") - entry = yield self.resolve_state_groups(room_id, latest_event_ids) - is_host_joined = yield self.store.is_host_joined( - room_id, host, entry.state_id, entry.state - ) - defer.returnValue(is_host_joined) - @defer.inlineCallbacks def compute_event_context(self, event, old_state=None): """Build an EventContext structure for the event. diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 8656455f6e..447c6364e7 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -501,40 +501,34 @@ class RoomMemberStore(SQLBaseStore): defer.returnValue(users_in_room) - def is_host_joined(self, room_id, host, state_group, state_ids): - if not state_group: - # If state_group is None it means it has yet to be assigned a - # state group, i.e. we need to make sure that calls with a state_group - # of None don't hit previous cached calls with a None state_group. - # To do this we set the state_group to a new object as object() != object() - state_group = object() + @defer.inlineCallbacks + def is_host_joined(self, room_id, host): + if '%' in host or '_' in host: + raise Exception("Invalid host name") + + sql = """ + SELECT state_key FROM current_state_events + INNER JOIN room_memberships USING (room_id, event_id) + WHERE membership = 'join' AND room_id = ? AND state_key LIKE ? + LIMIT 1 + """ - return self._is_host_joined( - room_id, host, state_group, state_ids - ) + # We do need to be careful to ensure that host doesn't have any wild cards + # in it, but we checked above for known ones and we'll check below that + # the returned user actually has the correct domain. + like_clause = "%:" + host - @cachedInlineCallbacks(num_args=3) - def _is_host_joined(self, room_id, host, state_group, current_state_ids): - # We don't use `state_group`, its there so that we can cache based - # on it. However, its important that its never None, since two current_state's - # with a state_group of None are likely to be different. - # See bulk_get_push_rules_for_room for how we work around this. - assert state_group is not None + rows = yield self._execute("is_host_joined", None, sql, room_id, like_clause) - for (etype, state_key), event_id in current_state_ids.items(): - if etype == EventTypes.Member: - try: - if get_domain_from_id(state_key) != host: - continue - except: - logger.warn("state_key not user_id: %s", state_key) - continue + if not rows: + defer.returnValue(False) - event = yield self.get_event(event_id, allow_none=True) - if event and event.content["membership"] == Membership.JOIN: - defer.returnValue(True) + user_id = rows[0][0] + if get_domain_from_id(user_id) != host: + # This can only happen if the host name has something funky in it + raise Exception("Invalid host name") - defer.returnValue(False) + defer.returnValue(True) def get_joined_hosts(self, room_id, state_entry): state_group = state_entry.state_group -- cgit 1.4.1 From 935e588799cac738d808faa8024bcba5fd5c6c06 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 9 Jun 2017 13:01:23 +0100 Subject: Tweak SQL --- synapse/storage/roommember.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 447c6364e7..e38bbd22a3 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -507,9 +507,12 @@ class RoomMemberStore(SQLBaseStore): raise Exception("Invalid host name") sql = """ - SELECT state_key FROM current_state_events - INNER JOIN room_memberships USING (room_id, event_id) - WHERE membership = 'join' AND room_id = ? AND state_key LIKE ? + SELECT state_key FROM current_state_events AS c + INNER JOIN room_memberships USING (event_id) + WHERE membership = 'join' + AND type = 'm.room.member' + AND c.room_id = ? + AND state_key LIKE ? LIMIT 1 """ -- cgit 1.4.1 From e54d7d536ef58271a0fbbf28d9bc1aeaa5428a4b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 9 Jun 2017 16:24:00 +0100 Subject: Cache state deltas --- synapse/storage/state.py | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 151223219d..0bea7374f7 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -20,6 +20,7 @@ from synapse.util.stringutils import to_ascii from synapse.storage.engines import PostgresEngine from twisted.internet import defer +from collections import namedtuple import logging @@ -29,6 +30,13 @@ logger = logging.getLogger(__name__) MAX_STATE_DELTA_HOPS = 100 +class _GetStateGroupDelta(namedtuple("_GetStateGroupDelta", ("prev_group", "delta_ids"))): + __slots__ = [] + + def __len__(self): + return len(self.delta_ids) if self.delta_ids else None + + class StateStore(SQLBaseStore): """ Keeps track of the state at a given event. @@ -98,6 +106,7 @@ class StateStore(SQLBaseStore): _get_current_state_ids_txn, ) + @cached(max_entries=10000, iterable=True) def get_state_group_delta(self, state_group): """Given a state group try to return a previous group and a delta between the old and the new. @@ -117,7 +126,7 @@ class StateStore(SQLBaseStore): ) if not prev_group: - return None, None + return _GetStateGroupDelta(None, None) delta_ids = self._simple_select_list_txn( txn, @@ -128,10 +137,10 @@ class StateStore(SQLBaseStore): retcols=("type", "state_key", "event_id",) ) - return prev_group, { + return _GetStateGroupDelta(prev_group, { (row["type"], row["state_key"]): row["event_id"] for row in delta_ids - } + }) return self.runInteraction( "get_state_group_delta", _get_state_group_delta_txn, -- cgit 1.4.1 From b0d975e21619b6eb1dbcf24a5ed68629298bfdce Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 9 Jun 2017 16:25:42 +0100 Subject: Comments --- synapse/storage/state.py | 3 +++ 1 file changed, 3 insertions(+) (limited to 'synapse/storage') diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 0bea7374f7..24503cd5aa 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -31,6 +31,9 @@ MAX_STATE_DELTA_HOPS = 100 class _GetStateGroupDelta(namedtuple("_GetStateGroupDelta", ("prev_group", "delta_ids"))): + """Return type of get_state_group_delta that implements __len__, which lets + us use the itrable flag when caching + """ __slots__ = [] def __len__(self): -- cgit 1.4.1 From 80609743440467a69c4bdcd111c60d84f617d6c4 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 9 Jun 2017 16:40:52 +0100 Subject: Fix replication --- synapse/replication/slave/storage/events.py | 2 +- synapse/storage/state.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index 6cd3a843df..f0a367aa9b 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -108,7 +108,7 @@ class SlavedEventStore(BaseSlavedStore): get_current_state_ids = ( StateStore.__dict__["get_current_state_ids"] ) - get_state_group_delta = DataStore.get_state_group_delta.__func__ + get_state_group_delta = StateStore.__dict__["get_state_group_delta"] _get_joined_hosts_cache = RoomMemberStore.__dict__["_get_joined_hosts_cache"] has_room_changed_since = DataStore.has_room_changed_since.__func__ diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 24503cd5aa..d1e679719b 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -37,7 +37,7 @@ class _GetStateGroupDelta(namedtuple("_GetStateGroupDelta", ("prev_group", "delt __slots__ = [] def __len__(self): - return len(self.delta_ids) if self.delta_ids else None + return len(self.delta_ids) if self.delta_ids else 0 class StateStore(SQLBaseStore): -- cgit 1.4.1 From d53fe399ebb497f67efc99ebb12d96486503629a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 13 Jun 2017 09:56:18 +0100 Subject: Add cache for is_host_joined --- synapse/replication/slave/storage/events.py | 2 +- synapse/storage/events.py | 5 +++++ synapse/storage/roommember.py | 2 +- 3 files changed, 7 insertions(+), 2 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index 7034f48b50..94ebbffc1b 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -153,7 +153,7 @@ class SlavedEventStore(BaseSlavedStore): get_room_events_stream_for_rooms = ( DataStore.get_room_events_stream_for_rooms.__func__ ) - is_host_joined = DataStore.is_host_joined.__func__ + is_host_joined = RoomMemberStore.__dict__["is_host_joined"] get_stream_token_for_event = DataStore.get_stream_token_for_event.__func__ _set_before_and_after = staticmethod(DataStore._set_before_and_after) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index c80d181fc7..72ce84b0b8 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -776,6 +776,11 @@ class EventsStore(SQLBaseStore): txn, self.get_rooms_for_user, (member,) ) + for host in set(get_domain_from_id(u) for u in members_changed): + self._invalidate_cache_and_stream( + txn, self.is_host_joined, (room_id, host) + ) + self._invalidate_cache_and_stream( txn, self.get_users_in_room, (room_id,) ) diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index e38bbd22a3..457ca288d0 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -501,7 +501,7 @@ class RoomMemberStore(SQLBaseStore): defer.returnValue(users_in_room) - @defer.inlineCallbacks + @cachedInlineCallbacks(max_entries=10000) def is_host_joined(self, room_id, host): if '%' in host or '_' in host: raise Exception("Invalid host name") -- cgit 1.4.1 From b58e24cc3c328e0577e6a8a44889d3639c0b289e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 13 Jun 2017 10:16:31 +0100 Subject: Tweak the ranking of PG user dir search --- synapse/storage/user_directory.py | 53 +++++++++++++++++++++++++++++---------- 1 file changed, 40 insertions(+), 13 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py index 6a4bf63f0d..0b874e0227 100644 --- a/synapse/storage/user_directory.py +++ b/synapse/storage/user_directory.py @@ -391,11 +391,14 @@ class UserDirectoryStore(SQLBaseStore): ] } """ - - search_query = _parse_query(self.database_engine, search_term) - if isinstance(self.database_engine, PostgresEngine): + full_query, exact_query, prefix_query = _parse_query_postgres(search_term) + # We order by rank and then if they have profile info + # The ranking algorithm is hand tweaked for "best" results. Broadly + # the idea is we give a higher weight to exact matches. + # The array of numbers are the weights for the various part of the + # search: (domain, _, display name, localpart) sql = """ SELECT user_id, display_name, avatar_url FROM user_directory_search @@ -403,13 +406,27 @@ class UserDirectoryStore(SQLBaseStore): INNER JOIN users_in_pubic_room USING (user_id) WHERE vector @@ to_tsquery('english', ?) ORDER BY - ts_rank_cd(vector, to_tsquery('english', ?), 1) DESC, + 2 * ts_rank_cd( + '{0.1, 0.1, 0.9, 1.0}', + vector, + to_tsquery('english', ?), + 8 + ) + + ts_rank_cd( + '{0.1, 0.1, 0.9, 1.0}', + vector, + to_tsquery('english', ?), + 8 + ) + DESC, display_name IS NULL, avatar_url IS NULL LIMIT ? """ - args = (search_query, search_query, limit + 1,) + args = (full_query, exact_query, prefix_query, limit + 1,) elif isinstance(self.database_engine, Sqlite3Engine): + search_query = _parse_query_sqlite(search_term) + sql = """ SELECT user_id, display_name, avatar_url FROM user_directory_search @@ -439,7 +456,7 @@ class UserDirectoryStore(SQLBaseStore): }) -def _parse_query(database_engine, search_term): +def _parse_query_sqlite(search_term): """Takes a plain unicode string from the user and converts it into a form that can be passed to database. We use this so that we can add prefix matching, which isn't something @@ -451,11 +468,21 @@ def _parse_query(database_engine, search_term): # Pull out the individual words, discarding any non-word characters. results = re.findall(r"([\w\-]+)", search_term, re.UNICODE) + return " & ".join("(%s* | %s)" % (result, result,) for result in results) + + +def _parse_query_postgres(search_term): + """Takes a plain unicode string from the user and converts it into a form + that can be passed to database. + We use this so that we can add prefix matching, which isn't something + that is supported by default. + """ + + # Pull out the individual words, discarding any non-word characters. + results = re.findall(r"([\w\-]+)", search_term, re.UNICODE) + + both = " & ".join("(%s:* | %s)" % (result, result,) for result in results) + exact = " & ".join("%s" % (result,) for result in results) + prefix = " & ".join("%s:*" % (result,) for result in results) - if isinstance(database_engine, PostgresEngine): - return " & ".join("(%s:* | %s)" % (result, result,) for result in results) - elif isinstance(database_engine, Sqlite3Engine): - return " & ".join("(%s* | %s)" % (result, result,) for result in results) - else: - # This should be unreachable. - raise Exception("Unrecognized database engine") + return both, exact, prefix -- cgit 1.4.1 From 6fd7e6db3d8c896d2e1b92efaad29596fd9cb39c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 13 Jun 2017 11:11:26 +0100 Subject: Fix user dir to not assume existence of user --- synapse/storage/user_directory.py | 59 ++++++++++++++++++++++++--------------- 1 file changed, 37 insertions(+), 22 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py index 0b874e0227..3f3eee8621 100644 --- a/synapse/storage/user_directory.py +++ b/synapse/storage/user_directory.py @@ -151,37 +151,52 @@ class UserDirectoryStore(SQLBaseStore): def update_profile_in_user_dir(self, user_id, display_name, avatar_url): def _update_profile_in_user_dir_txn(txn): - self._simple_update_one_txn( + new_entry = self._simple_upsert_txn( txn, table="user_directory", keyvalues={"user_id": user_id}, - updatevalues={"display_name": display_name, "avatar_url": avatar_url}, + values={"display_name": display_name, "avatar_url": avatar_url}, + lock=False, # We're only inserter ) if isinstance(self.database_engine, PostgresEngine): # We weight the loclpart most highly, then display name and finally # server name - sql = """ - UPDATE user_directory_search - SET vector = setweight(to_tsvector('english', ?), 'A') - || setweight(to_tsvector('english', ?), 'D') - || setweight(to_tsvector('english', COALESCE(?, '')), 'B') - WHERE user_id = ? - """ - args = ( - get_localpart_from_id(user_id), get_domain_from_id(user_id), - display_name, - user_id, - ) + if new_entry: + sql = """ + INSERT INTO user_directory_search(user_id, vector) + VALUES (?, + setweight(to_tsvector('english', ?), 'A') + || setweight(to_tsvector('english', ?), 'D') + || setweight(to_tsvector('english', COALESCE(?, '')), 'B') + ) + """ + args = ( + user_id, + get_localpart_from_id(user_id), get_domain_from_id(user_id), + display_name, + ) + else: + sql = """ + UPDATE user_directory_search + SET vector = setweight(to_tsvector('english', ?), 'A') + || setweight(to_tsvector('english', ?), 'D') + || setweight(to_tsvector('english', COALESCE(?, '')), 'B') + WHERE user_id = ? + """ + args = ( + get_localpart_from_id(user_id), get_domain_from_id(user_id), + display_name, + user_id, + ) elif isinstance(self.database_engine, Sqlite3Engine): - sql = """ - UPDATE user_directory_search - set value = ? - WHERE user_id = ? - """ - args = ( - "%s %s" % (user_id, display_name,) if display_name else user_id, - user_id, + value = "%s %s" % (user_id, display_name,) if display_name else user_id + self._simple_upsert_txn( + txn, + table="user_directory_search", + keyvalues={"user_id": user_id}, + values={"value": value}, + lock=False, # We're only inserter ) else: # This should be unreachable. -- cgit 1.4.1 From 505e7e8b9d51a2529790632799e47a047be1f7d2 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 13 Jun 2017 11:19:18 +0100 Subject: Fix up sql --- synapse/storage/user_directory.py | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py index 3f3eee8621..67b14cf26a 100644 --- a/synapse/storage/user_directory.py +++ b/synapse/storage/user_directory.py @@ -171,10 +171,12 @@ class UserDirectoryStore(SQLBaseStore): || setweight(to_tsvector('english', COALESCE(?, '')), 'B') ) """ - args = ( - user_id, - get_localpart_from_id(user_id), get_domain_from_id(user_id), - display_name, + txn.execute( + sql, + ( + user_id, get_localpart_from_id(user_id), + get_domain_from_id(user_id), display_name, + ) ) else: sql = """ @@ -184,10 +186,12 @@ class UserDirectoryStore(SQLBaseStore): || setweight(to_tsvector('english', COALESCE(?, '')), 'B') WHERE user_id = ? """ - args = ( - get_localpart_from_id(user_id), get_domain_from_id(user_id), - display_name, - user_id, + txn.execute( + sql, + ( + get_localpart_from_id(user_id), get_domain_from_id(user_id), + display_name, user_id, + ) ) elif isinstance(self.database_engine, Sqlite3Engine): value = "%s %s" % (user_id, display_name,) if display_name else user_id @@ -202,8 +206,6 @@ class UserDirectoryStore(SQLBaseStore): # This should be unreachable. raise Exception("Unrecognized database engine") - txn.execute(sql, args) - txn.call_after(self.get_user_in_directory.invalidate, (user_id,)) return self.runInteraction( -- cgit 1.4.1 From d9fd937e39cf04a3da15412bbdbd352bb175750a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 13 Jun 2017 11:49:39 +0100 Subject: Fix user directory insertion due to missing room_id --- synapse/handlers/user_directory.py | 10 +++++++--- synapse/storage/user_directory.py | 3 ++- 2 files changed, 9 insertions(+), 4 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index ce5a506b74..f4451e5dfb 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -203,7 +203,9 @@ class UserDirectoyHandler(object): if change is None: # Handle any profile changes - yield self._handle_profile_change(state_key, prev_event_id, event_id) + yield self._handle_profile_change( + state_key, room_id, prev_event_id, event_id, + ) continue if not change: @@ -372,7 +374,7 @@ class UserDirectoyHandler(object): yield self.store.remove_from_user_in_public_room(user_id) @defer.inlineCallbacks - def _handle_profile_change(self, user_id, prev_event_id, event_id): + def _handle_profile_change(self, user_id, room_id, prev_event_id, event_id): """Check member event changes for any profile changes and update the database if there are. """ @@ -395,7 +397,9 @@ class UserDirectoyHandler(object): new_avatar = event.content.get("avatar_url") if prev_name != new_name or prev_avatar != new_avatar: - yield self.store.update_profile_in_user_dir(user_id, new_name, new_avatar) + yield self.store.update_profile_in_user_dir( + user_id, new_name, new_avatar, room_id, + ) @defer.inlineCallbacks def _get_key_change(self, prev_event_id, event_id, key_name, public_value): diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py index 67b14cf26a..137aca2881 100644 --- a/synapse/storage/user_directory.py +++ b/synapse/storage/user_directory.py @@ -149,12 +149,13 @@ class UserDirectoryStore(SQLBaseStore): ) self.get_user_in_directory.invalidate((user_id,)) - def update_profile_in_user_dir(self, user_id, display_name, avatar_url): + def update_profile_in_user_dir(self, user_id, display_name, avatar_url, room_id): def _update_profile_in_user_dir_txn(txn): new_entry = self._simple_upsert_txn( txn, table="user_directory", keyvalues={"user_id": user_id}, + insertion_values={"room_id": room_id}, values={"display_name": display_name, "avatar_url": avatar_url}, lock=False, # We're only inserter ) -- cgit 1.4.1 From ebcd55d641345467d583836ce1f06ffe5d36f98d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 15 Jun 2017 09:45:48 +0100 Subject: Add DB schema for tracking users who share rooms --- synapse/storage/prepare_database.py | 2 +- synapse/storage/schema/delta/43/user_share.sql | 32 ++++++++++++++++++++++++++ 2 files changed, 33 insertions(+), 1 deletion(-) create mode 100644 synapse/storage/schema/delta/43/user_share.sql (limited to 'synapse/storage') diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py index eaba699e29..72b670b83b 100644 --- a/synapse/storage/prepare_database.py +++ b/synapse/storage/prepare_database.py @@ -25,7 +25,7 @@ logger = logging.getLogger(__name__) # Remember to update this number every time a change is made to database # schema files, so the users will be informed on server restarts. -SCHEMA_VERSION = 42 +SCHEMA_VERSION = 43 dir_path = os.path.abspath(os.path.dirname(__file__)) diff --git a/synapse/storage/schema/delta/43/user_share.sql b/synapse/storage/schema/delta/43/user_share.sql new file mode 100644 index 0000000000..f552b6eb7b --- /dev/null +++ b/synapse/storage/schema/delta/43/user_share.sql @@ -0,0 +1,32 @@ +/* Copyright 2017 Vector Creations Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- Table keeping track of who shares a room with who. We only keep track +-- of this for local users, so `user_id` is local users only (but we do keep track +-- of which remote users share a room) +CREATE TABLE users_who_share_rooms ( + user_id TEXT NOT NULL, + other_user_id TEXT NOT NULL, + room_id TEXT NOT NULL, + share_private BOOLEAN NOT NULL -- is the shared room private? i.e. they share a private room +); + + +CREATE UNIQUE INDEX users_who_share_rooms_u_idx ON users_who_share_rooms(user_id, other_user_id); +CREATE INDEX users_who_share_rooms_r_idx ON users_who_share_rooms(room_id, user_id); + + +-- Make sure that we popualte the table initially +UPDATE user_directory_stream_pos SET stream_id = NULL; -- cgit 1.4.1 From 72613bc3798d34a7bf93defd6624b84669078e2a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 15 Jun 2017 09:59:04 +0100 Subject: Implement initial population of users who share rooms table --- synapse/handlers/user_directory.py | 78 ++++++++++++++++++++++- synapse/storage/user_directory.py | 124 ++++++++++++++++++++++++++++++++++--- 2 files changed, 193 insertions(+), 9 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index f4451e5dfb..581c078bb2 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -14,12 +14,12 @@ # limitations under the License. import logging - from twisted.internet import defer from synapse.api.constants import EventTypes, JoinRules, Membership from synapse.storage.roommember import ProfileInfo from synapse.util.metrics import Measure +from synapse.util.async import sleep logger = logging.getLogger(__name__) @@ -41,12 +41,15 @@ class UserDirectoyHandler(object): one public room. """ + INITIAL_SLEEP_MS = 50 + def __init__(self, hs): self.store = hs.get_datastore() self.state = hs.get_state_handler() self.server_name = hs.hostname self.clock = hs.get_clock() self.notifier = hs.get_notifier() + self.is_mine_id = hs.is_mine_id self.notifier.add_replication_callback(self.notify_new_event) @@ -55,6 +58,9 @@ class UserDirectoyHandler(object): self.initially_handled_users = set() self.initially_handled_users_in_public = set() + self.initially_handled_users_share = set() + self.initially_handled_users_share_private_room = set() + # The current position in the current_state_delta stream self.pos = None @@ -140,10 +146,14 @@ class UserDirectoyHandler(object): logger.info("Handling room %d/%d", num_processed_rooms, len(room_ids)) yield self._handle_intial_room(room_id) num_processed_rooms += 1 + yield sleep(self.INITIAL_SLEEP_MS / 1000.) logger.info("Processed all rooms.") self.initially_handled_users = None + self.initially_handled_users_in_public = None + self.initially_handled_users_share = None + self.initially_handled_users_share_private_room = None yield self.store.update_user_directory_stream_pos(new_pos) @@ -158,7 +168,8 @@ class UserDirectoyHandler(object): is_public = yield self.store.is_room_world_readable_or_publicly_joinable(room_id) users_with_profile = yield self.state.get_current_user_in_room(room_id) - unhandled_users = set(users_with_profile) - self.initially_handled_users + user_ids = set(users_with_profile) + unhandled_users = user_ids - self.initially_handled_users yield self.store.add_profiles_to_user_dir( room_id, { @@ -175,6 +186,69 @@ class UserDirectoyHandler(object): ) self.initially_handled_users_in_public != unhandled_users + # We now go and figure out the new users who share rooms with user entries + # We sleep aggressively here as otherwise it can starve resources. + # We also batch up inserts/updates, but try to avoid too many at once. + to_insert = set() + to_update = set() + count = 0 + for user_id in user_ids: + if count % 100 == 0: + yield sleep(self.INITIAL_SLEEP_MS / 1000.) + + if not self.is_mine_id(user_id): + count += 1 + continue + + for other_user_id in user_ids: + if user_id == other_user_id: + continue + + if count % 100 == 0: + yield sleep(self.INITIAL_SLEEP_MS / 1000.) + count += 1 + + user_set = (user_id, other_user_id) + + if user_set in self.initially_handled_users_share_private_room: + continue + + if user_set in self.initially_handled_users_share: + if is_public: + continue + to_update.add(user_set) + else: + to_insert.add(user_set) + + if is_public: + self.initially_handled_users_share.add(user_set) + else: + self.initially_handled_users_share_private_room.add(user_set) + + if len(to_insert) > 100: + yield self.store.add_users_who_share_room( + room_id, not is_public, to_insert, + ) + to_insert.clear() + + if len(to_update) > 100: + yield self.store.update_users_who_share_room( + room_id, not is_public, to_update, + ) + to_update.clear() + + if to_insert: + yield self.store.add_users_who_share_room( + room_id, not is_public, to_insert, + ) + to_insert.clear() + + if to_update: + yield self.store.update_users_who_share_room( + room_id, not is_public, to_update, + ) + to_update.clear() + @defer.inlineCallbacks def _handle_deltas(self, deltas): """Called with the state deltas to process diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py index 137aca2881..0123e28f99 100644 --- a/synapse/storage/user_directory.py +++ b/synapse/storage/user_directory.py @@ -16,16 +16,19 @@ from twisted.internet import defer from ._base import SQLBaseStore + from synapse.util.caches.descriptors import cached, cachedInlineCallbacks from synapse.api.constants import EventTypes, JoinRules from synapse.storage.engines import PostgresEngine, Sqlite3Engine from synapse.types import get_domain_from_id, get_localpart_from_id import re +import logging +logger = logging.getLogger(__name__) -class UserDirectoryStore(SQLBaseStore): +class UserDirectoryStore(SQLBaseStore): @cachedInlineCallbacks(cache_context=True) def is_room_world_readable_or_publicly_joinable(self, room_id, cache_context): """Check if the room is either world_readable or publically joinable @@ -281,14 +284,118 @@ class UserDirectoryStore(SQLBaseStore): desc="get_users_in_dir_due_to_room", ) + @defer.inlineCallbacks def get_all_rooms(self): - """Get all room_ids we've ever known about + """Get all room_ids we've ever known about, in ascending order of "size" """ - return self._simple_select_onecol( - table="current_state_events", - keyvalues={}, - retcol="DISTINCT room_id", - desc="get_all_rooms", + sql = """ + SELECT room_id FROM current_state_events + GROUP BY room_id + ORDER BY count(*) ASC + """ + rows = yield self._execute("get_all_rooms", None, sql) + defer.returnValue([room_id for room_id, in rows]) + + def add_users_who_share_room(self, room_id, share_private, user_id_tuples): + """Insert entries into the users_who_share_rooms table. The first + user should be a local user. + + Args: + room_id (str) + share_private (bool): Is the room private + user_id_tuples([(str, str)]): iterable of 2-tuple of user IDs. + """ + def _add_users_who_share_room_txn(txn): + self._simple_insert_many_txn( + txn, + table="users_who_share_rooms", + values=[ + { + "user_id": user_id, + "other_user_id": other_user_id, + "room_id": room_id, + "share_private": share_private, + } + for user_id, other_user_id in user_id_tuples + ], + ) + for user_id, other_user_id in user_id_tuples: + txn.call_after( + self.get_users_who_share_room_from_dir.invalidate, + (user_id,), + ) + txn.call_after( + self.get_if_users_share_a_room.invalidate, + (user_id, other_user_id), + ) + return self.runInteraction( + "add_users_who_share_room", _add_users_who_share_room_txn + ) + + def update_users_who_share_room(self, room_id, share_private, user_id_sets): + """Updates entries in the users_who_share_rooms table. The first + user should be a local user. + + Args: + room_id (str) + share_private (bool): Is the room private + user_id_tuples([(str, str)]): iterable of 2-tuple of user IDs. + """ + def _update_users_who_share_room_txn(txn): + sql = """ + UPDATE users_who_share_rooms + SET room_id = ?, share_private = ? + WHERE user_id = ? AND other_user_id = ? + """ + txn.executemany( + sql, + ( + (room_id, share_private, uid, oid) + for uid, oid in user_id_sets + ) + ) + for user_id, other_user_id in user_id_sets: + txn.call_after( + self.get_users_who_share_room_from_dir.invalidate, + (user_id,), + ) + txn.call_after( + self.get_if_users_share_a_room.invalidate, + (user_id, other_user_id), + ) + return self.runInteraction( + "update_users_who_share_room", _update_users_who_share_room_txn + ) + + def remove_user_who_share_room(self, user_id, other_user_id): + """Deletes entries in the users_who_share_rooms table. The first + user should be a local user. + + Args: + room_id (str) + share_private (bool): Is the room private + user_id_tuples([(str, str)]): iterable of 2-tuple of user IDs. + """ + def _remove_user_who_share_room_txn(txn): + self._simple_delete_txn( + txn, + table="users_who_share_rooms", + keyvalues={ + "user_id": user_id, + "other_user_id": other_user_id, + }, + ) + txn.call_after( + self.get_users_who_share_room_from_dir.invalidate, + (user_id,), + ) + txn.call_after( + self.get_if_users_share_a_room.invalidate, + (user_id, other_user_id), + ) + + return self.runInteraction( + "remove_user_who_share_room", _remove_user_who_share_room_txn ) def delete_all_from_user_dir(self): @@ -298,8 +405,11 @@ class UserDirectoryStore(SQLBaseStore): txn.execute("DELETE FROM user_directory") txn.execute("DELETE FROM user_directory_search") txn.execute("DELETE FROM users_in_pubic_room") + txn.execute("DELETE FROM users_who_share_rooms") txn.call_after(self.get_user_in_directory.invalidate_all) txn.call_after(self.get_user_in_public_room.invalidate_all) + txn.call_after(self.get_users_who_share_room_from_dir.invalidate_all) + txn.call_after(self.get_if_users_share_a_room.invalidate_all) return self.runInteraction( "delete_all_from_user_dir", _delete_all_from_user_dir_txn ) -- cgit 1.4.1 From 4564b05483c8568b5435cfc527a73a6a7696fa61 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 15 Jun 2017 10:15:00 +0100 Subject: Implement updating users who share rooms on the fly --- synapse/handlers/user_directory.py | 148 +++++++++++++++++++++++++++++++------ synapse/storage/user_directory.py | 111 +++++++++++++++++++++++++++- 2 files changed, 235 insertions(+), 24 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index 581c078bb2..aa8af95177 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -390,12 +390,77 @@ class UserDirectoyHandler(object): room_id ) - if not is_public: - return + if is_public: + row = yield self.store.get_user_in_public_room(user_id) + if not row: + yield self.store.add_users_to_public_room(room_id, [user_id]) - row = yield self.store.get_user_in_public_room(user_id) - if not row: - yield self.store.add_users_to_public_room(room_id, [user_id]) + # Now we update users who share rooms with users. We do this by getting + # all the current users in the room and seeing which aren't already + # marked in the database as sharing with `user_id` + + users_with_profile = yield self.state.get_current_user_in_room(room_id) + + to_insert = set() + to_update = set() + + # First, if they're our user then we need to update for every user + if self.is_mine_id(user_id): + # Returns a map of other_user_id -> shared_private. We only need + # to update mappings if for users that either don't share a room + # already (aren't in the map) or, if the room is private, those that + # only share a public room. + user_ids_shared = yield self.store.get_users_who_share_room_from_dir( + user_id + ) + + for other_user_id in users_with_profile: + if user_id == other_user_id: + continue + + shared_is_private = user_ids_shared.get(other_user_id) + if shared_is_private is True: + # We've already marked in the database they share a private room + continue + elif shared_is_private is False: + # They already share a public room, so only update if this is + # a private room + if not is_public: + to_update.add((user_id, other_user_id)) + elif shared_is_private is None: + # This is the first time they both share a room + to_insert.add((user_id, other_user_id)) + + # Next we need to update for every local user in the room + for other_user_id in users_with_profile: + if user_id == other_user_id: + continue + + if self.is_mine_id(other_user_id): + shared_is_private = yield self.store.get_if_users_share_a_room( + other_user_id, user_id, + ) + if shared_is_private is True: + # We've already marked in the database they share a private room + continue + elif shared_is_private is False: + # They already share a public room, so only update if this is + # a private room + if not is_public: + to_update.add((other_user_id, user_id)) + elif shared_is_private is None: + # This is the first time they both share a room + to_insert.add((other_user_id, user_id)) + + if to_insert: + yield self.store.add_users_who_share_room( + room_id, not is_public, to_insert, + ) + + if to_update: + yield self.store.update_users_who_share_room( + room_id, not is_public, to_update, + ) @defer.inlineCallbacks def _handle_remove_user(self, room_id, user_id): @@ -413,32 +478,29 @@ class UserDirectoyHandler(object): row = yield self.store.get_user_in_public_room(user_id) update_user_in_public = row and row["room_id"] == room_id - if not update_user_in_public and not update_user_dir: - return - - # XXX: Make this faster? - rooms = yield self.store.get_rooms_for_user(user_id) - for j_room_id in rooms: - if not update_user_in_public and not update_user_dir: - break + if (update_user_in_public or update_user_dir): + # XXX: Make this faster? + rooms = yield self.store.get_rooms_for_user(user_id) + for j_room_id in rooms: + if (not update_user_in_public and not update_user_dir): + break - is_in_room = yield self.store.is_host_joined( - j_room_id, self.server_name, - ) + is_in_room = yield self.store.is_host_joined( + j_room_id, self.server_name, + ) - if not is_in_room: - continue + if not is_in_room: + continue - if update_user_dir: - update_user_dir = False - yield self.store.update_user_in_user_dir(user_id, j_room_id) + if update_user_dir: + update_user_dir = False + yield self.store.update_user_in_user_dir(user_id, j_room_id) - if update_user_in_public: is_public = yield self.store.is_room_world_readable_or_publicly_joinable( j_room_id ) - if is_public: + if update_user_in_public and is_public: yield self.store.update_user_in_public_user_list(user_id, j_room_id) update_user_in_public = False @@ -447,6 +509,46 @@ class UserDirectoyHandler(object): elif update_user_in_public: yield self.store.remove_from_user_in_public_room(user_id) + # Now handle users_who_share_rooms. + + # Get a list of user tuples that were in the DB due to this room and + # users (this includes tuples where the other user matches `user_id`) + user_tuples = yield self.store.get_users_in_share_dir_with_room_id( + user_id, room_id, + ) + + for user_id, other_user_id in user_tuples: + # For each user tuple get a list of rooms that they still share, + # trying to find a private room, and update the entry in the DB + rooms = yield self.store.get_rooms_in_common_for_users(user_id, other_user_id) + + # If they dont share a room anymore, remove the mapping + if not rooms: + yield self.store.remove_user_who_share_room( + user_id, other_user_id, + ) + continue + + found_public_share = None + for j_room_id in rooms: + is_public = yield self.store.is_room_world_readable_or_publicly_joinable( + j_room_id + ) + + if is_public: + found_public_share = j_room_id + else: + found_public_share = None + yield self.store.update_users_who_share_room( + room_id, not is_public, [(user_id, other_user_id)], + ) + break + + if found_public_share: + yield self.store.update_users_who_share_room( + room_id, not is_public, [(user_id, other_user_id)], + ) + @defer.inlineCallbacks def _handle_profile_change(self, user_id, room_id, prev_event_id, event_id): """Check member event changes for any profile changes and update the diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py index 0123e28f99..2a17cbc9e9 100644 --- a/synapse/storage/user_directory.py +++ b/synapse/storage/user_directory.py @@ -273,17 +273,38 @@ class UserDirectoryStore(SQLBaseStore): desc="get_users_in_public_due_to_room", ) + @defer.inlineCallbacks def get_users_in_dir_due_to_room(self, room_id): """Get all user_ids that are in the room directory becuase they're in the given room_id """ - return self._simple_select_onecol( + user_ids_dir = yield self._simple_select_onecol( table="user_directory", keyvalues={"room_id": room_id}, retcol="user_id", desc="get_users_in_dir_due_to_room", ) + user_ids_pub = yield self._simple_select_onecol( + table="users_in_pubic_room", + keyvalues={"room_id": room_id}, + retcol="user_id", + desc="get_users_in_dir_due_to_room", + ) + + user_ids_share = yield self._simple_select_onecol( + table="users_who_share_rooms", + keyvalues={"room_id": room_id}, + retcol="user_id", + desc="get_users_in_dir_due_to_room", + ) + + user_ids = set(user_ids_dir) + user_ids.update(user_ids_pub) + user_ids.update(user_ids_share) + + defer.returnValue(user_ids) + @defer.inlineCallbacks def get_all_rooms(self): """Get all room_ids we've ever known about, in ascending order of "size" @@ -398,6 +419,94 @@ class UserDirectoryStore(SQLBaseStore): "remove_user_who_share_room", _remove_user_who_share_room_txn ) + @cached(max_entries=500000) + def get_if_users_share_a_room(self, user_id, other_user_id): + """Gets if users share a room. + + Args: + user_id (str): Must be a local user_id + other_user_id (str) + + Returns: + bool|None: None if they don't share a room, otherwise whether they + share a private room or not. + """ + return self._simple_select_one_onecol( + table="users_who_share_rooms", + keyvalues={ + "user_id": user_id, + "other_user_id": other_user_id, + }, + retcol="share_private", + allow_none=True, + ) + + @cachedInlineCallbacks(max_entries=500000, iterable=True) + def get_users_who_share_room_from_dir(self, user_id): + """Returns the set of users who share a room with `user_id` + + Args: + user_id(str): Must be a local user + + Returns: + dict: user_id -> share_private mapping + """ + rows = yield self._simple_select_list( + table="users_who_share_rooms", + keyvalues={ + "user_id": user_id, + }, + retcols=("other_user_id", "share_private",), + desc="get_users_who_share_room_with_user", + ) + + defer.returnValue({ + row["other_user_id"]: row["share_private"] + for row in rows + }) + + def get_users_in_share_dir_with_room_id(self, user_id, room_id): + """Get all user tuples that are in the users_who_share_rooms due to the + given room_id. + + Returns: + [(user_id, other_user_id)]: where one of the two will match the given + user_id. + """ + sql = """ + SELECT user_id, other_user_id FROM users_who_share_rooms + WHERE room_id = ? AND (user_id = ? OR other_user_id = ?) + """ + return self._execute( + "get_users_in_share_dir_with_room_id", None, sql, room_id, user_id, user_id + ) + + @defer.inlineCallbacks + def get_rooms_in_common_for_users(self, user_id, other_user_id): + """Given two user_ids find out the list of rooms they share. + """ + sql = """ + SELECT room_id FROM ( + SELECT c.room_id FROM current_state_events AS c + INNER JOIN room_memberships USING (event_id) + WHERE type = 'm.room.member' + AND membership = 'join' + AND state_key = ? + ) AS f1 INNER JOIN ( + SELECT c.room_id FROM current_state_events AS c + INNER JOIN room_memberships USING (event_id) + WHERE type = 'm.room.member' + AND membership = 'join' + AND state_key = ? + ) f2 USING (room_id) + """ + + rows = yield self._execute( + "get_rooms_in_common_for_users", None, sql, user_id, other_user_id + ) + + defer.returnValue([room_id for room_id, in rows]) + def delete_all_from_user_dir(self): """Delete the entire user directory """ -- cgit 1.4.1 From a9d6fa8b2b31096b8f9fdb01b8fb5a2c6386e61f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 15 Jun 2017 10:00:28 +0100 Subject: Include users who share room with requester in user directory --- synapse/handlers/user_directory.py | 4 +- synapse/rest/client/v2_alpha/user_directory.py | 8 +++- synapse/storage/user_directory.py | 61 ++++++++++++++++---------- 3 files changed, 47 insertions(+), 26 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index aa8af95177..8928786fd6 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -71,7 +71,7 @@ class UserDirectoyHandler(object): # we start populating the user directory self.clock.call_later(0, self.notify_new_event) - def search_users(self, search_term, limit): + def search_users(self, user_id, search_term, limit): """Searches for users in directory Returns: @@ -88,7 +88,7 @@ class UserDirectoyHandler(object): ] } """ - return self.store.search_user_dir(search_term, limit) + return self.store.search_user_dir(user_id, search_term, limit) @defer.inlineCallbacks def notify_new_event(self): diff --git a/synapse/rest/client/v2_alpha/user_directory.py b/synapse/rest/client/v2_alpha/user_directory.py index 17d3dffc8f..6e012da4aa 100644 --- a/synapse/rest/client/v2_alpha/user_directory.py +++ b/synapse/rest/client/v2_alpha/user_directory.py @@ -55,7 +55,9 @@ class UserDirectorySearchRestServlet(RestServlet): ] } """ - yield self.auth.get_user_by_req(request, allow_guest=False) + requester = yield self.auth.get_user_by_req(request, allow_guest=False) + user_id = requester.user.to_string() + body = parse_json_object_from_request(request) limit = body.get("limit", 10) @@ -66,7 +68,9 @@ class UserDirectorySearchRestServlet(RestServlet): except: raise SynapseError(400, "`search_term` is required field") - results = yield self.user_directory_handler.search_users(search_term, limit) + results = yield self.user_directory_handler.search_users( + user_id, search_term, limit, + ) defer.returnValue((200, results)) diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py index 2a17cbc9e9..52b184fe78 100644 --- a/synapse/storage/user_directory.py +++ b/synapse/storage/user_directory.py @@ -611,7 +611,7 @@ class UserDirectoryStore(SQLBaseStore): ) @defer.inlineCallbacks - def search_user_dir(self, search_term, limit): + def search_user_dir(self, user_id, search_term, limit): """Searches for users in directory Returns: @@ -637,46 +637,63 @@ class UserDirectoryStore(SQLBaseStore): # The array of numbers are the weights for the various part of the # search: (domain, _, display name, localpart) sql = """ - SELECT user_id, display_name, avatar_url + SELECT d.user_id, display_name, avatar_url FROM user_directory_search - INNER JOIN user_directory USING (user_id) - INNER JOIN users_in_pubic_room USING (user_id) - WHERE vector @@ to_tsquery('english', ?) + INNER JOIN user_directory AS d USING (user_id) + LEFT JOIN users_in_pubic_room AS p USING (user_id) + LEFT JOIN ( + SELECT other_user_id AS user_id FROM users_who_share_rooms + WHERE user_id = ? AND share_private + ) AS s USING (user_id) + WHERE + (s.user_id IS NOT NULL OR p.user_id IS NOT NULL) + AND vector @@ to_tsquery('english', ?) ORDER BY - 2 * ts_rank_cd( - '{0.1, 0.1, 0.9, 1.0}', - vector, - to_tsquery('english', ?), - 8 - ) - + ts_rank_cd( - '{0.1, 0.1, 0.9, 1.0}', - vector, - to_tsquery('english', ?), - 8 + (CASE WHEN s.user_id IS NOT NULL THEN 4.0 ELSE 1.0 END) + * (CASE WHEN display_name IS NOT NULL THEN 1.2 ELSE 1.0 END) + * (CASE WHEN avatar_url IS NOT NULL THEN 1.2 ELSE 1.0 END) + * ( + 3 * ts_rank_cd( + '{0.1, 0.1, 0.9, 1.0}', + vector, + to_tsquery('english', ?), + 8 + ) + + ts_rank_cd( + '{0.1, 0.1, 0.9, 1.0}', + vector, + to_tsquery('english', ?), + 8 + ) ) DESC, display_name IS NULL, avatar_url IS NULL LIMIT ? """ - args = (full_query, exact_query, prefix_query, limit + 1,) + args = (user_id, full_query, exact_query, prefix_query, limit + 1,) elif isinstance(self.database_engine, Sqlite3Engine): search_query = _parse_query_sqlite(search_term) sql = """ - SELECT user_id, display_name, avatar_url + SELECT d.user_id, display_name, avatar_url FROM user_directory_search - INNER JOIN user_directory USING (user_id) - INNER JOIN users_in_pubic_room USING (user_id) - WHERE value MATCH ? + INNER JOIN user_directory AS d USING (user_id) + LEFT JOIN users_in_pubic_room AS p USING (user_id) + LEFT JOIN ( + SELECT other_user_id AS user_id FROM users_who_share_rooms + WHERE user_id = ? AND share_private + ) AS s USING (user_id) + WHERE + (s.user_id IS NOT NULL OR p.user_id IS NOT NULL) + AND value MATCH ? ORDER BY rank(matchinfo(user_directory_search)) DESC, display_name IS NULL, avatar_url IS NULL LIMIT ? """ - args = (search_query, limit + 1) + args = (user_id, search_query, limit + 1) else: # This should be unreachable. raise Exception("Unrecognized database engine") -- cgit 1.4.1 From 6aa5bc86351a617546f0adacfebab3388716be3f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 15 Jun 2017 12:47:05 +0100 Subject: Initial worker impl --- synapse/app/federation_sender.py | 2 +- synapse/app/user_dir.py | 270 +++++++++++++++++++++++++++++++++++++ synapse/config/server.py | 4 + synapse/handlers/user_directory.py | 19 ++- synapse/replication/tcp/streams.py | 22 +++ synapse/storage/events.py | 18 +++ 6 files changed, 328 insertions(+), 7 deletions(-) create mode 100644 synapse/app/user_dir.py (limited to 'synapse/storage') diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py index e51a69074d..03327dc47a 100644 --- a/synapse/app/federation_sender.py +++ b/synapse/app/federation_sender.py @@ -51,7 +51,7 @@ import sys import logging import gc -logger = logging.getLogger("synapse.app.appservice") +logger = logging.getLogger("synapse.app.federation_sender") class FederationSenderSlaveStore( diff --git a/synapse/app/user_dir.py b/synapse/app/user_dir.py new file mode 100644 index 0000000000..9d8edaa8e3 --- /dev/null +++ b/synapse/app/user_dir.py @@ -0,0 +1,270 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# Copyright 2017 Vector Creations Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import synapse + +from synapse.server import HomeServer +from synapse.config._base import ConfigError +from synapse.config.logger import setup_logging +from synapse.config.homeserver import HomeServerConfig +from synapse.crypto import context_factory +from synapse.http.site import SynapseSite +from synapse.http.server import JsonResource +from synapse.metrics.resource import MetricsResource, METRICS_PREFIX +from synapse.replication.slave.storage._base import BaseSlavedStore +from synapse.replication.slave.storage.events import SlavedEventStore +from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore +from synapse.replication.slave.storage.registration import SlavedRegistrationStore +from synapse.replication.tcp.client import ReplicationClientHandler +from synapse.rest.client.v2_alpha import user_directory +from synapse.storage.engines import create_engine +from synapse.storage.client_ips import ClientIpStore +from synapse.storage.user_directory import UserDirectoryStore +from synapse.util.httpresourcetree import create_resource_tree +from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn +from synapse.util.manhole import manhole +from synapse.util.rlimit import change_resource_limit +from synapse.util.versionstring import get_version_string +from synapse.util.caches.stream_change_cache import StreamChangeCache + +from synapse import events + +from twisted.internet import reactor +from twisted.web.resource import Resource + +from daemonize import Daemonize + +import sys +import logging +import gc + +logger = logging.getLogger("synapse.app.user_dir") + + +class UserDirectorySlaveStore( + SlavedEventStore, + SlavedApplicationServiceStore, + SlavedRegistrationStore, + UserDirectoryStore, + BaseSlavedStore, + ClientIpStore, # After BaseSlavedStore because the constructor is different +): + def __init__(self, db_conn, hs): + super(UserDirectorySlaveStore, self).__init__(db_conn, hs) + + events_max = self._stream_id_gen.get_current_token() + curr_state_delta_prefill, min_curr_state_delta_id = self._get_cache_dict( + db_conn, "current_state_delta_stream", + entity_column="room_id", + stream_column="stream_id", + max_value=events_max, # As we share the stream id with events token + limit=1000, + ) + self._curr_state_delta_stream_cache = StreamChangeCache( + "_curr_state_delta_stream_cache", min_curr_state_delta_id, + prefilled_cache=curr_state_delta_prefill, + ) + + self._current_state_delta_pos = events_max + + def stream_positions(self): + result = super(UserDirectorySlaveStore, self).stream_positions() + result["current_state_deltas"] = self._current_state_delta_pos + return result + + def process_replication_rows(self, stream_name, token, rows): + if stream_name == "current_state_deltas": + self._current_state_delta_pos = token + for row in rows: + self._curr_state_delta_stream_cache.entity_has_changed( + row.room_id, token + ) + return super(UserDirectorySlaveStore, self).process_replication_rows( + stream_name, token, rows + ) + + +class UserDirectoryServer(HomeServer): + def get_db_conn(self, run_new_connection=True): + # Any param beginning with cp_ is a parameter for adbapi, and should + # not be passed to the database engine. + db_params = { + k: v for k, v in self.db_config.get("args", {}).items() + if not k.startswith("cp_") + } + db_conn = self.database_engine.module.connect(**db_params) + + if run_new_connection: + self.database_engine.on_new_connection(db_conn) + return db_conn + + def setup(self): + logger.info("Setting up.") + self.datastore = UserDirectorySlaveStore(self.get_db_conn(), self) + logger.info("Finished setting up.") + + def _listen_http(self, listener_config): + port = listener_config["port"] + bind_addresses = listener_config["bind_addresses"] + site_tag = listener_config.get("tag", port) + resources = {} + for res in listener_config["resources"]: + for name in res["names"]: + if name == "metrics": + resources[METRICS_PREFIX] = MetricsResource(self) + elif name == "client": + resource = JsonResource(self, canonical_json=False) + user_directory.register_servlets(self, resource) + resources.update({ + "/_matrix/client/r0": resource, + "/_matrix/client/unstable": resource, + "/_matrix/client/v2_alpha": resource, + "/_matrix/client/api/v1": resource, + }) + + root_resource = create_resource_tree(resources, Resource()) + + for address in bind_addresses: + reactor.listenTCP( + port, + SynapseSite( + "synapse.access.http.%s" % (site_tag,), + site_tag, + listener_config, + root_resource, + ), + interface=address + ) + + logger.info("Synapse user_dir now listening on port %d", port) + + def start_listening(self, listeners): + for listener in listeners: + if listener["type"] == "http": + self._listen_http(listener) + elif listener["type"] == "manhole": + bind_addresses = listener["bind_addresses"] + + for address in bind_addresses: + reactor.listenTCP( + listener["port"], + manhole( + username="matrix", + password="rabbithole", + globals={"hs": self}, + ), + interface=address + ) + else: + logger.warn("Unrecognized listener type: %s", listener["type"]) + + self.get_tcp_replication().start_replication(self) + + def build_tcp_replication(self): + return UserDirectoryReplicationHandler(self) + + +class UserDirectoryReplicationHandler(ReplicationClientHandler): + def __init__(self, hs): + super(UserDirectoryReplicationHandler, self).__init__(hs.get_datastore()) + self.user_directory = hs.get_user_directory_handler() + + def on_rdata(self, stream_name, token, rows): + super(UserDirectoryReplicationHandler, self).on_rdata( + stream_name, token, rows + ) + if stream_name == "current_state_deltas": + preserve_fn(self.user_directory.notify_new_event)() + + +def start(config_options): + try: + config = HomeServerConfig.load_config( + "Synapse user directory", config_options + ) + except ConfigError as e: + sys.stderr.write("\n" + e.message + "\n") + sys.exit(1) + + assert config.worker_app == "synapse.app.user_dir" + + setup_logging(config, use_worker_options=True) + + events.USE_FROZEN_DICTS = config.use_frozen_dicts + + database_engine = create_engine(config.database_config) + + if config.update_user_directory: + sys.stderr.write( + "\nThe update_user_directory must be disabled in the main synapse process" + "\nbefore they can be run in a separate worker." + "\nPlease add ``update_user_directory: false`` to the main config" + "\n" + ) + sys.exit(1) + + # Force the pushers to start since they will be disabled in the main config + config.update_user_directory = True + + tls_server_context_factory = context_factory.ServerContextFactory(config) + + ps = UserDirectoryServer( + config.server_name, + db_config=config.database_config, + tls_server_context_factory=tls_server_context_factory, + config=config, + version_string="Synapse/" + get_version_string(synapse), + database_engine=database_engine, + ) + + ps.setup() + ps.start_listening(config.worker_listeners) + + def run(): + # make sure that we run the reactor with the sentinel log context, + # otherwise other PreserveLoggingContext instances will get confused + # and complain when they see the logcontext arbitrarily swapping + # between the sentinel and `run` logcontexts. + with PreserveLoggingContext(): + logger.info("Running") + change_resource_limit(config.soft_file_limit) + if config.gc_thresholds: + gc.set_threshold(*config.gc_thresholds) + reactor.run() + + def start(): + ps.get_datastore().start_profiling() + ps.get_state_handler().start_caching() + + reactor.callWhenRunning(start) + + if config.worker_daemonize: + daemon = Daemonize( + app="synapse-user-dir", + pid=config.worker_pid_file, + action=run, + auto_close_fds=False, + verbose=True, + logger=logger, + ) + daemon.start() + else: + run() + + +if __name__ == '__main__': + with LoggingContext("main"): + start(sys.argv[1:]) diff --git a/synapse/config/server.py b/synapse/config/server.py index 3910b9dc31..28b4e5f50c 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -35,6 +35,10 @@ class ServerConfig(Config): # "disable" federation self.send_federation = config.get("send_federation", True) + # Whether to update the user directory or not. This should be set to + # false only if we are updating the user directory in a worker + self.update_user_directory = config.get("update_user_directory", True) + self.filter_timeline_limit = config.get("filter_timeline_limit", -1) if self.public_baseurl is not None: diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index 8928786fd6..d33a20a1f2 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -50,8 +50,7 @@ class UserDirectoyHandler(object): self.clock = hs.get_clock() self.notifier = hs.get_notifier() self.is_mine_id = hs.is_mine_id - - self.notifier.add_replication_callback(self.notify_new_event) + self.update_user_directory = hs.config.update_user_directory # When start up for the first time we need to populate the user_directory. # This is a set of user_id's we've inserted already @@ -67,9 +66,12 @@ class UserDirectoyHandler(object): # Guard to ensure we only process deltas one at a time self._is_processing = False - # We kick this off so that we don't have to wait for a change before - # we start populating the user directory - self.clock.call_later(0, self.notify_new_event) + if self.update_user_directory: + self.notifier.add_replication_callback(self.notify_new_event) + + # We kick this off so that we don't have to wait for a change before + # we start populating the user directory + self.clock.call_later(0, self.notify_new_event) def search_users(self, user_id, search_term, limit): """Searches for users in directory @@ -94,6 +96,9 @@ class UserDirectoyHandler(object): def notify_new_event(self): """Called when there may be more deltas to process """ + if not self.update_user_directory: + return + if self._is_processing: return @@ -324,7 +329,7 @@ class UserDirectoyHandler(object): event_id (str|None): The new event after the state change typ (str): Type of the event """ - logger.debug("Handling change for %s", typ) + logger.debug("Handling change for %s: %s", typ, room_id) if typ == EventTypes.RoomHistoryVisibility: change = yield self._get_key_change( @@ -394,6 +399,8 @@ class UserDirectoyHandler(object): row = yield self.store.get_user_in_public_room(user_id) if not row: yield self.store.add_users_to_public_room(room_id, [user_id]) + else: + logger.debug("Not adding user to public dir, %r", user_id) # Now we update users who share rooms with users. We do this by getting # all the current users in the room and seeing which aren't already diff --git a/synapse/replication/tcp/streams.py b/synapse/replication/tcp/streams.py index 369d5f2428..fbafe12cc2 100644 --- a/synapse/replication/tcp/streams.py +++ b/synapse/replication/tcp/streams.py @@ -112,6 +112,12 @@ AccountDataStreamRow = namedtuple("AccountDataStream", ( "data_type", # str "data", # dict )) +CurrentStateDeltaStreamRow = namedtuple("CurrentStateDeltaStream", ( + "room_id", # str + "type", # str + "state_key", # str + "event_id", # str, optional +)) class Stream(object): @@ -443,6 +449,21 @@ class AccountDataStream(Stream): defer.returnValue(results) +class CurrentStateDeltaStream(Stream): + """Current state for a room was changed + """ + NAME = "current_state_deltas" + ROW_TYPE = CurrentStateDeltaStreamRow + + def __init__(self, hs): + store = hs.get_datastore() + + self.current_token = store.get_max_current_state_delta_stream_id + self.update_function = store.get_all_updated_current_state_deltas + + super(CurrentStateDeltaStream, self).__init__(hs) + + STREAMS_MAP = { stream.NAME: stream for stream in ( @@ -460,5 +481,6 @@ STREAMS_MAP = { FederationStream, TagAccountDataStream, AccountDataStream, + CurrentStateDeltaStream, ) } diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 72ce84b0b8..90041b0da4 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -2284,6 +2284,24 @@ class EventsStore(SQLBaseStore): defer.returnValue((int(res["topological_ordering"]), int(res["stream_ordering"]))) + def get_max_current_state_delta_stream_id(self): + return self._stream_id_gen.get_current_token() + + def get_all_updated_current_state_deltas(self, from_token, to_token, limit): + def get_all_updated_current_state_deltas_txn(txn): + sql = """ + SELECT stream_id, room_id, type, state_key, event_id + FROM current_state_delta_stream + WHERE ? < stream_id AND stream_id <= ? + ORDER BY stream_id ASC LIMIT ? + """ + txn.execute(sql, (from_token, to_token, limit)) + return txn.fetchall() + return self.runInteraction( + "get_all_updated_current_state_deltas", + get_all_updated_current_state_deltas_txn, + ) + AllNewEventsResult = namedtuple("AllNewEventsResult", [ "new_forward_events", "new_backfill_events", -- cgit 1.4.1 From d7fe6b356c5b74ffc5681f85a0d6100f4b4f2295 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 19 Jun 2017 12:36:28 +0100 Subject: Add shutdown room API --- synapse/handlers/federation.py | 4 ++ synapse/handlers/room_member.py | 5 ++ synapse/rest/client/v1/admin.py | 67 ++++++++++++++++++++++- synapse/storage/directory.py | 14 +++++ synapse/storage/room.py | 24 ++++++++ synapse/storage/schema/delta/43/blocked_rooms.sql | 21 +++++++ 6 files changed, 134 insertions(+), 1 deletion(-) create mode 100644 synapse/storage/schema/delta/43/blocked_rooms.sql (limited to 'synapse/storage') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 39d2bee8da..f7ae369a1d 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1068,6 +1068,10 @@ class FederationHandler(BaseHandler): """ event = pdu + is_blocked = yield self.store.is_room_blocked(event.room_id) + if is_blocked: + raise SynapseError(403, "This room has been blocked on this server") + event.internal_metadata.outlier = True event.internal_metadata.invite_from_remote = True diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 1ca88517a2..c0d9c08367 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -203,6 +203,11 @@ class RoomMemberHandler(BaseHandler): if not remote_room_hosts: remote_room_hosts = [] + if effective_membership_state not in ("leave", "ban",): + is_blocked = yield self.store.is_room_blocked(room_id) + if is_blocked: + raise SynapseError(403, "This room has been blocked on this server") + latest_event_ids = yield self.store.get_latest_event_ids_in_room(room_id) current_state_ids = yield self.state_handler.get_current_state_ids( room_id, latest_event_ids=latest_event_ids, diff --git a/synapse/rest/client/v1/admin.py b/synapse/rest/client/v1/admin.py index 29fcd72375..086f7a0984 100644 --- a/synapse/rest/client/v1/admin.py +++ b/synapse/rest/client/v1/admin.py @@ -15,8 +15,9 @@ from twisted.internet import defer +from synapse.api.constants import Membership from synapse.api.errors import AuthError, SynapseError -from synapse.types import UserID +from synapse.types import UserID, create_requester from synapse.http.servlet import parse_json_object_from_request from .base import ClientV1RestServlet, client_path_patterns @@ -157,6 +158,69 @@ class DeactivateAccountRestServlet(ClientV1RestServlet): defer.returnValue((200, {})) +class ShutdownRoomRestServlet(ClientV1RestServlet): + """Shuts down a room by removing all local users from the room and blocking + all future invites and joins to the room. Any local aliases will be repointed + to a given room id. + """ + PATTERNS = client_path_patterns("/admin/shutdown_room/(?P[^/]+)") + + def __init__(self, hs): + super(ShutdownRoomRestServlet, self).__init__(hs) + self.store = hs.get_datastore() + self.handlers = hs.get_handlers() + self.state = hs.get_state_handler() + + @defer.inlineCallbacks + def on_POST(self, request, room_id): + requester = yield self.auth.get_user_by_req(request) + is_admin = yield self.auth.is_server_admin(requester.user) + if not is_admin: + raise AuthError(403, "You are not a server admin") + + content = parse_json_object_from_request(request) + + repoint_aliases_to_room_id = content.get("repoint_aliases_to_room_id") + if not repoint_aliases_to_room_id: + raise SynapseError(400, "Please provide field `repoint_aliases_to_room_id`") + + requester_user_id = requester.user.to_string() + + logger.info("Shutting down room %r", room_id) + + yield self.store.block_room(room_id, requester_user_id) + + users = yield self.state.get_current_user_in_room(room_id) + kicked_users = [] + for user_id in users: + if not self.hs.is_mine_id(user_id): + continue + + logger.info("Kicking %r from %r...", user_id, room_id) + + target_requester = create_requester(user_id) + yield self.handlers.room_member_handler.update_membership( + requester=target_requester, + target=target_requester.user, + room_id=room_id, + action=Membership.LEAVE, + content={}, + ) + + kicked_users.append(user_id) + + aliases_for_room = yield self.store.get_aliases_for_room(room_id) + + yield self.store.update_aliases_for_room( + room_id, repoint_aliases_to_room_id, requester_user_id + ) + + defer.returnValue((200, { + "kicked_users": kicked_users, + "local_aliases": aliases_for_room, + })) + + class ResetPasswordRestServlet(ClientV1RestServlet): """Post request to allow an administrator reset password for a user. This need a user have a administrator access in Synapse. @@ -353,3 +417,4 @@ def register_servlets(hs, http_server): ResetPasswordRestServlet(hs).register(http_server) GetUsersPaginatedRestServlet(hs).register(http_server) SearchUsersRestServlet(hs).register(http_server) + ShutdownRoomRestServlet(hs).register(http_server) diff --git a/synapse/storage/directory.py b/synapse/storage/directory.py index 9caaf81f2c..79e7c540ad 100644 --- a/synapse/storage/directory.py +++ b/synapse/storage/directory.py @@ -170,3 +170,17 @@ class DirectoryStore(SQLBaseStore): "room_alias", desc="get_aliases_for_room", ) + + def update_aliases_for_room(self, old_room_id, new_room_id, creator): + def _update_aliases_for_room_txn(txn): + sql = "UPDATE room_aliases SET room_id = ?, creator = ? WHERE room_id = ?" + txn.execute(sql, (new_room_id, creator, old_room_id,)) + self._invalidate_cache_and_stream( + txn, self.get_aliases_for_room, (old_room_id,) + ) + self._invalidate_cache_and_stream( + txn, self.get_aliases_for_room, (new_room_id,) + ) + return self.runInteraction( + "_update_aliases_for_room_txn", _update_aliases_for_room_txn + ) diff --git a/synapse/storage/room.py b/synapse/storage/room.py index 5d543652bb..07366f66b6 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -507,3 +507,27 @@ class RoomStore(SQLBaseStore): )) else: defer.returnValue(None) + + @cached(max_entries=10000) + def is_room_blocked(self, room_id): + return self._simple_select_one_onecol( + table="blocked_rooms", + keyvalues={ + "room_id": room_id, + }, + retcol="1", + allow_none=True, + desc="is_room_blocked", + ) + + @defer.inlineCallbacks + def block_room(self, room_id, user_id): + yield self._simple_insert( + table="blocked_rooms", + values={ + "room_id": room_id, + "user_id": user_id, + }, + desc="block_room", + ) + self.is_room_blocked.invalidate((room_id,)) diff --git a/synapse/storage/schema/delta/43/blocked_rooms.sql b/synapse/storage/schema/delta/43/blocked_rooms.sql new file mode 100644 index 0000000000..0e3cd143ff --- /dev/null +++ b/synapse/storage/schema/delta/43/blocked_rooms.sql @@ -0,0 +1,21 @@ +/* Copyright 2017 Vector Creations Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE TABLE blocked_rooms ( + room_id TEXT NOT NULL, + user_id TEXT NOT NULL -- Admin who blocked the room +); + +CREATE UNIQUE INDEX blocked_rooms_idx ON blocked_rooms(room_id); -- cgit 1.4.1 From b8b936a6eab46cec2460fb723124bb3a750d3c83 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 19 Jun 2017 17:39:21 +0100 Subject: Add API to quarantine media --- synapse/rest/client/v1/admin.py | 25 ++++++++ synapse/rest/media/v1/download_resource.py | 2 +- synapse/rest/media/v1/media_repository.py | 2 + synapse/rest/media/v1/thumbnail_resource.py | 4 +- synapse/storage/media_repository.py | 4 +- synapse/storage/room.py | 70 ++++++++++++++++++++++ .../storage/schema/delta/43/quarantine_media.sql | 17 ++++++ 7 files changed, 119 insertions(+), 5 deletions(-) create mode 100644 synapse/storage/schema/delta/43/quarantine_media.sql (limited to 'synapse/storage') diff --git a/synapse/rest/client/v1/admin.py b/synapse/rest/client/v1/admin.py index aaa3dffb1b..7d786e8de3 100644 --- a/synapse/rest/client/v1/admin.py +++ b/synapse/rest/client/v1/admin.py @@ -270,6 +270,30 @@ class ShutdownRoomRestServlet(ClientV1RestServlet): })) +class QuarantineMediaInRoom(ClientV1RestServlet): + """Quarantines all media in a room so that no one can download it via + this server. + """ + PATTERNS = client_path_patterns("/admin/quarantine_media/(?P[^/]+)") + + def __init__(self, hs): + super(QuarantineMediaInRoom, self).__init__(hs) + self.store = hs.get_datastore() + + @defer.inlineCallbacks + def on_POST(self, request, room_id): + requester = yield self.auth.get_user_by_req(request) + is_admin = yield self.auth.is_server_admin(requester.user) + if not is_admin: + raise AuthError(403, "You are not a server admin") + + num_quarantined = yield self.store.quarantine_media_ids_in_room( + room_id, requester.user.to_string(), + ) + + defer.returnValue((200, {"num_quarantined": num_quarantined})) + + class ResetPasswordRestServlet(ClientV1RestServlet): """Post request to allow an administrator reset password for a user. This need a user have a administrator access in Synapse. @@ -467,3 +491,4 @@ def register_servlets(hs, http_server): GetUsersPaginatedRestServlet(hs).register(http_server) SearchUsersRestServlet(hs).register(http_server) ShutdownRoomRestServlet(hs).register(http_server) + QuarantineMediaInRoom(hs).register(http_server) diff --git a/synapse/rest/media/v1/download_resource.py b/synapse/rest/media/v1/download_resource.py index 6788375e85..39a286b83c 100644 --- a/synapse/rest/media/v1/download_resource.py +++ b/synapse/rest/media/v1/download_resource.py @@ -66,7 +66,7 @@ class DownloadResource(Resource): @defer.inlineCallbacks def _respond_local_file(self, request, media_id, name): media_info = yield self.store.get_local_media(media_id) - if not media_info: + if not media_info or media_info["quarantined_by"]: respond_404(request) return diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py index bae2b4c757..0718f75241 100644 --- a/synapse/rest/media/v1/media_repository.py +++ b/synapse/rest/media/v1/media_repository.py @@ -135,6 +135,8 @@ class MediaRepository(object): media_info = yield self._download_remote_file( server_name, media_id ) + elif media_info["quarantined_by"]: + raise NotFoundError() else: self.recently_accessed_remotes.add((server_name, media_id)) yield self.store.update_cached_last_access_time( diff --git a/synapse/rest/media/v1/thumbnail_resource.py b/synapse/rest/media/v1/thumbnail_resource.py index d8f54adc99..59b2c39b2f 100644 --- a/synapse/rest/media/v1/thumbnail_resource.py +++ b/synapse/rest/media/v1/thumbnail_resource.py @@ -81,7 +81,7 @@ class ThumbnailResource(Resource): method, m_type): media_info = yield self.store.get_local_media(media_id) - if not media_info: + if not media_info or media_info["quarantined_by"]: respond_404(request) return @@ -117,7 +117,7 @@ class ThumbnailResource(Resource): desired_type): media_info = yield self.store.get_local_media(media_id) - if not media_info: + if not media_info or media_info["quarantined_by"]: respond_404(request) return diff --git a/synapse/storage/media_repository.py b/synapse/storage/media_repository.py index 4c0f82353d..5f0f18ee66 100644 --- a/synapse/storage/media_repository.py +++ b/synapse/storage/media_repository.py @@ -30,7 +30,7 @@ class MediaRepositoryStore(SQLBaseStore): return self._simple_select_one( "local_media_repository", {"media_id": media_id}, - ("media_type", "media_length", "upload_name", "created_ts"), + ("media_type", "media_length", "upload_name", "created_ts", "quarantined_by"), allow_none=True, desc="get_local_media", ) @@ -138,7 +138,7 @@ class MediaRepositoryStore(SQLBaseStore): {"media_origin": origin, "media_id": media_id}, ( "media_type", "media_length", "upload_name", "created_ts", - "filesystem_id", + "filesystem_id", "quarantined_by", ), allow_none=True, desc="get_cached_remote_media", diff --git a/synapse/storage/room.py b/synapse/storage/room.py index 07366f66b6..e9c1549c00 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -24,6 +24,7 @@ from .engines import PostgresEngine, Sqlite3Engine import collections import logging import ujson as json +import re logger = logging.getLogger(__name__) @@ -531,3 +532,72 @@ class RoomStore(SQLBaseStore): desc="block_room", ) self.is_room_blocked.invalidate((room_id,)) + + def quarantine_media_ids_in_room(self, room_id, quarantined_by): + """For a room loops through all events with media and quarantines + the associated media + """ + def _get_media_ids_in_room(txn): + mxc_re = re.compile("^mxc://([^/]+)/([^/#?]+)") + + next_token = self.get_current_events_token() + 1 + + total_media_quarantined = 0 + + while next_token: + sql = """ + SELECT stream_ordering, content FROM events + WHERE room_id = ? + AND stream_ordering < ? + AND contains_url = ? AND outlier = ? + ORDER BY stream_ordering DESC + LIMIT ? + """ + txn.execute(sql, (room_id, next_token, True, False, 100)) + + next_token = None + local_media_mxcs = [] + remote_media_mxcs = [] + for stream_ordering, content_json in txn: + next_token = stream_ordering + content = json.loads(content_json) + + url = content.get("url") + if not url: + continue + + matches = mxc_re.match(url) + if matches: + hostname = matches.group(1) + media_id = matches.group(2) + if hostname == self.hostname: + local_media_mxcs.append(media_id) + else: + remote_media_mxcs.append((hostname, media_id)) + + # Now update all the tables to set the quarantined_by flag + + txn.executemany(""" + UPDATE local_media_repository + SET quarantined_by = ? + WHERE media_id = ? + """, ((quarantined_by, media_id) for media_id in local_media_mxcs)) + + txn.executemany( + """ + UPDATE remote_media_cache + SET quarantined_by = ? + WHERE media_origin AND media_id = ? + """, + ( + (quarantined_by, origin, media_id) + for origin, media_id in remote_media_mxcs + ) + ) + + total_media_quarantined += len(local_media_mxcs) + total_media_quarantined += len(remote_media_mxcs) + + return total_media_quarantined + + return self.runInteraction("get_media_ids_in_room", _get_media_ids_in_room) diff --git a/synapse/storage/schema/delta/43/quarantine_media.sql b/synapse/storage/schema/delta/43/quarantine_media.sql new file mode 100644 index 0000000000..630907ec4f --- /dev/null +++ b/synapse/storage/schema/delta/43/quarantine_media.sql @@ -0,0 +1,17 @@ +/* Copyright 2017 Vector Creations Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +ALTER TABLE local_media_repository ADD COLUMN quarantined_by TEXT; +ALTER TABLE remote_media_cache ADD COLUMN quarantined_by TEXT; -- cgit 1.4.1 From 385dcb7c60a0762f29779e9c2ab6a984636092df Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 19 Jun 2017 17:47:55 +0100 Subject: Handle thumbnail urls --- synapse/storage/room.py | 26 ++++++++++++++------------ 1 file changed, 14 insertions(+), 12 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/room.py b/synapse/storage/room.py index e9c1549c00..23688430b7 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -562,18 +562,20 @@ class RoomStore(SQLBaseStore): next_token = stream_ordering content = json.loads(content_json) - url = content.get("url") - if not url: - continue - - matches = mxc_re.match(url) - if matches: - hostname = matches.group(1) - media_id = matches.group(2) - if hostname == self.hostname: - local_media_mxcs.append(media_id) - else: - remote_media_mxcs.append((hostname, media_id)) + content_url = content.get("url") + thumbnail_url = content.get("info", {}).get("thumbnail_url") + + for url in (content_url, thumbnail_url): + if not url: + continue + matches = mxc_re.match(url) + if matches: + hostname = matches.group(1) + media_id = matches.group(2) + if hostname == self.hostname: + local_media_mxcs.append(media_id) + else: + remote_media_mxcs.append((hostname, media_id)) # Now update all the tables to set the quarantined_by flag -- cgit 1.4.1 From 812c030e87f4a31e13496dbf2fa0c2e61aa918d2 Mon Sep 17 00:00:00 2001 From: Krombel Date: Wed, 21 Jun 2017 14:48:12 +0200 Subject: replaced json.dumps with encode_canonical_json --- synapse/storage/filtering.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/filtering.py b/synapse/storage/filtering.py index 73f7a078fa..78b1e30945 100644 --- a/synapse/storage/filtering.py +++ b/synapse/storage/filtering.py @@ -19,6 +19,7 @@ from ._base import SQLBaseStore from synapse.api.errors import SynapseError, Codes from synapse.util.caches.descriptors import cachedInlineCallbacks +from canonicaljson import encode_canonical_json import simplejson as json @@ -46,7 +47,7 @@ class FilteringStore(SQLBaseStore): defer.returnValue(json.loads(str(def_json).decode("utf-8"))) def add_user_filter(self, user_localpart, user_filter): - def_json = json.dumps(user_filter).encode("utf-8") + def_json = encode_canonical_json(user_filter) # Need an atomic transaction to SELECT the maximal ID so far then # INSERT a new one -- cgit 1.4.1 From dae9a00a28cbd9e40be521effb25b0b2528caa4f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 21 Jun 2017 14:19:33 +0100 Subject: Initialise exclusive_user_regex --- synapse/replication/slave/storage/appservice.py | 2 ++ synapse/storage/appservice.py | 35 ++++++++++++++----------- 2 files changed, 22 insertions(+), 15 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/replication/slave/storage/appservice.py b/synapse/replication/slave/storage/appservice.py index 73048327c5..0d3f31a50c 100644 --- a/synapse/replication/slave/storage/appservice.py +++ b/synapse/replication/slave/storage/appservice.py @@ -16,6 +16,7 @@ from ._base import BaseSlavedStore from synapse.storage import DataStore from synapse.config.appservice import load_appservices +from synapse.storage.appservice import _make_exclusive_regex class SlavedApplicationServiceStore(BaseSlavedStore): @@ -25,6 +26,7 @@ class SlavedApplicationServiceStore(BaseSlavedStore): hs.config.server_name, hs.config.app_service_config_files ) + self.exclusive_user_regex = _make_exclusive_regex(self.services_cache) get_app_service_by_token = DataStore.get_app_service_by_token.__func__ get_app_service_by_user_id = DataStore.get_app_service_by_user_id.__func__ diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index 532df736a5..c63935cb07 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -27,6 +27,25 @@ from ._base import SQLBaseStore logger = logging.getLogger(__name__) +def _make_exclusive_regex(services_cache): + # We precompie a regex constructed from all the regexes that the AS's + # have registered for exclusive users. + exclusive_user_regexes = [ + regex.pattern + for service in services_cache + for regex in service.get_exlusive_user_regexes() + ] + if exclusive_user_regexes: + exclusive_user_regex = "|".join("(" + r + ")" for r in exclusive_user_regexes) + exclusive_user_regex = re.compile(exclusive_user_regex) + else: + # We handle this case specially otherwise the constructed regex + # will always match + exclusive_user_regex = None + + return exclusive_user_regex + + class ApplicationServiceStore(SQLBaseStore): def __init__(self, hs): @@ -36,21 +55,7 @@ class ApplicationServiceStore(SQLBaseStore): hs.hostname, hs.config.app_service_config_files ) - - # We precompie a regex constructed from all the regexes that the AS's - # have registered for exclusive users. - exclusive_user_regexes = [ - regex.pattern - for service in self.services_cache - for regex in service.get_exlusive_user_regexes() - ] - if exclusive_user_regexes: - exclusive_user_regex = "|".join("(" + r + ")" for r in exclusive_user_regexes) - self.exclusive_user_regex = re.compile(exclusive_user_regex) - else: - # We handle this case specially otherwise the constructed regex - # will always match - self.exclusive_user_regex = None + self.exclusive_user_regex = _make_exclusive_regex(self.services_cache) def get_app_services(self): return self.services_cache -- cgit 1.4.1 From e0004aa28a9a1f9dc984e8bd6d206a7e2c0ccc78 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 22 Jun 2017 10:03:48 +0100 Subject: Add desc --- synapse/storage/user_directory.py | 1 + 1 file changed, 1 insertion(+) (limited to 'synapse/storage') diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py index 52b184fe78..2a4db3f03c 100644 --- a/synapse/storage/user_directory.py +++ b/synapse/storage/user_directory.py @@ -439,6 +439,7 @@ class UserDirectoryStore(SQLBaseStore): }, retcol="share_private", allow_none=True, + desc="get_if_users_share_a_room", ) @cachedInlineCallbacks(max_entries=500000, iterable=True) -- cgit 1.4.1 From 7fe8ed1787ca0c1f7a4c56aa1c85e486b76f4550 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 23 Jun 2017 11:14:11 +0100 Subject: Store URL cache preview downloads seperately This makes it easier to clear old media out at a later date --- synapse/rest/media/v1/download_resource.py | 7 ++++++- synapse/rest/media/v1/filepath.py | 18 ++++++++++++++++ synapse/rest/media/v1/media_repository.py | 30 ++++++++++++++++++++------- synapse/rest/media/v1/preview_url_resource.py | 7 ++++--- synapse/rest/media/v1/thumbnail_resource.py | 28 +++++++++++++++++++------ synapse/storage/media_repository.py | 8 +++++-- synapse/storage/schema/delta/43/url_cache.sql | 16 ++++++++++++++ 7 files changed, 94 insertions(+), 20 deletions(-) create mode 100644 synapse/storage/schema/delta/43/url_cache.sql (limited to 'synapse/storage') diff --git a/synapse/rest/media/v1/download_resource.py b/synapse/rest/media/v1/download_resource.py index 39a286b83c..6879249c8a 100644 --- a/synapse/rest/media/v1/download_resource.py +++ b/synapse/rest/media/v1/download_resource.py @@ -73,7 +73,12 @@ class DownloadResource(Resource): media_type = media_info["media_type"] media_length = media_info["media_length"] upload_name = name if name else media_info["upload_name"] - file_path = self.filepaths.local_media_filepath(media_id) + if media_info["url_cache"]: + # TODO: Check the file still exists, if it doesn't we can redownload + # it from the url `media_info["url_cache"]` + file_path = self.filepaths.url_cache_filepath(media_id) + else: + file_path = self.filepaths.local_media_filepath(media_id) yield respond_with_file( request, media_type, file_path, media_length, diff --git a/synapse/rest/media/v1/filepath.py b/synapse/rest/media/v1/filepath.py index 0137458f71..d92b7ff337 100644 --- a/synapse/rest/media/v1/filepath.py +++ b/synapse/rest/media/v1/filepath.py @@ -71,3 +71,21 @@ class MediaFilePaths(object): self.base_path, "remote_thumbnail", server_name, file_id[0:2], file_id[2:4], file_id[4:], ) + + def url_cache_filepath(self, media_id): + return os.path.join( + self.base_path, "url_cache", + media_id[0:2], media_id[2:4], media_id[4:] + ) + + def url_cache_thumbnail(self, media_id, width, height, content_type, + method): + top_level_type, sub_type = content_type.split("/") + file_name = "%i-%i-%s-%s-%s" % ( + width, height, top_level_type, sub_type, method + ) + return os.path.join( + self.base_path, "url_cache_thumbnails", + media_id[0:2], media_id[2:4], media_id[4:], + file_name + ) diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py index 0718f75241..0ea1248ce6 100644 --- a/synapse/rest/media/v1/media_repository.py +++ b/synapse/rest/media/v1/media_repository.py @@ -326,13 +326,17 @@ class MediaRepository(object): defer.returnValue(t_path) @defer.inlineCallbacks - def _generate_local_thumbnails(self, media_id, media_info): + def _generate_local_thumbnails(self, media_id, media_info, url_cache=False): media_type = media_info["media_type"] requirements = self._get_thumbnail_requirements(media_type) if not requirements: return - input_path = self.filepaths.local_media_filepath(media_id) + if url_cache: + input_path = self.filepaths.url_cache_filepath(media_id) + else: + input_path = self.filepaths.local_media_filepath(media_id) + thumbnailer = Thumbnailer(input_path) m_width = thumbnailer.width m_height = thumbnailer.height @@ -360,9 +364,14 @@ class MediaRepository(object): for t_width, t_height, t_type in scales: t_method = "scale" - t_path = self.filepaths.local_media_thumbnail( - media_id, t_width, t_height, t_type, t_method - ) + if url_cache: + t_path = self.filepaths.url_cache_thumbnail( + media_id, t_width, t_height, t_type, t_method + ) + else: + t_path = self.filepaths.local_media_thumbnail( + media_id, t_width, t_height, t_type, t_method + ) self._makedirs(t_path) t_len = thumbnailer.scale(t_path, t_width, t_height, t_type) @@ -377,9 +386,14 @@ class MediaRepository(object): # thumbnail. continue t_method = "crop" - t_path = self.filepaths.local_media_thumbnail( - media_id, t_width, t_height, t_type, t_method - ) + if url_cache: + t_path = self.filepaths.url_cache_thumbnail( + media_id, t_width, t_height, t_type, t_method + ) + else: + t_path = self.filepaths.local_media_thumbnail( + media_id, t_width, t_height, t_type, t_method + ) self._makedirs(t_path) t_len = thumbnailer.crop(t_path, t_width, t_height, t_type) local_thumbnails.append(( diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py index c680fddab5..b81a336c5d 100644 --- a/synapse/rest/media/v1/preview_url_resource.py +++ b/synapse/rest/media/v1/preview_url_resource.py @@ -164,7 +164,7 @@ class PreviewUrlResource(Resource): if _is_media(media_info['media_type']): dims = yield self.media_repo._generate_local_thumbnails( - media_info['filesystem_id'], media_info + media_info['filesystem_id'], media_info, url_cache=True, ) og = { @@ -210,7 +210,7 @@ class PreviewUrlResource(Resource): if _is_media(image_info['media_type']): # TODO: make sure we don't choke on white-on-transparent images dims = yield self.media_repo._generate_local_thumbnails( - image_info['filesystem_id'], image_info + image_info['filesystem_id'], image_info, url_cache=True, ) if dims: og["og:image:width"] = dims['width'] @@ -256,7 +256,7 @@ class PreviewUrlResource(Resource): # XXX: horrible duplication with base_resource's _download_remote_file() file_id = random_string(24) - fname = self.filepaths.local_media_filepath(file_id) + fname = self.filepaths.url_cache_filepath(file_id) self.media_repo._makedirs(fname) try: @@ -303,6 +303,7 @@ class PreviewUrlResource(Resource): upload_name=download_name, media_length=length, user_id=user, + url_cache=url, ) except Exception as e: diff --git a/synapse/rest/media/v1/thumbnail_resource.py b/synapse/rest/media/v1/thumbnail_resource.py index 59b2c39b2f..68d56b2b10 100644 --- a/synapse/rest/media/v1/thumbnail_resource.py +++ b/synapse/rest/media/v1/thumbnail_resource.py @@ -101,9 +101,16 @@ class ThumbnailResource(Resource): t_type = thumbnail_info["thumbnail_type"] t_method = thumbnail_info["thumbnail_method"] - file_path = self.filepaths.local_media_thumbnail( - media_id, t_width, t_height, t_type, t_method, - ) + if media_info["url_cache"]: + # TODO: Check the file still exists, if it doesn't we can redownload + # it from the url `media_info["url_cache"]` + file_path = self.filepaths.url_cache_thumbnail( + media_id, t_width, t_height, t_type, t_method, + ) + else: + file_path = self.filepaths.local_media_thumbnail( + media_id, t_width, t_height, t_type, t_method, + ) yield respond_with_file(request, t_type, file_path) else: @@ -134,9 +141,18 @@ class ThumbnailResource(Resource): t_type = info["thumbnail_type"] == desired_type if t_w and t_h and t_method and t_type: - file_path = self.filepaths.local_media_thumbnail( - media_id, desired_width, desired_height, desired_type, desired_method, - ) + if media_info["url_cache"]: + # TODO: Check the file still exists, if it doesn't we can redownload + # it from the url `media_info["url_cache"]` + file_path = self.filepaths.url_cache_thumbnail( + media_id, desired_width, desired_height, desired_type, + desired_method, + ) + else: + file_path = self.filepaths.local_media_thumbnail( + media_id, desired_width, desired_height, desired_type, + desired_method, + ) yield respond_with_file(request, desired_type, file_path) return diff --git a/synapse/storage/media_repository.py b/synapse/storage/media_repository.py index 5f0f18ee66..82bb61b811 100644 --- a/synapse/storage/media_repository.py +++ b/synapse/storage/media_repository.py @@ -30,13 +30,16 @@ class MediaRepositoryStore(SQLBaseStore): return self._simple_select_one( "local_media_repository", {"media_id": media_id}, - ("media_type", "media_length", "upload_name", "created_ts", "quarantined_by"), + ( + "media_type", "media_length", "upload_name", "created_ts", + "quarantined_by", "url_cache", + ), allow_none=True, desc="get_local_media", ) def store_local_media(self, media_id, media_type, time_now_ms, upload_name, - media_length, user_id): + media_length, user_id, url_cache=None): return self._simple_insert( "local_media_repository", { @@ -46,6 +49,7 @@ class MediaRepositoryStore(SQLBaseStore): "upload_name": upload_name, "media_length": media_length, "user_id": user_id.to_string(), + "url_cache": url_cache, }, desc="store_local_media", ) diff --git a/synapse/storage/schema/delta/43/url_cache.sql b/synapse/storage/schema/delta/43/url_cache.sql new file mode 100644 index 0000000000..45ebe020da --- /dev/null +++ b/synapse/storage/schema/delta/43/url_cache.sql @@ -0,0 +1,16 @@ +/* Copyright 2017 Vector Creations Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +ALTER TABLE local_media_repository ADD COLUMN url_cache TEXT; -- cgit 1.4.1 From 8abdd7b553868613f246bbd02b9fc5de03050a81 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 26 Jun 2017 14:01:30 +0100 Subject: Fix up indices for users_who_share_rooms --- synapse/storage/schema/delta/43/user_share.sql | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/schema/delta/43/user_share.sql b/synapse/storage/schema/delta/43/user_share.sql index f552b6eb7b..4501d90cbb 100644 --- a/synapse/storage/schema/delta/43/user_share.sql +++ b/synapse/storage/schema/delta/43/user_share.sql @@ -25,7 +25,8 @@ CREATE TABLE users_who_share_rooms ( CREATE UNIQUE INDEX users_who_share_rooms_u_idx ON users_who_share_rooms(user_id, other_user_id); -CREATE INDEX users_who_share_rooms_r_idx ON users_who_share_rooms(room_id, user_id); +CREATE INDEX users_who_share_rooms_r_idx ON users_who_share_rooms(room_id); +CREATE INDEX users_who_share_rooms_o_idx ON users_who_share_rooms(other_user_id); -- Make sure that we popualte the table initially -- cgit 1.4.1 From ed3d0170d9317644004b1203ecedaba58866ab9d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 27 Jun 2017 13:37:04 +0100 Subject: Batch upsert user ips --- synapse/api/auth.py | 3 +-- synapse/storage/client_ips.py | 57 ++++++++++++++++++++++++++++--------------- 2 files changed, 39 insertions(+), 21 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/api/auth.py b/synapse/api/auth.py index 0c297cb022..10f4972369 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -23,7 +23,6 @@ from synapse import event_auth from synapse.api.constants import EventTypes, Membership, JoinRules from synapse.api.errors import AuthError, Codes from synapse.types import UserID -from synapse.util import logcontext from synapse.util.metrics import Measure logger = logging.getLogger(__name__) @@ -200,7 +199,7 @@ class Auth(object): default=[""] )[0] if user and access_token and ip_addr: - logcontext.preserve_fn(self.store.insert_client_ip)( + self.store.insert_client_ip( user=user, access_token=access_token, ip=ip_addr, diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py index 014ab635b7..68955e740b 100644 --- a/synapse/storage/client_ips.py +++ b/synapse/storage/client_ips.py @@ -15,7 +15,7 @@ import logging -from twisted.internet import defer +from twisted.internet import defer, reactor from ._base import Cache from . import background_updates @@ -50,7 +50,13 @@ class ClientIpStore(background_updates.BackgroundUpdateStore): columns=["user_id", "device_id", "last_seen"], ) - @defer.inlineCallbacks + self._batch_row_update = {} + + self._client_ip_looper = self._clock.looping_call( + self._update_client_ips_batch, 5 * 1000 + ) + reactor.addSystemEventTrigger("before", "shutdown", self._update_client_ips_batch) + def insert_client_ip(self, user, access_token, ip, user_agent, device_id): now = int(self._clock.time_msec()) key = (user.to_string(), access_token, ip) @@ -62,28 +68,41 @@ class ClientIpStore(background_updates.BackgroundUpdateStore): # Rate-limited inserts if last_seen is not None and (now - last_seen) < LAST_SEEN_GRANULARITY: - defer.returnValue(None) + return self.client_ip_last_seen.prefill(key, now) - # It's safe not to lock here: a) no unique constraint, - # b) LAST_SEEN_GRANULARITY makes concurrent updates incredibly unlikely - yield self._simple_upsert( - "user_ips", - keyvalues={ - "user_id": user.to_string(), - "access_token": access_token, - "ip": ip, - "user_agent": user_agent, - "device_id": device_id, - }, - values={ - "last_seen": now, - }, - desc="insert_client_ip", - lock=False, + self._batch_row_update[key] = (user_agent, device_id, now) + + def _update_client_ips_batch(self): + to_update = self._batch_row_update + self._batch_row_update = {} + return self.runInteraction( + "_update_client_ips_batch", self._update_client_ips_batch_txn, to_update ) + def _update_client_ips_batch_txn(self, txn, to_update): + self.database_engine.lock_table(txn, "user_ips") + + for entry in to_update.iteritems(): + (user_id, access_token, ip), (user_agent, device_id, last_seen) = entry + + self._simple_upsert_txn( + txn, + table="user_ips", + keyvalues={ + "user_id": user_id, + "access_token": access_token, + "ip": ip, + "user_agent": user_agent, + "device_id": device_id, + }, + values={ + "last_seen": last_seen, + }, + lock=False, + ) + @defer.inlineCallbacks def get_last_client_ip_by_device(self, devices): """For each device_id listed, give the user_ip it was last seen on -- cgit 1.4.1 From a0a561ae852c6a031f8611859d95e2ad563770ca Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 27 Jun 2017 14:46:12 +0100 Subject: Fix up client ips to read from pending data --- synapse/handlers/device.py | 4 +-- synapse/storage/__init__.py | 10 ------ synapse/storage/client_ips.py | 70 +++++++++++++++++++++++++++++++++------- tests/storage/test_client_ips.py | 5 +-- 4 files changed, 62 insertions(+), 27 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 982cda3edf..ed60d494ff 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -106,7 +106,7 @@ class DeviceHandler(BaseHandler): device_map = yield self.store.get_devices_by_user(user_id) ips = yield self.store.get_last_client_ip_by_device( - devices=((user_id, device_id) for device_id in device_map.keys()) + user_id, device_id=None ) devices = device_map.values() @@ -133,7 +133,7 @@ class DeviceHandler(BaseHandler): except errors.StoreError: raise errors.NotFoundError ips = yield self.store.get_last_client_ip_by_device( - devices=((user_id, device_id),) + user_id, device_id, ) _update_device_from_client_ips(device, ips) defer.returnValue(device) diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index f119c5a758..b92472df33 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -304,16 +304,6 @@ class DataStore(RoomMemberStore, RoomStore, ret = yield self.runInteraction("count_users", _count_users) defer.returnValue(ret) - def get_user_ip_and_agents(self, user): - return self._simple_select_list( - table="user_ips", - keyvalues={"user_id": user.to_string()}, - retcols=[ - "access_token", "ip", "user_agent", "last_seen" - ], - desc="get_user_ip_and_agents", - ) - def get_users(self): """Function to reterive a list of users in users table. diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py index 68955e740b..88a5eb232f 100644 --- a/synapse/storage/client_ips.py +++ b/synapse/storage/client_ips.py @@ -50,6 +50,7 @@ class ClientIpStore(background_updates.BackgroundUpdateStore): columns=["user_id", "device_id", "last_seen"], ) + # (user_id, access_token, ip) -> (user_agent, device_id, last_seen) self._batch_row_update = {} self._client_ip_looper = self._clock.looping_call( @@ -104,11 +105,12 @@ class ClientIpStore(background_updates.BackgroundUpdateStore): ) @defer.inlineCallbacks - def get_last_client_ip_by_device(self, devices): + def get_last_client_ip_by_device(self, user_id, device_id): """For each device_id listed, give the user_ip it was last seen on Args: - devices (iterable[(str, str)]): list of (user_id, device_id) pairs + user_id (str) + device_id (str): If None fetches all devices for the user Returns: defer.Deferred: resolves to a dict, where the keys @@ -119,6 +121,7 @@ class ClientIpStore(background_updates.BackgroundUpdateStore): res = yield self.runInteraction( "get_last_client_ip_by_device", self._get_last_client_ip_by_device_txn, + user_id, device_id, retcols=( "user_id", "access_token", @@ -127,23 +130,34 @@ class ClientIpStore(background_updates.BackgroundUpdateStore): "device_id", "last_seen", ), - devices=devices ) ret = {(d["user_id"], d["device_id"]): d for d in res} + for key in self._batch_row_update: + uid, access_token, ip = key + if uid == user_id: + user_agent, did, last_seen = self._batch_row_update[key] + if not device_id or did == device_id: + ret[(user_id, device_id)] = { + "user_id": user_id, + "access_token": access_token, + "ip": ip, + "user_agent": user_agent, + "device_id": did, + "last_seen": last_seen, + } defer.returnValue(ret) @classmethod - def _get_last_client_ip_by_device_txn(cls, txn, devices, retcols): + def _get_last_client_ip_by_device_txn(cls, txn, user_id, device_id, retcols): where_clauses = [] bindings = [] - for (user_id, device_id) in devices: - if device_id is None: - where_clauses.append("(user_id = ? AND device_id IS NULL)") - bindings.extend((user_id, )) - else: - where_clauses.append("(user_id = ? AND device_id = ?)") - bindings.extend((user_id, device_id)) + if device_id is None: + where_clauses.append("user_id = ?") + bindings.extend((user_id, )) + else: + where_clauses.append("(user_id = ? AND device_id = ?)") + bindings.extend((user_id, device_id)) if not where_clauses: return [] @@ -171,3 +185,37 @@ class ClientIpStore(background_updates.BackgroundUpdateStore): txn.execute(sql, bindings) return cls.cursor_to_dict(txn) + + @defer.inlineCallbacks + def get_user_ip_and_agents(self, user): + user_id = user.to_string() + results = {} + + for key in self._batch_row_update: + uid, access_token, ip = key + if uid == user_id: + user_agent, _, last_seen = self._batch_row_update[key] + results[(access_token, ip)] = (user_agent, last_seen) + + rows = yield self._simple_select_list( + table="user_ips", + keyvalues={"user_id": user_id}, + retcols=[ + "access_token", "ip", "user_agent", "last_seen" + ], + desc="get_user_ip_and_agents", + ) + + results.update( + ((row["access_token"], row["ip"]), (row["user_agent"], row["last_seen"])) + for row in rows + ) + defer.returnValue(list( + { + "access_token": access_token, + "ip": ip, + "user_agent": user_agent, + "last_seen": last_seen, + } + for (access_token, ip), (user_agent, last_seen) in results.iteritems() + )) diff --git a/tests/storage/test_client_ips.py b/tests/storage/test_client_ips.py index 1f0c0e7c37..03df697575 100644 --- a/tests/storage/test_client_ips.py +++ b/tests/storage/test_client_ips.py @@ -43,10 +43,7 @@ class ClientIpStoreTestCase(tests.unittest.TestCase): "access_token", "ip", "user_agent", "device_id", ) - # deliberately use an iterable here to make sure that the lookup - # method doesn't iterate it twice - device_list = iter(((user_id, "device_id"),)) - result = yield self.store.get_last_client_ip_by_device(device_list) + result = yield self.store.get_last_client_ip_by_device(user_id, "device_id") r = result[(user_id, "device_id")] self.assertDictContainsSubset( -- cgit 1.4.1 From 5946aa0877b043ad912d72fa6038adb8abbb7690 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 29 Jun 2017 15:38:48 +0100 Subject: Prefill forward extrems and event to state groups --- synapse/storage/events.py | 5 +++++ synapse/storage/state.py | 10 ++++++++-- 2 files changed, 13 insertions(+), 2 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 2b7340c1d9..274426950a 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -403,6 +403,11 @@ class EventsStore(SQLBaseStore): (room_id, ), new_state ) + for room_id, latest_event_ids in new_forward_extremeties.iteritems(): + self.get_latest_event_ids_in_room.prefill( + (room_id,), latest_event_ids + ) + @defer.inlineCallbacks def _calculate_new_extremeties(self, room_id, event_contexts, latest_event_ids): """Calculates the new forward extremeties for a room given events to diff --git a/synapse/storage/state.py b/synapse/storage/state.py index d1e679719b..5673e4aa96 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -315,6 +315,12 @@ class StateStore(SQLBaseStore): ], ) + for event_id, state_group_id in state_groups.iteritems(): + txn.call_after( + self._get_state_group_for_event.prefill, + (event_id,), state_group_id + ) + def _count_state_group_hops_txn(self, txn, state_group): """Given a state group, count how many hops there are in the tree. @@ -584,8 +590,8 @@ class StateStore(SQLBaseStore): state_map = yield self.get_state_ids_for_events([event_id], types) defer.returnValue(state_map[event_id]) - @cached(num_args=2, max_entries=50000) - def _get_state_group_for_event(self, room_id, event_id): + @cached(max_entries=50000) + def _get_state_group_for_event(self, event_id): return self._simple_select_one_onecol( table="event_to_state_groups", keyvalues={ -- cgit 1.4.1 From 6ff14ddd2e93f569c3b4ace979f795f57da6c09c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 29 Jun 2017 15:47:37 +0100 Subject: Make into list --- synapse/storage/events.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 274426950a..7002b3752e 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -405,7 +405,7 @@ class EventsStore(SQLBaseStore): for room_id, latest_event_ids in new_forward_extremeties.iteritems(): self.get_latest_event_ids_in_room.prefill( - (room_id,), latest_event_ids + (room_id,), list(latest_event_ids) ) @defer.inlineCallbacks -- cgit 1.4.1 From b5e8d529e610352e0eb3887fac67a8ca5897bec1 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 4 Jul 2017 09:56:44 +0100 Subject: Define CACHE_SIZE_FACTOR once --- synapse/state.py | 5 +---- synapse/storage/_base.py | 5 +---- synapse/storage/client_ips.py | 6 ++---- synapse/util/caches/descriptors.py | 5 +---- synapse/util/caches/stream_change_cache.py | 6 +----- 5 files changed, 6 insertions(+), 21 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/state.py b/synapse/state.py index 5b386e3183..390799fbd5 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -24,13 +24,13 @@ from synapse.api.constants import EventTypes from synapse.api.errors import AuthError from synapse.events.snapshot import EventContext from synapse.util.async import Linearizer +from synapse.util.caches import CACHE_SIZE_FACTOR from collections import namedtuple from frozendict import frozendict import logging import hashlib -import os logger = logging.getLogger(__name__) @@ -38,9 +38,6 @@ logger = logging.getLogger(__name__) KeyStateTuple = namedtuple("KeyStateTuple", ("context", "type", "state_key")) -CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.1)) - - SIZE_OF_CACHE = int(100000 * CACHE_SIZE_FACTOR) EVICTION_TIMEOUT_SECONDS = 60 * 60 diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 51730a88bf..6f54036d67 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -16,6 +16,7 @@ import logging from synapse.api.errors import StoreError from synapse.util.logcontext import LoggingContext, PreserveLoggingContext +from synapse.util.caches import CACHE_SIZE_FACTOR from synapse.util.caches.dictionary_cache import DictionaryCache from synapse.util.caches.descriptors import Cache from synapse.storage.engines import PostgresEngine @@ -27,10 +28,6 @@ from twisted.internet import defer import sys import time import threading -import os - - -CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.1)) logger = logging.getLogger(__name__) diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py index 88a5eb232f..fc468ea185 100644 --- a/synapse/storage/client_ips.py +++ b/synapse/storage/client_ips.py @@ -20,7 +20,8 @@ from twisted.internet import defer, reactor from ._base import Cache from . import background_updates -import os +from synapse.util.caches import CACHE_SIZE_FACTOR + logger = logging.getLogger(__name__) @@ -30,9 +31,6 @@ logger = logging.getLogger(__name__) LAST_SEEN_GRANULARITY = 120 * 1000 -CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.1)) - - class ClientIpStore(background_updates.BackgroundUpdateStore): def __init__(self, hs): self.client_ip_last_seen = Cache( diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py index cbdff86596..af65bfe7b8 100644 --- a/synapse/util/caches/descriptors.py +++ b/synapse/util/caches/descriptors.py @@ -16,6 +16,7 @@ import logging from synapse.util.async import ObservableDeferred from synapse.util import unwrapFirstError, logcontext +from synapse.util.caches import CACHE_SIZE_FACTOR from synapse.util.caches.lrucache import LruCache from synapse.util.caches.treecache import TreeCache, iterate_tree_cache_entry from synapse.util.stringutils import to_ascii @@ -25,7 +26,6 @@ from . import register_cache from twisted.internet import defer from collections import namedtuple -import os import functools import inspect import threading @@ -37,9 +37,6 @@ logger = logging.getLogger(__name__) _CacheSentinel = object() -CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.1)) - - class CacheEntry(object): __slots__ = [ "deferred", "sequence", "callbacks", "invalidated" diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py index 609625b322..941d873ab8 100644 --- a/synapse/util/caches/stream_change_cache.py +++ b/synapse/util/caches/stream_change_cache.py @@ -13,20 +13,16 @@ # See the License for the specific language governing permissions and # limitations under the License. -from synapse.util.caches import register_cache +from synapse.util.caches import register_cache, CACHE_SIZE_FACTOR from blist import sorteddict import logging -import os logger = logging.getLogger(__name__) -CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.1)) - - class StreamChangeCache(object): """Keeps track of the stream positions of the latest change in a set of entities. -- cgit 1.4.1