summary refs log tree commit diff
diff options
context:
space:
mode:
authorMark Haines <mark.haines@matrix.org>2016-05-17 17:09:25 +0100
committerMark Haines <mark.haines@matrix.org>2016-05-17 17:09:25 +0100
commit8e44e34ed51591fb4c170ca6991610ae4e5b4b62 (patch)
tree6d48ed2e4327c592b89eaca1119fb15062ef4d2a
parentMerge branch 'markjh/liberate_typing_handler' into markjh/synchrotron (diff)
downloadsynapse-8e44e34ed51591fb4c170ca6991610ae4e5b4b62.tar.xz
Split out the /sync handler to a separate process
-rw-r--r--synapse/app/synchrotron.py20
-rw-r--r--synapse/replication/slave/storage/account_data.py39
-rw-r--r--synapse/replication/slave/storage/appservice.py1
-rw-r--r--synapse/replication/slave/storage/events.py10
-rw-r--r--synapse/replication/slave/storage/push_rule.py13
-rw-r--r--synapse/replication/slave/storage/receipts.py11
-rw-r--r--synapse/replication/slave/storage/registration.py2
7 files changed, 92 insertions, 4 deletions
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index 865c7f34e7..a46edd3e7c 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -107,6 +107,14 @@ class SynchrotronSlavedStore(
     SlavedFilteringStore,
 ):
     def get_current_presence_token(self):
+        return 0
+
+    presence_stream_cache = ()
+
+    def get_presence_list_accepted(self, user_localpart):
+        return ()
+
+    def insert_client_ip(self, user, access_token, ip, user_agent):
         pass
 
 
@@ -114,10 +122,21 @@ class SynchrotronPresence(object):
     def set_state(self, user, state):
         pass
 
+    def get_states(self, user_ids, as_event=False):
+        return {}
+
     @contextlib.contextmanager
     def user_syncing(self, user, affect_presence):
         yield
 
+    def current_state_for_users(self, user_ids):
+        return {}
+
+
+class SynchrotronTyping(object):
+    _latest_room_serial = 0
+    _room_serials = ()
+
 
 class SynchrotronServer(HomeServer):
     def get_db_conn(self, run_new_connection=True):
@@ -210,6 +229,7 @@ def setup(config_options):
         version_string=get_version_string("Synapse", synapse),
         database_engine=database_engine,
         presence_handler=SynchrotronPresence(),
+        typing_handler=SynchrotronTyping(),
     )
 
     ss.setup()
diff --git a/synapse/replication/slave/storage/account_data.py b/synapse/replication/slave/storage/account_data.py
index f59b0eabbc..5a44d314a3 100644
--- a/synapse/replication/slave/storage/account_data.py
+++ b/synapse/replication/slave/storage/account_data.py
@@ -15,7 +15,10 @@
 
 from ._base import BaseSlavedStore
 from ._slaved_id_tracker import SlavedIdTracker
+from synapse.storage import DataStore
 from synapse.storage.account_data import AccountDataStore
+from synapse.storage.tags import TagsStore
+from synapse.util.caches.stream_change_cache import StreamChangeCache
 
 
 class SlavedAccountDataStore(BaseSlavedStore):
@@ -25,6 +28,14 @@ class SlavedAccountDataStore(BaseSlavedStore):
         self._account_data_id_gen = SlavedIdTracker(
             db_conn, "account_data_max_stream_id", "stream_id",
         )
+        self._account_data_stream_cache = StreamChangeCache(
+            "AccountDataAndTagsChangeCache",
+            self._account_data_id_gen.get_current_token(),
+        )
+
+    get_account_data_for_user = (
+        AccountDataStore.__dict__["get_account_data_for_user"]
+    )
 
     get_global_account_data_by_type_for_users = (
         AccountDataStore.__dict__["get_global_account_data_by_type_for_users"]
@@ -34,6 +45,16 @@ class SlavedAccountDataStore(BaseSlavedStore):
         AccountDataStore.__dict__["get_global_account_data_by_type_for_user"]
     )
 
