diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 029f6612e6..49fa8614f2 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -166,7 +166,7 @@ class SQLBaseStore(object):
self._txn_perf_counters = PerformanceCounters()
self._get_event_counters = PerformanceCounters()
- self._get_event_cache = Cache("*getEvent*", keylen=3, lru=True,
+ self._get_event_cache = Cache("*getEvent*", keylen=3,
max_entries=hs.config.event_cache_size)
self._state_group_cache = DictionaryCache(
diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py
index b496b918b7..a854a87eab 100644
--- a/synapse/storage/appservice.py
+++ b/synapse/storage/appservice.py
@@ -366,8 +366,11 @@ class ApplicationServiceTransactionStore(SQLBaseStore):
def get_new_events_for_appservice_txn(txn):
sql = (
"SELECT e.stream_ordering, e.event_id"
- " FROM events AS e, appservice_stream_position AS a"
- " WHERE a.stream_ordering < e.stream_ordering AND e.stream_ordering <= ?"
+ " FROM events AS e"
+ " WHERE"
+ " (SELECT stream_ordering FROM appservice_stream_position)"
+ " < e.stream_ordering"
+ " AND e.stream_ordering <= ?"
" ORDER BY e.stream_ordering ASC"
" LIMIT ?"
)
diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py
index 0ba0310c0d..eb15fb751b 100644
--- a/synapse/storage/event_push_actions.py
+++ b/synapse/storage/event_push_actions.py
@@ -56,7 +56,7 @@ class EventPushActionsStore(SQLBaseStore):
)
self._simple_insert_many_txn(txn, "event_push_actions", values)
- @cachedInlineCallbacks(num_args=3, lru=True, tree=True, max_entries=5000)
+ @cachedInlineCallbacks(num_args=3, tree=True, max_entries=5000)
def get_unread_event_push_actions_by_room_for_user(
self, room_id, user_id, last_read_event_id
):
diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py
index 8183b7f1b0..78334a98cf 100644
--- a/synapse/storage/push_rule.py
+++ b/synapse/storage/push_rule.py
@@ -16,6 +16,7 @@
from ._base import SQLBaseStore
from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList
from synapse.push.baserules import list_with_base_rules
+from synapse.api.constants import EventTypes, Membership
from twisted.internet import defer
import logging
@@ -48,7 +49,7 @@ def _load_rules(rawrules, enabled_map):
class PushRuleStore(SQLBaseStore):
- @cachedInlineCallbacks(lru=True)
+ @cachedInlineCallbacks()
def get_push_rules_for_user(self, user_id):
rows = yield self._simple_select_list(
table="push_rules",
@@ -72,7 +73,7 @@ class PushRuleStore(SQLBaseStore):
defer.returnValue(rules)
- @cachedInlineCallbacks(lru=True)
+ @cachedInlineCallbacks()
def get_push_rules_enabled_for_user(self, user_id):
results = yield self._simple_select_list(
table="push_rules_enable",
@@ -123,6 +124,61 @@ class PushRuleStore(SQLBaseStore):
defer.returnValue(results)
+ def bulk_get_push_rules_for_room(self, room_id, state_group, current_state):
+ if not state_group:
+ # If state_group is None it means it has yet to be assigned a
+ # state group, i.e. we need to make sure that calls with a state_group
+ # of None don't hit previous cached calls with a None state_group.
+ # To do this we set the state_group to a new object as object() != object()
+ state_group = object()
+
+ return self._bulk_get_push_rules_for_room(room_id, state_group, current_state)
+
+ @cachedInlineCallbacks(num_args=2, cache_context=True)
+ def _bulk_get_push_rules_for_room(self, room_id, state_group, current_state,
+ cache_context):
+ # We don't use `state_group`, its there so that we can cache based
+ # on it. However, its important that its never None, since two current_state's
+ # with a state_group of None are likely to be different.
+ # See bulk_get_push_rules_for_room for how we work around this.
+ assert state_group is not None
+
+ # We also will want to generate notifs for other people in the room so
+ # their unread countss are correct in the event stream, but to avoid
+ # generating them for bot / AS users etc, we only do so for people who've
+ # sent a read receipt into the room.
+ local_users_in_room = set(
+ e.state_key for e in current_state.values()
+ if e.type == EventTypes.Member and e.membership == Membership.JOIN
+ and self.hs.is_mine_id(e.state_key)
+ )
+
+ # users in the room who have pushers need to get push rules run because
+ # that's how their pushers work
+ if_users_with_pushers = yield self.get_if_users_have_pushers(
+ local_users_in_room, on_invalidate=cache_context.invalidate,
+ )
+ user_ids = set(
+ uid for uid, have_pusher in if_users_with_pushers.items() if have_pusher
+ )
+
+ users_with_receipts = yield self.get_users_with_read_receipts_in_room(
+ room_id, on_invalidate=cache_context.invalidate,
+ )
+
+ # any users with pushers must be ours: they have pushers
+ for uid in users_with_receipts:
+ if uid in local_users_in_room:
+ user_ids.add(uid)
+
+ rules_by_user = yield self.bulk_get_push_rules(
+ user_ids, on_invalidate=cache_context.invalidate,
+ )
+
+ rules_by_user = {k: v for k, v in rules_by_user.items() if v is not None}
+
+ defer.returnValue(rules_by_user)
+
@cachedList(cached_method_name="get_push_rules_enabled_for_user",
list_name="user_ids", num_args=1, inlineCallbacks=True)
def bulk_get_push_rules_enabled(self, user_ids):
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index a7d7c54d7e..8f5f8f24a9 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -135,7 +135,7 @@ class PusherStore(SQLBaseStore):
"get_all_updated_pushers", get_all_updated_pushers_txn
)
- @cachedInlineCallbacks(lru=True, num_args=1, max_entries=15000)
+ @cachedInlineCallbacks(num_args=1, max_entries=15000)
def get_if_user_has_pusher(self, user_id):
result = yield self._simple_select_many_batch(
table='pushers',
diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py
index 2c1df0e2b9..ccc3811e84 100644
--- a/synapse/storage/receipts.py
+++ b/synapse/storage/receipts.py
@@ -145,7 +145,7 @@ class ReceiptsStore(SQLBaseStore):
defer.returnValue([ev for res in results.values() for ev in res])
- @cachedInlineCallbacks(num_args=3, max_entries=5000, lru=True, tree=True)
+ @cachedInlineCallbacks(num_args=3, max_entries=5000, tree=True)
def get_linearized_receipts_for_room(self, room_id, to_key, from_key=None):
"""Get receipts for a single room for sending to clients.
diff --git a/synapse/storage/signatures.py b/synapse/storage/signatures.py
index ea6823f18d..e1dca927d7 100644
--- a/synapse/storage/signatures.py
+++ b/synapse/storage/signatures.py
@@ -25,7 +25,7 @@ from synapse.util.caches.descriptors import cached, cachedList
class SignatureStore(SQLBaseStore):
"""Persistence for event signatures and hashes"""
- @cached(lru=True)
+ @cached()
def get_event_reference_hash(self, event_id):
return self._get_event_reference_hashes_txn(event_id)
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 5b743db67a..0e8fa93e1f 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -174,7 +174,7 @@ class StateStore(SQLBaseStore):
return [r[0] for r in results]
return self.runInteraction("get_current_state_for_key", f)
- @cached(num_args=2, lru=True, max_entries=1000)
+ @cached(num_args=2, max_entries=1000)
def _get_state_group_from_group(self, group, types):
raise NotImplementedError()
@@ -272,7 +272,7 @@ class StateStore(SQLBaseStore):
state_map = yield self.get_state_for_events([event_id], types)
defer.returnValue(state_map[event_id])
- @cached(num_args=2, lru=True, max_entries=10000)
+ @cached(num_args=2, max_entries=10000)
def _get_state_group_for_event(self, room_id, event_id):
return self._simple_select_one_onecol(
table="event_to_state_groups",
|