diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py
index f51200d18e..8a475417a6 100644
--- a/synapse/events/snapshot.py
+++ b/synapse/events/snapshot.py
@@ -20,3 +20,4 @@ class EventContext(object):
self.current_state = current_state
self.state_group = None
self.rejected = False
+ self.push_actions = []
diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py
index d3f722b22e..064e8723c8 100644
--- a/synapse/handlers/_base.py
+++ b/synapse/handlers/_base.py
@@ -264,13 +264,13 @@ class BaseHandler(object):
"You don't have permission to redact events"
)
- (event_stream_id, max_stream_id) = yield self.store.persist_event(
- event, context=context
- )
-
action_generator = ActionGenerator(self.hs)
yield action_generator.handle_push_actions_for_event(
- event, self, context.current_state
+ event, context, self
+ )
+
+ (event_stream_id, max_stream_id) = yield self.store.persist_event(
+ event, context=context
)
destinations = set()
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index b78b0502d9..da55d43541 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -236,12 +236,6 @@ class FederationHandler(BaseHandler):
user = UserID.from_string(event.state_key)
yield user_joined_room(self.distributor, user, event.room_id)
- if not backfilled and not event.internal_metadata.is_outlier():
- action_generator = ActionGenerator(self.hs)
- yield action_generator.handle_push_actions_for_event(
- event, self
- )
-
@defer.inlineCallbacks
def _filter_events_for_server(self, server_name, room_id, events):
event_to_state = yield self.store.get_state_for_events(
@@ -1073,6 +1067,12 @@ class FederationHandler(BaseHandler):
auth_events=auth_events,
)
+ if not backfilled and not event.internal_metadata.is_outlier():
+ action_generator = ActionGenerator(self.hs)
+ yield action_generator.handle_push_actions_for_event(
+ event, context, self
+ )
+
event_stream_id, max_stream_id = yield self.store.persist_event(
event,
context=context,
diff --git a/synapse/push/action_generator.py b/synapse/push/action_generator.py
index d8f8256a1f..e0da0868ec 100644
--- a/synapse/push/action_generator.py
+++ b/synapse/push/action_generator.py
@@ -19,8 +19,6 @@ import bulk_push_rule_evaluator
import logging
-from synapse.api.constants import EventTypes
-
logger = logging.getLogger(__name__)
@@ -36,23 +34,15 @@ class ActionGenerator:
# tag (ie. we just need all the users).
@defer.inlineCallbacks
- def handle_push_actions_for_event(self, event, handler, current_state):
- if event.type == EventTypes.Redaction and event.redacts is not None:
- yield self.store.remove_push_actions_for_event_id(
- event.room_id, event.redacts
- )
-
+ def handle_push_actions_for_event(self, event, context, handler):
bulk_evaluator = yield bulk_push_rule_evaluator.evaluator_for_room_id(
event.room_id, self.hs, self.store
)
actions_by_user = yield bulk_evaluator.action_for_event_by_user(
- event, handler, current_state
+ event, handler, context.current_state
)
- yield self.store.set_push_actions_for_event_and_users(
- event,
- [
- (uid, None, actions) for uid, actions in actions_by_user.items()
- ]
- )
+ context.push_actions = [
+ (uid, None, actions) for uid, actions in actions_by_user.items()
+ ]
diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py
index d0a969f50b..466f07a1c4 100644
--- a/synapse/storage/event_push_actions.py
+++ b/synapse/storage/event_push_actions.py
@@ -24,8 +24,7 @@ logger = logging.getLogger(__name__)
class EventPushActionsStore(SQLBaseStore):
- @defer.inlineCallbacks
- def set_push_actions_for_event_and_users(self, event, tuples):
+ def _set_push_actions_for_event_and_users(self, txn, event, tuples):
"""
:param event: the event set actions for
:param tuples: list of tuples of (user_id, profile_tag, actions)
@@ -44,18 +43,12 @@ class EventPushActionsStore(SQLBaseStore):
'highlight': 1 if _action_has_highlight(actions) else 0,
})
- def f(txn):
- for uid, _, __ in tuples:
- txn.call_after(
- self.get_unread_event_push_actions_by_room_for_user.invalidate_many,
- (event.room_id, uid)
- )
- return self._simple_insert_many_txn(txn, "event_push_actions", values)
-
- yield self.runInteraction(
- "set_actions_for_event_and_users",
- f,
- )
+ for uid, _, __ in tuples:
+ txn.call_after(
+ self.get_unread_event_push_actions_by_room_for_user.invalidate_many,
+ (event.room_id, uid)
+ )
+ self._simple_insert_many_txn(txn, "event_push_actions", values)
@cachedInlineCallbacks(num_args=3, lru=True, tree=True)
def get_unread_event_push_actions_by_room_for_user(
@@ -107,21 +100,15 @@ class EventPushActionsStore(SQLBaseStore):
)
defer.returnValue(ret)
- @defer.inlineCallbacks
- def remove_push_actions_for_event_id(self, room_id, event_id):
- def f(txn):
- # Sad that we have to blow away the cache for the whole room here
- txn.call_after(
- self.get_unread_event_push_actions_by_room_for_user.invalidate_many,
- (room_id,)
- )
- txn.execute(
- "DELETE FROM event_push_actions WHERE room_id = ? AND event_id = ?",
- (room_id, event_id)
- )
- yield self.runInteraction(
- "remove_push_actions_for_event_id",
- f
+ def _remove_push_actions_for_event_id(self, txn, room_id, event_id):
+ # Sad that we have to blow away the cache for the whole room here
+ txn.call_after(
+ self.get_unread_event_push_actions_by_room_for_user.invalidate_many,
+ (room_id,)
+ )
+ txn.execute(
+ "DELETE FROM event_push_actions WHERE room_id = ? AND event_id = ?",
+ (room_id, event_id)
)
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index c6ed54721c..7d4012c414 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -205,23 +205,29 @@ class EventsStore(SQLBaseStore):
@log_function
def _persist_events_txn(self, txn, events_and_contexts, backfilled,
is_new_state=True):
-
- # Remove the any existing cache entries for the event_ids
- for event, _ in events_and_contexts:
+ depth_updates = {}
+ for event, context in events_and_contexts:
+ # Remove the any existing cache entries for the event_ids
txn.call_after(self._invalidate_get_event_cache, event.event_id)
-
if not backfilled:
txn.call_after(
self._events_stream_cache.entity_has_changed,
event.room_id, event.internal_metadata.stream_ordering,
)
- depth_updates = {}
- for event, _ in events_and_contexts:
- if event.internal_metadata.is_outlier():
- continue
- depth_updates[event.room_id] = max(
- event.depth, depth_updates.get(event.room_id, event.depth)
+ if not event.internal_metadata.is_outlier():
+ depth_updates[event.room_id] = max(
+ event.depth, depth_updates.get(event.room_id, event.depth)
+ )
+
+ if context.push_actions:
+ self._set_push_actions_for_event_and_users(
+ txn, event, context.push_actions
+ )
+
+ if event.type == EventTypes.Redaction and event.redacts is not None:
+ self._remove_push_actions_for_event_id(
+ txn, event.room_id, event.redacts
)
for room_id, depth in depth_updates.items():
|