diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py
index a4fc7e91fa..83b0863f00 100644
--- a/synapse/app/pusher.py
+++ b/synapse/app/pusher.py
@@ -28,6 +28,7 @@ from synapse.config.logger import setup_logging
from synapse.http.site import SynapseSite
from synapse.metrics import RegistryProxy
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
+from synapse.replication.slave.storage._base import __func__
from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
from synapse.replication.slave.storage.events import SlavedEventStore
from synapse.replication.slave.storage.pushers import SlavedPusherStore
@@ -49,39 +50,36 @@ class PusherSlaveStore(
SlavedAccountDataStore
):
update_pusher_last_stream_ordering_and_success = (
- DataStore.update_pusher_last_stream_ordering_and_success.__func__
+ __func__(DataStore.update_pusher_last_stream_ordering_and_success)
)
update_pusher_failing_since = (
- DataStore.update_pusher_failing_since.__func__
+ __func__(DataStore.update_pusher_failing_since)
)
update_pusher_last_stream_ordering = (
- DataStore.update_pusher_last_stream_ordering.__func__
+ __func__(DataStore.update_pusher_last_stream_ordering)
)
get_throttle_params_by_room = (
- DataStore.get_throttle_params_by_room.__func__
+ __func__(DataStore.get_throttle_params_by_room)
)
set_throttle_params = (
- DataStore.set_throttle_params.__func__
+ __func__(DataStore.set_throttle_params)
)
get_time_of_last_push_action_before = (
- DataStore.get_time_of_last_push_action_before.__func__
+ __func__(DataStore.get_time_of_last_push_action_before)
)
get_profile_displayname = (
- DataStore.get_profile_displayname.__func__
+ __func__(DataStore.get_profile_displayname)
)
class PusherServer(HomeServer):
- def setup(self):
- logger.info("Setting up.")
- self.datastore = PusherSlaveStore(self.get_db_conn(), self)
- logger.info("Finished setting up.")
+ DATASTORE_CLASS = PusherSlaveStore
def remove_pusher(self, app_id, push_key, user_id):
self.get_tcp_replication().send_remove_pusher(app_id, push_key, user_id)
@@ -163,11 +161,11 @@ class PusherReplicationHandler(ReplicationClientHandler):
else:
yield self.start_pusher(row.user_id, row.app_id, row.pushkey)
elif stream_name == "events":
- self.pusher_pool.on_new_notifications(
+ yield self.pusher_pool.on_new_notifications(
token, token,
)
elif stream_name == "receipts":
- self.pusher_pool.on_new_receipts(
+ yield self.pusher_pool.on_new_receipts(
token, token, set(row.room_id for row in rows)
)
except Exception:
@@ -185,7 +183,7 @@ class PusherReplicationHandler(ReplicationClientHandler):
def start_pusher(self, user_id, app_id, pushkey):
key = "%s:%s" % (app_id, pushkey)
logger.info("Starting pusher %r / %r", user_id, key)
- return self.pusher_pool._refresh_pusher(app_id, pushkey, user_id)
+ return self.pusher_pool.start_pusher_by_id(app_id, pushkey, user_id)
def start(config_options):
@@ -194,7 +192,7 @@ def start(config_options):
"Synapse pusher", config_options
)
except ConfigError as e:
- sys.stderr.write("\n" + e.message + "\n")
+ sys.stderr.write("\n" + str(e) + "\n")
sys.exit(1)
assert config.worker_app == "synapse.app.pusher"
@@ -231,7 +229,6 @@ def start(config_options):
def start():
ps.get_pusherpool().start()
ps.get_datastore().start_profiling()
- ps.get_state_handler().start_caching()
reactor.callWhenRunning(start)
|