summary refs log tree commit diff
path: root/synapse/app
diff options
context:
space:
mode:
authorMark Haines <mark.haines@matrix.org>2016-05-23 14:02:27 +0100
committerMark Haines <mark.haines@matrix.org>2016-05-23 14:02:27 +0100
commit2272b65135ad3fea9e8e5b1bb4ac47678a7fdcfa (patch)
tree4c674eff69eda1f59794ab1a6c4db718252e46d3 /synapse/app
parentSplit out the /sync handler to a separate process (diff)
downloadsynapse-2272b65135ad3fea9e8e5b1bb4ac47678a7fdcfa.tar.xz
Get incremental sync sort of working
Diffstat (limited to 'synapse/app')
-rw-r--r--synapse/app/synchrotron.py46
1 files changed, 45 insertions, 1 deletions
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index a46edd3e7c..623a73f67a 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -33,13 +33,15 @@ from synapse.replication.slave.storage.filtering import SlavedFilteringStore
 from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore
 from synapse.server import HomeServer
 from synapse.storage.engines import create_engine
+from synapse.storage.roommember import RoomMemberStore
+from synapse.util.async import sleep
 from synapse.util.httpresourcetree import create_resource_tree
 from synapse.util.logcontext import LoggingContext
 from synapse.util.manhole import manhole
 from synapse.util.rlimit import change_resource_limit
 from synapse.util.versionstring import get_version_string
 
-from twisted.internet import reactor
+from twisted.internet import reactor, defer
 from twisted.web.resource import Resource
 
 from daemonize import Daemonize
@@ -55,6 +57,10 @@ class SynchrotronConfig(DatabaseConfig, LoggingConfig, AppServiceConfig):
     def read_config(self, config):
         self.replication_url = config["replication_url"]
         self.server_name = config["server_name"]
+        self.use_insecure_ssl_client_just_for_testing_do_not_use = config.get(
+            "use_insecure_ssl_client_just_for_testing_do_not_use", False
+        )
+        self.user_agent_suffix = None
         self.listeners = config["listeners"]
         self.soft_file_limit = config.get("soft_file_limit")
         self.daemonize = config.get("daemonize")
@@ -117,6 +123,15 @@ class SynchrotronSlavedStore(
     def insert_client_ip(self, user, access_token, ip, user_agent):
         pass
 
+    # XXX: This is a bit broken because we don't persist forgotten rooms
+    # in a way that they can be streamed. This means that we don't have a
+    # way to invalidate the forgotten rooms cache correctly.
+    # For now we expire the cache every 10 minutes.
+    BROKEN_CACHE_EXPIRY_MS = 60 * 60 * 1000
+    who_forgot_in_room = (
+        RoomMemberStore.__dict__["who_forgot_in_room"]
+    )
+
 
 class SynchrotronPresence(object):
     def set_state(self, user, state):
@@ -205,6 +220,34 @@ class SynchrotronServer(HomeServer):
             else:
                 logger.warn("Unrecognized listener type: %s", listener["type"])
 
+    @defer.inlineCallbacks
+    def replicate(self):
+        http_client = self.get_simple_http_client()
+        store = self.get_datastore()
+        replication_url = self.config.replication_url
+        clock = self.get_clock()
+
+        def expire_broken_caches():
+            store.who_forgot_in_room.invalidate_all()
+
+        next_expire_broken_caches_ms = 0
+        while True:
+            try:
+                args = store.stream_positions()
+                args["timeout"] = 30000
+                result = yield http_client.get_json(replication_url, args=args)
+                logger.error("FENRIS %r", result)
+                now_ms = clock.time_msec()
+                if now_ms > next_expire_broken_caches_ms:
+                    expire_broken_caches()
+                    next_expire_broken_caches_ms = (
+                        now_ms + store.BROKEN_CACHE_EXPIRY_MS
+                    )
+                yield store.process_replication(result)
+            except:
+                logger.exception("Error replicating from %r", replication_url)
+                sleep(5)
+
 
 def setup(config_options):
     try:
@@ -239,6 +282,7 @@ def setup(config_options):
 
     def start():
         ss.get_datastore().start_profiling()
+        ss.replicate()
 
     reactor.callWhenRunning(start)