summary refs log tree commit diff
diff options
context:
space:
mode:
authorMark Haines <mark.haines@matrix.org>2016-04-14 13:30:57 +0100
committerMark Haines <mark.haines@matrix.org>2016-04-14 13:30:57 +0100
commitf41b1a87237dd670d469aaed528fbd8e9b66c9d3 (patch)
treee305d52e355d4caea9dab83fa628bf4f68b29d1e
parentOptionally split out the pusher into a separate process (diff)
downloadsynapse-f41b1a87237dd670d469aaed528fbd8e9b66c9d3.tar.xz
Make push sort of work
-rw-r--r--synapse/app/pusher.py13
-rw-r--r--synapse/replication/slave/storage/event_push_actions.py25
-rw-r--r--synapse/replication/slave/storage/events.py4
-rw-r--r--synapse/replication/slave/storage/receipts.py62
4 files changed, 96 insertions, 8 deletions
diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py
index f77deeed85..7755fbf00a 100644
--- a/synapse/app/pusher.py
+++ b/synapse/app/pusher.py
@@ -24,7 +24,9 @@ from synapse.config.logger import LoggingConfig
 from synapse.replication.slave.storage.events import SlavedEventStore
 from synapse.replication.slave.storage.pushers import SlavedPusherStore
 from synapse.replication.slave.storage.event_push_actions import SlavedPushActionsStore
+from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
 from synapse.storage.engines import create_engine
+from synapse.storage import DataStore
 from synapse.util.async import sleep
 from synapse.util.logcontext import (LoggingContext, preserve_fn)
 
@@ -40,7 +42,7 @@ class SlaveConfig(DatabaseConfig):
     def read_config(self, config):
         self.replication_url = config["replication_url"]
         self.server_name = config["server_name"]
-        self.use_insecure_ssl_client_just_for_testing_do_not_use = False
+        self.use_insecure_ssl_client_just_for_testing_do_not_use = True
         self.user_agent_suffix = None
         self.start_pushers = True
 
@@ -58,9 +60,13 @@ class PusherSlaveConfig(SlaveConfig, LoggingConfig):
 
 
 class PusherSlaveStore(
-    SlavedEventStore, SlavedPusherStore, SlavedPushActionsStore
+    SlavedPushActionsStore,
+    SlavedEventStore, SlavedPusherStore,
+    SlavedReceiptsStore
 ):
-    pass
+    update_pusher_last_stream_ordering_and_success = (
+        DataStore.update_pusher_last_stream_ordering_and_success.__func__
+    )
 
 
 class PusherServer(HomeServer):
@@ -135,7 +141,6 @@ class PusherServer(HomeServer):
                 args = store.stream_positions()
                 args["timeout"] = 30000
                 result = yield http_client.get_json(replication_url, args=args)
-                logger.error("FNARG %r", result)
                 yield store.process_replication(result)
                 poke_pushers(result)
             except:
diff --git a/synapse/replication/slave/storage/event_push_actions.py b/synapse/replication/slave/storage/event_push_actions.py
index 7ff1b8531a..26efe6daf6 100644
--- a/synapse/replication/slave/storage/event_push_actions.py
+++ b/synapse/replication/slave/storage/event_push_actions.py
@@ -14,12 +14,17 @@
 # limitations under the License.
 
 
-from ._base import BaseSlavedStore
+from .events import SlavedEventStore
+from .receipts import SlavedReceiptsStore
 
 from synapse.storage import DataStore
+from synapse.storage.event_push_actions import EventPushActionsStore
 
 
-class SlavedPushActionsStore(BaseSlavedStore):
+class SlavedPushActionsStore(SlavedEventStore, SlavedReceiptsStore):
+    get_unread_event_push_actions_by_room_for_user = (
+        EventPushActionsStore.__dict__["get_unread_event_push_actions_by_room_for_user"]
+    )
 
     get_unread_push_actions_for_user_in_range = (
         DataStore.get_unread_push_actions_for_user_in_range.__func__
@@ -28,3 +33,19 @@ class SlavedPushActionsStore(BaseSlavedStore):
     get_push_action_users_in_range = (
         DataStore.get_push_action_users_in_range.__func__
     )
+
+    def invalidate_caches_for_event(self, event, backfilled, reset_state):
+        self.get_unread_event_push_actions_by_room_for_user.invalidate_many(
+            (event.room_id,)
+        )
+        super(SlavedPushActionsStore, self).invalidate_caches_for_event(
+            event, backfilled, reset_state
+        )
+
+    def invalidate_caches_for_receipt(self, user_id, room_id):
+        self.get_unread_event_push_actions_by_room_for_user.invalidate_many(
+            (room_id,)
+        )
+        super(SlavedPushActionsStore, self).invalidate_caches_for_receipt(
+            user_id, room_id
+        )
diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index 25802ed44f..82f171c257 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -151,11 +151,11 @@ class SlavedEventStore(BaseSlavedStore):
         internal = json.loads(row[1])
         event_json = json.loads(row[2])
         event = FrozenEvent(event_json, internal_metadata_dict=internal)
-        self._invalidate_caches_for_event(
+        self.invalidate_caches_for_event(
             event, backfilled, reset_state=position in state_resets
         )
 
-    def _invalidate_caches_for_event(self, event, backfilled, reset_state):
+    def invalidate_caches_for_event(self, event, backfilled, reset_state):
         if reset_state:
             self._get_current_state_for_key.invalidate_all()
             self.get_rooms_for_user.invalidate_all()
diff --git a/synapse/replication/slave/storage/receipts.py b/synapse/replication/slave/storage/receipts.py
new file mode 100644
index 0000000000..aaf9015ebf
--- /dev/null
+++ b/synapse/replication/slave/storage/receipts.py
@@ -0,0 +1,62 @@
+# -*- coding: utf-8 -*-
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ._base import BaseSlavedStore
+from ._slaved_id_tracker import SlavedIdTracker
+
+from synapse.storage import DataStore
+from synapse.storage.receipts import ReceiptsStore
+
+# So, um, we want to borrow a load of functions intended for reading from
+# a DataStore, but we don't want to take functions that either write to the
+# DataStore or are cached and don't have cache invalidation logic.
+#
+# Rather than write duplicate versions of those functions, or lift them to
+# a common base class, we going to grab the underlying __func__ object from
+# the method descriptor on the DataStore and chuck them into our class.
+
+
+class SlavedReceiptsStore(BaseSlavedStore):
+
+    def __init__(self, db_conn, hs):
+        super(SlavedReceiptsStore, self).__init__(db_conn, hs)
+
+        self._receipts_id_gen = SlavedIdTracker(
+            db_conn, "receipts_linearized", "stream_id"
+        )
+
+    get_receipts_for_user = ReceiptsStore.__dict__["get_receipts_for_user"]
+
+    get_max_receipt_stream_id = DataStore.get_max_receipt_stream_id.__func__
+
+    def stream_positions(self):
+        result = super(SlavedReceiptsStore, self).stream_positions()
+        result["receipts"] = self._receipts_id_gen.get_current_token()
+        return result
+
+    def process_replication(self, result):
+        stream = result.get("receipts")
+        if stream:
+            self._receipts_id_gen.advance(stream["position"])
+            for row in stream["rows"]:
+                room_id = row[1]
+                user_id = row[3]
+                self.invalidate_caches_for_receipt(user_id, room_id)
+                self.get_receipts_for_user.invalidate((user_id,))
+
+        return super(SlavedReceiptsStore, self).process_replication(result)
+
+    def invalidate_caches_for_receipt(self, user_id, room_id):
+        self.get_receipts_for_user.invalidate((user_id,))