summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--synapse/push/bulk_push_rule_evaluator.py16
-rw-r--r--synapse/replication/slave/storage/appservice.py35
-rw-r--r--synapse/storage/appservice.py31
-rw-r--r--synapse/storage/event_push_actions.py126
-rw-r--r--synapse/storage/events.py22
-rw-r--r--tests/replication/slave/storage/test_events.py10
-rw-r--r--tests/storage/test_event_push_actions.py4
7 files changed, 144 insertions, 100 deletions
diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py
index bf4f1c5836..7c680659b6 100644
--- a/synapse/push/bulk_push_rule_evaluator.py
+++ b/synapse/push/bulk_push_rule_evaluator.py
@@ -144,6 +144,7 @@ class BulkPushRuleEvaluator(object):
             Deferred
         """
         rules_by_user = yield self._get_rules_for_event(event, context)
+        actions_by_user = {}
 
         room_members = yield self.store.get_joined_users_from_context(
             event, context
@@ -189,14 +190,17 @@ class BulkPushRuleEvaluator(object):
                 if matches:
                     actions = [x for x in rule['actions'] if x != 'dont_notify']
                     if actions and 'notify' in actions:
-                        # Push rules say we should notify the user of this event,
-                        # so we mark it in the DB in the staging area. (This
-                        # will then get handled when we persist the event)
-                        yield self.store.add_push_actions_to_staging(
-                            event.event_id, uid, actions,
-                        )
+                        # Push rules say we should notify the user of this event
+                        actions_by_user[uid] = actions
                     break
 
+        # Mark in the DB staging area the push actions for users who should be
+        # notified for this event. (This will then get handled when we persist
+        # the event)
+        yield self.store.add_push_actions_to_staging(
+            event.event_id, actions_by_user,
+        )
+
 
 def _condition_checker(evaluator, conditions, uid, display_name, cache):
     for cond in conditions:
diff --git a/synapse/replication/slave/storage/appservice.py b/synapse/replication/slave/storage/appservice.py
index 0d3f31a50c..8cae3076f4 100644
--- a/synapse/replication/slave/storage/appservice.py
+++ b/synapse/replication/slave/storage/appservice.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2015, 2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -13,33 +14,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from ._base import BaseSlavedStore
-from synapse.storage import DataStore
-from synapse.config.appservice import load_appservices
-from synapse.storage.appservice import _make_exclusive_regex
+from synapse.storage.appservice import (
+    ApplicationServiceWorkerStore, ApplicationServiceTransactionWorkerStore,
+)
 
 
-class SlavedApplicationServiceStore(BaseSlavedStore):
-    def __init__(self, db_conn, hs):
-        super(SlavedApplicationServiceStore, self).__init__(db_conn, hs)
-        self.services_cache = load_appservices(
-            hs.config.server_name,
-            hs.config.app_service_config_files
-        )
-        self.exclusive_user_regex = _make_exclusive_regex(self.services_cache)
-
-    get_app_service_by_token = DataStore.get_app_service_by_token.__func__
-    get_app_service_by_user_id = DataStore.get_app_service_by_user_id.__func__
-    get_app_services = DataStore.get_app_services.__func__
-    get_new_events_for_appservice = DataStore.get_new_events_for_appservice.__func__
-    create_appservice_txn = DataStore.create_appservice_txn.__func__
-    get_appservices_by_state = DataStore.get_appservices_by_state.__func__
-    get_oldest_unsent_txn = DataStore.get_oldest_unsent_txn.__func__
-    _get_last_txn = DataStore._get_last_txn.__func__
-    complete_appservice_txn = DataStore.complete_appservice_txn.__func__
-    get_appservice_state = DataStore.get_appservice_state.__func__
-    set_appservice_last_pos = DataStore.set_appservice_last_pos.__func__
-    set_appservice_state = DataStore.set_appservice_state.__func__
-    get_if_app_services_interested_in_user = (
-        DataStore.get_if_app_services_interested_in_user.__func__
-    )
+class SlavedApplicationServiceStore(ApplicationServiceTransactionWorkerStore,
+                                    ApplicationServiceWorkerStore):
+    pass
diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py
index 79673b4273..90fb51d43c 100644
--- a/synapse/storage/appservice.py
+++ b/synapse/storage/appservice.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2015, 2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -20,6 +21,7 @@ from twisted.internet import defer
 from synapse.api.constants import Membership
 from synapse.appservice import AppServiceTransaction
 from synapse.config.appservice import load_appservices
+from synapse.storage.events import EventsWorkerStore
 from synapse.storage.roommember import RoomsForUser
 from ._base import SQLBaseStore
 
@@ -46,17 +48,16 @@ def _make_exclusive_regex(services_cache):
     return exclusive_user_regex
 
 
-class ApplicationServiceStore(SQLBaseStore):
-
+class ApplicationServiceWorkerStore(SQLBaseStore):
     def __init__(self, db_conn, hs):
-        super(ApplicationServiceStore, self).__init__(db_conn, hs)
-        self.hostname = hs.hostname
         self.services_cache = load_appservices(
             hs.hostname,
             hs.config.app_service_config_files
         )
         self.exclusive_user_regex = _make_exclusive_regex(self.services_cache)
 
+        super(ApplicationServiceWorkerStore, self).__init__(db_conn, hs)
+
     def get_app_services(self):
         return self.services_cache
 
@@ -112,6 +113,13 @@ class ApplicationServiceStore(SQLBaseStore):
                 return service
         return None
 
+
+class ApplicationServiceStore(ApplicationServiceWorkerStore):
+
+    def __init__(self, db_conn, hs):
+        super(ApplicationServiceStore, self).__init__(db_conn, hs)
+        self.hostname = hs.hostname
+
     def get_app_service_rooms(self, service):
         """Get a list of RoomsForUser for this application service.
 
