diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py
index 32ccea3f13..98a4a7c62c 100644
--- a/synapse/app/pusher.py
+++ b/synapse/app/pusher.py
@@ -32,7 +32,6 @@ from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.server import HomeServer
from synapse.storage import DataStore
from synapse.storage.engines import create_engine
-from synapse.storage.roommember import RoomMemberStore
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.logcontext import LoggingContext, preserve_fn
from synapse.util.manhole import manhole
@@ -75,10 +74,6 @@ class PusherSlaveStore(
DataStore.get_profile_displayname.__func__
)
- who_forgot_in_room = (
- RoomMemberStore.__dict__["who_forgot_in_room"]
- )
-
class PusherServer(HomeServer):
def setup(self):
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index f87531f1b6..abe91dcfbd 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -62,8 +62,6 @@ logger = logging.getLogger("synapse.app.synchrotron")
class SynchrotronSlavedStore(
- SlavedPushRuleStore,
- SlavedEventStore,
SlavedReceiptsStore,
SlavedAccountDataStore,
SlavedApplicationServiceStore,
@@ -73,14 +71,12 @@ class SynchrotronSlavedStore(
SlavedGroupServerStore,
SlavedDeviceInboxStore,
SlavedDeviceStore,
+ SlavedPushRuleStore,
+ SlavedEventStore,
SlavedClientIpStore,
RoomStore,
BaseSlavedStore,
):
- who_forgot_in_room = (
- RoomMemberStore.__dict__["who_forgot_in_room"]
- )
-
did_forget = (
RoomMemberStore.__dict__["did_forget"]
)
diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py
index fe6887414e..6454045c2d 100644
--- a/synapse/storage/event_push_actions.py
+++ b/synapse/storage/event_push_actions.py
@@ -380,6 +380,69 @@ class EventPushActionsWorkerStore(SQLBaseStore):
# Now return the first `limit`
defer.returnValue(notifs[:limit])
+ def add_push_actions_to_staging(self, event_id, user_id_actions):
+ """Add the push actions for the event to the push action staging area.
+
+ Args:
+ event_id (str)
+ user_id_actions (dict[str, list[dict|str])]): A dictionary mapping
+ user_id to list of push actions, where an action can either be
+ a string or dict.
+
+ Returns:
+ Deferred
+ """
+
+ if not user_id_actions:
+ return
+
+ # This is a helper function for generating the necessary tuple that
+ # can be used to inert into the `event_push_actions_staging` table.
+ def _gen_entry(user_id, actions):
+ is_highlight = 1 if _action_has_highlight(actions) else 0
+ return (
+ event_id, # event_id column
+ user_id, # user_id column
+ _serialize_action(actions, is_highlight), # actions column
+ 1, # notif column
+ is_highlight, # highlight column
+ )
+
+ def _add_push_actions_to_staging_txn(txn):
+ # We don't use _simple_insert_many here to avoid the overhead
+ # of generating lists of dicts.
+
+ sql = """
+ INSERT INTO event_push_actions_staging
+ (event_id, user_id, actions, notif, highlight)
+ VALUES (?, ?, ?, ?, ?)
+ """
+
+ txn.executemany(sql, (
+ _gen_entry(user_id, actions)
+ for user_id, actions in user_id_actions.iteritems()
+ ))
+
+ return self.runInteraction(
+ "add_push_actions_to_staging", _add_push_actions_to_staging_txn
+ )
+
+ def remove_push_actions_from_staging(self, event_id):
+ """Called if we failed to persist the event to ensure that stale push
+ actions don't build up in the DB
+
+ Args:
+ event_id (str)
+ """
+
+ return self._simple_delete(
+ table="event_push_actions_staging",
+ keyvalues={
+ "event_id": event_id,
+ },
+ desc="remove_push_actions_from_staging",
+ )
+
class EventPushActionsStore(EventPushActionsWorkerStore):
EPA_HIGHLIGHT_INDEX = "epa_highlight_index"
@@ -775,69 +838,6 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
(rotate_to_stream_ordering,)
)
- def add_push_actions_to_staging(self, event_id, user_id_actions):
- """Add the push actions for the event to the push action staging area.
-
- Args:
- event_id (str)
- user_id_actions (dict[str, list[dict|str])]): A dictionary mapping
- user_id to list of push actions, where an action can either be
- a string or dict.
-
- Returns:
- Deferred
- """
-
- if not user_id_actions:
- return
-
- # This is a helper function for generating the necessary tuple that
- # can be used to inert into the `event_push_actions_staging` table.
- def _gen_entry(user_id, actions):
- is_highlight = 1 if _action_has_highlight(actions) else 0
- return (
- event_id, # event_id column
- user_id, # user_id column
- _serialize_action(actions, is_highlight), # actions column
- 1, # notif column
- is_highlight, # highlight column
- )
-
- def _add_push_actions_to_staging_txn(txn):
- # We don't use _simple_insert_many here to avoid the overhead
- # of generating lists of dicts.
-
- sql = """
- INSERT INTO event_push_actions_staging
- (event_id, user_id, actions, notif, highlight)
- VALUES (?, ?, ?, ?, ?)
- """
-
- txn.executemany(sql, (
- _gen_entry(user_id, actions)
- for user_id, actions in user_id_actions.iteritems()
- ))
-
- return self.runInteraction(
- "add_push_actions_to_staging", _add_push_actions_to_staging_txn
- )
-
- def remove_push_actions_from_staging(self, event_id):
- """Called if we failed to persist the event to ensure that stale push
- actions don't build up in the DB
-
- Args:
- event_id (str)
- """
-
- return self._simple_delete(
- table="event_push_actions_staging",
- keyvalues={
- "event_id": event_id,
- },
- desc="remove_push_actions_from_staging",
- )
-
def _action_has_highlight(actions):
for action in actions:
diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py
index 583efb7bdf..04a0b59a39 100644
--- a/synapse/storage/push_rule.py
+++ b/synapse/storage/push_rule.py
@@ -15,6 +15,10 @@
# limitations under the License.
from ._base import SQLBaseStore
+from synapse.storage.appservice import ApplicationServiceWorkerStore
+from synapse.storage.pusher import PusherWorkerStore
+from synapse.storage.receipts import ReceiptsWorkerStore
+from synapse.storage.roommember import RoomMemberWorkerStore
from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList
from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.push.baserules import list_with_base_rules
@@ -51,7 +55,11 @@ def _load_rules(rawrules, enabled_map):
return rules
-class PushRulesWorkerStore(SQLBaseStore):
+class PushRulesWorkerStore(ApplicationServiceWorkerStore,
+ ReceiptsWorkerStore,
+ PusherWorkerStore,
+ RoomMemberWorkerStore,
+ SQLBaseStore):
"""This is an abstract base class where subclasses must implement
`get_max_push_rules_stream_id` which can be called in the initializer.
"""
@@ -140,8 +148,6 @@ class PushRulesWorkerStore(SQLBaseStore):
"have_push_rules_changed", have_push_rules_changed_txn
)
-
-class PushRuleStore(PushRulesWorkerStore):
@cachedList(cached_method_name="get_push_rules_for_user",
list_name="user_ids", num_args=1, inlineCallbacks=True)
def bulk_get_push_rules(self, user_ids):
@@ -281,6 +287,8 @@ class PushRuleStore(PushRulesWorkerStore):
results.setdefault(row['user_name'], {})[row['rule_id']] = enabled
defer.returnValue(results)
+
+class PushRuleStore(PushRulesWorkerStore):
@defer.inlineCallbacks
def add_push_rule(
self, user_id, rule_id, priority_class, conditions, actions,
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index f4af3e4caa..307660b99a 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -175,11 +175,6 @@ class PusherWorkerStore(SQLBaseStore):
"get_all_updated_pushers_rows", get_all_updated_pushers_rows_txn
)
-
-class PusherStore(PusherWorkerStore):
- def get_pushers_stream_token(self):
- return self._pushers_id_gen.get_current_token()
-
@cachedInlineCallbacks(num_args=1, max_entries=15000)
def get_if_user_has_pusher(self, user_id):
# This only exists for the cachedList decorator
@@ -201,6 +196,11 @@ class PusherStore(PusherWorkerStore):
defer.returnValue(result)
+
+class PusherStore(PusherWorkerStore):
+ def get_pushers_stream_token(self):
+ return self._pushers_id_gen.get_current_token()
+
@defer.inlineCallbacks
def add_pusher(self, user_id, access_token, kind, app_id,
app_display_name, device_display_name,
@@ -233,14 +233,18 @@ class PusherStore(PusherWorkerStore):
)
if newly_inserted:
- # get_if_user_has_pusher only cares if the user has
- # at least *one* pusher.
- self.get_if_user_has_pusher.invalidate(user_id,)
+ self.runInteraction(
+ "add_pusher",
+ self._invalidate_cache_and_stream,
+ self.get_if_user_has_pusher, (user_id,)
+ )
@defer.inlineCallbacks
def delete_pusher_by_app_id_pushkey_user_id(self, app_id, pushkey, user_id):
def delete_pusher_txn(txn, stream_id):
- txn.call_after(self.get_if_user_has_pusher.invalidate, (user_id,))
+ self._invalidate_cache_and_stream(
+ txn, self.get_if_user_has_pusher, (user_id,)
+ )
self._simple_delete_one_txn(
txn,
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index b9158b9896..d79877dac7 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -432,6 +432,18 @@ class RoomMemberWorkerStore(EventsWorkerStore):
def _get_joined_hosts_cache(self, room_id):
return _JoinedHostsCache(self, room_id)
+ @cached()
+ def who_forgot_in_room(self, room_id):
+ return self._simple_select_list(
+ table="room_memberships",
+ retcols=("user_id", "event_id"),
+ keyvalues={
+ "room_id": room_id,
+ "forgotten": 1,
+ },
+ desc="who_forgot"
+ )
+
class RoomMemberStore(RoomMemberWorkerStore):
def __init__(self, db_conn, hs):
@@ -595,18 +607,6 @@ class RoomMemberStore(RoomMemberWorkerStore):
forgot = yield self.runInteraction("did_forget_membership_at", f)
defer.returnValue(forgot == 1)
- @cached()
- def who_forgot_in_room(self, room_id):
- return self._simple_select_list(
- table="room_memberships",
- retcols=("user_id", "event_id"),
- keyvalues={
- "room_id": room_id,
- "forgotten": 1,
- },
- desc="who_forgot"
- )
-
@defer.inlineCallbacks
def _background_add_membership_profile(self, progress, batch_size):
target_min_stream_id = progress.get(
|