+    get_tags_for_user = TagsStore.__dict__["get_tags_for_user"]
+
+    get_updated_tags = DataStore.get_updated_tags.__func__
+    get_updated_account_data_for_user = (
+        DataStore.get_updated_account_data_for_user.__func__
+    )
+
+    def get_max_account_data_stream_id(self):
+        return self._account_data_id_gen.get_current_token()
+
     def stream_positions(self):
         result = super(SlavedAccountDataStore, self).stream_positions()
         position = self._account_data_id_gen.get_current_token()
@@ -47,15 +68,31 @@ class SlavedAccountDataStore(BaseSlavedStore):
         if stream:
             self._account_data_id_gen.advance(int(stream["position"]))
             for row in stream["rows"]:
-                user_id, data_type = row[1:3]
+                position, user_id, data_type = row[:3]
                 self.get_global_account_data_by_type_for_user.invalidate(
                     (data_type, user_id,)
                 )
+                self.get_account_data_for_user.invalidate((user_id,))
+                self._account_data_stream_cache.entity_has_changed(
+                    user_id, position
+                )
 
         stream = result.get("room_account_data")
         if stream:
             self._account_data_id_gen.advance(int(stream["position"]))
+            for row in stream["rows"]:
+                position, user_id = row[:2]
+                self.get_account_data_for_user.invalidate((user_id,))
+                self._account_data_stream_cache.entity_has_changed(
+                    user_id, position
+                )
 
         stream = result.get("tag_account_data")
         if stream:
             self._account_data_id_gen.advance(int(stream["position"]))
+            for row in stream["rows"]:
+                position, user_id = row[:2]
+                self.get_tags_for_user.invalidate((user_id,))
+                self._account_data_stream_cache.entity_has_changed(
+                    user_id, position
+                )
diff --git a/synapse/replication/slave/storage/appservice.py b/synapse/replication/slave/storage/appservice.py
index c0321ecf34..25792d9429 100644
--- a/synapse/replication/slave/storage/appservice.py
+++ b/synapse/replication/slave/storage/appservice.py
@@ -27,3 +27,4 @@ class SlavedApplicationServiceStore(BaseSlavedStore):
         )
 
     get_app_service_by_token = DataStore.get_app_service_by_token.__func__
+    get_app_service_by_user_id = DataStore.get_app_service_by_user_id.__func__
diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index c0d741452d..c35192d0d1 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -23,6 +23,7 @@ from synapse.storage.roommember import RoomMemberStore
 from synapse.storage.event_federation import EventFederationStore
 from synapse.storage.event_push_actions import EventPushActionsStore
 from synapse.storage.state import StateStore
+from synapse.storage.stream import StreamStore
 from synapse.util.caches.stream_change_cache import StreamChangeCache
 
 import ujson as json
@@ -87,6 +88,9 @@ class SlavedEventStore(BaseSlavedStore):
     _get_state_group_from_group = (
         StateStore.__dict__["_get_state_group_from_group"]
     )