@@ -184,11 +192,8 @@ class ApplicationServiceStore(SQLBaseStore):
         return rooms_for_user_matching_user_id
 
 
-class ApplicationServiceTransactionStore(SQLBaseStore):
-
-    def __init__(self, db_conn, hs):
-        super(ApplicationServiceTransactionStore, self).__init__(db_conn, hs)
-
+class ApplicationServiceTransactionWorkerStore(ApplicationServiceWorkerStore,
+                                               EventsWorkerStore):
     @defer.inlineCallbacks
     def get_appservices_by_state(self, state):
         """Get a list of application services based on their state.
@@ -433,3 +438,11 @@ class ApplicationServiceTransactionStore(SQLBaseStore):
         events = yield self._get_events(event_ids)
 
         defer.returnValue((upper_bound, events))
+
+
+class ApplicationServiceTransactionStore(ApplicationServiceTransactionWorkerStore):
+    # This is currently empty due to there not being any AS storage functions
+    # that can't be run on the workers. Since this may change in future, and
+    # to keep consistency with the other stores, we keep this empty class for
+    # now.
+    pass
diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py
index 4cabf70ad0..fe6887414e 100644
--- a/synapse/storage/event_push_actions.py
+++ b/synapse/storage/event_push_actions.py
@@ -407,11 +407,21 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
             self._rotate_notifs, 30 * 60 * 1000
         )
 
-    def _set_push_actions_for_event_and_users_txn(self, txn, event):
-        """
+    def _set_push_actions_for_event_and_users_txn(self, txn, events_and_contexts,
+                                                  all_events_and_contexts):
+        """Handles moving push actions from staging table to main
+        event_push_actions table for all events in `events_and_contexts`.
+
+        Also ensures that all events in `all_events_and_contexts` are removed
+        from the push action staging area.
+
         Args:
-            event: the event set actions for
-            tuples: list of tuples of (user_id, actions)
+            events_and_contexts (list[(EventBase, EventContext)]): events
+                we are persisting
+            all_events_and_contexts (list[(EventBase, EventContext)]): all
+                events that we were going to persist. This includes events
+                we've already persisted, etc, that wouldn't appear in
+                events_and_context.
         """
 
         sql = """
@@ -424,33 +434,40 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
             WHERE event_id = ?
         """
 
