summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
authorMark Haines <mjark@negativecurvature.net>2016-05-13 17:27:46 +0100
committerMark Haines <mjark@negativecurvature.net>2016-05-13 17:27:46 +0100
commit077468f6a965c3986bb5373d46d53bbaec7e078b (patch)
tree5f62c041b70409f3fe2b347d5d80debd64261396 /synapse
parentMerge pull request #783 from matrix-org/markjh/slave_account_data (diff)
parentManually expire broken caches like the who_forgot_in_room (diff)
downloadsynapse-077468f6a965c3986bb5373d46d53bbaec7e078b.tar.xz
Merge pull request #780 from matrix-org/dbkr/email_notifs_on_pusher
Make email notifs work on the pusher synapse
Diffstat (limited to 'synapse')
-rw-r--r--synapse/app/pusher.py35
-rw-r--r--synapse/push/mailer.py1
-rw-r--r--synapse/replication/slave/storage/events.py19
3 files changed, 43 insertions, 12 deletions
diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py
index 8e9c0e1960..135dd58c15 100644
--- a/synapse/app/pusher.py
+++ b/synapse/app/pusher.py
@@ -23,10 +23,11 @@ from synapse.config.logger import LoggingConfig
 from synapse.config.emailconfig import EmailConfig
 from synapse.http.site import SynapseSite
 from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
-from synapse.storage.state import StateStore
+from synapse.storage.roommember import RoomMemberStore
 from synapse.replication.slave.storage.events import SlavedEventStore
 from synapse.replication.slave.storage.pushers import SlavedPusherStore
 from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
+from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
 from synapse.storage.engines import create_engine
 from synapse.storage import DataStore
 from synapse.util.async import sleep
@@ -60,6 +61,7 @@ class SlaveConfig(DatabaseConfig):
         self.soft_file_limit = config.get("soft_file_limit")
         self.daemonize = config.get("daemonize")
         self.pid_file = self.abspath(config.get("pid_file"))
+        self.public_baseurl = config["public_baseurl"]
 
     def default_config(self, server_name, **kwargs):
         pid_file = self.abspath("pusher.pid")
@@ -98,7 +100,8 @@ class PusherSlaveConfig(SlaveConfig, LoggingConfig, EmailConfig):
 
 
 class PusherSlaveStore(
-    SlavedEventStore, SlavedPusherStore, SlavedReceiptsStore
+    SlavedEventStore, SlavedPusherStore, SlavedReceiptsStore,
+    SlavedAccountDataStore
 ):
     update_pusher_last_stream_ordering_and_success = (
         DataStore.update_pusher_last_stream_ordering_and_success.__func__
@@ -128,16 +131,13 @@ class PusherSlaveStore(
         DataStore.get_profile_displayname.__func__
     )
 
-    get_state_groups = (
-        DataStore.get_state_groups.__func__
-    )
-
-    _get_state_group_for_events = (
-        StateStore.__dict__["_get_state_group_for_events"]
-    )
-
-    _get_state_group_for_event = (
-        StateStore.__dict__["_get_state_group_for_event"]
+    # XXX: This is a bit broken because we don't persist forgotten rooms
+    # in a way that they can be streamed. This means that we don't have a
+    # way to invalidate the forgotten rooms cache correctly.
+    # For now we expire the cache every 10 minutes.
+    BROKEN_CACHE_EXPIRY_MS = 60 * 60 * 1000
+    who_forgot_in_room = (
+        RoomMemberStore.__dict__["who_forgot_in_room"]
     )
 
 
@@ -219,6 +219,7 @@ class PusherServer(HomeServer):
         store = self.get_datastore()
         replication_url = self.config.replication_url
         pusher_pool = self.get_pusherpool()
+        clock = self.get_clock()
 
         def stop_pusher(user_id, app_id, pushkey):
             key = "%s:%s" % (app_id, pushkey)
@@ -270,11 +271,21 @@ class PusherServer(HomeServer):
                     min_stream_id, max_stream_id, affected_room_ids
                 )
 
+        def expire_broken_caches():
+            store.who_forgot_in_room.invalidate_all()
+
+        next_expire_broken_caches_ms = 0
         while True:
             try:
                 args = store.stream_positions()
                 args["timeout"] = 30000
                 result = yield http_client.get_json(replication_url, args=args)
+                now_ms = clock.time_msec()
+                if now_ms > next_expire_broken_caches_ms:
+                    expire_broken_caches()
+                    next_expire_broken_caches_ms = (
+                        now_ms + store.BROKEN_CACHE_EXPIRY_MS
+                    )
                 yield store.process_replication(result)
                 poke_pushers(result)
             except:
diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py
index 5d60c1efcf..2be294f52e 100644
--- a/synapse/push/mailer.py
+++ b/synapse/push/mailer.py
@@ -397,6 +397,7 @@ class Mailer(object):
             return ""
 
         serverAndMediaId = value[6:]
+        fragment = None
         if '#' in serverAndMediaId:
             (serverAndMediaId, fragment) = serverAndMediaId.split('#', 1)
             fragment = "#" + fragment
diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index 635febb174..99cddf2518 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -75,6 +75,18 @@ class SlavedEventStore(BaseSlavedStore):
     get_unread_event_push_actions_by_room_for_user = (
         EventPushActionsStore.__dict__["get_unread_event_push_actions_by_room_for_user"]
     )
+    _get_state_group_for_events = (
+        StateStore.__dict__["_get_state_group_for_events"]
+    )
+    _get_state_group_for_event = (
+        StateStore.__dict__["_get_state_group_for_event"]
+    )
+    _get_state_groups_from_groups = (
+        StateStore.__dict__["_get_state_groups_from_groups"]
+    )
+    _get_state_group_from_group = (
+        StateStore.__dict__["_get_state_group_from_group"]
+    )
 
     get_unread_push_actions_for_user_in_range = (
         DataStore.get_unread_push_actions_for_user_in_range.__func__
@@ -96,6 +108,9 @@ class SlavedEventStore(BaseSlavedStore):
     get_room_events_stream_for_room = (
         DataStore.get_room_events_stream_for_room.__func__
     )
+    get_events_around = DataStore.get_events_around.__func__
+    get_state_for_events = DataStore.get_state_for_events.__func__
+    get_state_groups = DataStore.get_state_groups.__func__
 
     _set_before_and_after = DataStore._set_before_and_after
 
@@ -116,6 +131,10 @@ class SlavedEventStore(BaseSlavedStore):
         DataStore._get_rooms_for_user_where_membership_is_txn.__func__
     )
     _get_members_rows_txn = DataStore._get_members_rows_txn.__func__
+    _get_state_for_groups = DataStore._get_state_for_groups.__func__
+    _get_all_state_from_cache = DataStore._get_all_state_from_cache.__func__
+    _get_events_around_txn = DataStore._get_events_around_txn.__func__
+    _get_some_state_from_cache = DataStore._get_some_state_from_cache.__func__
 
     def stream_positions(self):
         result = super(SlavedEventStore, self).stream_positions()