diff options
Diffstat (limited to 'synapse/storage')
-rw-r--r-- | synapse/storage/__init__.py | 16 | ||||
-rw-r--r-- | synapse/storage/_base.py | 2 | ||||
-rw-r--r-- | synapse/storage/event_push_actions.py | 95 | ||||
-rw-r--r-- | synapse/storage/push_rule.py | 13 | ||||
-rw-r--r-- | synapse/storage/receipts.py | 2 | ||||
-rw-r--r-- | synapse/storage/schema/delta/30/as_users.py | 4 | ||||
-rw-r--r-- | synapse/storage/signatures.py | 25 |
7 files changed, 133 insertions, 24 deletions
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index d970fde9e8..8581796b7e 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -17,7 +17,7 @@ from twisted.internet import defer from .appservice import ( ApplicationServiceStore, ApplicationServiceTransactionStore ) -from ._base import Cache +from ._base import Cache, LoggingTransaction from .directory import DirectoryStore from .events import EventsStore from .presence import PresenceStore, UserPresenceState @@ -88,6 +88,7 @@ class DataStore(RoomMemberStore, RoomStore, def __init__(self, db_conn, hs): self.hs = hs + self._clock = hs.get_clock() self.database_engine = hs.database_engine self.client_ip_last_seen = Cache( @@ -173,6 +174,19 @@ class DataStore(RoomMemberStore, RoomStore, prefilled_cache=push_rules_prefill, ) + cur = LoggingTransaction( + db_conn.cursor(), + name="_find_stream_orderings_for_times_txn", + database_engine=self.database_engine, + after_callbacks=[] + ) + self._find_stream_orderings_for_times_txn(cur) + cur.close() + + self.find_stream_orderings_looping_call = self._clock.looping_call( + self._find_stream_orderings_for_times, 60 * 60 * 1000 + ) + super(DataStore, self).__init__(hs) def take_presence_startup_info(self): diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index e0d7098692..32c6677d47 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -152,8 +152,8 @@ class SQLBaseStore(object): def __init__(self, hs): self.hs = hs - self._db_pool = hs.get_db_pool() self._clock = hs.get_clock() + self._db_pool = hs.get_db_pool() self._previous_txn_total_time = 0 self._current_txn_total_time = 0 diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index 9705db5c47..940e11d7a2 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -24,6 +24,10 @@ logger = logging.getLogger(__name__) class EventPushActionsStore(SQLBaseStore): + def __init__(self, hs): + self.stream_ordering_month_ago = None + super(EventPushActionsStore, self).__init__(hs) + def _set_push_actions_for_event_and_users_txn(self, txn, event, tuples): """ Args: @@ -115,7 +119,8 @@ class EventPushActionsStore(SQLBaseStore): @defer.inlineCallbacks def get_unread_push_actions_for_user_in_range(self, user_id, min_stream_ordering, - max_stream_ordering=None): + max_stream_ordering=None, + limit=20): def get_after_receipt(txn): sql = ( "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions, " @@ -147,7 +152,8 @@ class EventPushActionsStore(SQLBaseStore): if max_stream_ordering is not None: sql += " AND ep.stream_ordering <= ?" args.append(max_stream_ordering) - sql += " ORDER BY ep.stream_ordering ASC" + sql += " ORDER BY ep.stream_ordering ASC LIMIT ?" + args.append(limit) txn.execute(sql, args) return txn.fetchall() after_read_receipt = yield self.runInteraction( @@ -224,18 +230,93 @@ class EventPushActionsStore(SQLBaseStore): (room_id, event_id) ) - def _remove_push_actions_before_txn(self, txn, room_id, user_id, - topological_ordering): + def _remove_old_push_actions_before_txn(self, txn, room_id, user_id, + topological_ordering): + """ + Purges old, stale push actions for a user and room before a given + topological_ordering + Args: + txn: The transcation + room_id: Room ID to delete from + user_id: user ID to delete for + topological_ordering: The lowest topological ordering which will + not be deleted. + """ txn.call_after( self.get_unread_event_push_actions_by_room_for_user.invalidate_many, (room_id, user_id, ) ) + + # We need to join on the events table to get the received_ts for + # event_push_actions and sqlite won't let us use a join in a delete so + # we can't just delete where received_ts < x. Furthermore we can + # only identify event_push_actions by a tuple of room_id, event_id + # we we can't use a subquery. + # Instead, we look up the stream ordering for the last event in that + # room received before the threshold time and delete event_push_actions + # in the room with a stream_odering before that. txn.execute( - "DELETE FROM event_push_actions" - " WHERE room_id = ? AND user_id = ? AND topological_ordering < ?", - (room_id, user_id, topological_ordering,) + "DELETE FROM event_push_actions " + " WHERE user_id = ? AND room_id = ? AND " + " topological_ordering < ? AND stream_ordering < ?", + (user_id, room_id, topological_ordering, self.stream_ordering_month_ago) + ) + + @defer.inlineCallbacks + def _find_stream_orderings_for_times(self): + yield self.runInteraction( + "_find_stream_orderings_for_times", + self._find_stream_orderings_for_times_txn + ) + + def _find_stream_orderings_for_times_txn(self, txn): + logger.info("Searching for stream ordering 1 month ago") + self.stream_ordering_month_ago = self._find_first_stream_ordering_after_ts_txn( + txn, self._clock.time_msec() - 30 * 24 * 60 * 60 * 1000 + ) + logger.info( + "Found stream ordering 1 month ago: it's %d", + self.stream_ordering_month_ago ) + def _find_first_stream_ordering_after_ts_txn(self, txn, ts): + """ + Find the stream_ordering of the first event that was received after + a given timestamp. This is relatively slow as there is no index on + received_ts but we can then use this to delete push actions before + this. + + received_ts must necessarily be in the same order as stream_ordering + and stream_ordering is indexed, so we manually binary search using + stream_ordering + """ + txn.execute("SELECT MAX(stream_ordering) FROM events") + max_stream_ordering = txn.fetchone()[0] + + if max_stream_ordering is None: + return 0 + + range_start = 0 + range_end = max_stream_ordering + + sql = ( + "SELECT received_ts FROM events" + " WHERE stream_ordering > ?" + " ORDER BY stream_ordering" + " LIMIT 1" + ) + + while range_end - range_start > 1: + middle = int((range_end + range_start) / 2) + txn.execute(sql, (middle,)) + middle_ts = txn.fetchone()[0] + if ts > middle_ts: + range_start = middle + else: + range_end = middle + + return range_end + def _action_has_highlight(actions): for action in actions: diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py index f285f59afd..ebb97c8474 100644 --- a/synapse/storage/push_rule.py +++ b/synapse/storage/push_rule.py @@ -66,7 +66,10 @@ class PushRuleStore(SQLBaseStore): if not user_ids: defer.returnValue({}) - results = {} + results = { + user_id: [] + for user_id in user_ids + } rows = yield self._simple_select_many_batch( table="push_rules", @@ -90,7 +93,10 @@ class PushRuleStore(SQLBaseStore): if not user_ids: defer.returnValue({}) - results = {} + results = { + user_id: {} + for user_id in user_ids + } rows = yield self._simple_select_many_batch( table="push_rules_enable", @@ -100,7 +106,8 @@ class PushRuleStore(SQLBaseStore): desc="bulk_get_push_rules_enabled", ) for row in rows: - results.setdefault(row['user_name'], {})[row['rule_id']] = row['enabled'] + enabled = bool(row['enabled']) + results.setdefault(row['user_name'], {})[row['rule_id']] = enabled defer.returnValue(results) @defer.inlineCallbacks diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py index 964f30dff7..8c26f39fbb 100644 --- a/synapse/storage/receipts.py +++ b/synapse/storage/receipts.py @@ -321,7 +321,7 @@ class ReceiptsStore(SQLBaseStore): ) if receipt_type == "m.read" and topological_ordering: - self._remove_push_actions_before_txn( + self._remove_old_push_actions_before_txn( txn, room_id=room_id, user_id=user_id, diff --git a/synapse/storage/schema/delta/30/as_users.py b/synapse/storage/schema/delta/30/as_users.py index b417e3ac08..5b7d8d1ab5 100644 --- a/synapse/storage/schema/delta/30/as_users.py +++ b/synapse/storage/schema/delta/30/as_users.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -from synapse.storage.appservice import ApplicationServiceStore +from synapse.config.appservice import load_appservices logger = logging.getLogger(__name__) @@ -38,7 +38,7 @@ def run_upgrade(cur, database_engine, config, *args, **kwargs): logger.warning("Could not get app_service_config_files from config") pass - appservices = ApplicationServiceStore.load_appservices( + appservices = load_appservices( config.server_name, config_files ) diff --git a/synapse/storage/signatures.py b/synapse/storage/signatures.py index b10f2a5787..ea6823f18d 100644 --- a/synapse/storage/signatures.py +++ b/synapse/storage/signatures.py @@ -19,17 +19,24 @@ from ._base import SQLBaseStore from unpaddedbase64 import encode_base64 from synapse.crypto.event_signing import compute_event_reference_hash +from synapse.util.caches.descriptors import cached, cachedList class SignatureStore(SQLBaseStore): """Persistence for event signatures and hashes""" + @cached(lru=True) + def get_event_reference_hash(self, event_id): + return self._get_event_reference_hashes_txn(event_id) + + @cachedList(cached_method_name="get_event_reference_hash", + list_name="event_ids", num_args=1) def get_event_reference_hashes(self, event_ids): def f(txn): - return [ - self._get_event_reference_hashes_txn(txn, ev) - for ev in event_ids - ] + return { + event_id: self._get_event_reference_hashes_txn(txn, event_id) + for event_id in event_ids + } return self.runInteraction( "get_event_reference_hashes", @@ -41,15 +48,15 @@ class SignatureStore(SQLBaseStore): hashes = yield self.get_event_reference_hashes( event_ids ) - hashes = [ - { + hashes = { + e_id: { k: encode_base64(v) for k, v in h.items() if k == "sha256" } - for h in hashes - ] + for e_id, h in hashes.items() + } - defer.returnValue(zip(event_ids, hashes)) + defer.returnValue(hashes.items()) def _get_event_reference_hashes_txn(self, txn, event_id): """Get all the hashes for a given PDU. |