summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
authorErik Johnston <erikj@jki.re>2017-05-19 13:42:49 +0100
committerGitHub <noreply@github.com>2017-05-19 13:42:49 +0100
commit99713dc7d33a4f65e68e7a5b83c0b6cb346f41ac (patch)
tree7dcbb8503bcf13dc2e9b74a642b5a7701a3023a9 /synapse
parentDon't push users who have left (diff)
parentMove invalidation cb to its own structure (diff)
downloadsynapse-99713dc7d33a4f65e68e7a5b83c0b6cb346f41ac.tar.xz
Merge pull request #2234 from matrix-org/erikj/fix_push
Store ActionGenerator in HomeServer
Diffstat (limited to 'synapse')
-rw-r--r--synapse/handlers/federation.py5
-rw-r--r--synapse/handlers/message.py3
-rw-r--r--synapse/push/action_generator.py2
-rw-r--r--synapse/push/bulk_push_rule_evaluator.py23
-rw-r--r--synapse/server.py5
5 files changed, 25 insertions, 13 deletions
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 52d97dfbf3..63e633548d 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -43,7 +43,6 @@ from synapse.events.utils import prune_event
 
 from synapse.util.retryutils import NotRetryingDestination
 
-from synapse.push.action_generator import ActionGenerator
 from synapse.util.distributor import user_joined_room
 
 from twisted.internet import defer
@@ -75,6 +74,7 @@ class FederationHandler(BaseHandler):
         self.state_handler = hs.get_state_handler()
         self.server_name = hs.hostname
         self.keyring = hs.get_keyring()
+        self.action_generator = hs.get_action_generator()
 
         self.replication_layer.set_handler(self)
 
@@ -1389,8 +1389,7 @@ class FederationHandler(BaseHandler):
         )
 
         if not event.internal_metadata.is_outlier():
-            action_generator = ActionGenerator(self.hs)
-            yield action_generator.handle_push_actions_for_event(
+            yield self.action_generator.handle_push_actions_for_event(
                 event, context
             )
 
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index ba8776f288..a04f634c5c 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -20,7 +20,6 @@ from synapse.api.errors import AuthError, Codes, SynapseError
 from synapse.crypto.event_signing import add_hashes_and_signatures
 from synapse.events.utils import serialize_event
 from synapse.events.validator import EventValidator
-from synapse.push.action_generator import ActionGenerator
 from synapse.types import (
     UserID, RoomAlias, RoomStreamToken,
 )
@@ -54,7 +53,7 @@ class MessageHandler(BaseHandler):
         # This is to stop us from diverging history *too* much.
         self.limiter = Limiter(max_count=5)
 
-        self.action_generator = ActionGenerator(self.hs)
+        self.action_generator = hs.get_action_generator()
 
     @defer.inlineCallbacks
     def purge_history(self, room_id, event_id):
diff --git a/synapse/push/action_generator.py b/synapse/push/action_generator.py
index 0658497d9b..fe09d50d55 100644
--- a/synapse/push/action_generator.py
+++ b/synapse/push/action_generator.py
@@ -24,7 +24,7 @@ import logging
 logger = logging.getLogger(__name__)
 
 
-class ActionGenerator:
+class ActionGenerator(object):
     def __init__(self, hs):
         self.hs = hs
         self.clock = hs.get_clock()
diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py
index 0158026915..760d567ca1 100644
--- a/synapse/push/bulk_push_rule_evaluator.py
+++ b/synapse/push/bulk_push_rule_evaluator.py
@@ -24,6 +24,8 @@ from synapse.api.constants import EventTypes, Membership
 from synapse.util.caches.descriptors import cached
 from synapse.util.async import Linearizer
 
+from collections import namedtuple
+
 
 logger = logging.getLogger(__name__)
 
@@ -31,7 +33,7 @@ logger = logging.getLogger(__name__)
 rules_by_room = {}
 
 
-class BulkPushRuleEvaluator:
+class BulkPushRuleEvaluator(object):
     """Calculates the outcome of push rules for an event for all users in the
     room at once.
     """
@@ -204,12 +206,7 @@ class RulesForRoom(object):
         # To get around this we pass a function that on invalidations looks ups
         # the RoomsForUser entry in the cache, rather than keeping a reference
         # to self around in the callback.
-        def invalidate_all_cb():
-            rules = rules_for_room_cache.get(room_id, update_metrics=False)
-            if rules:
-                rules.invalidate_all()
-
-        self.invalidate_all_cb = invalidate_all_cb
+        self.invalidate_all_cb = _Invalidation(rules_for_room_cache, room_id)
 
     @defer.inlineCallbacks
     def get_rules(self, context):
@@ -347,3 +344,15 @@ class RulesForRoom(object):
             self.member_map.update(members)
             self.rules_by_user = rules_by_user
             self.state_group = state_group
+
+
+class _Invalidation(namedtuple("_Invalidation", ("cache", "room_id"))):
+    # We rely on _CacheContext implementing __eq__ and __hash__ sensibly,
+    # which namedtuple does for us (i.e. two _CacheContext are the same if
+    # their caches and keys match). This is important in particular to
+    # dedupe when we add callbacks to lru cache nodes, otherwise the number
+    # of callbacks would grow.
+    def __call__(self):
+        rules = self.cache.get(self.room_id, None, update_metrics=False)
+        if rules:
+            rules.invalidate_all()
diff --git a/synapse/server.py b/synapse/server.py
index 12754c89ae..e400e278c6 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -52,6 +52,7 @@ from synapse.handlers.read_marker import ReadMarkerHandler
 from synapse.http.client import SimpleHttpClient, InsecureInterceptableContextFactory
 from synapse.http.matrixfederationclient import MatrixFederationHttpClient
 from synapse.notifier import Notifier
+from synapse.push.action_generator import ActionGenerator
 from synapse.push.pusherpool import PusherPool
 from synapse.rest.media.v1.media_repository import MediaRepository
 from synapse.state import StateHandler
@@ -135,6 +136,7 @@ class HomeServer(object):
         'macaroon_generator',
         'tcp_replication',
         'read_marker_handler',
+        'action_generator',
     ]
 
     def __init__(self, hostname, **kwargs):
@@ -299,6 +301,9 @@ class HomeServer(object):
     def build_tcp_replication(self):
         raise NotImplementedError()
 
+    def build_action_generator(self):
+        return ActionGenerator(self)
+
     def remove_pusher(self, app_id, push_key, user_id):
         return self.get_pusherpool().remove_pusher(app_id, push_key, user_id)