-        txn.execute(sql, (
-            event.room_id, event.internal_metadata.stream_ordering,
-            event.depth, event.event_id,
-        ))
-
-        user_ids = self._simple_select_onecol_txn(
-            txn,
-            table="event_push_actions_staging",
-            keyvalues={
-                "event_id": event.event_id,
-            },
-            retcol="user_id",
-        )
+        if events_and_contexts:
+            txn.executemany(sql, (
+                (
+                    event.room_id, event.internal_metadata.stream_ordering,
+                    event.depth, event.event_id,
+                )
+                for event, _ in events_and_contexts
+            ))
+
+        for event, _ in events_and_contexts:
+            user_ids = self._simple_select_onecol_txn(
+                txn,
+                table="event_push_actions_staging",
+                keyvalues={
+                    "event_id": event.event_id,
+                },
+                retcol="user_id",
+            )
 
-        self._simple_delete_txn(
-            txn,
-            table="event_push_actions_staging",
-            keyvalues={
-                "event_id": event.event_id,
-            },
-        )
+            for uid in user_ids:
+                txn.call_after(
+                    self.get_unread_event_push_actions_by_room_for_user.invalidate_many,
+                    (event.room_id, uid,)
+                )
 
-        for uid in user_ids:
-            txn.call_after(
-                self.get_unread_event_push_actions_by_room_for_user.invalidate_many,
-                (event.room_id, uid,)
+        # Now we delete the staging area for *all* events that were being
+        # persisted.
+        txn.executemany(
+            "DELETE FROM event_push_actions_staging WHERE event_id = ?",
+            (
+                (event.event_id,)
+                for event, _ in all_events_and_contexts
             )
+        )
 
     @defer.inlineCallbacks
     def get_push_actions_for_user(self, user_id, before=None, limit=50,
@@ -758,32 +775,51 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
             (rotate_to_stream_ordering,)
         )
 
-    def add_push_actions_to_staging(self, event_id, user_id, actions):
-        """Add the push actions for the user and event to the push
-        action staging area.
+    def add_push_actions_to_staging(self, event_id, user_id_actions):
+        """Add the push actions for the event to the push action staging area.
 
         Args:
             event_id (str)
-            user_id (str)
-            actions (list[dict|str]): An action can either be a string or
-                dict.
+            user_id_actions (dict[str, list[dict|str])]): A dictionary mapping
+                user_id to list of push actions, where an action can either be
+                a string or dict.
 
         Returns:
             Deferred
         """
 
-        is_highlight = 1 if _action_has_highlight(actions) else 0
+        if not user_id_actions:
+            return
 
-        return self._simple_insert(
-            table="event_push_actions_staging",
-            values={
-                "event_id": event_id,
-                "user_id": user_id,
-                "actions": _serialize_action(actions, is_highlight),
-                "notif": 1,
-                "highlight": is_highlight,
-            },
-            desc="add_push_actions_to_staging",
+        # This is a helper function for generating the necessary tuple that
+        # can be used to inert into the `event_push_actions_staging` table.
+        def _gen_entry(user_id, actions):
+            is_highlight = 1 if _action_has_highlight(actions) else 0
+            return (
+                event_id,  # event_id column
+                user_id,  # user_id column
+                _serialize_action(actions, is_highlight),  # actions column
+                1,  # notif column
+                is_highlight,  # highlight column
+            )
+
+        def _add_push_actions_to_staging_txn(txn):
+            # We don't use _simple_insert_many here to avoid the overhead
+            # of generating lists of dicts.
+
+            sql = """
+                INSERT INTO event_push_actions_staging
+                    (event_id, user_id, actions, notif, highlight)
+                VALUES (?, ?, ?, ?, ?)
+            """
+
+            txn.executemany(sql, (
+                _gen_entry(user_id, actions)
+                for user_id, actions in user_id_actions.iteritems()
+            ))
+
+        return self.runInteraction(
+            "add_push_actions_to_staging", _add_push_actions_to_staging_txn
         )
 
     def remove_push_actions_from_staging(self, event_id):
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 99d6cca585..b63392a6cd 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -627,6 +627,8 @@ class EventsStore(EventsWorkerStore):
                 list of the event ids which are the forward extremities.
 
         """
