From 7e6fa29cb5ba1abd8b4f3873b0ef171c7c8aba26 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 14 Nov 2017 11:22:42 +0000 Subject: Remove preserve_context_over_{fn, deferred} Both of these functions ae known to leak logcontexts. Replace the remaining calls to them and kill them off. --- synapse/storage/stream.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index dddd5fc0e7..52bdce5be2 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -39,7 +39,7 @@ from ._base import SQLBaseStore from synapse.util.caches.descriptors import cached from synapse.api.constants import EventTypes from synapse.types import RoomStreamToken -from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred +from synapse.util.logcontext import make_deferred_yieldable, preserve_fn from synapse.storage.engines import PostgresEngine, Sqlite3Engine import logging @@ -234,7 +234,7 @@ class StreamStore(SQLBaseStore): results = {} room_ids = list(room_ids) for rm_ids in (room_ids[i:i + 20] for i in xrange(0, len(room_ids), 20)): - res = yield preserve_context_over_deferred(defer.gatherResults([ + res = yield make_deferred_yieldable(defer.gatherResults([ preserve_fn(self.get_room_events_stream_for_room)( room_id, from_key, to_key, limit, order=order, ) -- cgit 1.4.1 From 4dd1bfa8c18a3dc9df934a61771ae1e85b313b7e Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Mon, 13 Nov 2017 10:30:38 +0000 Subject: Revert "Revert "move _state_group_cache to statestore"" We're going to fix this properly on this branch, so that the _state_group_cache can end up in StateGroupReadStore. This reverts commit ab335edb023d66cd0be439e045b10ca104b73cb5. --- synapse/storage/_base.py | 6 ------ synapse/storage/state.py | 19 ++++++++++++------- 2 files changed, 12 insertions(+), 13 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index e94917d9cd..7ebd4f189d 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -16,8 +16,6 @@ import logging from synapse.api.errors import StoreError from synapse.util.logcontext import LoggingContext, PreserveLoggingContext -from synapse.util.caches import CACHE_SIZE_FACTOR -from synapse.util.caches.dictionary_cache import DictionaryCache from synapse.util.caches.descriptors import Cache from synapse.storage.engines import PostgresEngine import synapse.metrics @@ -180,10 +178,6 @@ class SQLBaseStore(object): self._get_event_cache = Cache("*getEvent*", keylen=3, max_entries=hs.config.event_cache_size) - self._state_group_cache = DictionaryCache( - "*stateGroupCache*", 100000 * CACHE_SIZE_FACTOR - ) - self._event_fetch_lock = threading.Condition() self._event_fetch_list = [] self._event_fetch_ongoing = 0 diff --git a/synapse/storage/state.py b/synapse/storage/state.py index dd01b68762..ee3496123e 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -13,16 +13,17 @@ # See the License for the specific language governing permissions and # limitations under the License. -from ._base import SQLBaseStore -from synapse.util.caches.descriptors import cached, cachedList -from synapse.util.caches import intern_string -from synapse.util.stringutils import to_ascii -from synapse.storage.engines import PostgresEngine +from collections import namedtuple +import logging from twisted.internet import defer -from collections import namedtuple -import logging +from synapse.storage.engines import PostgresEngine +from synapse.util.caches import intern_string, CACHE_SIZE_FACTOR +from synapse.util.caches.descriptors import cached, cachedList +from synapse.util.caches.dictionary_cache import DictionaryCache +from synapse.util.stringutils import to_ascii +from ._base import SQLBaseStore logger = logging.getLogger(__name__) @@ -81,6 +82,10 @@ class StateStore(SQLBaseStore): where_clause="type='m.room.member'", ) + self._state_group_cache = DictionaryCache( + "*stateGroupCache*", 100000 * CACHE_SIZE_FACTOR + ) + @cached(max_entries=100000, iterable=True) def get_current_state_ids(self, room_id): """Get the current state event ids for a room based on the -- cgit 1.4.1 From 35a4b632405be2ca91039f63a8c9c550f0f44ea3 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Thu, 9 Nov 2017 19:00:20 +0000 Subject: Pull out bits of StateStore to a mixin ... so that we don't need to secretly gut-wrench it for use in the slaved stores. I haven't done the other stores yet, but we should. I'm tired of the workers breaking every time we tweak the stores because I forgot to gut-wrench the right method. fixes https://github.com/matrix-org/synapse/issues/2655. --- synapse/replication/slave/storage/events.py | 39 +-- synapse/storage/state.py | 424 +++++++++++++++------------- 2 files changed, 226 insertions(+), 237 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index 94ebbffc1b..29d7296b43 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -12,20 +12,18 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from ._base import BaseSlavedStore -from ._slaved_id_tracker import SlavedIdTracker +import logging from synapse.api.constants import EventTypes from synapse.storage import DataStore -from synapse.storage.roommember import RoomMemberStore from synapse.storage.event_federation import EventFederationStore from synapse.storage.event_push_actions import EventPushActionsStore -from synapse.storage.state import StateStore +from synapse.storage.roommember import RoomMemberStore +from synapse.storage.state import StateGroupReadStore from synapse.storage.stream import StreamStore from synapse.util.caches.stream_change_cache import StreamChangeCache - -import logging - +from ._base import BaseSlavedStore +from ._slaved_id_tracker import SlavedIdTracker logger = logging.getLogger(__name__) @@ -39,7 +37,7 @@ logger = logging.getLogger(__name__) # the method descriptor on the DataStore and chuck them into our class. -class SlavedEventStore(BaseSlavedStore): +class SlavedEventStore(StateGroupReadStore, BaseSlavedStore): def __init__(self, db_conn, hs): super(SlavedEventStore, self).__init__(db_conn, hs) @@ -90,25 +88,9 @@ class SlavedEventStore(BaseSlavedStore): _get_unread_counts_by_pos_txn = ( DataStore._get_unread_counts_by_pos_txn.__func__ ) - _get_state_group_for_events = ( - StateStore.__dict__["_get_state_group_for_events"] - ) - _get_state_group_for_event = ( - StateStore.__dict__["_get_state_group_for_event"] - ) - _get_state_groups_from_groups = ( - StateStore.__dict__["_get_state_groups_from_groups"] - ) - _get_state_groups_from_groups_txn = ( - DataStore._get_state_groups_from_groups_txn.__func__ - ) get_recent_event_ids_for_room = ( StreamStore.__dict__["get_recent_event_ids_for_room"] ) - get_current_state_ids = ( - StateStore.__dict__["get_current_state_ids"] - ) - get_state_group_delta = StateStore.__dict__["get_state_group_delta"] _get_joined_hosts_cache = RoomMemberStore.__dict__["_get_joined_hosts_cache"] has_room_changed_since = DataStore.has_room_changed_since.__func__ @@ -134,12 +116,6 @@ class SlavedEventStore(BaseSlavedStore): DataStore.get_room_events_stream_for_room.__func__ ) get_events_around = DataStore.get_events_around.__func__ - get_state_for_event = DataStore.get_state_for_event.__func__ - get_state_for_events = DataStore.get_state_for_events.__func__ - get_state_groups = DataStore.get_state_groups.__func__ - get_state_groups_ids = DataStore.get_state_groups_ids.__func__ - get_state_ids_for_event = DataStore.get_state_ids_for_event.__func__ - get_state_ids_for_events = DataStore.get_state_ids_for_events.__func__ get_joined_users_from_state = DataStore.get_joined_users_from_state.__func__ get_joined_users_from_context = DataStore.get_joined_users_from_context.__func__ _get_joined_users_from_context = ( @@ -169,10 +145,7 @@ class SlavedEventStore(BaseSlavedStore): _get_rooms_for_user_where_membership_is_txn = ( DataStore._get_rooms_for_user_where_membership_is_txn.__func__ ) - _get_state_for_groups = DataStore._get_state_for_groups.__func__ - _get_all_state_from_cache = DataStore._get_all_state_from_cache.__func__ _get_events_around_txn = DataStore._get_events_around_txn.__func__ - _get_some_state_from_cache = DataStore._get_some_state_from_cache.__func__ get_backfill_events = DataStore.get_backfill_events.__func__ _get_backfill_events = DataStore._get_backfill_events.__func__ diff --git a/synapse/storage/state.py b/synapse/storage/state.py index ee3496123e..360e3e4355 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -18,6 +18,7 @@ import logging from twisted.internet import defer +from synapse.storage.background_updates import BackgroundUpdateStore from synapse.storage.engines import PostgresEngine from synapse.util.caches import intern_string, CACHE_SIZE_FACTOR from synapse.util.caches.descriptors import cached, cachedList @@ -41,23 +42,11 @@ class _GetStateGroupDelta(namedtuple("_GetStateGroupDelta", ("prev_group", "delt return len(self.delta_ids) if self.delta_ids else 0 -class StateStore(SQLBaseStore): - """ Keeps track of the state at a given event. +class StateGroupReadStore(SQLBaseStore): + """The read-only parts of StateGroupStore - This is done by the concept of `state groups`. Every event is a assigned - a state group (identified by an arbitrary string), which references a - collection of state events. The current state of an event is then the - collection of state events referenced by the event's state group. - - Hence, every change in the current state causes a new state group to be - generated. However, if no change happens (e.g., if we get a message event - with only one parent it inherits the state group from its parent.) - - There are three tables: - * `state_groups`: Stores group name, first event with in the group and - room id. - * `event_to_state_groups`: Maps events to state groups. - * `state_groups_state`: Maps state group to state events. + None of these functions write to the state tables, so are suitable for + including in the SlavedStores. """ STATE_GROUP_DEDUPLICATION_UPDATE_NAME = "state_group_state_deduplication" @@ -65,22 +54,7 @@ class StateStore(SQLBaseStore): CURRENT_STATE_INDEX_UPDATE_NAME = "current_state_members_idx" def __init__(self, db_conn, hs): - super(StateStore, self).__init__(db_conn, hs) - self.register_background_update_handler( - self.STATE_GROUP_DEDUPLICATION_UPDATE_NAME, - self._background_deduplicate_state, - ) - self.register_background_update_handler( - self.STATE_GROUP_INDEX_UPDATE_NAME, - self._background_index_state, - ) - self.register_background_index_update( - self.CURRENT_STATE_INDEX_UPDATE_NAME, - index_name="current_state_events_member_index", - table="current_state_events", - columns=["state_key"], - where_clause="type='m.room.member'", - ) + super(StateGroupReadStore, self).__init__(db_conn, hs) self._state_group_cache = DictionaryCache( "*stateGroupCache*", 100000 * CACHE_SIZE_FACTOR @@ -195,178 +169,6 @@ class StateStore(SQLBaseStore): for group, event_id_map in group_to_ids.iteritems() }) - def _have_persisted_state_group_txn(self, txn, state_group): - txn.execute( - "SELECT count(*) FROM state_groups WHERE id = ?", - (state_group,) - ) - row = txn.fetchone() - return row and row[0] - - def _store_mult_state_groups_txn(self, txn, events_and_contexts): - state_groups = {} - for event, context in events_and_contexts: - if event.internal_metadata.is_outlier(): - continue - - if context.current_state_ids is None: - # AFAIK, this can never happen - logger.error( - "Non-outlier event %s had current_state_ids==None", - event.event_id) - continue - - # if the event was rejected, just give it the same state as its - # predecessor. - if context.rejected: - state_groups[event.event_id] = context.prev_group - continue - - state_groups[event.event_id] = context.state_group - - if self._have_persisted_state_group_txn(txn, context.state_group): - continue - - self._simple_insert_txn( - txn, - table="state_groups", - values={ - "id": context.state_group, - "room_id": event.room_id, - "event_id": event.event_id, - }, - ) - - # We persist as a delta if we can, while also ensuring the chain - # of deltas isn't tooo long, as otherwise read performance degrades. - if context.prev_group: - is_in_db = self._simple_select_one_onecol_txn( - txn, - table="state_groups", - keyvalues={"id": context.prev_group}, - retcol="id", - allow_none=True, - ) - if not is_in_db: - raise Exception( - "Trying to persist state with unpersisted prev_group: %r" - % (context.prev_group,) - ) - - potential_hops = self._count_state_group_hops_txn( - txn, context.prev_group - ) - if context.prev_group and potential_hops < MAX_STATE_DELTA_HOPS: - self._simple_insert_txn( - txn, - table="state_group_edges", - values={ - "state_group": context.state_group, - "prev_state_group": context.prev_group, - }, - ) - - self._simple_insert_many_txn( - txn, - table="state_groups_state", - values=[ - { - "state_group": context.state_group, - "room_id": event.room_id, - "type": key[0], - "state_key": key[1], - "event_id": state_id, - } - for key, state_id in context.delta_ids.iteritems() - ], - ) - else: - self._simple_insert_many_txn( - txn, - table="state_groups_state", - values=[ - { - "state_group": context.state_group, - "room_id": event.room_id, - "type": key[0], - "state_key": key[1], - "event_id": state_id, - } - for key, state_id in context.current_state_ids.iteritems() - ], - ) - - # Prefill the state group cache with this group. - # It's fine to use the sequence like this as the state group map - # is immutable. (If the map wasn't immutable then this prefill could - # race with another update) - txn.call_after( - self._state_group_cache.update, - self._state_group_cache.sequence, - key=context.state_group, - value=dict(context.current_state_ids), - full=True, - ) - - self._simple_insert_many_txn( - txn, - table="event_to_state_groups", - values=[ - { - "state_group": state_group_id, - "event_id": event_id, - } - for event_id, state_group_id in state_groups.iteritems() - ], - ) - - for event_id, state_group_id in state_groups.iteritems(): - txn.call_after( - self._get_state_group_for_event.prefill, - (event_id,), state_group_id - ) - - def _count_state_group_hops_txn(self, txn, state_group): - """Given a state group, count how many hops there are in the tree. - - This is used to ensure the delta chains don't get too long. - """ - if isinstance(self.database_engine, PostgresEngine): - sql = (""" - WITH RECURSIVE state(state_group) AS ( - VALUES(?::bigint) - UNION ALL - SELECT prev_state_group FROM state_group_edges e, state s - WHERE s.state_group = e.state_group - ) - SELECT count(*) FROM state; - """) - - txn.execute(sql, (state_group,)) - row = txn.fetchone() - if row and row[0]: - return row[0] - else: - return 0 - else: - # We don't use WITH RECURSIVE on sqlite3 as there are distributions - # that ship with an sqlite3 version that doesn't support it (e.g. wheezy) - next_group = state_group - count = 0 - - while next_group: - next_group = self._simple_select_one_onecol_txn( - txn, - table="state_group_edges", - keyvalues={"state_group": next_group}, - retcol="prev_state_group", - allow_none=True, - ) - if next_group: - count += 1 - - return count - @defer.inlineCallbacks def _get_state_groups_from_groups(self, groups, types): """Returns dictionary state_group -> (dict of (type, state_key) -> event id) @@ -747,6 +549,220 @@ class StateStore(SQLBaseStore): defer.returnValue(results) + +class StateStore(StateGroupReadStore, BackgroundUpdateStore): + """ Keeps track of the state at a given event. + + This is done by the concept of `state groups`. Every event is a assigned + a state group (identified by an arbitrary string), which references a + collection of state events. The current state of an event is then the + collection of state events referenced by the event's state group. + + Hence, every change in the current state causes a new state group to be + generated. However, if no change happens (e.g., if we get a message event + with only one parent it inherits the state group from its parent.) + + There are three tables: + * `state_groups`: Stores group name, first event with in the group and + room id. + * `event_to_state_groups`: Maps events to state groups. + * `state_groups_state`: Maps state group to state events. + """ + + STATE_GROUP_DEDUPLICATION_UPDATE_NAME = "state_group_state_deduplication" + STATE_GROUP_INDEX_UPDATE_NAME = "state_group_state_type_index" + CURRENT_STATE_INDEX_UPDATE_NAME = "current_state_members_idx" + + def __init__(self, db_conn, hs): + super(StateStore, self).__init__(db_conn, hs) + self.register_background_update_handler( + self.STATE_GROUP_DEDUPLICATION_UPDATE_NAME, + self._background_deduplicate_state, + ) + self.register_background_update_handler( + self.STATE_GROUP_INDEX_UPDATE_NAME, + self._background_index_state, + ) + self.register_background_index_update( + self.CURRENT_STATE_INDEX_UPDATE_NAME, + index_name="current_state_events_member_index", + table="current_state_events", + columns=["state_key"], + where_clause="type='m.room.member'", + ) + + def _have_persisted_state_group_txn(self, txn, state_group): + txn.execute( + "SELECT count(*) FROM state_groups WHERE id = ?", + (state_group,) + ) + row = txn.fetchone() + return row and row[0] + + def _store_mult_state_groups_txn(self, txn, events_and_contexts): + state_groups = {} + for event, context in events_and_contexts: + if event.internal_metadata.is_outlier(): + continue + + if context.current_state_ids is None: + # AFAIK, this can never happen + logger.error( + "Non-outlier event %s had current_state_ids==None", + event.event_id) + continue + + # if the event was rejected, just give it the same state as its + # predecessor. + if context.rejected: + state_groups[event.event_id] = context.prev_group + continue + + state_groups[event.event_id] = context.state_group + + if self._have_persisted_state_group_txn(txn, context.state_group): + continue + + self._simple_insert_txn( + txn, + table="state_groups", + values={ + "id": context.state_group, + "room_id": event.room_id, + "event_id": event.event_id, + }, + ) + + # We persist as a delta if we can, while also ensuring the chain + # of deltas isn't tooo long, as otherwise read performance degrades. + if context.prev_group: + is_in_db = self._simple_select_one_onecol_txn( + txn, + table="state_groups", + keyvalues={"id": context.prev_group}, + retcol="id", + allow_none=True, + ) + if not is_in_db: + raise Exception( + "Trying to persist state with unpersisted prev_group: %r" + % (context.prev_group,) + ) + + potential_hops = self._count_state_group_hops_txn( + txn, context.prev_group + ) + if context.prev_group and potential_hops < MAX_STATE_DELTA_HOPS: + self._simple_insert_txn( + txn, + table="state_group_edges", + values={ + "state_group": context.state_group, + "prev_state_group": context.prev_group, + }, + ) + + self._simple_insert_many_txn( + txn, + table="state_groups_state", + values=[ + { + "state_group": context.state_group, + "room_id": event.room_id, + "type": key[0], + "state_key": key[1], + "event_id": state_id, + } + for key, state_id in context.delta_ids.iteritems() + ], + ) + else: + self._simple_insert_many_txn( + txn, + table="state_groups_state", + values=[ + { + "state_group": context.state_group, + "room_id": event.room_id, + "type": key[0], + "state_key": key[1], + "event_id": state_id, + } + for key, state_id in context.current_state_ids.iteritems() + ], + ) + + # Prefill the state group cache with this group. + # It's fine to use the sequence like this as the state group map + # is immutable. (If the map wasn't immutable then this prefill could + # race with another update) + txn.call_after( + self._state_group_cache.update, + self._state_group_cache.sequence, + key=context.state_group, + value=dict(context.current_state_ids), + full=True, + ) + + self._simple_insert_many_txn( + txn, + table="event_to_state_groups", + values=[ + { + "state_group": state_group_id, + "event_id": event_id, + } + for event_id, state_group_id in state_groups.iteritems() + ], + ) + + for event_id, state_group_id in state_groups.iteritems(): + txn.call_after( + self._get_state_group_for_event.prefill, + (event_id,), state_group_id + ) + + def _count_state_group_hops_txn(self, txn, state_group): + """Given a state group, count how many hops there are in the tree. + + This is used to ensure the delta chains don't get too long. + """ + if isinstance(self.database_engine, PostgresEngine): + sql = (""" + WITH RECURSIVE state(state_group) AS ( + VALUES(?::bigint) + UNION ALL + SELECT prev_state_group FROM state_group_edges e, state s + WHERE s.state_group = e.state_group + ) + SELECT count(*) FROM state; + """) + + txn.execute(sql, (state_group,)) + row = txn.fetchone() + if row and row[0]: + return row[0] + else: + return 0 + else: + # We don't use WITH RECURSIVE on sqlite3 as there are distributions + # that ship with an sqlite3 version that doesn't support it (e.g. wheezy) + next_group = state_group + count = 0 + + while next_group: + next_group = self._simple_select_one_onecol_txn( + txn, + table="state_group_edges", + keyvalues={"state_group": next_group}, + retcol="prev_state_group", + allow_none=True, + ) + if next_group: + count += 1 + + return count + def get_next_state_group(self): return self._state_groups_id_gen.get_next() -- cgit 1.4.1 From cdc9e50a5d4cba8a7dfec32d81756eaf45f54ca5 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Thu, 16 Nov 2017 15:29:10 +0000 Subject: Cleanup in _simple_upsert_txn Bail out early to reduce indentation --- synapse/storage/_base.py | 33 +++++++++++++++++---------------- 1 file changed, 17 insertions(+), 16 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 7ebd4f189d..740400d58b 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -493,7 +493,7 @@ class SQLBaseStore(object): if lock: self.database_engine.lock_table(txn, table) - # Try to update + # First try to update. sql = "UPDATE %s SET %s WHERE %s" % ( table, ", ".join("%s = ?" % (k,) for k in values), @@ -502,23 +502,24 @@ class SQLBaseStore(object): sqlargs = values.values() + keyvalues.values() txn.execute(sql, sqlargs) - if txn.rowcount == 0: - # We didn't update and rows so insert a new one - allvalues = {} - allvalues.update(keyvalues) - allvalues.update(values) - allvalues.update(insertion_values) + if txn.rowcount > 0: + # successfully updated at least one row. + return False - sql = "INSERT INTO %s (%s) VALUES (%s)" % ( - table, - ", ".join(k for k in allvalues), - ", ".join("?" for _ in allvalues) - ) - txn.execute(sql, allvalues.values()) + # We didn't update any rows so insert a new one + allvalues = {} + allvalues.update(keyvalues) + allvalues.update(values) + allvalues.update(insertion_values) - return True - else: - return False + sql = "INSERT INTO %s (%s) VALUES (%s)" % ( + table, + ", ".join(k for k in allvalues), + ", ".join("?" for _ in allvalues) + ) + txn.execute(sql, allvalues.values()) + # successfully inserted + return True def _simple_select_one(self, table, keyvalues, retcols, allow_none=False, desc="_simple_select_one"): -- cgit 1.4.1 From 10aaa1bc15775a228ce22ab45efbb55b5099289b Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Thu, 16 Nov 2017 15:30:15 +0000 Subject: _simple_upsert: retry on IntegrityError wrap the call to _simple_upsert_txn in a loop so that we retry on an integrityerror: this means we can avoid locking the table provided there is an unique index. --- synapse/storage/_base.py | 35 +++++++++++++++++++++++++++++------ 1 file changed, 29 insertions(+), 6 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 740400d58b..1582a58966 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -469,23 +469,46 @@ class SQLBaseStore(object): txn.executemany(sql, vals) + @defer.inlineCallbacks def _simple_upsert(self, table, keyvalues, values, insertion_values={}, desc="_simple_upsert", lock=True): """ + + `lock` should generally be set to True (the default), but can be set + to False if either of the following are true: + + * there is a UNIQUE INDEX on the key columns. In this case a conflict + will cause an IntegrityError in which case this function will retry + the update. + + * we somehow know that we are the only thread which will be updating + this table. + Args: table (str): The table to upsert into keyvalues (dict): The unique key tables and their new values values (dict): The nonunique columns and their new values - insertion_values (dict): key/values to use when inserting + insertion_values (dict): additional key/values to use only when + inserting + lock (bool): True to lock the table when doing the upsert. Returns: Deferred(bool): True if a new entry was created, False if an existing one was updated. """ - return self.runInteraction( - desc, - self._simple_upsert_txn, table, keyvalues, values, insertion_values, - lock - ) + while True: + try: + result = yield self.runInteraction( + desc, + self._simple_upsert_txn, table, keyvalues, values, insertion_values, + lock=lock + ) + defer.returnValue(result) + except self.database_engine.IntegrityError as e: + # presumably we raced with another transaction: let's retry. + logger.warn( + "IntegrityError when upserting into %s; retrying: %s", + table, e + ) def _simple_upsert_txn(self, txn, table, keyvalues, values, insertion_values={}, lock=True): -- cgit 1.4.1 From 7ab2b69e188e84a1360f0381f36af4b8445395bf Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Thu, 16 Nov 2017 15:32:01 +0000 Subject: Avoid locking `pushers` table on upsert Now that _simple_upsert will retry on IntegrityError, we don't need to lock the table. --- synapse/storage/pusher.py | 55 ++++++++++++++++++++++++----------------------- 1 file changed, 28 insertions(+), 27 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py index 34d2f82b7f..19ce41fde9 100644 --- a/synapse/storage/pusher.py +++ b/synapse/storage/pusher.py @@ -204,34 +204,35 @@ class PusherStore(SQLBaseStore): pushkey, pushkey_ts, lang, data, last_stream_ordering, profile_tag=""): with self._pushers_id_gen.get_next() as stream_id: - def f(txn): - newly_inserted = self._simple_upsert_txn( - txn, - "pushers", - { - "app_id": app_id, - "pushkey": pushkey, - "user_name": user_id, - }, - { - "access_token": access_token, - "kind": kind, - "app_display_name": app_display_name, - "device_display_name": device_display_name, - "ts": pushkey_ts, - "lang": lang, - "data": encode_canonical_json(data), - "last_stream_ordering": last_stream_ordering, - "profile_tag": profile_tag, - "id": stream_id, - }, - ) - if newly_inserted: - # get_if_user_has_pusher only cares if the user has - # at least *one* pusher. - txn.call_after(self.get_if_user_has_pusher.invalidate, (user_id,)) + # no need to lock because `pushers` has a unique key on + # (app_id, pushkey, user_name) so _simple_upsert will retry + newly_inserted = yield self._simple_upsert( + table="pushers", + keyvalues={ + "app_id": app_id, + "pushkey": pushkey, + "user_name": user_id, + }, + values={ + "access_token": access_token, + "kind": kind, + "app_display_name": app_display_name, + "device_display_name": device_display_name, + "ts": pushkey_ts, + "lang": lang, + "data": encode_canonical_json(data), + "last_stream_ordering": last_stream_ordering, + "profile_tag": profile_tag, + "id": stream_id, + }, + desc="add_pusher", + lock=False, + ) - yield self.runInteraction("add_pusher", f) + if newly_inserted: + # get_if_user_has_pusher only cares if the user has + # at least *one* pusher. + self.get_if_user_has_pusher.invalidate(user_id,) @defer.inlineCallbacks def delete_pusher_by_app_id_pushkey_user_id(self, app_id, pushkey, user_id): -- cgit 1.4.1 From 77a122787022c302f0b2abcb63bd2f7d8514d692 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Thu, 16 Nov 2017 16:03:38 +0000 Subject: Fix broken ref to IntegrityError --- synapse/storage/_base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 1582a58966..e6eefdd6fe 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -503,7 +503,7 @@ class SQLBaseStore(object): lock=lock ) defer.returnValue(result) - except self.database_engine.IntegrityError as e: + except self.database_engine.module.IntegrityError as e: # presumably we raced with another transaction: let's retry. logger.warn( "IntegrityError when upserting into %s; retrying: %s", -- cgit 1.4.1 From 06e5bcfc83c6853a9c9c7bf0aadd0226051d365e Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Thu, 16 Nov 2017 17:44:52 +0000 Subject: Avoid locking for upsert on pushers tables * replace the upsert into deleted_pushers with an insert * no need to lock for upsert on pusher_throttle --- synapse/storage/pusher.py | 21 +++++++++---- .../delta/46/drop_unique_deleted_pushers.sql | 35 ++++++++++++++++++++++ 2 files changed, 51 insertions(+), 5 deletions(-) create mode 100644 synapse/storage/schema/delta/46/drop_unique_deleted_pushers.sql (limited to 'synapse/storage') diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py index 19ce41fde9..3d8b4d5d5b 100644 --- a/synapse/storage/pusher.py +++ b/synapse/storage/pusher.py @@ -244,11 +244,19 @@ class PusherStore(SQLBaseStore): "pushers", {"app_id": app_id, "pushkey": pushkey, "user_name": user_id} ) - self._simple_upsert_txn( + + # it's possible for us to end up with duplicate rows for + # (app_id, pushkey, user_id) at different stream_ids, but that + # doesn't really matter. + self._simple_insert_txn( txn, - "deleted_pushers", - {"app_id": app_id, "pushkey": pushkey, "user_id": user_id}, - {"stream_id": stream_id}, + table="deleted_pushers", + values={ + "stream_id": stream_id, + "app_id": app_id, + "pushkey": pushkey, + "user_id": user_id, + }, ) with self._pushers_id_gen.get_next() as stream_id: @@ -311,9 +319,12 @@ class PusherStore(SQLBaseStore): @defer.inlineCallbacks def set_throttle_params(self, pusher_id, room_id, params): + # no need to lock because `pusher_throttle` has a primary key on + # (pusher, room_id) so _simple_upsert will retry yield self._simple_upsert( "pusher_throttle", {"pusher": pusher_id, "room_id": room_id}, params, - desc="set_throttle_params" + desc="set_throttle_params", + lock=False, ) diff --git a/synapse/storage/schema/delta/46/drop_unique_deleted_pushers.sql b/synapse/storage/schema/delta/46/drop_unique_deleted_pushers.sql new file mode 100644 index 0000000000..bb307889c1 --- /dev/null +++ b/synapse/storage/schema/delta/46/drop_unique_deleted_pushers.sql @@ -0,0 +1,35 @@ +/* Copyright 2017 New Vector Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- drop the unique constraint on deleted_pushers so that we can just insert +-- into it rather than upserting. + +CREATE TABLE deleted_pushers2 ( + stream_id BIGINT NOT NULL, + app_id TEXT NOT NULL, + pushkey TEXT NOT NULL, + user_id TEXT NOT NULL +); + +INSERT INTO deleted_pushers2 (stream_id, app_id, pushkey, user_id) + SELECT stream_id, app_id, pushkey, user_id from deleted_pushers; + +DROP TABLE deleted_pushers; +ALTER TABLE deleted_pushers2 RENAME TO deleted_pushers; + +-- create the index after doing the inserts because that's more efficient. +-- it also means we can give it the same name as the old one without renaming. +CREATE INDEX deleted_pushers_stream_id ON deleted_pushers (stream_id); + -- cgit 1.4.1 From c46139a17edd0237ab3e6243346e5b5e201e4673 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Thu, 16 Nov 2017 18:07:01 +0000 Subject: Avoid locking account_data tables for upserts --- synapse/storage/account_data.py | 85 ++++++++++++++++++++++++----------------- 1 file changed, 49 insertions(+), 36 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/account_data.py b/synapse/storage/account_data.py index c8a1eb016b..56a0bde549 100644 --- a/synapse/storage/account_data.py +++ b/synapse/storage/account_data.py @@ -222,9 +222,12 @@ class AccountDataStore(SQLBaseStore): """ content_json = json.dumps(content) - def add_account_data_txn(txn, next_id): - self._simple_upsert_txn( - txn, + with self._account_data_id_gen.get_next() as next_id: + # no need to lock here as room_account_data has a unique constraint + # on (user_id, room_id, account_data_type) so _simple_upsert will + # retry if there is a conflict. + yield self._simple_upsert( + desc="add_room_account_data", table="room_account_data", keyvalues={ "user_id": user_id, @@ -234,19 +237,20 @@ class AccountDataStore(SQLBaseStore): values={ "stream_id": next_id, "content": content_json, - } - ) - txn.call_after( - self._account_data_stream_cache.entity_has_changed, - user_id, next_id, + }, + lock=False, ) - txn.call_after(self.get_account_data_for_user.invalidate, (user_id,)) - self._update_max_stream_id(txn, next_id) - with self._account_data_id_gen.get_next() as next_id: - yield self.runInteraction( - "add_room_account_data", add_account_data_txn, next_id - ) + # it's theoretically possible for the above to succeed and the + # below to fail - in which case we might reuse a stream id on + # restart, and the above update might not get propagated. That + # doesn't sound any worse than the whole update getting lost, + # which is what would happen if we combined the two into one + # transaction. + yield self._update_max_stream_id(next_id) + + self._account_data_stream_cache.entity_has_changed(user_id, next_id) + self.get_account_data_for_user.invalidate((user_id,)) result = self._account_data_id_gen.get_current_token() defer.returnValue(result) @@ -263,9 +267,12 @@ class AccountDataStore(SQLBaseStore): """ content_json = json.dumps(content) - def add_account_data_txn(txn, next_id): - self._simple_upsert_txn( - txn, + with self._account_data_id_gen.get_next() as next_id: + # no need to lock here as account_data has a unique constraint on + # (user_id, account_data_type) so _simple_upsert will retry if + # there is a conflict. + yield self._simple_upsert( + desc="add_user_account_data", table="account_data", keyvalues={ "user_id": user_id, @@ -274,40 +281,46 @@ class AccountDataStore(SQLBaseStore): values={ "stream_id": next_id, "content": content_json, - } + }, + lock=False, ) - txn.call_after( - self._account_data_stream_cache.entity_has_changed, + + # it's theoretically possible for the above to succeed and the + # below to fail - in which case we might reuse a stream id on + # restart, and the above update might not get propagated. That + # doesn't sound any worse than the whole update getting lost, + # which is what would happen if we combined the two into one + # transaction. + yield self._update_max_stream_id(next_id) + + self._account_data_stream_cache.entity_has_changed( user_id, next_id, ) - txn.call_after(self.get_account_data_for_user.invalidate, (user_id,)) - txn.call_after( - self.get_global_account_data_by_type_for_user.invalidate, + self.get_account_data_for_user.invalidate((user_id,)) + self.get_global_account_data_by_type_for_user.invalidate( (account_data_type, user_id,) ) - self._update_max_stream_id(txn, next_id) - - with self._account_data_id_gen.get_next() as next_id: - yield self.runInteraction( - "add_user_account_data", add_account_data_txn, next_id - ) result = self._account_data_id_gen.get_current_token() defer.returnValue(result) - def _update_max_stream_id(self, txn, next_id): + def _update_max_stream_id(self, next_id): """Update the max stream_id Args: - txn: The database cursor next_id(int): The the revision to advance to. """ - update_max_id_sql = ( - "UPDATE account_data_max_stream_id" - " SET stream_id = ?" - " WHERE stream_id < ?" + def _update(txn): + update_max_id_sql = ( + "UPDATE account_data_max_stream_id" + " SET stream_id = ?" + " WHERE stream_id < ?" + ) + txn.execute(update_max_id_sql, (next_id, next_id)) + return self.runInteraction( + "update_account_data_max_stream_id", + _update, ) - txn.execute(update_max_id_sql, (next_id, next_id)) @cachedInlineCallbacks(num_args=2, cache_context=True, max_entries=5000) def is_ignored_by(self, ignored_user_id, ignorer_user_id, cache_context): -- cgit 1.4.1 From 7098b65cb8c7e0b41a3bcb8ac7d2cc9e63f06f82 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 21 Nov 2017 11:03:21 +0000 Subject: Fix error on sqlite 3.7 Create the url_cache index on local_media_repository as a background update, so that we can detect whether we are on sqlite or not and create a partial or complete index accordingly. To avoid running the cleanup job before we have built the index, add a bailout which will defer the cleanup if the bg updates are still running. Fixes https://github.com/matrix-org/synapse/issues/2572. --- synapse/rest/media/v1/preview_url_resource.py | 10 ++++++--- synapse/storage/background_updates.py | 12 ++++++++++- synapse/storage/media_repository.py | 16 ++++++++++++--- .../storage/schema/delta/44/expire_url_cache.sql | 5 ++++- .../delta/46/local_media_repository_url_idx.sql | 24 ++++++++++++++++++++++ 5 files changed, 59 insertions(+), 8 deletions(-) create mode 100644 synapse/storage/schema/delta/46/local_media_repository_url_idx.sql (limited to 'synapse/storage') diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py index 723f7043f4..dd76e3f7d5 100644 --- a/synapse/rest/media/v1/preview_url_resource.py +++ b/synapse/rest/media/v1/preview_url_resource.py @@ -348,11 +348,16 @@ class PreviewUrlResource(Resource): def _expire_url_cache_data(self): """Clean up expired url cache content, media and thumbnails. """ - # TODO: Delete from backup media store now = self.clock.time_msec() + logger.info("Running url preview cache expiry") + + if not self.store.has_completed_background_updates(): + logger.info("Still running DB updates; skipping expiry") + return + # First we delete expired url cache entries media_ids = yield self.store.get_expired_url_cache(now) @@ -426,8 +431,7 @@ class PreviewUrlResource(Resource): yield self.store.delete_url_cache_media(removed_media) - if removed_media: - logger.info("Deleted %d media from url cache", len(removed_media)) + logger.info("Deleted %d media from url cache", len(removed_media)) def decode_and_calc_og(body, media_uri, request_encoding=None): diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py index 6f235ac051..e755afc18e 100644 --- a/synapse/storage/background_updates.py +++ b/synapse/storage/background_updates.py @@ -85,6 +85,7 @@ class BackgroundUpdateStore(SQLBaseStore): self._background_update_performance = {} self._background_update_queue = [] self._background_update_handlers = {} + self._all_done = False @defer.inlineCallbacks def start_doing_background_updates(self): @@ -106,8 +107,17 @@ class BackgroundUpdateStore(SQLBaseStore): "No more background updates to do." " Unscheduling background update task." ) + self._all_done = True defer.returnValue(None) + def has_completed_background_updates(self): + """Check if all the background updates have completed + + Returns: + bool: True if all background updates have completed + """ + return self._all_done + @defer.inlineCallbacks def do_next_background_update(self, desired_duration_ms): """Does some amount of work on the next queued background update @@ -269,7 +279,7 @@ class BackgroundUpdateStore(SQLBaseStore): # Sqlite doesn't support concurrent creation of indexes. # # We don't use partial indices on SQLite as it wasn't introduced - # until 3.8, and wheezy has 3.7 + # until 3.8, and wheezy and CentOS 7 have 3.7 # # We assume that sqlite doesn't give us invalid indices; however # we may still end up with the index existing but the diff --git a/synapse/storage/media_repository.py b/synapse/storage/media_repository.py index 52e5cdad70..a66ff7c1e0 100644 --- a/synapse/storage/media_repository.py +++ b/synapse/storage/media_repository.py @@ -12,13 +12,23 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +from synapse.storage.background_updates import BackgroundUpdateStore -from ._base import SQLBaseStore - -class MediaRepositoryStore(SQLBaseStore): +class MediaRepositoryStore(BackgroundUpdateStore): """Persistence for attachments and avatars""" + def __init__(self, db_conn, hs): + super(MediaRepositoryStore, self).__init__(db_conn, hs) + + self.register_background_index_update( + update_name='local_media_repository_url_idx', + index_name='local_media_repository_url_idx', + table='local_media_repository', + columns=['created_ts'], + where_clause='url_cache IS NOT NULL', + ) + def get_default_thumbnails(self, top_level_type, sub_type): return [] diff --git a/synapse/storage/schema/delta/44/expire_url_cache.sql b/synapse/storage/schema/delta/44/expire_url_cache.sql index e2b775f038..b12f9b2ebf 100644 --- a/synapse/storage/schema/delta/44/expire_url_cache.sql +++ b/synapse/storage/schema/delta/44/expire_url_cache.sql @@ -13,7 +13,10 @@ * limitations under the License. */ -CREATE INDEX local_media_repository_url_idx ON local_media_repository(created_ts) WHERE url_cache IS NOT NULL; +-- this didn't work on SQLite 3.7 (because of lack of partial indexes), so was +-- removed and replaced with 46/local_media_repository_url_idx.sql. +-- +-- CREATE INDEX local_media_repository_url_idx ON local_media_repository(created_ts) WHERE url_cache IS NOT NULL; -- we need to change `expires` to `expires_ts` so that we can index on it. SQLite doesn't support -- indices on expressions until 3.9. diff --git a/synapse/storage/schema/delta/46/local_media_repository_url_idx.sql b/synapse/storage/schema/delta/46/local_media_repository_url_idx.sql new file mode 100644 index 0000000000..bbfc7f5d1a --- /dev/null +++ b/synapse/storage/schema/delta/46/local_media_repository_url_idx.sql @@ -0,0 +1,24 @@ +/* Copyright 2017 New Vector Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- register a background update which will recreate the +-- local_media_repository_url_idx index. +-- +-- We do this as a bg update not because it is a particularly onerous +-- operation, but because we'd like it to be a partial index if possible, and +-- the background_index_update code will understand whether we are on +-- postgres or sqlite and behave accordingly. +INSERT INTO background_updates (update_name, progress_json) VALUES + ('local_media_repository_url_idx', '{}'); -- cgit 1.4.1 From 2908f955d12e8c9d6081a8d72096c85683fe1ebf Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 22 Nov 2017 18:02:15 +0000 Subject: Check database in has_completed_background_updates so that the right thing happens on workers. --- synapse/rest/media/v1/preview_url_resource.py | 2 +- synapse/storage/_base.py | 16 +++++++--------- synapse/storage/background_updates.py | 27 +++++++++++++++++++++++++-- 3 files changed, 33 insertions(+), 12 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py index dd76e3f7d5..385e4079ec 100644 --- a/synapse/rest/media/v1/preview_url_resource.py +++ b/synapse/rest/media/v1/preview_url_resource.py @@ -354,7 +354,7 @@ class PreviewUrlResource(Resource): logger.info("Running url preview cache expiry") - if not self.store.has_completed_background_updates(): + if not (yield self.store.has_completed_background_updates()): logger.info("Still running DB updates; skipping expiry") return diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index e6eefdd6fe..476c84c621 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -600,20 +600,18 @@ class SQLBaseStore(object): @staticmethod def _simple_select_onecol_txn(txn, table, keyvalues, retcol): - if keyvalues: - where = "WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.iterkeys()) - else: - where = "" - sql = ( - "SELECT %(retcol)s FROM %(table)s %(where)s" + "SELECT %(retcol)s FROM %(table)s" ) % { "retcol": retcol, "table": table, - "where": where, } - txn.execute(sql, keyvalues.values()) + if keyvalues: + sql += "WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.iterkeys()) + txn.execute(sql, keyvalues.values()) + else: + txn.execute(sql) return [r[0] for r in txn] @@ -624,7 +622,7 @@ class SQLBaseStore(object): Args: table (str): table name - keyvalues (dict): column names and values to select the rows with + keyvalues (dict|None): column names and values to select the rows with retcol (str): column whos value we wish to retrieve. Returns: diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py index e755afc18e..11a1b942f1 100644 --- a/synapse/storage/background_updates.py +++ b/synapse/storage/background_updates.py @@ -110,13 +110,36 @@ class BackgroundUpdateStore(SQLBaseStore): self._all_done = True defer.returnValue(None) + @defer.inlineCallbacks def has_completed_background_updates(self): """Check if all the background updates have completed Returns: - bool: True if all background updates have completed + Deferred[bool]: True if all background updates have completed """ - return self._all_done + # if we've previously determined that there is nothing left to do, that + # is easy + if self._all_done: + defer.returnValue(True) + + # obviously, if we have things in our queue, we're not done. + if self._background_update_queue: + defer.returnValue(False) + + # otherwise, check if there are updates to be run. This is important, + # as we may be running on a worker which doesn't perform the bg updates + # itself, but still wants to wait for them to happen. + updates = yield self._simple_select_onecol( + "background_updates", + keyvalues=None, + retcol="1", + desc="check_background_updates", + ) + if not updates: + self._all_done = True + defer.returnValue(True) + + defer.returnValue(False) @defer.inlineCallbacks def do_next_background_update(self, desired_duration_ms): -- cgit 1.4.1 From 6b48b3e277b2fe7c14493ddd6c07f43890584955 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 22 Nov 2017 18:06:24 +0000 Subject: fix sql fails --- synapse/storage/_base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 476c84c621..470f7881ab 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -608,7 +608,7 @@ class SQLBaseStore(object): } if keyvalues: - sql += "WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.iterkeys()) + sql += " WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.iterkeys()) txn.execute(sql, keyvalues.values()) else: txn.execute(sql) -- cgit 1.4.1 From 63ccaa58736ca098a0d52000bdbc2df589b8cdaa Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Mon, 27 Nov 2017 11:56:57 +0000 Subject: Avoid retrying forever on IntegrityError --- synapse/storage/_base.py | 7 +++++++ 1 file changed, 7 insertions(+) (limited to 'synapse/storage') diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index e6eefdd6fe..662c30187d 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -495,6 +495,7 @@ class SQLBaseStore(object): Deferred(bool): True if a new entry was created, False if an existing one was updated. """ + attempts = 0 while True: try: result = yield self.runInteraction( @@ -504,6 +505,12 @@ class SQLBaseStore(object): ) defer.returnValue(result) except self.database_engine.module.IntegrityError as e: + attempts += 1 + if attempts >= 5: + # don't retry forever, because things other than races + # can cause IntegrityErrors + raise + # presumably we raced with another transaction: let's retry. logger.warn( "IntegrityError when upserting into %s; retrying: %s", -- cgit 1.4.1 From 2c6d63922a5033a17c7d53a928892fbbcbd6fa63 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 29 Nov 2017 14:33:05 +0000 Subject: Remove pushers when deleting access tokens Whenever an access token is invalidated, we should remove the associated pushers. --- synapse/handlers/auth.py | 16 ++++++++++++---- synapse/push/pusherpool.py | 24 +++++++++++++++--------- synapse/storage/registration.py | 10 +++++----- 3 files changed, 32 insertions(+), 18 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index 080eb14271..0ba66bc947 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -664,9 +664,6 @@ class AuthHandler(BaseHandler): yield self.delete_access_tokens_for_user( user_id, except_token_id=except_access_token_id, ) - yield self.hs.get_pusherpool().remove_pushers_by_user( - user_id, except_access_token_id - ) @defer.inlineCallbacks def deactivate_account(self, user_id): @@ -706,6 +703,12 @@ class AuthHandler(BaseHandler): access_token=access_token, ) + # delete pushers associated with this access token + if user_info["token_id"] is not None: + yield self.hs.get_pusherpool().remove_pushers_by_access_token( + str(user_info["user"]), (user_info["token_id"], ) + ) + @defer.inlineCallbacks def delete_access_tokens_for_user(self, user_id, except_token_id=None, device_id=None): @@ -728,13 +731,18 @@ class AuthHandler(BaseHandler): # see if any of our auth providers want to know about this for provider in self.password_providers: if hasattr(provider, "on_logged_out"): - for token, device_id in tokens_and_devices: + for token, token_id, device_id in tokens_and_devices: yield provider.on_logged_out( user_id=user_id, device_id=device_id, access_token=token, ) + # delete pushers associated with the access tokens + yield self.hs.get_pusherpool().remove_pushers_by_access_token( + user_id, (token_id for _, token_id, _ in tokens_and_devices), + ) + @defer.inlineCallbacks def add_threepid(self, user_id, medium, address, validated_at): # 'Canonicalise' email addresses down to lower case. diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py index 34cb108dcb..134e89b371 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py @@ -103,19 +103,25 @@ class PusherPool: yield self.remove_pusher(p['app_id'], p['pushkey'], p['user_name']) @defer.inlineCallbacks - def remove_pushers_by_user(self, user_id, except_access_token_id=None): - all = yield self.store.get_all_pushers() - logger.info( - "Removing all pushers for user %s except access tokens id %r", - user_id, except_access_token_id - ) - for p in all: - if p['user_name'] == user_id and p['access_token'] != except_access_token_id: + def remove_pushers_by_access_token(self, user_id, access_tokens): + """Remove the pushers for a given user corresponding to a set of + access_tokens. + + Args: + user_id (str): user to remove pushers for + access_tokens (Iterable[int]): access token *ids* to remove pushers + for + """ + tokens = set(access_tokens) + for p in (yield self.store.get_pushers_by_user_id(user_id)): + if p['access_token'] in tokens: logger.info( "Removing pusher for app id %s, pushkey %s, user %s", p['app_id'], p['pushkey'], p['user_name'] ) - yield self.remove_pusher(p['app_id'], p['pushkey'], p['user_name']) + yield self.remove_pusher( + p['app_id'], p['pushkey'], p['user_name'], + ) @defer.inlineCallbacks def on_new_notifications(self, min_stream_id, max_stream_id): diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index 8b9544c209..3aa810981f 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -254,8 +254,8 @@ class RegistrationStore(background_updates.BackgroundUpdateStore): If None, tokens associated with any device (or no device) will be deleted Returns: - defer.Deferred[list[str, str|None]]: a list of the deleted tokens - and device IDs + defer.Deferred[list[str, int, str|None, int]]: a list of + (token, token id, device id) for each of the deleted tokens """ def f(txn): keyvalues = { @@ -272,12 +272,12 @@ class RegistrationStore(background_updates.BackgroundUpdateStore): values.append(except_token_id) txn.execute( - "SELECT token, device_id FROM access_tokens WHERE %s" % where_clause, + "SELECT token, id, device_id FROM access_tokens WHERE %s" % where_clause, values ) - tokens_and_devices = [(r[0], r[1]) for r in txn] + tokens_and_devices = [(r[0], r[1], r[2]) for r in txn] - for token, _ in tokens_and_devices: + for token, _, _ in tokens_and_devices: self._invalidate_cache_and_stream( txn, self.get_user_by_access_token, (token,) ) -- cgit 1.4.1 From 47d99a20d5b4ae58ece15be8a43e96e39a9b943e Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Wed, 29 Nov 2017 16:46:45 +0000 Subject: Add user_directory_include_pattern config param to expand search results to additional users Initial commit; this doesn't work yet - the LIKE filtering seems too aggressive. It also needs _do_initial_spam to be aware of prepopulating the whole user_directory_search table with all users... ...and it needs a handle_user_signup() or something to be added so that new signups get incrementally added to the table too. Committing it here as a WIP --- synapse/config/homeserver.py | 3 ++- synapse/config/user_directory.py | 40 ++++++++++++++++++++++++++++++++++++++ synapse/handlers/user_directory.py | 4 ++-- synapse/server.py | 4 ++-- synapse/storage/user_directory.py | 16 +++++++++++---- 5 files changed, 58 insertions(+), 9 deletions(-) create mode 100644 synapse/config/user_directory.py (limited to 'synapse/storage') diff --git a/synapse/config/homeserver.py b/synapse/config/homeserver.py index 05e242aef6..bf19cfee29 100644 --- a/synapse/config/homeserver.py +++ b/synapse/config/homeserver.py @@ -36,6 +36,7 @@ from .workers import WorkerConfig from .push import PushConfig from .spam_checker import SpamCheckerConfig from .groups import GroupsConfig +from .user_directory import UserDirectoryConfig class HomeServerConfig(TlsConfig, ServerConfig, DatabaseConfig, LoggingConfig, @@ -44,7 +45,7 @@ class HomeServerConfig(TlsConfig, ServerConfig, DatabaseConfig, LoggingConfig, AppServiceConfig, KeyConfig, SAML2Config, CasConfig, JWTConfig, PasswordConfig, EmailConfig, WorkerConfig, PasswordAuthProviderConfig, PushConfig, - SpamCheckerConfig, GroupsConfig,): + SpamCheckerConfig, GroupsConfig, UserDirectoryConfig,): pass diff --git a/synapse/config/user_directory.py b/synapse/config/user_directory.py new file mode 100644 index 0000000000..03fb2090c7 --- /dev/null +++ b/synapse/config/user_directory.py @@ -0,0 +1,40 @@ +# -*- coding: utf-8 -*- +# Copyright 2017 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._base import Config + + +class UserDirectoryConfig(Config): + """User Directory Configuration + Configuration for the behaviour of the /user_directory API + """ + + def read_config(self, config): + self.user_directory_include_pattern = "%" + user_directory_config = config.get("user_directory", None) + if user_directory_config: + self.user_directory_include_pattern = ( + user_directory_config.get("include_pattern", "%") + ) + + def default_config(self, config_dir_path, server_name, **kwargs): + return """ + # User Directory configuration + # 'include_pattern' defines an optional SQL LIKE pattern when querying the + # user directory in addition to publicly visible users. Defaults to "%%" + # + #user_directory: + # include_pattern: "%%:%s" + """ % (server_name) diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index b5be5d9623..51f8340df5 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -25,7 +25,7 @@ from synapse.util.async import sleep logger = logging.getLogger(__name__) -class UserDirectoyHandler(object): +class UserDirectoryHandler(object): """Handles querying of and keeping updated the user_directory. N.B.: ASSUMES IT IS THE ONLY THING THAT MODIFIES THE USER DIRECTORY @@ -389,7 +389,7 @@ class UserDirectoyHandler(object): """Called when we might need to add user to directory Args: - room_id (str): room_id that user joined or started being public that + room_id (str): room_id that user joined or started being public user_id (str) """ logger.debug("Adding user to dir, %r", user_id) diff --git a/synapse/server.py b/synapse/server.py index 10e3e9a4f1..6de33019d2 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -50,7 +50,7 @@ from synapse.handlers.events import EventHandler, EventStreamHandler from synapse.handlers.initial_sync import InitialSyncHandler from synapse.handlers.receipts import ReceiptsHandler from synapse.handlers.read_marker import ReadMarkerHandler -from synapse.handlers.user_directory import UserDirectoyHandler +from synapse.handlers.user_directory import UserDirectoryHandler from synapse.handlers.groups_local import GroupsLocalHandler from synapse.handlers.profile import ProfileHandler from synapse.groups.groups_server import GroupsServerHandler @@ -321,7 +321,7 @@ class HomeServer(object): return ActionGenerator(self) def build_user_directory_handler(self): - return UserDirectoyHandler(self) + return UserDirectoryHandler(self) def build_groups_local_handler(self): return GroupsLocalHandler(self) diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py index 5dc5b9582a..1022cdf7d0 100644 --- a/synapse/storage/user_directory.py +++ b/synapse/storage/user_directory.py @@ -629,6 +629,10 @@ class UserDirectoryStore(SQLBaseStore): ] } """ + + include_pattern = self.hs.config.user_directory_include_pattern or "%"; + logger.error("include pattern is %s" % (include_pattern)) + if isinstance(self.database_engine, PostgresEngine): full_query, exact_query, prefix_query = _parse_query_postgres(search_term) @@ -647,7 +651,9 @@ class UserDirectoryStore(SQLBaseStore): WHERE user_id = ? AND share_private ) AS s USING (user_id) WHERE - (s.user_id IS NOT NULL OR p.user_id IS NOT NULL) + (s.user_id IS NOT NULL OR + p.user_id IS NOT NULL OR + d.user_id LIKE ?) AND vector @@ to_tsquery('english', ?) ORDER BY (CASE WHEN s.user_id IS NOT NULL THEN 4.0 ELSE 1.0 END) @@ -672,7 +678,7 @@ class UserDirectoryStore(SQLBaseStore): avatar_url IS NULL LIMIT ? """ - args = (user_id, full_query, exact_query, prefix_query, limit + 1,) + args = (user_id, include_pattern, full_query, exact_query, prefix_query, limit + 1,) elif isinstance(self.database_engine, Sqlite3Engine): search_query = _parse_query_sqlite(search_term) @@ -686,7 +692,9 @@ class UserDirectoryStore(SQLBaseStore): WHERE user_id = ? AND share_private ) AS s USING (user_id) WHERE - (s.user_id IS NOT NULL OR p.user_id IS NOT NULL) + (s.user_id IS NOT NULL OR + p.user_id IS NOT NULL OR + d.user_id LIKE ?) AND value MATCH ? ORDER BY rank(matchinfo(user_directory_search)) DESC, @@ -694,7 +702,7 @@ class UserDirectoryStore(SQLBaseStore): avatar_url IS NULL LIMIT ? """ - args = (user_id, search_query, limit + 1) + args = (user_id, include_pattern, search_query, limit + 1) else: # This should be unreachable. raise Exception("Unrecognized database engine") -- cgit 1.4.1 From 3241c7aac3dc114a6abce46e5d241f42ed35a7fe Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Wed, 29 Nov 2017 18:27:05 +0000 Subject: untested WIP but might actually work --- synapse/config/user_directory.py | 5 ++--- synapse/handlers/profile.py | 14 ++++++++++++ synapse/handlers/user_directory.py | 46 ++++++++++++++++++++++++++++++++++---- synapse/storage/_base.py | 2 +- synapse/storage/profile.py | 14 ++++++++++++ synapse/storage/user_directory.py | 35 +++++++++++++++++++---------- 6 files changed, 96 insertions(+), 20 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/config/user_directory.py b/synapse/config/user_directory.py index 03fb2090c7..86177da401 100644 --- a/synapse/config/user_directory.py +++ b/synapse/config/user_directory.py @@ -22,18 +22,17 @@ class UserDirectoryConfig(Config): """ def read_config(self, config): - self.user_directory_include_pattern = "%" user_directory_config = config.get("user_directory", None) if user_directory_config: self.user_directory_include_pattern = ( - user_directory_config.get("include_pattern", "%") + user_directory_config.get("include_pattern", None) ) def default_config(self, config_dir_path, server_name, **kwargs): return """ # User Directory configuration # 'include_pattern' defines an optional SQL LIKE pattern when querying the - # user directory in addition to publicly visible users. Defaults to "%%" + # user directory in addition to publicly visible users. Defaults to None. # #user_directory: # include_pattern: "%%:%s" diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index 5e5b1952dd..0a394a10b2 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -36,6 +36,8 @@ class ProfileHandler(BaseHandler): "profile", self.on_profile_query ) + self.user_directory_handler = hs.get_user_directory_handler() + self.clock.looping_call(self._update_remote_profile_cache, self.PROFILE_UPDATE_MS) @defer.inlineCallbacks @@ -139,6 +141,12 @@ class ProfileHandler(BaseHandler): target_user.localpart, new_displayname ) + if self.hs.config.user_directory_include_pattern: + profile = yield self.store.get_profileinfo(target_user.localpart) + yield self.user_directory_handler.handle_local_profile_change( + target_user.to_string(), profile + ) + yield self._update_join_states(requester, target_user) @defer.inlineCallbacks @@ -183,6 +191,12 @@ class ProfileHandler(BaseHandler): target_user.localpart, new_avatar_url ) + if self.hs.config.user_directory_include_pattern: + profile = yield self.store.get_profileinfo(target_user.localpart) + yield self.user_directory_handler.handle_local_profile_change( + target_user.user_id, profile + ) + yield self._update_join_states(requester, target_user) @defer.inlineCallbacks diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index 51f8340df5..1769e84698 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -110,6 +110,15 @@ class UserDirectoryHandler(object): finally: self._is_processing = False + @defer.inlineCallbacks + def handle_local_profile_change(self, user_id, profile): + """Called to update index of our local user profiles when they change + irrespective of any rooms the user may be in. + """ + yield self.store.update_profile_in_user_dir( + user_id, profile.display_name, profile.avatar_url, + ) + @defer.inlineCallbacks def _unsafe_process(self): # If self.pos is None then means we haven't fetched it from DB @@ -148,16 +157,30 @@ class UserDirectoryHandler(object): room_ids = yield self.store.get_all_rooms() logger.info("Doing initial update of user directory. %d rooms", len(room_ids)) - num_processed_rooms = 1 + num_processed_rooms = 0 for room_id in room_ids: - logger.info("Handling room %d/%d", num_processed_rooms, len(room_ids)) + logger.info("Handling room %d/%d", num_processed_rooms+1, len(room_ids)) yield self._handle_initial_room(room_id) num_processed_rooms += 1 yield sleep(self.INITIAL_SLEEP_MS / 1000.) logger.info("Processed all rooms.") + if self.hs.config.user_directory_include_pattern: + logger.info("Doing initial update of user directory. %d users", len(user_ids)) + num_processed_users = 0 + user_ids = yield self.store.get_all_local_users() + for user_id in user_ids: + # We add profiles for all users even if they don't match the + # include pattern, just in case we want to change it in future + logger.info("Handling user %d/%d", num_processed_users+1, len(user_ids)) + yield self._handle_local_user(user_id) + num_processed_users += 1 + yield sleep(self.INITIAL_SLEEP_MS / 1000.) + + logger.info("Processed all users") + self.initially_handled_users = None self.initially_handled_users_in_public = None self.initially_handled_users_share = None @@ -384,6 +407,21 @@ class UserDirectoryHandler(object): for user_id in users: yield self._handle_remove_user(room_id, user_id) + @defer.inlineCallbacks + def _handle_local_user(self, user_id): + """Adds a new local roomless user into the user_directory_search table. + Used to populate up the user index when we have an + user_directory_include_pattern specified. + """ + logger.debug("Adding new local user to dir, %r", user_id) + + profile = yield self.store.get_profileinfo(user_id) + + row = yield self.store.get_user_in_directory(user_id) + if not row: + yield self.store.add_profiles_to_user_dir(None, {user_id: profile}) + + @defer.inlineCallbacks def _handle_new_user(self, room_id, user_id, profile): """Called when we might need to add user to directory @@ -392,7 +430,7 @@ class UserDirectoryHandler(object): room_id (str): room_id that user joined or started being public user_id (str) """ - logger.debug("Adding user to dir, %r", user_id) + logger.debug("Adding new user to dir, %r", user_id) row = yield self.store.get_user_in_directory(user_id) if not row: @@ -407,7 +445,7 @@ class UserDirectoryHandler(object): if not row: yield self.store.add_users_to_public_room(room_id, [user_id]) else: - logger.debug("Not adding user to public dir, %r", user_id) + logger.debug("Not adding new user to public dir, %r", user_id) # Now we update users who share rooms with users. We do this by getting # all the current users in the room and seeing which aren't already diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index e6eefdd6fe..0fdf49a2fd 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -547,7 +547,7 @@ class SQLBaseStore(object): def _simple_select_one(self, table, keyvalues, retcols, allow_none=False, desc="_simple_select_one"): """Executes a SELECT query on the named table, which is expected to - return a single row, returning a single column from it. + return a single row, returning multiple columns from it. Args: table : string giving the table name diff --git a/synapse/storage/profile.py b/synapse/storage/profile.py index beea3102fc..484ad59b62 100644 --- a/synapse/storage/profile.py +++ b/synapse/storage/profile.py @@ -15,6 +15,8 @@ from twisted.internet import defer +from synapse.storage.roommember import ProfileInfo + from ._base import SQLBaseStore @@ -26,6 +28,18 @@ class ProfileStore(SQLBaseStore): desc="create_profile", ) + def get_profileinfo(self, user_localpart): + profile = self._simple_select_one( + table="profiles", + keyvalues={"user_id": user_localpart}, + retcols=("displayname", "avatar_url"), + desc="get_profileinfo", + ) + return ProfileInfo( + avatar_url=profile.avatar_url, + displayname=profile.displayname, + ) + def get_profile_displayname(self, user_localpart): return self._simple_select_one_onecol( table="profiles", diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py index 1022cdf7d0..1cc73e3878 100644 --- a/synapse/storage/user_directory.py +++ b/synapse/storage/user_directory.py @@ -164,7 +164,7 @@ class UserDirectoryStore(SQLBaseStore): ) if isinstance(self.database_engine, PostgresEngine): - # We weight the loclpart most highly, then display name and finally + # We weight the localpart most highly, then display name and finally # server name if new_entry: sql = """ @@ -317,6 +317,16 @@ class UserDirectoryStore(SQLBaseStore): rows = yield self._execute("get_all_rooms", None, sql) defer.returnValue([room_id for room_id, in rows]) + @defer.inlineCallbacks + def get_all_local_users(self): + """Get all local users + """ + sql = """ + SELECT name FROM users + """ + rows = yield self._execute("get_all_local_users", None, sql) + defer.returnValue([name for name, in rows]) + def add_users_who_share_room(self, room_id, share_private, user_id_tuples): """Insert entries into the users_who_share_rooms table. The first user should be a local user. @@ -630,7 +640,12 @@ class UserDirectoryStore(SQLBaseStore): } """ - include_pattern = self.hs.config.user_directory_include_pattern or "%"; + include_pattern_clause = "" + if self.hs.config.user_directory_include_pattern: + include_pattern_clause = "OR d.user_id LIKE '%s'" % ( + self.hs.config.user_directory_include_pattern + ) + logger.error("include pattern is %s" % (include_pattern)) if isinstance(self.database_engine, PostgresEngine): @@ -651,9 +666,7 @@ class UserDirectoryStore(SQLBaseStore): WHERE user_id = ? AND share_private ) AS s USING (user_id) WHERE - (s.user_id IS NOT NULL OR - p.user_id IS NOT NULL OR - d.user_id LIKE ?) + (s.user_id IS NOT NULL OR p.user_id IS NOT NULL %s) AND vector @@ to_tsquery('english', ?) ORDER BY (CASE WHEN s.user_id IS NOT NULL THEN 4.0 ELSE 1.0 END) @@ -677,8 +690,8 @@ class UserDirectoryStore(SQLBaseStore): display_name IS NULL, avatar_url IS NULL LIMIT ? - """ - args = (user_id, include_pattern, full_query, exact_query, prefix_query, limit + 1,) + """ % ( include_pattern_clause ) + args = (user_id, full_query, exact_query, prefix_query, limit + 1,) elif isinstance(self.database_engine, Sqlite3Engine): search_query = _parse_query_sqlite(search_term) @@ -692,17 +705,15 @@ class UserDirectoryStore(SQLBaseStore): WHERE user_id = ? AND share_private ) AS s USING (user_id) WHERE - (s.user_id IS NOT NULL OR - p.user_id IS NOT NULL OR - d.user_id LIKE ?) + (s.user_id IS NOT NULL OR p.user_id IS NOT NULL %s) AND value MATCH ? ORDER BY rank(matchinfo(user_directory_search)) DESC, display_name IS NULL, avatar_url IS NULL LIMIT ? - """ - args = (user_id, include_pattern, search_query, limit + 1) + """ % ( include_pattern_clause ) + args = (user_id, search_query, limit + 1) else: # This should be unreachable. raise Exception("Unrecognized database engine") -- cgit 1.4.1 From a4bb133b6880c6adffce5feef3681504730cd484 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Thu, 30 Nov 2017 01:17:15 +0000 Subject: fix thinkos galore --- synapse/handlers/user_directory.py | 10 ++++++---- synapse/storage/profile.py | 31 ++++++++++++++++++++++--------- synapse/storage/user_directory.py | 20 +++++++++++++------- 3 files changed, 41 insertions(+), 20 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index 1769e84698..b902f5d4e6 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -20,6 +20,7 @@ from synapse.api.constants import EventTypes, JoinRules, Membership from synapse.storage.roommember import ProfileInfo from synapse.util.metrics import Measure from synapse.util.async import sleep +from synapse.types import get_localpart_from_id logger = logging.getLogger(__name__) @@ -53,6 +54,7 @@ class UserDirectoryHandler(object): self.notifier = hs.get_notifier() self.is_mine_id = hs.is_mine_id self.update_user_directory = hs.config.update_user_directory + self.include_pattern = hs.config.user_directory_include_pattern # When start up for the first time we need to populate the user_directory. # This is a set of user_id's we've inserted already @@ -116,7 +118,7 @@ class UserDirectoryHandler(object): irrespective of any rooms the user may be in. """ yield self.store.update_profile_in_user_dir( - user_id, profile.display_name, profile.avatar_url, + user_id, profile.display_name, profile.avatar_url, None, ) @defer.inlineCallbacks @@ -167,10 +169,10 @@ class UserDirectoryHandler(object): logger.info("Processed all rooms.") - if self.hs.config.user_directory_include_pattern: - logger.info("Doing initial update of user directory. %d users", len(user_ids)) + if self.include_pattern: num_processed_users = 0 user_ids = yield self.store.get_all_local_users() + logger.info("Doing initial update of user directory. %d users", len(user_ids)) for user_id in user_ids: # We add profiles for all users even if they don't match the # include pattern, just in case we want to change it in future @@ -415,7 +417,7 @@ class UserDirectoryHandler(object): """ logger.debug("Adding new local user to dir, %r", user_id) - profile = yield self.store.get_profileinfo(user_id) + profile = yield self.store.get_profileinfo(get_localpart_from_id(user_id)) row = yield self.store.get_user_in_directory(user_id) if not row: diff --git a/synapse/storage/profile.py b/synapse/storage/profile.py index 484ad59b62..4e7b7c253b 100644 --- a/synapse/storage/profile.py +++ b/synapse/storage/profile.py @@ -16,6 +16,7 @@ from twisted.internet import defer from synapse.storage.roommember import ProfileInfo +from synapse.api.errors import StoreError from ._base import SQLBaseStore @@ -28,16 +29,28 @@ class ProfileStore(SQLBaseStore): desc="create_profile", ) + @defer.inlineCallbacks def get_profileinfo(self, user_localpart): - profile = self._simple_select_one( - table="profiles", - keyvalues={"user_id": user_localpart}, - retcols=("displayname", "avatar_url"), - desc="get_profileinfo", - ) - return ProfileInfo( - avatar_url=profile.avatar_url, - displayname=profile.displayname, + try: + profile = yield self._simple_select_one( + table="profiles", + keyvalues={"user_id": user_localpart}, + retcols=("displayname", "avatar_url"), + desc="get_profileinfo", + ) + except StoreError, e: + if e.code == 404: + # no match + defer.returnValue(ProfileInfo(None, None)) + return + else: + raise + + defer.returnValue( + ProfileInfo( + avatar_url=profile['avatar_url'], + display_name=profile['displayname'], + ) ) def get_profile_displayname(self, user_localpart): diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py index 1cc73e3878..d8c9657125 100644 --- a/synapse/storage/user_directory.py +++ b/synapse/storage/user_directory.py @@ -640,13 +640,17 @@ class UserDirectoryStore(SQLBaseStore): } """ - include_pattern_clause = "" + include_pattern_join = "" + include_pattern_where_clause = "" if self.hs.config.user_directory_include_pattern: - include_pattern_clause = "OR d.user_id LIKE '%s'" % ( - self.hs.config.user_directory_include_pattern - ) + include_pattern_join = """ + LEFT JOIN ( + SELECT user_id FROM user_directory + WHERE user_id LIKE '%s' + ) AS ld USING (user_id) + """ % ( self.hs.config.user_directory_include_pattern ) - logger.error("include pattern is %s" % (include_pattern)) + include_pattern_where_clause = "OR ld.user_id IS NOT NULL" if isinstance(self.database_engine, PostgresEngine): full_query, exact_query, prefix_query = _parse_query_postgres(search_term) @@ -665,6 +669,7 @@ class UserDirectoryStore(SQLBaseStore): SELECT other_user_id AS user_id FROM users_who_share_rooms WHERE user_id = ? AND share_private ) AS s USING (user_id) + %s WHERE (s.user_id IS NOT NULL OR p.user_id IS NOT NULL %s) AND vector @@ to_tsquery('english', ?) @@ -690,7 +695,7 @@ class UserDirectoryStore(SQLBaseStore): display_name IS NULL, avatar_url IS NULL LIMIT ? - """ % ( include_pattern_clause ) + """ % ( include_pattern_join, include_pattern_where_clause ) args = (user_id, full_query, exact_query, prefix_query, limit + 1,) elif isinstance(self.database_engine, Sqlite3Engine): search_query = _parse_query_sqlite(search_term) @@ -704,6 +709,7 @@ class UserDirectoryStore(SQLBaseStore): SELECT other_user_id AS user_id FROM users_who_share_rooms WHERE user_id = ? AND share_private ) AS s USING (user_id) + %s WHERE (s.user_id IS NOT NULL OR p.user_id IS NOT NULL %s) AND value MATCH ? @@ -712,7 +718,7 @@ class UserDirectoryStore(SQLBaseStore): display_name IS NULL, avatar_url IS NULL LIMIT ? - """ % ( include_pattern_clause ) + """ % ( include_pattern_join, include_pattern_where_clause ) args = (user_id, search_query, limit + 1) else: # This should be unreachable. -- cgit 1.4.1 From 4b1fceb913dbcd7472346e2d31c6cd4d99424f9a Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Thu, 30 Nov 2017 01:34:03 +0000 Subject: fix alternation operator for FTS4 - how did this ever work!? --- synapse/storage/user_directory.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py index d8c9657125..44b7d8906a 100644 --- a/synapse/storage/user_directory.py +++ b/synapse/storage/user_directory.py @@ -748,7 +748,7 @@ def _parse_query_sqlite(search_term): # Pull out the individual words, discarding any non-word characters. results = re.findall(r"([\w\-]+)", search_term, re.UNICODE) - return " & ".join("(%s* | %s)" % (result, result,) for result in results) + return " & ".join("(%s* OR %s)" % (result, result,) for result in results) def _parse_query_postgres(search_term): -- cgit 1.4.1 From f61e107f6351d0b5d0de76ec60912be915a7108c Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Thu, 30 Nov 2017 01:43:50 +0000 Subject: remove null constraint on user_dir.room_id --- .../schema/delta/46/user_dir_null_room_ids.sql | 35 ++++++++++++++++++++++ 1 file changed, 35 insertions(+) create mode 100644 synapse/storage/schema/delta/46/user_dir_null_room_ids.sql (limited to 'synapse/storage') diff --git a/synapse/storage/schema/delta/46/user_dir_null_room_ids.sql b/synapse/storage/schema/delta/46/user_dir_null_room_ids.sql new file mode 100644 index 0000000000..cb0d5a2576 --- /dev/null +++ b/synapse/storage/schema/delta/46/user_dir_null_room_ids.sql @@ -0,0 +1,35 @@ +/* Copyright 2017 New Vector Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- change the user_directory table to also cover global local user profiles +-- rather than just profiles within specific rooms. + +CREATE TABLE user_directory2 ( + user_id TEXT NOT NULL, + room_id TEXT, + display_name TEXT, + avatar_url TEXT +); + +INSERT INTO user_directory2(user_id, room_id, display_name, avatar_url) + SELECT user_id, room_id, display_name, avatar_url from user_directory; + +DROP TABLE user_directory; +ALTER TABLE user_directory2 RENAME TO user_directory; + +-- create indexes after doing the inserts because that's more efficient. +-- it also means we can give it the same name as the old one without renaming. +CREATE INDEX user_directory_room_idx ON user_directory(room_id); +CREATE UNIQUE INDEX user_directory_user_idx ON user_directory(user_id); -- cgit 1.4.1 From 1bd40ca73e1a6dc83d0a6c96f52071553960b3a8 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Mon, 4 Dec 2017 14:58:39 +0000 Subject: switch to a simpler 'search_all_users' button as per review feedback --- synapse/config/user_directory.py | 15 +++++++------- synapse/handlers/profile.py | 4 ++-- synapse/handlers/register.py | 2 +- synapse/handlers/user_directory.py | 6 +++--- synapse/storage/user_directory.py | 40 +++++++++++++++----------------------- 5 files changed, 30 insertions(+), 37 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/config/user_directory.py b/synapse/config/user_directory.py index cf358fe8a9..55b4e553ee 100644 --- a/synapse/config/user_directory.py +++ b/synapse/config/user_directory.py @@ -22,19 +22,20 @@ class UserDirectoryConfig(Config): """ def read_config(self, config): - self.user_directory_include_pattern = None + self.user_directory_search_all_users = False user_directory_config = config.get("user_directory", None) if user_directory_config: - self.user_directory_include_pattern = ( - user_directory_config.get("include_pattern", None) + self.user_directory_search_all_users = ( + user_directory_config.get("search_all_users", False) ) def default_config(self, config_dir_path, server_name, **kwargs): return """ # User Directory configuration - # 'include_pattern' defines an optional SQL LIKE pattern when querying the - # user directory in addition to publicly visible users. Defaults to None. + # 'search_all_users' defines whether to search all users visible to your HS + # when searching the user directory, rather than limiting to users visible + # in public rooms. Defaults to false. # #user_directory: - # include_pattern: "%%:%s" - """ % (server_name) + # search_all_users: false + """ diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index 0a394a10b2..50e81a20e4 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -141,7 +141,7 @@ class ProfileHandler(BaseHandler): target_user.localpart, new_displayname ) - if self.hs.config.user_directory_include_pattern: + if self.hs.config.user_directory_search_all_users: profile = yield self.store.get_profileinfo(target_user.localpart) yield self.user_directory_handler.handle_local_profile_change( target_user.to_string(), profile @@ -191,7 +191,7 @@ class ProfileHandler(BaseHandler): target_user.localpart, new_avatar_url ) - if self.hs.config.user_directory_include_pattern: + if self.hs.config.user_directory_search_all_users: profile = yield self.store.get_profileinfo(target_user.localpart) yield self.user_directory_handler.handle_local_profile_change( target_user.user_id, profile diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index 5db106dfca..4bc6ef51fe 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -167,7 +167,7 @@ class RegistrationHandler(BaseHandler): admin=admin, ) - if self.hs.config.user_directory_include_pattern: + if self.hs.config.user_directory_search_all_users: profile = yield self.store.get_profileinfo(localpart) yield self.user_directory_handler.handle_local_profile_change( user_id, profile diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index b902f5d4e6..c1f8e20bfd 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -54,7 +54,7 @@ class UserDirectoryHandler(object): self.notifier = hs.get_notifier() self.is_mine_id = hs.is_mine_id self.update_user_directory = hs.config.update_user_directory - self.include_pattern = hs.config.user_directory_include_pattern + self.search_all_users = hs.config.user_directory_search_all_users # When start up for the first time we need to populate the user_directory. # This is a set of user_id's we've inserted already @@ -169,7 +169,7 @@ class UserDirectoryHandler(object): logger.info("Processed all rooms.") - if self.include_pattern: + if self.search_all_users: num_processed_users = 0 user_ids = yield self.store.get_all_local_users() logger.info("Doing initial update of user directory. %d users", len(user_ids)) @@ -413,7 +413,7 @@ class UserDirectoryHandler(object): def _handle_local_user(self, user_id): """Adds a new local roomless user into the user_directory_search table. Used to populate up the user index when we have an - user_directory_include_pattern specified. + user_directory_search_all_users specified. """ logger.debug("Adding new local user to dir, %r", user_id) diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py index 44b7d8906a..f9d2f8e60d 100644 --- a/synapse/storage/user_directory.py +++ b/synapse/storage/user_directory.py @@ -640,17 +640,19 @@ class UserDirectoryStore(SQLBaseStore): } """ - include_pattern_join = "" - include_pattern_where_clause = "" - if self.hs.config.user_directory_include_pattern: - include_pattern_join = """ - LEFT JOIN ( - SELECT user_id FROM user_directory - WHERE user_id LIKE '%s' - ) AS ld USING (user_id) - """ % ( self.hs.config.user_directory_include_pattern ) - include_pattern_where_clause = "OR ld.user_id IS NOT NULL" + if self.hs.config.user_directory_search_all_users: + join_clause = "" + where_clause = "?<>''" # naughty hack to keep the same number of binds + else: + join_clause = """ + LEFT JOIN users_in_public_rooms AS p USING (user_id) + LEFT JOIN ( + SELECT other_user_id AS user_id FROM users_who_share_rooms + WHERE user_id = ? AND share_private + ) AS s USING (user_id) + """ + where_clause = "(s.user_id IS NOT NULL OR p.user_id IS NOT NULL)" if isinstance(self.database_engine, PostgresEngine): full_query, exact_query, prefix_query = _parse_query_postgres(search_term) @@ -664,14 +666,9 @@ class UserDirectoryStore(SQLBaseStore): SELECT d.user_id, display_name, avatar_url FROM user_directory_search INNER JOIN user_directory AS d USING (user_id) - LEFT JOIN users_in_public_rooms AS p USING (user_id) - LEFT JOIN ( - SELECT other_user_id AS user_id FROM users_who_share_rooms - WHERE user_id = ? AND share_private - ) AS s USING (user_id) %s WHERE - (s.user_id IS NOT NULL OR p.user_id IS NOT NULL %s) + %s AND vector @@ to_tsquery('english', ?) ORDER BY (CASE WHEN s.user_id IS NOT NULL THEN 4.0 ELSE 1.0 END) @@ -695,7 +692,7 @@ class UserDirectoryStore(SQLBaseStore): display_name IS NULL, avatar_url IS NULL LIMIT ? - """ % ( include_pattern_join, include_pattern_where_clause ) + """ % ( join_clause, where_clause ) args = (user_id, full_query, exact_query, prefix_query, limit + 1,) elif isinstance(self.database_engine, Sqlite3Engine): search_query = _parse_query_sqlite(search_term) @@ -704,21 +701,16 @@ class UserDirectoryStore(SQLBaseStore): SELECT d.user_id, display_name, avatar_url FROM user_directory_search INNER JOIN user_directory AS d USING (user_id) - LEFT JOIN users_in_public_rooms AS p USING (user_id) - LEFT JOIN ( - SELECT other_user_id AS user_id FROM users_who_share_rooms - WHERE user_id = ? AND share_private - ) AS s USING (user_id) %s WHERE - (s.user_id IS NOT NULL OR p.user_id IS NOT NULL %s) + %s AND value MATCH ? ORDER BY rank(matchinfo(user_directory_search)) DESC, display_name IS NULL, avatar_url IS NULL LIMIT ? - """ % ( include_pattern_join, include_pattern_where_clause ) + """ % ( join_clause, where_clause ) args = (user_id, search_query, limit + 1) else: # This should be unreachable. -- cgit 1.4.1 From 74e0cc74ceb48a8c55615180f67406b339c88a18 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Mon, 4 Dec 2017 15:11:38 +0000 Subject: fix pep8 and tests --- synapse/handlers/profile.py | 2 +- synapse/handlers/user_directory.py | 5 ++--- synapse/storage/user_directory.py | 7 +++---- 3 files changed, 6 insertions(+), 8 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index 50e81a20e4..9800e24453 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -194,7 +194,7 @@ class ProfileHandler(BaseHandler): if self.hs.config.user_directory_search_all_users: profile = yield self.store.get_profileinfo(target_user.localpart) yield self.user_directory_handler.handle_local_profile_change( - target_user.user_id, profile + target_user.to_string(), profile ) yield self._update_join_states(requester, target_user) diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index c1f8e20bfd..667e98a218 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -162,7 +162,7 @@ class UserDirectoryHandler(object): num_processed_rooms = 0 for room_id in room_ids: - logger.info("Handling room %d/%d", num_processed_rooms+1, len(room_ids)) + logger.info("Handling room %d/%d", num_processed_rooms + 1, len(room_ids)) yield self._handle_initial_room(room_id) num_processed_rooms += 1 yield sleep(self.INITIAL_SLEEP_MS / 1000.) @@ -176,7 +176,7 @@ class UserDirectoryHandler(object): for user_id in user_ids: # We add profiles for all users even if they don't match the # include pattern, just in case we want to change it in future - logger.info("Handling user %d/%d", num_processed_users+1, len(user_ids)) + logger.info("Handling user %d/%d", num_processed_users + 1, len(user_ids)) yield self._handle_local_user(user_id) num_processed_users += 1 yield sleep(self.INITIAL_SLEEP_MS / 1000.) @@ -423,7 +423,6 @@ class UserDirectoryHandler(object): if not row: yield self.store.add_profiles_to_user_dir(None, {user_id: profile}) - @defer.inlineCallbacks def _handle_new_user(self, room_id, user_id, profile): """Called when we might need to add user to directory diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py index f9d2f8e60d..c9bff408ef 100644 --- a/synapse/storage/user_directory.py +++ b/synapse/storage/user_directory.py @@ -640,10 +640,9 @@ class UserDirectoryStore(SQLBaseStore): } """ - if self.hs.config.user_directory_search_all_users: join_clause = "" - where_clause = "?<>''" # naughty hack to keep the same number of binds + where_clause = "?<>''" # naughty hack to keep the same number of binds else: join_clause = """ LEFT JOIN users_in_public_rooms AS p USING (user_id) @@ -692,7 +691,7 @@ class UserDirectoryStore(SQLBaseStore): display_name IS NULL, avatar_url IS NULL LIMIT ? - """ % ( join_clause, where_clause ) + """ % (join_clause, where_clause) args = (user_id, full_query, exact_query, prefix_query, limit + 1,) elif isinstance(self.database_engine, Sqlite3Engine): search_query = _parse_query_sqlite(search_term) @@ -710,7 +709,7 @@ class UserDirectoryStore(SQLBaseStore): display_name IS NULL, avatar_url IS NULL LIMIT ? - """ % ( join_clause, where_clause ) + """ % (join_clause, where_clause) args = (user_id, search_query, limit + 1) else: # This should be unreachable. -- cgit 1.4.1 From cdc2cb5d11860f013c58923779a2226620453309 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Tue, 5 Dec 2017 11:09:31 +0000 Subject: fix StoreError syntax --- synapse/storage/profile.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/profile.py b/synapse/storage/profile.py index 4e7b7c253b..ec02e73bc2 100644 --- a/synapse/storage/profile.py +++ b/synapse/storage/profile.py @@ -38,7 +38,7 @@ class ProfileStore(SQLBaseStore): retcols=("displayname", "avatar_url"), desc="get_profileinfo", ) - except StoreError, e: + except StoreError as e: if e.code == 404: # no match defer.returnValue(ProfileInfo(None, None)) -- cgit 1.4.1