diff options
author | Mark Haines <mark.haines@matrix.org> | 2016-05-23 14:02:27 +0100 |
---|---|---|
committer | Mark Haines <mark.haines@matrix.org> | 2016-05-23 14:02:27 +0100 |
commit | 2272b65135ad3fea9e8e5b1bb4ac47678a7fdcfa (patch) | |
tree | 4c674eff69eda1f59794ab1a6c4db718252e46d3 /synapse/app | |
parent | Split out the /sync handler to a separate process (diff) | |
download | synapse-2272b65135ad3fea9e8e5b1bb4ac47678a7fdcfa.tar.xz |
Get incremental sync sort of working
Diffstat (limited to 'synapse/app')
-rw-r--r-- | synapse/app/synchrotron.py | 46 |
1 files changed, 45 insertions, 1 deletions
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index a46edd3e7c..623a73f67a 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -33,13 +33,15 @@ from synapse.replication.slave.storage.filtering import SlavedFilteringStore from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore from synapse.server import HomeServer from synapse.storage.engines import create_engine +from synapse.storage.roommember import RoomMemberStore +from synapse.util.async import sleep from synapse.util.httpresourcetree import create_resource_tree from synapse.util.logcontext import LoggingContext from synapse.util.manhole import manhole from synapse.util.rlimit import change_resource_limit from synapse.util.versionstring import get_version_string -from twisted.internet import reactor +from twisted.internet import reactor, defer from twisted.web.resource import Resource from daemonize import Daemonize @@ -55,6 +57,10 @@ class SynchrotronConfig(DatabaseConfig, LoggingConfig, AppServiceConfig): def read_config(self, config): self.replication_url = config["replication_url"] self.server_name = config["server_name"] + self.use_insecure_ssl_client_just_for_testing_do_not_use = config.get( + "use_insecure_ssl_client_just_for_testing_do_not_use", False + ) + self.user_agent_suffix = None self.listeners = config["listeners"] self.soft_file_limit = config.get("soft_file_limit") self.daemonize = config.get("daemonize") @@ -117,6 +123,15 @@ class SynchrotronSlavedStore( def insert_client_ip(self, user, access_token, ip, user_agent): pass + # XXX: This is a bit broken because we don't persist forgotten rooms + # in a way that they can be streamed. This means that we don't have a + # way to invalidate the forgotten rooms cache correctly. + # For now we expire the cache every 10 minutes. + BROKEN_CACHE_EXPIRY_MS = 60 * 60 * 1000 + who_forgot_in_room = ( + RoomMemberStore.__dict__["who_forgot_in_room"] + ) + class SynchrotronPresence(object): def set_state(self, user, state): @@ -205,6 +220,34 @@ class SynchrotronServer(HomeServer): else: logger.warn("Unrecognized listener type: %s", listener["type"]) + @defer.inlineCallbacks + def replicate(self): + http_client = self.get_simple_http_client() + store = self.get_datastore() + replication_url = self.config.replication_url + clock = self.get_clock() + + def expire_broken_caches(): + store.who_forgot_in_room.invalidate_all() + + next_expire_broken_caches_ms = 0 + while True: + try: + args = store.stream_positions() + args["timeout"] = 30000 + result = yield http_client.get_json(replication_url, args=args) + logger.error("FENRIS %r", result) + now_ms = clock.time_msec() + if now_ms > next_expire_broken_caches_ms: + expire_broken_caches() + next_expire_broken_caches_ms = ( + now_ms + store.BROKEN_CACHE_EXPIRY_MS + ) + yield store.process_replication(result) + except: + logger.exception("Error replicating from %r", replication_url) + sleep(5) + def setup(config_options): try: @@ -239,6 +282,7 @@ def setup(config_options): def start(): ss.get_datastore().start_profiling() + ss.replicate() reactor.callWhenRunning(start) |