diff options
Diffstat (limited to 'synapse/storage')
-rw-r--r-- | synapse/storage/_base.py | 8 | ||||
-rw-r--r-- | synapse/storage/account_data.py | 13 | ||||
-rw-r--r-- | synapse/storage/background_updates.py | 83 | ||||
-rw-r--r-- | synapse/storage/events.py | 32 | ||||
-rw-r--r-- | synapse/storage/push_rule.py | 5 | ||||
-rw-r--r-- | synapse/storage/state.py | 17 |
6 files changed, 115 insertions, 43 deletions
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index c659004e8d..58b73af7d2 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -60,12 +60,12 @@ class LoggingTransaction(object): object.__setattr__(self, "database_engine", database_engine) object.__setattr__(self, "after_callbacks", after_callbacks) - def call_after(self, callback, *args): + def call_after(self, callback, *args, **kwargs): """Call the given callback on the main twisted thread after the transaction has finished. Used to invalidate the caches on the correct thread. """ - self.after_callbacks.append((callback, args)) + self.after_callbacks.append((callback, args, kwargs)) def __getattr__(self, name): return getattr(self.txn, name) @@ -319,8 +319,8 @@ class SQLBaseStore(object): inner_func, *args, **kwargs ) finally: - for after_callback, after_args in after_callbacks: - after_callback(*after_args) + for after_callback, after_args, after_kwargs in after_callbacks: + after_callback(*after_args, **after_kwargs) defer.returnValue(result) @defer.inlineCallbacks diff --git a/synapse/storage/account_data.py b/synapse/storage/account_data.py index aa84ffc2b0..ff14e54c11 100644 --- a/synapse/storage/account_data.py +++ b/synapse/storage/account_data.py @@ -308,3 +308,16 @@ class AccountDataStore(SQLBaseStore): " WHERE stream_id < ?" ) txn.execute(update_max_id_sql, (next_id, next_id)) + + @cachedInlineCallbacks(num_args=2, cache_context=True, max_entries=5000) + def is_ignored_by(self, ignored_user_id, ignorer_user_id, cache_context): + ignored_account_data = yield self.get_global_account_data_by_type_for_user( + "m.ignored_user_list", ignorer_user_id, + on_invalidate=cache_context.invalidate, + ) + if not ignored_account_data: + defer.returnValue(False) + + defer.returnValue( + ignored_user_id in ignored_account_data.get("ignored_users", {}) + ) diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py index 813ad59e56..d4cf0fc59b 100644 --- a/synapse/storage/background_updates.py +++ b/synapse/storage/background_updates.py @@ -228,46 +228,69 @@ class BackgroundUpdateStore(SQLBaseStore): columns (list[str]): columns/expressions to include in index """ - # if this is postgres, we add the indexes concurrently. Otherwise - # we fall back to doing it inline - if isinstance(self.database_engine, engines.PostgresEngine): - conc = True - else: - conc = False - # We don't use partial indices on SQLite as it wasn't introduced - # until 3.8, and wheezy has 3.7 - where_clause = None - - sql = ( - "CREATE INDEX %(conc)s %(name)s ON %(table)s (%(columns)s)" - " %(where_clause)s" - ) % { - "conc": "CONCURRENTLY" if conc else "", - "name": index_name, - "table": table, - "columns": ", ".join(columns), - "where_clause": "WHERE " + where_clause if where_clause else "" - } - - def create_index_concurrently(conn): + def create_index_psql(conn): conn.rollback() # postgres insists on autocommit for the index conn.set_session(autocommit=True) - c = conn.cursor() - c.execute(sql) - conn.set_session(autocommit=False) - def create_index(conn): + try: + c = conn.cursor() + + # If a previous attempt to create the index was interrupted, + # we may already have a half-built index. Let's just drop it + # before trying to create it again. + + sql = "DROP INDEX IF EXISTS %s" % (index_name,) + logger.debug("[SQL] %s", sql) + c.execute(sql) + + sql = ( + "CREATE INDEX CONCURRENTLY %(name)s ON %(table)s" + " (%(columns)s) %(where_clause)s" + ) % { + "name": index_name, + "table": table, + "columns": ", ".join(columns), + "where_clause": "WHERE " + where_clause if where_clause else "" + } + logger.debug("[SQL] %s", sql) + c.execute(sql) + finally: + conn.set_session(autocommit=False) + + def create_index_sqlite(conn): + # Sqlite doesn't support concurrent creation of indexes. + # + # We don't use partial indices on SQLite as it wasn't introduced + # until 3.8, and wheezy has 3.7 + # + # We assume that sqlite doesn't give us invalid indices; however + # we may still end up with the index existing but the + # background_updates not having been recorded if synapse got shut + # down at the wrong moment - hance we use IF NOT EXISTS. (SQLite + # has supported CREATE TABLE|INDEX IF NOT EXISTS since 3.3.0.) + sql = ( + "CREATE INDEX IF NOT EXISTS %(name)s ON %(table)s" + " (%(columns)s)" + ) % { + "name": index_name, + "table": table, + "columns": ", ".join(columns), + } + c = conn.cursor() + logger.debug("[SQL] %s", sql) c.execute(sql) + if isinstance(self.database_engine, engines.PostgresEngine): + runner = create_index_psql + else: + runner = create_index_sqlite + @defer.inlineCallbacks def updater(progress, batch_size): logger.info("Adding index %s to %s", index_name, table) - if conc: - yield self.runWithConnection(create_index_concurrently) - else: - yield self.runWithConnection(create_index) + yield self.runWithConnection(runner) yield self._end_background_update(update_name) defer.returnValue(1) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index a3790419dd..d946024c9b 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -29,6 +29,7 @@ from synapse.api.constants import EventTypes from synapse.api.errors import SynapseError from synapse.state import resolve_events from synapse.util.caches.descriptors import cached +from synapse.types import get_domain_from_id from canonicaljson import encode_canonical_json from collections import deque, namedtuple, OrderedDict @@ -49,6 +50,9 @@ logger = logging.getLogger(__name__) metrics = synapse.metrics.get_metrics_for(__name__) persist_event_counter = metrics.register_counter("persisted_events") +event_counter = metrics.register_counter( + "persisted_events_sep", labels=["type", "origin_type", "origin_entity"] +) def encode_json(json_object): @@ -371,6 +375,24 @@ class EventsStore(SQLBaseStore): ) persist_event_counter.inc_by(len(chunk)) + for room_id, (_, _, new_state) in current_state_for_room.iteritems(): + self.get_current_state_ids.prefill( + (room_id, ), new_state + ) + + for event, context in chunk: + if context.app_service: + origin_type = "local" + origin_entity = context.app_service.id + elif self.hs.is_mine_id(event.sender): + origin_type = "local" + origin_entity = "*client*" + else: + origin_type = "remote" + origin_entity = get_domain_from_id(event.sender) + + event_counter.inc(event.type, origin_type, origin_entity) + @defer.inlineCallbacks def _calculate_new_extremeties(self, room_id, event_contexts, latest_event_ids): """Calculates the new forward extremeties for a room given events to @@ -419,10 +441,10 @@ class EventsStore(SQLBaseStore): Assumes that we are only persisting events for one room at a time. Returns: - 2-tuple (to_delete, to_insert) where both are state dicts, i.e. - (type, state_key) -> event_id. `to_delete` are the entries to + 3-tuple (to_delete, to_insert, new_state) where both are state dicts, + i.e. (type, state_key) -> event_id. `to_delete` are the entries to first be deleted from current_state_events, `to_insert` are entries - to insert. + to insert. `new_state` is the full set of state. May return None if there are no changes to be applied. """ # Now we need to work out the different state sets for @@ -529,7 +551,7 @@ class EventsStore(SQLBaseStore): if ev_id in events_to_insert } - defer.returnValue((to_delete, to_insert)) + defer.returnValue((to_delete, to_insert, current_state)) @defer.inlineCallbacks def get_event(self, event_id, check_redacted=True, @@ -682,7 +704,7 @@ class EventsStore(SQLBaseStore): def _update_current_state_txn(self, txn, state_delta_by_room): for room_id, current_state_tuple in state_delta_by_room.iteritems(): - to_delete, to_insert = current_state_tuple + to_delete, to_insert, _ = current_state_tuple txn.executemany( "DELETE FROM current_state_events WHERE event_id = ?", [(ev_id,) for ev_id in to_delete.itervalues()], diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py index cbec255966..353a135c4e 100644 --- a/synapse/storage/push_rule.py +++ b/synapse/storage/push_rule.py @@ -188,7 +188,7 @@ class PushRuleStore(SQLBaseStore): user_ids, on_invalidate=cache_context.invalidate, ) - rules_by_user = {k: v for k, v in rules_by_user.items() if v is not None} + rules_by_user = {k: v for k, v in rules_by_user.iteritems() if v is not None} defer.returnValue(rules_by_user) @@ -398,7 +398,8 @@ class PushRuleStore(SQLBaseStore): with self._push_rules_stream_id_gen.get_next() as ids: stream_id, event_stream_ordering = ids yield self.runInteraction( - "delete_push_rule", delete_push_rule_txn, stream_id, event_stream_ordering + "delete_push_rule", delete_push_rule_txn, stream_id, + event_stream_ordering, ) @defer.inlineCallbacks diff --git a/synapse/storage/state.py b/synapse/storage/state.py index e89001d994..03981f5d2b 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -16,6 +16,7 @@ from ._base import SQLBaseStore from synapse.util.caches.descriptors import cached, cachedList from synapse.util.caches import intern_string +from synapse.util.stringutils import to_ascii from synapse.storage.engines import PostgresEngine from twisted.internet import defer @@ -89,7 +90,7 @@ class StateStore(SQLBaseStore): ) return { - (r[0], r[1]): r[2] for r in txn + (intern_string(r[0]), intern_string(r[1])): to_ascii(r[2]) for r in txn } return self.runInteraction( @@ -226,6 +227,18 @@ class StateStore(SQLBaseStore): ], ) + # Prefill the state group cache with this group. + # It's fine to use the sequence like this as the state group map + # is immutable. (If the map wasn't immutable then this prefill could + # race with another update) + txn.call_after( + self._state_group_cache.update, + self._state_group_cache.sequence, + key=context.state_group, + value=context.current_state_ids, + full=True, + ) + self._simple_insert_many_txn( txn, table="event_to_state_groups", @@ -655,7 +668,7 @@ class StateStore(SQLBaseStore): state_dict = results[group] state_dict.update( - ((intern_string(k[0]), intern_string(k[1])), v) + ((intern_string(k[0]), intern_string(k[1])), to_ascii(v)) for k, v in group_state_dict.iteritems() ) |