+        all_events_and_contexts = events_and_contexts
+
         max_stream_order = events_and_contexts[-1][0].internal_metadata.stream_ordering
 
         self._update_current_state_txn(txn, state_delta_for_room, max_stream_order)
@@ -689,6 +691,7 @@ class EventsStore(EventsWorkerStore):
         self._update_metadata_tables_txn(
             txn,
             events_and_contexts=events_and_contexts,
+            all_events_and_contexts=all_events_and_contexts,
             backfilled=backfilled,
         )
 
@@ -1086,26 +1089,33 @@ class EventsStore(EventsWorkerStore):
             ec for ec in events_and_contexts if ec[0] not in to_remove
         ]
 
-    def _update_metadata_tables_txn(self, txn, events_and_contexts, backfilled):
+    def _update_metadata_tables_txn(self, txn, events_and_contexts,
+                                    all_events_and_contexts, backfilled):
         """Update all the miscellaneous tables for new events
 
         Args:
             txn (twisted.enterprise.adbapi.Connection): db connection
             events_and_contexts (list[(EventBase, EventContext)]): events
                 we are persisting
+            all_events_and_contexts (list[(EventBase, EventContext)]): all
+                events that we were going to persist. This includes events
+                we've already persisted, etc, that wouldn't appear in
+                events_and_context.
             backfilled (bool): True if the events were backfilled
         """
 
+        # Insert all the push actions into the event_push_actions table.
+        self._set_push_actions_for_event_and_users_txn(
+            txn,
+            events_and_contexts=events_and_contexts,
+            all_events_and_contexts=all_events_and_contexts,
+        )
+
         if not events_and_contexts:
             # nothing to do here
             return
 
         for event, context in events_and_contexts:
-            # Insert all the push actions into the event_push_actions table.
-            self._set_push_actions_for_event_and_users_txn(
-                txn, event,
-            )
-
             if event.type == EventTypes.Redaction and event.redacts is not None:
                 # Remove the entries in the event_push_actions table for the
                 # redacted event.
diff --git a/tests/replication/slave/storage/test_events.py b/tests/replication/slave/storage/test_events.py
index 4780f2ab72..cb058d3142 100644
--- a/tests/replication/slave/storage/test_events.py
+++ b/tests/replication/slave/storage/test_events.py
@@ -230,10 +230,12 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase):
             state_handler = self.hs.get_state_handler()
             context = yield state_handler.compute_event_context(event)
 
-        for user_id, actions in push_actions:
-            yield self.master_store.add_push_actions_to_staging(
-                event.event_id, user_id, actions,
-            )
+        yield self.master_store.add_push_actions_to_staging(
+            event.event_id, {
+                user_id: actions
+                for user_id, actions in push_actions
+            },
+        )
 
         ordering = None
         if backfill:
diff --git a/tests/storage/test_event_push_actions.py b/tests/storage/test_event_push_actions.py
index d483e7cf9e..6c1aad149b 100644
--- a/tests/storage/test_event_push_actions.py
+++ b/tests/storage/test_event_push_actions.py
@@ -71,11 +71,11 @@ class EventPushActionsStoreTestCase(tests.unittest.TestCase):
             event.depth = stream
 
             yield self.store.add_push_actions_to_staging(
-                event.event_id, user_id, action,
+                event.event_id, {user_id: action},
             )
             yield self.store.runInteraction(
                 "", self.store._set_push_actions_for_event_and_users_txn,
-                event,
+                [(event, None)], [(event, None)],
             )
 
         def _rotate(stream):