diff options
Diffstat (limited to 'synapse/app/pusher.py')
-rw-r--r-- | synapse/app/pusher.py | 45 |
1 files changed, 43 insertions, 2 deletions
diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index b5339f030d..135dd58c15 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -20,11 +20,14 @@ from synapse.server import HomeServer from synapse.config._base import ConfigError from synapse.config.database import DatabaseConfig from synapse.config.logger import LoggingConfig +from synapse.config.emailconfig import EmailConfig from synapse.http.site import SynapseSite from synapse.metrics.resource import MetricsResource, METRICS_PREFIX +from synapse.storage.roommember import RoomMemberStore from synapse.replication.slave.storage.events import SlavedEventStore from synapse.replication.slave.storage.pushers import SlavedPusherStore from synapse.replication.slave.storage.receipts import SlavedReceiptsStore +from synapse.replication.slave.storage.account_data import SlavedAccountDataStore from synapse.storage.engines import create_engine from synapse.storage import DataStore from synapse.util.async import sleep @@ -58,6 +61,7 @@ class SlaveConfig(DatabaseConfig): self.soft_file_limit = config.get("soft_file_limit") self.daemonize = config.get("daemonize") self.pid_file = self.abspath(config.get("pid_file")) + self.public_baseurl = config["public_baseurl"] def default_config(self, server_name, **kwargs): pid_file = self.abspath("pusher.pid") @@ -91,12 +95,13 @@ class SlaveConfig(DatabaseConfig): """ % locals() -class PusherSlaveConfig(SlaveConfig, LoggingConfig): +class PusherSlaveConfig(SlaveConfig, LoggingConfig, EmailConfig): pass class PusherSlaveStore( - SlavedEventStore, SlavedPusherStore, SlavedReceiptsStore + SlavedEventStore, SlavedPusherStore, SlavedReceiptsStore, + SlavedAccountDataStore ): update_pusher_last_stream_ordering_and_success = ( DataStore.update_pusher_last_stream_ordering_and_success.__func__ @@ -110,6 +115,31 @@ class PusherSlaveStore( DataStore.update_pusher_last_stream_ordering.__func__ ) + get_throttle_params_by_room = ( + DataStore.get_throttle_params_by_room.__func__ + ) + + set_throttle_params = ( + DataStore.set_throttle_params.__func__ + ) + + get_time_of_last_push_action_before = ( + DataStore.get_time_of_last_push_action_before.__func__ + ) + + get_profile_displayname = ( + DataStore.get_profile_displayname.__func__ + ) + + # XXX: This is a bit broken because we don't persist forgotten rooms + # in a way that they can be streamed. This means that we don't have a + # way to invalidate the forgotten rooms cache correctly. + # For now we expire the cache every 10 minutes. + BROKEN_CACHE_EXPIRY_MS = 60 * 60 * 1000 + who_forgot_in_room = ( + RoomMemberStore.__dict__["who_forgot_in_room"] + ) + class PusherServer(HomeServer): @@ -189,6 +219,7 @@ class PusherServer(HomeServer): store = self.get_datastore() replication_url = self.config.replication_url pusher_pool = self.get_pusherpool() + clock = self.get_clock() def stop_pusher(user_id, app_id, pushkey): key = "%s:%s" % (app_id, pushkey) @@ -240,11 +271,21 @@ class PusherServer(HomeServer): min_stream_id, max_stream_id, affected_room_ids ) + def expire_broken_caches(): + store.who_forgot_in_room.invalidate_all() + + next_expire_broken_caches_ms = 0 while True: try: args = store.stream_positions() args["timeout"] = 30000 result = yield http_client.get_json(replication_url, args=args) + now_ms = clock.time_msec() + if now_ms > next_expire_broken_caches_ms: + expire_broken_caches() + next_expire_broken_caches_ms = ( + now_ms + store.BROKEN_CACHE_EXPIRY_MS + ) yield store.process_replication(result) poke_pushers(result) except: |