summary refs log tree commit diff
path: root/synapse/app/pusher.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/app/pusher.py')
-rw-r--r--synapse/app/pusher.py28
1 files changed, 13 insertions, 15 deletions
diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py
index 9295a51d5b..83b0863f00 100644
--- a/synapse/app/pusher.py
+++ b/synapse/app/pusher.py
@@ -28,6 +28,7 @@ from synapse.config.logger import setup_logging
 from synapse.http.site import SynapseSite
 from synapse.metrics import RegistryProxy
 from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
+from synapse.replication.slave.storage._base import __func__
 from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
 from synapse.replication.slave.storage.events import SlavedEventStore
 from synapse.replication.slave.storage.pushers import SlavedPusherStore
@@ -49,39 +50,36 @@ class PusherSlaveStore(
     SlavedAccountDataStore
 ):
     update_pusher_last_stream_ordering_and_success = (
-        DataStore.update_pusher_last_stream_ordering_and_success.__func__
+        __func__(DataStore.update_pusher_last_stream_ordering_and_success)
     )
 
     update_pusher_failing_since = (
-        DataStore.update_pusher_failing_since.__func__
+        __func__(DataStore.update_pusher_failing_since)
     )
 
     update_pusher_last_stream_ordering = (
-        DataStore.update_pusher_last_stream_ordering.__func__
+        __func__(DataStore.update_pusher_last_stream_ordering)
     )
 
     get_throttle_params_by_room = (
-        DataStore.get_throttle_params_by_room.__func__
+        __func__(DataStore.get_throttle_params_by_room)
     )
 
     set_throttle_params = (
-        DataStore.set_throttle_params.__func__
+        __func__(DataStore.set_throttle_params)
     )
 
     get_time_of_last_push_action_before = (
-        DataStore.get_time_of_last_push_action_before.__func__
+        __func__(DataStore.get_time_of_last_push_action_before)
     )
 
     get_profile_displayname = (
-        DataStore.get_profile_displayname.__func__
+        __func__(DataStore.get_profile_displayname)
     )
 
 
 class PusherServer(HomeServer):
-    def setup(self):
-        logger.info("Setting up.")
-        self.datastore = PusherSlaveStore(self.get_db_conn(), self)
-        logger.info("Finished setting up.")
+    DATASTORE_CLASS = PusherSlaveStore
 
     def remove_pusher(self, app_id, push_key, user_id):
         self.get_tcp_replication().send_remove_pusher(app_id, push_key, user_id)
@@ -148,8 +146,9 @@ class PusherReplicationHandler(ReplicationClientHandler):
 
         self.pusher_pool = hs.get_pusherpool()
 
+    @defer.inlineCallbacks
     def on_rdata(self, stream_name, token, rows):
-        super(PusherReplicationHandler, self).on_rdata(stream_name, token, rows)
+        yield super(PusherReplicationHandler, self).on_rdata(stream_name, token, rows)
         run_in_background(self.poke_pushers, stream_name, token, rows)
 
     @defer.inlineCallbacks
@@ -184,7 +183,7 @@ class PusherReplicationHandler(ReplicationClientHandler):
     def start_pusher(self, user_id, app_id, pushkey):
         key = "%s:%s" % (app_id, pushkey)
         logger.info("Starting pusher %r / %r", user_id, key)
-        return self.pusher_pool._refresh_pusher(app_id, pushkey, user_id)
+        return self.pusher_pool.start_pusher_by_id(app_id, pushkey, user_id)
 
 
 def start(config_options):
@@ -193,7 +192,7 @@ def start(config_options):
             "Synapse pusher", config_options
         )
     except ConfigError as e:
-        sys.stderr.write("\n" + e.message + "\n")
+        sys.stderr.write("\n" + str(e) + "\n")
         sys.exit(1)
 
     assert config.worker_app == "synapse.app.pusher"
@@ -230,7 +229,6 @@ def start(config_options):
     def start():
         ps.get_pusherpool().start()
         ps.get_datastore().start_profiling()
-        ps.get_state_handler().start_caching()
 
     reactor.callWhenRunning(start)