From 3d33eef6fcbba474664a9bccdcb8822c6f72ee8c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 6 Feb 2018 14:31:24 +0000 Subject: Store state groups separately from events (#2784) * Split state group persist into seperate storage func * Add per database engine code for state group id gen * Move store_state_group to StateReadStore This allows other workers to use it, and so resolve state. * Hook up store_state_group * Fix tests * Rename _store_mult_state_groups_txn * Rename StateGroupReadStore * Remove redundant _have_persisted_state_group_txn * Update comments * Comment compute_event_context * Set start val for state_group_id_seq ... otherwise we try to recreate old state groups * Update comments * Don't store state for outliers * Update comment * Update docstring as state groups are ints --- tests/replication/slave/storage/test_events.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) (limited to 'tests/replication/slave/storage/test_events.py') diff --git a/tests/replication/slave/storage/test_events.py b/tests/replication/slave/storage/test_events.py index 105e1228bb..f430cce931 100644 --- a/tests/replication/slave/storage/test_events.py +++ b/tests/replication/slave/storage/test_events.py @@ -226,11 +226,9 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase): context = EventContext() context.current_state_ids = state_ids context.prev_state_ids = state_ids - elif not backfill: + else: state_handler = self.hs.get_state_handler() context = yield state_handler.compute_event_context(event) - else: - context = EventContext() context.push_actions = push_actions -- cgit 1.5.1 From 4810f7effd0fc3fd97f9edaf8ea0af48477adb0a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 15 Feb 2018 15:18:37 +0000 Subject: Remove context.push_actions --- synapse/events/snapshot.py | 4 ---- synapse/push/action_generator.py | 6 +----- synapse/push/bulk_push_rule_evaluator.py | 9 +++------ synapse/storage/events.py | 7 +++---- tests/replication/slave/storage/test_events.py | 5 ++++- 5 files changed, 11 insertions(+), 20 deletions(-) (limited to 'tests/replication/slave/storage/test_events.py') diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py index f9445bef13..8e684d91b5 100644 --- a/synapse/events/snapshot.py +++ b/synapse/events/snapshot.py @@ -52,7 +52,6 @@ class EventContext(object): "prev_state_ids", "state_group", "rejected", - "push_actions", "prev_group", "delta_ids", "prev_state_events", @@ -67,7 +66,6 @@ class EventContext(object): self.state_group = None self.rejected = False - self.push_actions = [] # A previously persisted state group and a delta between that # and this state. @@ -104,7 +102,6 @@ class EventContext(object): "event_state_key": event.state_key if event.is_state() else None, "state_group": self.state_group, "rejected": self.rejected, - "push_actions": self.push_actions, "prev_group": self.prev_group, "delta_ids": _encode_state_dict(self.delta_ids), "prev_state_events": self.prev_state_events, @@ -127,7 +124,6 @@ class EventContext(object): context = EventContext() context.state_group = input["state_group"] context.rejected = input["rejected"] - context.push_actions = input["push_actions"] context.prev_group = input["prev_group"] context.delta_ids = _decode_state_dict(input["delta_ids"]) context.prev_state_events = input["prev_state_events"] diff --git a/synapse/push/action_generator.py b/synapse/push/action_generator.py index fe09d50d55..8f619a7a1b 100644 --- a/synapse/push/action_generator.py +++ b/synapse/push/action_generator.py @@ -40,10 +40,6 @@ class ActionGenerator(object): @defer.inlineCallbacks def handle_push_actions_for_event(self, event, context): with Measure(self.clock, "action_for_event_by_user"): - actions_by_user = yield self.bulk_evaluator.action_for_event_by_user( + yield self.bulk_evaluator.action_for_event_by_user( event, context ) - - context.push_actions = [ - (uid, actions) for uid, actions in actions_by_user.iteritems() - ] diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index 841ccbd1f1..1140788aa7 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -137,14 +137,13 @@ class BulkPushRuleEvaluator(object): @defer.inlineCallbacks def action_for_event_by_user(self, event, context): - """Given an event and context, evaluate the push rules and return - the results + """Given an event and context, evaluate the push rules and insert the + results into the event_push_actions_staging table. Returns: - dict of user_id -> action + Deferred """ rules_by_user = yield self._get_rules_for_event(event, context) - actions_by_user = {} room_members = yield self.store.get_joined_users_from_context( event, context @@ -190,12 +189,10 @@ class BulkPushRuleEvaluator(object): if matches: actions = [x for x in rule['actions'] if x != 'dont_notify'] if actions and 'notify' in actions: - actions_by_user[uid] = actions yield self.store.add_push_actions_to_staging( event.event_id, uid, actions, ) break - defer.returnValue(actions_by_user) def _condition_checker(evaluator, conditions, uid, display_name, cache): diff --git a/synapse/storage/events.py b/synapse/storage/events.py index ca64aacb1c..52b7b34749 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -1168,10 +1168,9 @@ class EventsStore(SQLBaseStore): for event, context in events_and_contexts: # Insert all the push actions into the event_push_actions table. - if context.push_actions: - self._set_push_actions_for_event_and_users_txn( - txn, event, - ) + self._set_push_actions_for_event_and_users_txn( + txn, event, + ) if event.type == EventTypes.Redaction and event.redacts is not None: # Remove the entries in the event_push_actions table for the diff --git a/tests/replication/slave/storage/test_events.py b/tests/replication/slave/storage/test_events.py index f430cce931..4780f2ab72 100644 --- a/tests/replication/slave/storage/test_events.py +++ b/tests/replication/slave/storage/test_events.py @@ -230,7 +230,10 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase): state_handler = self.hs.get_state_handler() context = yield state_handler.compute_event_context(event) - context.push_actions = push_actions + for user_id, actions in push_actions: + yield self.master_store.add_push_actions_to_staging( + event.event_id, user_id, actions, + ) ordering = None if backfill: -- cgit 1.5.1 From e440e2845642a27f15d073aea58ce20b2785f66e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 20 Feb 2018 11:41:40 +0000 Subject: Fix unit tests --- tests/replication/slave/storage/test_events.py | 10 ++++++---- tests/storage/test_event_push_actions.py | 2 +- 2 files changed, 7 insertions(+), 5 deletions(-) (limited to 'tests/replication/slave/storage/test_events.py') diff --git a/tests/replication/slave/storage/test_events.py b/tests/replication/slave/storage/test_events.py index 4780f2ab72..cb058d3142 100644 --- a/tests/replication/slave/storage/test_events.py +++ b/tests/replication/slave/storage/test_events.py @@ -230,10 +230,12 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase): state_handler = self.hs.get_state_handler() context = yield state_handler.compute_event_context(event) - for user_id, actions in push_actions: - yield self.master_store.add_push_actions_to_staging( - event.event_id, user_id, actions, - ) + yield self.master_store.add_push_actions_to_staging( + event.event_id, { + user_id: actions + for user_id, actions in push_actions + }, + ) ordering = None if backfill: diff --git a/tests/storage/test_event_push_actions.py b/tests/storage/test_event_push_actions.py index d483e7cf9e..550f9ba662 100644 --- a/tests/storage/test_event_push_actions.py +++ b/tests/storage/test_event_push_actions.py @@ -71,7 +71,7 @@ class EventPushActionsStoreTestCase(tests.unittest.TestCase): event.depth = stream yield self.store.add_push_actions_to_staging( - event.event_id, user_id, action, + event.event_id, {user_id: action}, ) yield self.store.runInteraction( "", self.store._set_push_actions_for_event_and_users_txn, -- cgit 1.5.1