diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index d970fde9e8..8581796b7e 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -17,7 +17,7 @@ from twisted.internet import defer
from .appservice import (
ApplicationServiceStore, ApplicationServiceTransactionStore
)
-from ._base import Cache
+from ._base import Cache, LoggingTransaction
from .directory import DirectoryStore
from .events import EventsStore
from .presence import PresenceStore, UserPresenceState
@@ -88,6 +88,7 @@ class DataStore(RoomMemberStore, RoomStore,
def __init__(self, db_conn, hs):
self.hs = hs
+ self._clock = hs.get_clock()
self.database_engine = hs.database_engine
self.client_ip_last_seen = Cache(
@@ -173,6 +174,19 @@ class DataStore(RoomMemberStore, RoomStore,
prefilled_cache=push_rules_prefill,
)
+ cur = LoggingTransaction(
+ db_conn.cursor(),
+ name="_find_stream_orderings_for_times_txn",
+ database_engine=self.database_engine,
+ after_callbacks=[]
+ )
+ self._find_stream_orderings_for_times_txn(cur)
+ cur.close()
+
+ self.find_stream_orderings_looping_call = self._clock.looping_call(
+ self._find_stream_orderings_for_times, 60 * 60 * 1000
+ )
+
super(DataStore, self).__init__(hs)
def take_presence_startup_info(self):
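Note on the hunk above: the DataStore now primes stream_ordering_month_ago synchronously at startup, via a hand-built LoggingTransaction on the raw connection (presumably because the database pool is not yet usable inside __init__), and then refreshes it hourly. A minimal sketch of the same wiring using twisted.internet.task.LoopingCall directly; start_find_stream_orderings is a hypothetical helper, and it assumes Clock.looping_call(f, msec) is a thin millisecond-interval wrapper around LoopingCall:

    from twisted.internet import task

    def start_find_stream_orderings(db_conn, store):
        # Prime the value before serving traffic, so code that reads
        # store.stream_ordering_month_ago never sees the initial None.
        cur = db_conn.cursor()
        store._find_stream_orderings_for_times_txn(cur)
        cur.close()

        # Then recompute once an hour in the background. LoopingCall takes
        # seconds, whereas the diff passes 60 * 60 * 1000 milliseconds to
        # Clock.looping_call.
        loop = task.LoopingCall(store._find_stream_orderings_for_times)
        loop.start(60 * 60, now=False)
        return loop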
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index e0d7098692..56a0dd80f3 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -153,7 +153,6 @@ class SQLBaseStore(object):
def __init__(self, hs):
self.hs = hs
self._db_pool = hs.get_db_pool()
- self._clock = hs.get_clock()
self._previous_txn_total_time = 0
self._current_txn_total_time = 0
diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py
index 9705db5c47..940e11d7a2 100644
--- a/synapse/storage/event_push_actions.py
+++ b/synapse/storage/event_push_actions.py
@@ -24,6 +24,10 @@ logger = logging.getLogger(__name__)
class EventPushActionsStore(SQLBaseStore):
+ def __init__(self, hs):
+ self.stream_ordering_month_ago = None
+ super(EventPushActionsStore, self).__init__(hs)
+
def _set_push_actions_for_event_and_users_txn(self, txn, event, tuples):
"""
Args:
@@ -115,7 +119,8 @@ class EventPushActionsStore(SQLBaseStore):
@defer.inlineCallbacks
def get_unread_push_actions_for_user_in_range(self, user_id,
min_stream_ordering,
- max_stream_ordering=None):
+ max_stream_ordering=None,
+ limit=20):
def get_after_receipt(txn):
sql = (
"SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions, "
@@ -147,7 +152,8 @@ class EventPushActionsStore(SQLBaseStore):
if max_stream_ordering is not None:
sql += " AND ep.stream_ordering <= ?"
args.append(max_stream_ordering)
- sql += " ORDER BY ep.stream_ordering ASC"
+ sql += " ORDER BY ep.stream_ordering ASC LIMIT ?"
+ args.append(limit)
txn.execute(sql, args)
return txn.fetchall()
after_read_receipt = yield self.runInteraction(
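The new limit parameter (default 20) bounds each of these queries via LIMIT ?. A hedged sketch of how a caller might fetch one page and detect whether more rows remain; the exact row shape is not visible in this hunk, so only the list length is relied on:

    from twisted.internet import defer

    @defer.inlineCallbacks
    def get_first_page(store, user_id, min_so, max_so, limit=20):
        # Rows come back ordered by stream_ordering ASC (per the SQL above)
        # and capped at `limit`; a full page suggests more may remain.
        rows = yield store.get_unread_push_actions_for_user_in_range(
            user_id, min_so, max_so, limit=limit,
        )
        defer.returnValue((rows, len(rows) == limit))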
@@ -224,18 +230,93 @@ class EventPushActionsStore(SQLBaseStore):
(room_id, event_id)
)
- def _remove_push_actions_before_txn(self, txn, room_id, user_id,
- topological_ordering):
+ def _remove_old_push_actions_before_txn(self, txn, room_id, user_id,
+ topological_ordering):
+ """
+ Purges old, stale push actions for a user and room before a given
+ topological_ordering.
+ Args:
+ txn: The transaction
+ room_id: Room ID to delete from
+ user_id: user ID to delete for
+ topological_ordering: The lowest topological ordering which will
+ not be deleted.
+ """
txn.call_after(
self.get_unread_event_push_actions_by_room_for_user.invalidate_many,
(room_id, user_id, )
)
+
+ # We need to join on the events table to get the received_ts for
+ # event_push_actions, and sqlite won't let us use a join in a delete,
+ # so we can't just delete where received_ts < x. Furthermore, we can
+ # only identify an event_push_action by the tuple of room_id and
+ # event_id, so we can't use a subquery.
+ # Instead, we periodically look up the stream ordering of the first
+ # event received after the threshold time and delete event_push_actions
+ # in the room with a stream_ordering before that.
txn.execute(
- "DELETE FROM event_push_actions"
- " WHERE room_id = ? AND user_id = ? AND topological_ordering < ?",
- (room_id, user_id, topological_ordering,)
+ "DELETE FROM event_push_actions "
+ " WHERE user_id = ? AND room_id = ? AND "
+ " topological_ordering < ? AND stream_ordering < ?",
+ (user_id, room_id, topological_ordering, self.stream_ordering_month_ago)
+ )
+
+ @defer.inlineCallbacks
+ def _find_stream_orderings_for_times(self):
+ yield self.runInteraction(
+ "_find_stream_orderings_for_times",
+ self._find_stream_orderings_for_times_txn
+ )
+
+ def _find_stream_orderings_for_times_txn(self, txn):
+ logger.info("Searching for stream ordering 1 month ago")
+ self.stream_ordering_month_ago = self._find_first_stream_ordering_after_ts_txn(
+ txn, self._clock.time_msec() - 30 * 24 * 60 * 60 * 1000
+ )
+ logger.info(
+ "Found stream ordering 1 month ago: it's %d",
+ self.stream_ordering_month_ago
)
+ def _find_first_stream_ordering_after_ts_txn(self, txn, ts):
+ """
+ Find the stream_ordering of the first event that was received after
+ a given timestamp. This is relatively slow as there is no index on
+ received_ts but we can then use this to delete push actions before
+ this.
+
+ received_ts must necessarily be in the same order as stream_ordering,
+ and stream_ordering is indexed, so we manually binary search using
+ stream_ordering.
+ """
+ txn.execute("SELECT MAX(stream_ordering) FROM events")
+ max_stream_ordering = txn.fetchone()[0]
+
+ if max_stream_ordering is None:
+ return 0
+
+ range_start = 0
+ range_end = max_stream_ordering
+
+ sql = (
+ "SELECT received_ts FROM events"
+ " WHERE stream_ordering > ?"
+ " ORDER BY stream_ordering"
+ " LIMIT 1"
+ )
+
+ while range_end - range_start > 1:
+ middle = int((range_end + range_start) / 2)
+ txn.execute(sql, (middle,))
+ middle_ts = txn.fetchone()[0]
+ if ts > middle_ts:
+ range_start = middle
+ else:
+ range_end = middle
+
+ return range_end
+
def _action_has_highlight(actions):
for action in actions:
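_find_first_stream_ordering_after_ts_txn leans on the assumption stated in its docstring: received_ts is non-decreasing in stream_ordering, so it can binary search the indexed stream_ordering column instead of scanning the unindexed received_ts. The same algorithm as a self-contained sketch against sqlite3; the events(stream_ordering, received_ts) schema is taken from the queries above, everything else is illustrative:

    import sqlite3

    def first_stream_ordering_after(conn, ts):
        cur = conn.cursor()
        cur.execute("SELECT MAX(stream_ordering) FROM events")
        max_so = cur.fetchone()[0]
        if max_so is None:
            return 0  # no events at all

        lo, hi = 0, max_so
        while hi - lo > 1:
            mid = (lo + hi) // 2
            # received_ts of the first event strictly after `mid`; such an
            # event always exists because mid < hi <= max_so.
            cur.execute(
                "SELECT received_ts FROM events"
                " WHERE stream_ordering > ?"
                " ORDER BY stream_ordering LIMIT 1",
                (mid,),
            )
            if ts > cur.fetchone()[0]:
                lo = mid  # everything up to mid is older than ts
            else:
                hi = mid  # the boundary is at or below mid
        return hi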
diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py
index d2bf7f2aec..ebb97c8474 100644
--- a/synapse/storage/push_rule.py
+++ b/synapse/storage/push_rule.py
@@ -14,7 +14,7 @@
# limitations under the License.
from ._base import SQLBaseStore
-from synapse.util.caches.descriptors import cachedInlineCallbacks
+from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList
from twisted.internet import defer
import logging
@@ -24,7 +24,7 @@ logger = logging.getLogger(__name__)
class PushRuleStore(SQLBaseStore):
- @cachedInlineCallbacks()
+ @cachedInlineCallbacks(lru=True)
def get_push_rules_for_user(self, user_id):
rows = yield self._simple_select_list(
table="push_rules",
@@ -44,7 +44,7 @@ class PushRuleStore(SQLBaseStore):
defer.returnValue(rows)
- @cachedInlineCallbacks()
+ @cachedInlineCallbacks(lru=True)
def get_push_rules_enabled_for_user(self, user_id):
results = yield self._simple_select_list(
table="push_rules_enable",
@@ -60,12 +60,16 @@ class PushRuleStore(SQLBaseStore):
r['rule_id']: False if r['enabled'] == 0 else True for r in results
})
- @defer.inlineCallbacks
+ @cachedList(cached_method_name="get_push_rules_for_user",
+ list_name="user_ids", num_args=1, inlineCallbacks=True)
def bulk_get_push_rules(self, user_ids):
if not user_ids:
defer.returnValue({})
- results = {}
+ results = {
+ user_id: []
+ for user_id in user_ids
+ }
rows = yield self._simple_select_many_batch(
table="push_rules",
@@ -75,18 +79,24 @@ class PushRuleStore(SQLBaseStore):
desc="bulk_get_push_rules",
)
- rows.sort(key=lambda e: (-e["priority_class"], -e["priority"]))
+ rows.sort(
+ key=lambda row: (-int(row["priority_class"]), -int(row["priority"]))
+ )
for row in rows:
results.setdefault(row['user_name'], []).append(row)
defer.returnValue(results)
- @defer.inlineCallbacks
+ @cachedList(cached_method_name="get_push_rules_enabled_for_user",
+ list_name="user_ids", num_args=1, inlineCallbacks=True)
def bulk_get_push_rules_enabled(self, user_ids):
if not user_ids:
defer.returnValue({})
- results = {}
+ results = {
+ user_id: {}
+ for user_id in user_ids
+ }
rows = yield self._simple_select_many_batch(
table="push_rules_enable",
@@ -96,7 +106,8 @@ class PushRuleStore(SQLBaseStore):
desc="bulk_get_push_rules_enabled",
)
for row in rows:
- results.setdefault(row['user_name'], {})[row['rule_id']] = row['enabled']
+ enabled = bool(row['enabled'])
+ results.setdefault(row['user_name'], {})[row['rule_id']] = enabled
defer.returnValue(results)
@defer.inlineCallbacks
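The switch from @defer.inlineCallbacks to @cachedList above relies on a contract: the batch method must resolve to a dict with an entry for every requested user_id, empty-valued if there are no rows, so the results can be fanned back out into the per-user get_push_rules_for_user and get_push_rules_enabled_for_user caches, including negative entries. A rough sketch of that shape; fetch_rows is a hypothetical stand-in for _simple_select_many_batch:

    from twisted.internet import defer

    @defer.inlineCallbacks
    def bulk_get(self, user_ids):
        # Pre-seed every key so "no rules" is cached too, not just misses.
        results = {user_id: [] for user_id in user_ids}
        rows = yield fetch_rows(user_ids)  # hypothetical batch query
        # Highest-priority rules first: sort descending on both fields.
        rows.sort(key=lambda row: (-int(row["priority_class"]),
                                   -int(row["priority"])))
        for row in rows:
            results[row["user_name"]].append(row)
        defer.returnValue(results)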
diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py
index fdcf28f3e1..f1774f0e44 100644
--- a/synapse/storage/receipts.py
+++ b/synapse/storage/receipts.py
@@ -297,7 +297,7 @@ class ReceiptsStore(SQLBaseStore):
)
if receipt_type == "m.read" and topological_ordering:
- self._remove_push_actions_before_txn(
+ self._remove_old_push_actions_before_txn(
txn,
room_id=room_id,
user_id=user_id,
diff --git a/synapse/storage/schema/delta/30/as_users.py b/synapse/storage/schema/delta/30/as_users.py
index b417e3ac08..5b7d8d1ab5 100644
--- a/synapse/storage/schema/delta/30/as_users.py
+++ b/synapse/storage/schema/delta/30/as_users.py
@@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
-from synapse.storage.appservice import ApplicationServiceStore
+from synapse.config.appservice import load_appservices
logger = logging.getLogger(__name__)
@@ -38,7 +38,7 @@ def run_upgrade(cur, database_engine, config, *args, **kwargs):
logger.warning("Could not get app_service_config_files from config")
pass
- appservices = ApplicationServiceStore.load_appservices(
+ appservices = load_appservices(
config.server_name, config_files
)
diff --git a/synapse/storage/signatures.py b/synapse/storage/signatures.py
index b10f2a5787..ea6823f18d 100644
--- a/synapse/storage/signatures.py
+++ b/synapse/storage/signatures.py
@@ -19,17 +19,24 @@ from ._base import SQLBaseStore
from unpaddedbase64 import encode_base64
from synapse.crypto.event_signing import compute_event_reference_hash
+from synapse.util.caches.descriptors import cached, cachedList
class SignatureStore(SQLBaseStore):
"""Persistence for event signatures and hashes"""
+ @cached(lru=True)
+ def get_event_reference_hash(self, event_id):
+ return self.runInteraction("get_event_reference_hash", self._get_event_reference_hashes_txn, event_id)
+
+ @cachedList(cached_method_name="get_event_reference_hash",
+ list_name="event_ids", num_args=1)
def get_event_reference_hashes(self, event_ids):
def f(txn):
- return [
- self._get_event_reference_hashes_txn(txn, ev)
- for ev in event_ids
- ]
+ return {
+ event_id: self._get_event_reference_hashes_txn(txn, event_id)
+ for event_id in event_ids
+ }
return self.runInteraction(
"get_event_reference_hashes",
@@ -41,15 +48,15 @@ class SignatureStore(SQLBaseStore):
hashes = yield self.get_event_reference_hashes(
event_ids
)
- hashes = [
- {
+ hashes = {
+ e_id: {
k: encode_base64(v) for k, v in h.items()
if k == "sha256"
}
- for h in hashes
- ]
+ for e_id, h in hashes.items()
+ }
- defer.returnValue(zip(event_ids, hashes))
+ defer.returnValue(hashes.items())
def _get_event_reference_hashes_txn(self, txn, event_id):
"""Get all the hashes for a given PDU.