diff options
author | Mark Haines <mark.haines@matrix.org> | 2016-05-23 14:02:27 +0100 |
---|---|---|
committer | Mark Haines <mark.haines@matrix.org> | 2016-05-23 14:02:27 +0100 |
commit | 2272b65135ad3fea9e8e5b1bb4ac47678a7fdcfa (patch) | |
tree | 4c674eff69eda1f59794ab1a6c4db718252e46d3 | |
parent | Split out the /sync handler to a separate process (diff) | |
download | synapse-2272b65135ad3fea9e8e5b1bb4ac47678a7fdcfa.tar.xz |
Get incremental sync sort of working
-rw-r--r-- | synapse/app/synchrotron.py | 46 | ||||
-rw-r--r-- | synapse/handlers/sync.py | 2 | ||||
-rw-r--r-- | synapse/replication/slave/storage/events.py | 10 | ||||
-rw-r--r-- | synapse/replication/slave/storage/receipts.py | 8 |
4 files changed, 61 insertions, 5 deletions
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index a46edd3e7c..623a73f67a 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -33,13 +33,15 @@ from synapse.replication.slave.storage.filtering import SlavedFilteringStore from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore from synapse.server import HomeServer from synapse.storage.engines import create_engine +from synapse.storage.roommember import RoomMemberStore +from synapse.util.async import sleep from synapse.util.httpresourcetree import create_resource_tree from synapse.util.logcontext import LoggingContext from synapse.util.manhole import manhole from synapse.util.rlimit import change_resource_limit from synapse.util.versionstring import get_version_string -from twisted.internet import reactor +from twisted.internet import reactor, defer from twisted.web.resource import Resource from daemonize import Daemonize @@ -55,6 +57,10 @@ class SynchrotronConfig(DatabaseConfig, LoggingConfig, AppServiceConfig): def read_config(self, config): self.replication_url = config["replication_url"] self.server_name = config["server_name"] + self.use_insecure_ssl_client_just_for_testing_do_not_use = config.get( + "use_insecure_ssl_client_just_for_testing_do_not_use", False + ) + self.user_agent_suffix = None self.listeners = config["listeners"] self.soft_file_limit = config.get("soft_file_limit") self.daemonize = config.get("daemonize") @@ -117,6 +123,15 @@ class SynchrotronSlavedStore( def insert_client_ip(self, user, access_token, ip, user_agent): pass + # XXX: This is a bit broken because we don't persist forgotten rooms + # in a way that they can be streamed. This means that we don't have a + # way to invalidate the forgotten rooms cache correctly. + # For now we expire the cache every 10 minutes. + BROKEN_CACHE_EXPIRY_MS = 60 * 60 * 1000 + who_forgot_in_room = ( + RoomMemberStore.__dict__["who_forgot_in_room"] + ) + class SynchrotronPresence(object): def set_state(self, user, state): @@ -205,6 +220,34 @@ class SynchrotronServer(HomeServer): else: logger.warn("Unrecognized listener type: %s", listener["type"]) + @defer.inlineCallbacks + def replicate(self): + http_client = self.get_simple_http_client() + store = self.get_datastore() + replication_url = self.config.replication_url + clock = self.get_clock() + + def expire_broken_caches(): + store.who_forgot_in_room.invalidate_all() + + next_expire_broken_caches_ms = 0 + while True: + try: + args = store.stream_positions() + args["timeout"] = 30000 + result = yield http_client.get_json(replication_url, args=args) + logger.error("FENRIS %r", result) + now_ms = clock.time_msec() + if now_ms > next_expire_broken_caches_ms: + expire_broken_caches() + next_expire_broken_caches_ms = ( + now_ms + store.BROKEN_CACHE_EXPIRY_MS + ) + yield store.process_replication(result) + except: + logger.exception("Error replicating from %r", replication_url) + sleep(5) + def setup(config_options): try: @@ -239,6 +282,7 @@ def setup(config_options): def start(): ss.get_datastore().start_profiling() + ss.replicate() reactor.callWhenRunning(start) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 9ebfccc8bf..3acee065ad 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -240,6 +240,8 @@ class SyncHandler(object): ) ) + account_data = dict(account_data) + account_data['m.push_rules'] = yield self.push_rules_for_user( sync_config.user ) diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index c35192d0d1..d768e2aabe 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -58,6 +58,9 @@ class SlavedEventStore(BaseSlavedStore): "EventsRoomStreamChangeCache", min_event_val, prefilled_cache=event_cache_prefill, ) + self._membership_stream_cache = StreamChangeCache( + "MembershipStreamChangeCache", events_max, + ) # Cached functions can't be accessed through a class instance so we need # to reach inside the __dict__ to extract them. @@ -113,6 +116,7 @@ class SlavedEventStore(BaseSlavedStore): DataStore.get_room_events_stream_for_room.__func__ ) get_events_around = DataStore.get_events_around.__func__ + get_state_for_event = DataStore.get_state_for_event.__func__ get_state_for_events = DataStore.get_state_for_events.__func__ get_state_groups = DataStore.get_state_groups.__func__ get_recent_events_for_room = DataStore.get_recent_events_for_room.__func__ @@ -228,9 +232,9 @@ class SlavedEventStore(BaseSlavedStore): self.get_rooms_for_user.invalidate((event.state_key,)) # self.get_joined_hosts_for_room.invalidate((event.room_id,)) self.get_users_in_room.invalidate((event.room_id,)) - # self._membership_stream_cache.entity_has_changed( - # event.state_key, event.internal_metadata.stream_ordering - # ) + self._membership_stream_cache.entity_has_changed( + event.state_key, event.internal_metadata.stream_ordering + ) self.get_invited_rooms_for_user.invalidate((event.state_key,)) if not event.is_state(): diff --git a/synapse/replication/slave/storage/receipts.py b/synapse/replication/slave/storage/receipts.py index f88b931f8c..d5a210e569 100644 --- a/synapse/replication/slave/storage/receipts.py +++ b/synapse/replication/slave/storage/receipts.py @@ -44,6 +44,9 @@ class SlavedReceiptsStore(BaseSlavedStore): _get_linearized_receipts_for_rooms = ( ReceiptsStore.__dict__["_get_linearized_receipts_for_rooms"] ) + get_last_receipt_event_id_for_user = ( + ReceiptsStore.__dict__["get_last_receipt_event_id_for_user"] + ) get_max_receipt_stream_id = DataStore.get_max_receipt_stream_id.__func__ get_all_updated_receipts = DataStore.get_all_updated_receipts.__func__ @@ -69,4 +72,7 @@ class SlavedReceiptsStore(BaseSlavedStore): def invalidate_caches_for_receipt(self, room_id, receipt_type, user_id): self.get_receipts_for_user.invalidate((user_id, receipt_type)) - self.get_linearized_receipts_for_room((room_id,)) + self.get_linearized_receipts_for_room.invalidate((room_id,)) + self.get_last_receipt_event_id_for_user.invalidate( + (user_id, room_id, receipt_type) + ) |