diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py
index bf4f1c5836..7c680659b6 100644
--- a/synapse/push/bulk_push_rule_evaluator.py
+++ b/synapse/push/bulk_push_rule_evaluator.py
@@ -144,6 +144,7 @@ class BulkPushRuleEvaluator(object):
Deferred
"""
rules_by_user = yield self._get_rules_for_event(event, context)
+ actions_by_user = {}
room_members = yield self.store.get_joined_users_from_context(
event, context
@@ -189,14 +190,17 @@ class BulkPushRuleEvaluator(object):
if matches:
actions = [x for x in rule['actions'] if x != 'dont_notify']
if actions and 'notify' in actions:
- # Push rules say we should notify the user of this event,
- # so we mark it in the DB in the staging area. (This
- # will then get handled when we persist the event)
- yield self.store.add_push_actions_to_staging(
- event.event_id, uid, actions,
- )
+ # Push rules say we should notify the user of this event
+ actions_by_user[uid] = actions
break
+ # Mark in the DB staging area the push actions for users who should be
+ # notified for this event. (This will then get handled when we persist
+ # the event)
+ yield self.store.add_push_actions_to_staging(
+ event.event_id, actions_by_user,
+ )
+
def _condition_checker(evaluator, conditions, uid, display_name, cache):
for cond in conditions:
diff --git a/synapse/replication/slave/storage/appservice.py b/synapse/replication/slave/storage/appservice.py
index 0d3f31a50c..8cae3076f4 100644
--- a/synapse/replication/slave/storage/appservice.py
+++ b/synapse/replication/slave/storage/appservice.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2015, 2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -13,33 +14,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from ._base import BaseSlavedStore
-from synapse.storage import DataStore
-from synapse.config.appservice import load_appservices
-from synapse.storage.appservice import _make_exclusive_regex
+from synapse.storage.appservice import (
+ ApplicationServiceWorkerStore, ApplicationServiceTransactionWorkerStore,
+)
-class SlavedApplicationServiceStore(BaseSlavedStore):
- def __init__(self, db_conn, hs):
- super(SlavedApplicationServiceStore, self).__init__(db_conn, hs)
- self.services_cache = load_appservices(
- hs.config.server_name,
- hs.config.app_service_config_files
- )
- self.exclusive_user_regex = _make_exclusive_regex(self.services_cache)
-
- get_app_service_by_token = DataStore.get_app_service_by_token.__func__
- get_app_service_by_user_id = DataStore.get_app_service_by_user_id.__func__
- get_app_services = DataStore.get_app_services.__func__
- get_new_events_for_appservice = DataStore.get_new_events_for_appservice.__func__
- create_appservice_txn = DataStore.create_appservice_txn.__func__
- get_appservices_by_state = DataStore.get_appservices_by_state.__func__
- get_oldest_unsent_txn = DataStore.get_oldest_unsent_txn.__func__
- _get_last_txn = DataStore._get_last_txn.__func__
- complete_appservice_txn = DataStore.complete_appservice_txn.__func__
- get_appservice_state = DataStore.get_appservice_state.__func__
- set_appservice_last_pos = DataStore.set_appservice_last_pos.__func__
- set_appservice_state = DataStore.set_appservice_state.__func__
- get_if_app_services_interested_in_user = (
- DataStore.get_if_app_services_interested_in_user.__func__
- )
+class SlavedApplicationServiceStore(ApplicationServiceTransactionWorkerStore,
+ ApplicationServiceWorkerStore):
+ pass
diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py
index 79673b4273..90fb51d43c 100644
--- a/synapse/storage/appservice.py
+++ b/synapse/storage/appservice.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2015, 2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -20,6 +21,7 @@ from twisted.internet import defer
from synapse.api.constants import Membership
from synapse.appservice import AppServiceTransaction
from synapse.config.appservice import load_appservices
+from synapse.storage.events import EventsWorkerStore
from synapse.storage.roommember import RoomsForUser
from ._base import SQLBaseStore
@@ -46,17 +48,16 @@ def _make_exclusive_regex(services_cache):
return exclusive_user_regex
-class ApplicationServiceStore(SQLBaseStore):
-
+class ApplicationServiceWorkerStore(SQLBaseStore):
def __init__(self, db_conn, hs):
- super(ApplicationServiceStore, self).__init__(db_conn, hs)
- self.hostname = hs.hostname
self.services_cache = load_appservices(
hs.hostname,
hs.config.app_service_config_files
)
self.exclusive_user_regex = _make_exclusive_regex(self.services_cache)
+ super(ApplicationServiceWorkerStore, self).__init__(db_conn, hs)
+
def get_app_services(self):
return self.services_cache
@@ -112,6 +113,13 @@ class ApplicationServiceStore(SQLBaseStore):
return service
return None
+
+class ApplicationServiceStore(ApplicationServiceWorkerStore):
+
+ def __init__(self, db_conn, hs):
+ super(ApplicationServiceStore, self).__init__(db_conn, hs)
+ self.hostname = hs.hostname
+
def get_app_service_rooms(self, service):
"""Get a list of RoomsForUser for this application service.
@@ -184,11 +192,8 @@ class ApplicationServiceStore(SQLBaseStore):
return rooms_for_user_matching_user_id
-class ApplicationServiceTransactionStore(SQLBaseStore):
-
- def __init__(self, db_conn, hs):
- super(ApplicationServiceTransactionStore, self).__init__(db_conn, hs)
-
+class ApplicationServiceTransactionWorkerStore(ApplicationServiceWorkerStore,
+ EventsWorkerStore):
@defer.inlineCallbacks
def get_appservices_by_state(self, state):
"""Get a list of application services based on their state.
@@ -433,3 +438,11 @@ class ApplicationServiceTransactionStore(SQLBaseStore):
events = yield self._get_events(event_ids)
defer.returnValue((upper_bound, events))
+
+
+class ApplicationServiceTransactionStore(ApplicationServiceTransactionWorkerStore):
+ # This is currently empty due to there not being any AS storage functions
+ # that can't be run on the workers. Since this may change in future, and
+ # to keep consistency with the other stores, we keep this empty class for
+ # now.
+ pass
diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py
index 4cabf70ad0..fe6887414e 100644
--- a/synapse/storage/event_push_actions.py
+++ b/synapse/storage/event_push_actions.py
@@ -407,11 +407,21 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
self._rotate_notifs, 30 * 60 * 1000
)
- def _set_push_actions_for_event_and_users_txn(self, txn, event):
- """
+ def _set_push_actions_for_event_and_users_txn(self, txn, events_and_contexts,
+ all_events_and_contexts):
+ """Handles moving push actions from staging table to main
+ event_push_actions table for all events in `events_and_contexts`.
+
+ Also ensures that all events in `all_events_and_contexts` are removed
+ from the push action staging area.
+
Args:
- event: the event set actions for
- tuples: list of tuples of (user_id, actions)
+ events_and_contexts (list[(EventBase, EventContext)]): events
+ we are persisting
+ all_events_and_contexts (list[(EventBase, EventContext)]): all
+ events that we were going to persist. This includes events
+ we've already persisted, etc, that wouldn't appear in
+ events_and_context.
"""
sql = """
@@ -424,33 +434,40 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
WHERE event_id = ?
"""
- txn.execute(sql, (
- event.room_id, event.internal_metadata.stream_ordering,
- event.depth, event.event_id,
- ))
-
- user_ids = self._simple_select_onecol_txn(
- txn,
- table="event_push_actions_staging",
- keyvalues={
- "event_id": event.event_id,
- },
- retcol="user_id",
- )
+ if events_and_contexts:
+ txn.executemany(sql, (
+ (
+ event.room_id, event.internal_metadata.stream_ordering,
+ event.depth, event.event_id,
+ )
+ for event, _ in events_and_contexts
+ ))
+
+ for event, _ in events_and_contexts:
+ user_ids = self._simple_select_onecol_txn(
+ txn,
+ table="event_push_actions_staging",
+ keyvalues={
+ "event_id": event.event_id,
+ },
+ retcol="user_id",
+ )
- self._simple_delete_txn(
- txn,
- table="event_push_actions_staging",
- keyvalues={
- "event_id": event.event_id,
- },
- )
+ for uid in user_ids:
+ txn.call_after(
+ self.get_unread_event_push_actions_by_room_for_user.invalidate_many,
+ (event.room_id, uid,)
+ )
- for uid in user_ids:
- txn.call_after(
- self.get_unread_event_push_actions_by_room_for_user.invalidate_many,
- (event.room_id, uid,)
+ # Now we delete the staging area for *all* events that were being
+ # persisted.
+ txn.executemany(
+ "DELETE FROM event_push_actions_staging WHERE event_id = ?",
+ (
+ (event.event_id,)
+ for event, _ in all_events_and_contexts
)
+ )
@defer.inlineCallbacks
def get_push_actions_for_user(self, user_id, before=None, limit=50,
@@ -758,32 +775,51 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
(rotate_to_stream_ordering,)
)
- def add_push_actions_to_staging(self, event_id, user_id, actions):
- """Add the push actions for the user and event to the push
- action staging area.
+ def add_push_actions_to_staging(self, event_id, user_id_actions):
+ """Add the push actions for the event to the push action staging area.
Args:
event_id (str)
- user_id (str)
- actions (list[dict|str]): An action can either be a string or
- dict.
+ user_id_actions (dict[str, list[dict|str])]): A dictionary mapping
+ user_id to list of push actions, where an action can either be
+ a string or dict.
Returns:
Deferred
"""
- is_highlight = 1 if _action_has_highlight(actions) else 0
+ if not user_id_actions:
+ return
- return self._simple_insert(
- table="event_push_actions_staging",
- values={
- "event_id": event_id,
- "user_id": user_id,
- "actions": _serialize_action(actions, is_highlight),
- "notif": 1,
- "highlight": is_highlight,
- },
- desc="add_push_actions_to_staging",
+ # This is a helper function for generating the necessary tuple that
+ # can be used to inert into the `event_push_actions_staging` table.
+ def _gen_entry(user_id, actions):
+ is_highlight = 1 if _action_has_highlight(actions) else 0
+ return (
+ event_id, # event_id column
+ user_id, # user_id column
+ _serialize_action(actions, is_highlight), # actions column
+ 1, # notif column
+ is_highlight, # highlight column
+ )
+
+ def _add_push_actions_to_staging_txn(txn):
+ # We don't use _simple_insert_many here to avoid the overhead
+ # of generating lists of dicts.
+
+ sql = """
+ INSERT INTO event_push_actions_staging
+ (event_id, user_id, actions, notif, highlight)
+ VALUES (?, ?, ?, ?, ?)
+ """
+
+ txn.executemany(sql, (
+ _gen_entry(user_id, actions)
+ for user_id, actions in user_id_actions.iteritems()
+ ))
+
+ return self.runInteraction(
+ "add_push_actions_to_staging", _add_push_actions_to_staging_txn
)
def remove_push_actions_from_staging(self, event_id):
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 99d6cca585..b63392a6cd 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -627,6 +627,8 @@ class EventsStore(EventsWorkerStore):
list of the event ids which are the forward extremities.
"""
+ all_events_and_contexts = events_and_contexts
+
max_stream_order = events_and_contexts[-1][0].internal_metadata.stream_ordering
self._update_current_state_txn(txn, state_delta_for_room, max_stream_order)
@@ -689,6 +691,7 @@ class EventsStore(EventsWorkerStore):
self._update_metadata_tables_txn(
txn,
events_and_contexts=events_and_contexts,
+ all_events_and_contexts=all_events_and_contexts,
backfilled=backfilled,
)
@@ -1086,26 +1089,33 @@ class EventsStore(EventsWorkerStore):
ec for ec in events_and_contexts if ec[0] not in to_remove
]
- def _update_metadata_tables_txn(self, txn, events_and_contexts, backfilled):
+ def _update_metadata_tables_txn(self, txn, events_and_contexts,
+ all_events_and_contexts, backfilled):
"""Update all the miscellaneous tables for new events
Args:
txn (twisted.enterprise.adbapi.Connection): db connection
events_and_contexts (list[(EventBase, EventContext)]): events
we are persisting
+ all_events_and_contexts (list[(EventBase, EventContext)]): all
+ events that we were going to persist. This includes events
+ we've already persisted, etc, that wouldn't appear in
+ events_and_context.
backfilled (bool): True if the events were backfilled
"""
+ # Insert all the push actions into the event_push_actions table.
+ self._set_push_actions_for_event_and_users_txn(
+ txn,
+ events_and_contexts=events_and_contexts,
+ all_events_and_contexts=all_events_and_contexts,
+ )
+
if not events_and_contexts:
# nothing to do here
return
for event, context in events_and_contexts:
- # Insert all the push actions into the event_push_actions table.
- self._set_push_actions_for_event_and_users_txn(
- txn, event,
- )
-
if event.type == EventTypes.Redaction and event.redacts is not None:
# Remove the entries in the event_push_actions table for the
# redacted event.
diff --git a/tests/replication/slave/storage/test_events.py b/tests/replication/slave/storage/test_events.py
index 4780f2ab72..cb058d3142 100644
--- a/tests/replication/slave/storage/test_events.py
+++ b/tests/replication/slave/storage/test_events.py
@@ -230,10 +230,12 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase):
state_handler = self.hs.get_state_handler()
context = yield state_handler.compute_event_context(event)
- for user_id, actions in push_actions:
- yield self.master_store.add_push_actions_to_staging(
- event.event_id, user_id, actions,
- )
+ yield self.master_store.add_push_actions_to_staging(
+ event.event_id, {
+ user_id: actions
+ for user_id, actions in push_actions
+ },
+ )
ordering = None
if backfill:
diff --git a/tests/storage/test_event_push_actions.py b/tests/storage/test_event_push_actions.py
index d483e7cf9e..6c1aad149b 100644
--- a/tests/storage/test_event_push_actions.py
+++ b/tests/storage/test_event_push_actions.py
@@ -71,11 +71,11 @@ class EventPushActionsStoreTestCase(tests.unittest.TestCase):
event.depth = stream
yield self.store.add_push_actions_to_staging(
- event.event_id, user_id, action,
+ event.event_id, {user_id: action},
)
yield self.store.runInteraction(
"", self.store._set_push_actions_for_event_and_users_txn,
- event,
+ [(event, None)], [(event, None)],
)
def _rotate(stream):
|