author     Mark Haines <mark.haines@matrix.org>  2016-05-23 14:02:27 +0100
committer  Mark Haines <mark.haines@matrix.org>  2016-05-23 14:02:27 +0100
commit     2272b65135ad3fea9e8e5b1bb4ac47678a7fdcfa (patch)
tree       4c674eff69eda1f59794ab1a6c4db718252e46d3
parent     Split out the /sync handler to a separate process (diff)
download   synapse-2272b65135ad3fea9e8e5b1bb4ac47678a7fdcfa.tar.xz
Get incremental sync sort of working
-rw-r--r--  synapse/app/synchrotron.py                     | 46
-rw-r--r--  synapse/handlers/sync.py                       |  2
-rw-r--r--  synapse/replication/slave/storage/events.py    | 10
-rw-r--r--  synapse/replication/slave/storage/receipts.py  |  8
4 files changed, 61 insertions, 5 deletions
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index a46edd3e7c..623a73f67a 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -33,13 +33,15 @@ from synapse.replication.slave.storage.filtering import SlavedFilteringStore
 from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore
 from synapse.server import HomeServer
 from synapse.storage.engines import create_engine
+from synapse.storage.roommember import RoomMemberStore
+from synapse.util.async import sleep
 from synapse.util.httpresourcetree import create_resource_tree
 from synapse.util.logcontext import LoggingContext
 from synapse.util.manhole import manhole
 from synapse.util.rlimit import change_resource_limit
 from synapse.util.versionstring import get_version_string
 
-from twisted.internet import reactor
+from twisted.internet import reactor, defer
 from twisted.web.resource import Resource
 
 from daemonize import Daemonize
@@ -55,6 +57,10 @@ class SynchrotronConfig(DatabaseConfig, LoggingConfig, AppServiceConfig):
     def read_config(self, config):
         self.replication_url = config["replication_url"]
         self.server_name = config["server_name"]
+        self.use_insecure_ssl_client_just_for_testing_do_not_use = config.get(
+            "use_insecure_ssl_client_just_for_testing_do_not_use", False
+        )
+        self.user_agent_suffix = None
         self.listeners = config["listeners"]
         self.soft_file_limit = config.get("soft_file_limit")
         self.daemonize = config.get("daemonize")
@@ -117,6 +123,15 @@ class SynchrotronSlavedStore(
     def insert_client_ip(self, user, access_token, ip, user_agent):
         pass
 
+    # XXX: This is a bit broken because we don't persist forgotten rooms
+    # in a way that they can be streamed. This means that we don't have a
+    # way to invalidate the forgotten rooms cache correctly.
+    # For now we expire the cache every hour.
+    BROKEN_CACHE_EXPIRY_MS = 60 * 60 * 1000
+    who_forgot_in_room = (
+        RoomMemberStore.__dict__["who_forgot_in_room"]
+    )
+
 
 class SynchrotronPresence(object):
     def set_state(self, user, state):
@@ -205,6 +220,34 @@ class SynchrotronServer(HomeServer):
             else:
                 logger.warn("Unrecognized listener type: %s", listener["type"])
 
+    @defer.inlineCallbacks
+    def replicate(self):
+        http_client = self.get_simple_http_client()
+        store = self.get_datastore()
+        replication_url = self.config.replication_url
+        clock = self.get_clock()
+
+        def expire_broken_caches():
+            store.who_forgot_in_room.invalidate_all()
+
+        next_expire_broken_caches_ms = 0
+        while True:
+            try:
+                args = store.stream_positions()
+                args["timeout"] = 30000
+                result = yield http_client.get_json(replication_url, args=args)
+                logger.debug("Received replication data: %r", result)
+                now_ms = clock.time_msec()
+                if now_ms > next_expire_broken_caches_ms:
+                    expire_broken_caches()
+                    next_expire_broken_caches_ms = (
+                        now_ms + store.BROKEN_CACHE_EXPIRY_MS
+                    )
+                yield store.process_replication(result)
+            except Exception:
+                logger.exception("Error replicating from %r", replication_url)
+                yield sleep(5)
+
 
 def setup(config_options):
     try:
@@ -239,6 +282,7 @@ def setup(config_options):
 
     def start():
         ss.get_datastore().start_profiling()
+        ss.replicate()
 
     reactor.callWhenRunning(start)
 
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 9ebfccc8bf..3acee065ad 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -240,6 +240,8 @@ class SyncHandler(object):
             )
         )
 
+        account_data = dict(account_data)
+
         account_data['m.push_rules'] = yield self.push_rules_for_user(
             sync_config.user
         )
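The one-line sync.py change copies account_data before the handler injects the m.push_rules entry. The mapping it starts from typically comes straight out of a cached storage call, so mutating it in place would leak the injected key into the cached value seen by later /sync requests. A tiny self-contained illustration of that aliasing problem (get_cached_account_data is a hypothetical stand-in for the real storage call):

    _cache = {"@alice:example.com": {"m.direct": {}}}   # stand-in for a cached store result

    def get_cached_account_data(user_id):
        return _cache[user_id]                          # hands back the cached dict itself

    # Buggy: mutating the returned dict pollutes the cache for every later caller.
    account_data = get_cached_account_data("@alice:example.com")
    account_data["m.push_rules"] = {"global": {}}
    assert "m.push_rules" in _cache["@alice:example.com"]

    # Fixed, as in the diff: copy first, then mutate only the private copy.
    _cache = {"@alice:example.com": {"m.direct": {}}}   # reset for the example
    account_data = dict(get_cached_account_data("@alice:example.com"))
    account_data["m.push_rules"] = {"global": {}}
    assert "m.push_rules" not in _cache["@alice:example.com"]
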
diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index c35192d0d1..d768e2aabe 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -58,6 +58,9 @@ class SlavedEventStore(BaseSlavedStore):
             "EventsRoomStreamChangeCache", min_event_val,
             prefilled_cache=event_cache_prefill,
         )
+        self._membership_stream_cache = StreamChangeCache(
+            "MembershipStreamChangeCache", events_max,
+        )
 
     # Cached functions can't be accessed through a class instance so we need
     # to reach inside the __dict__ to extract them.
@@ -113,6 +116,7 @@ class SlavedEventStore(BaseSlavedStore):
         DataStore.get_room_events_stream_for_room.__func__
     )
     get_events_around = DataStore.get_events_around.__func__
+    get_state_for_event = DataStore.get_state_for_event.__func__
     get_state_for_events = DataStore.get_state_for_events.__func__
     get_state_groups = DataStore.get_state_groups.__func__
     get_recent_events_for_room = DataStore.get_recent_events_for_room.__func__
@@ -228,9 +232,9 @@ class SlavedEventStore(BaseSlavedStore):
             self.get_rooms_for_user.invalidate((event.state_key,))
             # self.get_joined_hosts_for_room.invalidate((event.room_id,))
             self.get_users_in_room.invalidate((event.room_id,))
-            # self._membership_stream_cache.entity_has_changed(
-            #    event.state_key, event.internal_metadata.stream_ordering
-            # )
+            self._membership_stream_cache.entity_has_changed(
+                event.state_key, event.internal_metadata.stream_ordering
+            )
             self.get_invited_rooms_for_user.invalidate((event.state_key,))
 
         if not event.is_state():
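The events.py changes wire membership into the stream-change machinery: every membership event replicated to the slave now calls entity_has_changed with the member's user id and the event's stream ordering, so an incremental sync can cheaply tell whether a user's membership may have moved since the client's last token. A rough sketch of both sides of that cache is below; the write side mirrors the diff, while the read-side method has_entity_changed is taken from the same StreamChangeCache class elsewhere in synapse and is an assumption rather than part of this commit.

    from synapse.util.caches.stream_change_cache import StreamChangeCache

    # Keyed off the current maximum event stream ordering, as in
    # SlavedEventStore.__init__ above (1000 is just an illustrative value).
    events_max = 1000
    membership_cache = StreamChangeCache("MembershipStreamChangeCache", events_max)

    # Write side: when a membership event is persisted or replicated we record
    # "this user's membership changed at stream position 1017".
    membership_cache.entity_has_changed("@alice:example.com", 1017)

    # Read side (assumed API): a sync resuming from position 1005 has to
    # recheck alice's membership, but one resuming from 1020 can skip her.
    assert membership_cache.has_entity_changed("@alice:example.com", 1005)
    assert not membership_cache.has_entity_changed("@alice:example.com", 1020)
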
diff --git a/synapse/replication/slave/storage/receipts.py b/synapse/replication/slave/storage/receipts.py
index f88b931f8c..d5a210e569 100644
--- a/synapse/replication/slave/storage/receipts.py
+++ b/synapse/replication/slave/storage/receipts.py
@@ -44,6 +44,9 @@ class SlavedReceiptsStore(BaseSlavedStore):
     _get_linearized_receipts_for_rooms = (
         ReceiptsStore.__dict__["_get_linearized_receipts_for_rooms"]
     )
+    get_last_receipt_event_id_for_user = (
+        ReceiptsStore.__dict__["get_last_receipt_event_id_for_user"]
+    )
 
     get_max_receipt_stream_id = DataStore.get_max_receipt_stream_id.__func__
     get_all_updated_receipts = DataStore.get_all_updated_receipts.__func__
@@ -69,4 +72,7 @@ class SlavedReceiptsStore(BaseSlavedStore):
 
     def invalidate_caches_for_receipt(self, room_id, receipt_type, user_id):
         self.get_receipts_for_user.invalidate((user_id, receipt_type))
-        self.get_linearized_receipts_for_room((room_id,))
+        self.get_linearized_receipts_for_room.invalidate((room_id,))
+        self.get_last_receipt_event_id_for_user.invalidate(
+            (user_id, room_id, receipt_type)
+        )