+    get_recent_event_ids_for_room = (
+        StreamStore.__dict__["get_recent_event_ids_for_room"]
+    )
 
     get_unread_push_actions_for_user_in_range = (
         DataStore.get_unread_push_actions_for_user_in_range.__func__
@@ -111,8 +115,12 @@ class SlavedEventStore(BaseSlavedStore):
     get_events_around = DataStore.get_events_around.__func__
     get_state_for_events = DataStore.get_state_for_events.__func__
     get_state_groups = DataStore.get_state_groups.__func__
+    get_recent_events_for_room = DataStore.get_recent_events_for_room.__func__
+    get_room_events_stream_for_rooms = (
+        DataStore.get_room_events_stream_for_rooms.__func__
+    )
 
-    _set_before_and_after = DataStore._set_before_and_after
+    _set_before_and_after = staticmethod(DataStore._set_before_and_after)
 
     _get_events = DataStore._get_events.__func__
     _get_events_from_cache = DataStore._get_events_from_cache.__func__
diff --git a/synapse/replication/slave/storage/push_rule.py b/synapse/replication/slave/storage/push_rule.py
index 56796d9bd4..86fec353d3 100644
--- a/synapse/replication/slave/storage/push_rule.py
+++ b/synapse/replication/slave/storage/push_rule.py
@@ -15,7 +15,9 @@
 
 from .events import SlavedEventStore
 from ._slaved_id_tracker import SlavedIdTracker
+from synapse.storage import DataStore
 from synapse.storage.push_rule import PushRuleStore
+from synapse.util.caches.stream_change_cache import StreamChangeCache
 
 
 class SlavedPushRuleStore(SlavedEventStore):
@@ -24,8 +26,18 @@ class SlavedPushRuleStore(SlavedEventStore):
         self._push_rules_stream_id_gen = SlavedIdTracker(
             db_conn, "push_rules_stream", "stream_id",
         )
+        self.push_rules_stream_cache = StreamChangeCache(
+            "PushRulesStreamChangeCache",
+            self._push_rules_stream_id_gen.get_current_token(),
+        )
 
     get_push_rules_for_user = PushRuleStore.__dict__["get_push_rules_for_user"]
+    get_push_rules_enabled_for_user = (
+        PushRuleStore.__dict__["get_push_rules_enabled_for_user"]
+    )
+    have_push_rules_changed_for_user = (
+        DataStore.have_push_rules_changed_for_user.__func__
+    )
 
     def get_push_rules_stream_token(self):
         return (
@@ -44,6 +56,7 @@ class SlavedPushRuleStore(SlavedEventStore):
             for row in stream["rows"]:
                 user_id = row[1]
                 self.get_push_rules_for_user.invalidate((user_id,))
+                self.get_push_rules_enabled_for_user.invalidate((user_id,))
 
             self._push_rules_stream_id_gen.advance(int(stream["position"]))
 
diff --git a/synapse/replication/slave/storage/receipts.py b/synapse/replication/slave/storage/receipts.py
index ec007516d0..f88b931f8c 100644
--- a/synapse/replication/slave/storage/receipts.py
+++ b/synapse/replication/slave/storage/receipts.py
@@ -38,10 +38,20 @@ class SlavedReceiptsStore(BaseSlavedStore):
         )
 
     get_receipts_for_user = ReceiptsStore.__dict__["get_receipts_for_user"]
+    get_linearized_receipts_for_room = (
+        ReceiptsStore.__dict__["get_linearized_receipts_for_room"]
+    )
+    _get_linearized_receipts_for_rooms = (
+        ReceiptsStore.__dict__["_get_linearized_receipts_for_rooms"]
+    )
 
     get_max_receipt_stream_id = DataStore.get_max_receipt_stream_id.__func__
     get_all_updated_receipts = DataStore.get_all_updated_receipts.__func__
 
+    get_linearized_receipts_for_rooms = (
+        DataStore.get_linearized_receipts_for_rooms.__func__
+    )
+
     def stream_positions(self):
         result = super(SlavedReceiptsStore, self).stream_positions()
         result["receipts"] = self._receipts_id_gen.get_current_token()
@@ -59,3 +69,4 @@ class SlavedReceiptsStore(BaseSlavedStore):
 
     def invalidate_caches_for_receipt(self, room_id, receipt_type, user_id):
         self.get_receipts_for_user.invalidate((user_id, receipt_type))
+        self.get_linearized_receipts_for_room((room_id,))
diff --git a/synapse/replication/slave/storage/registration.py b/synapse/replication/slave/storage/registration.py
index 8f39e4f7aa..e2d2e60d42 100644
--- a/synapse/replication/slave/storage/registration.py
+++ b/synapse/replication/slave/storage/registration.py
@@ -27,6 +27,4 @@ class SlavedRegistrationStore(BaseSlavedStore):
         "get_user_by_access_token"
     ]
 
-    insert_client_ip = DataStore.insert_client_ip.__func__
-
     _query_for_auth = DataStore._query_for_auth.__func__