diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py
index b5339f030d..135dd58c15 100644
--- a/synapse/app/pusher.py
+++ b/synapse/app/pusher.py
@@ -20,11 +20,14 @@ from synapse.server import HomeServer
from synapse.config._base import ConfigError
from synapse.config.database import DatabaseConfig
from synapse.config.logger import LoggingConfig
+from synapse.config.emailconfig import EmailConfig
from synapse.http.site import SynapseSite
from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
+from synapse.storage.roommember import RoomMemberStore
from synapse.replication.slave.storage.events import SlavedEventStore
from synapse.replication.slave.storage.pushers import SlavedPusherStore
from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
+from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
from synapse.storage.engines import create_engine
from synapse.storage import DataStore
from synapse.util.async import sleep
@@ -58,6 +61,7 @@ class SlaveConfig(DatabaseConfig):
self.soft_file_limit = config.get("soft_file_limit")
self.daemonize = config.get("daemonize")
self.pid_file = self.abspath(config.get("pid_file"))
+ self.public_baseurl = config["public_baseurl"]
def default_config(self, server_name, **kwargs):
pid_file = self.abspath("pusher.pid")
@@ -91,12 +95,13 @@ class SlaveConfig(DatabaseConfig):
""" % locals()
-class PusherSlaveConfig(SlaveConfig, LoggingConfig):
+class PusherSlaveConfig(SlaveConfig, LoggingConfig, EmailConfig):
pass
class PusherSlaveStore(
- SlavedEventStore, SlavedPusherStore, SlavedReceiptsStore
+ SlavedEventStore, SlavedPusherStore, SlavedReceiptsStore,
+ SlavedAccountDataStore
):
update_pusher_last_stream_ordering_and_success = (
DataStore.update_pusher_last_stream_ordering_and_success.__func__
@@ -110,6 +115,31 @@ class PusherSlaveStore(
DataStore.update_pusher_last_stream_ordering.__func__
)
+ get_throttle_params_by_room = (
+ DataStore.get_throttle_params_by_room.__func__
+ )
+
+ set_throttle_params = (
+ DataStore.set_throttle_params.__func__
+ )
+
+ get_time_of_last_push_action_before = (
+ DataStore.get_time_of_last_push_action_before.__func__
+ )
+
+ get_profile_displayname = (
+ DataStore.get_profile_displayname.__func__
+ )
+
+ # XXX: This is a bit broken because we don't persist forgotten rooms
+ # in a way that they can be streamed. This means that we don't have a
+ # way to invalidate the forgotten rooms cache correctly.
+ # For now we expire the cache every 10 minutes.
+ BROKEN_CACHE_EXPIRY_MS = 60 * 60 * 1000
+ who_forgot_in_room = (
+ RoomMemberStore.__dict__["who_forgot_in_room"]
+ )
+
class PusherServer(HomeServer):
@@ -189,6 +219,7 @@ class PusherServer(HomeServer):
store = self.get_datastore()
replication_url = self.config.replication_url
pusher_pool = self.get_pusherpool()
+ clock = self.get_clock()
def stop_pusher(user_id, app_id, pushkey):
key = "%s:%s" % (app_id, pushkey)
@@ -240,11 +271,21 @@ class PusherServer(HomeServer):
min_stream_id, max_stream_id, affected_room_ids
)
+ def expire_broken_caches():
+ store.who_forgot_in_room.invalidate_all()
+
+ next_expire_broken_caches_ms = 0
while True:
try:
args = store.stream_positions()
args["timeout"] = 30000
result = yield http_client.get_json(replication_url, args=args)
+ now_ms = clock.time_msec()
+ if now_ms > next_expire_broken_caches_ms:
+ expire_broken_caches()
+ next_expire_broken_caches_ms = (
+ now_ms + store.BROKEN_CACHE_EXPIRY_MS
+ )
yield store.process_replication(result)
poke_pushers(result)
except:
|